graceful shutdown goroutines on operator exit
This commit is contained in:
		
							parent
							
								
									a8ed1e25b4
								
							
						
					
					
						commit
						17711d5087
					
				|  | @ -68,10 +68,13 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { | ||||||
| 
 | 
 | ||||||
| 	c.initController() | 	c.initController() | ||||||
| 
 | 
 | ||||||
| 	go c.runInformers(stopCh) | 	go c.runPodInformer(stopCh, wg) | ||||||
|  | 	go c.runPostgresqlInformer(stopCh, wg) | ||||||
|  | 	go c.podEventsDispatcher(stopCh, wg) | ||||||
|  | 	go c.clusterResync(stopCh, wg) | ||||||
| 
 | 
 | ||||||
| 	for i := range c.clusterEventQueues { | 	for i := range c.clusterEventQueues { | ||||||
| 		go c.processClusterEventsQueue(i) | 		go c.processClusterEventsQueue(stopCh, i, wg) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	c.logger.Info("Started working in background") | 	c.logger.Info("Started working in background") | ||||||
|  | @ -140,11 +143,20 @@ func (c *Controller) initController() { | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Controller) runInformers(stopCh <-chan struct{}) { | func (c *Controller) runPodInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) { | ||||||
| 	go c.postgresqlInformer.Run(stopCh) | 	defer wg.Done() | ||||||
|  | 	wg.Add(1) | ||||||
|  | 
 | ||||||
| 	go c.podInformer.Run(stopCh) | 	go c.podInformer.Run(stopCh) | ||||||
| 	go c.podEventsDispatcher(stopCh) | 
 | ||||||
| 	go c.clusterResync(stopCh) | 	<-stopCh | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *Controller) runPostgresqlInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) { | ||||||
|  | 	defer wg.Done() | ||||||
|  | 	wg.Add(1) | ||||||
|  | 
 | ||||||
|  | 	go c.postgresqlInformer.Run(stopCh) | ||||||
| 
 | 
 | ||||||
| 	<-stopCh | 	<-stopCh | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -1,6 +1,8 @@ | ||||||
| package controller | package controller | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
|  | 	"sync" | ||||||
|  | 
 | ||||||
| 	"k8s.io/client-go/pkg/api" | 	"k8s.io/client-go/pkg/api" | ||||||
| 	"k8s.io/client-go/pkg/api/v1" | 	"k8s.io/client-go/pkg/api/v1" | ||||||
| 	"k8s.io/client-go/pkg/runtime" | 	"k8s.io/client-go/pkg/runtime" | ||||||
|  | @ -112,7 +114,10 @@ func (c *Controller) podDelete(obj interface{}) { | ||||||
| 	c.podCh <- podEvent | 	c.podCh <- podEvent | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}) { | func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}, wg *sync.WaitGroup) { | ||||||
|  | 	defer wg.Done() | ||||||
|  | 	wg.Add(1) | ||||||
|  | 
 | ||||||
| 	c.logger.Debugln("Watching all pod events") | 	c.logger.Debugln("Watching all pod events") | ||||||
| 	for { | 	for { | ||||||
| 		select { | 		select { | ||||||
|  |  | ||||||
|  | @ -3,6 +3,7 @@ package controller | ||||||
| import ( | import ( | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"reflect" | 	"reflect" | ||||||
|  | 	"sync" | ||||||
| 	"sync/atomic" | 	"sync/atomic" | ||||||
| 	"time" | 	"time" | ||||||
| 
 | 
 | ||||||
|  | @ -20,7 +21,9 @@ import ( | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/constants" | 	"github.com/zalando-incubator/postgres-operator/pkg/util/constants" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| func (c *Controller) clusterResync(stopCh <-chan struct{}) { | func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) { | ||||||
|  | 	defer wg.Done() | ||||||
|  | 	wg.Add(1) | ||||||
| 	ticker := time.NewTicker(c.opConfig.ResyncPeriod) | 	ticker := time.NewTicker(c.opConfig.ResyncPeriod) | ||||||
| 
 | 
 | ||||||
| 	for { | 	for { | ||||||
|  | @ -203,12 +206,17 @@ func (c *Controller) processEvent(obj interface{}) error { | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Controller) processClusterEventsQueue(idx int) { | func (c *Controller) processClusterEventsQueue(stopCh <-chan struct{}, idx int, wg *sync.WaitGroup) { | ||||||
| 	for { | 	defer wg.Done() | ||||||
|  | 	wg.Add(1) | ||||||
|  | 
 | ||||||
|  | 	go func() { | ||||||
| 		if _, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)); err != nil { | 		if _, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)); err != nil { | ||||||
| 			c.logger.Errorf("error when processing cluster events queue: %v", err) | 			c.logger.Errorf("error when processing cluster events queue: %v", err) | ||||||
| 		} | 		} | ||||||
| 	} | 	}() | ||||||
|  | 
 | ||||||
|  | 	<-stopCh | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec.EventType) { | func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec.EventType) { | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue