diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index d5c0773b9..fe595e547 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -55,12 +55,12 @@ func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object return &list, err } - for _, pg := range list.Items { + for i, pg := range list.Items { if pg.Error != nil { failedClustersCnt++ continue } - c.queueClusterEvent(nil, &pg, spec.EventSync) + c.queueClusterEvent(nil, &list.Items[i], spec.EventSync) activeClustersCnt++ } if len(list.Items) > 0 { @@ -232,13 +232,19 @@ func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{}, defer wg.Done() go func() { - if _, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)); err != nil { - c.logger.Errorf("error when processing cluster events queue: %v", err) - } + <-stopCh + c.clusterEventQueues[idx].Close() }() - <-stopCh - c.clusterEventQueues[idx].Close() + for { + if _, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)); err != nil { + if err == cache.FIFOClosedError { + return + } + + c.logger.Errorf("error when processing cluster events queue: %v", err) + } + } } func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec.EventType) {