From 7bb2a1f19b95458a77f727d6f1c8a87534af57fa Mon Sep 17 00:00:00 2001 From: Aditya Menon Date: Wed, 26 Nov 2025 23:48:56 +0100 Subject: [PATCH] fix: replace 60s timeout with reader-writer locks for OCI chart caching Address PR review feedback from @champtar about the OCI chart caching mechanism. The previous implementation used a 60-second timeout which was arbitrary and caused race conditions when helm deployments took longer (e.g., deployments triggering scaling up/down). Changes: - Replace 60s refresh marker timeout with proper reader-writer locks - Use shared locks (RLock) when using cached charts (allows concurrent reads) - Use exclusive locks (Lock) when refreshing/downloading charts - Hold locks during entire helm operation lifecycle (not just during download) - Add getNamedRWMutex() for in-process RW coordination - Update PrepareCharts() to return locks map for lifecycle management - Add chartLockReleaser in run.go to release locks after helm callback - Remove unused mutexMap and getNamedMutex (replaced by RW versions) - Add comprehensive tests for shared/exclusive lock behavior This eliminates the race condition where one process could delete a cached chart while another process's helm command was still using it. Fixes review comment on PR #2298 Signed-off-by: Aditya Menon --- pkg/app/run.go | 26 ++- pkg/state/oci_chart_lock_test.go | 256 +++++++++++++++++++++ pkg/state/state.go | 373 +++++++++++++++++++------------ 3 files changed, 511 insertions(+), 144 deletions(-) diff --git a/pkg/app/run.go b/pkg/app/run.go index f339ac96..a69e4a15 100644 --- a/pkg/app/run.go +++ b/pkg/app/run.go @@ -42,19 +42,30 @@ func (r *Run) askForConfirmation(msg string) bool { return AskForConfirmation(msg) } -func (r *Run) prepareChartsIfNeeded(helmfileCommand string, dir string, concurrency int, opts state.ChartPrepareOptions) (map[state.PrepareChartKey]string, error) { +// chartLockReleaser is a function that releases all chart locks. +// It should be called after helm operations complete. +type chartLockReleaser func() + +func (r *Run) prepareChartsIfNeeded(helmfileCommand string, dir string, concurrency int, opts state.ChartPrepareOptions) (map[state.PrepareChartKey]string, chartLockReleaser, error) { // Skip chart preparation for certain commands skipCommands := []string{"write-values", "list"} if slices.Contains(skipCommands, strings.ToLower(helmfileCommand)) { - return nil, nil + return nil, func() {}, nil } - releaseToChart, errs := r.state.PrepareCharts(r.helm, dir, concurrency, helmfileCommand, opts) + releaseToChart, chartLocks, errs := r.state.PrepareCharts(r.helm, dir, concurrency, helmfileCommand, opts) if len(errs) > 0 { - return nil, fmt.Errorf("%v", errs) + return nil, func() {}, fmt.Errorf("%v", errs) } - return releaseToChart, nil + // Return a releaser function that releases all acquired locks + releaser := func() { + for _, lock := range chartLocks { + lock.Release(r.state.Logger()) + } + } + + return releaseToChart, releaser, nil } func (r *Run) withPreparedCharts(helmfileCommand string, opts state.ChartPrepareOptions, f func()) error { @@ -93,10 +104,13 @@ func (r *Run) withPreparedCharts(helmfileCommand string, opts state.ChartPrepare return err } - releaseToChart, err := r.prepareChartsIfNeeded(helmfileCommand, dir, opts.Concurrency, opts) + releaseToChart, releaseLocks, err := r.prepareChartsIfNeeded(helmfileCommand, dir, opts.Concurrency, opts) if err != nil { return err } + // Release all chart locks after helm operations complete. 
+ // This ensures charts are not deleted while helm is using them. + defer releaseLocks() for i := range r.state.Releases { rel := &r.state.Releases[i] diff --git a/pkg/state/oci_chart_lock_test.go b/pkg/state/oci_chart_lock_test.go index f3ef7de4..8f42471b 100644 --- a/pkg/state/oci_chart_lock_test.go +++ b/pkg/state/oci_chart_lock_test.go @@ -175,6 +175,262 @@ func TestOCIChartFileLock(t *testing.T) { }) } +// TestOCIChartSharedExclusiveLocks tests the reader-writer lock pattern +// where shared locks allow concurrent reads and exclusive locks are used for writes. +func TestOCIChartSharedExclusiveLocks(t *testing.T) { + t.Run("multiple shared locks can be acquired simultaneously", func(t *testing.T) { + tempDir, err := os.MkdirTemp("", "helmfile-shared-lock-test-*") + require.NoError(t, err) + defer os.RemoveAll(tempDir) + + lockFilePath := filepath.Join(tempDir, "test-chart.lock") + + // Number of concurrent readers + numReaders := 5 + var wg sync.WaitGroup + var activeReaders atomic.Int32 + var maxConcurrentReaders atomic.Int32 + + for i := 0; i < numReaders; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + fileLock := flock.New(lockFilePath) + // Acquire shared (read) lock + err := fileLock.RLock() + require.NoError(t, err) + + // Track how many readers are active simultaneously + current := activeReaders.Add(1) + + // Update max concurrent readers + for { + max := maxConcurrentReaders.Load() + if current <= max { + break + } + if maxConcurrentReaders.CompareAndSwap(max, current) { + break + } + } + + // Simulate reading while holding shared lock + time.Sleep(50 * time.Millisecond) + + activeReaders.Add(-1) + err = fileLock.Unlock() + require.NoError(t, err) + }() + } + + wg.Wait() + + // Multiple readers should have been active at the same time + require.Greater(t, maxConcurrentReaders.Load(), int32(1), + "multiple shared locks should be held concurrently") + }) + + t.Run("exclusive lock blocks shared locks", func(t *testing.T) { + tempDir, err := os.MkdirTemp("", "helmfile-excl-block-test-*") + require.NoError(t, err) + defer os.RemoveAll(tempDir) + + lockFilePath := filepath.Join(tempDir, "test-chart.lock") + + // First, acquire exclusive lock + writerLock := flock.New(lockFilePath) + err = writerLock.Lock() + require.NoError(t, err) + + // Try to acquire shared lock with timeout - should fail + readerResult := make(chan bool) + go func() { + readerLock := flock.New(lockFilePath) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + locked, err := readerLock.TryRLockContext(ctx, 10*time.Millisecond) + if err != nil { + readerResult <- false + return + } + if locked { + readerLock.Unlock() + } + readerResult <- locked + }() + + select { + case acquired := <-readerResult: + require.False(t, acquired, "shared lock should not be acquired while exclusive lock is held") + case <-time.After(2 * time.Second): + t.Fatal("test timed out") + } + + // Release exclusive lock + err = writerLock.Unlock() + require.NoError(t, err) + }) + + t.Run("shared locks block exclusive lock", func(t *testing.T) { + tempDir, err := os.MkdirTemp("", "helmfile-shared-block-test-*") + require.NoError(t, err) + defer os.RemoveAll(tempDir) + + lockFilePath := filepath.Join(tempDir, "test-chart.lock") + + // First, acquire shared lock + readerLock := flock.New(lockFilePath) + err = readerLock.RLock() + require.NoError(t, err) + + // Try to acquire exclusive lock with timeout - should fail + writerResult := make(chan bool) + go func() { + writerLock := 
flock.New(lockFilePath) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + locked, err := writerLock.TryLockContext(ctx, 10*time.Millisecond) + if err != nil { + writerResult <- false + return + } + if locked { + writerLock.Unlock() + } + writerResult <- locked + }() + + select { + case acquired := <-writerResult: + require.False(t, acquired, "exclusive lock should not be acquired while shared lock is held") + case <-time.After(2 * time.Second): + t.Fatal("test timed out") + } + + // Release shared lock + err = readerLock.Unlock() + require.NoError(t, err) + }) + + t.Run("exclusive lock acquired after shared lock released", func(t *testing.T) { + tempDir, err := os.MkdirTemp("", "helmfile-lock-release-test-*") + require.NoError(t, err) + defer os.RemoveAll(tempDir) + + lockFilePath := filepath.Join(tempDir, "test-chart.lock") + + // Acquire shared lock + readerLock := flock.New(lockFilePath) + err = readerLock.RLock() + require.NoError(t, err) + + // Start writer waiting for lock + writerDone := make(chan bool) + go func() { + writerLock := flock.New(lockFilePath) + // Use longer timeout + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + locked, err := writerLock.TryLockContext(ctx, 10*time.Millisecond) + if err == nil && locked { + writerLock.Unlock() + writerDone <- true + return + } + writerDone <- false + }() + + // Release shared lock after a short delay + time.Sleep(50 * time.Millisecond) + err = readerLock.Unlock() + require.NoError(t, err) + + // Writer should now succeed + select { + case success := <-writerDone: + require.True(t, success, "writer should acquire lock after reader releases") + case <-time.After(3 * time.Second): + t.Fatal("test timed out waiting for writer") + } + }) +} + +// TestChartLockResultRelease tests the Release method of chartLockResult +// handles both shared and exclusive locks correctly. 
+func TestChartLockResultRelease(t *testing.T) { + t.Run("release exclusive lock", func(t *testing.T) { + tempDir, err := os.MkdirTemp("", "helmfile-release-excl-*") + require.NoError(t, err) + defer os.RemoveAll(tempDir) + + lockFilePath := filepath.Join(tempDir, "test.lock") + fileLock := flock.New(lockFilePath) + err = fileLock.Lock() + require.NoError(t, err) + + rwMutex := &sync.RWMutex{} + rwMutex.Lock() + + result := &chartLockResult{ + fileLock: fileLock, + inProcessMutex: rwMutex, + isExclusive: true, + chartPath: lockFilePath, + } + + // Release should not panic + result.Release(nil) + + // Verify lock is released - should be able to acquire again + fileLock2 := flock.New(lockFilePath) + locked, err := fileLock2.TryLock() + require.NoError(t, err) + require.True(t, locked, "should be able to acquire lock after release") + fileLock2.Unlock() + }) + + t.Run("release shared lock", func(t *testing.T) { + tempDir, err := os.MkdirTemp("", "helmfile-release-shared-*") + require.NoError(t, err) + defer os.RemoveAll(tempDir) + + lockFilePath := filepath.Join(tempDir, "test.lock") + fileLock := flock.New(lockFilePath) + err = fileLock.RLock() + require.NoError(t, err) + + rwMutex := &sync.RWMutex{} + rwMutex.RLock() + + result := &chartLockResult{ + fileLock: fileLock, + inProcessMutex: rwMutex, + isExclusive: false, + chartPath: lockFilePath, + } + + // Release should not panic + result.Release(nil) + + // Verify lock is released - should be able to acquire exclusive lock + fileLock2 := flock.New(lockFilePath) + locked, err := fileLock2.TryLock() + require.NoError(t, err) + require.True(t, locked, "should be able to acquire exclusive lock after shared release") + fileLock2.Unlock() + }) + + t.Run("release nil result is safe", func(t *testing.T) { + var result *chartLockResult + // Should not panic + result.Release(nil) + }) +} + // TestOCIChartDoubleCheckLocking verifies the double-check locking pattern // works correctly to avoid unnecessary work when the cache is populated // by another process while waiting for the lock. diff --git a/pkg/state/state.go b/pkg/state/state.go index 969a2092..9240d7a0 100644 --- a/pkg/state/state.go +++ b/pkg/state/state.go @@ -135,6 +135,12 @@ type HelmState struct { RenderedValues map[string]any } +// Logger returns the logger instance for this HelmState. +// This is used by external callers that need to pass the logger to lock release functions. +func (st *HelmState) Logger() *zap.SugaredLogger { + return st.logger +} + // SubHelmfileSpec defines the subhelmfile path and options type SubHelmfileSpec struct { //path or glob pattern for the sub helmfiles @@ -1303,6 +1309,7 @@ type chartPrepareResult struct { buildDeps bool skipRefresh bool chartFetchedByGoGetter bool + chartLock *chartLockResult // Lock to be released after helm operation completes } func (st *HelmState) GetRepositoryAndNameFromChartName(chartName string) (*RepositorySpec, string) { @@ -1319,17 +1326,19 @@ func (st *HelmState) GetRepositoryAndNameFromChartName(chartName string) (*Repos return nil, chartName } -var mutexMap sync.Map +var rwMutexMap sync.Map -// retrieves or creates a sync.Mutex for a given name -func (st *HelmState) getNamedMutex(name string) *sync.Mutex { - mu, ok := mutexMap.Load(name) +// getNamedRWMutex retrieves or creates a sync.RWMutex for a given name. +// This is used for reader-writer locking where multiple readers can access +// the resource simultaneously, but writers need exclusive access. 
+func (st *HelmState) getNamedRWMutex(name string) *sync.RWMutex { + mu, ok := rwMutexMap.Load(name) if ok { - return mu.(*sync.Mutex) + return mu.(*sync.RWMutex) } - newMu := &sync.Mutex{} - actualMu, _ := mutexMap.LoadOrStore(name, newMu) - return actualMu.(*sync.Mutex) + newMu := &sync.RWMutex{} + actualMu, _ := rwMutexMap.LoadOrStore(name, newMu) + return actualMu.(*sync.RWMutex) } type PrepareChartKey struct { @@ -1452,31 +1461,41 @@ func (st *HelmState) processLocalChart(normalizedChart, dir string, release *Rel return chartPath, nil } -// forcedDownloadChart handles forced chart downloads -func (st *HelmState) forcedDownloadChart(chartName, dir string, release *ReleaseSpec, helm helmexec.Interface, opts ChartPrepareOptions) (string, error) { +// forcedDownloadChart handles forced chart downloads. +// It returns the chart path and a lock that MUST be released by the caller after helm operations complete. +// The lock ensures the chart won't be deleted while helm is using it. +func (st *HelmState) forcedDownloadChart(chartName, dir string, release *ReleaseSpec, helm helmexec.Interface, opts ChartPrepareOptions) (string, *chartLockResult, error) { // Check global chart cache first for non-OCI charts cacheKey := st.getChartCacheKey(release) if cachedPath, exists := st.checkChartCache(cacheKey); exists && st.fs.DirectoryExistsAt(cachedPath) { st.logger.Debugf("Chart %s:%s already downloaded, using cached version at %s", chartName, release.Version, cachedPath) - return cachedPath, nil + // Still need to acquire a shared lock to prevent deletion while using + chartPath, err := generateChartPath(chartName, dir, release, opts.OutputDirTemplate) + if err != nil { + return "", nil, err + } + lockResult, lockErr := st.acquireChartLock(chartPath, opts) + if lockErr != nil { + return "", nil, lockErr + } + return cachedPath, lockResult, nil } chartPath, err := generateChartPath(chartName, dir, release, opts.OutputDirTemplate) if err != nil { - return "", err + return "", nil, err } // Acquire locks and determine action lockResult, err := st.acquireChartLock(chartPath, opts) if err != nil { - return "", err + return "", nil, err } - defer lockResult.release(st.logger) - // If cached, return immediately + // If cached, return with lock held if lockResult.action == chartActionUseCached { st.addToChartCache(cacheKey, lockResult.cachedPath) - return lockResult.cachedPath, nil + return lockResult.cachedPath, lockResult, nil } // Download the chart @@ -1484,12 +1503,10 @@ func (st *HelmState) forcedDownloadChart(chartName, dir string, release *Release fetchFlags = st.appendChartVersionFlags(fetchFlags, release) fetchFlags = append(fetchFlags, "--untar", "--untardir", chartPath) if err := helm.Fetch(chartName, fetchFlags...); err != nil { - return "", err + lockResult.Release(st.logger) + return "", nil, err } - // Create refresh marker if needed - lockResult.createRefreshMarker(st.logger) - // Set chartPath to be the path containing Chart.yaml, if found fullChartPath, err := findChartDirectory(chartPath) if err == nil { @@ -1499,7 +1516,7 @@ func (st *HelmState) forcedDownloadChart(chartName, dir string, release *Release // Add to global chart cache st.addToChartCache(cacheKey, chartPath) - return chartPath, nil + return chartPath, lockResult, nil } // prepareChartForRelease processes a single release and prepares its chart @@ -1519,6 +1536,9 @@ func (st *HelmState) prepareChartForRelease(release *ReleaseSpec, helm helmexec. 
chartName := release.Chart + // Track lock acquired during chart preparation - will be released after helm operations + var chartLock *chartLockResult + chartPath, err := st.downloadChartWithGoGetter(release) if err != nil { return &chartPrepareResult{err: fmt.Errorf("release %q: %w", release.Name, err)} @@ -1526,13 +1546,14 @@ func (st *HelmState) prepareChartForRelease(release *ReleaseSpec, helm helmexec. chartFetchedByGoGetter := chartPath != chartName if !chartFetchedByGoGetter { - ociChartPath, err := st.getOCIChart(release, dir, helm, opts) + ociChartPath, lock, err := st.getOCIChart(release, dir, helm, opts) if err != nil { return &chartPrepareResult{err: fmt.Errorf("release %q: %w", release.Name, err)} } if ociChartPath != nil { chartPath = *ociChartPath + chartLock = lock } } @@ -1591,10 +1612,12 @@ func (st *HelmState) prepareChartForRelease(release *ReleaseSpec, helm helmexec. // For helm 2, we `helm fetch` with the version flags and call `helm template` // WITHOUT the version flags. } else { - chartPath, err = st.forcedDownloadChart(chartName, dir, release, helm, opts) + var lock *chartLockResult + chartPath, lock, err = st.forcedDownloadChart(chartName, dir, release, helm, opts) if err != nil { return &chartPrepareResult{err: err} } + chartLock = lock } return &chartPrepareResult{ @@ -1606,20 +1629,24 @@ func (st *HelmState) prepareChartForRelease(release *ReleaseSpec, helm helmexec. buildDeps: buildDeps, skipRefresh: skipRefresh, chartFetchedByGoGetter: chartFetchedByGoGetter, + chartLock: chartLock, } } -func (st *HelmState) PrepareCharts(helm helmexec.Interface, dir string, concurrency int, helmfileCommand string, opts ChartPrepareOptions) (map[PrepareChartKey]string, []error) { +// PrepareCharts downloads and prepares all charts for the selected releases. +// Returns the chart paths, the chart locks (which MUST be released by the caller after helm operations), +// and any errors encountered. 
+func (st *HelmState) PrepareCharts(helm helmexec.Interface, dir string, concurrency int, helmfileCommand string, opts ChartPrepareOptions) (map[PrepareChartKey]string, map[PrepareChartKey]*chartLockResult, []error) { if !opts.SkipResolve { updated, err := st.ResolveDeps() if err != nil { - return nil, []error{err} + return nil, nil, []error{err} } *st = *updated } selected, err := st.GetSelectedReleases(opts.IncludeTransitiveNeeds) if err != nil { - return nil, []error{err} + return nil, nil, []error{err} } releases := releasesNeedCharts(selected) @@ -1633,6 +1660,7 @@ func (st *HelmState) PrepareCharts(helm helmexec.Interface, dir string, concurre var prepareChartInfoMutex sync.Mutex prepareChartInfo := make(map[PrepareChartKey]string, len(releases)) + chartLocks := make(map[PrepareChartKey]*chartLockResult, len(releases)) errs := []error{} @@ -1662,17 +1690,24 @@ func (st *HelmState) PrepareCharts(helm helmexec.Interface, dir string, concurre if downloadRes.err != nil { errs = append(errs, downloadRes.err) - + // Release any lock that was acquired before error + if downloadRes.chartLock != nil { + downloadRes.chartLock.Release(st.logger) + } return } func() { prepareChartInfoMutex.Lock() defer prepareChartInfoMutex.Unlock() - prepareChartInfo[PrepareChartKey{ + key := PrepareChartKey{ Namespace: downloadRes.releaseNamespace, KubeContext: downloadRes.releaseContext, Name: downloadRes.releaseName, - }] = downloadRes.chartPath + } + prepareChartInfo[key] = downloadRes.chartPath + if downloadRes.chartLock != nil { + chartLocks[key] = downloadRes.chartLock + } }() if downloadRes.buildDeps { @@ -1683,16 +1718,24 @@ func (st *HelmState) PrepareCharts(helm helmexec.Interface, dir string, concurre ) if len(errs) > 0 { - return nil, errs + // Release all acquired locks on error + for _, lock := range chartLocks { + lock.Release(st.logger) + } + return nil, nil, errs } if len(builds) > 0 { if err := st.runHelmDepBuilds(helm, concurrency, builds); err != nil { - return nil, []error{err} + // Release all acquired locks on error + for _, lock := range chartLocks { + lock.Release(st.logger) + } + return nil, nil, []error{err} } } - return prepareChartInfo, nil + return prepareChartInfo, chartLocks, nil } // nolint: unparam @@ -4374,48 +4417,50 @@ const ( chartActionRefresh // Delete and re-download (refresh) ) -// chartLockResult contains the result of acquiring locks and checking cache +// chartLockResult contains the result of acquiring locks and checking cache. +// It supports both shared (read) and exclusive (write) locks using reader-writer semantics. +// Multiple processes can hold shared locks simultaneously, but exclusive locks block all others. 
type chartLockResult struct { - action chartDownloadAction - cachedPath string // Path to use if action is chartActionUseCached - fileLock *flock.Flock // File lock to release after operation - inProcessMutex *sync.Mutex // In-process mutex to release after operation - refreshMarkerPath string // Path to refresh marker file - needsRefresh bool // Whether refresh mode is enabled + action chartDownloadAction + cachedPath string // Path to use if action is chartActionUseCached + fileLock *flock.Flock // File lock for cross-process synchronization + inProcessMutex *sync.RWMutex // In-process RWMutex for goroutine coordination + isExclusive bool // True if holding exclusive lock, false if shared + chartPath string // The chart path this lock is for (used for logging) } -// release releases all acquired locks -func (r *chartLockResult) release(logger *zap.SugaredLogger) { +// Release releases all acquired locks (both file and in-process). +// This method is safe to call even if locks were not acquired. +func (r *chartLockResult) Release(logger *zap.SugaredLogger) { + if r == nil { + return + } if r.inProcessMutex != nil { - r.inProcessMutex.Unlock() + if r.isExclusive { + r.inProcessMutex.Unlock() + } else { + r.inProcessMutex.RUnlock() + } } if r.fileLock != nil { if err := r.fileLock.Unlock(); err != nil { - logger.Warnf("Failed to release file lock: %v", err) + logger.Warnf("Failed to release file lock for %s: %v", r.chartPath, err) } } } -// createRefreshMarker creates the refresh marker file if in refresh mode -func (r *chartLockResult) createRefreshMarker(logger *zap.SugaredLogger) { - if r.needsRefresh && r.refreshMarkerPath != "" { - if markerFile, err := os.Create(r.refreshMarkerPath); err == nil { - if err := markerFile.Close(); err != nil { - logger.Warnf("Failed to close refresh marker file %s: %v", r.refreshMarkerPath, err) - } - } - } -} - -// acquireChartLock acquires file and in-process locks, then determines what action to take -// based on cache state and refresh mode. The caller MUST call result.release() when done. +// acquireChartLock acquires file and in-process locks using reader-writer semantics. +// - Shared (read) lock: Multiple processes can use the cached chart simultaneously +// - Exclusive (write) lock: Only one process can refresh (delete + download) at a time +// +// The caller MUST call result.Release() when done with the chart (after helm operations complete). +// Locks should be held for the entire duration of chart usage, not just during download. 
func (st *HelmState) acquireChartLock(chartPath string, opts ChartPrepareOptions) (*chartLockResult, error) { result := &chartLockResult{ - refreshMarkerPath: chartPath + ".refreshed", - needsRefresh: !opts.SkipDeps && !opts.SkipRefresh, + chartPath: chartPath, } - // Acquire filesystem-level lock for cross-process synchronization + // Ensure lock directory exists lockFilePath := chartPath + ".lock" lockFileDir := filepath.Dir(lockFilePath) if err := os.MkdirAll(lockFileDir, 0755); err != nil { @@ -4423,81 +4468,57 @@ func (st *HelmState) acquireChartLock(chartPath string, opts ChartPrepareOptions } result.fileLock = flock.New(lockFilePath) - // Retry logic with timeout for lock acquisition - const ( - lockTimeout = 5 * time.Minute - maxRetries = 3 - retryBackoff = 2 * time.Second - ) + needsRefresh := !opts.SkipDeps && !opts.SkipRefresh - var locked bool - var lockErr error - for attempt := 1; attempt <= maxRetries; attempt++ { - ctx, cancel := context.WithTimeout(context.Background(), lockTimeout) - locked, lockErr = result.fileLock.TryLockContext(ctx, 500*time.Millisecond) - cancel() - - if locked { - break - } - if lockErr != nil { - st.logger.Warnf("Failed to acquire file lock (attempt %d/%d): %v", attempt, maxRetries, lockErr) - if attempt < maxRetries { - time.Sleep(retryBackoff) - } + // Optimistic path: Try shared lock first if cache likely exists and no refresh needed + if st.fs.DirectoryExistsAt(chartPath) && !needsRefresh { + if err := st.acquireSharedLock(result, chartPath); err != nil { + // Failed to acquire shared lock, will try exclusive below + st.logger.Debugf("Failed to acquire shared lock for %s: %v, trying exclusive", chartPath, err) } else { - return nil, fmt.Errorf("timeout waiting for file lock on chart %s after %v", chartPath, lockTimeout) + // Got shared lock, validate cache + if fullChartPath, err := findChartDirectory(chartPath); err == nil { + result.action = chartActionUseCached + result.cachedPath = filepath.Dir(fullChartPath) + st.logger.Debugf("Using cached chart at %s (shared lock)", result.cachedPath) + return result, nil + } + // Invalid cache - release shared lock and acquire exclusive + st.logger.Debugf("Chart cache at %s is invalid, upgrading to exclusive lock", chartPath) + result.Release(st.logger) } } - if !locked { - return nil, fmt.Errorf("failed to acquire file lock for chart %s after %d attempts: %w", chartPath, maxRetries, lockErr) + + // Need exclusive lock for: download, refresh, or cache validation failure + if err := st.acquireExclusiveLock(result, chartPath); err != nil { + return nil, err } - // Acquire in-process mutex for goroutine coordination - result.inProcessMutex = st.getNamedMutex(chartPath) - result.inProcessMutex.Lock() - - // Determine action based on cache state and refresh mode - const refreshMarkerMaxAge = 60 * time.Second - + // Double-check after acquiring exclusive lock (another process may have populated cache) if st.fs.DirectoryExistsAt(chartPath) { fullChartPath, err := findChartDirectory(chartPath) if err == nil { - // Valid chart found - if result.needsRefresh { - // Check if another process already refreshed in this session - if markerInfo, markerErr := os.Stat(result.refreshMarkerPath); markerErr == nil { - markerAge := time.Since(markerInfo.ModTime()) - if markerAge < refreshMarkerMaxAge { - // Fresh marker - use cached - result.action = chartActionUseCached - result.cachedPath = filepath.Dir(fullChartPath) - st.logger.Debugf("Using cached chart at %s (refreshed %.1fs ago by another process)", result.cachedPath, 
markerAge.Seconds()) - return result, nil - } - // Stale marker - remove and refresh - st.logger.Debugf("Refresh marker at %s is stale (%.1fs old), will refresh", result.refreshMarkerPath, markerAge.Seconds()) - _ = os.Remove(result.refreshMarkerPath) // Best-effort cleanup, ignore error - } - // No fresh marker - need to refresh - st.logger.Debugf("Refreshing chart at %s (first process in refresh mode)", chartPath) - if err := os.RemoveAll(chartPath); err != nil { - result.release(st.logger) - return nil, err - } - result.action = chartActionRefresh - } else { - // Not refresh mode - use cached + // Valid cache exists now + if !needsRefresh { + // Another process populated the cache while we waited for lock result.action = chartActionUseCached result.cachedPath = filepath.Dir(fullChartPath) - st.logger.Debugf("Using cached chart at %s", result.cachedPath) + st.logger.Debugf("Using cached chart at %s (populated by another process)", result.cachedPath) + return result, nil } - } else { - // Directory exists but no valid Chart.yaml - corrupted - st.logger.Debugf("Chart directory exists but no valid Chart.yaml at %s: %v, will re-download", chartPath, err) + // Refresh mode: delete and re-download + st.logger.Debugf("Refreshing chart at %s (exclusive lock)", chartPath) if err := os.RemoveAll(chartPath); err != nil { - result.release(st.logger) - return nil, err + result.Release(st.logger) + return nil, fmt.Errorf("failed to remove chart directory for refresh: %w", err) + } + result.action = chartActionRefresh + } else { + // Directory exists but invalid (no Chart.yaml) - corrupted cache + st.logger.Debugf("Chart directory at %s is corrupted: %v, will re-download", chartPath, err) + if err := os.RemoveAll(chartPath); err != nil { + result.Release(st.logger) + return nil, fmt.Errorf("failed to remove corrupted chart directory: %w", err) } result.action = chartActionDownload } @@ -4508,21 +4529,97 @@ func (st *HelmState) acquireChartLock(chartPath string, opts ChartPrepareOptions return result, nil } -func (st *HelmState) getOCIChart(release *ReleaseSpec, tempDir string, helm helmexec.Interface, opts ChartPrepareOptions) (*string, error) { +// acquireSharedLock acquires a shared (read) lock for concurrent read access. +// Multiple processes can hold shared locks simultaneously, but shared locks +// block exclusive lock acquisition. +func (st *HelmState) acquireSharedLock(result *chartLockResult, chartPath string) error { + const lockTimeout = 5 * time.Minute + + ctx, cancel := context.WithTimeout(context.Background(), lockTimeout) + defer cancel() + + locked, err := result.fileLock.TryRLockContext(ctx, 500*time.Millisecond) + if err != nil { + return fmt.Errorf("failed to acquire shared file lock: %w", err) + } + if !locked { + return fmt.Errorf("timeout waiting for shared file lock on chart %s", chartPath) + } + + // Also acquire in-process RLock + result.inProcessMutex = st.getNamedRWMutex(chartPath) + result.inProcessMutex.RLock() + result.isExclusive = false + + return nil +} + +// acquireExclusiveLock acquires an exclusive (write) lock with retry logic. +// Exclusive locks block all other locks (both shared and exclusive). 
+func (st *HelmState) acquireExclusiveLock(result *chartLockResult, chartPath string) error { + const ( + lockTimeout = 5 * time.Minute + maxRetries = 3 + retryBackoff = 2 * time.Second + ) + + var locked bool + var lockErr error + + for attempt := 1; attempt <= maxRetries; attempt++ { + ctx, cancel := context.WithTimeout(context.Background(), lockTimeout) + locked, lockErr = result.fileLock.TryLockContext(ctx, 500*time.Millisecond) + cancel() + + if locked { + break + } + if lockErr != nil { + st.logger.Warnf("Failed to acquire exclusive file lock (attempt %d/%d): %v", attempt, maxRetries, lockErr) + if attempt < maxRetries { + time.Sleep(retryBackoff) + } + } else { + return fmt.Errorf("timeout waiting for exclusive file lock on chart %s after %v", chartPath, lockTimeout) + } + } + + if !locked { + return fmt.Errorf("failed to acquire exclusive file lock for chart %s after %d attempts: %w", chartPath, maxRetries, lockErr) + } + + // Also acquire in-process Lock + result.inProcessMutex = st.getNamedRWMutex(chartPath) + result.inProcessMutex.Lock() + result.isExclusive = true + + return nil +} + +// getOCIChart downloads or retrieves an OCI chart from cache. +// It returns the chart path and a lock that MUST be released by the caller after helm operations complete. +// The lock ensures the chart won't be deleted while helm is using it. +func (st *HelmState) getOCIChart(release *ReleaseSpec, tempDir string, helm helmexec.Interface, opts ChartPrepareOptions) (*string, *chartLockResult, error) { qualifiedChartName, chartName, chartVersion, err := st.getOCIQualifiedChartName(release) if err != nil { - return nil, err + return nil, nil, err } if qualifiedChartName == "" { - return nil, nil + return nil, nil, nil } - // Check global chart cache first + // Check global chart cache first (in-memory cache, no lock needed) cacheKey := st.getChartCacheKey(release) if cachedPath, exists := st.checkChartCache(cacheKey); exists && st.fs.DirectoryExistsAt(cachedPath) { st.logger.Debugf("OCI chart %s:%s already downloaded, using cached version at %s", chartName, chartVersion, cachedPath) - return &cachedPath, nil + // Still need to acquire a shared lock to prevent deletion while using + chartPath, _ := st.getOCIChartPath(tempDir, release, chartName, chartVersion, opts.OutputDirTemplate) + lockResult, lockErr := st.acquireChartLock(chartPath, opts) + if lockErr != nil { + return nil, nil, lockErr + } + return &cachedPath, lockResult, nil } if opts.OutputDirTemplate == "" { @@ -4534,20 +4631,20 @@ func (st *HelmState) getOCIChart(release *ReleaseSpec, tempDir string, helm helm // Acquire locks and determine action lockResult, err := st.acquireChartLock(chartPath, opts) if err != nil { - return nil, err + return nil, nil, err } - defer lockResult.release(st.logger) - // If cached, return immediately + // If cached, return with lock held if lockResult.action == chartActionUseCached { st.addToChartCache(cacheKey, lockResult.cachedPath) - return &lockResult.cachedPath, nil + return &lockResult.cachedPath, lockResult, nil } // Ensure temp directory exists if _, err := os.Stat(tempDir); err != nil { if err := os.MkdirAll(tempDir, 0755); err != nil { - return nil, err + lockResult.Release(st.logger) + return nil, nil, err } } @@ -4559,19 +4656,19 @@ func (st *HelmState) getOCIChart(release *ReleaseSpec, tempDir string, helm helm flags = st.appendChartVersionFlags(flags, release) if err := helm.ChartPull(qualifiedChartName, chartPath, flags...); err != nil { - return nil, err + lockResult.Release(st.logger) + 
return nil, nil, err } if err := helm.ChartExport(qualifiedChartName, chartPath); err != nil { - return nil, err + lockResult.Release(st.logger) + return nil, nil, err } - // Create refresh marker if needed - lockResult.createRefreshMarker(st.logger) - fullChartPath, err := findChartDirectory(chartPath) if err != nil { - return nil, err + lockResult.Release(st.logger) + return nil, nil, err } chartPath = filepath.Dir(fullChartPath) @@ -4579,7 +4676,7 @@ func (st *HelmState) getOCIChart(release *ReleaseSpec, tempDir string, helm helm // Add to global chart cache st.addToChartCache(cacheKey, chartPath) - return &chartPath, nil + return &chartPath, lockResult, nil } // IsOCIChart returns true if the chart is an OCI chart
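
Reviewer note: below is a minimal, self-contained sketch (not part of the diff) of the reader-writer lock lifecycle this patch adopts, written against the same github.com/gofrs/flock API used above. The helper name withChartLock and its callback are illustrative placeholders, not helmfile code; the real implementation is acquireSharedLock/acquireExclusiveLock and chartLockResult.Release in pkg/state/state.go.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/gofrs/flock"
)

// withChartLock takes a shared lock when a cached chart is only being read and
// an exclusive lock when it must be refreshed (deleted and re-downloaded). The
// lock is held for the entire callback, which stands in for the helm
// template/upgrade call, rather than only for the download step.
func withChartLock(chartPath string, refresh bool, fn func() error) error {
	lock := flock.New(chartPath + ".lock")
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	var locked bool
	var err error
	if refresh {
		// Writer: waits until no readers or writers hold the lock.
		locked, err = lock.TryLockContext(ctx, 500*time.Millisecond)
	} else {
		// Reader: can be held concurrently with other readers.
		locked, err = lock.TryRLockContext(ctx, 500*time.Millisecond)
	}
	if err != nil {
		return err
	}
	if !locked {
		return fmt.Errorf("timed out waiting for lock on %s", chartPath)
	}
	defer func() {
		if uerr := lock.Unlock(); uerr != nil {
			fmt.Printf("failed to release lock for %s: %v\n", chartPath, uerr)
		}
	}()

	return fn()
}

func main() {
	// Readers (releases reusing the cached chart) run concurrently; a refresh
	// waits for them to drain and blocks new readers until it finishes.
	_ = withChartLock("/tmp/charts/mychart", false, func() error {
		fmt.Println("rendering with cached chart")
		return nil
	})
	_ = withChartLock("/tmp/charts/mychart", true, func() error {
		fmt.Println("deleting and re-downloading chart")
		return nil
	})
}

This is also why PrepareCharts now returns the locks to run.go instead of releasing them itself: the shared lock must outlive chart preparation and stay held until the helm callback returns, which the deferred releaseLocks() in withPreparedCharts provides and which the old 60-second refresh marker could not guarantee.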