Improve sync responsiveness with background execution and context cancellation

This change improves the responsiveness of the operator when handling
deletion requests by running sync operations in the background and
using context cancellation to interrupt stuck operations.

Changes:
- Add context field to Cluster struct, passed through New()
- Add Cancel() method to cancel cluster's context
- Add StartSync/EndSync/NeedsResync for managing background sync state
- Run Sync() in a background goroutine so the worker can process other events
- Add context-aware DB connection methods (initDbConnWithNameContext)
- Add RetryWithContext() that respects context cancellation (usage sketch below)
- Cancel cluster context immediately when DeletionTimestamp detected
- Use context-aware connections in syncRoles/syncDatabases
- StartSync/NeedsResync check context cancellation to prevent new syncs
  during deletion (no need for separate deleted flag)
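
For reference, a minimal usage sketch of the new RetryWithContext helper (see the
retryutil diff at the bottom of this commit). Only the retryutil.RetryWithContext
signature comes from this change; the import path, the connTest stub and the
timings are assumptions made up for illustration:

// Hypothetical usage of RetryWithContext; import path, connTest and timings
// are illustrative assumptions, not part of this change.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/zalando/postgres-operator/pkg/util/retryutil"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// Simulate a deletion request arriving while we are still retrying.
	go func() {
		time.Sleep(300 * time.Millisecond)
		cancel()
	}()

	// Condition that never succeeds, standing in for an unreachable database.
	connTest := func() (bool, error) { return false, nil }

	// Plain Retry would spin for the full 10s timeout; with the cancelled
	// context the wrapped condition reports ctx.Err() on its next attempt,
	// so the retry loop stops early.
	err := retryutil.RetryWithContext(ctx, 100*time.Millisecond, 10*time.Second, connTest)
	fmt.Println(err) // context cancellation error instead of a retry timeout
}

This is the same mechanism initDbConnWithNameContext relies on: once the
cluster's context is cancelled, the connection retry loop and PingContext both
give up instead of holding up a sync that Delete is waiting on.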

Flow:
1. A sync event spawns a background goroutine and returns immediately
2. If another sync event arrives while one is running, the needsResync flag is set
3. When the running sync completes, it checks needsResync and requeues a sync event if needed
4. Delete cancels the context -> stuck DB operations return early -> the mutex is released
5. StartSync/NeedsResync return false once the context is cancelled
6. Delete proceeds without waiting for slow/stuck sync operations
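
Distilled into a self-contained sketch (the syncer type, slowSync and the
timings below are illustrative stand-ins, not the operator's actual
Cluster/Sync code), the coordination above looks roughly like this:

// Sketch of the sync coordination pattern; syncer and slowSync are
// illustrative names, not the operator's types.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type syncer struct {
	ctx         context.Context
	cancel      context.CancelFunc
	mu          sync.Mutex
	syncRunning bool
	needsResync bool
}

func newSyncer(parent context.Context) *syncer {
	ctx, cancel := context.WithCancel(parent)
	return &syncer{ctx: ctx, cancel: cancel}
}

// startSync refuses to start when the context is cancelled (deletion) or a
// sync is already running; in the latter case it records that a resync is wanted.
func (s *syncer) startSync() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	select {
	case <-s.ctx.Done():
		return false
	default:
	}
	if s.syncRunning {
		s.needsResync = true
		return false
	}
	s.syncRunning = true
	s.needsResync = false
	return true
}

func (s *syncer) endSync() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.syncRunning = false
}

// needsResyncNow returns and clears the resync flag; it reports false once
// the context is cancelled so no new sync is queued during deletion.
func (s *syncer) needsResyncNow() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	select {
	case <-s.ctx.Done():
		return false
	default:
	}
	wanted := s.needsResync
	s.needsResync = false
	return wanted
}

// slowSync stands in for Cluster.Sync: it blocks until the work finishes or
// the context is cancelled.
func slowSync(ctx context.Context) error {
	select {
	case <-time.After(2 * time.Second):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	s := newSyncer(context.Background())

	if s.startSync() {
		go func() {
			defer s.endSync()
			if err := slowSync(s.ctx); err != nil {
				fmt.Println("sync aborted:", err)
				return
			}
			fmt.Println("sync finished")
		}()
	}

	// A second sync event while the first is running only sets the flag.
	fmt.Println("second sync started:", s.startSync()) // false

	// Deletion: cancel the context so the blocked sync returns early.
	s.cancel()
	time.Sleep(100 * time.Millisecond)
	fmt.Println("resync wanted after cancel:", s.needsResyncNow()) // false
}

The real change keeps the same shape: StartSync/EndSync/NeedsResync guard the
two flags under syncMu, the controller worker launches Sync() in a goroutine,
and Cancel() is what the delete path uses to unblock a sync stuck in a DB retry.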

Author: Thomas Rosenstein, 2025-12-14 19:31:51 +00:00
parent 9ca87d9db4
commit 0affa9425a
13 changed files with 209 additions and 96 deletions

View File

@ -89,6 +89,11 @@ type Cluster struct {
podSubscribersMu sync.RWMutex
pgDb *sql.DB
mu sync.Mutex
ctx context.Context
cancelFunc context.CancelFunc
syncMu sync.Mutex // protects syncRunning and needsResync
syncRunning bool
needsResync bool
userSyncStrategy spec.UserSyncer
deleteOptions metav1.DeleteOptions
podEventsQueue *cache.FIFO
@ -121,9 +126,12 @@ type compareLogicalBackupJobResult struct {
}
// New creates a new cluster. This function should be called from a controller.
func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgresql, logger *logrus.Entry, eventRecorder record.EventRecorder) *Cluster {
func New(ctx context.Context, cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgresql, logger *logrus.Entry, eventRecorder record.EventRecorder) *Cluster {
deletePropagationPolicy := metav1.DeletePropagationOrphan
// Create a cancellable context for this cluster
clusterCtx, cancelFunc := context.WithCancel(ctx)
podEventsQueue := cache.NewFIFO(func(obj interface{}) (string, error) {
e, ok := obj.(PodEvent)
if !ok {
@ -138,6 +146,8 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
}
cluster := &Cluster{
ctx: clusterCtx,
cancelFunc: cancelFunc,
Config: cfg,
Postgresql: pgSpec,
pgUsers: make(map[string]spec.PgUser),
@ -177,6 +187,62 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
return cluster
}
// Cancel cancels the cluster's context, which will cause any ongoing
// context-aware operations (like Sync) to return early.
func (c *Cluster) Cancel() {
if c.cancelFunc != nil {
c.cancelFunc()
}
}
// StartSync attempts to start a sync operation. Returns true if sync can start
// (no sync currently running and context not cancelled). Returns false if a sync
// is already running (needsResync is set) or if context is cancelled (deletion in progress).
func (c *Cluster) StartSync() bool {
c.syncMu.Lock()
defer c.syncMu.Unlock()
// Check if context is cancelled (deletion in progress)
select {
case <-c.ctx.Done():
return false
default:
}
if c.syncRunning {
c.needsResync = true
return false
}
c.syncRunning = true
c.needsResync = false
return true
}
// EndSync marks the sync operation as complete.
func (c *Cluster) EndSync() {
c.syncMu.Lock()
defer c.syncMu.Unlock()
c.syncRunning = false
}
// NeedsResync returns true if a resync was requested while sync was running,
// and clears the flag. Returns false if context is cancelled (deletion in progress).
func (c *Cluster) NeedsResync() bool {
c.syncMu.Lock()
defer c.syncMu.Unlock()
// Check if context is cancelled (deletion in progress)
select {
case <-c.ctx.Done():
return false
default:
}
result := c.needsResync
c.needsResync = false
return result
}
func (c *Cluster) clusterName() spec.NamespacedName {
return util.NameFromMeta(c.ObjectMeta)
}

View File

@ -43,7 +43,7 @@ var logger = logrus.New().WithField("test", "cluster")
// 1 cluster, primary endpoint, 2 services, the secrets, the statefulset and pods being ready
var eventRecorder = record.NewFakeRecorder(7)
var cl = New(
var cl = New(context.Background(),
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
@ -135,7 +135,7 @@ func TestCreate(t *testing.T) {
client.Postgresqls(clusterNamespace).Create(context.TODO(), &pg, metav1.CreateOptions{})
client.Pods(clusterNamespace).Create(context.TODO(), &pod, metav1.CreateOptions{})
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
@ -1629,7 +1629,7 @@ func TestCompareLogicalBackupJob(t *testing.T) {
},
}
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
@ -1778,7 +1778,7 @@ func TestCrossNamespacedSecrets(t *testing.T) {
},
}
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
ConnectionPooler: config.ConnectionPooler{

View File

@ -161,7 +161,7 @@ func noEmptySync(cluster *Cluster, err error, reason SyncReason) error {
func TestNeedConnectionPooler(t *testing.T) {
testName := "Test how connection pooler can be enabled"
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
ProtectedRoles: []string{"admin"},
@ -297,7 +297,7 @@ func TestConnectionPoolerCreateDeletion(t *testing.T) {
},
}
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
ConnectionPooler: config.ConnectionPooler{
@ -405,7 +405,7 @@ func TestConnectionPoolerSync(t *testing.T) {
},
}
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
ConnectionPooler: config.ConnectionPooler{
@ -667,7 +667,7 @@ func TestConnectionPoolerSync(t *testing.T) {
func TestConnectionPoolerPodSpec(t *testing.T) {
testName := "Test connection pooler pod template generation"
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
ProtectedRoles: []string{"admin"},
@ -690,7 +690,7 @@ func TestConnectionPoolerPodSpec(t *testing.T) {
ConnectionPooler: &acidv1.ConnectionPooler{},
EnableReplicaConnectionPooler: boolToPointer(true),
}
var clusterNoDefaultRes = New(
var clusterNoDefaultRes = New(context.Background(),
Config{
OpConfig: config.Config{
ProtectedRoles: []string{"admin"},
@ -780,7 +780,7 @@ func TestConnectionPoolerPodSpec(t *testing.T) {
func TestConnectionPoolerDeploymentSpec(t *testing.T) {
testName := "Test connection pooler deployment spec generation"
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
ProtectedRoles: []string{"admin"},
@ -983,7 +983,7 @@ func TestPoolerTLS(t *testing.T) {
},
}
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
@ -1063,7 +1063,7 @@ func TestPoolerTLS(t *testing.T) {
func TestConnectionPoolerServiceSpec(t *testing.T) {
testName := "Test connection pooler service spec generation"
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
ProtectedRoles: []string{"admin"},

View File

@ -2,6 +2,7 @@ package cluster
import (
"bytes"
"context"
"database/sql"
"fmt"
"net"
@ -121,26 +122,31 @@ func (c *Cluster) initDbConn() error {
if c.pgDb != nil {
return nil
}
return c.initDbConnWithName("")
}
// Worker function for connection initialization. This function does not check
// if the connection is already open, if it is then it will be overwritten.
// Callers need to make sure no connection is open, otherwise we could leak
// connections
// initDbConnWithName initializes a database connection using the cluster's context.
// This function does not check if the connection is already open.
func (c *Cluster) initDbConnWithName(dbname string) error {
return c.initDbConnWithNameContext(c.ctx, dbname)
}
// initDbConnWithNameContext initializes a database connection with an explicit context.
// Use this when you need a custom context (e.g., different timeout, or context.Background()
// for operations that should not be cancelled). This function does not check if the
// connection is already open, callers need to ensure no connection is open to avoid leaks.
func (c *Cluster) initDbConnWithNameContext(ctx context.Context, dbname string) error {
c.setProcessName("initializing db connection")
var conn *sql.DB
connstring := c.pgConnectionString(dbname)
finalerr := retryutil.Retry(constants.PostgresConnectTimeout, constants.PostgresConnectRetryTimeout,
finalerr := retryutil.RetryWithContext(ctx, constants.PostgresConnectTimeout, constants.PostgresConnectRetryTimeout,
func() (bool, error) {
var err error
conn, err = sql.Open("postgres", connstring)
if err == nil {
err = conn.Ping()
err = conn.PingContext(ctx)
}
if err == nil {
@ -268,9 +274,7 @@ func findUsersFromRotation(rotatedUsers []string, db *sql.DB) (map[string]string
}()
for rows.Next() {
var (
rolname, roldatesuffix string
)
var rolname, roldatesuffix string
err := rows.Scan(&rolname, &roldatesuffix)
if err != nil {
return nil, fmt.Errorf("error when processing rows of deprecated users: %v", err)
@ -331,9 +335,7 @@ func (c *Cluster) cleanupRotatedUsers(rotatedUsers []string) error {
// getDatabases returns the map of current databases with owners
// The caller is responsible for opening and closing the database connection
func (c *Cluster) getDatabases() (dbs map[string]string, err error) {
var (
rows *sql.Rows
)
var rows *sql.Rows
if rows, err = c.pgDb.Query(getDatabasesSQL); err != nil {
return nil, fmt.Errorf("could not query database: %v", err)
@ -551,9 +553,7 @@ func (c *Cluster) getOwnerRoles(dbObjPath string, withUser bool) (owners []strin
// getExtension returns the list of current database extensions
// The caller is responsible for opening and closing the database connection
func (c *Cluster) getExtensions() (dbExtensions map[string]string, err error) {
var (
rows *sql.Rows
)
var rows *sql.Rows
if rows, err = c.pgDb.Query(getExtensionsSQL); err != nil {
return nil, fmt.Errorf("could not query database extensions: %v", err)
@ -598,7 +598,6 @@ func (c *Cluster) executeAlterExtension(extName, schemaName string) error {
}
func (c *Cluster) execCreateOrAlterExtension(extName, schemaName, statement, doing, operation string) error {
c.logger.Infof("%s %q schema %q", doing, extName, schemaName)
if _, err := c.pgDb.Exec(fmt.Sprintf(statement, extName, schemaName)); err != nil {
return fmt.Errorf("could not execute %s: %v", operation, err)
@ -610,9 +609,7 @@ func (c *Cluster) execCreateOrAlterExtension(extName, schemaName, statement, doi
// getPublications returns the list of current database publications with tables
// The caller is responsible for opening and closing the database connection
func (c *Cluster) getPublications() (publications map[string]string, err error) {
var (
rows *sql.Rows
)
var rows *sql.Rows
if rows, err = c.pgDb.Query(getPublicationsSQL); err != nil {
return nil, fmt.Errorf("could not query database publications: %v", err)
@ -668,7 +665,6 @@ func (c *Cluster) executeAlterPublication(pubName, tableList string) error {
}
func (c *Cluster) execCreateOrAlterPublication(pubName, tableList, statement, doing, operation string) error {
c.logger.Debugf("%s %q with table list %q", doing, pubName, tableList)
if _, err := c.pgDb.Exec(fmt.Sprintf(statement, pubName, tableList)); err != nil {
return fmt.Errorf("could not execute %s: %v", operation, err)
@ -743,7 +739,6 @@ func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error {
constants.PostgresConnectTimeout,
constants.PostgresConnectRetryTimeout,
func() (bool, error) {
// At this moment we are not connected to any database
if err := c.initDbConnWithName(dbname); err != nil {
msg := "could not init database connection to %s"

View File

@ -52,7 +52,7 @@ type ExpectedValue struct {
}
func TestGenerateSpiloJSONConfiguration(t *testing.T) {
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
ProtectedRoles: []string{"admin"},
@ -1143,7 +1143,7 @@ func TestGetNumberOfInstances(t *testing.T) {
}
for _, tt := range tests {
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: tt.config,
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
@ -1210,7 +1210,7 @@ func TestCloneEnv(t *testing.T) {
},
}
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
WALES3Bucket: "wale-bucket",
@ -1384,7 +1384,7 @@ func TestStandbyEnv(t *testing.T) {
},
}
var cluster = New(
var cluster = New(context.Background(),
Config{}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
for _, tt := range tests {
@ -1431,7 +1431,7 @@ func TestNodeAffinity(t *testing.T) {
}
}
cluster = New(
cluster = New(context.Background(),
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
@ -1524,7 +1524,7 @@ func TestPodAffinity(t *testing.T) {
}
for _, tt := range tests {
cluster := New(
cluster := New(context.Background(),
Config{
OpConfig: config.Config{
EnablePodAntiAffinity: tt.anti,
@ -1687,7 +1687,7 @@ func TestTLS(t *testing.T) {
},
}
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
@ -1942,7 +1942,7 @@ func TestAdditionalVolume(t *testing.T) {
},
}
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
@ -2083,7 +2083,7 @@ func TestVolumeSelector(t *testing.T) {
},
}
cluster := New(
cluster := New(context.Background(),
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
@ -2182,7 +2182,7 @@ func TestSidecars(t *testing.T) {
},
}
cluster = New(
cluster = New(context.Background(),
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
@ -2491,7 +2491,7 @@ func TestContainerValidation(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cluster := New(tc.clusterConfig, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
cluster := New(context.Background(), tc.clusterConfig, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
_, err := cluster.generateStatefulSet(&tc.spec)
@ -2580,7 +2580,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
}{
{
scenario: "With multiple instances",
spec: New(
spec: New(context.Background(),
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb"}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
@ -2597,7 +2597,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
},
{
scenario: "With zero instances",
spec: New(
spec: New(context.Background(),
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb"}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
@ -2614,7 +2614,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
},
{
scenario: "With PodDisruptionBudget disabled",
spec: New(
spec: New(context.Background(),
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.False()}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
@ -2631,7 +2631,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
},
{
scenario: "With non-default PDBNameFormat and PodDisruptionBudget explicitly enabled",
spec: New(
spec: New(context.Background(),
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-databass-budget", EnablePodDisruptionBudget: util.True()}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
@ -2648,7 +2648,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
},
{
scenario: "With PDBMasterLabelSelector disabled",
spec: New(
spec: New(context.Background(),
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.True(), PDBMasterLabelSelector: util.False()}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
@ -2665,7 +2665,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
},
{
scenario: "With OwnerReference enabled",
spec: New(
spec: New(context.Background(),
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role", EnableOwnerReferences: util.True()}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.True()}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
@ -2700,7 +2700,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
}{
{
scenario: "With multiple instances",
spec: New(
spec: New(context.Background(),
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb"}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
@ -2717,7 +2717,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
},
{
scenario: "With zero instances",
spec: New(
spec: New(context.Background(),
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb"}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
@ -2734,7 +2734,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
},
{
scenario: "With PodDisruptionBudget disabled",
spec: New(
spec: New(context.Background(),
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.False()}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
@ -2751,7 +2751,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
},
{
scenario: "With OwnerReference enabled",
spec: New(
spec: New(context.Background(),
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role", EnableOwnerReferences: util.True()}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.True()}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
@ -2813,7 +2813,7 @@ func TestGenerateService(t *testing.T) {
EnableMasterLoadBalancer: &enableLB,
}
cluster = New(
cluster = New(context.Background(),
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
@ -2864,7 +2864,7 @@ func TestGenerateService(t *testing.T) {
}
func TestCreateLoadBalancerLogic(t *testing.T) {
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
ProtectedRoles: []string{"admin"},
@ -3076,7 +3076,7 @@ func TestEnableLoadBalancers(t *testing.T) {
}
for _, tt := range tests {
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: tt.config,
}, client, tt.pgSpec, logger, eventRecorder)
@ -3708,7 +3708,7 @@ func TestGenerateResourceRequirements(t *testing.T) {
}
for _, tt := range tests {
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: tt.config,
}, client, tt.pgSpec, logger, newEventRecorder)
@ -3893,7 +3893,7 @@ func TestGenerateLogicalBackupJob(t *testing.T) {
}
for _, tt := range tests {
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: tt.config,
}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder)

View File

@ -2,6 +2,7 @@ package cluster
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
@ -24,7 +25,7 @@ func TestGetSwitchoverCandidate(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
var cluster = New(
cluster := New(context.Background(),
Config{
OpConfig: config.Config{
PatroniAPICheckInterval: time.Duration(1),

View File

@ -223,7 +223,7 @@ var (
},
}
cluster = New(
cluster = New(context.Background(),
Config{
OpConfig: config.Config{
Auth: config.Auth{
@ -529,7 +529,7 @@ func newFabricEventStream(streams []zalandov1.EventStream, annotations map[strin
func TestSyncStreams(t *testing.T) {
newClusterName := fmt.Sprintf("%s-2", pg.Name)
pg.Name = newClusterName
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
@ -688,7 +688,7 @@ func TestSameStreams(t *testing.T) {
func TestUpdateStreams(t *testing.T) {
pg.Name = fmt.Sprintf("%s-3", pg.Name)
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
@ -787,7 +787,7 @@ func patchPostgresqlStreams(t *testing.T, cluster *Cluster, pgSpec *acidv1.Postg
func TestDeleteStreams(t *testing.T) {
pg.Name = fmt.Sprintf("%s-4", pg.Name)
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",

View File

@ -70,7 +70,7 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
return err
}
//TODO: mind the secrets of the deleted/new users
// TODO: mind the secrets of the deleted/new users
if err = c.syncSecrets(); err != nil {
err = fmt.Errorf("could not sync secrets: %v", err)
return err
@ -856,7 +856,6 @@ func (c *Cluster) restartInstance(pod *v1.Pod, restartWait uint32) error {
// AnnotationsToPropagate get the annotations to update if required
// based on the annotations in postgres CRD
func (c *Cluster) AnnotationsToPropagate(annotations map[string]string) map[string]string {
if annotations == nil {
annotations = make(map[string]string)
}
@ -1110,7 +1109,8 @@ func (c *Cluster) updateSecret(
secretUsername string,
generatedSecret *v1.Secret,
retentionUsers *[]string,
currentTime time.Time) (*v1.Secret, error) {
currentTime time.Time,
) (*v1.Secret, error) {
var (
secret *v1.Secret
err error
@ -1244,7 +1244,8 @@ func (c *Cluster) rotatePasswordInSecret(
secretUsername string,
roleOrigin spec.RoleOrigin,
currentTime time.Time,
retentionUsers *[]string) (string, error) {
retentionUsers *[]string,
) (string, error) {
var (
err error
nextRotationDate time.Time
@ -1469,7 +1470,7 @@ func (c *Cluster) syncDatabases() error {
preparedDatabases := make([]string, 0)
if err := c.initDbConn(); err != nil {
return fmt.Errorf("could not init database connection")
return fmt.Errorf("could not init database connection: %v", err)
}
defer func() {
if err := c.closeDbConn(); err != nil {
@ -1553,7 +1554,7 @@ func (c *Cluster) syncPreparedDatabases() error {
errors := make([]string, 0)
for preparedDbName, preparedDB := range c.Spec.PreparedDatabases {
if err := c.initDbConnWithName(preparedDbName); err != nil {
if err := c.initDbConnWithNameContext(c.ctx, preparedDbName); err != nil {
errors = append(errors, fmt.Sprintf("could not init connection to database %s: %v", preparedDbName, err))
continue
}
@ -1697,7 +1698,8 @@ func (c *Cluster) syncLogicalBackupJob() error {
}
if len(cmp.deletedPodAnnotations) != 0 {
templateMetadataReq := map[string]map[string]map[string]map[string]map[string]map[string]map[string]*string{
"spec": {"jobTemplate": {"spec": {"template": {"metadata": {"annotations": {}}}}}}}
"spec": {"jobTemplate": {"spec": {"template": {"metadata": {"annotations": {}}}}}},
}
for _, anno := range cmp.deletedPodAnnotations {
templateMetadataReq["spec"]["jobTemplate"]["spec"]["template"]["metadata"]["annotations"][anno] = nil
}

View File

@ -88,7 +88,7 @@ func TestSyncStatefulSetsAnnotations(t *testing.T) {
},
}
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
@ -184,7 +184,7 @@ func TestPodAnnotationsSync(t *testing.T) {
},
}
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
PatroniAPICheckInterval: time.Duration(1),
@ -369,7 +369,7 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {
},
}
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
@ -691,7 +691,7 @@ func TestSyncStandbyClusterConfiguration(t *testing.T) {
},
}
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
PatroniAPICheckInterval: time.Duration(1),
@ -844,7 +844,7 @@ func TestUpdateSecret(t *testing.T) {
}
// new cluster with enabled password rotation
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
Auth: config.Auth{
@ -988,7 +988,7 @@ func TestUpdateSecretNameConflict(t *testing.T) {
},
}
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
Auth: config.Auth{

View File

@ -288,7 +288,7 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster,
},
}
cluster := New(
cluster := New(context.Background(),
Config{
OpConfig: config.Config{
PatroniAPICheckInterval: time.Duration(1),

View File

@ -59,7 +59,7 @@ func TestResizeVolumeClaim(t *testing.T) {
assert.NoError(t, err)
// new cluster with pvc storage resize mode and configured labels
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
Resources: config.Resources{
@ -185,7 +185,7 @@ func TestMigrateEBS(t *testing.T) {
namespace := "default"
// new cluster with pvc storage resize mode and configured labels
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
Resources: config.Resources{
@ -293,7 +293,7 @@ func TestMigrateGp3Support(t *testing.T) {
namespace := "default"
// new cluster with pvc storage resize mode and configured labels
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
Resources: config.Resources{
@ -355,7 +355,7 @@ func TestManualGp2Gp3Support(t *testing.T) {
namespace := "default"
// new cluster with pvc storage resize mode and configured labels
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
Resources: config.Resources{
@ -415,7 +415,7 @@ func TestDontTouchType(t *testing.T) {
namespace := "default"
// new cluster with pvc storage resize mode and configured labels
var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
Resources: config.Resources{

View File

@ -166,7 +166,7 @@ func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedNam
}
}
cl := cluster.New(c.makeClusterConfig(), c.KubeClient, *pgSpec, lg, c.eventRecorder)
cl := cluster.New(context.Background(), c.makeClusterConfig(), c.KubeClient, *pgSpec, lg, c.eventRecorder)
cl.Run(c.stopCh)
teamName := strings.ToLower(cl.Spec.TeamID)
@ -267,6 +267,7 @@ func (c *Controller) processEvent(event ClusterEvent) {
// Check if this cluster has been marked for deletion
if !event.NewSpec.ObjectMeta.DeletionTimestamp.IsZero() {
lg.Infof("cluster has a DeletionTimestamp of %s, starting deletion now.", event.NewSpec.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
cl.Cancel() // Cancel any ongoing operations
if err = cl.Delete(); err != nil {
cl.Error = fmt.Sprintf("error deleting cluster and its resources: %v", err)
c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
@ -277,7 +278,6 @@ func (c *Controller) processEvent(event ClusterEvent) {
return
}
lg.Infoln("update of the cluster started")
err = cl.Update(event.OldSpec, event.NewSpec)
if err != nil {
cl.Error = fmt.Sprintf("could not update cluster: %v", err)
@ -305,6 +305,7 @@ func (c *Controller) processEvent(event ClusterEvent) {
// when using finalizers the deletion already happened
if c.opConfig.EnableFinalizers == nil || !*c.opConfig.EnableFinalizers {
lg.Infoln("deletion of the cluster started")
cl.Cancel() // Cancel any ongoing operations
if err := cl.Delete(); err != nil {
cl.Error = fmt.Sprintf("could not delete cluster: %v", err)
c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
@ -330,8 +331,6 @@ func (c *Controller) processEvent(event ClusterEvent) {
lg.Infof("cluster has been deleted")
case EventSync:
lg.Infof("syncing of the cluster started")
// no race condition because a cluster is always processed by single worker
if !clusterFound {
cl, err = c.addCluster(lg, clusterName, event.NewSpec)
@ -346,22 +345,42 @@ func (c *Controller) processEvent(event ClusterEvent) {
// has this cluster been marked as deleted already, then we shall start cleaning up
if !cl.ObjectMeta.DeletionTimestamp.IsZero() {
lg.Infof("cluster has a DeletionTimestamp of %s, starting deletion now.", cl.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
cl.Cancel() // Cancel any ongoing operations
if err = cl.Delete(); err != nil {
cl.Error = fmt.Sprintf("error deleting cluster and its resources: %v", err)
c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
lg.Error(cl.Error)
return
}
} else {
if err = cl.Sync(event.NewSpec); err != nil {
return
}
// Try to start sync - returns false if sync already running or cluster deleted
if !cl.StartSync() {
lg.Infof("sync already in progress, will resync when current sync completes")
return
}
// Run sync in background goroutine so we can process other events (like delete)
lg.Infof("syncing of the cluster started (background)")
go func() {
defer cl.EndSync()
if err := cl.Sync(event.NewSpec); err != nil {
cl.Error = fmt.Sprintf("could not sync cluster: %v", err)
c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Sync", "%v", cl.Error)
lg.Error(cl.Error)
return
}
cl.Error = ""
lg.Infof("cluster has been synced")
}
cl.Error = ""
// Check if resync was requested while we were syncing
if cl.NeedsResync() {
lg.Infof("resync requested, queueing new sync event")
c.queueClusterEvent(nil, event.NewSpec, EventSync)
}
}()
}
}
@ -477,6 +496,19 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.
}
}
// If the cluster is marked for deletion, cancel any ongoing operations immediately
// This unblocks stuck Sync operations so the delete can proceed
if informerNewSpec != nil && !informerNewSpec.ObjectMeta.DeletionTimestamp.IsZero() {
c.clustersMu.RLock()
if cl, found := c.clusters[clusterName]; found {
c.logger.WithField("cluster-name", clusterName).Infof(
"cluster marked for deletion (DeletionTimestamp: %s), cancelling ongoing operations",
informerNewSpec.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
cl.Cancel()
}
c.clustersMu.RUnlock()
}
if clusterError != "" && eventType != EventDelete {
c.logger.WithField("cluster-name", clusterName).Debugf("skipping %q event for the invalid cluster: %s", eventType, clusterError)
@ -566,9 +598,13 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) {
// Avoid the infinite recursion for status updates
if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) {
if reflect.DeepEqual(pgNew.Annotations, pgOld.Annotations) {
c.logger.WithField("cluster-name", clusterName).Debugf(
"UPDATE event: no spec/annotation changes, skipping")
return
}
}
c.logger.WithField("cluster-name", clusterName).Infof("UPDATE event: spec or annotations changed, queueing event")
c.queueClusterEvent(pgOld, pgNew, EventUpdate)
}
}

View File

@ -1,6 +1,7 @@
package retryutil
import (
"context"
"fmt"
"time"
)
@ -25,7 +26,7 @@ func (t *Ticker) Tick() { <-t.ticker.C }
// Retry is a wrapper around RetryWorker that provides a real RetryTicker
func Retry(interval time.Duration, timeout time.Duration, f func() (bool, error)) error {
//TODO: make the retry exponential
// TODO: make the retry exponential
if timeout < interval {
return fmt.Errorf("timeout(%s) should be greater than interval(%v)", timeout, interval)
}
@ -33,6 +34,18 @@ func Retry(interval time.Duration, timeout time.Duration, f func() (bool, error)
return RetryWorker(interval, timeout, tick, f)
}
// RetryWithContext is like Retry but checks for context cancellation before each attempt.
func RetryWithContext(ctx context.Context, interval time.Duration, timeout time.Duration, f func() (bool, error)) error {
return Retry(interval, timeout, func() (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
return f()
}
})
}
// RetryWorker calls ConditionFunc until either:
// * it returns boolean true
// * a timeout expires
@ -41,8 +54,8 @@ func RetryWorker(
interval time.Duration,
timeout time.Duration,
tick RetryTicker,
f func() (bool, error)) error {
f func() (bool, error),
) error {
maxRetries := int(timeout / interval)
defer tick.Stop()