fix: `sync` partially run `helm upgrade` even one of releases failed while loading values.yaml (#281)
Fixes #280
This commit is contained in:
parent
54f1567294
commit
b0cc7ba96c
137
state/state.go
137
state/state.go
|
|
@ -141,52 +141,104 @@ func (e *ReleaseError) Error() string {
|
||||||
return e.underlying.Error()
|
return e.underlying.Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type syncResult struct {
|
||||||
|
errors []*ReleaseError
|
||||||
|
}
|
||||||
|
|
||||||
|
type syncPrepareResult struct {
|
||||||
|
release *ReleaseSpec
|
||||||
|
flags []string
|
||||||
|
errors []*ReleaseError
|
||||||
|
}
|
||||||
|
|
||||||
|
// SyncReleases wrapper for executing helm upgrade on the releases
|
||||||
|
func (state *HelmState) prepareSyncReleases(helm helmexec.Interface, additionalValues []string, concurrency int) ([]syncPrepareResult, []error) {
|
||||||
|
jobs := make(chan *ReleaseSpec)
|
||||||
|
results := make(chan syncPrepareResult)
|
||||||
|
|
||||||
|
if concurrency < 1 {
|
||||||
|
concurrency = len(state.Releases)
|
||||||
|
}
|
||||||
|
for w := 1; w <= concurrency; w++ {
|
||||||
|
go func() {
|
||||||
|
for release := range jobs {
|
||||||
|
state.applyDefaultsTo(release)
|
||||||
|
flags, flagsErr := state.flagsForUpgrade(helm, release)
|
||||||
|
if flagsErr != nil {
|
||||||
|
results <- syncPrepareResult{errors: []*ReleaseError{&ReleaseError{release, flagsErr}}}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
errs := []*ReleaseError{}
|
||||||
|
for _, value := range additionalValues {
|
||||||
|
valfile, err := filepath.Abs(value)
|
||||||
|
if err != nil {
|
||||||
|
errs = append(errs, &ReleaseError{release, err})
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := os.Stat(valfile); os.IsNotExist(err) {
|
||||||
|
errs = append(errs, &ReleaseError{release, err})
|
||||||
|
}
|
||||||
|
flags = append(flags, "--values", valfile)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(errs) > 0 {
|
||||||
|
results <- syncPrepareResult{errors: errs}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
results <- syncPrepareResult{release: release, flags: flags, errors: []*ReleaseError{}}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for i := 0; i < len(state.Releases); i++ {
|
||||||
|
jobs <- &state.Releases[i]
|
||||||
|
}
|
||||||
|
close(jobs)
|
||||||
|
}()
|
||||||
|
|
||||||
|
res := []syncPrepareResult{}
|
||||||
|
errs := []error{}
|
||||||
|
for i := 0; i < len(state.Releases); {
|
||||||
|
select {
|
||||||
|
case r := <-results:
|
||||||
|
for _, e := range r.errors {
|
||||||
|
errs = append(errs, e)
|
||||||
|
}
|
||||||
|
res = append(res, r)
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, errs
|
||||||
|
}
|
||||||
|
|
||||||
// SyncReleases wrapper for executing helm upgrade on the releases
|
// SyncReleases wrapper for executing helm upgrade on the releases
|
||||||
func (state *HelmState) SyncReleases(helm helmexec.Interface, additionalValues []string, workerLimit int) []error {
|
func (state *HelmState) SyncReleases(helm helmexec.Interface, additionalValues []string, workerLimit int) []error {
|
||||||
errs := []error{}
|
preps, prepErrs := state.prepareSyncReleases(helm, additionalValues, workerLimit)
|
||||||
|
if len(prepErrs) > 0 {
|
||||||
|
return prepErrs
|
||||||
|
}
|
||||||
|
|
||||||
jobQueue := make(chan *ReleaseSpec)
|
jobQueue := make(chan *ReleaseSpec)
|
||||||
doneQueue := make(chan bool)
|
results := make(chan syncResult)
|
||||||
errQueue := make(chan *ReleaseError)
|
|
||||||
|
|
||||||
if workerLimit < 1 {
|
if workerLimit < 1 {
|
||||||
workerLimit = len(state.Releases)
|
workerLimit = len(state.Releases)
|
||||||
}
|
}
|
||||||
for w := 1; w <= workerLimit; w++ {
|
for w := 1; w <= workerLimit; w++ {
|
||||||
go func() {
|
go func() {
|
||||||
for release := range jobQueue {
|
for _, prep := range preps {
|
||||||
state.applyDefaultsTo(release)
|
release := prep.release
|
||||||
flags, flagsErr := state.flagsForUpgrade(helm, release)
|
flags := prep.flags
|
||||||
if flagsErr != nil {
|
|
||||||
errQueue <- &ReleaseError{release, flagsErr}
|
|
||||||
doneQueue <- true
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
haveValueErr := false
|
|
||||||
for _, value := range additionalValues {
|
|
||||||
valfile, err := filepath.Abs(value)
|
|
||||||
if err != nil {
|
|
||||||
errQueue <- &ReleaseError{release, err}
|
|
||||||
haveValueErr = true
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := os.Stat(valfile); os.IsNotExist(err) {
|
|
||||||
errQueue <- &ReleaseError{release, err}
|
|
||||||
haveValueErr = true
|
|
||||||
}
|
|
||||||
flags = append(flags, "--values", valfile)
|
|
||||||
}
|
|
||||||
|
|
||||||
if haveValueErr {
|
|
||||||
doneQueue <- true
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
chart := normalizeChart(state.basePath, release.Chart)
|
chart := normalizeChart(state.basePath, release.Chart)
|
||||||
if err := helm.SyncRelease(release.Name, chart, flags...); err != nil {
|
if err := helm.SyncRelease(release.Name, chart, flags...); err != nil {
|
||||||
errQueue <- &ReleaseError{release, err}
|
results <- syncResult{errors: []*ReleaseError{&ReleaseError{release, err}}}
|
||||||
|
} else {
|
||||||
|
results <- syncResult{}
|
||||||
}
|
}
|
||||||
doneQueue <- true
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
@ -198,16 +250,20 @@ func (state *HelmState) SyncReleases(helm helmexec.Interface, additionalValues [
|
||||||
close(jobQueue)
|
close(jobQueue)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
errs := []error{}
|
||||||
for i := 0; i < len(state.Releases); {
|
for i := 0; i < len(state.Releases); {
|
||||||
select {
|
select {
|
||||||
case err := <-errQueue:
|
case res := <-results:
|
||||||
errs = append(errs, err)
|
if len(res.errors) > 0 {
|
||||||
case <-doneQueue:
|
for _, e := range res.errors {
|
||||||
i++
|
errs = append(errs, e)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
i++
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(errs) != 0 {
|
if len(errs) > 0 {
|
||||||
return errs
|
return errs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -675,6 +731,9 @@ func (state *HelmState) namespaceAndValuesFlags(helm helmexec.Interface, release
|
||||||
}
|
}
|
||||||
|
|
||||||
yamlBytes, err := state.RenderValuesFileToBytes(path)
|
yamlBytes, err := state.RenderValuesFileToBytes(path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("faield to render values files \"%s\": %v", typedValue, err)
|
||||||
|
}
|
||||||
|
|
||||||
valfile, err := ioutil.TempFile("", "values")
|
valfile, err := ioutil.TempFile("", "values")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,6 @@ func (r *renderer) RenderToBytes(path string) ([]byte, error) {
|
||||||
if len(splits) > 0 && splits[len(splits)-1] == "gotmpl" {
|
if len(splits) > 0 && splits[len(splits)-1] == "gotmpl" {
|
||||||
yamlBuf, err := r.tmplFileRenderer.RenderTemplateFileToBuffer(path)
|
yamlBuf, err := r.tmplFileRenderer.RenderTemplateFileToBuffer(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
||||||
return nil, fmt.Errorf("failed to render [%s], because of %v", path, err)
|
return nil, fmt.Errorf("failed to render [%s], because of %v", path, err)
|
||||||
}
|
}
|
||||||
yamlBytes = yamlBuf.Bytes()
|
yamlBytes = yamlBuf.Bytes()
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue