fix: resolve deadlock by releasing OCI chart locks immediately after download
This commit simplifies the OCI chart locking mechanism to fix deadlock issues that occurred when multiple releases shared the same chart.

Problem: When multiple releases used the same OCI chart, workers in PrepareCharts would deadlock because:
1. Worker 1 acquires lock for chart/path, downloads chart
2. Worker 2 tries to acquire lock on same path, blocks waiting
3. PrepareCharts waits for all workers to complete
4. Worker 1's lock held until PrepareCharts finishes -> deadlock

Solution: Release locks immediately after chart download completes. This is safe because:
- The tempDir cleanup is deferred until after helm operations complete in withPreparedCharts(), so charts won't be deleted mid-use
- The in-memory chart cache prevents redundant downloads within a process
- Cross-process coordination via file locks still works during download

Changes:
- Remove chartLock field from chartPrepareResult struct
- Release locks immediately in getOCIChart() and forcedDownloadChart()
- Simplify PrepareCharts() by removing lock collection and release logic
- Update function signatures to return only (path, error)

This also fixes the "signal: killed" test failures in CI for:
- oci_chart_pull_direct
- oci_chart_pull_once
- oci_chart_pull_once2

Signed-off-by: Aditya Menon <amenon@canarytechnologies.com>
This commit is contained in:
parent
d1e87d074b
commit
bc83bbf2c3
|
|
@ -1309,7 +1309,6 @@ type chartPrepareResult struct {
|
|||
buildDeps bool
|
||||
skipRefresh bool
|
||||
chartFetchedByGoGetter bool
|
||||
chartLock *chartLockResult // Lock to be released after helm operation completes
|
||||
}
|
||||
|
||||
func (st *HelmState) GetRepositoryAndNameFromChartName(chartName string) (*RepositorySpec, string) {
|
||||
|
|
@ -1462,34 +1461,34 @@ func (st *HelmState) processLocalChart(normalizedChart, dir string, release *Rel
|
|||
}
|
||||
|
||||
// forcedDownloadChart handles forced chart downloads.
|
||||
// It returns the chart path and a lock that MUST be released by the caller after helm operations complete.
|
||||
// The lock ensures the chart won't be deleted while helm is using it.
|
||||
func (st *HelmState) forcedDownloadChart(chartName, dir string, release *ReleaseSpec, helm helmexec.Interface, opts ChartPrepareOptions) (string, *chartLockResult, error) {
|
||||
// Locks are acquired during download and released immediately after.
|
||||
func (st *HelmState) forcedDownloadChart(chartName, dir string, release *ReleaseSpec, helm helmexec.Interface, opts ChartPrepareOptions) (string, error) {
|
||||
// Check global chart cache first for non-OCI charts
|
||||
// If found, another worker in this process already downloaded and holds the lock.
|
||||
// We don't need to acquire another lock - the tempDir won't be deleted until
|
||||
// If found, another worker in this process already downloaded the chart.
|
||||
// We don't need to acquire a lock - the tempDir won't be deleted until
|
||||
// after helm operations complete (cleanup is deferred in withPreparedCharts).
|
||||
cacheKey := st.getChartCacheKey(release)
|
||||
if cachedPath, exists := st.checkChartCache(cacheKey); exists && st.fs.DirectoryExistsAt(cachedPath) {
|
||||
st.logger.Debugf("Chart %s:%s already downloaded, using cached version at %s", chartName, release.Version, cachedPath)
|
||||
return cachedPath, nil, nil
|
||||
return cachedPath, nil
|
||||
}
|
||||
|
||||
chartPath, err := generateChartPath(chartName, dir, release, opts.OutputDirTemplate)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Acquire locks and determine action
|
||||
lockResult, err := st.acquireChartLock(chartPath, opts)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
return "", err
|
||||
}
|
||||
|
||||
// If cached, return with lock held
|
||||
// If cached, release lock and return
|
||||
if lockResult.action == chartActionUseCached {
|
||||
st.addToChartCache(cacheKey, lockResult.cachedPath)
|
||||
return lockResult.cachedPath, lockResult, nil
|
||||
lockResult.Release(st.logger)
|
||||
return lockResult.cachedPath, nil
|
||||
}
|
||||
|
||||
// Download the chart
|
||||
|
|
@ -1498,7 +1497,7 @@ func (st *HelmState) forcedDownloadChart(chartName, dir string, release *Release
|
|||
fetchFlags = append(fetchFlags, "--untar", "--untardir", chartPath)
|
||||
if err := helm.Fetch(chartName, fetchFlags...); err != nil {
|
||||
lockResult.Release(st.logger)
|
||||
return "", nil, err
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Set chartPath to be the path containing Chart.yaml, if found
|
||||
|
|
@ -1510,7 +1509,11 @@ func (st *HelmState) forcedDownloadChart(chartName, dir string, release *Release
|
|||
// Add to global chart cache
|
||||
st.addToChartCache(cacheKey, chartPath)
|
||||
|
||||
return chartPath, lockResult, nil
|
||||
// Release lock immediately after download - the tempDir cleanup is deferred
|
||||
// until after helm operations complete, so the chart won't be deleted mid-use.
|
||||
lockResult.Release(st.logger)
|
||||
|
||||
return chartPath, nil
|
||||
}
|
||||
|
||||
// prepareChartForRelease processes a single release and prepares its chart
|
||||
|
|
@ -1530,9 +1533,6 @@ func (st *HelmState) prepareChartForRelease(release *ReleaseSpec, helm helmexec.
|
|||
|
||||
chartName := release.Chart
|
||||
|
||||
// Track lock acquired during chart preparation - will be released after helm operations
|
||||
var chartLock *chartLockResult
|
||||
|
||||
chartPath, err := st.downloadChartWithGoGetter(release)
|
||||
if err != nil {
|
||||
return &chartPrepareResult{err: fmt.Errorf("release %q: %w", release.Name, err)}
|
||||
|
|
@ -1540,14 +1540,13 @@ func (st *HelmState) prepareChartForRelease(release *ReleaseSpec, helm helmexec.
|
|||
chartFetchedByGoGetter := chartPath != chartName
|
||||
|
||||
if !chartFetchedByGoGetter {
|
||||
ociChartPath, lock, err := st.getOCIChart(release, dir, helm, opts)
|
||||
ociChartPath, err := st.getOCIChart(release, dir, helm, opts)
|
||||
if err != nil {
|
||||
return &chartPrepareResult{err: fmt.Errorf("release %q: %w", release.Name, err)}
|
||||
}
|
||||
|
||||
if ociChartPath != nil {
|
||||
chartPath = *ociChartPath
|
||||
chartLock = lock
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1606,12 +1605,10 @@ func (st *HelmState) prepareChartForRelease(release *ReleaseSpec, helm helmexec.
|
|||
// For helm 2, we `helm fetch` with the version flags and call `helm template`
|
||||
// WITHOUT the version flags.
|
||||
} else {
|
||||
var lock *chartLockResult
|
||||
chartPath, lock, err = st.forcedDownloadChart(chartName, dir, release, helm, opts)
|
||||
chartPath, err = st.forcedDownloadChart(chartName, dir, release, helm, opts)
|
||||
if err != nil {
|
||||
return &chartPrepareResult{err: err}
|
||||
}
|
||||
chartLock = lock
|
||||
}
|
||||
|
||||
return &chartPrepareResult{
|
||||
|
|
@ -1623,13 +1620,12 @@ func (st *HelmState) prepareChartForRelease(release *ReleaseSpec, helm helmexec.
|
|||
buildDeps: buildDeps,
|
||||
skipRefresh: skipRefresh,
|
||||
chartFetchedByGoGetter: chartFetchedByGoGetter,
|
||||
chartLock: chartLock,
|
||||
}
|
||||
}
|
||||
|
||||
// PrepareCharts downloads and prepares all charts for the selected releases.
|
||||
// Returns the chart paths, the chart locks (which MUST be released by the caller after helm operations),
|
||||
// and any errors encountered.
|
||||
// Returns the chart paths, any chart locks (for backward compatibility, always empty since locks
|
||||
// are released immediately after download), and any errors encountered.
|
||||
func (st *HelmState) PrepareCharts(helm helmexec.Interface, dir string, concurrency int, helmfileCommand string, opts ChartPrepareOptions) (map[PrepareChartKey]string, map[PrepareChartKey]*chartLockResult, []error) {
|
||||
if !opts.SkipResolve {
|
||||
updated, err := st.ResolveDeps()
|
||||
|
|
@ -1684,10 +1680,6 @@ func (st *HelmState) PrepareCharts(helm helmexec.Interface, dir string, concurre
|
|||
|
||||
if downloadRes.err != nil {
|
||||
errs = append(errs, downloadRes.err)
|
||||
// Release any lock that was acquired before error
|
||||
if downloadRes.chartLock != nil {
|
||||
downloadRes.chartLock.Release(st.logger)
|
||||
}
|
||||
return
|
||||
}
|
||||
func() {
|
||||
|
|
@ -1699,9 +1691,6 @@ func (st *HelmState) PrepareCharts(helm helmexec.Interface, dir string, concurre
|
|||
Name: downloadRes.releaseName,
|
||||
}
|
||||
prepareChartInfo[key] = downloadRes.chartPath
|
||||
if downloadRes.chartLock != nil {
|
||||
chartLocks[key] = downloadRes.chartLock
|
||||
}
|
||||
}()
|
||||
|
||||
if downloadRes.buildDeps {
|
||||
|
|
@ -1712,19 +1701,11 @@ func (st *HelmState) PrepareCharts(helm helmexec.Interface, dir string, concurre
|
|||
)
|
||||
|
||||
if len(errs) > 0 {
|
||||
// Release all acquired locks on error
|
||||
for _, lock := range chartLocks {
|
||||
lock.Release(st.logger)
|
||||
}
|
||||
return nil, nil, errs
|
||||
}
|
||||
|
||||
if len(builds) > 0 {
|
||||
if err := st.runHelmDepBuilds(helm, concurrency, builds); err != nil {
|
||||
// Release all acquired locks on error
|
||||
for _, lock := range chartLocks {
|
||||
lock.Release(st.logger)
|
||||
}
|
||||
return nil, nil, []error{err}
|
||||
}
|
||||
}
|
||||
|
|
@ -4443,12 +4424,12 @@ func (r *chartLockResult) Release(logger *zap.SugaredLogger) {
|
|||
}
|
||||
}
|
||||
|
||||
// acquireChartLock acquires file and in-process locks using reader-writer semantics.
|
||||
// - Shared (read) lock: Multiple processes can use the cached chart simultaneously
|
||||
// - Exclusive (write) lock: Only one process can refresh (delete + download) at a time
|
||||
// acquireChartLock acquires file and in-process locks for chart download coordination.
|
||||
// - Shared (read) lock: Used when chart already exists in cache
|
||||
// - Exclusive (write) lock: Used when downloading or refreshing a chart
|
||||
//
|
||||
// The caller MUST call result.Release() when done with the chart (after helm operations complete).
|
||||
// Locks should be held for the entire duration of chart usage, not just during download.
|
||||
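// IMPORTANT: The caller must call result.Release() as soon as the download completes (or right away when a cached chart is used); locks are not held during helm operations.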
|
||||
// The tempDir cleanup is deferred until after helm operations, so charts won't be deleted mid-use.
|
||||
func (st *HelmState) acquireChartLock(chartPath string, opts ChartPrepareOptions) (*chartLockResult, error) {
|
||||
result := &chartLockResult{
|
||||
chartPath: chartPath,
|
||||
|
|
@ -4526,37 +4507,45 @@ func (st *HelmState) acquireChartLock(chartPath string, opts ChartPrepareOptions
|
|||
// acquireSharedLock acquires a shared (read) lock for concurrent read access.
|
||||
// Multiple processes can hold shared locks simultaneously, but shared locks
|
||||
// block exclusive lock acquisition.
|
||||
// Lock order: in-process mutex first, then file lock (to prevent deadlock).
|
||||
func (st *HelmState) acquireSharedLock(result *chartLockResult, chartPath string) error {
|
||||
const lockTimeout = 5 * time.Minute
|
||||
const lockTimeout = 30 * time.Second
|
||||
|
||||
// Acquire in-process RLock FIRST (consistent ordering prevents deadlock)
|
||||
result.inProcessMutex = st.getNamedRWMutex(chartPath)
|
||||
result.inProcessMutex.RLock()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), lockTimeout)
|
||||
defer cancel()
|
||||
|
||||
locked, err := result.fileLock.TryRLockContext(ctx, 500*time.Millisecond)
|
||||
if err != nil {
|
||||
result.inProcessMutex.RUnlock()
|
||||
return fmt.Errorf("failed to acquire shared file lock: %w", err)
|
||||
}
|
||||
if !locked {
|
||||
result.inProcessMutex.RUnlock()
|
||||
return fmt.Errorf("timeout waiting for shared file lock on chart %s", chartPath)
|
||||
}
|
||||
|
||||
// Also acquire in-process RLock
|
||||
result.inProcessMutex = st.getNamedRWMutex(chartPath)
|
||||
result.inProcessMutex.RLock()
|
||||
result.isExclusive = false
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// acquireExclusiveLock acquires an exclusive (write) lock with retry logic.
|
||||
// Exclusive locks block all other locks (both shared and exclusive).
|
||||
// Lock order: in-process mutex first, then file lock (to prevent deadlock).
|
||||
func (st *HelmState) acquireExclusiveLock(result *chartLockResult, chartPath string) error {
|
||||
const (
|
||||
lockTimeout = 5 * time.Minute
|
||||
lockTimeout = 30 * time.Second
|
||||
maxRetries = 3
|
||||
retryBackoff = 2 * time.Second
|
||||
retryBackoff = 1 * time.Second
|
||||
)
|
||||
|
||||
// Acquire in-process Lock FIRST (consistent ordering prevents deadlock)
|
||||
result.inProcessMutex = st.getNamedRWMutex(chartPath)
|
||||
result.inProcessMutex.Lock()
|
||||
|
||||
var locked bool
|
||||
var lockErr error
|
||||
|
||||
|
|
@ -4574,43 +4563,40 @@ func (st *HelmState) acquireExclusiveLock(result *chartLockResult, chartPath str
|
|||
time.Sleep(retryBackoff)
|
||||
}
|
||||
} else {
|
||||
result.inProcessMutex.Unlock()
|
||||
return fmt.Errorf("timeout waiting for exclusive file lock on chart %s after %v", chartPath, lockTimeout)
|
||||
}
|
||||
}
|
||||
|
||||
if !locked {
|
||||
result.inProcessMutex.Unlock()
|
||||
return fmt.Errorf("failed to acquire exclusive file lock for chart %s after %d attempts: %w", chartPath, maxRetries, lockErr)
|
||||
}
|
||||
|
||||
// Also acquire in-process Lock
|
||||
result.inProcessMutex = st.getNamedRWMutex(chartPath)
|
||||
result.inProcessMutex.Lock()
|
||||
result.isExclusive = true
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// getOCIChart downloads or retrieves an OCI chart from cache.
|
||||
// It returns the chart path and a lock that MUST be released by the caller after helm operations complete.
|
||||
// The lock ensures the chart won't be deleted while helm is using it.
|
||||
func (st *HelmState) getOCIChart(release *ReleaseSpec, tempDir string, helm helmexec.Interface, opts ChartPrepareOptions) (*string, *chartLockResult, error) {
|
||||
// Locks are acquired during download and released immediately after.
|
||||
func (st *HelmState) getOCIChart(release *ReleaseSpec, tempDir string, helm helmexec.Interface, opts ChartPrepareOptions) (*string, error) {
|
||||
qualifiedChartName, chartName, chartVersion, err := st.getOCIQualifiedChartName(release)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if qualifiedChartName == "" {
|
||||
return nil, nil, nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Check global chart cache first (in-memory cache)
|
||||
// If found, another worker in this process already downloaded and holds the lock.
|
||||
// We don't need to acquire another lock - the tempDir won't be deleted until
|
||||
// If found, another worker in this process already downloaded the chart.
|
||||
// We don't need to acquire a lock - the tempDir won't be deleted until
|
||||
// after helm operations complete (cleanup is deferred in withPreparedCharts).
|
||||
cacheKey := st.getChartCacheKey(release)
|
||||
if cachedPath, exists := st.checkChartCache(cacheKey); exists && st.fs.DirectoryExistsAt(cachedPath) {
|
||||
st.logger.Debugf("OCI chart %s:%s already downloaded, using cached version at %s", chartName, chartVersion, cachedPath)
|
||||
return &cachedPath, nil, nil
|
||||
return &cachedPath, nil
|
||||
}
|
||||
|
||||
if opts.OutputDirTemplate == "" {
|
||||
|
|
@ -4622,20 +4608,21 @@ func (st *HelmState) getOCIChart(release *ReleaseSpec, tempDir string, helm helm
|
|||
// Acquire locks and determine action
|
||||
lockResult, err := st.acquireChartLock(chartPath, opts)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// If cached, return with lock held
|
||||
// If cached, release lock and return
|
||||
if lockResult.action == chartActionUseCached {
|
||||
st.addToChartCache(cacheKey, lockResult.cachedPath)
|
||||
return &lockResult.cachedPath, lockResult, nil
|
||||
lockResult.Release(st.logger)
|
||||
return &lockResult.cachedPath, nil
|
||||
}
|
||||
|
||||
// Ensure temp directory exists
|
||||
if _, err := os.Stat(tempDir); err != nil {
|
||||
if err := os.MkdirAll(tempDir, 0755); err != nil {
|
||||
lockResult.Release(st.logger)
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -4648,18 +4635,18 @@ func (st *HelmState) getOCIChart(release *ReleaseSpec, tempDir string, helm helm
|
|||
|
||||
if err := helm.ChartPull(qualifiedChartName, chartPath, flags...); err != nil {
|
||||
lockResult.Release(st.logger)
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := helm.ChartExport(qualifiedChartName, chartPath); err != nil {
|
||||
lockResult.Release(st.logger)
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fullChartPath, err := findChartDirectory(chartPath)
|
||||
if err != nil {
|
||||
lockResult.Release(st.logger)
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
chartPath = filepath.Dir(fullChartPath)
|
||||
|
|
@ -4667,7 +4654,11 @@ func (st *HelmState) getOCIChart(release *ReleaseSpec, tempDir string, helm helm
|
|||
// Add to global chart cache
|
||||
st.addToChartCache(cacheKey, chartPath)
|
||||
|
||||
return &chartPath, lockResult, nil
|
||||
// Release lock immediately after download - the tempDir cleanup is deferred
|
||||
// until after helm operations complete, so the chart won't be deleted mid-use.
|
||||
lockResult.Release(st.logger)
|
||||
|
||||
return &chartPath, nil
|
||||
}
|
||||
|
||||
// IsOCIChart returns true if the chart is an OCI chart
|
||||
|
|
|
|||
Loading…
Reference in New Issue