diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 64c2a32d4..8dc142e14 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -142,13 +142,9 @@ func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedNam return cl } -func (c *Controller) processEvent(obj interface{}) error { +func (c *Controller) processEvent(event spec.ClusterEvent) { var clusterName spec.NamespacedName - event, ok := obj.(spec.ClusterEvent) - if !ok { - return fmt.Errorf("could not cast to ClusterEvent") - } lg := c.logger.WithField("worker", event.WorkerID) if event.EventType == spec.EventAdd || event.EventType == spec.EventSync { @@ -166,7 +162,7 @@ func (c *Controller) processEvent(obj interface{}) error { case spec.EventAdd: if clusterFound { lg.Debugf("cluster already exists") - return nil + return } lg.Infof("creation of the cluster started") @@ -177,7 +173,7 @@ func (c *Controller) processEvent(obj interface{}) error { cl.Error = fmt.Errorf("could not create cluster: %v", err) lg.Error(cl.Error) - return nil + return } lg.Infoln("cluster has been created") @@ -186,13 +182,13 @@ func (c *Controller) processEvent(obj interface{}) error { if !clusterFound { lg.Warnln("cluster does not exist") - return nil + return } if err := cl.Update(event.NewSpec); err != nil { cl.Error = fmt.Errorf("could not update cluster: %v", err) lg.Error(cl.Error) - return nil + return } cl.Error = nil lg.Infoln("cluster has been updated") @@ -202,12 +198,12 @@ func (c *Controller) processEvent(obj interface{}) error { lg.Infoln("Deletion of the cluster started") if !clusterFound { lg.Errorf("unknown cluster: %q", clusterName) - return nil + return } if err := cl.Delete(); err != nil { lg.Errorf("could not delete cluster: %v", err) - return nil + return } func() { @@ -238,14 +234,12 @@ func (c *Controller) processEvent(obj interface{}) error { if err := cl.Sync(); err != nil { cl.Error = fmt.Errorf("could not sync cluster: %v", err) lg.Error(cl.Error) - return nil + return } cl.Error = nil lg.Infof("cluster has been synced") } - - return nil } func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{}, wg *sync.WaitGroup) { @@ -257,13 +251,20 @@ func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{}, }() for { - if _, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)); err != nil { + obj, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(func(interface{}) error { return nil })) + if err != nil { if err == cache.FIFOClosedError { return } - c.logger.Errorf("error when processing cluster events queue: %v", err) + continue } + event, ok := obj.(spec.ClusterEvent) + if !ok { + c.logger.Errorf("could not cast to ClusterEvent") + } + + c.processEvent(event) } }