fixup! Fix race while running `helm dep build` on local chart

This commit is contained in:
Yusuke Kuoka 2020-08-29 10:04:09 +09:00
parent b42e847154
commit 8a43e8f782
1 changed files with 125 additions and 57 deletions

View File

@ -6,6 +6,7 @@ import (
"encoding/hex" "encoding/hex"
"errors" "errors"
"fmt" "fmt"
"golang.org/x/sync/errgroup"
"io" "io"
"io/ioutil" "io/ioutil"
"os" "os"
@ -798,6 +799,15 @@ type ChartPrepareOptions struct {
SkipResolve bool SkipResolve bool
} }
type chartPrepareResult struct {
releaseName string
chartName string
chartPath string
err error
buildDeps bool
chartFetchedByGoGetter bool
}
// PrepareCharts creates temporary directories of charts. // PrepareCharts creates temporary directories of charts.
// //
// Each resulting "chart" can be one of the followings: // Each resulting "chart" can be one of the followings:
@ -813,23 +823,14 @@ type ChartPrepareOptions struct {
// //
// If exists, it will also patch resources by json patches, strategic-merge patches, and injectors. // If exists, it will also patch resources by json patches, strategic-merge patches, and injectors.
func (st *HelmState) PrepareCharts(helm helmexec.Interface, dir string, concurrency int, helmfileCommand string, opts ChartPrepareOptions) (map[string]string, []error) { func (st *HelmState) PrepareCharts(helm helmexec.Interface, dir string, concurrency int, helmfileCommand string, opts ChartPrepareOptions) (map[string]string, []error) {
depBuiltChartsMu := &sync.Mutex{}
depBuiltCharts := map[string]bool{}
releases := releasesNeedCharts(st.Releases) releases := releasesNeedCharts(st.Releases)
temp := make(map[string]string, len(releases)) temp := make(map[string]string, len(releases))
type downloadResults struct {
releaseName string
chartPath string
err error
diagnostic string
}
errs := []error{} errs := []error{}
jobQueue := make(chan *ReleaseSpec, len(releases)) jobQueue := make(chan *ReleaseSpec, len(releases))
results := make(chan *downloadResults, len(releases)) results := make(chan *chartPrepareResult, len(releases))
var helm3 bool var helm3 bool
@ -845,7 +846,7 @@ func (st *HelmState) PrepareCharts(helm helmexec.Interface, dir string, concurre
*st = *updated *st = *updated
} }
var diagnostics []string var builds []*chartPrepareResult
st.scatterGather( st.scatterGather(
concurrency, concurrency,
@ -864,7 +865,7 @@ func (st *HelmState) PrepareCharts(helm helmexec.Interface, dir string, concurre
// If it wasn't called here, Helmfile can end up an issue like // If it wasn't called here, Helmfile can end up an issue like
// https://github.com/roboll/helmfile/issues/1328 // https://github.com/roboll/helmfile/issues/1328
if _, err := st.triggerPrepareEvent(release, helmfileCommand); err != nil { if _, err := st.triggerPrepareEvent(release, helmfileCommand); err != nil {
results <- &downloadResults{err: err} results <- &chartPrepareResult{err: err}
return return
} }
@ -872,7 +873,7 @@ func (st *HelmState) PrepareCharts(helm helmexec.Interface, dir string, concurre
chartPath, err := st.goGetterChart(chartName, release.Directory, release.ForceGoGetter) chartPath, err := st.goGetterChart(chartName, release.Directory, release.ForceGoGetter)
if err != nil { if err != nil {
results <- &downloadResults{err: fmt.Errorf("release %q: %w", release.Name, err)} results <- &chartPrepareResult{err: fmt.Errorf("release %q: %w", release.Name, err)}
return return
} }
@ -881,11 +882,11 @@ func (st *HelmState) PrepareCharts(helm helmexec.Interface, dir string, concurre
chartification, clean, err := st.PrepareChartify(helm, release, chartPath, workerIndex) chartification, clean, err := st.PrepareChartify(helm, release, chartPath, workerIndex)
defer clean() defer clean()
if err != nil { if err != nil {
results <- &downloadResults{err: err} results <- &chartPrepareResult{err: err}
return return
} }
var diagnostic string var buildDeps bool
if chartification != nil { if chartification != nil {
c := chartify.New( c := chartify.New(
@ -895,7 +896,7 @@ func (st *HelmState) PrepareCharts(helm helmexec.Interface, dir string, concurre
out, err := c.Chartify(release.Name, chartPath, chartify.WithChartifyOpts(chartification.Opts)) out, err := c.Chartify(release.Name, chartPath, chartify.WithChartifyOpts(chartification.Opts))
if err != nil { if err != nil {
results <- &downloadResults{err: err} results <- &chartPrepareResult{err: err}
return return
} else { } else {
// TODO Chartify // TODO Chartify
@ -927,36 +928,7 @@ func (st *HelmState) PrepareCharts(helm helmexec.Interface, dir string, concurre
// a broken remote chart won't completely block their job. // a broken remote chart won't completely block their job.
chartPath = normalizeChart(st.basePath, chartPath) chartPath = normalizeChart(st.basePath, chartPath)
var doBuildDeps bool buildDeps = true
depBuiltChartsMu.Lock()
if depBuiltCharts == nil {
depBuiltCharts = map[string]bool{}
}
if !depBuiltCharts[chartPath] {
depBuiltCharts[chartPath] = true
doBuildDeps = true
}
depBuiltChartsMu.Unlock()
if !opts.SkipRepos && doBuildDeps {
if err := helm.BuildDeps(release.Name, chartPath); err != nil {
if chartFetchedByGoGetter {
diagnostic = fmt.Sprintf(
"WARN: `helm dep build` failed. While processing release %q, Helmfile observed that remote chart %q fetched by go-getter is seemingly broken. "+
"One of well-known causes of this is that the chart has outdated Chart.lock, which needs the chart maintainer needs to run `helm dep up`. "+
"Helmfile is tolerating the error to avoid blocking you until the remote chart gets fixed. "+
"But this may result in any failure later if the chart is broken badly. FYI, the tolerated error was: %v",
release.Name,
chartName,
err.Error(),
)
} else {
results <- &downloadResults{err: err}
return
}
}
}
} else if !opts.ForceDownload { } else if !opts.ForceDownload {
// At this point, we are sure that either: // At this point, we are sure that either:
// 1. It is a local chart and we can use it in later process (helm upgrade/template/lint/etc) // 1. It is a local chart and we can use it in later process (helm upgrade/template/lint/etc)
@ -996,7 +968,7 @@ func (st *HelmState) PrepareCharts(helm helmexec.Interface, dir string, concurre
fetchFlags := st.chartVersionFlags(release) fetchFlags := st.chartVersionFlags(release)
fetchFlags = append(fetchFlags, "--untar", "--untardir", chartPath) fetchFlags = append(fetchFlags, "--untar", "--untardir", chartPath)
if err := helm.Fetch(chartName, fetchFlags...); err != nil { if err := helm.Fetch(chartName, fetchFlags...); err != nil {
results <- &downloadResults{err: err} results <- &chartPrepareResult{err: err}
return return
} }
} }
@ -1008,36 +980,132 @@ func (st *HelmState) PrepareCharts(helm helmexec.Interface, dir string, concurre
} }
} }
results <- &downloadResults{releaseName: release.Name, chartPath: chartPath, diagnostic: diagnostic} results <- &chartPrepareResult{
releaseName: release.Name,
chartName: chartName,
chartPath: chartPath,
buildDeps: buildDeps,
chartFetchedByGoGetter: chartFetchedByGoGetter,
}
} }
}, },
func() { func() {
for i := 0; i < len(releases); i++ { for i := 0; i < len(releases); i++ {
downloadRes := <-results downloadRes := <-results
if d := downloadRes.diagnostic; d != "" {
diagnostics = append(diagnostics, d)
}
if downloadRes.err != nil { if downloadRes.err != nil {
errs = append(errs, downloadRes.err) errs = append(errs, downloadRes.err)
return return
} }
temp[downloadRes.releaseName] = downloadRes.chartPath temp[downloadRes.releaseName] = downloadRes.chartPath
if downloadRes.buildDeps {
builds = append(builds, downloadRes)
}
} }
}, },
) )
if len(errs) > 0 {
return nil, errs
}
if !opts.SkipRepos {
if err := st.runHelmDepBuilds(helm, concurrency, builds); err != nil {
return nil, []error{err}
}
}
return temp, nil
}
func (st *HelmState) runHelmDepBuilds(helm helmexec.Interface, concurrency int, builds []*chartPrepareResult) error {
type buildOutput struct {
diagnostic string
}
buildQueue := make(chan *chartPrepareResult)
buildOutputQueue := make(chan *buildOutput)
buildWorkers := &errgroup.Group{}
for i := 0; i < concurrency; i++ {
buildWorkers.Go(func() error {
for r := range buildQueue {
out, err := func() (*buildOutput, error) {
if err := helm.BuildDeps(r.releaseName, r.chartPath); err != nil {
if r.chartFetchedByGoGetter {
return &buildOutput{
diagnostic: fmt.Sprintf(
"WARN: `helm dep build` failed. While processing release %q, Helmfile observed that remote chart %q fetched by go-getter is seemingly broken. "+
"One of well-known causes of this is that the chart has outdated Chart.lock, which needs the chart maintainer needs to run `helm dep up`. "+
"Helmfile is tolerating the error to avoid blocking you until the remote chart gets fixed. "+
"But this may result in any failure later if the chart is broken badly. FYI, the tolerated error was: %v",
r.releaseName,
r.chartName,
err.Error(),
),
}, nil
}
return nil, err
}
return nil, nil
}()
if err != nil {
return err
}
if out != nil {
buildOutputQueue <- out
}
}
return nil
})
}
diagnosticsAggregator := &sync.WaitGroup{}
diagnosticsAggregator.Add(1)
go func() {
var diagnostics []string
for r := range buildOutputQueue {
if d := r.diagnostic; d != "" {
diagnostics = append(diagnostics, d)
}
}
sort.Strings(diagnostics) sort.Strings(diagnostics)
for _, d := range diagnostics { for _, d := range diagnostics {
st.logger.Warn(d) st.logger.Warn(d)
} }
if len(errs) > 0 { diagnosticsAggregator.Done()
return nil, errs }()
built := map[string]bool{}
for _, b := range builds {
// `helm dep build` fails when it was run concurrency on the same chart.
// To avoid that, we run `helm dep build` only once per each local chart.
//
// See https://github.com/roboll/helmfile/issues/1438
if !built[b.chartPath] {
built[b.chartPath] = true
buildQueue <- b
} }
return temp, nil }
close(buildQueue)
if err := buildWorkers.Wait(); err != nil {
return err
}
close(buildOutputQueue)
diagnosticsAggregator.Wait()
return nil
} }
type TemplateOpts struct { type TemplateOpts struct {