feat: `apply` run `sync` against only affected releases (#291)

Enhance the `diff` functionality to be able to return affected releases that has any changes,
so that the succeeding `sync` can be run against only the affected releases.

This provides us extra idempotency.

Resolves #277
This commit is contained in:
KUOKA Yusuke 2018-09-04 21:48:43 +09:00 committed by GitHub
parent 60843cc224
commit f1ac50bf44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 156 additions and 61 deletions

70
main.go
View File

@ -21,6 +21,7 @@ import (
"github.com/urfave/cli" "github.com/urfave/cli"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
"strings"
) )
const ( const (
@ -183,7 +184,8 @@ func main() {
}, },
Action: func(c *cli.Context) error { Action: func(c *cli.Context) error {
return findAndIterateOverDesiredStatesUsingFlags(c, func(state *state.HelmState, helm helmexec.Interface) []error { return findAndIterateOverDesiredStatesUsingFlags(c, func(state *state.HelmState, helm helmexec.Interface) []error {
return executeDiffCommand(c, state, helm, c.Bool("detailed-exitcode"), c.Bool("suppress-secrets")) _, errs := executeDiffCommand(c, state, helm, c.Bool("detailed-exitcode"), c.Bool("suppress-secrets"))
return errs
}) })
}, },
}, },
@ -297,40 +299,56 @@ func main() {
}, },
}, },
Action: func(c *cli.Context) error { Action: func(c *cli.Context) error {
return findAndIterateOverDesiredStatesUsingFlags(c, func(state *state.HelmState, helm helmexec.Interface) []error { return findAndIterateOverDesiredStatesUsingFlags(c, func(st *state.HelmState, helm helmexec.Interface) []error {
if !c.Bool("skip-repo-update") { if !c.Bool("skip-repo-update") {
if errs := state.SyncRepos(helm); errs != nil && len(errs) > 0 { if errs := st.SyncRepos(helm); errs != nil && len(errs) > 0 {
return errs return errs
} }
} }
if errs := state.UpdateDeps(helm); errs != nil && len(errs) > 0 { if errs := st.UpdateDeps(helm); errs != nil && len(errs) > 0 {
return errs return errs
} }
errs := executeDiffCommand(c, state, helm, true, c.Bool("suppress-secrets")) releases, errs := executeDiffCommand(c, st, helm, true, c.Bool("suppress-secrets"))
noError := true
for _, e := range errs {
switch err := e.(type) {
case *state.DiffError:
noError = noError && err.Code == 2
default:
noError = false
}
}
// sync only when there are changes // sync only when there are changes
if len(errs) > 0 { if noError {
allErrsIndicateChanges := true if len(releases) == 0 {
for _, err := range errs { // TODO better way to get the logger
switch e := err.(type) { logger := c.App.Metadata["logger"].(*zap.SugaredLogger)
case *exec.ExitError: logger.Infof("")
status := e.Sys().(syscall.WaitStatus) logger.Infof("No affected releases")
// `helm diff --detailed-exitcode` returns 2 when there are changes } else {
allErrsIndicateChanges = allErrsIndicateChanges && status.ExitStatus() == 2 names := make([]string, len(releases))
default: for i, r := range releases {
allErrsIndicateChanges = false names[i] = fmt.Sprintf(" %s (%s)", r.Name, r.Chart)
}
} }
msg := `Do you really want to apply? msg := fmt.Sprintf(`Affected releases are:
%s
Do you really want to apply?
Helmfile will apply all your changes, as shown above. Helmfile will apply all your changes, as shown above.
` `, strings.Join(names, "\n"))
if allErrsIndicateChanges {
autoApprove := c.Bool("auto-approve") autoApprove := c.Bool("auto-approve")
if autoApprove || !autoApprove && askForConfirmation(msg) { if autoApprove || !autoApprove && askForConfirmation(msg) {
return executeSyncCommand(c, state, helm) rs := make([]state.ReleaseSpec, len(releases))
for i, r := range releases {
rs[i] = *r
}
st.Releases = rs
return executeSyncCommand(c, st, helm)
} }
} }
} }
@ -480,8 +498,8 @@ func executeTemplateCommand(c *cli.Context, state *state.HelmState, helm helmexe
return state.TemplateReleases(helm, values, args) return state.TemplateReleases(helm, values, args)
} }
func executeDiffCommand(c *cli.Context, state *state.HelmState, helm helmexec.Interface, detailedExitCode, suppressSecrets bool) []error { func executeDiffCommand(c *cli.Context, st *state.HelmState, helm helmexec.Interface, detailedExitCode, suppressSecrets bool) ([]*state.ReleaseSpec, []error) {
args := args.GetArgs(c.String("args"), state) args := args.GetArgs(c.String("args"), st)
if len(args) > 0 { if len(args) > 0 {
helm.SetExtraArgs(args...) helm.SetExtraArgs(args...)
} }
@ -490,15 +508,15 @@ func executeDiffCommand(c *cli.Context, state *state.HelmState, helm helmexec.In
} }
if c.Bool("sync-repos") { if c.Bool("sync-repos") {
if errs := state.SyncRepos(helm); errs != nil && len(errs) > 0 { if errs := st.SyncRepos(helm); errs != nil && len(errs) > 0 {
return errs return []*state.ReleaseSpec{}, errs
} }
} }
values := c.StringSlice("values") values := c.StringSlice("values")
workers := c.Int("concurrency") workers := c.Int("concurrency")
return state.DiffReleases(helm, values, workers, detailedExitCode, suppressSecrets) return st.DiffReleases(helm, values, workers, detailedExitCode, suppressSecrets)
} }
func findAndIterateOverDesiredStatesUsingFlags(c *cli.Context, converge func(*state.HelmState, helmexec.Interface) []error) error { func findAndIterateOverDesiredStatesUsingFlags(c *cli.Context, converge func(*state.HelmState, helmexec.Interface) []error) error {
@ -706,6 +724,8 @@ func clean(st *state.HelmState, errs []error) error {
// Propagate any non-zero exit status from the external command like `helm` that is failed under the hood // Propagate any non-zero exit status from the external command like `helm` that is failed under the hood
status := e.Sys().(syscall.WaitStatus) status := e.Sys().(syscall.WaitStatus)
os.Exit(status.ExitStatus()) os.Exit(status.ExitStatus())
case *state.DiffError:
os.Exit(e.Code)
default: default:
os.Exit(1) os.Exit(1)
} }

View File

@ -19,6 +19,8 @@ import (
"github.com/roboll/helmfile/valuesfile" "github.com/roboll/helmfile/valuesfile"
"go.uber.org/zap" "go.uber.org/zap"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
"os/exec"
"syscall"
) )
// HelmState structure for the helmfile // HelmState structure for the helmfile
@ -405,23 +407,37 @@ func (state *HelmState) LintReleases(helm helmexec.Interface, additionalValues [
return nil return nil
} }
// DiffReleases wrapper for executing helm diff on the releases type DiffError struct {
func (state *HelmState) DiffReleases(helm helmexec.Interface, additionalValues []string, workerLimit int, detailedExitCode, suppressSecrets bool) []error { *ReleaseSpec
var wgRelease sync.WaitGroup err error
var wgError sync.WaitGroup Code int
errs := []error{} }
jobQueue := make(chan *ReleaseSpec, len(state.Releases))
errQueue := make(chan error)
if workerLimit < 1 { func (e *DiffError) Error() string {
workerLimit = len(state.Releases) return e.err.Error()
}
type diffResult struct {
err *DiffError
}
type diffPrepareResult struct {
release *ReleaseSpec
flags []string
errors []*ReleaseError
}
func (state *HelmState) prepareDiffReleases(helm helmexec.Interface, additionalValues []string, concurrency int, detailedExitCode, suppressSecrets bool) ([]diffPrepareResult, []error) {
jobs := make(chan *ReleaseSpec, len(state.Releases))
results := make(chan diffPrepareResult)
if concurrency < 1 {
concurrency = len(state.Releases)
} }
wgRelease.Add(len(state.Releases)) for w := 1; w <= concurrency; w++ {
for w := 1; w <= workerLimit; w++ {
go func() { go func() {
for release := range jobQueue { for release := range jobs {
errs := []error{} errs := []error{}
state.applyDefaultsTo(release) state.applyDefaultsTo(release)
@ -451,41 +467,100 @@ func (state *HelmState) DiffReleases(helm helmexec.Interface, additionalValues [
flags = append(flags, "--suppress-secrets") flags = append(flags, "--suppress-secrets")
} }
if len(errs) == 0 { if len(errs) > 0 {
if err := helm.DiffRelease(release.Name, normalizeChart(state.basePath, release.Chart), flags...); err != nil { rsErrs := make([]*ReleaseError, len(errs))
errs = append(errs, err) for i, e := range errs {
rsErrs[i] = &ReleaseError{release, e}
} }
results <- diffPrepareResult{errors: rsErrs}
} else {
results <- diffPrepareResult{release: release, flags: flags, errors: []*ReleaseError{}}
} }
for _, err := range errs {
errQueue <- err
}
wgRelease.Done()
} }
}() }()
} }
wgError.Add(1)
go func() {
for err := range errQueue {
errs = append(errs, err)
}
wgError.Done()
}()
for i := 0; i < len(state.Releases); i++ { for i := 0; i < len(state.Releases); i++ {
jobQueue <- &state.Releases[i] jobs <- &state.Releases[i]
}
close(jobs)
rs := []diffPrepareResult{}
errs := []error{}
for i := 0; i < len(state.Releases); {
select {
case res := <-results:
if res.errors != nil && len(res.errors) > 0 {
for _, e := range res.errors {
errs = append(errs, e)
}
} else if res.release != nil {
rs = append(rs, res)
}
}
i++
}
return rs, errs
}
// DiffReleases wrapper for executing helm diff on the releases
// It returns releases that had any changes
func (state *HelmState) DiffReleases(helm helmexec.Interface, additionalValues []string, workerLimit int, detailedExitCode, suppressSecrets bool) ([]*ReleaseSpec, []error) {
preps, prepErrs := state.prepareDiffReleases(helm, additionalValues, workerLimit, detailedExitCode, suppressSecrets)
if len(prepErrs) > 0 {
return []*ReleaseSpec{}, prepErrs
} }
jobQueue := make(chan *diffPrepareResult, len(preps))
results := make(chan diffResult, len(preps))
if workerLimit < 1 {
workerLimit = len(state.Releases)
}
for w := 1; w <= workerLimit; w++ {
go func() {
for prep := range jobQueue {
flags := prep.flags
release := prep.release
if err := helm.DiffRelease(release.Name, normalizeChart(state.basePath, release.Chart), flags...); err != nil {
switch e := err.(type) {
case *exec.ExitError:
// Propagate any non-zero exit status from the external command like `helm` that is failed under the hood
status := e.Sys().(syscall.WaitStatus)
results <- diffResult{&DiffError{release, err, status.ExitStatus()}}
default:
results <- diffResult{&DiffError{release, err, 0}}
}
} else {
// diff succeeded, found no changes
results <- diffResult{}
}
}
}()
}
for i := 0; i < len(preps); i++ {
jobQueue <- &preps[i]
}
close(jobQueue) close(jobQueue)
wgRelease.Wait()
close(errQueue) rs := []*ReleaseSpec{}
wgError.Wait() errs := []error{}
for i := 0; i < len(preps); {
if len(errs) != 0 { select {
return errs case res := <-results:
if res.err != nil {
errs = append(errs, res.err)
if res.err.Code == 2 {
rs = append(rs, res.err.ReleaseSpec)
} }
}
return nil i++
}
}
close(results)
return rs, errs
} }
func (state *HelmState) ReleaseStatuses(helm helmexec.Interface, workerLimit int) []error { func (state *HelmState) ReleaseStatuses(helm helmexec.Interface, workerLimit int) []error {