Merge branch 'feature/simplify-pod-events-processing' into client-go-3
# Conflicts: # pkg/cluster/cluster.go
This commit is contained in:
		
						commit
						289980f1e1
					
				|  | @ -17,6 +17,7 @@ import ( | ||||||
| 	"k8s.io/client-go/pkg/api/v1" | 	"k8s.io/client-go/pkg/api/v1" | ||||||
| 	"k8s.io/client-go/pkg/apis/apps/v1beta1" | 	"k8s.io/client-go/pkg/apis/apps/v1beta1" | ||||||
| 	"k8s.io/client-go/rest" | 	"k8s.io/client-go/rest" | ||||||
|  | 	"k8s.io/client-go/tools/cache" | ||||||
| 
 | 
 | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/spec" | 	"github.com/zalando-incubator/postgres-operator/pkg/spec" | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util" | 	"github.com/zalando-incubator/postgres-operator/pkg/util" | ||||||
|  | @ -63,9 +64,9 @@ type Cluster struct { | ||||||
| 	pgDb                 *sql.DB | 	pgDb                 *sql.DB | ||||||
| 	mu                   sync.Mutex | 	mu                   sync.Mutex | ||||||
| 	masterLess           bool | 	masterLess           bool | ||||||
| 	podDispatcherRunning bool |  | ||||||
| 	userSyncStrategy     spec.UserSyncer | 	userSyncStrategy     spec.UserSyncer | ||||||
| 	deleteOptions        *meta_v1.DeleteOptions | 	deleteOptions        *meta_v1.DeleteOptions | ||||||
|  | 	podEventsQueue   *cache.FIFO | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { | func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { | ||||||
|  | @ -73,6 +74,15 @@ func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { | ||||||
| 	kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret)} | 	kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret)} | ||||||
| 	orphanDependents := true | 	orphanDependents := true | ||||||
| 
 | 
 | ||||||
|  | 	podEventsQueue := cache.NewFIFO(func(obj interface{}) (string, error) { | ||||||
|  | 		e, ok := obj.(spec.PodEvent) | ||||||
|  | 		if !ok { | ||||||
|  | 			return "", fmt.Errorf("could not cast to PodEvent") | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		return fmt.Sprintf("%s-%s", e.PodName, e.ResourceVersion), nil | ||||||
|  | 	}) | ||||||
|  | 
 | ||||||
| 	cluster := &Cluster{ | 	cluster := &Cluster{ | ||||||
| 		Config:               cfg, | 		Config:               cfg, | ||||||
| 		Postgresql:           pgSpec, | 		Postgresql:           pgSpec, | ||||||
|  | @ -83,9 +93,9 @@ func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { | ||||||
| 		podSubscribers:       make(map[spec.NamespacedName]chan spec.PodEvent), | 		podSubscribers:       make(map[spec.NamespacedName]chan spec.PodEvent), | ||||||
| 		kubeResources:        kubeResources, | 		kubeResources:        kubeResources, | ||||||
| 		masterLess:           false, | 		masterLess:           false, | ||||||
| 		podDispatcherRunning: false, |  | ||||||
| 		userSyncStrategy:     users.DefaultUserSyncStrategy{}, | 		userSyncStrategy:     users.DefaultUserSyncStrategy{}, | ||||||
| 		deleteOptions:        &meta_v1.DeleteOptions{OrphanDependents: &orphanDependents}, | 		deleteOptions:        &meta_v1.DeleteOptions{OrphanDependents: &orphanDependents}, | ||||||
|  | 		podEventsQueue:   podEventsQueue, | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return cluster | 	return cluster | ||||||
|  | @ -143,16 +153,11 @@ func (c *Cluster) initUsers() error { | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) Create(stopCh <-chan struct{}) error { | func (c *Cluster) Create() error { | ||||||
| 	c.mu.Lock() | 	c.mu.Lock() | ||||||
| 	defer c.mu.Unlock() | 	defer c.mu.Unlock() | ||||||
| 	var err error | 	var err error | ||||||
| 
 | 
 | ||||||
| 	if !c.podDispatcherRunning { |  | ||||||
| 		go c.podEventsDispatcher(stopCh) |  | ||||||
| 		c.podDispatcherRunning = true |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	defer func() { | 	defer func() { | ||||||
| 		if err == nil { | 		if err == nil { | ||||||
| 			c.setStatus(spec.ClusterStatusRunning) //TODO: are you sure it's running?
 | 			c.setStatus(spec.ClusterStatusRunning) //TODO: are you sure it's running?
 | ||||||
|  | @ -460,7 +465,38 @@ func (c *Cluster) Delete() error { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) ReceivePodEvent(event spec.PodEvent) { | func (c *Cluster) ReceivePodEvent(event spec.PodEvent) { | ||||||
| 	c.podEvents <- event | 	c.podEventsQueue.Add(event) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *Cluster) podEventProcess(obj interface{}) error { | ||||||
|  | 	event, ok := obj.(spec.PodEvent) | ||||||
|  | 	if !ok { | ||||||
|  | 		return fmt.Errorf("could not cast to PodEvent") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	c.podSubscribersMu.RLock() | ||||||
|  | 	subscriber, ok := c.podSubscribers[event.PodName] | ||||||
|  | 	c.podSubscribersMu.RUnlock() | ||||||
|  | 	if ok { | ||||||
|  | 		subscriber <- event | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *Cluster) Run(stopCh <-chan struct{}) { | ||||||
|  | 	go c.processPodEventQueue(stopCh) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *Cluster) processPodEventQueue(stopCh <-chan struct{}) { | ||||||
|  | 	for { | ||||||
|  | 		select { | ||||||
|  | 		case <-stopCh: | ||||||
|  | 			return | ||||||
|  | 		default: | ||||||
|  | 			c.podEventsQueue.Pop(cache.PopProcessFunc(c.podEventProcess)) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) initSystemUsers() { | func (c *Cluster) initSystemUsers() { | ||||||
|  |  | ||||||
|  | @ -147,23 +147,6 @@ func (c *Cluster) recreatePod(pod v1.Pod) error { | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) podEventsDispatcher(stopCh <-chan struct{}) { |  | ||||||
| 	c.logger.Infof("Watching '%s' cluster", c.ClusterName()) |  | ||||||
| 	for { |  | ||||||
| 		select { |  | ||||||
| 		case event := <-c.podEvents: |  | ||||||
| 			c.podSubscribersMu.RLock() |  | ||||||
| 			subscriber, ok := c.podSubscribers[event.PodName] |  | ||||||
| 			c.podSubscribersMu.RUnlock() |  | ||||||
| 			if ok { |  | ||||||
| 				go func() { subscriber <- event }() //TODO: is it a right way to do nonblocking send to the channel?
 |  | ||||||
| 			} |  | ||||||
| 		case <-stopCh: |  | ||||||
| 			return |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *Cluster) recreatePods() error { | func (c *Cluster) recreatePods() error { | ||||||
| 	ls := c.labelsSet() | 	ls := c.labelsSet() | ||||||
| 	namespace := c.Metadata.Namespace | 	namespace := c.Metadata.Namespace | ||||||
|  |  | ||||||
|  | @ -7,7 +7,7 @@ import ( | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" | 	"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) Sync(stopCh <-chan struct{}) error { | func (c *Cluster) Sync() error { | ||||||
| 	c.mu.Lock() | 	c.mu.Lock() | ||||||
| 	defer c.mu.Unlock() | 	defer c.mu.Unlock() | ||||||
| 
 | 
 | ||||||
|  | @ -16,11 +16,6 @@ func (c *Cluster) Sync(stopCh <-chan struct{}) error { | ||||||
| 		c.logger.Errorf("could not load resources: %v", err) | 		c.logger.Errorf("could not load resources: %v", err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if !c.podDispatcherRunning { |  | ||||||
| 		go c.podEventsDispatcher(stopCh) |  | ||||||
| 		c.podDispatcherRunning = true |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	c.logger.Debugf("Syncing secrets") | 	c.logger.Debugf("Syncing secrets") | ||||||
| 	if err := c.syncSecrets(); err != nil { | 	if err := c.syncSecrets(); err != nil { | ||||||
| 		if !k8sutil.ResourceAlreadyExists(err) { | 		if !k8sutil.ResourceAlreadyExists(err) { | ||||||
|  |  | ||||||
|  | @ -25,10 +25,11 @@ func (c *Controller) podAdd(obj interface{}) { | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	podEvent := spec.PodEvent{ | 	podEvent := spec.PodEvent{ | ||||||
| 		ClusterName: c.PodClusterName(pod), | 		ClusterName:     c.PodClusterName(pod), | ||||||
| 		PodName:     util.NameFromMeta(pod.ObjectMeta), | 		PodName:         util.NameFromMeta(pod.ObjectMeta), | ||||||
| 		CurPod:      pod, | 		CurPod:          pod, | ||||||
| 		EventType:   spec.EventAdd, | 		EventType:       spec.EventAdd, | ||||||
|  | 		ResourceVersion: pod.ResourceVersion, | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	c.podCh <- podEvent | 	c.podCh <- podEvent | ||||||
|  | @ -46,11 +47,12 @@ func (c *Controller) podUpdate(prev, cur interface{}) { | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	podEvent := spec.PodEvent{ | 	podEvent := spec.PodEvent{ | ||||||
| 		ClusterName: c.PodClusterName(curPod), | 		ClusterName:     c.PodClusterName(curPod), | ||||||
| 		PodName:     util.NameFromMeta(curPod.ObjectMeta), | 		PodName:         util.NameFromMeta(curPod.ObjectMeta), | ||||||
| 		PrevPod:     prevPod, | 		PrevPod:         prevPod, | ||||||
| 		CurPod:      curPod, | 		CurPod:          curPod, | ||||||
| 		EventType:   spec.EventUpdate, | 		EventType:       spec.EventUpdate, | ||||||
|  | 		ResourceVersion: curPod.ResourceVersion, | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	c.podCh <- podEvent | 	c.podCh <- podEvent | ||||||
|  | @ -63,27 +65,28 @@ func (c *Controller) podDelete(obj interface{}) { | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	podEvent := spec.PodEvent{ | 	podEvent := spec.PodEvent{ | ||||||
| 		ClusterName: c.PodClusterName(pod), | 		ClusterName:     c.PodClusterName(pod), | ||||||
| 		PodName:     util.NameFromMeta(pod.ObjectMeta), | 		PodName:         util.NameFromMeta(pod.ObjectMeta), | ||||||
| 		CurPod:      pod, | 		CurPod:          pod, | ||||||
| 		EventType:   spec.EventDelete, | 		EventType:       spec.EventDelete, | ||||||
|  | 		ResourceVersion: pod.ResourceVersion, | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	c.podCh <- podEvent | 	c.podCh <- podEvent | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}) { | func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}) { | ||||||
| 	c.logger.Infof("Watching all pod events") | 	c.logger.Debugln("Watching all pod events") | ||||||
| 	for { | 	for { | ||||||
| 		select { | 		select { | ||||||
| 		case event := <-c.podCh: | 		case event := <-c.podCh: | ||||||
| 			c.clustersMu.RLock() | 			c.clustersMu.RLock() | ||||||
| 			subscriber, ok := c.clusters[event.ClusterName] | 			cluster, ok := c.clusters[event.ClusterName] | ||||||
| 			c.clustersMu.RUnlock() | 			c.clustersMu.RUnlock() | ||||||
| 
 | 
 | ||||||
| 			if ok { | 			if ok { | ||||||
| 				c.logger.Debugf("Sending %s event of pod '%s' to the '%s' cluster channel", event.EventType, event.PodName, event.ClusterName) | 				c.logger.Debugf("Sending %s event of pod '%s' to the '%s' cluster channel", event.EventType, event.PodName, event.ClusterName) | ||||||
| 				go subscriber.ReceivePodEvent(event) | 				cluster.ReceivePodEvent(event) | ||||||
| 			} | 			} | ||||||
| 		case <-stopCh: | 		case <-stopCh: | ||||||
| 			return | 			return | ||||||
|  |  | ||||||
|  | @ -92,7 +92,6 @@ func (c *Controller) processEvent(obj interface{}) error { | ||||||
| 
 | 
 | ||||||
| 	c.clustersMu.RLock() | 	c.clustersMu.RLock() | ||||||
| 	cl, clusterFound := c.clusters[clusterName] | 	cl, clusterFound := c.clusters[clusterName] | ||||||
| 	stopCh := c.stopChs[clusterName] |  | ||||||
| 	c.clustersMu.RUnlock() | 	c.clustersMu.RUnlock() | ||||||
| 
 | 
 | ||||||
| 	switch event.EventType { | 	switch event.EventType { | ||||||
|  | @ -106,13 +105,14 @@ func (c *Controller) processEvent(obj interface{}) error { | ||||||
| 
 | 
 | ||||||
| 		stopCh := make(chan struct{}) | 		stopCh := make(chan struct{}) | ||||||
| 		cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) | 		cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) | ||||||
|  | 		cl.Run(stopCh) | ||||||
| 
 | 
 | ||||||
| 		c.clustersMu.Lock() | 		c.clustersMu.Lock() | ||||||
| 		c.clusters[clusterName] = cl | 		c.clusters[clusterName] = cl | ||||||
| 		c.stopChs[clusterName] = stopCh | 		c.stopChs[clusterName] = stopCh | ||||||
| 		c.clustersMu.Unlock() | 		c.clustersMu.Unlock() | ||||||
| 
 | 
 | ||||||
| 		if err := cl.Create(stopCh); err != nil { | 		if err := cl.Create(); err != nil { | ||||||
| 			cl.Error = fmt.Errorf("could not create cluster: %v", err) | 			cl.Error = fmt.Errorf("could not create cluster: %v", err) | ||||||
| 			logger.Errorf("%v", cl.Error) | 			logger.Errorf("%v", cl.Error) | ||||||
| 
 | 
 | ||||||
|  | @ -133,6 +133,7 @@ func (c *Controller) processEvent(obj interface{}) error { | ||||||
| 
 | 
 | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
|  | 		cl.Error = nil | ||||||
| 		logger.Infof("Cluster '%s' has been updated", clusterName) | 		logger.Infof("Cluster '%s' has been updated", clusterName) | ||||||
| 	case spec.EventDelete: | 	case spec.EventDelete: | ||||||
| 		logger.Infof("Deletion of the '%s' cluster started", clusterName) | 		logger.Infof("Deletion of the '%s' cluster started", clusterName) | ||||||
|  | @ -158,8 +159,9 @@ func (c *Controller) processEvent(obj interface{}) error { | ||||||
| 
 | 
 | ||||||
| 		// no race condition because a cluster is always processed by single worker
 | 		// no race condition because a cluster is always processed by single worker
 | ||||||
| 		if !clusterFound { | 		if !clusterFound { | ||||||
|  | 			stopCh := make(chan struct{}) | ||||||
| 			cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) | 			cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) | ||||||
| 			stopCh = make(chan struct{}) | 			cl.Run(stopCh) | ||||||
| 
 | 
 | ||||||
| 			c.clustersMu.Lock() | 			c.clustersMu.Lock() | ||||||
| 			c.clusters[clusterName] = cl | 			c.clusters[clusterName] = cl | ||||||
|  | @ -167,11 +169,12 @@ func (c *Controller) processEvent(obj interface{}) error { | ||||||
| 			c.clustersMu.Unlock() | 			c.clustersMu.Unlock() | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if err := cl.Sync(stopCh); err != nil { | 		if err := cl.Sync(); err != nil { | ||||||
| 			cl.Error = fmt.Errorf("could not sync cluster '%s': %s", clusterName, err) | 			cl.Error = fmt.Errorf("could not sync cluster '%s': %s", clusterName, err) | ||||||
| 			logger.Errorf("%v", cl) | 			logger.Errorf("%v", cl) | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
|  | 		cl.Error = nil | ||||||
| 
 | 
 | ||||||
| 		logger.Infof("Cluster '%s' has been synced", clusterName) | 		logger.Infof("Cluster '%s' has been synced", clusterName) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | @ -34,11 +34,12 @@ const ( | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| type PodEvent struct { | type PodEvent struct { | ||||||
| 	ClusterName NamespacedName | 	ResourceVersion string | ||||||
| 	PodName     NamespacedName | 	ClusterName     NamespacedName | ||||||
| 	PrevPod     *v1.Pod | 	PodName         NamespacedName | ||||||
| 	CurPod      *v1.Pod | 	PrevPod         *v1.Pod | ||||||
| 	EventType   EventType | 	CurPod          *v1.Pod | ||||||
|  | 	EventType       EventType | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type PgUser struct { | type PgUser struct { | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue