From 6183203f4d354e05dd15c27efb8e1a0cf857569f Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Mon, 31 Jul 2017 10:30:49 +0200 Subject: [PATCH] fix cluster event queue processing --- pkg/controller/postgresql.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index d5c0773b9..fe595e547 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -55,12 +55,12 @@ func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object return &list, err } - for _, pg := range list.Items { + for i, pg := range list.Items { if pg.Error != nil { failedClustersCnt++ continue } - c.queueClusterEvent(nil, &pg, spec.EventSync) + c.queueClusterEvent(nil, &list.Items[i], spec.EventSync) activeClustersCnt++ } if len(list.Items) > 0 { @@ -232,13 +232,19 @@ func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{}, defer wg.Done() go func() { - if _, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)); err != nil { - c.logger.Errorf("error when processing cluster events queue: %v", err) - } + <-stopCh + c.clusterEventQueues[idx].Close() }() - <-stopCh - c.clusterEventQueues[idx].Close() + for { + if _, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)); err != nil { + if err == cache.FIFOClosedError { + return + } + + c.logger.Errorf("error when processing cluster events queue: %v", err) + } + } } func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec.EventType) {