From af121b85b510423f2f9a6209cbf75911766820dc Mon Sep 17 00:00:00 2001 From: Karl Stoney Date: Wed, 5 Sep 2018 00:19:57 +0100 Subject: [PATCH] feat: Concurrent chart download in template and lint commands This enables `helmfile lint` and `helmfile template` commands to fetch and untar all the required charts concurrently. The concurrency is configurable via the `--concurrency` flag, that defaults to `0`. Ref #292 --- main.go | 16 ++++++++-- state/state.go | 82 ++++++++++++++++++++++++++++++++------------------ 2 files changed, 67 insertions(+), 31 deletions(-) diff --git a/main.go b/main.go index 67b00bda..ab3306cf 100644 --- a/main.go +++ b/main.go @@ -202,6 +202,11 @@ func main() { Name: "values", Usage: "additional value files to be merged into the command", }, + cli.IntFlag{ + Name: "concurrency", + Value: 0, + Usage: "maximum number of concurrent downloads of release charts", + }, }, Action: func(c *cli.Context) error { return findAndIterateOverDesiredStatesUsingFlags(c, func(state *state.HelmState, helm helmexec.Interface) []error { @@ -222,6 +227,11 @@ func main() { Name: "values", Usage: "additional value files to be merged into the command", }, + cli.IntFlag{ + Name: "concurrency", + Value: 0, + Usage: "maximum number of concurrent downloads of release charts", + }, }, Action: func(c *cli.Context) error { return findAndIterateOverDesiredStatesUsingFlags(c, func(state *state.HelmState, helm helmexec.Interface) []error { @@ -231,8 +241,9 @@ func main() { values := c.StringSlice("values") args := args.GetArgs(c.String("args"), state) + workers := c.Int("concurrency") - return state.LintReleases(helm, values, args) + return state.LintReleases(helm, values, args, workers) }) }, }, @@ -494,8 +505,9 @@ func executeTemplateCommand(c *cli.Context, state *state.HelmState, helm helmexe args := args.GetArgs(c.String("args"), state) values := c.StringSlice("values") + workers := c.Int("concurrency") - return state.TemplateReleases(helm, values, args) + return state.TemplateReleases(helm, values, args, workers) } func executeDiffCommand(c *cli.Context, st *state.HelmState, helm helmexec.Interface, detailedExitCode, suppressSecrets bool) ([]*state.ReleaseSpec, []error) { diff --git a/state/state.go b/state/state.go index 1b3e8433..6d0c14b5 100644 --- a/state/state.go +++ b/state/state.go @@ -272,39 +272,62 @@ func (state *HelmState) SyncReleases(helm helmexec.Interface, additionalValues [ } // downloadCharts will download and untar charts for Lint and Template -func (state *HelmState) downloadCharts(helm helmexec.Interface, dir string) (map[string]string, error) { +func (state *HelmState) downloadCharts(helm helmexec.Interface, dir string, workerLimit int) (map[string]string, []error) { temp := make(map[string]string, len(state.Releases)) + errs := []error{} - for _, release := range state.Releases { - chartPath := "" - if pathExists(normalizeChart(state.basePath, release.Chart)) { - chartPath = normalizeChart(state.basePath, release.Chart) - } else { - fetchFlags := []string{} - if release.Version != "" { - chartPath = path.Join(dir, release.Name, release.Version, release.Chart) - fetchFlags = append(fetchFlags, "--version", release.Version) - } else { - chartPath = path.Join(dir, release.Name, "latest", release.Chart) - } + var wgFetch sync.WaitGroup + jobQueue := make(chan *ReleaseSpec, len(state.Releases)) + wgFetch.Add(len(state.Releases)) - // only fetch chart if it is not already fetched - if _, err := os.Stat(chartPath); os.IsNotExist(err) { - fetchFlags = append(fetchFlags, "--untar", "--untardir", chartPath) - if err := helm.Fetch(release.Chart, fetchFlags...); err != nil { - return nil, err - } - } - chartPath = path.Join(chartPath, chartNameWithoutRepository(release.Chart)) - } - temp[release.Name] = chartPath + if workerLimit < 1 { + workerLimit = len(state.Releases) } + for w := 1; w <= workerLimit; w++ { + go func() { + for release := range jobQueue { + chartPath := "" + if pathExists(normalizeChart(state.basePath, release.Chart)) { + chartPath = normalizeChart(state.basePath, release.Chart) + } else { + fetchFlags := []string{} + if release.Version != "" { + chartPath = path.Join(dir, release.Name, release.Version, release.Chart) + fetchFlags = append(fetchFlags, "--version", release.Version) + } else { + chartPath = path.Join(dir, release.Name, "latest", release.Chart) + } + + // only fetch chart if it is not already fetched + if _, err := os.Stat(chartPath); os.IsNotExist(err) { + fetchFlags = append(fetchFlags, "--untar", "--untardir", chartPath) + if err := helm.Fetch(release.Chart, fetchFlags...); err != nil { + errs = append(errs, err) + } + } + chartPath = path.Join(chartPath, chartNameWithoutRepository(release.Chart)) + } + temp[release.Name] = chartPath + wgFetch.Done() + } + }() + } + for i := 0; i < len(state.Releases); i++ { + jobQueue <- &state.Releases[i] + } + + close(jobQueue) + wgFetch.Wait() + + if len(errs) > 0 { + return nil, errs + } return temp, nil } // TemplateReleases wrapper for executing helm template on the releases -func (state *HelmState) TemplateReleases(helm helmexec.Interface, additionalValues []string, args []string) []error { +func (state *HelmState) TemplateReleases(helm helmexec.Interface, additionalValues []string, args []string, workerLimit int) []error { errs := []error{} // Create tmp directory and bail immediately if it fails dir, err := ioutil.TempDir("", "") @@ -314,8 +337,9 @@ func (state *HelmState) TemplateReleases(helm helmexec.Interface, additionalValu } defer os.RemoveAll(dir) - temp, err := state.downloadCharts(helm, dir) - if err != nil { + temp, errs := state.downloadCharts(helm, dir, workerLimit) + + if errs != nil { errs = append(errs, err) return errs } @@ -356,7 +380,7 @@ func (state *HelmState) TemplateReleases(helm helmexec.Interface, additionalValu } // LintReleases wrapper for executing helm lint on the releases -func (state *HelmState) LintReleases(helm helmexec.Interface, additionalValues []string, args []string) []error { +func (state *HelmState) LintReleases(helm helmexec.Interface, additionalValues []string, args []string, workerLimit int) []error { errs := []error{} // Create tmp directory and bail immediately if it fails dir, err := ioutil.TempDir("", "") @@ -366,8 +390,8 @@ func (state *HelmState) LintReleases(helm helmexec.Interface, additionalValues [ } defer os.RemoveAll(dir) - temp, err := state.downloadCharts(helm, dir) - if err != nil { + temp, errs := state.downloadCharts(helm, dir, workerLimit) + if errs != nil { errs = append(errs, err) return errs }