fix: replace 60s timeout with reader-writer locks for OCI chart caching
Address PR review feedback from @champtar about the OCI chart caching mechanism.

The previous implementation used a 60-second timeout which was arbitrary and caused race conditions when helm deployments took longer (e.g., deployments triggering scaling up/down).

Changes:
- Replace 60s refresh marker timeout with proper reader-writer locks
- Use shared locks (RLock) when using cached charts (allows concurrent reads)
- Use exclusive locks (Lock) when refreshing/downloading charts
- Hold locks during entire helm operation lifecycle (not just during download)
- Add getNamedRWMutex() for in-process RW coordination
- Update PrepareCharts() to return locks map for lifecycle management
- Add chartLockReleaser in run.go to release locks after helm callback
- Remove unused mutexMap and getNamedMutex (replaced by RW versions)
- Add comprehensive tests for shared/exclusive lock behavior

This eliminates the race condition where one process could delete a cached chart while another process's helm command was still using it.

Fixes review comment on PR #2298

Signed-off-by: Aditya Menon <amenon@canarytechnologies.com>
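Note (not part of the commit): the change relies on shared vs. exclusive file locks from github.com/gofrs/flock, which the diff below uses directly. A minimal standalone sketch of that pattern, with an illustrative lock path and timings:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/gofrs/flock"
)

func main() {
	// Illustrative lock path; helmfile derives it from the chart path.
	lock := flock.New("/tmp/example-chart.lock")

	// Shared (read) lock: several processes can use a cached chart at once.
	if err := lock.RLock(); err != nil {
		panic(err)
	}
	fmt.Println("shared lock held: safe to read the cached chart")
	_ = lock.Unlock()

	// Exclusive (write) lock: required before deleting/re-downloading the chart.
	// TryLockContext retries every 250ms until the context expires.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	locked, err := lock.TryLockContext(ctx, 250*time.Millisecond)
	if err != nil {
		panic(err)
	}
	if locked {
		fmt.Println("exclusive lock held: safe to refresh the chart")
		_ = lock.Unlock()
	}
}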
parent 5d9db12a99
commit 7bb2a1f19b
@@ -42,19 +42,30 @@ func (r *Run) askForConfirmation(msg string) bool {
return AskForConfirmation(msg)
}

func (r *Run) prepareChartsIfNeeded(helmfileCommand string, dir string, concurrency int, opts state.ChartPrepareOptions) (map[state.PrepareChartKey]string, error) {
// chartLockReleaser is a function that releases all chart locks.
// It should be called after helm operations complete.
type chartLockReleaser func()

func (r *Run) prepareChartsIfNeeded(helmfileCommand string, dir string, concurrency int, opts state.ChartPrepareOptions) (map[state.PrepareChartKey]string, chartLockReleaser, error) {
// Skip chart preparation for certain commands
skipCommands := []string{"write-values", "list"}
if slices.Contains(skipCommands, strings.ToLower(helmfileCommand)) {
return nil, nil
return nil, func() {}, nil
}

releaseToChart, errs := r.state.PrepareCharts(r.helm, dir, concurrency, helmfileCommand, opts)
releaseToChart, chartLocks, errs := r.state.PrepareCharts(r.helm, dir, concurrency, helmfileCommand, opts)
if len(errs) > 0 {
return nil, fmt.Errorf("%v", errs)
return nil, func() {}, fmt.Errorf("%v", errs)
}

return releaseToChart, nil
// Return a releaser function that releases all acquired locks
releaser := func() {
for _, lock := range chartLocks {
lock.Release(r.state.Logger())
}
}

return releaseToChart, releaser, nil
}

func (r *Run) withPreparedCharts(helmfileCommand string, opts state.ChartPrepareOptions, f func()) error {

@@ -93,10 +104,13 @@ func (r *Run) withPreparedCharts(helmfileCommand string, opts state.ChartPrepare
return err
}

releaseToChart, err := r.prepareChartsIfNeeded(helmfileCommand, dir, opts.Concurrency, opts)
releaseToChart, releaseLocks, err := r.prepareChartsIfNeeded(helmfileCommand, dir, opts.Concurrency, opts)
if err != nil {
return err
}
// Release all chart locks after helm operations complete.
// This ensures charts are not deleted while helm is using them.
defer releaseLocks()

for i := range r.state.Releases {
rel := &r.state.Releases[i]
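Note (not part of the commit): the run.go hunks above return a cleanup closure that the caller defers, so the locks stay held for the whole helm operation. A minimal sketch of that return-a-releaser pattern, with illustrative names:

package main

import "fmt"

// prepare acquires some resources and returns them together with a release
// closure; a no-op closure is returned on early-return paths so callers can
// always defer release() unconditionally.
func prepare() (result string, release func(), err error) {
	locks := []string{"chart-a.lock", "chart-b.lock"} // illustrative
	release = func() {
		for _, l := range locks {
			fmt.Println("releasing", l)
		}
	}
	return "charts ready", release, nil
}

func main() {
	result, release, err := prepare()
	if err != nil {
		return
	}
	defer release() // held until main returns, i.e. for the whole "helm operation"
	fmt.Println(result)
}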
@@ -175,6 +175,262 @@ func TestOCIChartFileLock(t *testing.T) {
})
}

// TestOCIChartSharedExclusiveLocks tests the reader-writer lock pattern
// where shared locks allow concurrent reads and exclusive locks are used for writes.
func TestOCIChartSharedExclusiveLocks(t *testing.T) {
t.Run("multiple shared locks can be acquired simultaneously", func(t *testing.T) {
tempDir, err := os.MkdirTemp("", "helmfile-shared-lock-test-*")
require.NoError(t, err)
defer os.RemoveAll(tempDir)

lockFilePath := filepath.Join(tempDir, "test-chart.lock")

// Number of concurrent readers
numReaders := 5
var wg sync.WaitGroup
var activeReaders atomic.Int32
var maxConcurrentReaders atomic.Int32

for i := 0; i < numReaders; i++ {
wg.Add(1)
go func() {
defer wg.Done()

fileLock := flock.New(lockFilePath)
// Acquire shared (read) lock
err := fileLock.RLock()
require.NoError(t, err)

// Track how many readers are active simultaneously
current := activeReaders.Add(1)

// Update max concurrent readers
for {
max := maxConcurrentReaders.Load()
if current <= max {
break
}
if maxConcurrentReaders.CompareAndSwap(max, current) {
break
}
}

// Simulate reading while holding shared lock
time.Sleep(50 * time.Millisecond)

activeReaders.Add(-1)
err = fileLock.Unlock()
require.NoError(t, err)
}()
}

wg.Wait()

// Multiple readers should have been active at the same time
require.Greater(t, maxConcurrentReaders.Load(), int32(1),
"multiple shared locks should be held concurrently")
})

t.Run("exclusive lock blocks shared locks", func(t *testing.T) {
tempDir, err := os.MkdirTemp("", "helmfile-excl-block-test-*")
require.NoError(t, err)
defer os.RemoveAll(tempDir)

lockFilePath := filepath.Join(tempDir, "test-chart.lock")

// First, acquire exclusive lock
writerLock := flock.New(lockFilePath)
err = writerLock.Lock()
require.NoError(t, err)

// Try to acquire shared lock with timeout - should fail
readerResult := make(chan bool)
go func() {
readerLock := flock.New(lockFilePath)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

locked, err := readerLock.TryRLockContext(ctx, 10*time.Millisecond)
if err != nil {
readerResult <- false
return
}
if locked {
readerLock.Unlock()
}
readerResult <- locked
}()

select {
case acquired := <-readerResult:
require.False(t, acquired, "shared lock should not be acquired while exclusive lock is held")
case <-time.After(2 * time.Second):
t.Fatal("test timed out")
}

// Release exclusive lock
err = writerLock.Unlock()
require.NoError(t, err)
})

t.Run("shared locks block exclusive lock", func(t *testing.T) {
tempDir, err := os.MkdirTemp("", "helmfile-shared-block-test-*")
require.NoError(t, err)
defer os.RemoveAll(tempDir)

lockFilePath := filepath.Join(tempDir, "test-chart.lock")

// First, acquire shared lock
readerLock := flock.New(lockFilePath)
err = readerLock.RLock()
require.NoError(t, err)

// Try to acquire exclusive lock with timeout - should fail
writerResult := make(chan bool)
go func() {
writerLock := flock.New(lockFilePath)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

locked, err := writerLock.TryLockContext(ctx, 10*time.Millisecond)
if err != nil {
writerResult <- false
return
}
if locked {
writerLock.Unlock()
}
writerResult <- locked
}()

select {
case acquired := <-writerResult:
require.False(t, acquired, "exclusive lock should not be acquired while shared lock is held")
case <-time.After(2 * time.Second):
t.Fatal("test timed out")
}

// Release shared lock
err = readerLock.Unlock()
require.NoError(t, err)
})

t.Run("exclusive lock acquired after shared lock released", func(t *testing.T) {
tempDir, err := os.MkdirTemp("", "helmfile-lock-release-test-*")
require.NoError(t, err)
defer os.RemoveAll(tempDir)

lockFilePath := filepath.Join(tempDir, "test-chart.lock")

// Acquire shared lock
readerLock := flock.New(lockFilePath)
err = readerLock.RLock()
require.NoError(t, err)

// Start writer waiting for lock
writerDone := make(chan bool)
go func() {
writerLock := flock.New(lockFilePath)
// Use longer timeout
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

locked, err := writerLock.TryLockContext(ctx, 10*time.Millisecond)
if err == nil && locked {
writerLock.Unlock()
writerDone <- true
return
}
writerDone <- false
}()

// Release shared lock after a short delay
time.Sleep(50 * time.Millisecond)
err = readerLock.Unlock()
require.NoError(t, err)

// Writer should now succeed
select {
case success := <-writerDone:
require.True(t, success, "writer should acquire lock after reader releases")
case <-time.After(3 * time.Second):
t.Fatal("test timed out waiting for writer")
}
})
}

// TestChartLockResultRelease tests the Release method of chartLockResult
// handles both shared and exclusive locks correctly.
func TestChartLockResultRelease(t *testing.T) {
t.Run("release exclusive lock", func(t *testing.T) {
tempDir, err := os.MkdirTemp("", "helmfile-release-excl-*")
require.NoError(t, err)
defer os.RemoveAll(tempDir)

lockFilePath := filepath.Join(tempDir, "test.lock")
fileLock := flock.New(lockFilePath)
err = fileLock.Lock()
require.NoError(t, err)

rwMutex := &sync.RWMutex{}
rwMutex.Lock()

result := &chartLockResult{
fileLock: fileLock,
inProcessMutex: rwMutex,
isExclusive: true,
chartPath: lockFilePath,
}

// Release should not panic
result.Release(nil)

// Verify lock is released - should be able to acquire again
fileLock2 := flock.New(lockFilePath)
locked, err := fileLock2.TryLock()
require.NoError(t, err)
require.True(t, locked, "should be able to acquire lock after release")
fileLock2.Unlock()
})

t.Run("release shared lock", func(t *testing.T) {
tempDir, err := os.MkdirTemp("", "helmfile-release-shared-*")
require.NoError(t, err)
defer os.RemoveAll(tempDir)

lockFilePath := filepath.Join(tempDir, "test.lock")
fileLock := flock.New(lockFilePath)
err = fileLock.RLock()
require.NoError(t, err)

rwMutex := &sync.RWMutex{}
rwMutex.RLock()

result := &chartLockResult{
fileLock: fileLock,
inProcessMutex: rwMutex,
isExclusive: false,
chartPath: lockFilePath,
}

// Release should not panic
result.Release(nil)

// Verify lock is released - should be able to acquire exclusive lock
fileLock2 := flock.New(lockFilePath)
locked, err := fileLock2.TryLock()
require.NoError(t, err)
require.True(t, locked, "should be able to acquire exclusive lock after shared release")
fileLock2.Unlock()
})

t.Run("release nil result is safe", func(t *testing.T) {
var result *chartLockResult
// Should not panic
result.Release(nil)
})
}

// TestOCIChartDoubleCheckLocking verifies the double-check locking pattern
// works correctly to avoid unnecessary work when the cache is populated
// by another process while waiting for the lock.
@@ -135,6 +135,12 @@ type HelmState struct {
RenderedValues map[string]any
}

// Logger returns the logger instance for this HelmState.
// This is used by external callers that need to pass the logger to lock release functions.
func (st *HelmState) Logger() *zap.SugaredLogger {
return st.logger
}

// SubHelmfileSpec defines the subhelmfile path and options
type SubHelmfileSpec struct {
//path or glob pattern for the sub helmfiles
@@ -1303,6 +1309,7 @@ type chartPrepareResult struct {
buildDeps bool
skipRefresh bool
chartFetchedByGoGetter bool
chartLock *chartLockResult // Lock to be released after helm operation completes
}

func (st *HelmState) GetRepositoryAndNameFromChartName(chartName string) (*RepositorySpec, string) {
@@ -1319,17 +1326,19 @@ func (st *HelmState) GetRepositoryAndNameFromChartName(chartName string) (*Repos
return nil, chartName
}

var mutexMap sync.Map
var rwMutexMap sync.Map

// retrieves or creates a sync.Mutex for a given name
func (st *HelmState) getNamedMutex(name string) *sync.Mutex {
mu, ok := mutexMap.Load(name)
// getNamedRWMutex retrieves or creates a sync.RWMutex for a given name.
// This is used for reader-writer locking where multiple readers can access
// the resource simultaneously, but writers need exclusive access.
func (st *HelmState) getNamedRWMutex(name string) *sync.RWMutex {
mu, ok := rwMutexMap.Load(name)
if ok {
return mu.(*sync.Mutex)
return mu.(*sync.RWMutex)
}
newMu := &sync.Mutex{}
actualMu, _ := mutexMap.LoadOrStore(name, newMu)
return actualMu.(*sync.Mutex)
newMu := &sync.RWMutex{}
actualMu, _ := rwMutexMap.LoadOrStore(name, newMu)
return actualMu.(*sync.RWMutex)
}

type PrepareChartKey struct {
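Note (not part of the commit): getNamedRWMutex above keys one sync.RWMutex per chart path in a package-level sync.Map. A self-contained sketch of the same idea, with illustrative names:

package main

import (
	"fmt"
	"sync"
)

var rwMutexRegistry sync.Map

// namedRWMutex returns the RWMutex registered under name, creating it on first
// use; LoadOrStore makes concurrent first calls converge on a single mutex.
func namedRWMutex(name string) *sync.RWMutex {
	if mu, ok := rwMutexRegistry.Load(name); ok {
		return mu.(*sync.RWMutex)
	}
	actual, _ := rwMutexRegistry.LoadOrStore(name, &sync.RWMutex{})
	return actual.(*sync.RWMutex)
}

func main() {
	mu := namedRWMutex("charts/my-chart") // illustrative key
	mu.RLock()                            // shared: many goroutines may read the cached chart
	fmt.Println("reading under shared lock")
	mu.RUnlock()

	mu.Lock() // exclusive: only one goroutine may refresh it
	fmt.Println("refreshing under exclusive lock")
	mu.Unlock()
}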
@@ -1452,31 +1461,41 @@ func (st *HelmState) processLocalChart(normalizedChart, dir string, release *Rel
return chartPath, nil
}

// forcedDownloadChart handles forced chart downloads
func (st *HelmState) forcedDownloadChart(chartName, dir string, release *ReleaseSpec, helm helmexec.Interface, opts ChartPrepareOptions) (string, error) {
// forcedDownloadChart handles forced chart downloads.
// It returns the chart path and a lock that MUST be released by the caller after helm operations complete.
// The lock ensures the chart won't be deleted while helm is using it.
func (st *HelmState) forcedDownloadChart(chartName, dir string, release *ReleaseSpec, helm helmexec.Interface, opts ChartPrepareOptions) (string, *chartLockResult, error) {
// Check global chart cache first for non-OCI charts
cacheKey := st.getChartCacheKey(release)
if cachedPath, exists := st.checkChartCache(cacheKey); exists && st.fs.DirectoryExistsAt(cachedPath) {
st.logger.Debugf("Chart %s:%s already downloaded, using cached version at %s", chartName, release.Version, cachedPath)
return cachedPath, nil
// Still need to acquire a shared lock to prevent deletion while using
chartPath, err := generateChartPath(chartName, dir, release, opts.OutputDirTemplate)
if err != nil {
return "", nil, err
}
lockResult, lockErr := st.acquireChartLock(chartPath, opts)
if lockErr != nil {
return "", nil, lockErr
}
return cachedPath, lockResult, nil
}

chartPath, err := generateChartPath(chartName, dir, release, opts.OutputDirTemplate)
if err != nil {
return "", err
return "", nil, err
}

// Acquire locks and determine action
lockResult, err := st.acquireChartLock(chartPath, opts)
if err != nil {
return "", err
return "", nil, err
}
defer lockResult.release(st.logger)

// If cached, return immediately
// If cached, return with lock held
if lockResult.action == chartActionUseCached {
st.addToChartCache(cacheKey, lockResult.cachedPath)
return lockResult.cachedPath, nil
return lockResult.cachedPath, lockResult, nil
}

// Download the chart
@@ -1484,12 +1503,10 @@ func (st *HelmState) forcedDownloadChart(chartName, dir string, release *Release
fetchFlags = st.appendChartVersionFlags(fetchFlags, release)
fetchFlags = append(fetchFlags, "--untar", "--untardir", chartPath)
if err := helm.Fetch(chartName, fetchFlags...); err != nil {
return "", err
lockResult.Release(st.logger)
return "", nil, err
}

// Create refresh marker if needed
lockResult.createRefreshMarker(st.logger)

// Set chartPath to be the path containing Chart.yaml, if found
fullChartPath, err := findChartDirectory(chartPath)
if err == nil {
@@ -1499,7 +1516,7 @@ func (st *HelmState) forcedDownloadChart(chartName, dir string, release *Release
// Add to global chart cache
st.addToChartCache(cacheKey, chartPath)

return chartPath, nil
return chartPath, lockResult, nil
}

// prepareChartForRelease processes a single release and prepares its chart
@@ -1519,6 +1536,9 @@ func (st *HelmState) prepareChartForRelease(release *ReleaseSpec, helm helmexec.

chartName := release.Chart

// Track lock acquired during chart preparation - will be released after helm operations
var chartLock *chartLockResult

chartPath, err := st.downloadChartWithGoGetter(release)
if err != nil {
return &chartPrepareResult{err: fmt.Errorf("release %q: %w", release.Name, err)}
@@ -1526,13 +1546,14 @@ func (st *HelmState) prepareChartForRelease(release *ReleaseSpec, helm helmexec.
chartFetchedByGoGetter := chartPath != chartName

if !chartFetchedByGoGetter {
ociChartPath, err := st.getOCIChart(release, dir, helm, opts)
ociChartPath, lock, err := st.getOCIChart(release, dir, helm, opts)
if err != nil {
return &chartPrepareResult{err: fmt.Errorf("release %q: %w", release.Name, err)}
}

if ociChartPath != nil {
chartPath = *ociChartPath
chartLock = lock
}
}
@@ -1591,10 +1612,12 @@ func (st *HelmState) prepareChartForRelease(release *ReleaseSpec, helm helmexec.
// For helm 2, we `helm fetch` with the version flags and call `helm template`
// WITHOUT the version flags.
} else {
chartPath, err = st.forcedDownloadChart(chartName, dir, release, helm, opts)
var lock *chartLockResult
chartPath, lock, err = st.forcedDownloadChart(chartName, dir, release, helm, opts)
if err != nil {
return &chartPrepareResult{err: err}
}
chartLock = lock
}

return &chartPrepareResult{
@@ -1606,20 +1629,24 @@ func (st *HelmState) prepareChartForRelease(release *ReleaseSpec, helm helmexec.
buildDeps: buildDeps,
skipRefresh: skipRefresh,
chartFetchedByGoGetter: chartFetchedByGoGetter,
chartLock: chartLock,
}
}

func (st *HelmState) PrepareCharts(helm helmexec.Interface, dir string, concurrency int, helmfileCommand string, opts ChartPrepareOptions) (map[PrepareChartKey]string, []error) {
// PrepareCharts downloads and prepares all charts for the selected releases.
// Returns the chart paths, the chart locks (which MUST be released by the caller after helm operations),
// and any errors encountered.
func (st *HelmState) PrepareCharts(helm helmexec.Interface, dir string, concurrency int, helmfileCommand string, opts ChartPrepareOptions) (map[PrepareChartKey]string, map[PrepareChartKey]*chartLockResult, []error) {
if !opts.SkipResolve {
updated, err := st.ResolveDeps()
if err != nil {
return nil, []error{err}
return nil, nil, []error{err}
}
*st = *updated
}
selected, err := st.GetSelectedReleases(opts.IncludeTransitiveNeeds)
if err != nil {
return nil, []error{err}
return nil, nil, []error{err}
}

releases := releasesNeedCharts(selected)
@@ -1633,6 +1660,7 @@ func (st *HelmState) PrepareCharts(helm helmexec.Interface, dir string, concurre
var prepareChartInfoMutex sync.Mutex

prepareChartInfo := make(map[PrepareChartKey]string, len(releases))
chartLocks := make(map[PrepareChartKey]*chartLockResult, len(releases))

errs := []error{}
@@ -1662,17 +1690,24 @@ func (st *HelmState) PrepareCharts(helm helmexec.Interface, dir string, concurre

if downloadRes.err != nil {
errs = append(errs, downloadRes.err)

// Release any lock that was acquired before error
if downloadRes.chartLock != nil {
downloadRes.chartLock.Release(st.logger)
}
return
}
func() {
prepareChartInfoMutex.Lock()
defer prepareChartInfoMutex.Unlock()
prepareChartInfo[PrepareChartKey{
key := PrepareChartKey{
Namespace: downloadRes.releaseNamespace,
KubeContext: downloadRes.releaseContext,
Name: downloadRes.releaseName,
}] = downloadRes.chartPath
}
prepareChartInfo[key] = downloadRes.chartPath
if downloadRes.chartLock != nil {
chartLocks[key] = downloadRes.chartLock
}
}()

if downloadRes.buildDeps {
@@ -1683,16 +1718,24 @@ func (st *HelmState) PrepareCharts(helm helmexec.Interface, dir string, concurre
)

if len(errs) > 0 {
return nil, errs
// Release all acquired locks on error
for _, lock := range chartLocks {
lock.Release(st.logger)
}
return nil, nil, errs
}

if len(builds) > 0 {
if err := st.runHelmDepBuilds(helm, concurrency, builds); err != nil {
return nil, []error{err}
// Release all acquired locks on error
for _, lock := range chartLocks {
lock.Release(st.logger)
}
return nil, nil, []error{err}
}
}

return prepareChartInfo, nil
return prepareChartInfo, chartLocks, nil
}

// nolint: unparam
@@ -4374,48 +4417,50 @@ const (
chartActionRefresh // Delete and re-download (refresh)
)

// chartLockResult contains the result of acquiring locks and checking cache
// chartLockResult contains the result of acquiring locks and checking cache.
// It supports both shared (read) and exclusive (write) locks using reader-writer semantics.
// Multiple processes can hold shared locks simultaneously, but exclusive locks block all others.
type chartLockResult struct {
action chartDownloadAction
cachedPath string // Path to use if action is chartActionUseCached
fileLock *flock.Flock // File lock to release after operation
inProcessMutex *sync.Mutex // In-process mutex to release after operation
refreshMarkerPath string // Path to refresh marker file
needsRefresh bool // Whether refresh mode is enabled
action chartDownloadAction
cachedPath string // Path to use if action is chartActionUseCached
fileLock *flock.Flock // File lock for cross-process synchronization
inProcessMutex *sync.RWMutex // In-process RWMutex for goroutine coordination
isExclusive bool // True if holding exclusive lock, false if shared
chartPath string // The chart path this lock is for (used for logging)
}

// release releases all acquired locks
func (r *chartLockResult) release(logger *zap.SugaredLogger) {
// Release releases all acquired locks (both file and in-process).
// This method is safe to call even if locks were not acquired.
func (r *chartLockResult) Release(logger *zap.SugaredLogger) {
if r == nil {
return
}
if r.inProcessMutex != nil {
r.inProcessMutex.Unlock()
if r.isExclusive {
r.inProcessMutex.Unlock()
} else {
r.inProcessMutex.RUnlock()
}
}
if r.fileLock != nil {
if err := r.fileLock.Unlock(); err != nil {
logger.Warnf("Failed to release file lock: %v", err)
logger.Warnf("Failed to release file lock for %s: %v", r.chartPath, err)
}
}
}

// createRefreshMarker creates the refresh marker file if in refresh mode
func (r *chartLockResult) createRefreshMarker(logger *zap.SugaredLogger) {
if r.needsRefresh && r.refreshMarkerPath != "" {
if markerFile, err := os.Create(r.refreshMarkerPath); err == nil {
if err := markerFile.Close(); err != nil {
logger.Warnf("Failed to close refresh marker file %s: %v", r.refreshMarkerPath, err)
}
}
}
}

// acquireChartLock acquires file and in-process locks, then determines what action to take
// based on cache state and refresh mode. The caller MUST call result.release() when done.
// acquireChartLock acquires file and in-process locks using reader-writer semantics.
// - Shared (read) lock: Multiple processes can use the cached chart simultaneously
// - Exclusive (write) lock: Only one process can refresh (delete + download) at a time
//
// The caller MUST call result.Release() when done with the chart (after helm operations complete).
// Locks should be held for the entire duration of chart usage, not just during download.
func (st *HelmState) acquireChartLock(chartPath string, opts ChartPrepareOptions) (*chartLockResult, error) {
result := &chartLockResult{
refreshMarkerPath: chartPath + ".refreshed",
needsRefresh: !opts.SkipDeps && !opts.SkipRefresh,
chartPath: chartPath,
}

// Acquire filesystem-level lock for cross-process synchronization
// Ensure lock directory exists
lockFilePath := chartPath + ".lock"
lockFileDir := filepath.Dir(lockFilePath)
if err := os.MkdirAll(lockFileDir, 0755); err != nil {
@@ -4423,81 +4468,57 @@ func (st *HelmState) acquireChartLock(chartPath string, opts ChartPrepareOptions
}
result.fileLock = flock.New(lockFilePath)

// Retry logic with timeout for lock acquisition
const (
lockTimeout = 5 * time.Minute
maxRetries = 3
retryBackoff = 2 * time.Second
)
needsRefresh := !opts.SkipDeps && !opts.SkipRefresh

var locked bool
var lockErr error
for attempt := 1; attempt <= maxRetries; attempt++ {
ctx, cancel := context.WithTimeout(context.Background(), lockTimeout)
locked, lockErr = result.fileLock.TryLockContext(ctx, 500*time.Millisecond)
cancel()

if locked {
break
}
if lockErr != nil {
st.logger.Warnf("Failed to acquire file lock (attempt %d/%d): %v", attempt, maxRetries, lockErr)
if attempt < maxRetries {
time.Sleep(retryBackoff)
}
// Optimistic path: Try shared lock first if cache likely exists and no refresh needed
if st.fs.DirectoryExistsAt(chartPath) && !needsRefresh {
if err := st.acquireSharedLock(result, chartPath); err != nil {
// Failed to acquire shared lock, will try exclusive below
st.logger.Debugf("Failed to acquire shared lock for %s: %v, trying exclusive", chartPath, err)
} else {
return nil, fmt.Errorf("timeout waiting for file lock on chart %s after %v", chartPath, lockTimeout)
// Got shared lock, validate cache
if fullChartPath, err := findChartDirectory(chartPath); err == nil {
result.action = chartActionUseCached
result.cachedPath = filepath.Dir(fullChartPath)
st.logger.Debugf("Using cached chart at %s (shared lock)", result.cachedPath)
return result, nil
}
// Invalid cache - release shared lock and acquire exclusive
st.logger.Debugf("Chart cache at %s is invalid, upgrading to exclusive lock", chartPath)
result.Release(st.logger)
}
}
if !locked {
return nil, fmt.Errorf("failed to acquire file lock for chart %s after %d attempts: %w", chartPath, maxRetries, lockErr)

// Need exclusive lock for: download, refresh, or cache validation failure
if err := st.acquireExclusiveLock(result, chartPath); err != nil {
return nil, err
}

// Acquire in-process mutex for goroutine coordination
result.inProcessMutex = st.getNamedMutex(chartPath)
result.inProcessMutex.Lock()

// Determine action based on cache state and refresh mode
const refreshMarkerMaxAge = 60 * time.Second

// Double-check after acquiring exclusive lock (another process may have populated cache)
if st.fs.DirectoryExistsAt(chartPath) {
fullChartPath, err := findChartDirectory(chartPath)
if err == nil {
// Valid chart found
if result.needsRefresh {
// Check if another process already refreshed in this session
if markerInfo, markerErr := os.Stat(result.refreshMarkerPath); markerErr == nil {
markerAge := time.Since(markerInfo.ModTime())
if markerAge < refreshMarkerMaxAge {
// Fresh marker - use cached
result.action = chartActionUseCached
result.cachedPath = filepath.Dir(fullChartPath)
st.logger.Debugf("Using cached chart at %s (refreshed %.1fs ago by another process)", result.cachedPath, markerAge.Seconds())
return result, nil
}
// Stale marker - remove and refresh
st.logger.Debugf("Refresh marker at %s is stale (%.1fs old), will refresh", result.refreshMarkerPath, markerAge.Seconds())
_ = os.Remove(result.refreshMarkerPath) // Best-effort cleanup, ignore error
}
// No fresh marker - need to refresh
st.logger.Debugf("Refreshing chart at %s (first process in refresh mode)", chartPath)
if err := os.RemoveAll(chartPath); err != nil {
result.release(st.logger)
return nil, err
}
result.action = chartActionRefresh
} else {
// Not refresh mode - use cached
// Valid cache exists now
if !needsRefresh {
// Another process populated the cache while we waited for lock
result.action = chartActionUseCached
result.cachedPath = filepath.Dir(fullChartPath)
st.logger.Debugf("Using cached chart at %s", result.cachedPath)
st.logger.Debugf("Using cached chart at %s (populated by another process)", result.cachedPath)
return result, nil
}
} else {
// Directory exists but no valid Chart.yaml - corrupted
st.logger.Debugf("Chart directory exists but no valid Chart.yaml at %s: %v, will re-download", chartPath, err)
// Refresh mode: delete and re-download
st.logger.Debugf("Refreshing chart at %s (exclusive lock)", chartPath)
if err := os.RemoveAll(chartPath); err != nil {
result.release(st.logger)
return nil, err
result.Release(st.logger)
return nil, fmt.Errorf("failed to remove chart directory for refresh: %w", err)
}
result.action = chartActionRefresh
} else {
// Directory exists but invalid (no Chart.yaml) - corrupted cache
st.logger.Debugf("Chart directory at %s is corrupted: %v, will re-download", chartPath, err)
if err := os.RemoveAll(chartPath); err != nil {
result.Release(st.logger)
return nil, fmt.Errorf("failed to remove corrupted chart directory: %w", err)
}
result.action = chartActionDownload
}
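Note (not part of the commit): acquireChartLock above, and TestOCIChartDoubleCheckLocking in the test hunk, rely on double-check locking: the cache is validated again after the exclusive lock is obtained, since another process may have populated it while this one waited. A minimal in-process sketch of the pattern, with illustrative names:

package main

import (
	"fmt"
	"sync"
)

var (
	cacheMu sync.RWMutex
	cache   map[string]string
)

// get returns the cached value, populating the cache at most once even when
// many goroutines race: the state is checked again under the write lock.
func get(key string) string {
	cacheMu.RLock()
	if cache != nil {
		v := cache[key]
		cacheMu.RUnlock()
		return v
	}
	cacheMu.RUnlock()

	cacheMu.Lock()
	defer cacheMu.Unlock()
	// Double-check: another goroutine may have filled the cache while we
	// waited for the exclusive lock.
	if cache == nil {
		fmt.Println("populating cache (the expensive download happens once)")
		cache = map[string]string{key: "chart-path"} // illustrative value
	}
	return cache[key]
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = get("my-chart")
		}()
	}
	wg.Wait()
	fmt.Println(get("my-chart"))
}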
@@ -4508,21 +4529,97 @@ func (st *HelmState) acquireChartLock(chartPath string, opts ChartPrepareOptions
return result, nil
}

func (st *HelmState) getOCIChart(release *ReleaseSpec, tempDir string, helm helmexec.Interface, opts ChartPrepareOptions) (*string, error) {
// acquireSharedLock acquires a shared (read) lock for concurrent read access.
// Multiple processes can hold shared locks simultaneously, but shared locks
// block exclusive lock acquisition.
func (st *HelmState) acquireSharedLock(result *chartLockResult, chartPath string) error {
const lockTimeout = 5 * time.Minute

ctx, cancel := context.WithTimeout(context.Background(), lockTimeout)
defer cancel()

locked, err := result.fileLock.TryRLockContext(ctx, 500*time.Millisecond)
if err != nil {
return fmt.Errorf("failed to acquire shared file lock: %w", err)
}
if !locked {
return fmt.Errorf("timeout waiting for shared file lock on chart %s", chartPath)
}

// Also acquire in-process RLock
result.inProcessMutex = st.getNamedRWMutex(chartPath)
result.inProcessMutex.RLock()
result.isExclusive = false

return nil
}

// acquireExclusiveLock acquires an exclusive (write) lock with retry logic.
// Exclusive locks block all other locks (both shared and exclusive).
func (st *HelmState) acquireExclusiveLock(result *chartLockResult, chartPath string) error {
const (
lockTimeout = 5 * time.Minute
maxRetries = 3
retryBackoff = 2 * time.Second
)

var locked bool
var lockErr error

for attempt := 1; attempt <= maxRetries; attempt++ {
ctx, cancel := context.WithTimeout(context.Background(), lockTimeout)
locked, lockErr = result.fileLock.TryLockContext(ctx, 500*time.Millisecond)
cancel()

if locked {
break
}
if lockErr != nil {
st.logger.Warnf("Failed to acquire exclusive file lock (attempt %d/%d): %v", attempt, maxRetries, lockErr)
if attempt < maxRetries {
time.Sleep(retryBackoff)
}
} else {
return fmt.Errorf("timeout waiting for exclusive file lock on chart %s after %v", chartPath, lockTimeout)
}
}

if !locked {
return fmt.Errorf("failed to acquire exclusive file lock for chart %s after %d attempts: %w", chartPath, maxRetries, lockErr)
}

// Also acquire in-process Lock
result.inProcessMutex = st.getNamedRWMutex(chartPath)
result.inProcessMutex.Lock()
result.isExclusive = true

return nil
}

// getOCIChart downloads or retrieves an OCI chart from cache.
// It returns the chart path and a lock that MUST be released by the caller after helm operations complete.
// The lock ensures the chart won't be deleted while helm is using it.
func (st *HelmState) getOCIChart(release *ReleaseSpec, tempDir string, helm helmexec.Interface, opts ChartPrepareOptions) (*string, *chartLockResult, error) {
qualifiedChartName, chartName, chartVersion, err := st.getOCIQualifiedChartName(release)
if err != nil {
return nil, err
return nil, nil, err
}

if qualifiedChartName == "" {
return nil, nil
return nil, nil, nil
}

// Check global chart cache first
// Check global chart cache first (in-memory cache, no lock needed)
cacheKey := st.getChartCacheKey(release)
if cachedPath, exists := st.checkChartCache(cacheKey); exists && st.fs.DirectoryExistsAt(cachedPath) {
st.logger.Debugf("OCI chart %s:%s already downloaded, using cached version at %s", chartName, chartVersion, cachedPath)
return &cachedPath, nil
// Still need to acquire a shared lock to prevent deletion while using
chartPath, _ := st.getOCIChartPath(tempDir, release, chartName, chartVersion, opts.OutputDirTemplate)
lockResult, lockErr := st.acquireChartLock(chartPath, opts)
if lockErr != nil {
return nil, nil, lockErr
}
return &cachedPath, lockResult, nil
}

if opts.OutputDirTemplate == "" {
@@ -4534,20 +4631,20 @@ func (st *HelmState) getOCIChart(release *ReleaseSpec, tempDir string, helm helm
// Acquire locks and determine action
lockResult, err := st.acquireChartLock(chartPath, opts)
if err != nil {
return nil, err
return nil, nil, err
}
defer lockResult.release(st.logger)

// If cached, return immediately
// If cached, return with lock held
if lockResult.action == chartActionUseCached {
st.addToChartCache(cacheKey, lockResult.cachedPath)
return &lockResult.cachedPath, nil
return &lockResult.cachedPath, lockResult, nil
}

// Ensure temp directory exists
if _, err := os.Stat(tempDir); err != nil {
if err := os.MkdirAll(tempDir, 0755); err != nil {
return nil, err
lockResult.Release(st.logger)
return nil, nil, err
}
}
@@ -4559,19 +4656,19 @@ func (st *HelmState) getOCIChart(release *ReleaseSpec, tempDir string, helm helm
flags = st.appendChartVersionFlags(flags, release)

if err := helm.ChartPull(qualifiedChartName, chartPath, flags...); err != nil {
return nil, err
lockResult.Release(st.logger)
return nil, nil, err
}

if err := helm.ChartExport(qualifiedChartName, chartPath); err != nil {
return nil, err
lockResult.Release(st.logger)
return nil, nil, err
}

// Create refresh marker if needed
lockResult.createRefreshMarker(st.logger)

fullChartPath, err := findChartDirectory(chartPath)
if err != nil {
return nil, err
lockResult.Release(st.logger)
return nil, nil, err
}

chartPath = filepath.Dir(fullChartPath)
@@ -4579,7 +4676,7 @@ func (st *HelmState) getOCIChart(release *ReleaseSpec, tempDir string, helm helm
// Add to global chart cache
st.addToChartCache(cacheKey, chartPath)

return &chartPath, nil
return &chartPath, lockResult, nil
}

// IsOCIChart returns true if the chart is an OCI chart