From 2b07b9715e90fdefb0ca43db59c85238baa41951 Mon Sep 17 00:00:00 2001 From: Daniel Date: Fri, 6 Apr 2018 16:05:32 +0200 Subject: [PATCH] Fix race condition. Add concurrency option similiar to DiffReleases --- main.go | 8 ++++- state/state.go | 86 ++++++++++++++++++++++++++++++++++---------------- 2 files changed, 65 insertions(+), 29 deletions(-) diff --git a/main.go b/main.go index 9f2ed1ae..bf2e4bec 100644 --- a/main.go +++ b/main.go @@ -134,6 +134,11 @@ func main() { Name: "sync-repos", Usage: "enable a repo sync prior to diffing", }, + cli.IntFlag{ + Name: "concurrency", + Value: 0, + Usage: "maximum number of concurrent helm processes to run, 0 is unlimited", + }, }, Action: func(c *cli.Context) error { state, helm, err := before(c) @@ -156,8 +161,9 @@ func main() { } values := c.StringSlice("values") + workers := c.Int("concurrency") - errs := state.DiffReleases(helm, values) + errs := state.DiffReleases(helm, values, workers) return clean(state, errs) }, }, diff --git a/state/state.go b/state/state.go index 519ec6bd..ef02d247 100644 --- a/state/state.go +++ b/state/state.go @@ -230,45 +230,75 @@ func (state *HelmState) SyncReleases(helm helmexec.Interface, additionalValues [ return nil } -func (state *HelmState) DiffReleases(helm helmexec.Interface, additionalValues []string) []error { - var wg sync.WaitGroup +func (state *HelmState) DiffReleases(helm helmexec.Interface, additionalValues []string, workerLimit int) []error { + var wgRelease sync.WaitGroup + var wgError sync.WaitGroup errs := []error{} + jobQueue := make(chan *ReleaseSpec, len(state.Releases)) + errQueue := make(chan error) - for i := 0; i < len(state.Releases); i++ { - release := &state.Releases[i] - renderedName, err := renderTemplateString(release.Name) - if err != nil { - errs = append(errs, err) - } - wg.Add(1) - go func(wg *sync.WaitGroup, release *ReleaseSpec) { - // Plugin command doesn't support explicit namespace - release.Namespace = "" - flags, flagsErr := flagsForRelease(helm, state.BaseChartPath, release) - if flagsErr != nil { - errs = append(errs, flagsErr) - } + if workerLimit < 1 { + workerLimit = len(state.Releases) + } - for _, value := range additionalValues { - valfile, err := filepath.Abs(value) + wgRelease.Add(len(state.Releases)) + + for w := 1; w <= workerLimit; w++ { + go func() { + for release := range jobQueue { + errs := []error{} + renderedName, err := renderTemplateString(release.Name) + if err != nil { + errs = append(errs, err) + } + // Plugin command doesn't support explicit namespace + release.Namespace = "" + flags, err := flagsForRelease(helm, state.BaseChartPath, release) if err != nil { errs = append(errs, err) } - if _, err := os.Stat(valfile); os.IsNotExist(err) { - errs = append(errs, err) + for _, value := range additionalValues { + valfile, err := filepath.Abs(value) + if err != nil { + errs = append(errs, err) + } + + if _, err := os.Stat(valfile); os.IsNotExist(err) { + errs = append(errs, err) + } + flags = append(flags, "--values", valfile) } - flags = append(flags, "--values", valfile) - } - if len(errs) == 0 { - if err := helm.DiffRelease(renderedName, normalizeChart(state.BaseChartPath, release.Chart), flags...); err != nil { - errs = append(errs, err) + + if len(errs) == 0 { + if err := helm.DiffRelease(renderedName, normalizeChart(state.BaseChartPath, release.Chart), flags...); err != nil { + errs = append(errs, err) + } } + for _, err := range errs { + errQueue <- err + } + wgRelease.Done() } - wg.Done() - }(&wg, release) + }() } - wg.Wait() + wgError.Add(1) + go func() { + for err := range errQueue { + errs = append(errs, err) + } + wgError.Done() + }() + + for i := 0; i < len(state.Releases); i++ { + jobQueue <- &state.Releases[i] + } + + close(jobQueue) + wgRelease.Wait() + + close(errQueue) + wgError.Wait() if len(errs) != 0 { return errs