diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index f444dd219..8847bad2f 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -175,10 +175,9 @@ func (c *Controller) initController() { func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { c.initController() - wg.Add(5) + wg.Add(4) go c.runPodInformer(stopCh, wg) go c.runPostgresqlInformer(stopCh, wg) - go c.podEventsDispatcher(stopCh, wg) go c.clusterResync(stopCh, wg) go c.apiserver.Run(stopCh, wg) @@ -200,4 +199,4 @@ func (c *Controller) runPostgresqlInformer(stopCh <-chan struct{}, wg *sync.Wait defer wg.Done() c.postgresqlInformer.Run(stopCh) -} \ No newline at end of file +} diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index a8308e86e..c24be3241 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -1,8 +1,6 @@ package controller import ( - "sync" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" @@ -42,6 +40,16 @@ func (c *Controller) podWatchFunc(options metav1.ListOptions) (watch.Interface, return c.KubeClient.Pods(c.opConfig.Namespace).Watch(opts) } +func (c *Controller) dispatchPodEvent(clusterName spec.NamespacedName, event spec.PodEvent) { + c.clustersMu.RLock() + cluster, ok := c.clusters[clusterName] + c.clustersMu.RUnlock() + if ok { + c.logger.Debugf("Sending %q event of pod %q to the %q cluster channel", event.EventType, event.PodName, clusterName) + cluster.ReceivePodEvent(event) + } +} + func (c *Controller) podAdd(obj interface{}) { pod, ok := obj.(*v1.Pod) if !ok { @@ -49,14 +57,13 @@ func (c *Controller) podAdd(obj interface{}) { } podEvent := spec.PodEvent{ - ClusterName: c.podClusterName(pod), PodName: util.NameFromMeta(pod.ObjectMeta), CurPod: pod, EventType: spec.EventAdd, ResourceVersion: pod.ResourceVersion, } - c.podCh <- podEvent + c.dispatchPodEvent(c.podClusterName(pod), podEvent) } func (c *Controller) podUpdate(prev, cur interface{}) { @@ -71,7 +78,6 @@ func (c *Controller) podUpdate(prev, cur interface{}) { } podEvent := spec.PodEvent{ - ClusterName: c.podClusterName(curPod), PodName: util.NameFromMeta(curPod.ObjectMeta), PrevPod: prevPod, CurPod: curPod, @@ -79,7 +85,7 @@ func (c *Controller) podUpdate(prev, cur interface{}) { ResourceVersion: curPod.ResourceVersion, } - c.podCh <- podEvent + c.dispatchPodEvent(c.podClusterName(curPod), podEvent) } func (c *Controller) podDelete(obj interface{}) { @@ -89,33 +95,11 @@ func (c *Controller) podDelete(obj interface{}) { } podEvent := spec.PodEvent{ - ClusterName: c.podClusterName(pod), PodName: util.NameFromMeta(pod.ObjectMeta), CurPod: pod, EventType: spec.EventDelete, ResourceVersion: pod.ResourceVersion, } - c.podCh <- podEvent -} - -func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}, wg *sync.WaitGroup) { - defer wg.Done() - - c.logger.Debugln("Watching all pod events") - for { - select { - case event := <-c.podCh: - c.clustersMu.RLock() - cluster, ok := c.clusters[event.ClusterName] - c.clustersMu.RUnlock() - - if ok { - c.logger.Debugf("Sending %q event of pod %q to the %q cluster channel", event.EventType, event.PodName, event.ClusterName) - cluster.ReceivePodEvent(event) - } - case <-stopCh: - return - } - } + c.dispatchPodEvent(c.podClusterName(pod), podEvent) } diff --git a/pkg/spec/types.go b/pkg/spec/types.go index 1a43cde05..aefb0350d 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -43,7 +43,6 @@ const ( // PodEvent describes the event for a single Pod type PodEvent struct { ResourceVersion string - ClusterName NamespacedName PodName NamespacedName PrevPod *v1.Pod CurPod *v1.Pod