From 0affa9425aaeee6f19b252a3db15a8dfad6a486b Mon Sep 17 00:00:00 2001 From: Thomas Rosenstein Date: Sun, 14 Dec 2025 19:31:51 +0000 Subject: [PATCH] Improve sync responsiveness with background execution and context cancellation This change improves the responsiveness of the operator when handling deletion requests by running sync operations in the background and using context cancellation to interrupt stuck operations. Changes: - Add context field to Cluster struct, passed through New() - Add Cancel() method to cancel cluster's context - Add StartSync/EndSync/NeedsResync for managing background sync state - Run Sync() in a background goroutine so worker can process other events - Add context-aware DB connection methods (initDbConnWithContext) - Add RetryWithContext() that respects context cancellation - Cancel cluster context immediately when DeletionTimestamp detected - Use context-aware connections in syncRoles/syncDatabases - StartSync/NeedsResync check context cancellation to prevent new syncs during deletion (no need for separate deleted flag) Flow: 1. Sync event spawns background goroutine and returns immediately 2. If another sync arrives while one is running, needsResync flag is set 3. When sync completes, it checks needsResync and requeues if needed 4. Delete cancels context -> stuck DB operations return early -> mutex released 5. StartSync/NeedsResync return false when context cancelled 6. Delete proceeds without waiting for slow/stuck sync operations --- pkg/cluster/cluster.go | 68 ++++++++++++++++++++++++++- pkg/cluster/cluster_test.go | 8 ++-- pkg/cluster/connection_pooler_test.go | 16 +++---- pkg/cluster/database.go | 39 +++++++-------- pkg/cluster/k8sres_test.go | 52 ++++++++++---------- pkg/cluster/pod_test.go | 3 +- pkg/cluster/streams_test.go | 8 ++-- pkg/cluster/sync.go | 16 ++++--- pkg/cluster/sync_test.go | 12 ++--- pkg/cluster/util_test.go | 2 +- pkg/cluster/volumes_test.go | 10 ++-- pkg/controller/postgresql.go | 52 ++++++++++++++++---- pkg/util/retryutil/retry_util.go | 19 ++++++-- 13 files changed, 209 insertions(+), 96 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index f2f3651e7..0ab2906e9 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -89,6 +89,11 @@ type Cluster struct { podSubscribersMu sync.RWMutex pgDb *sql.DB mu sync.Mutex + ctx context.Context + cancelFunc context.CancelFunc + syncMu sync.Mutex // protects syncRunning and needsResync + syncRunning bool + needsResync bool userSyncStrategy spec.UserSyncer deleteOptions metav1.DeleteOptions podEventsQueue *cache.FIFO @@ -121,9 +126,12 @@ type compareLogicalBackupJobResult struct { } // New creates a new cluster. This function should be called from a controller. 
-func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgresql, logger *logrus.Entry, eventRecorder record.EventRecorder) *Cluster { +func New(ctx context.Context, cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgresql, logger *logrus.Entry, eventRecorder record.EventRecorder) *Cluster { deletePropagationPolicy := metav1.DeletePropagationOrphan + // Create a cancellable context for this cluster + clusterCtx, cancelFunc := context.WithCancel(ctx) + podEventsQueue := cache.NewFIFO(func(obj interface{}) (string, error) { e, ok := obj.(PodEvent) if !ok { @@ -138,6 +146,8 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres } cluster := &Cluster{ + ctx: clusterCtx, + cancelFunc: cancelFunc, Config: cfg, Postgresql: pgSpec, pgUsers: make(map[string]spec.PgUser), @@ -177,6 +187,62 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres return cluster } +// Cancel cancels the cluster's context, which will cause any ongoing +// context-aware operations (like Sync) to return early. +func (c *Cluster) Cancel() { + if c.cancelFunc != nil { + c.cancelFunc() + } +} + +// StartSync attempts to start a sync operation. Returns true if sync can start +// (no sync currently running and context not cancelled). Returns false if a sync +// is already running (needsResync is set) or if context is cancelled (deletion in progress). +func (c *Cluster) StartSync() bool { + c.syncMu.Lock() + defer c.syncMu.Unlock() + + // Check if context is cancelled (deletion in progress) + select { + case <-c.ctx.Done(): + return false + default: + } + + if c.syncRunning { + c.needsResync = true + return false + } + c.syncRunning = true + c.needsResync = false + return true +} + +// EndSync marks the sync operation as complete. +func (c *Cluster) EndSync() { + c.syncMu.Lock() + defer c.syncMu.Unlock() + c.syncRunning = false +} + +// NeedsResync returns true if a resync was requested while sync was running, +// and clears the flag. Returns false if context is cancelled (deletion in progress). 
+func (c *Cluster) NeedsResync() bool { + c.syncMu.Lock() + defer c.syncMu.Unlock() + + // Check if context is cancelled (deletion in progress) + select { + case <-c.ctx.Done(): + return false + default: + } + + result := c.needsResync + c.needsResync = false + return result +} + func (c *Cluster) clusterName() spec.NamespacedName { return util.NameFromMeta(c.ObjectMeta) } diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index d78d4c92e..cf5f4aae1 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -43,7 +43,7 @@ var logger = logrus.New().WithField("test", "cluster") // 1 cluster, primary endpoint, 2 services, the secrets, the statefulset and pods being ready var eventRecorder = record.NewFakeRecorder(7) -var cl = New( +var cl = New(context.Background(), Config{ OpConfig: config.Config{ PodManagementPolicy: "ordered_ready", @@ -135,7 +135,7 @@ func TestCreate(t *testing.T) { client.Postgresqls(clusterNamespace).Create(context.TODO(), &pg, metav1.CreateOptions{}) client.Pods(clusterNamespace).Create(context.TODO(), &pod, metav1.CreateOptions{}) - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ PodManagementPolicy: "ordered_ready", @@ -1629,7 +1629,7 @@ func TestCompareLogicalBackupJob(t *testing.T) { }, } - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ PodManagementPolicy: "ordered_ready", @@ -1778,7 +1778,7 @@ func TestCrossNamespacedSecrets(t *testing.T) { }, } - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ ConnectionPooler: config.ConnectionPooler{ diff --git a/pkg/cluster/connection_pooler_test.go b/pkg/cluster/connection_pooler_test.go index 78d1c2527..cda965e9a 100644 --- a/pkg/cluster/connection_pooler_test.go +++ b/pkg/cluster/connection_pooler_test.go @@ -161,7 +161,7 @@ func noEmptySync(cluster *Cluster, err error, reason SyncReason) error { func TestNeedConnectionPooler(t *testing.T) { testName := "Test how connection pooler can be enabled" - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ ProtectedRoles: []string{"admin"}, @@ -297,7 +297,7 @@ func TestConnectionPoolerCreateDeletion(t *testing.T) { }, } - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ ConnectionPooler: config.ConnectionPooler{ @@ -405,7 +405,7 @@ func TestConnectionPoolerSync(t *testing.T) { }, } - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ ConnectionPooler: config.ConnectionPooler{ @@ -667,7 +667,7 @@ func TestConnectionPoolerSync(t *testing.T) { func TestConnectionPoolerPodSpec(t *testing.T) { testName := "Test connection pooler pod template generation" - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ ProtectedRoles: []string{"admin"}, @@ -690,7 +690,7 @@ func TestConnectionPoolerPodSpec(t *testing.T) { ConnectionPooler: &acidv1.ConnectionPooler{}, EnableReplicaConnectionPooler: boolToPointer(true), } - var clusterNoDefaultRes = New( + var clusterNoDefaultRes = New(context.Background(), Config{ OpConfig: config.Config{ ProtectedRoles: []string{"admin"}, @@ -780,7 +780,7 @@ func TestConnectionPoolerPodSpec(t *testing.T) { func TestConnectionPoolerDeploymentSpec(t *testing.T) { testName := "Test connection pooler deployment spec generation" - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ 
ProtectedRoles: []string{"admin"}, @@ -983,7 +983,7 @@ func TestPoolerTLS(t *testing.T) { }, } - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ PodManagementPolicy: "ordered_ready", @@ -1063,7 +1063,7 @@ func TestPoolerTLS(t *testing.T) { func TestConnectionPoolerServiceSpec(t *testing.T) { testName := "Test connection pooler service spec generation" - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ ProtectedRoles: []string{"admin"}, diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go index 56b5f3638..4f3d5c775 100644 --- a/pkg/cluster/database.go +++ b/pkg/cluster/database.go @@ -2,6 +2,7 @@ package cluster import ( "bytes" + "context" "database/sql" "fmt" "net" @@ -121,26 +122,31 @@ func (c *Cluster) initDbConn() error { if c.pgDb != nil { return nil } - return c.initDbConnWithName("") } -// Worker function for connection initialization. This function does not check -// if the connection is already open, if it is then it will be overwritten. -// Callers need to make sure no connection is open, otherwise we could leak -// connections +// initDbConnWithName initializes a database connection using the cluster's context. +// This function does not check if the connection is already open. func (c *Cluster) initDbConnWithName(dbname string) error { + return c.initDbConnWithNameContext(c.ctx, dbname) +} + +// initDbConnWithNameContext initializes a database connection with an explicit context. +// Use this when you need a custom context (e.g., different timeout, or context.Background() +// for operations that should not be cancelled). This function does not check if the +// connection is already open, callers need to ensure no connection is open to avoid leaks. 
+func (c *Cluster) initDbConnWithNameContext(ctx context.Context, dbname string) error { c.setProcessName("initializing db connection") var conn *sql.DB connstring := c.pgConnectionString(dbname) - finalerr := retryutil.Retry(constants.PostgresConnectTimeout, constants.PostgresConnectRetryTimeout, + finalerr := retryutil.RetryWithContext(ctx, constants.PostgresConnectTimeout, constants.PostgresConnectRetryTimeout, func() (bool, error) { var err error conn, err = sql.Open("postgres", connstring) if err == nil { - err = conn.Ping() + err = conn.PingContext(ctx) } if err == nil { @@ -268,9 +274,7 @@ func findUsersFromRotation(rotatedUsers []string, db *sql.DB) (map[string]string }() for rows.Next() { - var ( - rolname, roldatesuffix string - ) + var rolname, roldatesuffix string err := rows.Scan(&rolname, &roldatesuffix) if err != nil { return nil, fmt.Errorf("error when processing rows of deprecated users: %v", err) @@ -331,9 +335,7 @@ func (c *Cluster) cleanupRotatedUsers(rotatedUsers []string) error { // getDatabases returns the map of current databases with owners // The caller is responsible for opening and closing the database connection func (c *Cluster) getDatabases() (dbs map[string]string, err error) { - var ( - rows *sql.Rows - ) + var rows *sql.Rows if rows, err = c.pgDb.Query(getDatabasesSQL); err != nil { return nil, fmt.Errorf("could not query database: %v", err) @@ -551,9 +553,7 @@ func (c *Cluster) getOwnerRoles(dbObjPath string, withUser bool) (owners []strin // getExtension returns the list of current database extensions // The caller is responsible for opening and closing the database connection func (c *Cluster) getExtensions() (dbExtensions map[string]string, err error) { - var ( - rows *sql.Rows - ) + var rows *sql.Rows if rows, err = c.pgDb.Query(getExtensionsSQL); err != nil { return nil, fmt.Errorf("could not query database extensions: %v", err) @@ -598,7 +598,6 @@ func (c *Cluster) executeAlterExtension(extName, schemaName string) error { } func (c *Cluster) execCreateOrAlterExtension(extName, schemaName, statement, doing, operation string) error { - c.logger.Infof("%s %q schema %q", doing, extName, schemaName) if _, err := c.pgDb.Exec(fmt.Sprintf(statement, extName, schemaName)); err != nil { return fmt.Errorf("could not execute %s: %v", operation, err) @@ -610,9 +609,7 @@ func (c *Cluster) execCreateOrAlterExtension(extName, schemaName, statement, doi // getPublications returns the list of current database publications with tables // The caller is responsible for opening and closing the database connection func (c *Cluster) getPublications() (publications map[string]string, err error) { - var ( - rows *sql.Rows - ) + var rows *sql.Rows if rows, err = c.pgDb.Query(getPublicationsSQL); err != nil { return nil, fmt.Errorf("could not query database publications: %v", err) @@ -668,7 +665,6 @@ func (c *Cluster) executeAlterPublication(pubName, tableList string) error { } func (c *Cluster) execCreateOrAlterPublication(pubName, tableList, statement, doing, operation string) error { - c.logger.Debugf("%s %q with table list %q", doing, pubName, tableList) if _, err := c.pgDb.Exec(fmt.Sprintf(statement, pubName, tableList)); err != nil { return fmt.Errorf("could not execute %s: %v", operation, err) @@ -743,7 +739,6 @@ func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error { constants.PostgresConnectTimeout, constants.PostgresConnectRetryTimeout, func() (bool, error) { - // At this moment we are not connected to any database if err := 
c.initDbConnWithName(dbname); err != nil { msg := "could not init database connection to %s" diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index 6bd87366d..7c98eb6f0 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -52,7 +52,7 @@ type ExpectedValue struct { } func TestGenerateSpiloJSONConfiguration(t *testing.T) { - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ ProtectedRoles: []string{"admin"}, @@ -1143,7 +1143,7 @@ func TestGetNumberOfInstances(t *testing.T) { } for _, tt := range tests { - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: tt.config, }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) @@ -1210,7 +1210,7 @@ func TestCloneEnv(t *testing.T) { }, } - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ WALES3Bucket: "wale-bucket", @@ -1384,7 +1384,7 @@ func TestStandbyEnv(t *testing.T) { }, } - var cluster = New( + var cluster = New(context.Background(), Config{}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) for _, tt := range tests { @@ -1431,7 +1431,7 @@ func TestNodeAffinity(t *testing.T) { } } - cluster = New( + cluster = New(context.Background(), Config{ OpConfig: config.Config{ PodManagementPolicy: "ordered_ready", @@ -1524,7 +1524,7 @@ func TestPodAffinity(t *testing.T) { } for _, tt := range tests { - cluster := New( + cluster := New(context.Background(), Config{ OpConfig: config.Config{ EnablePodAntiAffinity: tt.anti, @@ -1687,7 +1687,7 @@ func TestTLS(t *testing.T) { }, } - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ PodManagementPolicy: "ordered_ready", @@ -1942,7 +1942,7 @@ func TestAdditionalVolume(t *testing.T) { }, } - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ PodManagementPolicy: "ordered_ready", @@ -2083,7 +2083,7 @@ func TestVolumeSelector(t *testing.T) { }, } - cluster := New( + cluster := New(context.Background(), Config{ OpConfig: config.Config{ PodManagementPolicy: "ordered_ready", @@ -2182,7 +2182,7 @@ func TestSidecars(t *testing.T) { }, } - cluster = New( + cluster = New(context.Background(), Config{ OpConfig: config.Config{ PodManagementPolicy: "ordered_ready", @@ -2491,7 +2491,7 @@ func TestContainerValidation(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - cluster := New(tc.clusterConfig, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) + cluster := New(context.Background(), tc.clusterConfig, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) _, err := cluster.generateStatefulSet(&tc.spec) @@ -2580,7 +2580,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { }{ { scenario: "With multiple instances", - spec: New( + spec: New(context.Background(), Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb"}}, k8sutil.KubernetesClient{}, acidv1.Postgresql{ @@ -2597,7 +2597,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { }, { scenario: "With zero instances", - spec: New( + spec: New(context.Background(), Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb"}}, k8sutil.KubernetesClient{}, acidv1.Postgresql{ @@ -2614,7 +2614,7 @@ func 
TestGeneratePodDisruptionBudget(t *testing.T) { }, { scenario: "With PodDisruptionBudget disabled", - spec: New( + spec: New(context.Background(), Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.False()}}, k8sutil.KubernetesClient{}, acidv1.Postgresql{ @@ -2631,7 +2631,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { }, { scenario: "With non-default PDBNameFormat and PodDisruptionBudget explicitly enabled", - spec: New( + spec: New(context.Background(), Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-databass-budget", EnablePodDisruptionBudget: util.True()}}, k8sutil.KubernetesClient{}, acidv1.Postgresql{ @@ -2648,7 +2648,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { }, { scenario: "With PDBMasterLabelSelector disabled", - spec: New( + spec: New(context.Background(), Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.True(), PDBMasterLabelSelector: util.False()}}, k8sutil.KubernetesClient{}, acidv1.Postgresql{ @@ -2665,7 +2665,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { }, { scenario: "With OwnerReference enabled", - spec: New( + spec: New(context.Background(), Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role", EnableOwnerReferences: util.True()}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.True()}}, k8sutil.KubernetesClient{}, acidv1.Postgresql{ @@ -2700,7 +2700,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { }{ { scenario: "With multiple instances", - spec: New( + spec: New(context.Background(), Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb"}}, k8sutil.KubernetesClient{}, acidv1.Postgresql{ @@ -2717,7 +2717,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { }, { scenario: "With zero instances", - spec: New( + spec: New(context.Background(), Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb"}}, k8sutil.KubernetesClient{}, acidv1.Postgresql{ @@ -2734,7 +2734,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { }, { scenario: "With PodDisruptionBudget disabled", - spec: New( + spec: New(context.Background(), Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.False()}}, k8sutil.KubernetesClient{}, acidv1.Postgresql{ @@ -2751,7 +2751,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { }, { scenario: "With OwnerReference enabled", - spec: New( + spec: New(context.Background(), Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role", EnableOwnerReferences: util.True()}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.True()}}, k8sutil.KubernetesClient{}, acidv1.Postgresql{ @@ -2813,7 +2813,7 @@ func TestGenerateService(t *testing.T) { EnableMasterLoadBalancer: &enableLB, } - cluster = New( + 
cluster = New(context.Background(), Config{ OpConfig: config.Config{ PodManagementPolicy: "ordered_ready", @@ -2864,7 +2864,7 @@ func TestGenerateService(t *testing.T) { } func TestCreateLoadBalancerLogic(t *testing.T) { - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ ProtectedRoles: []string{"admin"}, @@ -3076,7 +3076,7 @@ func TestEnableLoadBalancers(t *testing.T) { } for _, tt := range tests { - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: tt.config, }, client, tt.pgSpec, logger, eventRecorder) @@ -3708,7 +3708,7 @@ func TestGenerateResourceRequirements(t *testing.T) { } for _, tt := range tests { - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: tt.config, }, client, tt.pgSpec, logger, newEventRecorder) @@ -3893,7 +3893,7 @@ func TestGenerateLogicalBackupJob(t *testing.T) { } for _, tt := range tests { - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: tt.config, }, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder) diff --git a/pkg/cluster/pod_test.go b/pkg/cluster/pod_test.go index 6816b4d7a..854f7e8c2 100644 --- a/pkg/cluster/pod_test.go +++ b/pkg/cluster/pod_test.go @@ -2,6 +2,7 @@ package cluster import ( "bytes" + "context" "fmt" "io" "net/http" @@ -24,7 +25,7 @@ func TestGetSwitchoverCandidate(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - var cluster = New( + cluster := New(context.Background(), Config{ OpConfig: config.Config{ PatroniAPICheckInterval: time.Duration(1), diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 934f2bfd4..86f26eea5 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -223,7 +223,7 @@ var ( }, } - cluster = New( + cluster = New(context.Background(), Config{ OpConfig: config.Config{ Auth: config.Auth{ @@ -529,7 +529,7 @@ func newFabricEventStream(streams []zalandov1.EventStream, annotations map[strin func TestSyncStreams(t *testing.T) { newClusterName := fmt.Sprintf("%s-2", pg.Name) pg.Name = newClusterName - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ PodManagementPolicy: "ordered_ready", @@ -688,7 +688,7 @@ func TestSameStreams(t *testing.T) { func TestUpdateStreams(t *testing.T) { pg.Name = fmt.Sprintf("%s-3", pg.Name) - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ PodManagementPolicy: "ordered_ready", @@ -787,7 +787,7 @@ func patchPostgresqlStreams(t *testing.T, cluster *Cluster, pgSpec *acidv1.Postg func TestDeleteStreams(t *testing.T) { pg.Name = fmt.Sprintf("%s-4", pg.Name) - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ PodManagementPolicy: "ordered_ready", diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index ecf692702..320be6cc6 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -70,7 +70,7 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { return err } - //TODO: mind the secrets of the deleted/new users + // TODO: mind the secrets of the deleted/new users if err = c.syncSecrets(); err != nil { err = fmt.Errorf("could not sync secrets: %v", err) return err @@ -856,7 +856,6 @@ func (c *Cluster) restartInstance(pod *v1.Pod, restartWait uint32) error { // AnnotationsToPropagate get the annotations to update if required // based on the annotations in postgres CRD func (c *Cluster) AnnotationsToPropagate(annotations map[string]string) 
map[string]string { - if annotations == nil { annotations = make(map[string]string) } @@ -1110,7 +1109,8 @@ func (c *Cluster) updateSecret( secretUsername string, generatedSecret *v1.Secret, retentionUsers *[]string, - currentTime time.Time) (*v1.Secret, error) { + currentTime time.Time, +) (*v1.Secret, error) { var ( secret *v1.Secret err error @@ -1244,7 +1244,8 @@ func (c *Cluster) rotatePasswordInSecret( secretUsername string, roleOrigin spec.RoleOrigin, currentTime time.Time, - retentionUsers *[]string) (string, error) { + retentionUsers *[]string, +) (string, error) { var ( err error nextRotationDate time.Time @@ -1469,7 +1470,7 @@ func (c *Cluster) syncDatabases() error { preparedDatabases := make([]string, 0) if err := c.initDbConn(); err != nil { - return fmt.Errorf("could not init database connection") + return fmt.Errorf("could not init database connection: %v", err) } defer func() { if err := c.closeDbConn(); err != nil { @@ -1553,7 +1554,7 @@ func (c *Cluster) syncPreparedDatabases() error { errors := make([]string, 0) for preparedDbName, preparedDB := range c.Spec.PreparedDatabases { - if err := c.initDbConnWithName(preparedDbName); err != nil { + if err := c.initDbConnWithNameContext(c.ctx, preparedDbName); err != nil { errors = append(errors, fmt.Sprintf("could not init connection to database %s: %v", preparedDbName, err)) continue } @@ -1697,7 +1698,8 @@ func (c *Cluster) syncLogicalBackupJob() error { } if len(cmp.deletedPodAnnotations) != 0 { templateMetadataReq := map[string]map[string]map[string]map[string]map[string]map[string]map[string]*string{ - "spec": {"jobTemplate": {"spec": {"template": {"metadata": {"annotations": {}}}}}}} + "spec": {"jobTemplate": {"spec": {"template": {"metadata": {"annotations": {}}}}}}, + } for _, anno := range cmp.deletedPodAnnotations { templateMetadataReq["spec"]["jobTemplate"]["spec"]["template"]["metadata"]["annotations"][anno] = nil } diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go index 87e9dc8a5..dda862d67 100644 --- a/pkg/cluster/sync_test.go +++ b/pkg/cluster/sync_test.go @@ -88,7 +88,7 @@ func TestSyncStatefulSetsAnnotations(t *testing.T) { }, } - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ PodManagementPolicy: "ordered_ready", @@ -184,7 +184,7 @@ func TestPodAnnotationsSync(t *testing.T) { }, } - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ PatroniAPICheckInterval: time.Duration(1), @@ -369,7 +369,7 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { }, } - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ PodManagementPolicy: "ordered_ready", @@ -691,7 +691,7 @@ func TestSyncStandbyClusterConfiguration(t *testing.T) { }, } - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ PatroniAPICheckInterval: time.Duration(1), @@ -844,7 +844,7 @@ func TestUpdateSecret(t *testing.T) { } // new cluster with enabled password rotation - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ Auth: config.Auth{ @@ -988,7 +988,7 @@ func TestUpdateSecretNameConflict(t *testing.T) { }, } - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ Auth: config.Auth{ diff --git a/pkg/cluster/util_test.go b/pkg/cluster/util_test.go index 9cd7dc7e9..e9c3ca1bb 100644 --- a/pkg/cluster/util_test.go +++ b/pkg/cluster/util_test.go @@ -288,7 +288,7 @@ func 
newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster, }, } - cluster := New( + cluster := New(context.Background(), Config{ OpConfig: config.Config{ PatroniAPICheckInterval: time.Duration(1), diff --git a/pkg/cluster/volumes_test.go b/pkg/cluster/volumes_test.go index 95ecc7624..4d9eb0189 100644 --- a/pkg/cluster/volumes_test.go +++ b/pkg/cluster/volumes_test.go @@ -59,7 +59,7 @@ func TestResizeVolumeClaim(t *testing.T) { assert.NoError(t, err) // new cluster with pvc storage resize mode and configured labels - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ Resources: config.Resources{ @@ -185,7 +185,7 @@ func TestMigrateEBS(t *testing.T) { namespace := "default" // new cluster with pvc storage resize mode and configured labels - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ Resources: config.Resources{ @@ -293,7 +293,7 @@ func TestMigrateGp3Support(t *testing.T) { namespace := "default" // new cluster with pvc storage resize mode and configured labels - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ Resources: config.Resources{ @@ -355,7 +355,7 @@ func TestManualGp2Gp3Support(t *testing.T) { namespace := "default" // new cluster with pvc storage resize mode and configured labels - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ Resources: config.Resources{ @@ -415,7 +415,7 @@ func TestDontTouchType(t *testing.T) { namespace := "default" // new cluster with pvc storage resize mode and configured labels - var cluster = New( + var cluster = New(context.Background(), Config{ OpConfig: config.Config{ Resources: config.Resources{ diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 22042377e..3a1f748f9 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -166,7 +166,7 @@ func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedNam } } - cl := cluster.New(c.makeClusterConfig(), c.KubeClient, *pgSpec, lg, c.eventRecorder) + cl := cluster.New(context.Background(), c.makeClusterConfig(), c.KubeClient, *pgSpec, lg, c.eventRecorder) cl.Run(c.stopCh) teamName := strings.ToLower(cl.Spec.TeamID) @@ -267,6 +267,7 @@ func (c *Controller) processEvent(event ClusterEvent) { // Check if this cluster has been marked for deletion if !event.NewSpec.ObjectMeta.DeletionTimestamp.IsZero() { lg.Infof("cluster has a DeletionTimestamp of %s, starting deletion now.", event.NewSpec.ObjectMeta.DeletionTimestamp.Format(time.RFC3339)) + cl.Cancel() // Cancel any ongoing operations if err = cl.Delete(); err != nil { cl.Error = fmt.Sprintf("error deleting cluster and its resources: %v", err) c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error) @@ -277,7 +278,6 @@ func (c *Controller) processEvent(event ClusterEvent) { return } - lg.Infoln("update of the cluster started") err = cl.Update(event.OldSpec, event.NewSpec) if err != nil { cl.Error = fmt.Sprintf("could not update cluster: %v", err) @@ -305,6 +305,7 @@ func (c *Controller) processEvent(event ClusterEvent) { // when using finalizers the deletion already happened if c.opConfig.EnableFinalizers == nil || !*c.opConfig.EnableFinalizers { lg.Infoln("deletion of the cluster started") + cl.Cancel() // Cancel any ongoing operations if err := cl.Delete(); err != nil { cl.Error = fmt.Sprintf("could not delete cluster: %v", err) 
c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error) @@ -330,8 +331,6 @@ func (c *Controller) processEvent(event ClusterEvent) { lg.Infof("cluster has been deleted") case EventSync: - lg.Infof("syncing of the cluster started") - // no race condition because a cluster is always processed by single worker if !clusterFound { cl, err = c.addCluster(lg, clusterName, event.NewSpec) @@ -346,22 +345,42 @@ func (c *Controller) processEvent(event ClusterEvent) { // has this cluster been marked as deleted already, then we shall start cleaning up if !cl.ObjectMeta.DeletionTimestamp.IsZero() { lg.Infof("cluster has a DeletionTimestamp of %s, starting deletion now.", cl.ObjectMeta.DeletionTimestamp.Format(time.RFC3339)) + cl.Cancel() // Cancel any ongoing operations if err = cl.Delete(); err != nil { cl.Error = fmt.Sprintf("error deleting cluster and its resources: %v", err) c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error) lg.Error(cl.Error) return } - } else { - if err = cl.Sync(event.NewSpec); err != nil { + return + } + + // Try to start sync - returns false if sync already running or cluster deleted + if !cl.StartSync() { + lg.Infof("sync already in progress, will resync when current sync completes") + return + } + + // Run sync in background goroutine so we can process other events (like delete) + lg.Infof("syncing of the cluster started (background)") + go func() { + defer cl.EndSync() + + if err := cl.Sync(event.NewSpec); err != nil { cl.Error = fmt.Sprintf("could not sync cluster: %v", err) c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Sync", "%v", cl.Error) lg.Error(cl.Error) return } + cl.Error = "" lg.Infof("cluster has been synced") - } - cl.Error = "" + + // Check if resync was requested while we were syncing + if cl.NeedsResync() { + lg.Infof("resync requested, queueing new sync event") + c.queueClusterEvent(nil, event.NewSpec, EventSync) + } + }() } } @@ -477,6 +496,19 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1. 
} } + // If the cluster is marked for deletion, cancel any ongoing operations immediately + // This unblocks stuck Sync operations so the delete can proceed + if informerNewSpec != nil && !informerNewSpec.ObjectMeta.DeletionTimestamp.IsZero() { + c.clustersMu.RLock() + if cl, found := c.clusters[clusterName]; found { + c.logger.WithField("cluster-name", clusterName).Infof( + "cluster marked for deletion (DeletionTimestamp: %s), cancelling ongoing operations", + informerNewSpec.ObjectMeta.DeletionTimestamp.Format(time.RFC3339)) + cl.Cancel() + } + c.clustersMu.RUnlock() + } + if clusterError != "" && eventType != EventDelete { c.logger.WithField("cluster-name", clusterName).Debugf("skipping %q event for the invalid cluster: %s", eventType, clusterError) @@ -566,9 +598,13 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) { // Avoid the infinite recursion for status updates if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) { if reflect.DeepEqual(pgNew.Annotations, pgOld.Annotations) { + c.logger.WithField("cluster-name", clusterName).Debugf( + "UPDATE event: no spec/annotation changes, skipping") return } } + + c.logger.WithField("cluster-name", clusterName).Infof("UPDATE event: spec or annotations changed, queueing event") c.queueClusterEvent(pgOld, pgNew, EventUpdate) } } diff --git a/pkg/util/retryutil/retry_util.go b/pkg/util/retryutil/retry_util.go index 868ba6e98..b5fab3b47 100644 --- a/pkg/util/retryutil/retry_util.go +++ b/pkg/util/retryutil/retry_util.go @@ -1,6 +1,7 @@ package retryutil import ( + "context" "fmt" "time" ) @@ -25,7 +26,7 @@ func (t *Ticker) Tick() { <-t.ticker.C } // Retry is a wrapper around RetryWorker that provides a real RetryTicker func Retry(interval time.Duration, timeout time.Duration, f func() (bool, error)) error { - //TODO: make the retry exponential + // TODO: make the retry exponential if timeout < interval { return fmt.Errorf("timeout(%s) should be greater than interval(%v)", timeout, interval) } @@ -33,6 +34,18 @@ func Retry(interval time.Duration, timeout time.Duration, f func() (bool, error) return RetryWorker(interval, timeout, tick, f) } +// RetryWithContext is like Retry but checks for context cancellation before each attempt. +func RetryWithContext(ctx context.Context, interval time.Duration, timeout time.Duration, f func() (bool, error)) error { + return Retry(interval, timeout, func() (bool, error) { + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + return f() + } + }) +} + // RetryWorker calls ConditionFunc until either: // * it returns boolean true // * a timeout expires @@ -41,8 +54,8 @@ func RetryWorker( interval time.Duration, timeout time.Duration, tick RetryTicker, - f func() (bool, error)) error { - + f func() (bool, error), +) error { maxRetries := int(timeout / interval) defer tick.Stop()
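
For readers who want to trace the Flow section of the commit message end to end, here is a standalone Go sketch (not part of the patch) of the sync-gate pattern it describes. The method names StartSync, EndSync, NeedsResync and Cancel mirror the patch; miniCluster, slowSync and main are simplified stand-ins invented purely for illustration, since the real Cluster carries far more state and a real resync goes back through the controller's event queue.

// Minimal sketch of the background-sync gate described in the commit message.
// Uses only the standard library; all names here are illustrative.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type miniCluster struct {
	ctx         context.Context
	cancelFunc  context.CancelFunc
	syncMu      sync.Mutex // protects syncRunning and needsResync
	syncRunning bool
	needsResync bool
}

func newMiniCluster(parent context.Context) *miniCluster {
	ctx, cancel := context.WithCancel(parent)
	return &miniCluster{ctx: ctx, cancelFunc: cancel}
}

// Cancel aborts any context-aware work, e.g. when a DeletionTimestamp appears.
func (c *miniCluster) Cancel() { c.cancelFunc() }

// StartSync returns true only if no sync is running and the context is live;
// otherwise it records that a resync is wanted.
func (c *miniCluster) StartSync() bool {
	c.syncMu.Lock()
	defer c.syncMu.Unlock()
	select {
	case <-c.ctx.Done():
		return false // deletion in progress, never start a new sync
	default:
	}
	if c.syncRunning {
		c.needsResync = true
		return false
	}
	c.syncRunning = true
	c.needsResync = false
	return true
}

// EndSync marks the background sync as finished.
func (c *miniCluster) EndSync() {
	c.syncMu.Lock()
	defer c.syncMu.Unlock()
	c.syncRunning = false
}

// NeedsResync reports (and clears) a pending resync request, unless cancelled.
func (c *miniCluster) NeedsResync() bool {
	c.syncMu.Lock()
	defer c.syncMu.Unlock()
	select {
	case <-c.ctx.Done():
		return false
	default:
	}
	r := c.needsResync
	c.needsResync = false
	return r
}

// slowSync stands in for Cluster.Sync: it blocks until the work "finishes"
// or the cluster context is cancelled.
func (c *miniCluster) slowSync() error {
	select {
	case <-time.After(5 * time.Second):
		return nil
	case <-c.ctx.Done():
		return c.ctx.Err()
	}
}

func main() {
	c := newMiniCluster(context.Background())

	// Flow step 1: the sync event only spawns a goroutine and returns.
	if c.StartSync() {
		go func() {
			defer c.EndSync()
			if err := c.slowSync(); err != nil {
				fmt.Println("sync aborted:", err)
				return
			}
			// Flow step 3: a sync that arrived meanwhile is picked up afterwards.
			if c.NeedsResync() {
				fmt.Println("would requeue a sync event here")
			}
		}()
	}

	// Flow step 2: a second sync while one is running only sets needsResync.
	fmt.Println("second StartSync:", c.StartSync()) // false

	// Flow steps 4-6: deletion cancels the context; the stuck sync returns early.
	c.Cancel()
	time.Sleep(100 * time.Millisecond)
	fmt.Println("StartSync after Cancel:", c.StartSync()) // false
}

The same cancellation idea is what makes RetryWithContext useful in the patch: by checking ctx.Done() before every attempt, a connection-retry loop that would otherwise keep going until PostgresConnectTimeout gives up as soon as the cluster is cancelled, which is how step 4 of the Flow releases the mutex held by a stuck sync.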