Use queues for the pod events (#30)
This commit is contained in:
		
							parent
							
								
									132c8425e6
								
							
						
					
					
						commit
						009db16c7c
					
				|  | @ -17,6 +17,7 @@ import ( | |||
| 	"k8s.io/client-go/pkg/apis/apps/v1beta1" | ||||
| 	"k8s.io/client-go/pkg/types" | ||||
| 	"k8s.io/client-go/rest" | ||||
| 	"k8s.io/client-go/tools/cache" | ||||
| 
 | ||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/spec" | ||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util" | ||||
|  | @ -57,15 +58,14 @@ type Cluster struct { | |||
| 	logger           *logrus.Entry | ||||
| 	pgUsers          map[string]spec.PgUser | ||||
| 	systemUsers      map[string]spec.PgUser | ||||
| 	podEvents            chan spec.PodEvent | ||||
| 	podSubscribers   map[spec.NamespacedName]chan spec.PodEvent | ||||
| 	podSubscribersMu sync.RWMutex | ||||
| 	pgDb             *sql.DB | ||||
| 	mu               sync.Mutex | ||||
| 	masterLess       bool | ||||
| 	podDispatcherRunning bool | ||||
| 	userSyncStrategy spec.UserSyncer | ||||
| 	deleteOptions    *v1.DeleteOptions | ||||
| 	podEventsQueue   *cache.FIFO | ||||
| } | ||||
| 
 | ||||
| func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { | ||||
|  | @ -73,19 +73,27 @@ func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { | |||
| 	kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret)} | ||||
| 	orphanDependents := true | ||||
| 
 | ||||
| 	podEventsQueue := cache.NewFIFO(func(obj interface{}) (string, error) { | ||||
| 		e, ok := obj.(spec.PodEvent) | ||||
| 		if !ok { | ||||
| 			return "", fmt.Errorf("could not cast to PodEvent") | ||||
| 		} | ||||
| 
 | ||||
| 		return fmt.Sprintf("%s-%s", e.PodName, e.ResourceVersion), nil | ||||
| 	}) | ||||
| 
 | ||||
| 	cluster := &Cluster{ | ||||
| 		Config:           cfg, | ||||
| 		Postgresql:       pgSpec, | ||||
| 		logger:           lg, | ||||
| 		pgUsers:          make(map[string]spec.PgUser), | ||||
| 		systemUsers:      make(map[string]spec.PgUser), | ||||
| 		podEvents:            make(chan spec.PodEvent), | ||||
| 		podSubscribers:   make(map[spec.NamespacedName]chan spec.PodEvent), | ||||
| 		kubeResources:    kubeResources, | ||||
| 		masterLess:       false, | ||||
| 		podDispatcherRunning: false, | ||||
| 		userSyncStrategy: users.DefaultUserSyncStrategy{}, | ||||
| 		deleteOptions:    &v1.DeleteOptions{OrphanDependents: &orphanDependents}, | ||||
| 		podEventsQueue:   podEventsQueue, | ||||
| 	} | ||||
| 
 | ||||
| 	return cluster | ||||
|  | @ -143,16 +151,11 @@ func (c *Cluster) initUsers() error { | |||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) Create(stopCh <-chan struct{}) error { | ||||
| func (c *Cluster) Create() error { | ||||
| 	c.mu.Lock() | ||||
| 	defer c.mu.Unlock() | ||||
| 	var err error | ||||
| 
 | ||||
| 	if !c.podDispatcherRunning { | ||||
| 		go c.podEventsDispatcher(stopCh) | ||||
| 		c.podDispatcherRunning = true | ||||
| 	} | ||||
| 
 | ||||
| 	defer func() { | ||||
| 		if err == nil { | ||||
| 			c.setStatus(spec.ClusterStatusRunning) //TODO: are you sure it's running?
 | ||||
|  | @ -460,7 +463,38 @@ func (c *Cluster) Delete() error { | |||
| } | ||||
| 
 | ||||
| func (c *Cluster) ReceivePodEvent(event spec.PodEvent) { | ||||
| 	c.podEvents <- event | ||||
| 	c.podEventsQueue.Add(event) | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) processPodEvent(obj interface{}) error { | ||||
| 	event, ok := obj.(spec.PodEvent) | ||||
| 	if !ok { | ||||
| 		return fmt.Errorf("could not cast to PodEvent") | ||||
| 	} | ||||
| 
 | ||||
| 	c.podSubscribersMu.RLock() | ||||
| 	subscriber, ok := c.podSubscribers[event.PodName] | ||||
| 	c.podSubscribersMu.RUnlock() | ||||
| 	if ok { | ||||
| 		subscriber <- event | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) Run(stopCh <-chan struct{}) { | ||||
| 	go c.processPodEventQueue(stopCh) | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) processPodEventQueue(stopCh <-chan struct{}) { | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-stopCh: | ||||
| 			return | ||||
| 		default: | ||||
| 			c.podEventsQueue.Pop(cache.PopProcessFunc(c.processPodEvent)) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) initSystemUsers() { | ||||
|  |  | |||
|  | @ -146,23 +146,6 @@ func (c *Cluster) recreatePod(pod v1.Pod) error { | |||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) podEventsDispatcher(stopCh <-chan struct{}) { | ||||
| 	c.logger.Infof("Watching '%s' cluster", c.ClusterName()) | ||||
| 	for { | ||||
| 		select { | ||||
| 		case event := <-c.podEvents: | ||||
| 			c.podSubscribersMu.RLock() | ||||
| 			subscriber, ok := c.podSubscribers[event.PodName] | ||||
| 			c.podSubscribersMu.RUnlock() | ||||
| 			if ok { | ||||
| 				go func() { subscriber <- event }() //TODO: is it a right way to do nonblocking send to the channel?
 | ||||
| 			} | ||||
| 		case <-stopCh: | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) recreatePods() error { | ||||
| 	ls := c.labelsSet() | ||||
| 	namespace := c.Metadata.Namespace | ||||
|  |  | |||
|  | @ -7,7 +7,7 @@ import ( | |||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" | ||||
| ) | ||||
| 
 | ||||
| func (c *Cluster) Sync(stopCh <-chan struct{}) error { | ||||
| func (c *Cluster) Sync() error { | ||||
| 	c.mu.Lock() | ||||
| 	defer c.mu.Unlock() | ||||
| 
 | ||||
|  | @ -16,11 +16,6 @@ func (c *Cluster) Sync(stopCh <-chan struct{}) error { | |||
| 		c.logger.Errorf("could not load resources: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	if !c.podDispatcherRunning { | ||||
| 		go c.podEventsDispatcher(stopCh) | ||||
| 		c.podDispatcherRunning = true | ||||
| 	} | ||||
| 
 | ||||
| 	c.logger.Debugf("Syncing secrets") | ||||
| 	if err := c.syncSecrets(); err != nil { | ||||
| 		if !k8sutil.ResourceAlreadyExists(err) { | ||||
|  |  | |||
|  | @ -66,6 +66,7 @@ func (c *Controller) podAdd(obj interface{}) { | |||
| 		PodName:         util.NameFromMeta(pod.ObjectMeta), | ||||
| 		CurPod:          pod, | ||||
| 		EventType:       spec.EventAdd, | ||||
| 		ResourceVersion: pod.ResourceVersion, | ||||
| 	} | ||||
| 
 | ||||
| 	c.podCh <- podEvent | ||||
|  | @ -88,6 +89,7 @@ func (c *Controller) podUpdate(prev, cur interface{}) { | |||
| 		PrevPod:         prevPod, | ||||
| 		CurPod:          curPod, | ||||
| 		EventType:       spec.EventUpdate, | ||||
| 		ResourceVersion: curPod.ResourceVersion, | ||||
| 	} | ||||
| 
 | ||||
| 	c.podCh <- podEvent | ||||
|  | @ -104,23 +106,24 @@ func (c *Controller) podDelete(obj interface{}) { | |||
| 		PodName:         util.NameFromMeta(pod.ObjectMeta), | ||||
| 		CurPod:          pod, | ||||
| 		EventType:       spec.EventDelete, | ||||
| 		ResourceVersion: pod.ResourceVersion, | ||||
| 	} | ||||
| 
 | ||||
| 	c.podCh <- podEvent | ||||
| } | ||||
| 
 | ||||
| func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}) { | ||||
| 	c.logger.Infof("Watching all pod events") | ||||
| 	c.logger.Debugln("Watching all pod events") | ||||
| 	for { | ||||
| 		select { | ||||
| 		case event := <-c.podCh: | ||||
| 			c.clustersMu.RLock() | ||||
| 			subscriber, ok := c.clusters[event.ClusterName] | ||||
| 			cluster, ok := c.clusters[event.ClusterName] | ||||
| 			c.clustersMu.RUnlock() | ||||
| 
 | ||||
| 			if ok { | ||||
| 				c.logger.Debugf("Sending %s event of pod '%s' to the '%s' cluster channel", event.EventType, event.PodName, event.ClusterName) | ||||
| 				go subscriber.ReceivePodEvent(event) | ||||
| 				cluster.ReceivePodEvent(event) | ||||
| 			} | ||||
| 		case <-stopCh: | ||||
| 			return | ||||
|  |  | |||
|  | @ -91,7 +91,6 @@ func (c *Controller) processEvent(obj interface{}) error { | |||
| 
 | ||||
| 	c.clustersMu.RLock() | ||||
| 	cl, clusterFound := c.clusters[clusterName] | ||||
| 	stopCh := c.stopChs[clusterName] | ||||
| 	c.clustersMu.RUnlock() | ||||
| 
 | ||||
| 	switch event.EventType { | ||||
|  | @ -105,13 +104,14 @@ func (c *Controller) processEvent(obj interface{}) error { | |||
| 
 | ||||
| 		stopCh := make(chan struct{}) | ||||
| 		cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) | ||||
| 		cl.Run(stopCh) | ||||
| 
 | ||||
| 		c.clustersMu.Lock() | ||||
| 		c.clusters[clusterName] = cl | ||||
| 		c.stopChs[clusterName] = stopCh | ||||
| 		c.clustersMu.Unlock() | ||||
| 
 | ||||
| 		if err := cl.Create(stopCh); err != nil { | ||||
| 		if err := cl.Create(); err != nil { | ||||
| 			cl.Error = fmt.Errorf("could not create cluster: %v", err) | ||||
| 			logger.Errorf("%v", cl.Error) | ||||
| 
 | ||||
|  | @ -158,8 +158,9 @@ func (c *Controller) processEvent(obj interface{}) error { | |||
| 
 | ||||
| 		// no race condition because a cluster is always processed by single worker
 | ||||
| 		if !clusterFound { | ||||
| 			stopCh := make(chan struct{}) | ||||
| 			cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) | ||||
| 			stopCh = make(chan struct{}) | ||||
| 			cl.Run(stopCh) | ||||
| 
 | ||||
| 			c.clustersMu.Lock() | ||||
| 			c.clusters[clusterName] = cl | ||||
|  | @ -167,7 +168,7 @@ func (c *Controller) processEvent(obj interface{}) error { | |||
| 			c.clustersMu.Unlock() | ||||
| 		} | ||||
| 
 | ||||
| 		if err := cl.Sync(stopCh); err != nil { | ||||
| 		if err := cl.Sync(); err != nil { | ||||
| 			cl.Error = fmt.Errorf("could not sync cluster '%s': %s", clusterName, err) | ||||
| 			logger.Errorf("%v", cl) | ||||
| 			return nil | ||||
|  |  | |||
|  | @ -34,6 +34,7 @@ const ( | |||
| ) | ||||
| 
 | ||||
| type PodEvent struct { | ||||
| 	ResourceVersion string | ||||
| 	ClusterName     NamespacedName | ||||
| 	PodName         NamespacedName | ||||
| 	PrevPod         *v1.Pod | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue