add waiting group in the main thread
This commit is contained in:
parent
7a2643386c
commit
f326e58456
|
|
@ -63,17 +63,17 @@ func New(controllerConfig *Config, operatorConfig *config.Config) *Controller {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
|
func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
|
||||||
defer wg.Done()
|
|
||||||
wg.Add(1)
|
|
||||||
|
|
||||||
c.initController()
|
c.initController()
|
||||||
|
|
||||||
|
wg.Add(4)
|
||||||
go c.runPodInformer(stopCh, wg)
|
go c.runPodInformer(stopCh, wg)
|
||||||
go c.runPostgresqlInformer(stopCh, wg)
|
go c.runPostgresqlInformer(stopCh, wg)
|
||||||
go c.podEventsDispatcher(stopCh, wg)
|
go c.podEventsDispatcher(stopCh, wg)
|
||||||
go c.clusterResync(stopCh, wg)
|
go c.clusterResync(stopCh, wg)
|
||||||
|
|
||||||
for i := range c.clusterEventQueues {
|
for i := range c.clusterEventQueues {
|
||||||
|
wg.Add(1)
|
||||||
go c.processClusterEventsQueue(stopCh, i, wg)
|
go c.processClusterEventsQueue(stopCh, i, wg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -145,7 +145,6 @@ func (c *Controller) initController() {
|
||||||
|
|
||||||
func (c *Controller) runPodInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) {
|
func (c *Controller) runPodInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
wg.Add(1)
|
|
||||||
|
|
||||||
go c.podInformer.Run(stopCh)
|
go c.podInformer.Run(stopCh)
|
||||||
|
|
||||||
|
|
@ -154,7 +153,6 @@ func (c *Controller) runPodInformer(stopCh <-chan struct{}, wg *sync.WaitGroup)
|
||||||
|
|
||||||
func (c *Controller) runPostgresqlInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) {
|
func (c *Controller) runPostgresqlInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
wg.Add(1)
|
|
||||||
|
|
||||||
go c.postgresqlInformer.Run(stopCh)
|
go c.postgresqlInformer.Run(stopCh)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -116,7 +116,6 @@ func (c *Controller) podDelete(obj interface{}) {
|
||||||
|
|
||||||
func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}, wg *sync.WaitGroup) {
|
func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}, wg *sync.WaitGroup) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
wg.Add(1)
|
|
||||||
|
|
||||||
c.logger.Debugln("Watching all pod events")
|
c.logger.Debugln("Watching all pod events")
|
||||||
for {
|
for {
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,6 @@ import (
|
||||||
|
|
||||||
func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) {
|
func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
wg.Add(1)
|
|
||||||
ticker := time.NewTicker(c.opConfig.ResyncPeriod)
|
ticker := time.NewTicker(c.opConfig.ResyncPeriod)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
|
@ -208,7 +207,6 @@ func (c *Controller) processEvent(obj interface{}) error {
|
||||||
|
|
||||||
func (c *Controller) processClusterEventsQueue(stopCh <-chan struct{}, idx int, wg *sync.WaitGroup) {
|
func (c *Controller) processClusterEventsQueue(stopCh <-chan struct{}, idx int, wg *sync.WaitGroup) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
wg.Add(1)
|
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
if _, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)); err != nil {
|
if _, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)); err != nil {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue