fix: add cross-process locking and refresh coordination for chart downloads
This commit fixes race conditions and duplicate downloads when multiple helmfile processes run in parallel (e.g., helmfile.d with multiple release files).

Changes:

1. OCI Registry Login URL Fix:
   - Extract registry host from URL before helm registry login
   - e.g., "account.dkr.ecr.region.amazonaws.com/charts" -> "account.dkr.ecr.region.amazonaws.com"
   - Fixes authentication issues with ECR and other registries

2. Cross-Process Chart Locking:
   - Add file-based locking (flock) to prevent race conditions
   - Serialize chart downloads across parallel helmfile processes
   - Applies to both OCI and non-OCI charts

3. Refresh Mode Coordination:
   - Use marker files with 60-second TTL to coordinate refresh
   - First process to acquire lock performs refresh, others use cache
   - Prevents duplicate downloads while still supporting --skip-refresh=false

4. Code Refactoring:
   - Extract common locking logic into acquireChartLock() helper
   - Reduce code duplication between forcedDownloadChart and getOCIChart
   - Add unit tests for OCI registry login and SyncRepos
This commit is contained in:
parent
83b4a8ffc7
commit
1c48256e13
|
|
@ -33,6 +33,7 @@ type DiffKey struct {
|
|||
type Helm struct {
|
||||
Charts []string
|
||||
Repo []string
|
||||
RegistryLoginHost string // Captures the host passed to RegistryLogin
|
||||
Releases []Release
|
||||
Deleted []Release
|
||||
Linted []Release
|
||||
|
|
@ -109,6 +110,7 @@ func (helm *Helm) UpdateRepo() error {
|
|||
return nil
|
||||
}
|
||||
// RegistryLogin records the name it is called with in helm.RegistryLoginHost
// so that a test can inspect which registry host the caller passed. All other
// arguments are ignored and the call always succeeds (returns nil).
func (helm *Helm) RegistryLogin(name, username, password, caFile, certFile, keyFile string, skipTLSVerify bool) error {
|
||||
// Keep only the most recent value; an earlier call's host is overwritten.
helm.RegistryLoginHost = name
|
||||
return nil
|
||||
}
|
||||
func (helm *Helm) SyncRelease(context helmexec.HelmContext, name, chart, namespace string, flags ...string) error {
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package state
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/sha1"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
|
|
@ -20,6 +21,7 @@ import (
|
|||
"time"
|
||||
|
||||
"dario.cat/mergo"
|
||||
"github.com/gofrs/flock"
|
||||
"github.com/Masterminds/semver/v3"
|
||||
"github.com/helmfile/chartify"
|
||||
"github.com/helmfile/vals"
|
||||
|
|
@ -608,7 +610,10 @@ func (st *HelmState) SyncRepos(helm RepoUpdater, shouldSkip map[string]bool) ([]
|
|||
username, password := gatherUsernamePassword(repo.Name, repo.Username, repo.Password)
|
||||
var err error
|
||||
if repo.OCI {
|
||||
err = helm.RegistryLogin(repo.URL, username, password, repo.CaFile, repo.CertFile, repo.KeyFile, repo.SkipTLSVerify)
|
||||
// For OCI registries, we only need the registry host for login, not the full URL with chart path.
|
||||
// e.g., "123456789012.dkr.ecr.us-east-1.amazonaws.com/charts" -> "123456789012.dkr.ecr.us-east-1.amazonaws.com"
|
||||
registryHost := extractRegistryHost(repo.URL)
|
||||
err = helm.RegistryLogin(registryHost, username, password, repo.CaFile, repo.CertFile, repo.KeyFile, repo.SkipTLSVerify)
|
||||
} else {
|
||||
err = helm.AddRepo(repo.Name, repo.URL, repo.CaFile, repo.CertFile, repo.KeyFile, username, password, repo.Managed, repo.PassCredentials, repo.SkipTLSVerify)
|
||||
}
|
||||
|
|
@ -642,6 +647,26 @@ func gatherUsernamePassword(repoName string, username string, password string) (
|
|||
return user, pass
|
||||
}
|
||||
|
||||
// extractRegistryHost returns only the registry host (with its optional port)
// of an OCI repository URL. "helm registry login" needs the bare registry
// host, not the full reference with the chart location appended.
//
// A leading "oci://", "https://" or "http://" scheme is removed first. URL
// schemes are case-insensitive, so "OCI://" is treated exactly like "oci://".
// Everything from the first "/" onwards is then dropped; a value without a
// path is returned unchanged, and the host itself is never re-cased.
//
// Examples:
//   - "123456789012.dkr.ecr.us-west-2.amazonaws.com/charts" -> "123456789012.dkr.ecr.us-west-2.amazonaws.com"
//   - "ghcr.io/deliveryhero/helm-charts" -> "ghcr.io"
//   - "registry.example.com:5000/charts" -> "registry.example.com:5000"
//   - "OCI://ghcr.io/charts/nginx" -> "ghcr.io"
func extractRegistryHost(url string) string {
	// Strip the known schemes one after another, in a fixed order. The
	// comparison is case-insensitive: a case-sensitive match would leave
	// "OCI://host/path" untouched and the slash search below would then
	// return "OCI:" as the host.
	for _, scheme := range []string{"oci://", "https://", "http://"} {
		if len(url) >= len(scheme) && strings.EqualFold(url[:len(scheme)], scheme) {
			url = url[len(scheme):]
		}
	}

	// The host ends at the first slash separating it from the path.
	if idx := strings.IndexByte(url, '/'); idx != -1 {
		return url[:idx]
	}

	// No path component: the remaining value already is the host.
	return url
}
|
||||
|
||||
// syncResult carries a list of release errors.
// NOTE(review): the code that fills and reads this struct is outside the
// visible part of the file; presumably it is the result reported by one sync
// worker — confirm against the callers.
type syncResult struct {
|
||||
// errors holds pointers to the ReleaseError values collected for this result.
errors []*ReleaseError
|
||||
}
|
||||
|
|
@ -1441,17 +1466,29 @@ func (st *HelmState) forcedDownloadChart(chartName, dir string, release *Release
|
|||
return "", err
|
||||
}
|
||||
|
||||
// only fetch chart if it is not already fetched
|
||||
if _, err := os.Stat(chartPath); os.IsNotExist(err) {
|
||||
var fetchFlags []string
|
||||
fetchFlags = st.appendChartVersionFlags(fetchFlags, release)
|
||||
fetchFlags = append(fetchFlags, "--untar", "--untardir", chartPath)
|
||||
if err := helm.Fetch(chartName, fetchFlags...); err != nil {
|
||||
return "", err
|
||||
}
|
||||
} else {
|
||||
st.logger.Infof("\"%s\" has not been downloaded because the output directory \"%s\" already exists", chartName, chartPath)
|
||||
// Acquire locks and determine action
|
||||
lockResult, err := st.acquireChartLock(chartPath, opts)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer lockResult.release(st.logger)
|
||||
|
||||
// If cached, return immediately
|
||||
if lockResult.action == chartActionUseCached {
|
||||
st.addToChartCache(cacheKey, lockResult.cachedPath)
|
||||
return lockResult.cachedPath, nil
|
||||
}
|
||||
|
||||
// Download the chart
|
||||
var fetchFlags []string
|
||||
fetchFlags = st.appendChartVersionFlags(fetchFlags, release)
|
||||
fetchFlags = append(fetchFlags, "--untar", "--untardir", chartPath)
|
||||
if err := helm.Fetch(chartName, fetchFlags...); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Create refresh marker if needed
|
||||
lockResult.createRefreshMarker(st.logger)
|
||||
|
||||
// Set chartPath to be the path containing Chart.yaml, if found
|
||||
fullChartPath, err := findChartDirectory(chartPath)
|
||||
|
|
@ -4294,10 +4331,6 @@ type ChartCacheKey struct {
|
|||
var downloadedCharts = make(map[ChartCacheKey]string) // key -> chart path
|
||||
var downloadedChartsMutex sync.RWMutex
|
||||
|
||||
// Legacy OCI-specific cache (kept for backward compatibility)
|
||||
var downloadedOCICharts = make(map[string]bool)
|
||||
var downloadedOCIMutex sync.RWMutex
|
||||
|
||||
// getChartCacheKey creates a cache key for a chart and version
|
||||
func (st *HelmState) getChartCacheKey(release *ReleaseSpec) ChartCacheKey {
|
||||
version := release.Version
|
||||
|
|
@ -4326,6 +4359,153 @@ func (st *HelmState) addToChartCache(key ChartCacheKey, path string) {
|
|||
downloadedCharts[key] = path
|
||||
}
|
||||
|
||||
// chartDownloadAction represents what action should be taken after acquiring locks.
// acquireChartLock stores one of the constants below in chartLockResult.action.
// The callers visible in this file only test for chartActionUseCached; the two
// other values both lead to the chart being fetched.
|
||||
type chartDownloadAction int
|
||||
|
||||
const (
|
||||
chartActionUseCached chartDownloadAction = iota // A valid chart is already on disk; use chartLockResult.cachedPath (this is the zero value)
|
||||
chartActionDownload // No usable chart directory exists (missing, or invalid and already removed); download it
|
||||
chartActionRefresh // A valid chart existed but refresh mode removed it; download it again
|
||||
)
|
||||
|
||||
// chartLockResult contains the result of acquiring locks and checking cache.
// It is produced by acquireChartLock. While a caller holds a chartLockResult it
// owns both locks referenced here and must hand them back through release().
|
||||
type chartLockResult struct {
|
||||
// action tells the caller whether to reuse the cached chart or fetch it.
action chartDownloadAction
|
||||
cachedPath string // Path to use if action is chartActionUseCached; left empty for the other actions
|
||||
fileLock *flock.Flock // Lock on the "<chartPath>.lock" file, unlocked by release(); nil means nothing to unlock
|
||||
inProcessMutex *sync.Mutex // Named in-process mutex for the chart path, unlocked by release(); nil means nothing to unlock
|
||||
refreshMarkerPath string // Path of the "<chartPath>.refreshed" marker file written by createRefreshMarker
|
||||
needsRefresh bool // True when neither SkipDeps nor SkipRefresh is set, i.e. refresh mode is enabled
|
||||
}
|
||||
|
||||
// release releases all acquired locks
|
||||
func (r *chartLockResult) release(logger *zap.SugaredLogger) {
|
||||
if r.inProcessMutex != nil {
|
||||
r.inProcessMutex.Unlock()
|
||||
}
|
||||
if r.fileLock != nil {
|
||||
if err := r.fileLock.Unlock(); err != nil {
|
||||
logger.Warnf("Failed to release file lock: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// createRefreshMarker creates the refresh marker file if in refresh mode
|
||||
func (r *chartLockResult) createRefreshMarker(logger *zap.SugaredLogger) {
|
||||
if r.needsRefresh && r.refreshMarkerPath != "" {
|
||||
if markerFile, err := os.Create(r.refreshMarkerPath); err == nil {
|
||||
if err := markerFile.Close(); err != nil {
|
||||
logger.Warnf("Failed to close refresh marker file %s: %v", r.refreshMarkerPath, err)
|
||||
}
|
||||
logger.Debugf("Created refresh marker at %s", r.refreshMarkerPath)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// acquireChartLock acquires file and in-process locks, then determines what action to take
|
||||
// based on cache state and refresh mode. The caller MUST call result.release() when done.
|
||||
func (st *HelmState) acquireChartLock(chartPath string, opts ChartPrepareOptions) (*chartLockResult, error) {
|
||||
result := &chartLockResult{
|
||||
refreshMarkerPath: chartPath + ".refreshed",
|
||||
needsRefresh: !opts.SkipDeps && !opts.SkipRefresh,
|
||||
}
|
||||
|
||||
// Acquire filesystem-level lock for cross-process synchronization
|
||||
lockFilePath := chartPath + ".lock"
|
||||
lockFileDir := filepath.Dir(lockFilePath)
|
||||
if err := os.MkdirAll(lockFileDir, 0755); err != nil {
|
||||
return nil, fmt.Errorf("failed to create lock directory %s: %w", lockFileDir, err)
|
||||
}
|
||||
result.fileLock = flock.New(lockFilePath)
|
||||
|
||||
// Retry logic with timeout for lock acquisition
|
||||
const (
|
||||
lockTimeout = 5 * time.Minute
|
||||
maxRetries = 3
|
||||
retryBackoff = 2 * time.Second
|
||||
)
|
||||
|
||||
var locked bool
|
||||
var lockErr error
|
||||
for attempt := 1; attempt <= maxRetries; attempt++ {
|
||||
st.logger.Debugf("Attempting to acquire file lock for chart at %s (attempt %d/%d)", lockFilePath, attempt, maxRetries)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), lockTimeout)
|
||||
locked, lockErr = result.fileLock.TryLockContext(ctx, 500*time.Millisecond)
|
||||
cancel()
|
||||
|
||||
if locked {
|
||||
break
|
||||
}
|
||||
if lockErr != nil {
|
||||
st.logger.Warnf("Failed to acquire file lock (attempt %d/%d): %v", attempt, maxRetries, lockErr)
|
||||
if attempt < maxRetries {
|
||||
time.Sleep(retryBackoff)
|
||||
}
|
||||
} else {
|
||||
return nil, fmt.Errorf("timeout waiting for file lock on chart %s after %v", chartPath, lockTimeout)
|
||||
}
|
||||
}
|
||||
if !locked {
|
||||
return nil, fmt.Errorf("failed to acquire file lock for chart %s after %d attempts: %w", chartPath, maxRetries, lockErr)
|
||||
}
|
||||
st.logger.Debugf("Acquired file lock for chart at %s", lockFilePath)
|
||||
|
||||
// Acquire in-process mutex for goroutine coordination
|
||||
result.inProcessMutex = st.getNamedMutex(chartPath)
|
||||
result.inProcessMutex.Lock()
|
||||
|
||||
// Determine action based on cache state and refresh mode
|
||||
const refreshMarkerMaxAge = 60 * time.Second
|
||||
|
||||
if st.fs.DirectoryExistsAt(chartPath) {
|
||||
fullChartPath, err := findChartDirectory(chartPath)
|
||||
if err == nil {
|
||||
// Valid chart found
|
||||
if result.needsRefresh {
|
||||
// Check if another process already refreshed in this session
|
||||
if markerInfo, markerErr := os.Stat(result.refreshMarkerPath); markerErr == nil {
|
||||
markerAge := time.Since(markerInfo.ModTime())
|
||||
if markerAge < refreshMarkerMaxAge {
|
||||
// Fresh marker - use cached
|
||||
result.action = chartActionUseCached
|
||||
result.cachedPath = filepath.Dir(fullChartPath)
|
||||
st.logger.Debugf("Using cached chart at %s (refreshed %.1fs ago by another process)", result.cachedPath, markerAge.Seconds())
|
||||
return result, nil
|
||||
}
|
||||
// Stale marker - remove and refresh
|
||||
st.logger.Debugf("Refresh marker at %s is stale (%.1fs old), will refresh", result.refreshMarkerPath, markerAge.Seconds())
|
||||
_ = os.Remove(result.refreshMarkerPath) // Best-effort cleanup, ignore error
|
||||
}
|
||||
// No fresh marker - need to refresh
|
||||
st.logger.Debugf("Refreshing chart at %s (first process in refresh mode)", chartPath)
|
||||
if err := os.RemoveAll(chartPath); err != nil {
|
||||
result.release(st.logger)
|
||||
return nil, err
|
||||
}
|
||||
result.action = chartActionRefresh
|
||||
} else {
|
||||
// Not refresh mode - use cached
|
||||
result.action = chartActionUseCached
|
||||
result.cachedPath = filepath.Dir(fullChartPath)
|
||||
st.logger.Debugf("Using cached chart at %s", result.cachedPath)
|
||||
}
|
||||
} else {
|
||||
// Directory exists but no valid Chart.yaml - corrupted
|
||||
st.logger.Debugf("Chart directory exists but no valid Chart.yaml at %s: %v, will re-download", chartPath, err)
|
||||
if err := os.RemoveAll(chartPath); err != nil {
|
||||
result.release(st.logger)
|
||||
return nil, err
|
||||
}
|
||||
result.action = chartActionDownload
|
||||
}
|
||||
} else {
|
||||
result.action = chartActionDownload
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (st *HelmState) getOCIChart(release *ReleaseSpec, tempDir string, helm helmexec.Interface, opts ChartPrepareOptions) (*string, error) {
|
||||
qualifiedChartName, chartName, chartVersion, err := st.getOCIQualifiedChartName(release)
|
||||
if err != nil {
|
||||
|
|
@ -4349,58 +4529,49 @@ func (st *HelmState) getOCIChart(release *ReleaseSpec, tempDir string, helm helm
|
|||
|
||||
chartPath, _ := st.getOCIChartPath(tempDir, release, chartName, chartVersion, opts.OutputDirTemplate)
|
||||
|
||||
mu := st.getNamedMutex(chartPath)
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
_, err = os.Stat(tempDir)
|
||||
// Acquire locks and determine action
|
||||
lockResult, err := st.acquireChartLock(chartPath, opts)
|
||||
if err != nil {
|
||||
err = os.MkdirAll(tempDir, 0755)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer lockResult.release(st.logger)
|
||||
|
||||
// If cached, return immediately
|
||||
if lockResult.action == chartActionUseCached {
|
||||
st.addToChartCache(cacheKey, lockResult.cachedPath)
|
||||
return &lockResult.cachedPath, nil
|
||||
}
|
||||
|
||||
// Ensure temp directory exists
|
||||
if _, err := os.Stat(tempDir); err != nil {
|
||||
if err := os.MkdirAll(tempDir, 0755); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
downloadedOCIMutex.RLock()
|
||||
alreadyDownloadedFlag := downloadedOCICharts[chartPath]
|
||||
downloadedOCIMutex.RUnlock()
|
||||
// Download the chart
|
||||
flags := st.chartOCIFlags(release)
|
||||
flags = st.appendVerifyFlags(flags, release)
|
||||
flags = st.appendKeyringFlags(flags, release)
|
||||
flags = st.appendChartDownloadFlags(flags, release)
|
||||
flags = st.appendChartVersionFlags(flags, release)
|
||||
|
||||
if !opts.SkipDeps && !opts.SkipRefresh && !alreadyDownloadedFlag {
|
||||
err = os.RemoveAll(chartPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := helm.ChartPull(qualifiedChartName, chartPath, flags...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if st.fs.DirectoryExistsAt(chartPath) {
|
||||
st.logger.Debugf("chart already exists at %s", chartPath)
|
||||
} else {
|
||||
flags := st.chartOCIFlags(release)
|
||||
flags = st.appendVerifyFlags(flags, release)
|
||||
flags = st.appendKeyringFlags(flags, release)
|
||||
flags = st.appendChartDownloadFlags(flags, release)
|
||||
flags = st.appendChartVersionFlags(flags, release)
|
||||
|
||||
err := helm.ChartPull(qualifiedChartName, chartPath, flags...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = helm.ChartExport(qualifiedChartName, chartPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := helm.ChartExport(qualifiedChartName, chartPath); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create refresh marker if needed
|
||||
lockResult.createRefreshMarker(st.logger)
|
||||
|
||||
fullChartPath, err := findChartDirectory(chartPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
downloadedOCIMutex.Lock()
|
||||
downloadedOCICharts[chartPath] = true
|
||||
downloadedOCIMutex.Unlock()
|
||||
|
||||
chartPath = filepath.Dir(fullChartPath)
|
||||
|
||||
// Add to global chart cache
|
||||
|
|
|
|||
|
|
@ -3274,6 +3274,118 @@ func Test_gatherUsernamePassword(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func Test_extractRegistryHost(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
url string
|
||||
want string
|
||||
}{
|
||||
{
|
||||
name: "ECR with chart path",
|
||||
url: "123456789012.dkr.ecr.us-east-1.amazonaws.com/helm-charts",
|
||||
want: "123456789012.dkr.ecr.us-east-1.amazonaws.com",
|
||||
},
|
||||
{
|
||||
name: "GHCR with nested path",
|
||||
url: "ghcr.io/deliveryhero/helm-charts",
|
||||
want: "ghcr.io",
|
||||
},
|
||||
{
|
||||
name: "registry with port and path",
|
||||
url: "registry.example.com:5000/helm-charts",
|
||||
want: "registry.example.com:5000",
|
||||
},
|
||||
{
|
||||
name: "registry without path",
|
||||
url: "registry.example.com",
|
||||
want: "registry.example.com",
|
||||
},
|
||||
{
|
||||
name: "with oci:// prefix and path",
|
||||
url: "oci://ghcr.io/charts/nginx",
|
||||
want: "ghcr.io",
|
||||
},
|
||||
{
|
||||
name: "with https:// prefix and path",
|
||||
url: "https://registry.example.com/helm-charts",
|
||||
want: "registry.example.com",
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if got := extractRegistryHost(tt.url); got != tt.want {
|
||||
t.Errorf("extractRegistryHost() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestHelmState_SyncRepos_OCI(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
repos []RepositorySpec
|
||||
wantRegistryLoginHost string
|
||||
}{
|
||||
{
|
||||
name: "OCI registry with chart path should extract host only",
|
||||
repos: []RepositorySpec{
|
||||
{
|
||||
Name: "ecr",
|
||||
URL: "123456789012.dkr.ecr.us-east-1.amazonaws.com/helm-charts",
|
||||
OCI: true,
|
||||
Username: "AWS",
|
||||
Password: "token",
|
||||
},
|
||||
},
|
||||
wantRegistryLoginHost: "123456789012.dkr.ecr.us-east-1.amazonaws.com",
|
||||
},
|
||||
{
|
||||
name: "OCI registry without path should pass URL as-is",
|
||||
repos: []RepositorySpec{
|
||||
{
|
||||
Name: "ghcr",
|
||||
URL: "ghcr.io",
|
||||
OCI: true,
|
||||
Username: "user",
|
||||
Password: "pass",
|
||||
},
|
||||
},
|
||||
wantRegistryLoginHost: "ghcr.io",
|
||||
},
|
||||
{
|
||||
name: "OCI registry with nested path",
|
||||
repos: []RepositorySpec{
|
||||
{
|
||||
Name: "docker",
|
||||
URL: "registry-1.docker.io/bitnamicharts",
|
||||
OCI: true,
|
||||
Username: "user",
|
||||
Password: "pass",
|
||||
},
|
||||
},
|
||||
wantRegistryLoginHost: "registry-1.docker.io",
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
helm := &exectest.Helm{}
|
||||
state := &HelmState{
|
||||
ReleaseSetSpec: ReleaseSetSpec{
|
||||
Repositories: tt.repos,
|
||||
},
|
||||
}
|
||||
_, err := state.SyncRepos(helm, map[string]bool{})
|
||||
if err != nil {
|
||||
t.Errorf("SyncRepos() error = %v", err)
|
||||
return
|
||||
}
|
||||
if helm.RegistryLoginHost != tt.wantRegistryLoginHost {
|
||||
t.Errorf("RegistryLogin was called with host = %q, want %q", helm.RegistryLoginHost, tt.wantRegistryLoginHost)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGenerateOutputFilePath(t *testing.T) {
|
||||
tests := []struct {
|
||||
envName string
|
||||
|
|
|
|||
Loading…
Reference in New Issue