Remove pod dispatcher
This commit is contained in:
		
							parent
							
								
									3ad4b127c4
								
							
						
					
					
						commit
						2fe22ff614
					
				|  | @ -171,10 +171,9 @@ func (c *Controller) initController() { | |||
| func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { | ||||
| 	c.initController() | ||||
| 
 | ||||
| 	wg.Add(4) | ||||
| 	wg.Add(3) | ||||
| 	go c.runPodInformer(stopCh, wg) | ||||
| 	go c.runPostgresqlInformer(stopCh, wg) | ||||
| 	go c.podEventsDispatcher(stopCh, wg) | ||||
| 	go c.clusterResync(stopCh, wg) | ||||
| 
 | ||||
| 	for i := range c.clusterEventQueues { | ||||
|  | @ -195,4 +194,4 @@ func (c *Controller) runPostgresqlInformer(stopCh <-chan struct{}, wg *sync.Wait | |||
| 	defer wg.Done() | ||||
| 
 | ||||
| 	c.postgresqlInformer.Run(stopCh) | ||||
| } | ||||
| } | ||||
|  |  | |||
|  | @ -1,8 +1,6 @@ | |||
| package controller | ||||
| 
 | ||||
| import ( | ||||
| 	"sync" | ||||
| 
 | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/runtime" | ||||
| 	"k8s.io/apimachinery/pkg/watch" | ||||
|  | @ -42,6 +40,16 @@ func (c *Controller) podWatchFunc(options metav1.ListOptions) (watch.Interface, | |||
| 	return c.KubeClient.Pods(c.opConfig.Namespace).Watch(opts) | ||||
| } | ||||
| 
 | ||||
| func (c *Controller) dispatchPodEvent(clusterName spec.NamespacedName, event spec.PodEvent) { | ||||
| 	c.clustersMu.RLock() | ||||
| 	cluster, ok := c.clusters[clusterName] | ||||
| 	c.clustersMu.RUnlock() | ||||
| 	if ok { | ||||
| 		c.logger.Debugf("Sending %q event of pod %q to the %q cluster channel", event.EventType, event.PodName, clusterName) | ||||
| 		cluster.ReceivePodEvent(event) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (c *Controller) podAdd(obj interface{}) { | ||||
| 	pod, ok := obj.(*v1.Pod) | ||||
| 	if !ok { | ||||
|  | @ -49,14 +57,13 @@ func (c *Controller) podAdd(obj interface{}) { | |||
| 	} | ||||
| 
 | ||||
| 	podEvent := spec.PodEvent{ | ||||
| 		ClusterName:     c.podClusterName(pod), | ||||
| 		PodName:         util.NameFromMeta(pod.ObjectMeta), | ||||
| 		CurPod:          pod, | ||||
| 		EventType:       spec.EventAdd, | ||||
| 		ResourceVersion: pod.ResourceVersion, | ||||
| 	} | ||||
| 
 | ||||
| 	c.podCh <- podEvent | ||||
| 	c.dispatchPodEvent(c.podClusterName(pod), podEvent) | ||||
| } | ||||
| 
 | ||||
| func (c *Controller) podUpdate(prev, cur interface{}) { | ||||
|  | @ -71,7 +78,6 @@ func (c *Controller) podUpdate(prev, cur interface{}) { | |||
| 	} | ||||
| 
 | ||||
| 	podEvent := spec.PodEvent{ | ||||
| 		ClusterName:     c.podClusterName(curPod), | ||||
| 		PodName:         util.NameFromMeta(curPod.ObjectMeta), | ||||
| 		PrevPod:         prevPod, | ||||
| 		CurPod:          curPod, | ||||
|  | @ -79,7 +85,7 @@ func (c *Controller) podUpdate(prev, cur interface{}) { | |||
| 		ResourceVersion: curPod.ResourceVersion, | ||||
| 	} | ||||
| 
 | ||||
| 	c.podCh <- podEvent | ||||
| 	c.dispatchPodEvent(c.podClusterName(curPod), podEvent) | ||||
| } | ||||
| 
 | ||||
| func (c *Controller) podDelete(obj interface{}) { | ||||
|  | @ -89,33 +95,11 @@ func (c *Controller) podDelete(obj interface{}) { | |||
| 	} | ||||
| 
 | ||||
| 	podEvent := spec.PodEvent{ | ||||
| 		ClusterName:     c.podClusterName(pod), | ||||
| 		PodName:         util.NameFromMeta(pod.ObjectMeta), | ||||
| 		CurPod:          pod, | ||||
| 		EventType:       spec.EventDelete, | ||||
| 		ResourceVersion: pod.ResourceVersion, | ||||
| 	} | ||||
| 
 | ||||
| 	c.podCh <- podEvent | ||||
| } | ||||
| 
 | ||||
| func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}, wg *sync.WaitGroup) { | ||||
| 	defer wg.Done() | ||||
| 
 | ||||
| 	c.logger.Debugln("Watching all pod events") | ||||
| 	for { | ||||
| 		select { | ||||
| 		case event := <-c.podCh: | ||||
| 			c.clustersMu.RLock() | ||||
| 			cluster, ok := c.clusters[event.ClusterName] | ||||
| 			c.clustersMu.RUnlock() | ||||
| 
 | ||||
| 			if ok { | ||||
| 				c.logger.Debugf("Sending %q event of pod %q to the %q cluster channel", event.EventType, event.PodName, event.ClusterName) | ||||
| 				cluster.ReceivePodEvent(event) | ||||
| 			} | ||||
| 		case <-stopCh: | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| 	c.dispatchPodEvent(c.podClusterName(pod), podEvent) | ||||
| } | ||||
|  |  | |||
|  | @ -43,7 +43,6 @@ const ( | |||
| // PodEvent describes the event for a single Pod
 | ||||
| type PodEvent struct { | ||||
| 	ResourceVersion string | ||||
| 	ClusterName     NamespacedName | ||||
| 	PodName         NamespacedName | ||||
| 	PrevPod         *v1.Pod | ||||
| 	CurPod          *v1.Pod | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue