From effc747081b62d82f04538f7a112ef6946bb0cee Mon Sep 17 00:00:00 2001 From: Thomas O'Donnell Date: Thu, 1 Mar 2018 13:39:23 +0000 Subject: [PATCH] Add option to limit concurrent helm calls (#24) * Add option to throttle concurrent `helm repo update` calls Have added a new flag `--concurrency N` to `helmfile sync charts` that can be used to set a limit on the number of concurrent calls to helm. Implementation details: Switched `SyncCharts` from using a WaitGroup to using a pool of workers and a queue of jobs. To ensure that this is thread safe and an attempt is made to sync each chart at the end. Fixes #23 * Fix formatting and update CI to catch these errors Have fixed the formatting so that `make pristine` now passes. Have also added this to the Circle CI config to catch these errors in the future. --- circle.yml | 1 + main.go | 16 +++++++-- state/state.go | 98 ++++++++++++++++++++++++++++++++------------------ 3 files changed, 79 insertions(+), 36 deletions(-) diff --git a/circle.yml b/circle.yml index 96f5022c..67330564 100644 --- a/circle.yml +++ b/circle.yml @@ -18,6 +18,7 @@ dependencies: test: pre: - cd "$WORK" && make check + - cd "$WORK" && make pristine override: - cd "$WORK" && make test diff --git a/main.go b/main.go index ac6ba583..c4603132 100644 --- a/main.go +++ b/main.go @@ -84,6 +84,11 @@ func main() { Name: "values", Usage: "additional value files to be merged into the command", }, + cli.IntFlag{ + Name: "concurrency", + Value: 0, + Usage: "maximum number of concurrent helm processes to run, 0 is unlimited", + }, }, Action: func(c *cli.Context) error { state, helm, err := before(c) @@ -97,8 +102,9 @@ func main() { } values := c.StringSlice("values") + workers := c.Int("concurrency") - if errs := state.SyncCharts(helm, values); errs != nil && len(errs) > 0 { + if errs := state.SyncCharts(helm, values, workers); errs != nil && len(errs) > 0 { for _, err := range errs { fmt.Printf("err: %s\n", err.Error()) } @@ -164,6 +170,11 @@ func main() { Name: "values", Usage: "additional value files to be merged into the command", }, + cli.IntFlag{ + Name: "concurrency", + Value: 0, + Usage: "maximum number of concurrent helm processes to run, 0 is unlimited", + }, }, Action: func(c *cli.Context) error { state, helm, err := before(c) @@ -179,8 +190,9 @@ func main() { } values := c.StringSlice("values") + workers := c.Int("concurrency") - if errs := state.SyncCharts(helm, values); errs != nil && len(errs) > 0 { + if errs := state.SyncCharts(helm, values, workers); errs != nil && len(errs) > 0 { for _, err := range errs { fmt.Printf("err: %s\n", err.Error()) } diff --git a/state/state.go b/state/state.go index d1c52831..077da75d 100644 --- a/state/state.go +++ b/state/state.go @@ -12,10 +12,10 @@ import ( "github.com/roboll/helmfile/helmexec" + "bytes" yaml "gopkg.in/yaml.v1" "path" "regexp" - "bytes" ) type HelmState struct { @@ -31,9 +31,9 @@ type RepositorySpec struct { } type ChartSpec struct { - Chart string `yaml:"chart"` - Version string `yaml:"version"` - Verify bool `yaml:"verify"` + Chart string `yaml:"chart"` + Version string `yaml:"version"` + Verify bool `yaml:"verify"` Name string `yaml:"name"` Namespace string `yaml:"namespace"` @@ -64,13 +64,11 @@ func ReadFromFile(file string) (*HelmState, error) { return &state, nil } -var /* const */ - stringTemplateFuncMap = template.FuncMap{ - "env": getEnvVar, - } +var stringTemplateFuncMap = template.FuncMap{ + "env": getEnvVar, +} -var /* const */ - stringTemplate = template.New("stringTemplate").Funcs(stringTemplateFuncMap) +var stringTemplate = template.New("stringTemplate").Funcs(stringTemplateFuncMap) func getEnvVar(envVarName string) (string, error) { envVarValue, isSet := os.LookupEnv(envVarName) @@ -124,33 +122,65 @@ func (state *HelmState) SyncRepos(helm helmexec.Interface) []error { return nil } -func (state *HelmState) SyncCharts(helm helmexec.Interface, additonalValues []string) []error { - var wg sync.WaitGroup +func (state *HelmState) SyncCharts(helm helmexec.Interface, additonalValues []string, workerLimit int) []error { errs := []error{} + jobQueue := make(chan ChartSpec) + doneQueue := make(chan bool) + errQueue := make(chan error) - for _, chart := range state.Charts { - wg.Add(1) - go func(wg *sync.WaitGroup, chart ChartSpec) { - flags, flagsErr := flagsForChart(state.BaseChartPath, &chart) - if flagsErr != nil { - errs = append(errs, flagsErr) - } - for _, value := range additonalValues { - valfile, err := filepath.Abs(value) - if err != nil { - errs = append(errs, err) - } - flags = append(flags, "--values", valfile) - } - if len(errs) == 0 { - if err := helm.SyncChart(chart.Name, normalizeChart(state.BaseChartPath, chart.Chart), flags...); err != nil { - errs = append(errs, err) - } - } - wg.Done() - }(&wg, chart) + if workerLimit < 1 { + workerLimit = len(state.Charts) + } + + for w := 1; w <= workerLimit; w++ { + go func() { + for chart := range jobQueue { + + flags, flagsErr := flagsForChart(state.BaseChartPath, &chart) + if flagsErr != nil { + errQueue <- flagsErr + doneQueue <- true + continue + } + + haveValueErr := false + for _, value := range additonalValues { + valfile, err := filepath.Abs(value) + if err != nil { + errQueue <- err + haveValueErr = true + } + flags = append(flags, "--values", valfile) + } + + if haveValueErr { + doneQueue <- true + continue + } + + if err := helm.SyncChart(chart.Name, normalizeChart(state.BaseChartPath, chart.Chart), flags...); err != nil { + errQueue <- err + } + doneQueue <- true + } + }() + } + + go func() { + for _, chart := range state.Charts { + jobQueue <- chart + } + close(jobQueue) + }() + + for i := 0; i < len(state.Charts); { + select { + case err := <-errQueue: + errs = append(errs, err) + case <-doneQueue: + i++ + } } - wg.Wait() if len(errs) != 0 { return errs