Merge pull request #345 from stek29/concurrency-preparedcharts

feat: honor concurrency in withPreparedCharts
This commit is contained in:
yxxhero 2022-09-04 17:51:13 +08:00 committed by GitHub
commit d9ee3afd6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 61 additions and 23 deletions

View File

@ -33,6 +33,7 @@ func NewDepsCmd(globalCfg *config.GlobalImpl) *cobra.Command {
f := cmd.Flags() f := cmd.Flags()
f.StringVar(&depsOptions.Args, "args", "", "pass args to helm exec") f.StringVar(&depsOptions.Args, "args", "", "pass args to helm exec")
f.BoolVar(&depsOptions.SkipRepos, "skip-deps", false, `skip running "helm repo update" and "helm dependency build"`) f.BoolVar(&depsOptions.SkipRepos, "skip-deps", false, `skip running "helm repo update" and "helm dependency build"`)
f.IntVar(&depsOptions.Concurrency, "concurrency", 0, "maximum number of concurrent helm processes to run, 0 is unlimited")
return cmd return cmd
} }

View File

@ -34,7 +34,7 @@ func NewDiffCmd(globalCfg *config.GlobalImpl) *cobra.Command {
f.StringVar(&diffOptions.Args, "args", "", "pass args to helm diff") f.StringVar(&diffOptions.Args, "args", "", "pass args to helm diff")
f.StringArrayVar(&diffOptions.Set, "set", nil, "additional values to be merged into the command") f.StringArrayVar(&diffOptions.Set, "set", nil, "additional values to be merged into the command")
f.StringArrayVar(&diffOptions.Values, "values", nil, "additional value files to be merged into the command") f.StringArrayVar(&diffOptions.Values, "values", nil, "additional value files to be merged into the command")
f.IntVar(&diffOptions.Concurrency, "concurrency", 0, "maximum number of concurrent downloads of release charts") f.IntVar(&diffOptions.Concurrency, "concurrency", 0, "maximum number of concurrent helm processes to run, 0 is unlimited")
f.BoolVar(&diffOptions.Validate, "validate", false, "validate your manifests against the Kubernetes cluster you are currently pointing at. Note that this requires access to a Kubernetes cluster to obtain information necessary for validating, like the diff of available API versions") f.BoolVar(&diffOptions.Validate, "validate", false, "validate your manifests against the Kubernetes cluster you are currently pointing at. Note that this requires access to a Kubernetes cluster to obtain information necessary for validating, like the diff of available API versions")
f.BoolVar(&diffOptions.SkipNeeds, "skip-needs", false, `do not automatically include releases from the target release's "needs" when --selector/-l flag is provided. Does nothing when when --selector/-l flag is not provided. Defaults to true when --include-needs or --include-transitive-needs is not provided`) f.BoolVar(&diffOptions.SkipNeeds, "skip-needs", false, `do not automatically include releases from the target release's "needs" when --selector/-l flag is provided. Does nothing when when --selector/-l flag is not provided. Defaults to true when --include-needs or --include-transitive-needs is not provided`)
f.BoolVar(&diffOptions.IncludeTests, "include-tests", false, "enable the diffing of the helm test hooks") f.BoolVar(&diffOptions.IncludeTests, "include-tests", false, "enable the diffing of the helm test hooks")

View File

@ -31,7 +31,7 @@ func NewLintCmd(globalCfg *config.GlobalImpl) *cobra.Command {
} }
f := cmd.Flags() f := cmd.Flags()
f.IntVar(&lintOptions.Concurrency, "concurrency", 0, "maximum number of concurrent downloads of release charts") f.IntVar(&lintOptions.Concurrency, "concurrency", 0, "maximum number of concurrent helm processes to run, 0 is unlimited")
f.BoolVar(&lintOptions.SkipDeps, "skip-deps", false, `skip running "helm repo update" and "helm dependency build"`) f.BoolVar(&lintOptions.SkipDeps, "skip-deps", false, `skip running "helm repo update" and "helm dependency build"`)
f.StringVar(&lintOptions.Args, "args", "", "pass args to helm exec") f.StringVar(&lintOptions.Args, "args", "", "pass args to helm exec")
f.StringArrayVar(&lintOptions.Set, "set", nil, "additional values to be merged into the command") f.StringArrayVar(&lintOptions.Set, "set", nil, "additional values to be merged into the command")

View File

@ -34,7 +34,7 @@ func NewSyncCmd(globalCfg *config.GlobalImpl) *cobra.Command {
f.StringVar(&syncOptions.Args, "args", "", "pass args to helm sync") f.StringVar(&syncOptions.Args, "args", "", "pass args to helm sync")
f.StringArrayVar(&syncOptions.Set, "set", nil, "additional values to be merged into the command") f.StringArrayVar(&syncOptions.Set, "set", nil, "additional values to be merged into the command")
f.StringArrayVar(&syncOptions.Values, "values", nil, "additional value files to be merged into the command") f.StringArrayVar(&syncOptions.Values, "values", nil, "additional value files to be merged into the command")
f.IntVar(&syncOptions.Concurrency, "concurrency", 0, "maximum number of concurrent downloads of release charts") f.IntVar(&syncOptions.Concurrency, "concurrency", 0, "maximum number of concurrent helm processes to run, 0 is unlimited")
f.BoolVar(&syncOptions.Validate, "validate", false, "validate your manifests against the Kubernetes cluster you are currently pointing at. Note that this requires access to a Kubernetes cluster to obtain information necessary for validating, like the sync of available API versions") f.BoolVar(&syncOptions.Validate, "validate", false, "validate your manifests against the Kubernetes cluster you are currently pointing at. Note that this requires access to a Kubernetes cluster to obtain information necessary for validating, like the sync of available API versions")
f.BoolVar(&syncOptions.SkipNeeds, "skip-needs", false, `do not automatically include releases from the target release's "needs" when --selector/-l flag is provided. Does nothing when when --selector/-l flag is not provided. Defaults to true when --include-needs or --include-transitive-needs is not provided`) f.BoolVar(&syncOptions.SkipNeeds, "skip-needs", false, `do not automatically include releases from the target release's "needs" when --selector/-l flag is provided. Does nothing when when --selector/-l flag is not provided. Defaults to true when --include-needs or --include-transitive-needs is not provided`)
f.BoolVar(&syncOptions.SkipCRDs, "skip-crds", false, "if set, no CRDs will be installed on sync. By default, CRDs are installed if not already present") f.BoolVar(&syncOptions.SkipCRDs, "skip-crds", false, "if set, no CRDs will be installed on sync. By default, CRDs are installed if not already present")

View File

@ -36,7 +36,7 @@ func NewTemplateCmd(globalCfg *config.GlobalImpl) *cobra.Command {
f.StringArrayVar(&templateOptions.Values, "values", nil, "additional value files to be merged into the command") f.StringArrayVar(&templateOptions.Values, "values", nil, "additional value files to be merged into the command")
f.StringVar(&templateOptions.OutputDir, "output-dir", "", "output directory to pass to helm template (helm template --output-dir)") f.StringVar(&templateOptions.OutputDir, "output-dir", "", "output directory to pass to helm template (helm template --output-dir)")
f.StringVar(&templateOptions.OutputDirTemplate, "output-dir-template", "", "go text template for generating the output directory. Default: {{ .OutputDir }}/{{ .State.BaseName }}-{{ .State.AbsPathSHA1 }}-{{ .Release.Name}}") f.StringVar(&templateOptions.OutputDirTemplate, "output-dir-template", "", "go text template for generating the output directory. Default: {{ .OutputDir }}/{{ .State.BaseName }}-{{ .State.AbsPathSHA1 }}-{{ .Release.Name}}")
f.IntVar(&templateOptions.Concurrency, "concurrency", 0, "maximum number of concurrent downloads of release charts") f.IntVar(&templateOptions.Concurrency, "concurrency", 0, "maximum number of concurrent helm processes to run, 0 is unlimited")
f.BoolVar(&templateOptions.Validate, "validate", false, "validate your manifests against the Kubernetes cluster you are currently pointing at. Note that this requires access to a Kubernetes cluster to obtain information necessary for validating, like the template of available API versions") f.BoolVar(&templateOptions.Validate, "validate", false, "validate your manifests against the Kubernetes cluster you are currently pointing at. Note that this requires access to a Kubernetes cluster to obtain information necessary for validating, like the template of available API versions")
f.BoolVar(&templateOptions.IncludeCRDs, "include-crds", false, "include CRDs in the templated output") f.BoolVar(&templateOptions.IncludeCRDs, "include-crds", false, "include CRDs in the templated output")
f.BoolVar(&templateOptions.SkipTests, "skip-tests", false, "skip tests from templated output") f.BoolVar(&templateOptions.SkipTests, "skip-tests", false, "skip tests from templated output")

View File

@ -32,7 +32,7 @@ func NewTestCmd(globalCfg *config.GlobalImpl) *cobra.Command {
testImpl.Cmd = cmd testImpl.Cmd = cmd
f := cmd.Flags() f := cmd.Flags()
f.IntVar(&testOptions.Concurrency, "concurrency", 0, "maximum number of concurrent downloads of release charts") f.IntVar(&testOptions.Concurrency, "concurrency", 0, "maximum number of concurrent helm processes to run, 0 is unlimited")
f.BoolVar(&testOptions.SkipDeps, "skip-deps", false, `skip running "helm repo update" and "helm dependency build"`) f.BoolVar(&testOptions.SkipDeps, "skip-deps", false, `skip running "helm repo update" and "helm dependency build"`)
f.BoolVar(&testOptions.Cleanup, "cleanup", false, "delete test pods upon completion") f.BoolVar(&testOptions.Cleanup, "cleanup", false, "delete test pods upon completion")
f.BoolVar(&testOptions.Logs, "logs", false, "Dump the logs from test pods (this runs after all tests are complete, but before any cleanup)") f.BoolVar(&testOptions.Logs, "logs", false, "Dump the logs from test pods (this runs after all tests are complete, but before any cleanup)")

View File

@ -31,7 +31,7 @@ func NewWriteValuesCmd(globalCfg *config.GlobalImpl) *cobra.Command {
} }
f := cmd.Flags() f := cmd.Flags()
f.IntVar(&writeValuesOptions.Concurrency, "concurrency", 0, "maximum number of concurrent downloads of release charts") f.IntVar(&writeValuesOptions.Concurrency, "concurrency", 0, "maximum number of concurrent helm processes to run, 0 is unlimited")
f.BoolVar(&writeValuesOptions.SkipDeps, "skip-deps", false, `skip running "helm repo update" and "helm dependency build"`) f.BoolVar(&writeValuesOptions.SkipDeps, "skip-deps", false, `skip running "helm repo update" and "helm dependency build"`)
f.StringArrayVar(&writeValuesOptions.Set, "set", nil, "additional values to be merged into the command") f.StringArrayVar(&writeValuesOptions.Set, "set", nil, "additional values to be merged into the command")
f.StringArrayVar(&writeValuesOptions.Values, "values", nil, "additional value files to be merged into the command") f.StringArrayVar(&writeValuesOptions.Values, "values", nil, "additional value files to be merged into the command")

View File

@ -93,6 +93,7 @@ func (a *App) Deps(c DepsConfigProvider) error {
SkipRepos: c.SkipRepos(), SkipRepos: c.SkipRepos(),
SkipDeps: true, SkipDeps: true,
SkipResolve: true, SkipResolve: true,
Concurrency: c.Concurrency(),
}, func() { }, func() {
errs = run.Deps(c) errs = run.Deps(c)
}) })
@ -120,8 +121,9 @@ func (a *App) Repos(c ReposConfigProvider) error {
func (a *App) DeprecatedSyncCharts(c DeprecatedChartsConfigProvider) error { func (a *App) DeprecatedSyncCharts(c DeprecatedChartsConfigProvider) error {
return a.ForEachState(func(run *Run) (_ bool, errs []error) { return a.ForEachState(func(run *Run) (_ bool, errs []error) {
err := run.withPreparedCharts("charts", state.ChartPrepareOptions{ err := run.withPreparedCharts("charts", state.ChartPrepareOptions{
SkipRepos: true, SkipRepos: true,
SkipDeps: true, SkipDeps: true,
Concurrency: 2,
}, func() { }, func() {
errs = run.DeprecatedSyncCharts(c) errs = run.DeprecatedSyncCharts(c)
}) })
@ -155,6 +157,7 @@ func (a *App) Diff(c DiffConfigProvider) error {
SkipDeps: c.SkipDeps(), SkipDeps: c.SkipDeps(),
IncludeCRDs: &includeCRDs, IncludeCRDs: &includeCRDs,
Validate: c.Validate(), Validate: c.Validate(),
Concurrency: c.Concurrency(),
}, func() { }, func() {
msg, matched, affected, errs = a.diff(run, c) msg, matched, affected, errs = a.diff(run, c)
}) })
@ -219,6 +222,7 @@ func (a *App) Template(c TemplateConfigProvider) error {
IncludeCRDs: &includeCRDs, IncludeCRDs: &includeCRDs,
SkipCleanup: c.SkipCleanup(), SkipCleanup: c.SkipCleanup(),
Validate: c.Validate(), Validate: c.Validate(),
Concurrency: c.Concurrency(),
}, func() { }, func() {
ok, errs = a.template(run, c) ok, errs = a.template(run, c)
}) })
@ -240,6 +244,7 @@ func (a *App) WriteValues(c WriteValuesConfigProvider) error {
SkipRepos: c.SkipDeps(), SkipRepos: c.SkipDeps(),
SkipDeps: c.SkipDeps(), SkipDeps: c.SkipDeps(),
SkipCleanup: c.SkipCleanup(), SkipCleanup: c.SkipCleanup(),
Concurrency: c.Concurrency(),
}, func() { }, func() {
ok, errs = a.writeValues(run, c) ok, errs = a.writeValues(run, c)
}) })
@ -290,6 +295,7 @@ func (a *App) Lint(c LintConfigProvider) error {
SkipRepos: c.SkipDeps(), SkipRepos: c.SkipDeps(),
SkipDeps: c.SkipDeps(), SkipDeps: c.SkipDeps(),
SkipCleanup: c.SkipCleanup(), SkipCleanup: c.SkipCleanup(),
Concurrency: c.Concurrency(),
}, func() { }, func() {
ok, lintErrs, errs = a.lint(run, c) ok, lintErrs, errs = a.lint(run, c)
}) })
@ -323,6 +329,7 @@ func (a *App) Fetch(c FetchConfigProvider) error {
SkipRepos: c.SkipDeps(), SkipRepos: c.SkipDeps(),
SkipDeps: c.SkipDeps(), SkipDeps: c.SkipDeps(),
OutputDir: c.OutputDir(), OutputDir: c.OutputDir(),
Concurrency: c.Concurrency(),
}, func() { }, func() {
}) })
@ -346,6 +353,7 @@ func (a *App) Sync(c SyncConfigProvider) error {
IncludeCRDs: &includeCRDs, IncludeCRDs: &includeCRDs,
IncludeTransitiveNeeds: c.IncludeTransitiveNeeds(), IncludeTransitiveNeeds: c.IncludeTransitiveNeeds(),
Validate: c.Validate(), Validate: c.Validate(),
Concurrency: c.Concurrency(),
}, func() { }, func() {
ok, errs = a.sync(run, c) ok, errs = a.sync(run, c)
}) })
@ -378,6 +386,7 @@ func (a *App) Apply(c ApplyConfigProvider) error {
IncludeCRDs: &includeCRDs, IncludeCRDs: &includeCRDs,
SkipCleanup: c.RetainValuesFiles() || c.SkipCleanup(), SkipCleanup: c.RetainValuesFiles() || c.SkipCleanup(),
Validate: c.Validate(), Validate: c.Validate(),
Concurrency: c.Concurrency(),
}, func() { }, func() {
matched, updated, es := a.apply(run, c) matched, updated, es := a.apply(run, c)
@ -411,8 +420,9 @@ func (a *App) Apply(c ApplyConfigProvider) error {
func (a *App) Status(c StatusesConfigProvider) error { func (a *App) Status(c StatusesConfigProvider) error {
return a.ForEachState(func(run *Run) (ok bool, errs []error) { return a.ForEachState(func(run *Run) (ok bool, errs []error) {
err := run.withPreparedCharts("status", state.ChartPrepareOptions{ err := run.withPreparedCharts("status", state.ChartPrepareOptions{
SkipRepos: true, SkipRepos: true,
SkipDeps: true, SkipDeps: true,
Concurrency: c.Concurrency(),
}, func() { }, func() {
ok, errs = a.status(run, c) ok, errs = a.status(run, c)
}) })
@ -428,8 +438,9 @@ func (a *App) Status(c StatusesConfigProvider) error {
func (a *App) Delete(c DeleteConfigProvider) error { func (a *App) Delete(c DeleteConfigProvider) error {
return a.ForEachState(func(run *Run) (ok bool, errs []error) { return a.ForEachState(func(run *Run) (ok bool, errs []error) {
err := run.withPreparedCharts("delete", state.ChartPrepareOptions{ err := run.withPreparedCharts("delete", state.ChartPrepareOptions{
SkipRepos: c.SkipDeps(), SkipRepos: c.SkipDeps(),
SkipDeps: c.SkipDeps(), SkipDeps: c.SkipDeps(),
Concurrency: c.Concurrency(),
}, func() { }, func() {
ok, errs = a.delete(run, c.Purge(), c) ok, errs = a.delete(run, c.Purge(), c)
}) })
@ -445,8 +456,9 @@ func (a *App) Delete(c DeleteConfigProvider) error {
func (a *App) Destroy(c DestroyConfigProvider) error { func (a *App) Destroy(c DestroyConfigProvider) error {
return a.ForEachState(func(run *Run) (ok bool, errs []error) { return a.ForEachState(func(run *Run) (ok bool, errs []error) {
err := run.withPreparedCharts("destroy", state.ChartPrepareOptions{ err := run.withPreparedCharts("destroy", state.ChartPrepareOptions{
SkipRepos: c.SkipDeps(), SkipRepos: c.SkipDeps(),
SkipDeps: c.SkipDeps(), SkipDeps: c.SkipDeps(),
Concurrency: c.Concurrency(),
}, func() { }, func() {
ok, errs = a.delete(run, true, c) ok, errs = a.delete(run, true, c)
}) })
@ -468,8 +480,9 @@ func (a *App) Test(c TestConfigProvider) error {
} }
err := run.withPreparedCharts("test", state.ChartPrepareOptions{ err := run.withPreparedCharts("test", state.ChartPrepareOptions{
SkipRepos: c.SkipDeps(), SkipRepos: c.SkipDeps(),
SkipDeps: c.SkipDeps(), SkipDeps: c.SkipDeps(),
Concurrency: c.Concurrency(),
}, func() { }, func() {
errs = a.test(run, c) errs = a.test(run, c)
}) })
@ -485,8 +498,9 @@ func (a *App) Test(c TestConfigProvider) error {
func (a *App) PrintState(c StateConfigProvider) error { func (a *App) PrintState(c StateConfigProvider) error {
return a.ForEachState(func(run *Run) (_ bool, errs []error) { return a.ForEachState(func(run *Run) (_ bool, errs []error) {
err := run.withPreparedCharts("build", state.ChartPrepareOptions{ err := run.withPreparedCharts("build", state.ChartPrepareOptions{
SkipRepos: true, SkipRepos: true,
SkipDeps: true, SkipDeps: true,
Concurrency: 2,
}, func() { }, func() {
if c.EmbedValues() { if c.EmbedValues() {
for i := range run.state.Releases { for i := range run.state.Releases {
@ -534,8 +548,9 @@ func (a *App) ListReleases(c ListConfigProvider) error {
err := a.ForEachState(func(run *Run) (_ bool, errs []error) { err := a.ForEachState(func(run *Run) (_ bool, errs []error) {
err := run.withPreparedCharts("list", state.ChartPrepareOptions{ err := run.withPreparedCharts("list", state.ChartPrepareOptions{
SkipRepos: true, SkipRepos: true,
SkipDeps: true, SkipDeps: true,
Concurrency: 2,
}, func() { }, func() {
// var releases m // var releases m
for _, r := range run.state.Releases { for _, r := range run.state.Releases {

View File

@ -2470,6 +2470,10 @@ func (d depsConfig) Args() string {
return "" return ""
} }
func (d depsConfig) Concurrency() int {
return 2
}
// Mocking the command-line runner // Mocking the command-line runner
type mockRunner struct { type mockRunner struct {

View File

@ -30,6 +30,8 @@ type DepsConfigProvider interface {
Args() string Args() string
SkipRepos() bool SkipRepos() bool
IncludeTransitiveNeeds() bool IncludeTransitiveNeeds() bool
concurrencyConfig
} }
type ReposConfigProvider interface { type ReposConfigProvider interface {
@ -206,6 +208,8 @@ type WriteValuesConfigProvider interface {
SkipDeps() bool SkipDeps() bool
SkipCleanup() bool SkipCleanup() bool
IncludeTransitiveNeeds() bool IncludeTransitiveNeeds() bool
concurrencyConfig
} }
type StatusesConfigProvider interface { type StatusesConfigProvider interface {

View File

@ -68,7 +68,9 @@ func (r *Run) withPreparedCharts(helmfileCommand string, opts state.ChartPrepare
return err return err
} }
releaseToChart, errs := r.state.PrepareCharts(r.helm, dir, 2, helmfileCommand, opts) concurrency := opts.Concurrency
releaseToChart, errs := r.state.PrepareCharts(r.helm, dir, concurrency, helmfileCommand, opts)
if len(errs) > 0 { if len(errs) > 0 {
return fmt.Errorf("%v", errs) return fmt.Errorf("%v", errs)

View File

@ -6,6 +6,8 @@ type DepsOptions struct {
Args string Args string
// SkipRepos is the skip repos flag // SkipRepos is the skip repos flag
SkipRepos bool SkipRepos bool
// Concurrency is the maximum number of concurrent helm processes to run
Concurrency int
} }
// NewDepsOptions creates a new Apply // NewDepsOptions creates a new Apply
@ -41,3 +43,8 @@ func (d *DepsImpl) SkipRepos() bool {
func (d *DepsImpl) IncludeTransitiveNeeds() bool { func (d *DepsImpl) IncludeTransitiveNeeds() bool {
return false return false
} }
// Concurrency returns the concurrency
func (c *DepsImpl) Concurrency() int {
return c.DepsOptions.Concurrency
}

View File

@ -978,6 +978,7 @@ type ChartPrepareOptions struct {
WaitForJobs bool WaitForJobs bool
OutputDir string OutputDir string
IncludeTransitiveNeeds bool IncludeTransitiveNeeds bool
Concurrency int
} }
type chartPrepareResult struct { type chartPrepareResult struct {

1
test/e2e/template/helmfile/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/*.tgz

View File

@ -106,12 +106,15 @@ func TestHelmfileTemplateWithBuildCommand(t *testing.T) {
hostPort = 5000 hostPort = 5000
} }
execDocker(t, "run", "-d", "-p", fmt.Sprintf("%d:5000", hostPort), "--restart=always", "--name", containerName, "registry:2") execDocker(t, "run", "--rm", "-d", "-p", fmt.Sprintf("%d:5000", hostPort), "--name", containerName, "registry:2")
t.Cleanup(func() { t.Cleanup(func() {
execDocker(t, "stop", containerName) execDocker(t, "stop", containerName)
execDocker(t, "rm", containerName)
}) })
// FIXME: this is a hack to wait for registry to be up and running
// please replace with proper wait for registry
time.Sleep(5 * time.Second)
// We helm-package and helm-push every test chart saved in the ./testdata/charts directory // We helm-package and helm-push every test chart saved in the ./testdata/charts directory
// to the local registry, so that they can be accessed by helmfile and helm invoked while testing. // to the local registry, so that they can be accessed by helmfile and helm invoked while testing.
charts, err := os.ReadDir(chartsDir) charts, err := os.ReadDir(chartsDir)