diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 8edb6869f..a8a313aeb 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -17,6 +17,7 @@ import ( "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/apis/apps/v1beta1" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util" @@ -63,9 +64,9 @@ type Cluster struct { pgDb *sql.DB mu sync.Mutex masterLess bool - podDispatcherRunning bool userSyncStrategy spec.UserSyncer deleteOptions *meta_v1.DeleteOptions + podEventsQueue *cache.FIFO } func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { @@ -73,6 +74,15 @@ func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret)} orphanDependents := true + podEventsQueue := cache.NewFIFO(func(obj interface{}) (string, error) { + e, ok := obj.(spec.PodEvent) + if !ok { + return "", fmt.Errorf("could not cast to PodEvent") + } + + return fmt.Sprintf("%s-%s", e.PodName, e.ResourceVersion), nil + }) + cluster := &Cluster{ Config: cfg, Postgresql: pgSpec, @@ -83,9 +93,9 @@ func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { podSubscribers: make(map[spec.NamespacedName]chan spec.PodEvent), kubeResources: kubeResources, masterLess: false, - podDispatcherRunning: false, userSyncStrategy: users.DefaultUserSyncStrategy{}, deleteOptions: &meta_v1.DeleteOptions{OrphanDependents: &orphanDependents}, + podEventsQueue: podEventsQueue, } return cluster @@ -143,16 +153,11 @@ func (c *Cluster) initUsers() error { return nil } -func (c *Cluster) Create(stopCh <-chan struct{}) error { +func (c *Cluster) Create() error { c.mu.Lock() defer c.mu.Unlock() var err error - if !c.podDispatcherRunning { - go c.podEventsDispatcher(stopCh) - c.podDispatcherRunning = true - } - defer func() { if err == nil { c.setStatus(spec.ClusterStatusRunning) //TODO: are you sure it's running? @@ -460,7 +465,38 @@ func (c *Cluster) Delete() error { } func (c *Cluster) ReceivePodEvent(event spec.PodEvent) { - c.podEvents <- event + c.podEventsQueue.Add(event) +} + +func (c *Cluster) podEventProcess(obj interface{}) error { + event, ok := obj.(spec.PodEvent) + if !ok { + return fmt.Errorf("could not cast to PodEvent") + } + + c.podSubscribersMu.RLock() + subscriber, ok := c.podSubscribers[event.PodName] + c.podSubscribersMu.RUnlock() + if ok { + subscriber <- event + } + + return nil +} + +func (c *Cluster) Run(stopCh <-chan struct{}) { + go c.processPodEventQueue(stopCh) +} + +func (c *Cluster) processPodEventQueue(stopCh <-chan struct{}) { + for { + select { + case <-stopCh: + return + default: + c.podEventsQueue.Pop(cache.PopProcessFunc(c.podEventProcess)) + } + } } func (c *Cluster) initSystemUsers() { diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index 2154ed1c3..16aa60e81 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -147,23 +147,6 @@ func (c *Cluster) recreatePod(pod v1.Pod) error { return nil } -func (c *Cluster) podEventsDispatcher(stopCh <-chan struct{}) { - c.logger.Infof("Watching '%s' cluster", c.ClusterName()) - for { - select { - case event := <-c.podEvents: - c.podSubscribersMu.RLock() - subscriber, ok := c.podSubscribers[event.PodName] - c.podSubscribersMu.RUnlock() - if ok { - go func() { subscriber <- event }() //TODO: is it a right way to do nonblocking send to the channel? - } - case <-stopCh: - return - } - } -} - func (c *Cluster) recreatePods() error { ls := c.labelsSet() namespace := c.Metadata.Namespace diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index c16c1f315..3ac35a274 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -7,7 +7,7 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" ) -func (c *Cluster) Sync(stopCh <-chan struct{}) error { +func (c *Cluster) Sync() error { c.mu.Lock() defer c.mu.Unlock() @@ -16,11 +16,6 @@ func (c *Cluster) Sync(stopCh <-chan struct{}) error { c.logger.Errorf("could not load resources: %v", err) } - if !c.podDispatcherRunning { - go c.podEventsDispatcher(stopCh) - c.podDispatcherRunning = true - } - c.logger.Debugf("Syncing secrets") if err := c.syncSecrets(); err != nil { if !k8sutil.ResourceAlreadyExists(err) { diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index e561a03e5..38d945990 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -25,10 +25,11 @@ func (c *Controller) podAdd(obj interface{}) { } podEvent := spec.PodEvent{ - ClusterName: c.PodClusterName(pod), - PodName: util.NameFromMeta(pod.ObjectMeta), - CurPod: pod, - EventType: spec.EventAdd, + ClusterName: c.PodClusterName(pod), + PodName: util.NameFromMeta(pod.ObjectMeta), + CurPod: pod, + EventType: spec.EventAdd, + ResourceVersion: pod.ResourceVersion, } c.podCh <- podEvent @@ -46,11 +47,12 @@ func (c *Controller) podUpdate(prev, cur interface{}) { } podEvent := spec.PodEvent{ - ClusterName: c.PodClusterName(curPod), - PodName: util.NameFromMeta(curPod.ObjectMeta), - PrevPod: prevPod, - CurPod: curPod, - EventType: spec.EventUpdate, + ClusterName: c.PodClusterName(curPod), + PodName: util.NameFromMeta(curPod.ObjectMeta), + PrevPod: prevPod, + CurPod: curPod, + EventType: spec.EventUpdate, + ResourceVersion: curPod.ResourceVersion, } c.podCh <- podEvent @@ -63,27 +65,28 @@ func (c *Controller) podDelete(obj interface{}) { } podEvent := spec.PodEvent{ - ClusterName: c.PodClusterName(pod), - PodName: util.NameFromMeta(pod.ObjectMeta), - CurPod: pod, - EventType: spec.EventDelete, + ClusterName: c.PodClusterName(pod), + PodName: util.NameFromMeta(pod.ObjectMeta), + CurPod: pod, + EventType: spec.EventDelete, + ResourceVersion: pod.ResourceVersion, } c.podCh <- podEvent } func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}) { - c.logger.Infof("Watching all pod events") + c.logger.Debugln("Watching all pod events") for { select { case event := <-c.podCh: c.clustersMu.RLock() - subscriber, ok := c.clusters[event.ClusterName] + cluster, ok := c.clusters[event.ClusterName] c.clustersMu.RUnlock() if ok { c.logger.Debugf("Sending %s event of pod '%s' to the '%s' cluster channel", event.EventType, event.PodName, event.ClusterName) - go subscriber.ReceivePodEvent(event) + cluster.ReceivePodEvent(event) } case <-stopCh: return diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 047455230..06ecf64b7 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -92,7 +92,6 @@ func (c *Controller) processEvent(obj interface{}) error { c.clustersMu.RLock() cl, clusterFound := c.clusters[clusterName] - stopCh := c.stopChs[clusterName] c.clustersMu.RUnlock() switch event.EventType { @@ -106,13 +105,14 @@ func (c *Controller) processEvent(obj interface{}) error { stopCh := make(chan struct{}) cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) + cl.Run(stopCh) c.clustersMu.Lock() c.clusters[clusterName] = cl c.stopChs[clusterName] = stopCh c.clustersMu.Unlock() - if err := cl.Create(stopCh); err != nil { + if err := cl.Create(); err != nil { cl.Error = fmt.Errorf("could not create cluster: %v", err) logger.Errorf("%v", cl.Error) @@ -133,6 +133,7 @@ func (c *Controller) processEvent(obj interface{}) error { return nil } + cl.Error = nil logger.Infof("Cluster '%s' has been updated", clusterName) case spec.EventDelete: logger.Infof("Deletion of the '%s' cluster started", clusterName) @@ -158,8 +159,9 @@ func (c *Controller) processEvent(obj interface{}) error { // no race condition because a cluster is always processed by single worker if !clusterFound { + stopCh := make(chan struct{}) cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) - stopCh = make(chan struct{}) + cl.Run(stopCh) c.clustersMu.Lock() c.clusters[clusterName] = cl @@ -167,11 +169,12 @@ func (c *Controller) processEvent(obj interface{}) error { c.clustersMu.Unlock() } - if err := cl.Sync(stopCh); err != nil { + if err := cl.Sync(); err != nil { cl.Error = fmt.Errorf("could not sync cluster '%s': %s", clusterName, err) logger.Errorf("%v", cl) return nil } + cl.Error = nil logger.Infof("Cluster '%s' has been synced", clusterName) } diff --git a/pkg/spec/types.go b/pkg/spec/types.go index fdeb36510..5d5837ac6 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -34,11 +34,12 @@ const ( ) type PodEvent struct { - ClusterName NamespacedName - PodName NamespacedName - PrevPod *v1.Pod - CurPod *v1.Pod - EventType EventType + ResourceVersion string + ClusterName NamespacedName + PodName NamespacedName + PrevPod *v1.Pod + CurPod *v1.Pod + EventType EventType } type PgUser struct {