feat: Concurrent chart download in template and lint commands
This enables `helmfile lint` and `helmfile template` commands to fetch and untar all the required charts concurrently. The concurrency is configurable via the `--concurrency` flag, that defaults to `0`. Ref #292
This commit is contained in:
		
							parent
							
								
									1c3bfcca10
								
							
						
					
					
						commit
						af121b85b5
					
				
							
								
								
									
										16
									
								
								main.go
								
								
								
								
							
							
						
						
									
										16
									
								
								main.go
								
								
								
								
							| 
						 | 
					@ -202,6 +202,11 @@ func main() {
 | 
				
			||||||
					Name:  "values",
 | 
										Name:  "values",
 | 
				
			||||||
					Usage: "additional value files to be merged into the command",
 | 
										Usage: "additional value files to be merged into the command",
 | 
				
			||||||
				},
 | 
									},
 | 
				
			||||||
 | 
									cli.IntFlag{
 | 
				
			||||||
 | 
										Name:  "concurrency",
 | 
				
			||||||
 | 
										Value: 0,
 | 
				
			||||||
 | 
										Usage: "maximum number of concurrent downloads of release charts",
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
			},
 | 
								},
 | 
				
			||||||
			Action: func(c *cli.Context) error {
 | 
								Action: func(c *cli.Context) error {
 | 
				
			||||||
				return findAndIterateOverDesiredStatesUsingFlags(c, func(state *state.HelmState, helm helmexec.Interface) []error {
 | 
									return findAndIterateOverDesiredStatesUsingFlags(c, func(state *state.HelmState, helm helmexec.Interface) []error {
 | 
				
			||||||
| 
						 | 
					@ -222,6 +227,11 @@ func main() {
 | 
				
			||||||
					Name:  "values",
 | 
										Name:  "values",
 | 
				
			||||||
					Usage: "additional value files to be merged into the command",
 | 
										Usage: "additional value files to be merged into the command",
 | 
				
			||||||
				},
 | 
									},
 | 
				
			||||||
 | 
									cli.IntFlag{
 | 
				
			||||||
 | 
										Name:  "concurrency",
 | 
				
			||||||
 | 
										Value: 0,
 | 
				
			||||||
 | 
										Usage: "maximum number of concurrent downloads of release charts",
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
			},
 | 
								},
 | 
				
			||||||
			Action: func(c *cli.Context) error {
 | 
								Action: func(c *cli.Context) error {
 | 
				
			||||||
				return findAndIterateOverDesiredStatesUsingFlags(c, func(state *state.HelmState, helm helmexec.Interface) []error {
 | 
									return findAndIterateOverDesiredStatesUsingFlags(c, func(state *state.HelmState, helm helmexec.Interface) []error {
 | 
				
			||||||
| 
						 | 
					@ -231,8 +241,9 @@ func main() {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
					values := c.StringSlice("values")
 | 
										values := c.StringSlice("values")
 | 
				
			||||||
					args := args.GetArgs(c.String("args"), state)
 | 
										args := args.GetArgs(c.String("args"), state)
 | 
				
			||||||
 | 
										workers := c.Int("concurrency")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
					return state.LintReleases(helm, values, args)
 | 
										return state.LintReleases(helm, values, args, workers)
 | 
				
			||||||
				})
 | 
									})
 | 
				
			||||||
			},
 | 
								},
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
| 
						 | 
					@ -494,8 +505,9 @@ func executeTemplateCommand(c *cli.Context, state *state.HelmState, helm helmexe
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	args := args.GetArgs(c.String("args"), state)
 | 
						args := args.GetArgs(c.String("args"), state)
 | 
				
			||||||
	values := c.StringSlice("values")
 | 
						values := c.StringSlice("values")
 | 
				
			||||||
 | 
						workers := c.Int("concurrency")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return state.TemplateReleases(helm, values, args)
 | 
						return state.TemplateReleases(helm, values, args, workers)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func executeDiffCommand(c *cli.Context, st *state.HelmState, helm helmexec.Interface, detailedExitCode, suppressSecrets bool) ([]*state.ReleaseSpec, []error) {
 | 
					func executeDiffCommand(c *cli.Context, st *state.HelmState, helm helmexec.Interface, detailedExitCode, suppressSecrets bool) ([]*state.ReleaseSpec, []error) {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -272,10 +272,21 @@ func (state *HelmState) SyncReleases(helm helmexec.Interface, additionalValues [
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// downloadCharts will download and untar charts for Lint and Template
 | 
					// downloadCharts will download and untar charts for Lint and Template
 | 
				
			||||||
func (state *HelmState) downloadCharts(helm helmexec.Interface, dir string) (map[string]string, error) {
 | 
					func (state *HelmState) downloadCharts(helm helmexec.Interface, dir string, workerLimit int) (map[string]string, []error) {
 | 
				
			||||||
	temp := make(map[string]string, len(state.Releases))
 | 
						temp := make(map[string]string, len(state.Releases))
 | 
				
			||||||
 | 
						errs := []error{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for _, release := range state.Releases {
 | 
						var wgFetch sync.WaitGroup
 | 
				
			||||||
 | 
						jobQueue := make(chan *ReleaseSpec, len(state.Releases))
 | 
				
			||||||
 | 
						wgFetch.Add(len(state.Releases))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if workerLimit < 1 {
 | 
				
			||||||
 | 
							workerLimit = len(state.Releases)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for w := 1; w <= workerLimit; w++ {
 | 
				
			||||||
 | 
							go func() {
 | 
				
			||||||
 | 
								for release := range jobQueue {
 | 
				
			||||||
				chartPath := ""
 | 
									chartPath := ""
 | 
				
			||||||
				if pathExists(normalizeChart(state.basePath, release.Chart)) {
 | 
									if pathExists(normalizeChart(state.basePath, release.Chart)) {
 | 
				
			||||||
					chartPath = normalizeChart(state.basePath, release.Chart)
 | 
										chartPath = normalizeChart(state.basePath, release.Chart)
 | 
				
			||||||
| 
						 | 
					@ -292,19 +303,31 @@ func (state *HelmState) downloadCharts(helm helmexec.Interface, dir string) (map
 | 
				
			||||||
					if _, err := os.Stat(chartPath); os.IsNotExist(err) {
 | 
										if _, err := os.Stat(chartPath); os.IsNotExist(err) {
 | 
				
			||||||
						fetchFlags = append(fetchFlags, "--untar", "--untardir", chartPath)
 | 
											fetchFlags = append(fetchFlags, "--untar", "--untardir", chartPath)
 | 
				
			||||||
						if err := helm.Fetch(release.Chart, fetchFlags...); err != nil {
 | 
											if err := helm.Fetch(release.Chart, fetchFlags...); err != nil {
 | 
				
			||||||
					return nil, err
 | 
												errs = append(errs, err)
 | 
				
			||||||
						}
 | 
											}
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
					chartPath = path.Join(chartPath, chartNameWithoutRepository(release.Chart))
 | 
										chartPath = path.Join(chartPath, chartNameWithoutRepository(release.Chart))
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
				temp[release.Name] = chartPath
 | 
									temp[release.Name] = chartPath
 | 
				
			||||||
 | 
									wgFetch.Done()
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						for i := 0; i < len(state.Releases); i++ {
 | 
				
			||||||
 | 
							jobQueue <- &state.Releases[i]
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						close(jobQueue)
 | 
				
			||||||
 | 
						wgFetch.Wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if len(errs) > 0 {
 | 
				
			||||||
 | 
							return nil, errs
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	return temp, nil
 | 
						return temp, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// TemplateReleases wrapper for executing helm template on the releases
 | 
					// TemplateReleases wrapper for executing helm template on the releases
 | 
				
			||||||
func (state *HelmState) TemplateReleases(helm helmexec.Interface, additionalValues []string, args []string) []error {
 | 
					func (state *HelmState) TemplateReleases(helm helmexec.Interface, additionalValues []string, args []string, workerLimit int) []error {
 | 
				
			||||||
	errs := []error{}
 | 
						errs := []error{}
 | 
				
			||||||
	// Create tmp directory and bail immediately if it fails
 | 
						// Create tmp directory and bail immediately if it fails
 | 
				
			||||||
	dir, err := ioutil.TempDir("", "")
 | 
						dir, err := ioutil.TempDir("", "")
 | 
				
			||||||
| 
						 | 
					@ -314,8 +337,9 @@ func (state *HelmState) TemplateReleases(helm helmexec.Interface, additionalValu
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	defer os.RemoveAll(dir)
 | 
						defer os.RemoveAll(dir)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	temp, err := state.downloadCharts(helm, dir)
 | 
						temp, errs := state.downloadCharts(helm, dir, workerLimit)
 | 
				
			||||||
	if err != nil {
 | 
					
 | 
				
			||||||
 | 
						if errs != nil {
 | 
				
			||||||
		errs = append(errs, err)
 | 
							errs = append(errs, err)
 | 
				
			||||||
		return errs
 | 
							return errs
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
| 
						 | 
					@ -356,7 +380,7 @@ func (state *HelmState) TemplateReleases(helm helmexec.Interface, additionalValu
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// LintReleases wrapper for executing helm lint on the releases
 | 
					// LintReleases wrapper for executing helm lint on the releases
 | 
				
			||||||
func (state *HelmState) LintReleases(helm helmexec.Interface, additionalValues []string, args []string) []error {
 | 
					func (state *HelmState) LintReleases(helm helmexec.Interface, additionalValues []string, args []string, workerLimit int) []error {
 | 
				
			||||||
	errs := []error{}
 | 
						errs := []error{}
 | 
				
			||||||
	// Create tmp directory and bail immediately if it fails
 | 
						// Create tmp directory and bail immediately if it fails
 | 
				
			||||||
	dir, err := ioutil.TempDir("", "")
 | 
						dir, err := ioutil.TempDir("", "")
 | 
				
			||||||
| 
						 | 
					@ -366,8 +390,8 @@ func (state *HelmState) LintReleases(helm helmexec.Interface, additionalValues [
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	defer os.RemoveAll(dir)
 | 
						defer os.RemoveAll(dir)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	temp, err := state.downloadCharts(helm, dir)
 | 
						temp, errs := state.downloadCharts(helm, dir, workerLimit)
 | 
				
			||||||
	if err != nil {
 | 
						if errs != nil {
 | 
				
			||||||
		errs = append(errs, err)
 | 
							errs = append(errs, err)
 | 
				
			||||||
		return errs
 | 
							return errs
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue