diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 398ee086e..5ab0e0fef 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -169,25 +169,30 @@ func (c *Controller) initController() { } func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { - defer wg.Done() - wg.Add(1) - c.initController() - go c.runInformers(stopCh) + wg.Add(4) + go c.runPodInformer(stopCh, wg) + go c.runPostgresqlInformer(stopCh, wg) + go c.podEventsDispatcher(stopCh, wg) + go c.clusterResync(stopCh, wg) for i := range c.clusterEventQueues { - go c.processClusterEventsQueue(i) + wg.Add(1) + go c.processClusterEventsQueue(i, stopCh, wg) } c.logger.Info("Started working in background") } -func (c *Controller) runInformers(stopCh <-chan struct{}) { - go c.postgresqlInformer.Run(stopCh) - go c.podInformer.Run(stopCh) - go c.podEventsDispatcher(stopCh) - go c.clusterResync(stopCh) +func (c *Controller) runPodInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() - <-stopCh + c.podInformer.Run(stopCh) } + +func (c *Controller) runPostgresqlInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() + + c.postgresqlInformer.Run(stopCh) +} \ No newline at end of file diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index 6575455db..a8308e86e 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -1,6 +1,8 @@ package controller import ( + "sync" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" @@ -97,7 +99,9 @@ func (c *Controller) podDelete(obj interface{}) { c.podCh <- podEvent } -func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}) { +func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() + c.logger.Debugln("Watching all pod events") for { select { diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 5b3d97080..d5c0773b9 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "reflect" + "sync" "sync/atomic" "time" @@ -19,7 +20,8 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/util/constants" ) -func (c *Controller) clusterResync(stopCh <-chan struct{}) { +func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() ticker := time.NewTicker(c.opConfig.ResyncPeriod) for { @@ -226,12 +228,17 @@ func (c *Controller) processEvent(obj interface{}) error { return nil } -func (c *Controller) processClusterEventsQueue(idx int) { - for { +func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() + + go func() { if _, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)); err != nil { c.logger.Errorf("error when processing cluster events queue: %v", err) } - } + }() + + <-stopCh + c.clusterEventQueues[idx].Close() } func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec.EventType) {