From 17711d5087db728e475b4a416ea0d42d329cdc51 Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Fri, 14 Jul 2017 16:00:49 +0200 Subject: [PATCH] graceful shutdown goroutines on operator exit --- pkg/controller/controller.go | 24 ++++++++++++++++++------ pkg/controller/pod.go | 7 ++++++- pkg/controller/postgresql.go | 16 ++++++++++++---- 3 files changed, 36 insertions(+), 11 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index b0e0962bd..f346454ac 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -68,10 +68,13 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { c.initController() - go c.runInformers(stopCh) + go c.runPodInformer(stopCh, wg) + go c.runPostgresqlInformer(stopCh, wg) + go c.podEventsDispatcher(stopCh, wg) + go c.clusterResync(stopCh, wg) for i := range c.clusterEventQueues { - go c.processClusterEventsQueue(i) + go c.processClusterEventsQueue(stopCh, i, wg) } c.logger.Info("Started working in background") @@ -140,11 +143,20 @@ func (c *Controller) initController() { } } -func (c *Controller) runInformers(stopCh <-chan struct{}) { - go c.postgresqlInformer.Run(stopCh) +func (c *Controller) runPodInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() + wg.Add(1) + go c.podInformer.Run(stopCh) - go c.podEventsDispatcher(stopCh) - go c.clusterResync(stopCh) + + <-stopCh +} + +func (c *Controller) runPostgresqlInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() + wg.Add(1) + + go c.postgresqlInformer.Run(stopCh) <-stopCh } diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index 3b31d439f..ba44625b6 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -1,6 +1,8 @@ package controller import ( + "sync" + "k8s.io/client-go/pkg/api" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/runtime" @@ -112,7 +114,10 @@ func (c *Controller) podDelete(obj interface{}) { c.podCh <- podEvent } -func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}) { +func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() + wg.Add(1) + c.logger.Debugln("Watching all pod events") for { select { diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 14997c37f..d01d53eb9 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -3,6 +3,7 @@ package controller import ( "fmt" "reflect" + "sync" "sync/atomic" "time" @@ -20,7 +21,9 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/util/constants" ) -func (c *Controller) clusterResync(stopCh <-chan struct{}) { +func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() + wg.Add(1) ticker := time.NewTicker(c.opConfig.ResyncPeriod) for { @@ -203,12 +206,17 @@ func (c *Controller) processEvent(obj interface{}) error { return nil } -func (c *Controller) processClusterEventsQueue(idx int) { - for { +func (c *Controller) processClusterEventsQueue(stopCh <-chan struct{}, idx int, wg *sync.WaitGroup) { + defer wg.Done() + wg.Add(1) + + go func() { if _, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)); err != nil { c.logger.Errorf("error when processing cluster events queue: %v", err) } - } + }() + + <-stopCh } func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec.EventType) {