Fix graceful shutdown

graceful shutdown of goroutines on operator exit
This commit is contained in:
Murat Kabilov 2017-07-27 12:54:22 +02:00 committed by GitHub
parent 1f8b37f33d
commit 3ad4b127c4
3 changed files with 32 additions and 16 deletions

View File

@ -169,25 +169,30 @@ func (c *Controller) initController() {
} }
func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
wg.Add(1)
c.initController() c.initController()
go c.runInformers(stopCh) wg.Add(4)
go c.runPodInformer(stopCh, wg)
go c.runPostgresqlInformer(stopCh, wg)
go c.podEventsDispatcher(stopCh, wg)
go c.clusterResync(stopCh, wg)
for i := range c.clusterEventQueues { for i := range c.clusterEventQueues {
go c.processClusterEventsQueue(i) wg.Add(1)
go c.processClusterEventsQueue(i, stopCh, wg)
} }
c.logger.Info("Started working in background") c.logger.Info("Started working in background")
} }
func (c *Controller) runInformers(stopCh <-chan struct{}) { func (c *Controller) runPodInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) {
go c.postgresqlInformer.Run(stopCh) defer wg.Done()
go c.podInformer.Run(stopCh)
go c.podEventsDispatcher(stopCh)
go c.clusterResync(stopCh)
<-stopCh c.podInformer.Run(stopCh)
}
func (c *Controller) runPostgresqlInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
c.postgresqlInformer.Run(stopCh)
} }

View File

@ -1,6 +1,8 @@
package controller package controller
import ( import (
"sync"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
@ -97,7 +99,9 @@ func (c *Controller) podDelete(obj interface{}) {
c.podCh <- podEvent c.podCh <- podEvent
} }
func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}) { func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
c.logger.Debugln("Watching all pod events") c.logger.Debugln("Watching all pod events")
for { for {
select { select {

View File

@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"reflect" "reflect"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -19,7 +20,8 @@ import (
"github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/constants"
) )
func (c *Controller) clusterResync(stopCh <-chan struct{}) { func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
ticker := time.NewTicker(c.opConfig.ResyncPeriod) ticker := time.NewTicker(c.opConfig.ResyncPeriod)
for { for {
@ -226,12 +228,17 @@ func (c *Controller) processEvent(obj interface{}) error {
return nil return nil
} }
func (c *Controller) processClusterEventsQueue(idx int) { func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{}, wg *sync.WaitGroup) {
for { defer wg.Done()
go func() {
if _, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)); err != nil { if _, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)); err != nil {
c.logger.Errorf("error when processing cluster events queue: %v", err) c.logger.Errorf("error when processing cluster events queue: %v", err)
} }
} }()
<-stopCh
c.clusterEventQueues[idx].Close()
} }
func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec.EventType) { func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec.EventType) {