diff --git a/event/bus.go b/event/bus.go
index 39f907a8..0228c27e 100644
--- a/event/bus.go
+++ b/event/bus.go
@@ -90,6 +90,7 @@ func (bus *Bus) Trigger(evt string, context map[string]interface{}) (bool, error
 
 	bytes, err := bus.Runner.Execute(command, args)
 	bus.Logger.Debugf("hook[%s]: %s\n", name, string(bytes))
+
 	if err != nil {
 		return false, fmt.Errorf("hook[%s]: command `%s` failed: %v", name, command, err)
 	}
diff --git a/state/state.go b/state/state.go
index ccf23c8b..23cea5f4 100644
--- a/state/state.go
+++ b/state/state.go
@@ -182,6 +182,11 @@ func (state *HelmState) prepareSyncReleases(helm helmexec.Interface, additionalV
 	if concurrency < 1 {
 		concurrency = numReleases
 	}
+
+	// WaitGroup is required to wait until goroutine per job in job queue cleanly stops.
+	var waitGroup sync.WaitGroup
+	waitGroup.Add(concurrency)
+
 	for w := 1; w <= concurrency; w++ {
 		go func() {
 			for release := range jobs {
@@ -213,6 +218,7 @@ func (state *HelmState) prepareSyncReleases(helm helmexec.Interface, additionalV
 
 				results <- syncPrepareResult{release: release, flags: flags, errors: []*ReleaseError{}}
 			}
+			waitGroup.Done()
 		}()
 	}
 
@@ -234,6 +240,8 @@ func (state *HelmState) prepareSyncReleases(helm helmexec.Interface, additionalV
 		}
 	}
 
+	waitGroup.Wait()
+
 	return res, errs
 }
 
@@ -275,6 +283,13 @@ func (state *HelmState) SyncReleases(helm helmexec.Interface, additionalValues [
 	if workerLimit < 1 {
 		workerLimit = len(preps)
 	}
+
+	// WaitGroup is required to wait until goroutine per job in job queue cleanly stops.
+	// Otherwise, cleanup hooks won't run fully.
+	// See #363 for more context.
+	var waitGroup sync.WaitGroup
+	waitGroup.Add(workerLimit)
+
 	for w := 1; w <= workerLimit; w++ {
 		go func() {
 			for prep := range jobQueue {
@@ -299,6 +314,7 @@ func (state *HelmState) SyncReleases(helm helmexec.Interface, additionalValues [
 					state.logger.Warnf("warn: %v\n", err)
 				}
 			}
+			waitGroup.Done()
 		}()
 	}
 
@@ -320,6 +336,8 @@ func (state *HelmState) SyncReleases(helm helmexec.Interface, additionalValues [
 		i++
 	}
 
+	waitGroup.Wait()
+
 	if len(errs) > 0 {
 		return errs
 	}
@@ -370,8 +388,8 @@ func (state *HelmState) downloadCharts(helm helmexec.Interface, dir string, work
 					chartPath = path.Join(chartPath, chartNameWithoutRepository(release.Chart))
 				}
 				results <- &downloadResults{release.Name, chartPath}
-				wgFetch.Done()
 			}
+			wgFetch.Done()
 		}()
 	}
 	for i := 0; i < len(state.Releases); i++ {
@@ -540,6 +558,10 @@ func (state *HelmState) prepareDiffReleases(helm helmexec.Interface, additionalV
 		concurrency = numReleases
 	}
 
+	// WaitGroup is required to wait until goroutine per job in job queue cleanly stops.
+	var waitGroup sync.WaitGroup
+	waitGroup.Add(concurrency)
+
 	for w := 1; w <= concurrency; w++ {
 		go func() {
 			for release := range jobs {
@@ -582,6 +604,7 @@ func (state *HelmState) prepareDiffReleases(helm helmexec.Interface, additionalV
 					results <- diffPrepareResult{release: release, flags: flags, errors: []*ReleaseError{}}
 				}
 			}
+			waitGroup.Done()
 		}()
 	}
 
@@ -605,6 +628,9 @@ func (state *HelmState) prepareDiffReleases(helm helmexec.Interface, additionalV
 		}
 		i++
 	}
+
+	waitGroup.Wait()
+
 	return rs, errs
 }
 
@@ -623,6 +649,12 @@ func (state *HelmState) DiffReleases(helm helmexec.Interface, additionalValues [
 		workerLimit = len(preps)
 	}
 
+	// WaitGroup is required to wait until goroutine per job in job queue cleanly stops.
+	// Otherwise, cleanup hooks won't run fully.
+	// See #363 for more context.
+	var waitGroup sync.WaitGroup
+	waitGroup.Add(workerLimit)
+
 	for w := 1; w <= workerLimit; w++ {
 		go func() {
 			for prep := range jobQueue {
@@ -646,6 +678,7 @@ func (state *HelmState) DiffReleases(helm helmexec.Interface, additionalValues [
 					state.logger.Warnf("warn: %v\n", err)
 				}
 			}
+			waitGroup.Done()
 		}()
 	}
 
@@ -669,6 +702,9 @@ func (state *HelmState) DiffReleases(helm helmexec.Interface, additionalValues [
 		}
 	}
 	close(results)
+
+	waitGroup.Wait()
+
 	return rs, errs
 }
 
@@ -681,6 +717,11 @@ func (state *HelmState) ReleaseStatuses(helm helmexec.Interface, workerLimit int
 	if workerLimit < 1 {
 		workerLimit = len(state.Releases)
 	}
+
+	// WaitGroup is required to wait until goroutine per job in job queue cleanly stops.
+	var waitGroup sync.WaitGroup
+	waitGroup.Add(workerLimit)
+
 	for w := 1; w <= workerLimit; w++ {
 		go func() {
 			for release := range jobQueue {
@@ -689,6 +730,7 @@ func (state *HelmState) ReleaseStatuses(helm helmexec.Interface, workerLimit int
 				}
 				doneQueue <- true
 			}
+			waitGroup.Done()
 		}()
 	}
 
@@ -708,6 +750,8 @@ func (state *HelmState) ReleaseStatuses(helm helmexec.Interface, workerLimit int
 		}
 	}
 
+	waitGroup.Wait()
+
 	if len(errs) != 0 {
 		return errs
 	}