Fix deletion timestamp handling for clusters with finalizers (#3015)

When a Postgres cluster has a finalizer, deleting it sets a DeletionTimestamp
but doesn't remove the object until the finalizer is cleared. The operator
was not properly handling these DeletionTimestamp changes:

1. postgresqlUpdate() was filtering out events where only DeletionTimestamp
   changed (it only checked Spec and Annotations), causing the delete to
   never be processed.

2. EventUpdate case in processEvent() didn't check for DeletionTimestamp,
   so even if the event reached the processor, it would run Update() instead
   of Delete().

3. removeFinalizer() used a cached object with a stale resourceVersion,
   causing "object has been modified" conflict errors on update.

Fixes:
- Add an explicit DeletionTimestamp check in postgresqlUpdate() so the event is queued
- Add a DeletionTimestamp check to the EventUpdate case so Delete() is called when it is set
- Fetch the latest object from the API before removing the finalizer to avoid conflicts

Co-authored-by: Felix Kunde <felix-kunde@gmx.de>
This commit is contained in:
thoro 2026-06-01 09:18:20 +02:00 committed by GitHub
parent e871a167ed
commit f988e4cf0e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 54 additions and 33 deletions

View File

@ -3,6 +3,7 @@ package cluster
// Postgres CustomResourceDefinition object i.e. Spilo // Postgres CustomResourceDefinition object i.e. Spilo
import ( import (
"context"
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -71,7 +72,7 @@ type kubeResources struct {
CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget
LogicalBackupJob *batchv1.CronJob LogicalBackupJob *batchv1.CronJob
Streams map[string]*zalandov1.FabricEventStream Streams map[string]*zalandov1.FabricEventStream
//Pods are treated separately // Pods are treated separately
} }
// Cluster describes postgresql cluster // Cluster describes postgresql cluster
@ -96,7 +97,7 @@ type Cluster struct {
teamsAPIClient teams.Interface teamsAPIClient teams.Interface
oauthTokenGetter OAuthTokenGetter oauthTokenGetter OAuthTokenGetter
KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place? KubeClient k8sutil.KubernetesClient // TODO: move clients to the better place?
currentProcess Process currentProcess Process
processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
@ -150,7 +151,8 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
PatroniEndpoints: make(map[string]*v1.Endpoints), PatroniEndpoints: make(map[string]*v1.Endpoints),
PatroniConfigMaps: make(map[string]*v1.ConfigMap), PatroniConfigMaps: make(map[string]*v1.ConfigMap),
VolumeClaims: make(map[types.UID]*v1.PersistentVolumeClaim), VolumeClaims: make(map[types.UID]*v1.PersistentVolumeClaim),
Streams: make(map[string]*zalandov1.FabricEventStream)}, Streams: make(map[string]*zalandov1.FabricEventStream),
},
userSyncStrategy: users.DefaultUserSyncStrategy{ userSyncStrategy: users.DefaultUserSyncStrategy{
PasswordEncryption: passwordEncryption, PasswordEncryption: passwordEncryption,
RoleDeletionSuffix: cfg.OpConfig.RoleDeletionSuffix, RoleDeletionSuffix: cfg.OpConfig.RoleDeletionSuffix,
@ -445,7 +447,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
var match, needsRollUpdate, needsReplace bool var match, needsRollUpdate, needsReplace bool
match = true match = true
//TODO: improve me // TODO: improve me
if *c.Statefulset.Spec.Replicas != *statefulSet.Spec.Replicas { if *c.Statefulset.Spec.Replicas != *statefulSet.Spec.Replicas {
match = false match = false
reasons = append(reasons, "new statefulset's number of replicas does not match the current one") reasons = append(reasons, "new statefulset's number of replicas does not match the current one")
@ -682,7 +684,6 @@ func compareResourcesAssumeFirstNotNil(a *v1.ResourceRequirements, b *v1.Resourc
} }
} }
return true return true
} }
func compareEnv(a, b []v1.EnvVar) bool { func compareEnv(a, b []v1.EnvVar) bool {
@ -717,9 +718,7 @@ func compareEnv(a, b []v1.EnvVar) bool {
} }
func compareSpiloConfiguration(configa, configb string) bool { func compareSpiloConfiguration(configa, configb string) bool {
var ( var oa, ob spiloConfiguration
oa, ob spiloConfiguration
)
var err error var err error
err = json.Unmarshal([]byte(configa), &oa) err = json.Unmarshal([]byte(configa), &oa)
@ -828,7 +827,6 @@ func (c *Cluster) compareAnnotations(old, new map[string]string, removedList *[]
} }
return reason != "", reason return reason != "", reason
} }
func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) { func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) {
@ -905,7 +903,7 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) *compareLog
} }
func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBudget) (bool, string) { func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBudget) (bool, string) {
//TODO: improve comparison // TODO: improve comparison
if !reflect.DeepEqual(new.Spec, cur.Spec) { if !reflect.DeepEqual(new.Spec, cur.Spec) {
return false, "new PDB's spec does not match the current one" return false, "new PDB's spec does not match the current one"
} }
@ -954,8 +952,17 @@ func (c *Cluster) removeFinalizer() error {
} }
c.logger.Infof("removing finalizer %s", finalizerName) c.logger.Infof("removing finalizer %s", finalizerName)
finalizers := util.RemoveString(c.ObjectMeta.Finalizers, finalizerName)
newSpec, err := c.KubeClient.SetFinalizer(c.clusterName(), c.DeepCopy(), finalizers) // Fetch the latest version of the object to avoid resourceVersion conflicts
clusterName := c.clusterName()
latestPg, err := c.KubeClient.PostgresqlsGetter.Postgresqls(clusterName.Namespace).Get(
context.TODO(), clusterName.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error fetching latest postgresql for finalizer removal: %v", err)
}
finalizers := util.RemoveString(latestPg.ObjectMeta.Finalizers, finalizerName)
newSpec, err := c.KubeClient.SetFinalizer(clusterName, latestPg, finalizers)
if err != nil { if err != nil {
return fmt.Errorf("error removing finalizer: %v", err) return fmt.Errorf("error removing finalizer: %v", err)
} }
@ -1078,7 +1085,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
} }
c.logger.Debug("syncing secrets") c.logger.Debug("syncing secrets")
//TODO: mind the secrets of the deleted/new users // TODO: mind the secrets of the deleted/new users
if err := c.syncSecrets(); err != nil { if err := c.syncSecrets(); err != nil {
c.logger.Errorf("could not sync secrets: %v", err) c.logger.Errorf("could not sync secrets: %v", err)
updateFailed = true updateFailed = true
@ -1116,7 +1123,6 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
// logical backup job // logical backup job
func() { func() {
// create if it did not exist // create if it did not exist
if !oldSpec.Spec.EnableLogicalBackup && newSpec.Spec.EnableLogicalBackup { if !oldSpec.Spec.EnableLogicalBackup && newSpec.Spec.EnableLogicalBackup {
c.logger.Debug("creating backup cron job") c.logger.Debug("creating backup cron job")
@ -1144,7 +1150,6 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
updateFailed = true updateFailed = true
} }
} }
}() }()
// Roles and Databases // Roles and Databases
@ -1221,7 +1226,7 @@ func syncResources(a, b *v1.ResourceRequirements) bool {
// before the pods, it will be re-created by the current master pod and will remain, obstructing the // before the pods, it will be re-created by the current master pod and will remain, obstructing the
// creation of the new cluster with the same name. Therefore, the endpoints should be deleted last. // creation of the new cluster with the same name. Therefore, the endpoints should be deleted last.
func (c *Cluster) Delete() error { func (c *Cluster) Delete() error {
var anyErrors = false anyErrors := false
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources") c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources")
@ -1312,7 +1317,6 @@ func (c *Cluster) NeedsRepair() (bool, acidv1.PostgresStatus) {
c.specMu.RLock() c.specMu.RLock()
defer c.specMu.RUnlock() defer c.specMu.RUnlock()
return !c.Status.Success(), c.Status return !c.Status.Success(), c.Status
} }
// ReceivePodEvent is called back by the controller in order to add the cluster's pod event to the queue. // ReceivePodEvent is called back by the controller in order to add the cluster's pod event to the queue.
@ -1421,7 +1425,6 @@ func (c *Cluster) initSystemUsers() {
} }
func (c *Cluster) initPreparedDatabaseRoles() error { func (c *Cluster) initPreparedDatabaseRoles() error {
if c.Spec.PreparedDatabases != nil && len(c.Spec.PreparedDatabases) == 0 { // TODO: add option to disable creating such a default DB if c.Spec.PreparedDatabases != nil && len(c.Spec.PreparedDatabases) == 0 { // TODO: add option to disable creating such a default DB
c.Spec.PreparedDatabases = map[string]acidv1.PreparedDatabase{strings.Replace(c.Name, "-", "_", -1): {}} c.Spec.PreparedDatabases = map[string]acidv1.PreparedDatabase{strings.Replace(c.Name, "-", "_", -1): {}}
} }
@ -1487,10 +1490,9 @@ func (c *Cluster) initPreparedDatabaseRoles() error {
} }
func (c *Cluster) initDefaultRoles(defaultRoles map[string]string, admin, prefix, searchPath, secretNamespace string) error { func (c *Cluster) initDefaultRoles(defaultRoles map[string]string, admin, prefix, searchPath, secretNamespace string) error {
for defaultRole, inherits := range defaultRoles { for defaultRole, inherits := range defaultRoles {
namespace := c.Namespace namespace := c.Namespace
//if namespaced secrets are allowed // if namespaced secrets are allowed
if secretNamespace != "" { if secretNamespace != "" {
if c.Config.OpConfig.EnableCrossNamespaceSecret { if c.Config.OpConfig.EnableCrossNamespaceSecret {
namespace = secretNamespace namespace = secretNamespace
@ -1558,7 +1560,7 @@ func (c *Cluster) initRobotUsers() error {
} }
} }
//if namespaced secrets are allowed // if namespaced secrets are allowed
if c.Config.OpConfig.EnableCrossNamespaceSecret { if c.Config.OpConfig.EnableCrossNamespaceSecret {
if strings.Contains(username, ".") { if strings.Contains(username, ".") {
splits := strings.Split(username, ".") splits := strings.Split(username, ".")
@ -1609,7 +1611,6 @@ func (c *Cluster) initAdditionalOwnerRoles() {
func (c *Cluster) initTeamMembers(teamID string, isPostgresSuperuserTeam bool) error { func (c *Cluster) initTeamMembers(teamID string, isPostgresSuperuserTeam bool) error {
teamMembers, err := c.getTeamMembers(teamID) teamMembers, err := c.getTeamMembers(teamID)
if err != nil { if err != nil {
return fmt.Errorf("could not get list of team members for team %q: %v", teamID, err) return fmt.Errorf("could not get list of team members for team %q: %v", teamID, err)
} }
@ -1648,7 +1649,6 @@ func (c *Cluster) initTeamMembers(teamID string, isPostgresSuperuserTeam bool) e
} }
func (c *Cluster) initHumanUsers() error { func (c *Cluster) initHumanUsers() error {
var clusterIsOwnedBySuperuserTeam bool var clusterIsOwnedBySuperuserTeam bool
superuserTeams := []string{} superuserTeams := []string{}

View File

@ -277,7 +277,7 @@ func (c *Controller) initRoleBinding() {
}`, c.PodServiceAccount.Name, c.PodServiceAccount.Name, c.PodServiceAccount.Name) }`, c.PodServiceAccount.Name, c.PodServiceAccount.Name, c.PodServiceAccount.Name)
c.opConfig.PodServiceAccountRoleBindingDefinition = compactValue(stringValue) c.opConfig.PodServiceAccountRoleBindingDefinition = compactValue(stringValue)
} }
c.logger.Info("Parse role bindings")
// re-uses k8s internal parsing. See k8s client-go issue #193 for explanation // re-uses k8s internal parsing. See k8s client-go issue #193 for explanation
decode := scheme.Codecs.UniversalDeserializer().Decode decode := scheme.Codecs.UniversalDeserializer().Decode
obj, groupVersionKind, err := decode([]byte(c.opConfig.PodServiceAccountRoleBindingDefinition), nil, nil) obj, groupVersionKind, err := decode([]byte(c.opConfig.PodServiceAccountRoleBindingDefinition), nil, nil)

View File

@ -259,13 +259,26 @@ func (c *Controller) processEvent(event ClusterEvent) {
lg.Infoln("cluster has been created") lg.Infoln("cluster has been created")
case EventUpdate: case EventUpdate:
lg.Infoln("update of the cluster started")
if !clusterFound { if !clusterFound {
lg.Warningln("cluster does not exist") lg.Warningln("cluster does not exist")
return return
} }
c.curWorkerCluster.Store(event.WorkerID, cl) c.curWorkerCluster.Store(event.WorkerID, cl)
// Check if this cluster has been marked for deletion
if !event.NewSpec.ObjectMeta.DeletionTimestamp.IsZero() {
lg.Infof("cluster has a DeletionTimestamp of %s, starting deletion now.", event.NewSpec.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
if err = cl.Delete(); err != nil {
cl.Error = fmt.Sprintf("error deleting cluster and its resources: %v", err)
c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
lg.Error(cl.Error)
return
}
lg.Infoln("cluster has been deleted via update event")
return
}
lg.Infoln("update of the cluster started")
err = cl.Update(event.OldSpec, event.NewSpec) err = cl.Update(event.OldSpec, event.NewSpec)
if err != nil { if err != nil {
cl.Error = fmt.Sprintf("could not update cluster: %v", err) cl.Error = fmt.Sprintf("could not update cluster: %v", err)
@ -380,7 +393,6 @@ func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{},
} }
func (c *Controller) warnOnDeprecatedPostgreSQLSpecParameters(spec *acidv1.PostgresSpec) { func (c *Controller) warnOnDeprecatedPostgreSQLSpecParameters(spec *acidv1.PostgresSpec) {
deprecate := func(deprecated, replacement string) { deprecate := func(deprecated, replacement string) {
c.logger.Warningf("parameter %q is deprecated. Consider setting %q instead", deprecated, replacement) c.logger.Warningf("parameter %q is deprecated. Consider setting %q instead", deprecated, replacement)
} }
@ -426,7 +438,7 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.
clusterError string clusterError string
) )
if informerOldSpec != nil { //update, delete if informerOldSpec != nil { // update, delete
uid = informerOldSpec.GetUID() uid = informerOldSpec.GetUID()
clusterName = util.NameFromMeta(informerOldSpec.ObjectMeta) clusterName = util.NameFromMeta(informerOldSpec.ObjectMeta)
@ -441,7 +453,7 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.
} else { } else {
clusterError = informerOldSpec.Error clusterError = informerOldSpec.Error
} }
} else { //add, sync } else { // add, sync
uid = informerNewSpec.GetUID() uid = informerNewSpec.GetUID()
clusterName = util.NameFromMeta(informerNewSpec.ObjectMeta) clusterName = util.NameFromMeta(informerNewSpec.ObjectMeta)
clusterError = informerNewSpec.Error clusterError = informerNewSpec.Error
@ -552,7 +564,19 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) {
pgOld := c.postgresqlCheck(prev) pgOld := c.postgresqlCheck(prev)
pgNew := c.postgresqlCheck(cur) pgNew := c.postgresqlCheck(cur)
if pgOld != nil && pgNew != nil { if pgOld != nil && pgNew != nil {
// Avoid the inifinite recursion for status updates clusterName := util.NameFromMeta(pgNew.ObjectMeta)
// Check if DeletionTimestamp was set (resource marked for deletion)
deletionTimestampChanged := pgOld.ObjectMeta.DeletionTimestamp.IsZero() && !pgNew.ObjectMeta.DeletionTimestamp.IsZero()
if deletionTimestampChanged {
c.logger.WithField("cluster-name", clusterName).Infof(
"UPDATE event: DeletionTimestamp set to %s, queueing event",
pgNew.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
c.queueClusterEvent(pgOld, pgNew, EventUpdate)
return
}
// Avoid the infinite recursion for status updates
if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) { if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) {
if reflect.DeepEqual(pgNew.Annotations, pgOld.Annotations) { if reflect.DeepEqual(pgNew.Annotations, pgOld.Annotations) {
return return
@ -591,7 +615,6 @@ or config maps.
The operator does not sync accounts/role bindings after creation. The operator does not sync accounts/role bindings after creation.
*/ */
func (c *Controller) submitRBACCredentials(event ClusterEvent) error { func (c *Controller) submitRBACCredentials(event ClusterEvent) error {
namespace := event.NewSpec.GetNamespace() namespace := event.NewSpec.GetNamespace()
if err := c.createPodServiceAccount(namespace); err != nil { if err := c.createPodServiceAccount(namespace); err != nil {
@ -605,7 +628,6 @@ func (c *Controller) submitRBACCredentials(event ClusterEvent) error {
} }
func (c *Controller) createPodServiceAccount(namespace string) error { func (c *Controller) createPodServiceAccount(namespace string) error {
podServiceAccountName := c.opConfig.PodServiceAccountName podServiceAccountName := c.opConfig.PodServiceAccountName
_, err := c.KubeClient.ServiceAccounts(namespace).Get(context.TODO(), podServiceAccountName, metav1.GetOptions{}) _, err := c.KubeClient.ServiceAccounts(namespace).Get(context.TODO(), podServiceAccountName, metav1.GetOptions{})
if k8sutil.ResourceNotFound(err) { if k8sutil.ResourceNotFound(err) {
@ -628,7 +650,6 @@ func (c *Controller) createPodServiceAccount(namespace string) error {
} }
func (c *Controller) createRoleBindings(namespace string) error { func (c *Controller) createRoleBindings(namespace string) error {
podServiceAccountName := c.opConfig.PodServiceAccountName podServiceAccountName := c.opConfig.PodServiceAccountName
podServiceAccountRoleBindingName := c.PodServiceAccountRoleBinding.Name podServiceAccountRoleBindingName := c.PodServiceAccountRoleBinding.Name