From 9ca87d9db460e8dcfa59b80ae06f7378b9cd2494 Mon Sep 17 00:00:00 2001 From: Thomas Rosenstein Date: Sun, 14 Dec 2025 19:26:40 +0000 Subject: [PATCH] Fix deletion timestamp handling for clusters with finalizers When a Postgres cluster has a finalizer, deleting it sets a DeletionTimestamp but doesn't remove the object until the finalizer is cleared. The operator was not properly handling these DeletionTimestamp changes: 1. postgresqlUpdate() was filtering out events where only DeletionTimestamp changed (it only checked Spec and Annotations), causing the delete to never be processed. 2. EventUpdate case in processEvent() didn't check for DeletionTimestamp, so even if the event reached the processor, it would run Update() instead of Delete(). 3. removeFinalizer() used a cached object with stale resourceVersion, causing "object has been modified" errors. Fixes: - Add explicit DeletionTimestamp check in postgresqlUpdate() to queue the event - Add DeletionTimestamp check in EventUpdate to call Delete() when set - Fetch latest object from API before removing finalizer to avoid conflicts --- pkg/cluster/cluster.go | 48 ++++++++++++++++++------------------ pkg/controller/controller.go | 2 +- pkg/controller/postgresql.go | 39 ++++++++++++++++++++++------- 3 files changed, 55 insertions(+), 34 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index b6a4e24a8..f2f3651e7 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -3,6 +3,7 @@ package cluster // Postgres CustomResourceDefinition object i.e. Spilo import ( + "context" "database/sql" "encoding/json" "fmt" @@ -70,7 +71,7 @@ type kubeResources struct { CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget LogicalBackupJob *batchv1.CronJob Streams map[string]*zalandov1.FabricEventStream - //Pods are treated separately + // Pods are treated separately } // Cluster describes postgresql cluster @@ -95,7 +96,7 @@ type Cluster struct { teamsAPIClient teams.Interface oauthTokenGetter OAuthTokenGetter - KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place? + KubeClient k8sutil.KubernetesClient // TODO: move clients to the better place? currentProcess Process processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex @@ -149,7 +150,8 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres PatroniEndpoints: make(map[string]*v1.Endpoints), PatroniConfigMaps: make(map[string]*v1.ConfigMap), VolumeClaims: make(map[types.UID]*v1.PersistentVolumeClaim), - Streams: make(map[string]*zalandov1.FabricEventStream)}, + Streams: make(map[string]*zalandov1.FabricEventStream), + }, userSyncStrategy: users.DefaultUserSyncStrategy{ PasswordEncryption: passwordEncryption, RoleDeletionSuffix: cfg.OpConfig.RoleDeletionSuffix, @@ -276,7 +278,7 @@ func (c *Cluster) Create() (err error) { errStatus error ) if err == nil { - pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning) //TODO: are you sure it's running? + pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning) // TODO: are you sure it's running? 
} else { c.logger.Warningf("cluster created failed: %v", err) pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusAddFailed) @@ -440,7 +442,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa var match, needsRollUpdate, needsReplace bool match = true - //TODO: improve me + // TODO: improve me if *c.Statefulset.Spec.Replicas != *statefulSet.Spec.Replicas { match = false reasons = append(reasons, "new statefulset's number of replicas does not match the current one") @@ -672,7 +674,6 @@ func compareResourcesAssumeFirstNotNil(a *v1.ResourceRequirements, b *v1.Resourc } } return true - } func compareEnv(a, b []v1.EnvVar) bool { @@ -707,9 +708,7 @@ func compareEnv(a, b []v1.EnvVar) bool { } func compareSpiloConfiguration(configa, configb string) bool { - var ( - oa, ob spiloConfiguration - ) + var oa, ob spiloConfiguration var err error err = json.Unmarshal([]byte(configa), &oa) @@ -818,7 +817,6 @@ func (c *Cluster) compareAnnotations(old, new map[string]string, removedList *[] } return reason != "", reason - } func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) { @@ -895,7 +893,7 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) *compareLog } func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBudget) (bool, string) { - //TODO: improve comparison + // TODO: improve comparison if !reflect.DeepEqual(new.Spec, cur.Spec) { return false, "new PDB's spec does not match the current one" } @@ -944,8 +942,17 @@ func (c *Cluster) removeFinalizer() error { } c.logger.Infof("removing finalizer %s", finalizerName) - finalizers := util.RemoveString(c.ObjectMeta.Finalizers, finalizerName) - newSpec, err := c.KubeClient.SetFinalizer(c.clusterName(), c.DeepCopy(), finalizers) + + // Fetch the latest version of the object to avoid resourceVersion conflicts + clusterName := c.clusterName() + latestPg, err := c.KubeClient.PostgresqlsGetter.Postgresqls(clusterName.Namespace).Get( + context.TODO(), clusterName.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("error fetching latest postgresql for finalizer removal: %v", err) + } + + finalizers := util.RemoveString(latestPg.ObjectMeta.Finalizers, finalizerName) + newSpec, err := c.KubeClient.SetFinalizer(clusterName, latestPg, finalizers) if err != nil { return fmt.Errorf("error removing finalizer: %v", err) } @@ -1063,7 +1070,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { } c.logger.Debug("syncing secrets") - //TODO: mind the secrets of the deleted/new users + // TODO: mind the secrets of the deleted/new users if err := c.syncSecrets(); err != nil { c.logger.Errorf("could not sync secrets: %v", err) updateFailed = true @@ -1101,7 +1108,6 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { // logical backup job func() { - // create if it did not exist if !oldSpec.Spec.EnableLogicalBackup && newSpec.Spec.EnableLogicalBackup { c.logger.Debug("creating backup cron job") @@ -1129,7 +1135,6 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { updateFailed = true } } - }() // Roles and Databases @@ -1206,7 +1211,7 @@ func syncResources(a, b *v1.ResourceRequirements) bool { // before the pods, it will be re-created by the current master pod and will remain, obstructing the // creation of the new cluster with the same name. Therefore, the endpoints should be deleted last. 
func (c *Cluster) Delete() error { - var anyErrors = false + anyErrors := false c.mu.Lock() defer c.mu.Unlock() c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources") @@ -1297,7 +1302,6 @@ func (c *Cluster) NeedsRepair() (bool, acidv1.PostgresStatus) { c.specMu.RLock() defer c.specMu.RUnlock() return !c.Status.Success(), c.Status - } // ReceivePodEvent is called back by the controller in order to add the cluster's pod event to the queue. @@ -1406,7 +1410,6 @@ func (c *Cluster) initSystemUsers() { } func (c *Cluster) initPreparedDatabaseRoles() error { - if c.Spec.PreparedDatabases != nil && len(c.Spec.PreparedDatabases) == 0 { // TODO: add option to disable creating such a default DB c.Spec.PreparedDatabases = map[string]acidv1.PreparedDatabase{strings.Replace(c.Name, "-", "_", -1): {}} } @@ -1472,10 +1475,9 @@ func (c *Cluster) initPreparedDatabaseRoles() error { } func (c *Cluster) initDefaultRoles(defaultRoles map[string]string, admin, prefix, searchPath, secretNamespace string) error { - for defaultRole, inherits := range defaultRoles { namespace := c.Namespace - //if namespaced secrets are allowed + // if namespaced secrets are allowed if secretNamespace != "" { if c.Config.OpConfig.EnableCrossNamespaceSecret { namespace = secretNamespace @@ -1543,7 +1545,7 @@ func (c *Cluster) initRobotUsers() error { } } - //if namespaced secrets are allowed + // if namespaced secrets are allowed if c.Config.OpConfig.EnableCrossNamespaceSecret { if strings.Contains(username, ".") { splits := strings.Split(username, ".") @@ -1594,7 +1596,6 @@ func (c *Cluster) initAdditionalOwnerRoles() { func (c *Cluster) initTeamMembers(teamID string, isPostgresSuperuserTeam bool) error { teamMembers, err := c.getTeamMembers(teamID) - if err != nil { return fmt.Errorf("could not get list of team members for team %q: %v", teamID, err) } @@ -1633,7 +1634,6 @@ func (c *Cluster) initTeamMembers(teamID string, isPostgresSuperuserTeam bool) e } func (c *Cluster) initHumanUsers() error { - var clusterIsOwnedBySuperuserTeam bool superuserTeams := []string{} diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index e46b9ee44..aa6262264 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -277,7 +277,7 @@ func (c *Controller) initRoleBinding() { }`, c.PodServiceAccount.Name, c.PodServiceAccount.Name, c.PodServiceAccount.Name) c.opConfig.PodServiceAccountRoleBindingDefinition = compactValue(stringValue) } - c.logger.Info("Parse role bindings") + // re-uses k8s internal parsing. 
See k8s client-go issue #193 for explanation decode := scheme.Codecs.UniversalDeserializer().Decode obj, groupVersionKind, err := decode([]byte(c.opConfig.PodServiceAccountRoleBindingDefinition), nil, nil) diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 824a030f4..22042377e 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -258,13 +258,26 @@ func (c *Controller) processEvent(event ClusterEvent) { lg.Infoln("cluster has been created") case EventUpdate: - lg.Infoln("update of the cluster started") - if !clusterFound { lg.Warningln("cluster does not exist") return } c.curWorkerCluster.Store(event.WorkerID, cl) + + // Check if this cluster has been marked for deletion + if !event.NewSpec.ObjectMeta.DeletionTimestamp.IsZero() { + lg.Infof("cluster has a DeletionTimestamp of %s, starting deletion now.", event.NewSpec.ObjectMeta.DeletionTimestamp.Format(time.RFC3339)) + if err = cl.Delete(); err != nil { + cl.Error = fmt.Sprintf("error deleting cluster and its resources: %v", err) + c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error) + lg.Error(cl.Error) + return + } + lg.Infoln("cluster has been deleted via update event") + return + } + + lg.Infoln("update of the cluster started") err = cl.Update(event.OldSpec, event.NewSpec) if err != nil { cl.Error = fmt.Sprintf("could not update cluster: %v", err) @@ -379,7 +392,6 @@ func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{}, } func (c *Controller) warnOnDeprecatedPostgreSQLSpecParameters(spec *acidv1.PostgresSpec) { - deprecate := func(deprecated, replacement string) { c.logger.Warningf("parameter %q is deprecated. Consider setting %q instead", deprecated, replacement) } @@ -425,7 +437,7 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1. clusterError string ) - if informerOldSpec != nil { //update, delete + if informerOldSpec != nil { // update, delete uid = informerOldSpec.GetUID() clusterName = util.NameFromMeta(informerOldSpec.ObjectMeta) @@ -440,7 +452,7 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1. } else { clusterError = informerOldSpec.Error } - } else { //add, sync + } else { // add, sync uid = informerNewSpec.GetUID() clusterName = util.NameFromMeta(informerNewSpec.ObjectMeta) clusterError = informerNewSpec.Error @@ -539,7 +551,19 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) { pgOld := c.postgresqlCheck(prev) pgNew := c.postgresqlCheck(cur) if pgOld != nil && pgNew != nil { - // Avoid the inifinite recursion for status updates + clusterName := util.NameFromMeta(pgNew.ObjectMeta) + + // Check if DeletionTimestamp was set (resource marked for deletion) + deletionTimestampChanged := pgOld.ObjectMeta.DeletionTimestamp.IsZero() && !pgNew.ObjectMeta.DeletionTimestamp.IsZero() + if deletionTimestampChanged { + c.logger.WithField("cluster-name", clusterName).Infof( + "UPDATE event: DeletionTimestamp set to %s, queueing event", + pgNew.ObjectMeta.DeletionTimestamp.Format(time.RFC3339)) + c.queueClusterEvent(pgOld, pgNew, EventUpdate) + return + } + + // Avoid the infinite recursion for status updates if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) { if reflect.DeepEqual(pgNew.Annotations, pgOld.Annotations) { return @@ -578,7 +602,6 @@ or config maps. The operator does not sync accounts/role bindings after creation. 
*/ func (c *Controller) submitRBACCredentials(event ClusterEvent) error { - namespace := event.NewSpec.GetNamespace() if err := c.createPodServiceAccount(namespace); err != nil { @@ -592,7 +615,6 @@ func (c *Controller) submitRBACCredentials(event ClusterEvent) error { } func (c *Controller) createPodServiceAccount(namespace string) error { - podServiceAccountName := c.opConfig.PodServiceAccountName _, err := c.KubeClient.ServiceAccounts(namespace).Get(context.TODO(), podServiceAccountName, metav1.GetOptions{}) if k8sutil.ResourceNotFound(err) { @@ -615,7 +637,6 @@ func (c *Controller) createPodServiceAccount(namespace string) error { } func (c *Controller) createRoleBindings(namespace string) error { - podServiceAccountName := c.opConfig.PodServiceAccountName podServiceAccountRoleBindingName := c.PodServiceAccountRoleBinding.Name
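The checks this patch adds to processEvent() and postgresqlUpdate() hinge on the semantics of metav1.Time: ObjectMeta.DeletionTimestamp is a *metav1.Time, and IsZero() on a nil *metav1.Time reports true, so the guard is safe whether or not the API server has set a deletion timestamp yet. Below is a minimal, standalone sketch of that guard and of stripping a finalizer from a freshly fetched object. The finalizer string, cluster name, and removeString helper are illustrative stand-ins only; they are not the operator's actual constants or its util.RemoveString.

package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// markedForDeletion mirrors the guard used in processEvent() and
// postgresqlUpdate(): IsZero() on a nil *metav1.Time returns true, so the
// check is safe even when no DeletionTimestamp has been set.
func markedForDeletion(meta metav1.ObjectMeta) bool {
	return !meta.DeletionTimestamp.IsZero()
}

// removeString is an illustrative stand-in for the operator's helper: it
// returns the slice without any occurrence of s.
func removeString(list []string, s string) []string {
	out := make([]string, 0, len(list))
	for _, item := range list {
		if item != s {
			out = append(out, item)
		}
	}
	return out
}

func main() {
	// Illustrative names only; not the operator's actual constants.
	const finalizer = "postgres-operator.example/finalizer"

	live := metav1.ObjectMeta{Name: "acid-minimal-cluster"}
	deleting := metav1.ObjectMeta{
		Name:              "acid-minimal-cluster",
		DeletionTimestamp: &metav1.Time{Time: time.Now()},
		Finalizers:        []string{finalizer},
	}

	fmt.Println(markedForDeletion(live))     // false -> EventUpdate runs Update()
	fmt.Println(markedForDeletion(deleting)) // true  -> EventUpdate runs Delete()

	// After Delete() succeeds, the finalizer is stripped from the object that
	// was just fetched from the API server, not from a cached copy.
	deleting.Finalizers = removeString(deleting.Finalizers, finalizer)
	fmt.Println(len(deleting.Finalizers)) // 0 -> the API server can now remove the object
}

Working on the freshly fetched object is what makes the finalizer update carry the current resourceVersion, which is why the "object has been modified" conflicts disappear.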