Merge pull request #95 from danielcb/fix-race-condition
Fix race condition. Add concurrency option similiar to DiffReleases
This commit is contained in:
		
						commit
						eaee6ef6a9
					
				
							
								
								
									
										8
									
								
								main.go
								
								
								
								
							
							
						
						
									
										8
									
								
								main.go
								
								
								
								
							|  | @ -134,6 +134,11 @@ func main() { | ||||||
| 					Name:  "sync-repos", | 					Name:  "sync-repos", | ||||||
| 					Usage: "enable a repo sync prior to diffing", | 					Usage: "enable a repo sync prior to diffing", | ||||||
| 				}, | 				}, | ||||||
|  | 				cli.IntFlag{ | ||||||
|  | 					Name:  "concurrency", | ||||||
|  | 					Value: 0, | ||||||
|  | 					Usage: "maximum number of concurrent helm processes to run, 0 is unlimited", | ||||||
|  | 				}, | ||||||
| 			}, | 			}, | ||||||
| 			Action: func(c *cli.Context) error { | 			Action: func(c *cli.Context) error { | ||||||
| 				state, helm, err := before(c) | 				state, helm, err := before(c) | ||||||
|  | @ -156,8 +161,9 @@ func main() { | ||||||
| 				} | 				} | ||||||
| 
 | 
 | ||||||
| 				values := c.StringSlice("values") | 				values := c.StringSlice("values") | ||||||
|  | 				workers := c.Int("concurrency") | ||||||
| 
 | 
 | ||||||
| 				errs := state.DiffReleases(helm, values) | 				errs := state.DiffReleases(helm, values, workers) | ||||||
| 				return clean(state, errs) | 				return clean(state, errs) | ||||||
| 			}, | 			}, | ||||||
| 		}, | 		}, | ||||||
|  |  | ||||||
|  | @ -230,23 +230,32 @@ func (state *HelmState) SyncReleases(helm helmexec.Interface, additionalValues [ | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (state *HelmState) DiffReleases(helm helmexec.Interface, additionalValues []string) []error { | func (state *HelmState) DiffReleases(helm helmexec.Interface, additionalValues []string, workerLimit int) []error { | ||||||
| 	var wg sync.WaitGroup | 	var wgRelease sync.WaitGroup | ||||||
|  | 	var wgError sync.WaitGroup | ||||||
| 	errs := []error{} | 	errs := []error{} | ||||||
|  | 	jobQueue := make(chan *ReleaseSpec, len(state.Releases)) | ||||||
|  | 	errQueue := make(chan error) | ||||||
| 
 | 
 | ||||||
| 	for i := 0; i < len(state.Releases); i++ { | 	if workerLimit < 1 { | ||||||
| 		release := &state.Releases[i] | 		workerLimit = len(state.Releases) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	wgRelease.Add(len(state.Releases)) | ||||||
|  | 
 | ||||||
|  | 	for w := 1; w <= workerLimit; w++ { | ||||||
|  | 		go func() { | ||||||
|  | 			for release := range jobQueue { | ||||||
|  | 				errs := []error{} | ||||||
| 				renderedName, err := renderTemplateString(release.Name) | 				renderedName, err := renderTemplateString(release.Name) | ||||||
| 				if err != nil { | 				if err != nil { | ||||||
| 					errs = append(errs, err) | 					errs = append(errs, err) | ||||||
| 				} | 				} | ||||||
| 		wg.Add(1) |  | ||||||
| 		go func(wg *sync.WaitGroup, release *ReleaseSpec) { |  | ||||||
| 				// Plugin command doesn't support explicit namespace
 | 				// Plugin command doesn't support explicit namespace
 | ||||||
| 				release.Namespace = "" | 				release.Namespace = "" | ||||||
| 			flags, flagsErr := flagsForRelease(helm, state.BaseChartPath, release) | 				flags, err := flagsForRelease(helm, state.BaseChartPath, release) | ||||||
| 			if flagsErr != nil { | 				if err != nil { | ||||||
| 				errs = append(errs, flagsErr) | 					errs = append(errs, err) | ||||||
| 				} | 				} | ||||||
| 
 | 
 | ||||||
| 				for _, value := range additionalValues { | 				for _, value := range additionalValues { | ||||||
|  | @ -260,15 +269,36 @@ func (state *HelmState) DiffReleases(helm helmexec.Interface, additionalValues [ | ||||||
| 					} | 					} | ||||||
| 					flags = append(flags, "--values", valfile) | 					flags = append(flags, "--values", valfile) | ||||||
| 				} | 				} | ||||||
|  | 
 | ||||||
| 				if len(errs) == 0 { | 				if len(errs) == 0 { | ||||||
| 					if err := helm.DiffRelease(renderedName, normalizeChart(state.BaseChartPath, release.Chart), flags...); err != nil { | 					if err := helm.DiffRelease(renderedName, normalizeChart(state.BaseChartPath, release.Chart), flags...); err != nil { | ||||||
| 						errs = append(errs, err) | 						errs = append(errs, err) | ||||||
| 					} | 					} | ||||||
| 				} | 				} | ||||||
| 			wg.Done() | 				for _, err := range errs { | ||||||
| 		}(&wg, release) | 					errQueue <- err | ||||||
| 				} | 				} | ||||||
| 	wg.Wait() | 				wgRelease.Done() | ||||||
|  | 			} | ||||||
|  | 		}() | ||||||
|  | 	} | ||||||
|  | 	wgError.Add(1) | ||||||
|  | 	go func() { | ||||||
|  | 		for err := range errQueue { | ||||||
|  | 			errs = append(errs, err) | ||||||
|  | 		} | ||||||
|  | 		wgError.Done() | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	for i := 0; i < len(state.Releases); i++ { | ||||||
|  | 		jobQueue <- &state.Releases[i] | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	close(jobQueue) | ||||||
|  | 	wgRelease.Wait() | ||||||
|  | 
 | ||||||
|  | 	close(errQueue) | ||||||
|  | 	wgError.Wait() | ||||||
| 
 | 
 | ||||||
| 	if len(errs) != 0 { | 	if len(errs) != 0 { | ||||||
| 		return errs | 		return errs | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue