From b0cc7ba96c1c1e7d79d236d3c251708908596bce Mon Sep 17 00:00:00 2001 From: KUOKA Yusuke Date: Mon, 3 Sep 2018 16:43:57 +0900 Subject: [PATCH] fix: `sync` partially run `helm upgrade` even one of releases failed while loading values.yaml (#281) Fixes #280 --- state/state.go | 137 ++++++++++++++++++++++++++++----------- valuesfile/valuesfile.go | 1 - 2 files changed, 98 insertions(+), 40 deletions(-) diff --git a/state/state.go b/state/state.go index b70eeb05..cc987fa6 100644 --- a/state/state.go +++ b/state/state.go @@ -141,52 +141,104 @@ func (e *ReleaseError) Error() string { return e.underlying.Error() } +type syncResult struct { + errors []*ReleaseError +} + +type syncPrepareResult struct { + release *ReleaseSpec + flags []string + errors []*ReleaseError +} + +// SyncReleases wrapper for executing helm upgrade on the releases +func (state *HelmState) prepareSyncReleases(helm helmexec.Interface, additionalValues []string, concurrency int) ([]syncPrepareResult, []error) { + jobs := make(chan *ReleaseSpec) + results := make(chan syncPrepareResult) + + if concurrency < 1 { + concurrency = len(state.Releases) + } + for w := 1; w <= concurrency; w++ { + go func() { + for release := range jobs { + state.applyDefaultsTo(release) + flags, flagsErr := state.flagsForUpgrade(helm, release) + if flagsErr != nil { + results <- syncPrepareResult{errors: []*ReleaseError{&ReleaseError{release, flagsErr}}} + continue + } + + errs := []*ReleaseError{} + for _, value := range additionalValues { + valfile, err := filepath.Abs(value) + if err != nil { + errs = append(errs, &ReleaseError{release, err}) + } + + if _, err := os.Stat(valfile); os.IsNotExist(err) { + errs = append(errs, &ReleaseError{release, err}) + } + flags = append(flags, "--values", valfile) + } + + if len(errs) > 0 { + results <- syncPrepareResult{errors: errs} + continue + } + + results <- syncPrepareResult{release: release, flags: flags, errors: []*ReleaseError{}} + } + }() + } + + go func() { + for i := 0; i < len(state.Releases); i++ { + jobs <- &state.Releases[i] + } + close(jobs) + }() + + res := []syncPrepareResult{} + errs := []error{} + for i := 0; i < len(state.Releases); { + select { + case r := <-results: + for _, e := range r.errors { + errs = append(errs, e) + } + res = append(res, r) + i++ + } + } + + return res, errs +} + // SyncReleases wrapper for executing helm upgrade on the releases func (state *HelmState) SyncReleases(helm helmexec.Interface, additionalValues []string, workerLimit int) []error { - errs := []error{} + preps, prepErrs := state.prepareSyncReleases(helm, additionalValues, workerLimit) + if len(prepErrs) > 0 { + return prepErrs + } + jobQueue := make(chan *ReleaseSpec) - doneQueue := make(chan bool) - errQueue := make(chan *ReleaseError) + results := make(chan syncResult) if workerLimit < 1 { workerLimit = len(state.Releases) } for w := 1; w <= workerLimit; w++ { go func() { - for release := range jobQueue { - state.applyDefaultsTo(release) - flags, flagsErr := state.flagsForUpgrade(helm, release) - if flagsErr != nil { - errQueue <- &ReleaseError{release, flagsErr} - doneQueue <- true - continue - } - - haveValueErr := false - for _, value := range additionalValues { - valfile, err := filepath.Abs(value) - if err != nil { - errQueue <- &ReleaseError{release, err} - haveValueErr = true - } - - if _, err := os.Stat(valfile); os.IsNotExist(err) { - errQueue <- &ReleaseError{release, err} - haveValueErr = true - } - flags = append(flags, "--values", valfile) - } - - if haveValueErr { - doneQueue <- true - continue - } - + for _, prep := range preps { + release := prep.release + flags := prep.flags chart := normalizeChart(state.basePath, release.Chart) if err := helm.SyncRelease(release.Name, chart, flags...); err != nil { - errQueue <- &ReleaseError{release, err} + results <- syncResult{errors: []*ReleaseError{&ReleaseError{release, err}}} + } else { + results <- syncResult{} } - doneQueue <- true } }() } @@ -198,16 +250,20 @@ func (state *HelmState) SyncReleases(helm helmexec.Interface, additionalValues [ close(jobQueue) }() + errs := []error{} for i := 0; i < len(state.Releases); { select { - case err := <-errQueue: - errs = append(errs, err) - case <-doneQueue: - i++ + case res := <-results: + if len(res.errors) > 0 { + for _, e := range res.errors { + errs = append(errs, e) + } + } } + i++ } - if len(errs) != 0 { + if len(errs) > 0 { return errs } @@ -675,6 +731,9 @@ func (state *HelmState) namespaceAndValuesFlags(helm helmexec.Interface, release } yamlBytes, err := state.RenderValuesFileToBytes(path) + if err != nil { + return nil, fmt.Errorf("faield to render values files \"%s\": %v", typedValue, err) + } valfile, err := ioutil.TempFile("", "values") if err != nil { diff --git a/valuesfile/valuesfile.go b/valuesfile/valuesfile.go index bb0673e5..f964cb99 100644 --- a/valuesfile/valuesfile.go +++ b/valuesfile/valuesfile.go @@ -25,7 +25,6 @@ func (r *renderer) RenderToBytes(path string) ([]byte, error) { if len(splits) > 0 && splits[len(splits)-1] == "gotmpl" { yamlBuf, err := r.tmplFileRenderer.RenderTemplateFileToBuffer(path) if err != nil { - return nil, fmt.Errorf("failed to render [%s], because of %v", path, err) } yamlBytes = yamlBuf.Bytes()