From 571f351a8f1a988a0e0a999c26be12e2a63eeaec Mon Sep 17 00:00:00 2001
From: KUOKA Yusuke
Date: Wed, 23 Jan 2019 15:46:24 +0900
Subject: [PATCH] feat: configurable concurrency for `helmfile test` (#442)

Run `helmfile test --concurrency N` to set the maximum number of concurrent
helm processes. The value is automatically capped at the number of releases
so that no computing resources are wasted.

Also, I've refactored the scatter-gather logic that was duplicated across the
code-base into a shared helper.

Resolves #433
---
 main.go             |   8 +-
 state/state.go      | 389 +++++++++++++++-----------------------------
 state/state_run.go  |  88 ++++++++++
 state/state_test.go |   5 +-
 4 files changed, 234 insertions(+), 256 deletions(-)
 create mode 100644 state/state_run.go

diff --git a/main.go b/main.go
index 53a05132..cb987e31 100644
--- a/main.go
+++ b/main.go
@@ -496,18 +496,24 @@ Do you really want to delete?
 					Value: 300,
 					Usage: "maximum time for tests to run before being considered failed",
 				},
+				cli.IntFlag{
+					Name:  "concurrency",
+					Value: 0,
+					Usage: "maximum number of concurrent helm processes to run, 0 is unlimited",
+				},
 			},
 			Action: func(c *cli.Context) error {
 				return findAndIterateOverDesiredStatesUsingFlags(c, func(state *state.HelmState, helm helmexec.Interface, _ context) []error {
 					cleanup := c.Bool("cleanup")
 					timeout := c.Int("timeout")
+					concurrency := c.Int("concurrency")
 
 					args := args.GetArgs(c.String("args"), state)
 					if len(args) > 0 {
 						helm.SetExtraArgs(args...)
 					}
 
-					return state.TestReleases(helm, cleanup, timeout)
+					return state.TestReleases(helm, cleanup, timeout, concurrency)
 				})
 			},
 		},
diff --git a/state/state.go b/state/state.go
index 59b7bf5e..d00b3d3f 100644
--- a/state/state.go
+++ b/state/state.go
@@ -3,6 +3,7 @@ package state
 import (
 	"errors"
 	"fmt"
+	"github.com/roboll/helmfile/helmexec"
 	"io/ioutil"
 	"os"
 	"path"
@@ -10,9 +11,6 @@ import (
 	"sort"
 	"strconv"
 	"strings"
-	"sync"
-
-	"github.com/roboll/helmfile/helmexec"
 
 	"regexp"
 
@@ -188,16 +186,19 @@ func (st *HelmState) prepareSyncReleases(helm helmexec.Interface, additionalValu
 	jobs := make(chan *ReleaseSpec, numReleases)
 	results := make(chan syncPrepareResult, numReleases)
 
-	if concurrency < 1 {
-		concurrency = numReleases
-	}
+	res := []syncPrepareResult{}
+	errs := []error{}
 
-	// WaitGroup is required to wait until goroutine per job in job queue cleanly stops.
-	var waitGroup sync.WaitGroup
-	waitGroup.Add(concurrency)
-
-	for w := 1; w <= concurrency; w++ {
-		go func() {
+	st.scatterGather(
+		concurrency,
+		numReleases,
+		func() {
+			for i := 0; i < numReleases; i++ {
+				jobs <- &releases[i]
+			}
+			close(jobs)
+		},
+		func(_ int) {
 			for release := range jobs {
 				st.applyDefaultsTo(release)
@@ -227,29 +228,20 @@ func (st *HelmState) prepareSyncReleases(helm helmexec.Interface, additionalValu
 				results <- syncPrepareResult{release: release, flags: flags, errors: []*ReleaseError{}}
 			}
-			waitGroup.Done()
-		}()
-	}
-
-	for i := 0; i < numReleases; i++ {
-		jobs <- &releases[i]
-	}
-	close(jobs)
-
-	res := []syncPrepareResult{}
-	errs := []error{}
-	for i := 0; i < numReleases; {
-		select {
-		case r := <-results:
-			for _, e := range r.errors {
-				errs = append(errs, e)
+		},
+		func() {
+			for i := 0; i < numReleases; {
+				select {
+				case r := <-results:
+					for _, e := range r.errors {
+						errs = append(errs, e)
+					}
+					res = append(res, r)
+					i++
+				}
 			}
-			res = append(res, r)
-			i++
-		}
-	}
-
-	waitGroup.Wait()
+		},
+	)
 
 	return res, errs
 }
@@ -286,21 +278,20 @@ func (st *HelmState) SyncReleases(helm helmexec.Interface, additionalValues []st
 		return prepErrs
 	}
 
+	errs := []error{}
 	jobQueue := make(chan *syncPrepareResult, len(preps))
 	results := make(chan syncResult, len(preps))
 
-	if workerLimit < 1 {
-		workerLimit = len(preps)
-	}
-
-	// WaitGroup is required to wait until goroutine per job in job queue cleanly stops.
-	// Otherwise, cleanup hooks won't run fully.
-	// See #363 for more context.
-	var waitGroup sync.WaitGroup
-	waitGroup.Add(workerLimit)
-
-	for w := 1; w <= workerLimit; w++ {
-		go func() {
+	st.scatterGather(
+		workerLimit,
+		len(preps),
+		func() {
+			for i := 0; i < len(preps); i++ {
+				jobQueue <- &preps[i]
+			}
+			close(jobQueue)
+		},
+		func(_ int) {
 			for prep := range jobQueue {
 				release := prep.release
 				flags := prep.flags
@@ -323,29 +314,21 @@ func (st *HelmState) SyncReleases(helm helmexec.Interface, additionalValues []st
 					st.logger.Warnf("warn: %v\n", err)
 				}
 			}
-			waitGroup.Done()
-		}()
-	}
-
-	for i := 0; i < len(preps); i++ {
-		jobQueue <- &preps[i]
-	}
-	close(jobQueue)
-
-	errs := []error{}
-	for i := 0; i < len(preps); {
-		select {
-		case res := <-results:
-			if len(res.errors) > 0 {
-				for _, e := range res.errors {
-					errs = append(errs, e)
+		},
+		func() {
+			for i := 0; i < len(preps); {
+				select {
+				case res := <-results:
+					if len(res.errors) > 0 {
+						for _, e := range res.errors {
+							errs = append(errs, e)
+						}
+					}
+				}
+				i++
 			}
-		}
-		i++
-	}
-
-	waitGroup.Wait()
+		},
+	)
 
 	if len(errs) > 0 {
 		return errs
@@ -355,7 +338,7 @@ func (st *HelmState) SyncReleases(helm helmexec.Interface, additionalValues []st
 }
 
 // downloadCharts will download and untar charts for Lint and Template
-func (st *HelmState) downloadCharts(helm helmexec.Interface, dir string, workerLimit int, helmfileCommand string) (map[string]string, []error) {
+func (st *HelmState) downloadCharts(helm helmexec.Interface, dir string, concurrency int, helmfileCommand string) (map[string]string, []error) {
 	temp := make(map[string]string, len(st.Releases))
 	type downloadResults struct {
 		releaseName string
 		chartPath   string
 	}
 	errs := []error{}
 
-	var wgFetch sync.WaitGroup
 	jobQueue := make(chan *ReleaseSpec, len(st.Releases))
 	results := make(chan *downloadResults, len(st.Releases))
-	wgFetch.Add(len(st.Releases))
 
-	if workerLimit < 1 {
-		workerLimit = len(st.Releases)
-	}
-
-	for w := 1; w <= workerLimit; w++ {
-		go func() {
+	st.scatterGather(
+		concurrency,
+		len(st.Releases),
+		func() {
+			for i := 0; i < len(st.Releases); i++ {
+				jobQueue <- &st.Releases[i]
+			}
+			close(jobQueue)
+		},
+		func(_ int) {
 			for release := range jobQueue {
 				chartPath := ""
 				if pathExists(normalizeChart(st.basePath, release.Chart)) {
@@ -403,20 +388,14 @@ func (st *HelmState) downloadCharts(helm helmexec.Interface, dir string, workerL
 				results <- &downloadResults{release.Name, chartPath}
 			}
-			wgFetch.Done()
-		}()
-	}
-	for i := 0; i < len(st.Releases); i++ {
-		jobQueue <- &st.Releases[i]
-	}
-	close(jobQueue)
-
-	for i := 0; i < len(st.Releases); i++ {
-		downloadRes := <-results
-		temp[downloadRes.releaseName] = downloadRes.chartPath
-	}
-
-	wgFetch.Wait()
+		},
+		func() {
+			for i := 0; i < len(st.Releases); i++ {
+				downloadRes := <-results
+				temp[downloadRes.releaseName] = downloadRes.chartPath
+			}
+		},
+	)
 
 	if len(errs) > 0 {
 		return nil, errs
@@ -568,16 +547,19 @@ func (st *HelmState) prepareDiffReleases(helm helmexec.Interface, additionalValu
 	jobs := make(chan *ReleaseSpec, numReleases)
 	results := make(chan diffPrepareResult, numReleases)
 
-	if concurrency < 1 {
-		concurrency = numReleases
-	}
+	rs := []diffPrepareResult{}
+	errs := []error{}
 
-	// WaitGroup is required to wait until goroutine per job in job queue cleanly stops.
-	var waitGroup sync.WaitGroup
-	waitGroup.Add(concurrency)
-
-	for w := 1; w <= concurrency; w++ {
-		go func() {
+	st.scatterGather(
+		concurrency,
+		numReleases,
+		func() {
+			for i := 0; i < numReleases; i++ {
+				jobs <- &releases[i]
+			}
+			close(jobs)
+		},
+		func(_ int) {
 			for release := range jobs {
 				errs := []error{}
@@ -618,32 +600,20 @@ func (st *HelmState) prepareDiffReleases(helm helmexec.Interface, additionalValu
 					results <- diffPrepareResult{release: release, flags: flags, errors: []*ReleaseError{}}
 				}
 			}
-			waitGroup.Done()
-		}()
-	}
-
-	for i := 0; i < numReleases; i++ {
-		jobs <- &releases[i]
-	}
-	close(jobs)
-
-	rs := []diffPrepareResult{}
-	errs := []error{}
-	for i := 0; i < numReleases; {
-		select {
-		case res := <-results:
-			if res.errors != nil && len(res.errors) > 0 {
-				for _, e := range res.errors {
-					errs = append(errs, e)
+		},
+		func() {
+			for i := 0; i < numReleases; i++ {
+				res := <-results
+				if res.errors != nil && len(res.errors) > 0 {
+					for _, e := range res.errors {
+						errs = append(errs, e)
+					}
+				} else if res.release != nil {
+					rs = append(rs, res)
 				}
-			} else if res.release != nil {
-				rs = append(rs, res)
 			}
-		}
-		i++
-	}
-
-	waitGroup.Wait()
+		},
+	)
 
 	return rs, errs
 }
@@ -659,18 +629,19 @@ func (st *HelmState) DiffReleases(helm helmexec.Interface, additionalValues []st
 	jobQueue := make(chan *diffPrepareResult, len(preps))
 	results := make(chan diffResult, len(preps))
 
-	if workerLimit < 1 {
-		workerLimit = len(preps)
-	}
+	rs := []*ReleaseSpec{}
+	errs := []error{}
 
-	// WaitGroup is required to wait until goroutine per job in job queue cleanly stops.
-	// Otherwise, cleanup hooks won't run fully.
-	// See #363 for more context.
-	var waitGroup sync.WaitGroup
-	waitGroup.Add(workerLimit)
-
-	for w := 1; w <= workerLimit; w++ {
-		go func() {
+	st.scatterGather(
+		workerLimit,
+		len(preps),
+		func() {
+			for i := 0; i < len(preps); i++ {
+				jobQueue <- &preps[i]
+			}
+			close(jobQueue)
+		},
+		func(_ int) {
 			for prep := range jobQueue {
 				flags := prep.flags
 				release := prep.release
@@ -694,140 +665,50 @@ func (st *HelmState) DiffReleases(helm helmexec.Interface, additionalValues []st
 					}
 				}
 			}
-			waitGroup.Done()
-		}()
-	}
-
-	for i := 0; i < len(preps); i++ {
-		jobQueue <- &preps[i]
-	}
-	close(jobQueue)
-
-	rs := []*ReleaseSpec{}
-	errs := []error{}
-	for i := 0; i < len(preps); {
-		select {
-		case res := <-results:
-			if res.err != nil {
-				errs = append(errs, res.err)
-				if res.err.Code == 2 {
-					rs = append(rs, res.err.ReleaseSpec)
+		},
+		func() {
+			for i := 0; i < len(preps); i++ {
+				res := <-results
+				if res.err != nil {
+					errs = append(errs, res.err)
+					if res.err.Code == 2 {
+						rs = append(rs, res.err.ReleaseSpec)
+					}
 				}
 			}
-			i++
-		}
-	}
-	close(results)
-
-	waitGroup.Wait()
+		},
+	)
 
 	return rs, errs
 }
 
 func (st *HelmState) ReleaseStatuses(helm helmexec.Interface, workerLimit int) []error {
-	var errs []error
-	jobQueue := make(chan ReleaseSpec)
-	doneQueue := make(chan bool)
-	errQueue := make(chan error)
-
-	if workerLimit < 1 {
-		workerLimit = len(st.Releases)
-	}
-
-	// WaitGroup is required to wait until goroutine per job in job queue cleanly stops.
-	var waitGroup sync.WaitGroup
-	waitGroup.Add(workerLimit)
-
-	for w := 1; w <= workerLimit; w++ {
-		go func() {
-			for release := range jobQueue {
-				if err := helm.ReleaseStatus(release.Name); err != nil {
-					errQueue <- err
-				}
-				doneQueue <- true
-			}
-			waitGroup.Done()
-		}()
-	}
-
-	go func() {
-		for _, release := range st.Releases {
-			jobQueue <- release
-		}
-		close(jobQueue)
-	}()
-
-	for i := 0; i < len(st.Releases); {
-		select {
-		case err := <-errQueue:
-			errs = append(errs, err)
-		case <-doneQueue:
-			i++
-		}
-	}
-
-	waitGroup.Wait()
-
-	if len(errs) != 0 {
-		return errs
-	}
-
-	return nil
+	return st.scatterGatherReleases(helm, workerLimit, func(release ReleaseSpec) error {
+		return helm.ReleaseStatus(release.Name)
+	})
 }
 
 // DeleteReleases wrapper for executing helm delete on the releases
 func (st *HelmState) DeleteReleases(helm helmexec.Interface, purge bool) []error {
-	var wg sync.WaitGroup
-	errs := []error{}
-
-	for _, release := range st.Releases {
-		wg.Add(1)
-		go func(wg *sync.WaitGroup, release ReleaseSpec) {
-			flags := []string{}
-			if purge {
-				flags = append(flags, "--purge")
-			}
-			if err := helm.DeleteRelease(release.Name, flags...); err != nil {
-				errs = append(errs, err)
-			}
-			wg.Done()
-		}(&wg, release)
-	}
-	wg.Wait()
-
-	if len(errs) != 0 {
-		return errs
-	}
-
-	return nil
+	return st.scatterGatherReleases(helm, len(st.Releases), func(release ReleaseSpec) error {
+		flags := []string{}
+		if purge {
+			flags = append(flags, "--purge")
+		}
+		return helm.DeleteRelease(release.Name, flags...)
+	})
 }
 
 // TestReleases wrapper for executing helm test on the releases
-func (st *HelmState) TestReleases(helm helmexec.Interface, cleanup bool, timeout int) []error {
-	var wg sync.WaitGroup
-	errs := []error{}
-
-	for _, release := range st.Releases {
-		wg.Add(1)
-		go func(wg *sync.WaitGroup, release ReleaseSpec) {
-			flags := []string{}
-			if cleanup {
-				flags = append(flags, "--cleanup")
-			}
-			flags = append(flags, "--timeout", strconv.Itoa(timeout))
-			if err := helm.TestRelease(release.Name, flags...); err != nil {
-				errs = append(errs, err)
-			}
-			wg.Done()
-		}(&wg, release)
-	}
-	wg.Wait()
-
-	if len(errs) != 0 {
-		return errs
-	}
-
-	return nil
+func (st *HelmState) TestReleases(helm helmexec.Interface, cleanup bool, timeout int, concurrency int) []error {
+	return st.scatterGatherReleases(helm, concurrency, func(release ReleaseSpec) error {
+		flags := []string{}
+		if cleanup {
+			flags = append(flags, "--cleanup")
+		}
+		flags = append(flags, "--timeout", strconv.Itoa(timeout))
+		return helm.TestRelease(release.Name, flags...)
+	})
 }
 
 // Clean will remove any generated secrets
diff --git a/state/state_run.go b/state/state_run.go
new file mode 100644
index 00000000..8607ba46
--- /dev/null
+++ b/state/state_run.go
@@ -0,0 +1,88 @@
+package state
+
+import (
+	"fmt"
+	"github.com/roboll/helmfile/helmexec"
+	"sync"
+)
+
+type result struct {
+	release ReleaseSpec
+	err     error
+}
+
+func (st *HelmState) scatterGather(concurrency int, items int, produceInputs func(), receiveInputsAndProduceIntermediates func(int), aggregateIntermediates func()) {
+	numReleases := len(st.Releases)
+	if concurrency < 1 {
+		concurrency = numReleases
+	} else if concurrency > numReleases {
+		concurrency = numReleases
+	}
+
+	// WaitGroup is required to wait until goroutine per job in job queue cleanly stops.
+	var waitGroup sync.WaitGroup
+	waitGroup.Add(concurrency)
+
+	go produceInputs()
+
+	for w := 1; w <= concurrency; w++ {
+		go func(id int) {
+			st.logger.Debugf("worker %d/%d started", id, concurrency)
+			receiveInputsAndProduceIntermediates(id)
+			st.logger.Debugf("worker %d/%d finished", id, concurrency)
+			waitGroup.Done()
+		}(w)
+	}
+
+	aggregateIntermediates()
+
+	// Wait until all the goroutines gracefully finish
+	waitGroup.Wait()
+}
+
+func (st *HelmState) scatterGatherReleases(helm helmexec.Interface, concurrency int, do func(ReleaseSpec) error) []error {
+	var errs []error
+
+	inputs := st.Releases
+	inputsSize := len(inputs)
+
+	releases := make(chan ReleaseSpec)
+	results := make(chan result)
+
+	st.scatterGather(
+		concurrency,
+		inputsSize,
+		func() {
+			for _, release := range inputs {
+				releases <- release
+			}
+			close(releases)
+		},
+		func(id int) {
+			for release := range releases {
+				err := do(release)
+				st.logger.Debugf("sending result for release: %s\n", release.Name)
+				results <- result{release: release, err: err}
+				st.logger.Debugf("sent result for release: %s\n", release.Name)
+			}
+		},
+		func() {
+			for i := range inputs {
+				st.logger.Debugf("receiving result %d", i)
+				r := <-results
+				if r.err != nil {
+					errs = append(errs, fmt.Errorf("release \"%s\" failed: %v", r.release.Name, r.err))
+				} else {
+					st.logger.Debugf("received result for release \"%s\"", r.release.Name)
+				}
+				st.logger.Debugf("received result for %d", i)
+			}
+		},
+	)
+
+	if len(errs) != 0 {
+		return errs
+	}
+
+	return nil
+}
diff --git a/state/state_test.go b/state/state_test.go
index 6102aec7..46cdeb7a 100644
--- a/state/state_test.go
+++ b/state/state_test.go
@@ -738,6 +738,7 @@ func TestHelmState_SyncReleases(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			state := &HelmState{
 				Releases: tt.releases,
+				logger:   logger,
 			}
 			if _ = state.SyncReleases(tt.helm, []string{}, 1); !reflect.DeepEqual(tt.helm.releases, tt.wantReleases) {
 				t.Errorf("HelmState.SyncReleases() for [%s] = %v, want %v", tt.name, tt.helm.releases, tt.wantReleases)
@@ -815,6 +816,7 @@ func TestHelmState_ReleaseStatuses(t *testing.T) {
 		i := func(t *testing.T) {
 			state := &HelmState{
 				Releases: tt.releases,
+				logger:   logger,
 			}
 			errs := state.ReleaseStatuses(tt.helm, 1)
 			if (errs != nil) != tt.wantErr {
@@ -874,8 +876,9 @@ func TestHelmState_TestReleasesNoCleanUp(t *testing.T) {
 		i := func(t *testing.T) {
 			state := &HelmState{
 				Releases: tt.releases,
+				logger:   logger,
 			}
-			errs := state.TestReleases(tt.helm, tt.cleanup, 1)
+			errs := state.TestReleases(tt.helm, tt.cleanup, 1, 1)
 			if (errs != nil) != tt.wantErr {
 				t.Errorf("TestReleases() for %s error = %v, wantErr %v", tt.name, errs, tt.wantErr)
 				return
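
Below is a minimal, self-contained Go sketch of the scatter-gather pattern that the new state/state_run.go helper implements, for readers who want to see the control flow outside the diff. The scatterGather signature mirrors the patch; the main harness, the release names, and the simplified capping condition (which uses the items argument, whereas the helper in the patch caps at len(st.Releases)) are illustrative assumptions, not part of the change. With the new flag, `helmfile test --concurrency 2` runs at most two helm processes at a time.

package main

import (
	"fmt"
	"sync"
)

// scatterGather sketches the helper added in state/state_run.go: one producer
// goroutine scatters work onto a channel, `concurrency` workers consume it,
// and the caller's aggregator gathers results on the current goroutine.
// A concurrency below 1 means "unlimited", and the value is capped at the
// number of items, so requesting 100 workers for 3 releases starts only 3.
func scatterGather(concurrency, items int, produce func(), work func(int), gather func()) {
	if concurrency < 1 || concurrency > items {
		concurrency = items
	}

	var wg sync.WaitGroup
	wg.Add(concurrency)

	go produce()

	for w := 1; w <= concurrency; w++ {
		go func(id int) {
			defer wg.Done()
			work(id)
		}(w)
	}

	// Gathering on the caller's goroutine drains results while workers are
	// still sending, so the unbuffered channels below never deadlock.
	gather()
	wg.Wait()
}

func main() {
	releases := []string{"frontend", "backend", "worker"} // hypothetical releases

	jobs := make(chan string)
	results := make(chan string)

	scatterGather(
		2, // e.g. what `helmfile test --concurrency 2` would pass down
		len(releases),
		func() {
			for _, r := range releases {
				jobs <- r
			}
			close(jobs)
		},
		func(id int) {
			for r := range jobs {
				results <- fmt.Sprintf("worker %d tested release %q", id, r)
			}
		},
		func() {
			for range releases {
				fmt.Println(<-results)
			}
		},
	)
}

Running the sketch prints one line per release, with at most two produced concurrently; this is the same producer/worker/aggregator split that prepareSyncReleases, SyncReleases, downloadCharts, prepareDiffReleases, DiffReleases, and scatterGatherReleases now share.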