Fix race condition. Add concurrency option similiar to DiffReleases

This commit is contained in:
Daniel 2018-04-06 16:05:32 +02:00
parent d380b8d550
commit 2b07b9715e
2 changed files with 65 additions and 29 deletions

View File

@ -134,6 +134,11 @@ func main() {
Name: "sync-repos",
Usage: "enable a repo sync prior to diffing",
},
cli.IntFlag{
Name: "concurrency",
Value: 0,
Usage: "maximum number of concurrent helm processes to run, 0 is unlimited",
},
},
Action: func(c *cli.Context) error {
state, helm, err := before(c)
@ -156,8 +161,9 @@ func main() {
}
values := c.StringSlice("values")
workers := c.Int("concurrency")
errs := state.DiffReleases(helm, values)
errs := state.DiffReleases(helm, values, workers)
return clean(state, errs)
},
},

View File

@ -230,45 +230,75 @@ func (state *HelmState) SyncReleases(helm helmexec.Interface, additionalValues [
return nil
}
func (state *HelmState) DiffReleases(helm helmexec.Interface, additionalValues []string) []error {
var wg sync.WaitGroup
func (state *HelmState) DiffReleases(helm helmexec.Interface, additionalValues []string, workerLimit int) []error {
var wgRelease sync.WaitGroup
var wgError sync.WaitGroup
errs := []error{}
jobQueue := make(chan *ReleaseSpec, len(state.Releases))
errQueue := make(chan error)
for i := 0; i < len(state.Releases); i++ {
release := &state.Releases[i]
renderedName, err := renderTemplateString(release.Name)
if err != nil {
errs = append(errs, err)
}
wg.Add(1)
go func(wg *sync.WaitGroup, release *ReleaseSpec) {
// Plugin command doesn't support explicit namespace
release.Namespace = ""
flags, flagsErr := flagsForRelease(helm, state.BaseChartPath, release)
if flagsErr != nil {
errs = append(errs, flagsErr)
}
if workerLimit < 1 {
workerLimit = len(state.Releases)
}
for _, value := range additionalValues {
valfile, err := filepath.Abs(value)
wgRelease.Add(len(state.Releases))
for w := 1; w <= workerLimit; w++ {
go func() {
for release := range jobQueue {
errs := []error{}
renderedName, err := renderTemplateString(release.Name)
if err != nil {
errs = append(errs, err)
}
// Plugin command doesn't support explicit namespace
release.Namespace = ""
flags, err := flagsForRelease(helm, state.BaseChartPath, release)
if err != nil {
errs = append(errs, err)
}
if _, err := os.Stat(valfile); os.IsNotExist(err) {
errs = append(errs, err)
for _, value := range additionalValues {
valfile, err := filepath.Abs(value)
if err != nil {
errs = append(errs, err)
}
if _, err := os.Stat(valfile); os.IsNotExist(err) {
errs = append(errs, err)
}
flags = append(flags, "--values", valfile)
}
flags = append(flags, "--values", valfile)
}
if len(errs) == 0 {
if err := helm.DiffRelease(renderedName, normalizeChart(state.BaseChartPath, release.Chart), flags...); err != nil {
errs = append(errs, err)
if len(errs) == 0 {
if err := helm.DiffRelease(renderedName, normalizeChart(state.BaseChartPath, release.Chart), flags...); err != nil {
errs = append(errs, err)
}
}
for _, err := range errs {
errQueue <- err
}
wgRelease.Done()
}
wg.Done()
}(&wg, release)
}()
}
wg.Wait()
wgError.Add(1)
go func() {
for err := range errQueue {
errs = append(errs, err)
}
wgError.Done()
}()
for i := 0; i < len(state.Releases); i++ {
jobQueue <- &state.Releases[i]
}
close(jobQueue)
wgRelease.Wait()
close(errQueue)
wgError.Wait()
if len(errs) != 0 {
return errs