Add option to limit concurrent helm calls (#24)

* Add option to throttle concurrent `helm repo update` calls

Have added a new flag `--concurrency N` to `helmfile sync charts` that can be used to set a
limit on the number of concurrent calls to helm. 

Implementation details:

Switched `SyncCharts` from using a WaitGroup to using a pool
of workers and a queue of jobs. To ensure that this is thread safe and
an attempt is made to sync each chart at the end.

Fixes #23

* Fix formatting and update CI to catch these errors

Have fixed the formatting so that `make pristine` now passes. Have also
added this to the Circle CI config to catch these errors in the future.
This commit is contained in:
Thomas O'Donnell 2018-03-01 13:39:23 +00:00 committed by KUOKA Yusuke
parent c00b869045
commit effc747081
3 changed files with 79 additions and 36 deletions

View File

@ -18,6 +18,7 @@ dependencies:
test:
pre:
- cd "$WORK" && make check
- cd "$WORK" && make pristine
override:
- cd "$WORK" && make test

16
main.go
View File

@ -84,6 +84,11 @@ func main() {
Name: "values",
Usage: "additional value files to be merged into the command",
},
cli.IntFlag{
Name: "concurrency",
Value: 0,
Usage: "maximum number of concurrent helm processes to run, 0 is unlimited",
},
},
Action: func(c *cli.Context) error {
state, helm, err := before(c)
@ -97,8 +102,9 @@ func main() {
}
values := c.StringSlice("values")
workers := c.Int("concurrency")
if errs := state.SyncCharts(helm, values); errs != nil && len(errs) > 0 {
if errs := state.SyncCharts(helm, values, workers); errs != nil && len(errs) > 0 {
for _, err := range errs {
fmt.Printf("err: %s\n", err.Error())
}
@ -164,6 +170,11 @@ func main() {
Name: "values",
Usage: "additional value files to be merged into the command",
},
cli.IntFlag{
Name: "concurrency",
Value: 0,
Usage: "maximum number of concurrent helm processes to run, 0 is unlimited",
},
},
Action: func(c *cli.Context) error {
state, helm, err := before(c)
@ -179,8 +190,9 @@ func main() {
}
values := c.StringSlice("values")
workers := c.Int("concurrency")
if errs := state.SyncCharts(helm, values); errs != nil && len(errs) > 0 {
if errs := state.SyncCharts(helm, values, workers); errs != nil && len(errs) > 0 {
for _, err := range errs {
fmt.Printf("err: %s\n", err.Error())
}

View File

@ -12,10 +12,10 @@ import (
"github.com/roboll/helmfile/helmexec"
"bytes"
yaml "gopkg.in/yaml.v1"
"path"
"regexp"
"bytes"
)
type HelmState struct {
@ -31,9 +31,9 @@ type RepositorySpec struct {
}
type ChartSpec struct {
Chart string `yaml:"chart"`
Version string `yaml:"version"`
Verify bool `yaml:"verify"`
Chart string `yaml:"chart"`
Version string `yaml:"version"`
Verify bool `yaml:"verify"`
Name string `yaml:"name"`
Namespace string `yaml:"namespace"`
@ -64,13 +64,11 @@ func ReadFromFile(file string) (*HelmState, error) {
return &state, nil
}
var /* const */
stringTemplateFuncMap = template.FuncMap{
"env": getEnvVar,
}
var stringTemplateFuncMap = template.FuncMap{
"env": getEnvVar,
}
var /* const */
stringTemplate = template.New("stringTemplate").Funcs(stringTemplateFuncMap)
var stringTemplate = template.New("stringTemplate").Funcs(stringTemplateFuncMap)
func getEnvVar(envVarName string) (string, error) {
envVarValue, isSet := os.LookupEnv(envVarName)
@ -124,33 +122,65 @@ func (state *HelmState) SyncRepos(helm helmexec.Interface) []error {
return nil
}
func (state *HelmState) SyncCharts(helm helmexec.Interface, additonalValues []string) []error {
var wg sync.WaitGroup
func (state *HelmState) SyncCharts(helm helmexec.Interface, additonalValues []string, workerLimit int) []error {
errs := []error{}
jobQueue := make(chan ChartSpec)
doneQueue := make(chan bool)
errQueue := make(chan error)
for _, chart := range state.Charts {
wg.Add(1)
go func(wg *sync.WaitGroup, chart ChartSpec) {
flags, flagsErr := flagsForChart(state.BaseChartPath, &chart)
if flagsErr != nil {
errs = append(errs, flagsErr)
}
for _, value := range additonalValues {
valfile, err := filepath.Abs(value)
if err != nil {
errs = append(errs, err)
}
flags = append(flags, "--values", valfile)
}
if len(errs) == 0 {
if err := helm.SyncChart(chart.Name, normalizeChart(state.BaseChartPath, chart.Chart), flags...); err != nil {
errs = append(errs, err)
}
}
wg.Done()
}(&wg, chart)
if workerLimit < 1 {
workerLimit = len(state.Charts)
}
for w := 1; w <= workerLimit; w++ {
go func() {
for chart := range jobQueue {
flags, flagsErr := flagsForChart(state.BaseChartPath, &chart)
if flagsErr != nil {
errQueue <- flagsErr
doneQueue <- true
continue
}
haveValueErr := false
for _, value := range additonalValues {
valfile, err := filepath.Abs(value)
if err != nil {
errQueue <- err
haveValueErr = true
}
flags = append(flags, "--values", valfile)
}
if haveValueErr {
doneQueue <- true
continue
}
if err := helm.SyncChart(chart.Name, normalizeChart(state.BaseChartPath, chart.Chart), flags...); err != nil {
errQueue <- err
}
doneQueue <- true
}
}()
}
go func() {
for _, chart := range state.Charts {
jobQueue <- chart
}
close(jobQueue)
}()
for i := 0; i < len(state.Charts); {
select {
case err := <-errQueue:
errs = append(errs, err)
case <-doneQueue:
i++
}
}
wg.Wait()
if len(errs) != 0 {
return errs