fix cluster event queue processing

This commit is contained in:
Murat Kabilov 2017-07-31 10:30:49 +02:00 committed by GitHub
parent 2fe22ff614
commit 6183203f4d
1 changed files with 13 additions and 7 deletions

View File

@ -55,12 +55,12 @@ func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object
return &list, err return &list, err
} }
for _, pg := range list.Items { for i, pg := range list.Items {
if pg.Error != nil { if pg.Error != nil {
failedClustersCnt++ failedClustersCnt++
continue continue
} }
c.queueClusterEvent(nil, &pg, spec.EventSync) c.queueClusterEvent(nil, &list.Items[i], spec.EventSync)
activeClustersCnt++ activeClustersCnt++
} }
if len(list.Items) > 0 { if len(list.Items) > 0 {
@ -232,13 +232,19 @@ func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{},
defer wg.Done() defer wg.Done()
go func() { go func() {
if _, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)); err != nil { <-stopCh
c.logger.Errorf("error when processing cluster events queue: %v", err) c.clusterEventQueues[idx].Close()
}
}() }()
<-stopCh for {
c.clusterEventQueues[idx].Close() if _, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)); err != nil {
if err == cache.FIFOClosedError {
return
}
c.logger.Errorf("error when processing cluster events queue: %v", err)
}
}
} }
func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec.EventType) { func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec.EventType) {