feat: configurable concurrency for `helmfile test` (#442)

`helmfile test --concurrency N` sets the maximum number of concurrent helm processes. The value is automatically capped at the number of releases, so no idle workers are spawned. Also, I've refactored the scatter-gather logic that was duplicated across the code base into a shared helper. Resolves #433
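For example, to run at most two concurrent `helm test` processes against the current helmfile (the timeout value below is illustrative):

helmfile test --concurrency 2 --timeout 300

Leaving `--concurrency` at its default of 0 spawns one worker per release.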
This commit is contained in:
parent b9a097ed14
commit 571f351a8f

main.go (8 changed lines)
@@ -496,18 +496,24 @@ Do you really want to delete?
 				Value: 300,
 				Usage: "maximum time for tests to run before being considered failed",
 			},
+			cli.IntFlag{
+				Name:  "concurrency",
+				Value: 0,
+				Usage: "maximum number of concurrent helm processes to run, 0 is unlimited",
+			},
 		},
 		Action: func(c *cli.Context) error {
 			return findAndIterateOverDesiredStatesUsingFlags(c, func(state *state.HelmState, helm helmexec.Interface, _ context) []error {
 				cleanup := c.Bool("cleanup")
 				timeout := c.Int("timeout")
+				concurrency := c.Int("concurrency")

 				args := args.GetArgs(c.String("args"), state)
 				if len(args) > 0 {
 					helm.SetExtraArgs(args...)
 				}

-				return state.TestReleases(helm, cleanup, timeout)
+				return state.TestReleases(helm, cleanup, timeout, concurrency)
 			})
 		},
 	},
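In main.go the flag's default of 0 means "unlimited"; the state layer then caps whatever arrives. Condensed into a single condition, the clamp applied by the new scatterGather helper (shown in full in the new-file hunk below) behaves like:

if concurrency < 1 || concurrency > numReleases {
	// 0, a negative value, or anything above the release count
	// all collapse to one worker per release
	concurrency = numReleases
}

This is a paraphrase of the helper's if/else-if chain, not a separate change.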
state/state.go (389 changed lines)
@@ -3,6 +3,7 @@ package state
 import (
 	"errors"
 	"fmt"
+	"github.com/roboll/helmfile/helmexec"
 	"io/ioutil"
 	"os"
 	"path"
@@ -10,9 +11,6 @@ import (
 	"sort"
 	"strconv"
 	"strings"
-	"sync"
-
-	"github.com/roboll/helmfile/helmexec"

 	"regexp"

@@ -188,16 +186,19 @@ func (st *HelmState) prepareSyncReleases(helm helmexec.Interface, additionalValu
 	jobs := make(chan *ReleaseSpec, numReleases)
 	results := make(chan syncPrepareResult, numReleases)

-	if concurrency < 1 {
-		concurrency = numReleases
-	}
-
-	// WaitGroup is required to wait until goroutine per job in job queue cleanly stops.
-	var waitGroup sync.WaitGroup
-	waitGroup.Add(concurrency)
-
-	for w := 1; w <= concurrency; w++ {
-		go func() {
+	res := []syncPrepareResult{}
+	errs := []error{}
+
+	st.scatterGather(
+		concurrency,
+		numReleases,
+		func() {
+			for i := 0; i < numReleases; i++ {
+				jobs <- &releases[i]
+			}
+			close(jobs)
+		},
+		func(_ int) {
 			for release := range jobs {
 				st.applyDefaultsTo(release)

@@ -227,29 +228,20 @@ func (st *HelmState) prepareSyncReleases(helm helmexec.Interface, additionalValu

 				results <- syncPrepareResult{release: release, flags: flags, errors: []*ReleaseError{}}
 			}
-			waitGroup.Done()
-		}()
-	}
-
-	for i := 0; i < numReleases; i++ {
-		jobs <- &releases[i]
-	}
-	close(jobs)
-
-	res := []syncPrepareResult{}
-	errs := []error{}
-	for i := 0; i < numReleases; {
-		select {
-		case r := <-results:
-			for _, e := range r.errors {
-				errs = append(errs, e)
-			}
-			res = append(res, r)
-			i++
-		}
-	}
-
-	waitGroup.Wait()
-
+		},
+		func() {
+			for i := 0; i < numReleases; {
+				select {
+				case r := <-results:
+					for _, e := range r.errors {
+						errs = append(errs, e)
+					}
+					res = append(res, r)
+					i++
+				}
+			}
+		},
+	)
+
 	return res, errs
 }
@@ -286,21 +278,20 @@ func (st *HelmState) SyncReleases(helm helmexec.Interface, additionalValues []st
 		return prepErrs
 	}

+	errs := []error{}
 	jobQueue := make(chan *syncPrepareResult, len(preps))
 	results := make(chan syncResult, len(preps))

-	if workerLimit < 1 {
-		workerLimit = len(preps)
-	}
-
-	// WaitGroup is required to wait until goroutine per job in job queue cleanly stops.
-	// Otherwise, cleanup hooks won't run fully.
-	// See #363 for more context.
-	var waitGroup sync.WaitGroup
-	waitGroup.Add(workerLimit)
-
-	for w := 1; w <= workerLimit; w++ {
-		go func() {
+	st.scatterGather(
+		workerLimit,
+		len(preps),
+		func() {
+			for i := 0; i < len(preps); i++ {
+				jobQueue <- &preps[i]
+			}
+			close(jobQueue)
+		},
+		func(_ int) {
 			for prep := range jobQueue {
 				release := prep.release
 				flags := prep.flags
@@ -323,29 +314,21 @@ func (st *HelmState) SyncReleases(helm helmexec.Interface, additionalValues []st
 					st.logger.Warnf("warn: %v\n", err)
 				}
 			}
-			waitGroup.Done()
-		}()
-	}
-
-	for i := 0; i < len(preps); i++ {
-		jobQueue <- &preps[i]
-	}
-	close(jobQueue)
-
-	errs := []error{}
-	for i := 0; i < len(preps); {
-		select {
-		case res := <-results:
-			if len(res.errors) > 0 {
-				for _, e := range res.errors {
-					errs = append(errs, e)
-				}
-			}
-			i++
-		}
-	}
-
-	waitGroup.Wait()
-
+		},
+		func() {
+			for i := 0; i < len(preps); {
+				select {
+				case res := <-results:
+					if len(res.errors) > 0 {
+						for _, e := range res.errors {
+							errs = append(errs, e)
+						}
+					}
+					i++
+				}
+			}
+		},
+	)
+
 	if len(errs) > 0 {
 		return errs
@@ -355,7 +338,7 @@ func (st *HelmState) SyncReleases(helm helmexec.Interface, additionalValues []st
 }

 // downloadCharts will download and untar charts for Lint and Template
-func (st *HelmState) downloadCharts(helm helmexec.Interface, dir string, workerLimit int, helmfileCommand string) (map[string]string, []error) {
+func (st *HelmState) downloadCharts(helm helmexec.Interface, dir string, concurrency int, helmfileCommand string) (map[string]string, []error) {
 	temp := make(map[string]string, len(st.Releases))
 	type downloadResults struct {
 		releaseName string
@@ -363,17 +346,19 @@ func (st *HelmState) downloadCharts(helm helmexec.Interface, dir string, workerL
 	}
 	errs := []error{}

-	var wgFetch sync.WaitGroup
 	jobQueue := make(chan *ReleaseSpec, len(st.Releases))
 	results := make(chan *downloadResults, len(st.Releases))
-	wgFetch.Add(len(st.Releases))

-	if workerLimit < 1 {
-		workerLimit = len(st.Releases)
-	}
-
-	for w := 1; w <= workerLimit; w++ {
-		go func() {
+	st.scatterGather(
+		concurrency,
+		len(st.Releases),
+		func() {
+			for i := 0; i < len(st.Releases); i++ {
+				jobQueue <- &st.Releases[i]
+			}
+			close(jobQueue)
+		},
+		func(_ int) {
 			for release := range jobQueue {
 				chartPath := ""
 				if pathExists(normalizeChart(st.basePath, release.Chart)) {
@@ -403,20 +388,14 @@ func (st *HelmState) downloadCharts(helm helmexec.Interface, dir string, workerL

 				results <- &downloadResults{release.Name, chartPath}
 			}
-			wgFetch.Done()
-		}()
-	}
-	for i := 0; i < len(st.Releases); i++ {
-		jobQueue <- &st.Releases[i]
-	}
-	close(jobQueue)
-
-	for i := 0; i < len(st.Releases); i++ {
-		downloadRes := <-results
-		temp[downloadRes.releaseName] = downloadRes.chartPath
-	}
-
-	wgFetch.Wait()
-
+		},
+		func() {
+			for i := 0; i < len(st.Releases); i++ {
+				downloadRes := <-results
+				temp[downloadRes.releaseName] = downloadRes.chartPath
+			}
+		},
+	)
+
 	if len(errs) > 0 {
 		return nil, errs
@@ -568,16 +547,19 @@ func (st *HelmState) prepareDiffReleases(helm helmexec.Interface, additionalValu
 	jobs := make(chan *ReleaseSpec, numReleases)
 	results := make(chan diffPrepareResult, numReleases)

-	if concurrency < 1 {
-		concurrency = numReleases
-	}
-
-	// WaitGroup is required to wait until goroutine per job in job queue cleanly stops.
-	var waitGroup sync.WaitGroup
-	waitGroup.Add(concurrency)
-
-	for w := 1; w <= concurrency; w++ {
-		go func() {
+	rs := []diffPrepareResult{}
+	errs := []error{}
+
+	st.scatterGather(
+		concurrency,
+		numReleases,
+		func() {
+			for i := 0; i < numReleases; i++ {
+				jobs <- &releases[i]
+			}
+			close(jobs)
+		},
+		func(_ int) {
 			for release := range jobs {
 				errs := []error{}

@@ -618,32 +600,20 @@ func (st *HelmState) prepareDiffReleases(helm helmexec.Interface, additionalValu
 				results <- diffPrepareResult{release: release, flags: flags, errors: []*ReleaseError{}}
 			}
 		}
-			waitGroup.Done()
-		}()
-	}
-
-	for i := 0; i < numReleases; i++ {
-		jobs <- &releases[i]
-	}
-	close(jobs)
-
-	rs := []diffPrepareResult{}
-	errs := []error{}
-	for i := 0; i < numReleases; {
-		select {
-		case res := <-results:
-			if res.errors != nil && len(res.errors) > 0 {
-				for _, e := range res.errors {
-					errs = append(errs, e)
-				}
-			} else if res.release != nil {
-				rs = append(rs, res)
-			}
-			i++
-		}
-	}
-
-	waitGroup.Wait()
-
+		},
+		func() {
+			for i := 0; i < numReleases; i++ {
+				res := <-results
+				if res.errors != nil && len(res.errors) > 0 {
+					for _, e := range res.errors {
+						errs = append(errs, e)
+					}
+				} else if res.release != nil {
+					rs = append(rs, res)
+				}
+			}
+		},
+	)
+
 	return rs, errs
 }
@@ -659,18 +629,19 @@ func (st *HelmState) DiffReleases(helm helmexec.Interface, additionalValues []st
 	jobQueue := make(chan *diffPrepareResult, len(preps))
 	results := make(chan diffResult, len(preps))

-	if workerLimit < 1 {
-		workerLimit = len(preps)
-	}
-
-	// WaitGroup is required to wait until goroutine per job in job queue cleanly stops.
-	// Otherwise, cleanup hooks won't run fully.
-	// See #363 for more context.
-	var waitGroup sync.WaitGroup
-	waitGroup.Add(workerLimit)
-
-	for w := 1; w <= workerLimit; w++ {
-		go func() {
+	rs := []*ReleaseSpec{}
+	errs := []error{}
+
+	st.scatterGather(
+		workerLimit,
+		len(preps),
+		func() {
+			for i := 0; i < len(preps); i++ {
+				jobQueue <- &preps[i]
+			}
+			close(jobQueue)
+		},
+		func(_ int) {
 			for prep := range jobQueue {
 				flags := prep.flags
 				release := prep.release
@@ -694,140 +665,50 @@ func (st *HelmState) DiffReleases(helm helmexec.Interface, additionalValues []st
 					}
 				}
 			}
-			waitGroup.Done()
-		}()
-	}
-
-	for i := 0; i < len(preps); i++ {
-		jobQueue <- &preps[i]
-	}
-	close(jobQueue)
-
-	rs := []*ReleaseSpec{}
-	errs := []error{}
-	for i := 0; i < len(preps); {
-		select {
-		case res := <-results:
-			if res.err != nil {
-				errs = append(errs, res.err)
-				if res.err.Code == 2 {
-					rs = append(rs, res.err.ReleaseSpec)
-				}
-			}
-			i++
-		}
-	}
-	close(results)
-
-	waitGroup.Wait()
-
+		},
+		func() {
+			for i := 0; i < len(preps); i++ {
+				res := <-results
+				if res.err != nil {
+					errs = append(errs, res.err)
+					if res.err.Code == 2 {
+						rs = append(rs, res.err.ReleaseSpec)
+					}
+				}
+			}
+		},
+	)
+
 	return rs, errs
 }

 func (st *HelmState) ReleaseStatuses(helm helmexec.Interface, workerLimit int) []error {
-	var errs []error
-	jobQueue := make(chan ReleaseSpec)
-	doneQueue := make(chan bool)
-	errQueue := make(chan error)
-
-	if workerLimit < 1 {
-		workerLimit = len(st.Releases)
-	}
-
-	// WaitGroup is required to wait until goroutine per job in job queue cleanly stops.
-	var waitGroup sync.WaitGroup
-	waitGroup.Add(workerLimit)
-
-	for w := 1; w <= workerLimit; w++ {
-		go func() {
-			for release := range jobQueue {
-				if err := helm.ReleaseStatus(release.Name); err != nil {
-					errQueue <- err
-				}
-				doneQueue <- true
-			}
-			waitGroup.Done()
-		}()
-	}
-
-	go func() {
-		for _, release := range st.Releases {
-			jobQueue <- release
-		}
-		close(jobQueue)
-	}()
-
-	for i := 0; i < len(st.Releases); {
-		select {
-		case err := <-errQueue:
-			errs = append(errs, err)
-		case <-doneQueue:
-			i++
-		}
-	}
-
-	waitGroup.Wait()
-
-	if len(errs) != 0 {
-		return errs
-	}
-
-	return nil
+	return st.scatterGatherReleases(helm, workerLimit, func(release ReleaseSpec) error {
+		return helm.ReleaseStatus(release.Name)
+	})
 }

 // DeleteReleases wrapper for executing helm delete on the releases
 func (st *HelmState) DeleteReleases(helm helmexec.Interface, purge bool) []error {
-	var wg sync.WaitGroup
-	errs := []error{}
-
-	for _, release := range st.Releases {
-		wg.Add(1)
-		go func(wg *sync.WaitGroup, release ReleaseSpec) {
-			flags := []string{}
-			if purge {
-				flags = append(flags, "--purge")
-			}
-			if err := helm.DeleteRelease(release.Name, flags...); err != nil {
-				errs = append(errs, err)
-			}
-			wg.Done()
-		}(&wg, release)
-	}
-	wg.Wait()
-
-	if len(errs) != 0 {
-		return errs
-	}
-
-	return nil
+	return st.scatterGatherReleases(helm, len(st.Releases), func(release ReleaseSpec) error {
+		flags := []string{}
+		if purge {
+			flags = append(flags, "--purge")
+		}
+		return helm.DeleteRelease(release.Name, flags...)
+	})
 }

 // TestReleases wrapper for executing helm test on the releases
-func (st *HelmState) TestReleases(helm helmexec.Interface, cleanup bool, timeout int) []error {
-	var wg sync.WaitGroup
-	errs := []error{}
-
-	for _, release := range st.Releases {
-		wg.Add(1)
-		go func(wg *sync.WaitGroup, release ReleaseSpec) {
-			flags := []string{}
-			if cleanup {
-				flags = append(flags, "--cleanup")
-			}
-			flags = append(flags, "--timeout", strconv.Itoa(timeout))
-			if err := helm.TestRelease(release.Name, flags...); err != nil {
-				errs = append(errs, err)
-			}
-			wg.Done()
-		}(&wg, release)
-	}
-	wg.Wait()
-
-	if len(errs) != 0 {
-		return errs
-	}
-
-	return nil
+func (st *HelmState) TestReleases(helm helmexec.Interface, cleanup bool, timeout int, concurrency int) []error {
+	return st.scatterGatherReleases(helm, concurrency, func(release ReleaseSpec) error {
+		flags := []string{}
+		if cleanup {
+			flags = append(flags, "--cleanup")
+		}
+		flags = append(flags, "--timeout", strconv.Itoa(timeout))
+		return helm.TestRelease(release.Name, flags...)
+	})
 }

 // Clean will remove any generated secrets
(new file; 88 additions)
@@ -0,0 +1,88 @@
+package state
+
+import (
+	"fmt"
+	"github.com/roboll/helmfile/helmexec"
+	"sync"
+)
+
+type result struct {
+	release ReleaseSpec
+	err     error
+}
+
+func (st *HelmState) scatterGather(concurrency int, items int, produceInputs func(), receiveInputsAndProduceIntermediates func(int), aggregateIntermediates func()) {
+	numReleases := len(st.Releases)
+	if concurrency < 1 {
+		concurrency = numReleases
+	} else if concurrency > numReleases {
+		concurrency = numReleases
+	}
+
+	// WaitGroup is required to wait until goroutine per job in job queue cleanly stops.
+	var waitGroup sync.WaitGroup
+	waitGroup.Add(concurrency)
+
+	go produceInputs()
+
+	for w := 1; w <= concurrency; w++ {
+		go func(id int) {
+			st.logger.Debugf("worker %d/%d started", id, concurrency)
+			receiveInputsAndProduceIntermediates(id)
+			st.logger.Debugf("worker %d/%d finished", id, concurrency)
+			waitGroup.Done()
+		}(w)
+	}
+
+	aggregateIntermediates()
+
+	// Wait until all the goroutines to gracefully finish
+	waitGroup.Wait()
+}
+
+func (st *HelmState) scatterGatherReleases(helm helmexec.Interface, concurrency int, do func(ReleaseSpec) error) []error {
+	var errs []error
+
+	inputs := st.Releases
+	inputsSize := len(inputs)
+
+	releases := make(chan ReleaseSpec)
+	results := make(chan result)
+
+	st.scatterGather(
+		concurrency,
+		inputsSize,
+		func() {
+			for _, release := range inputs {
+				releases <- release
+			}
+			close(releases)
+		},
+		func(id int) {
+			for release := range releases {
+				err := do(release)
+				st.logger.Debugf("sending result for release: %s\n", release.Name)
+				results <- result{release: release, err: err}
+				st.logger.Debugf("sent result for release: %s\n", release.Name)
+			}
+		},
+		func() {
+			for i := range inputs {
+				st.logger.Debugf("receiving result %d", i)
+				r := <-results
+				if r.err != nil {
+					errs = append(errs, fmt.Errorf("release \"%s\" failed: %v", r.release.Name, r.err))
+				} else {
+					st.logger.Debugf("received result for release \"%s\"", r.release.Name)
+				}
+				st.logger.Debugf("received result for %d", i)
+			}
+		},
+	)
+
+	if len(errs) != 0 {
+		return errs
+	}
+
+	return nil
+}
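The same scatter-gather shape works outside helmfile; below is a minimal, self-contained Go sketch of the pattern the new helper implements. The `work` callback, the input values, and the choice of unbuffered channels are illustrative, not code from this commit:

package main

import (
	"fmt"
	"sync"
)

// scatterGather fans inputs out to `concurrency` workers and gathers one
// result per input, mirroring the shape of state.scatterGather above.
func scatterGather(concurrency int, inputs []string, work func(string) error) []error {
	n := len(inputs)
	if concurrency < 1 || concurrency > n {
		concurrency = n // cap at the number of items, as in the commit
	}

	jobs := make(chan string)
	results := make(chan error)

	// scatter: a single producer feeds the job channel, then closes it
	go func() {
		for _, in := range inputs {
			jobs <- in
		}
		close(jobs)
	}()

	// workers: each drains the job channel until it is closed
	var wg sync.WaitGroup
	wg.Add(concurrency)
	for w := 0; w < concurrency; w++ {
		go func() {
			defer wg.Done()
			for in := range jobs {
				results <- work(in)
			}
		}()
	}

	// gather: exactly one result per input, so no counting channel is needed
	var errs []error
	for i := 0; i < n; i++ {
		if err := <-results; err != nil {
			errs = append(errs, err)
		}
	}
	wg.Wait()
	return errs
}

func main() {
	releases := []string{"frontend", "backend", "db"}
	errs := scatterGather(2, releases, func(name string) error {
		fmt.Println("testing release", name)
		return nil
	})
	fmt.Println("errors:", errs)
}

Because the gather loop receives exactly len(inputs) results before wg.Wait(), every worker's final send completes and the WaitGroup cannot deadlock.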
@@ -738,6 +738,7 @@ func TestHelmState_SyncReleases(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			state := &HelmState{
 				Releases: tt.releases,
+				logger: logger,
 			}
 			if _ = state.SyncReleases(tt.helm, []string{}, 1); !reflect.DeepEqual(tt.helm.releases, tt.wantReleases) {
 				t.Errorf("HelmState.SyncReleases() for [%s] = %v, want %v", tt.name, tt.helm.releases, tt.wantReleases)
@@ -815,6 +816,7 @@ func TestHelmState_ReleaseStatuses(t *testing.T) {
 		i := func(t *testing.T) {
 			state := &HelmState{
 				Releases: tt.releases,
+				logger: logger,
 			}
 			errs := state.ReleaseStatuses(tt.helm, 1)
 			if (errs != nil) != tt.wantErr {
@@ -874,8 +876,9 @@ func TestHelmState_TestReleasesNoCleanUp(t *testing.T) {
 		i := func(t *testing.T) {
 			state := &HelmState{
 				Releases: tt.releases,
+				logger: logger,
 			}
-			errs := state.TestReleases(tt.helm, tt.cleanup, 1)
+			errs := state.TestReleases(tt.helm, tt.cleanup, 1, 1)
 			if (errs != nil) != tt.wantErr {
 				t.Errorf("TestReleases() for %s error = %v, wantErr %v", tt.name, errs, tt.wantErr)
 				return
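With the new signature, callers thread the concurrency value through explicitly. A usage sketch, where the `state` and `helm` values are assumed to exist in the caller and the cleanup, timeout, and concurrency arguments are illustrative:

// cleanup=true, 300s timeout, at most 2 parallel `helm test` processes
if errs := state.TestReleases(helm, true, 300, 2); len(errs) > 0 {
	for _, err := range errs {
		fmt.Println(err)
	}
}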