Fix deletion timestamp handling for clusters with finalizers
When a Postgres cluster has a finalizer, deleting it sets a DeletionTimestamp but does not remove the object until the finalizer is cleared. The operator was not handling these DeletionTimestamp changes properly:

1. postgresqlUpdate() filtered out events where only the DeletionTimestamp changed (it only checked Spec and Annotations), so the deletion was never processed.
2. The EventUpdate case in processEvent() did not check the DeletionTimestamp, so even if the event reached the processor it ran Update() instead of Delete().
3. removeFinalizer() used a cached object with a stale resourceVersion, causing "object has been modified" errors.

Fixes:
- Add an explicit DeletionTimestamp check in postgresqlUpdate() to queue the event
- Add a DeletionTimestamp check in EventUpdate to call Delete() when it is set
- Fetch the latest object from the API before removing the finalizer to avoid conflicts
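For orientation, the control flow shared by the three fixes can be reduced to the standalone Go sketch below. The types and names (object, handleUpdate, the callback arguments, the placeholder finalizer string) are illustrative stand-ins rather than the operator's actual API; only the decision flow mirrors the diff that follows: queue an update event when a DeletionTimestamp appears, route that update to Delete() instead of Update(), and re-read the latest object before dropping the finalizer so the write does not race a stale resourceVersion.

package main

import (
	"fmt"
	"time"
)

// object is an illustrative stand-in for a postgresql custom resource;
// only the fields the deletion logic cares about are modeled here.
type object struct {
	ResourceVersion   string
	DeletionTimestamp *time.Time // nil until the API server marks the object for deletion
	Finalizers        []string
}

// deletionRequested mirrors the postgresqlUpdate() check: the event is relevant
// even if only the DeletionTimestamp changed between the old and new object.
func deletionRequested(prev, cur *object) bool {
	return prev.DeletionTimestamp == nil && cur.DeletionTimestamp != nil
}

// handleUpdate mirrors the EventUpdate branch of processEvent(): if the new
// spec carries a DeletionTimestamp, tear the cluster down instead of syncing it.
func handleUpdate(cur *object, deleteFn, updateFn func() error) error {
	if cur.DeletionTimestamp != nil {
		fmt.Printf("cluster marked for deletion at %s, deleting\n", cur.DeletionTimestamp.Format(time.RFC3339))
		return deleteFn()
	}
	return updateFn()
}

// removeFinalizer mirrors the refetch-before-update fix: instead of patching a
// cached copy (whose ResourceVersion may be stale), fetch the latest object,
// drop the finalizer from that fresh copy, and write it back.
func removeFinalizer(fetchLatest func() (*object, error), write func(*object) error, finalizer string) error {
	latest, err := fetchLatest()
	if err != nil {
		return fmt.Errorf("fetching latest object: %w", err)
	}
	kept := make([]string, 0, len(latest.Finalizers))
	for _, f := range latest.Finalizers {
		if f != finalizer {
			kept = append(kept, f)
		}
	}
	latest.Finalizers = kept
	return write(latest)
}

func main() {
	now := time.Now()
	prev := &object{ResourceVersion: "41"}
	cur := &object{
		ResourceVersion:   "42",
		DeletionTimestamp: &now,
		Finalizers:        []string{"postgres-operator.example/finalizer"}, // placeholder finalizer name
	}

	// postgresqlUpdate(): queue the event even though only DeletionTimestamp changed.
	if deletionRequested(prev, cur) {
		fmt.Println("DeletionTimestamp set, queueing update event")
	}

	// processEvent(): route the queued update to Delete() instead of Update().
	_ = handleUpdate(cur,
		func() error { fmt.Println("Delete() called"); return nil },
		func() error { fmt.Println("Update() called"); return nil },
	)

	// removeFinalizer(): drop the finalizer from the freshly fetched copy.
	_ = removeFinalizer(
		func() (*object, error) { return cur, nil }, // stands in for a live GET from the API server
		func(o *object) error { fmt.Printf("writing object with finalizers %v\n", o.Finalizers); return nil },
		"postgres-operator.example/finalizer",
	)
}

In the real operator the fetch and write go through the Kubernetes API (see the removeFinalizer() hunk in the diff); the sketch only shows why the object is re-read before the finalizer list is rewritten.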
parent a27727f8d0
commit 9ca87d9db4
@@ -3,6 +3,7 @@ package cluster
 // Postgres CustomResourceDefinition object i.e. Spilo
 
 import (
+	"context"
 	"database/sql"
 	"encoding/json"
 	"fmt"
@@ -70,7 +71,7 @@ type kubeResources struct {
 	CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget
 	LogicalBackupJob              *batchv1.CronJob
 	Streams                       map[string]*zalandov1.FabricEventStream
-	//Pods are treated separately
+	// Pods are treated separately
 }
 
 // Cluster describes postgresql cluster
@@ -95,7 +96,7 @@ type Cluster struct {
 
 	teamsAPIClient   teams.Interface
 	oauthTokenGetter OAuthTokenGetter
-	KubeClient       k8sutil.KubernetesClient //TODO: move clients to the better place?
+	KubeClient       k8sutil.KubernetesClient // TODO: move clients to the better place?
 	currentProcess   Process
 	processMu        sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
 	specMu           sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
@@ -149,7 +150,8 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
 			PatroniEndpoints:  make(map[string]*v1.Endpoints),
 			PatroniConfigMaps: make(map[string]*v1.ConfigMap),
 			VolumeClaims:      make(map[types.UID]*v1.PersistentVolumeClaim),
-			Streams:           make(map[string]*zalandov1.FabricEventStream)},
+			Streams:           make(map[string]*zalandov1.FabricEventStream),
+		},
 		userSyncStrategy: users.DefaultUserSyncStrategy{
 			PasswordEncryption:  passwordEncryption,
 			RoleDeletionSuffix:  cfg.OpConfig.RoleDeletionSuffix,
@@ -276,7 +278,7 @@ func (c *Cluster) Create() (err error) {
 		errStatus error
 	)
 	if err == nil {
-		pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning) //TODO: are you sure it's running?
+		pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning) // TODO: are you sure it's running?
 	} else {
 		c.logger.Warningf("cluster created failed: %v", err)
 		pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusAddFailed)
@@ -440,7 +442,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
 	var match, needsRollUpdate, needsReplace bool
 
 	match = true
-	//TODO: improve me
+	// TODO: improve me
 	if *c.Statefulset.Spec.Replicas != *statefulSet.Spec.Replicas {
 		match = false
 		reasons = append(reasons, "new statefulset's number of replicas does not match the current one")
@@ -672,7 +674,6 @@ func compareResourcesAssumeFirstNotNil(a *v1.ResourceRequirements, b *v1.Resourc
 		}
 	}
 	return true
-
 }
 
 func compareEnv(a, b []v1.EnvVar) bool {
@@ -707,9 +708,7 @@ func compareEnv(a, b []v1.EnvVar) bool {
 }
 
 func compareSpiloConfiguration(configa, configb string) bool {
-	var (
-		oa, ob spiloConfiguration
-	)
+	var oa, ob spiloConfiguration
 
 	var err error
 	err = json.Unmarshal([]byte(configa), &oa)
@@ -818,7 +817,6 @@ func (c *Cluster) compareAnnotations(old, new map[string]string, removedList *[]
 	}
 
 	return reason != "", reason
-
 }
 
 func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) {
@@ -895,7 +893,7 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) *compareLog
 }
 
 func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBudget) (bool, string) {
-	//TODO: improve comparison
+	// TODO: improve comparison
 	if !reflect.DeepEqual(new.Spec, cur.Spec) {
 		return false, "new PDB's spec does not match the current one"
 	}
@@ -944,8 +942,17 @@ func (c *Cluster) removeFinalizer() error {
 	}
 
 	c.logger.Infof("removing finalizer %s", finalizerName)
-	finalizers := util.RemoveString(c.ObjectMeta.Finalizers, finalizerName)
-	newSpec, err := c.KubeClient.SetFinalizer(c.clusterName(), c.DeepCopy(), finalizers)
+
+	// Fetch the latest version of the object to avoid resourceVersion conflicts
+	clusterName := c.clusterName()
+	latestPg, err := c.KubeClient.PostgresqlsGetter.Postgresqls(clusterName.Namespace).Get(
+		context.TODO(), clusterName.Name, metav1.GetOptions{})
+	if err != nil {
+		return fmt.Errorf("error fetching latest postgresql for finalizer removal: %v", err)
+	}
+
+	finalizers := util.RemoveString(latestPg.ObjectMeta.Finalizers, finalizerName)
+	newSpec, err := c.KubeClient.SetFinalizer(clusterName, latestPg, finalizers)
 	if err != nil {
 		return fmt.Errorf("error removing finalizer: %v", err)
 	}
@@ -1063,7 +1070,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 	}
 
 	c.logger.Debug("syncing secrets")
-	//TODO: mind the secrets of the deleted/new users
+	// TODO: mind the secrets of the deleted/new users
 	if err := c.syncSecrets(); err != nil {
 		c.logger.Errorf("could not sync secrets: %v", err)
 		updateFailed = true
@@ -1101,7 +1108,6 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 
 	// logical backup job
 	func() {
-
 		// create if it did not exist
 		if !oldSpec.Spec.EnableLogicalBackup && newSpec.Spec.EnableLogicalBackup {
 			c.logger.Debug("creating backup cron job")
@@ -1129,7 +1135,6 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 				updateFailed = true
 			}
 		}
-
 	}()
 
 	// Roles and Databases
@@ -1206,7 +1211,7 @@ func syncResources(a, b *v1.ResourceRequirements) bool {
 // before the pods, it will be re-created by the current master pod and will remain, obstructing the
 // creation of the new cluster with the same name. Therefore, the endpoints should be deleted last.
 func (c *Cluster) Delete() error {
-	var anyErrors = false
+	anyErrors := false
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources")
@@ -1297,7 +1302,6 @@ func (c *Cluster) NeedsRepair() (bool, acidv1.PostgresStatus) {
 	c.specMu.RLock()
 	defer c.specMu.RUnlock()
 	return !c.Status.Success(), c.Status
-
 }
 
 // ReceivePodEvent is called back by the controller in order to add the cluster's pod event to the queue.
@@ -1406,7 +1410,6 @@ func (c *Cluster) initSystemUsers() {
 }
 
 func (c *Cluster) initPreparedDatabaseRoles() error {
-
 	if c.Spec.PreparedDatabases != nil && len(c.Spec.PreparedDatabases) == 0 { // TODO: add option to disable creating such a default DB
 		c.Spec.PreparedDatabases = map[string]acidv1.PreparedDatabase{strings.Replace(c.Name, "-", "_", -1): {}}
 	}
@@ -1472,10 +1475,9 @@ func (c *Cluster) initPreparedDatabaseRoles() error {
 }
 
 func (c *Cluster) initDefaultRoles(defaultRoles map[string]string, admin, prefix, searchPath, secretNamespace string) error {
-
 	for defaultRole, inherits := range defaultRoles {
 		namespace := c.Namespace
-		//if namespaced secrets are allowed
+		// if namespaced secrets are allowed
 		if secretNamespace != "" {
 			if c.Config.OpConfig.EnableCrossNamespaceSecret {
 				namespace = secretNamespace
@@ -1543,7 +1545,7 @@ func (c *Cluster) initRobotUsers() error {
 			}
 		}
 
-		//if namespaced secrets are allowed
+		// if namespaced secrets are allowed
 		if c.Config.OpConfig.EnableCrossNamespaceSecret {
 			if strings.Contains(username, ".") {
 				splits := strings.Split(username, ".")
@@ -1594,7 +1596,6 @@ func (c *Cluster) initAdditionalOwnerRoles() {
 
 func (c *Cluster) initTeamMembers(teamID string, isPostgresSuperuserTeam bool) error {
 	teamMembers, err := c.getTeamMembers(teamID)
-
 	if err != nil {
 		return fmt.Errorf("could not get list of team members for team %q: %v", teamID, err)
 	}
@@ -1633,7 +1634,6 @@ func (c *Cluster) initTeamMembers(teamID string, isPostgresSuperuserTeam bool) e
 }
 
 func (c *Cluster) initHumanUsers() error {
-
 	var clusterIsOwnedBySuperuserTeam bool
 	superuserTeams := []string{}
 
@@ -277,7 +277,7 @@ func (c *Controller) initRoleBinding() {
 	}`, c.PodServiceAccount.Name, c.PodServiceAccount.Name, c.PodServiceAccount.Name)
 		c.opConfig.PodServiceAccountRoleBindingDefinition = compactValue(stringValue)
 	}
-	c.logger.Info("Parse role bindings")
+
 	// re-uses k8s internal parsing. See k8s client-go issue #193 for explanation
 	decode := scheme.Codecs.UniversalDeserializer().Decode
 	obj, groupVersionKind, err := decode([]byte(c.opConfig.PodServiceAccountRoleBindingDefinition), nil, nil)
@@ -258,13 +258,26 @@ func (c *Controller) processEvent(event ClusterEvent) {
 
 		lg.Infoln("cluster has been created")
 	case EventUpdate:
-		lg.Infoln("update of the cluster started")
-
 		if !clusterFound {
 			lg.Warningln("cluster does not exist")
 			return
 		}
 		c.curWorkerCluster.Store(event.WorkerID, cl)
+
+		// Check if this cluster has been marked for deletion
+		if !event.NewSpec.ObjectMeta.DeletionTimestamp.IsZero() {
+			lg.Infof("cluster has a DeletionTimestamp of %s, starting deletion now.", event.NewSpec.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
+			if err = cl.Delete(); err != nil {
+				cl.Error = fmt.Sprintf("error deleting cluster and its resources: %v", err)
+				c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
+				lg.Error(cl.Error)
+				return
+			}
+			lg.Infoln("cluster has been deleted via update event")
+			return
+		}
+
+		lg.Infoln("update of the cluster started")
 		err = cl.Update(event.OldSpec, event.NewSpec)
 		if err != nil {
 			cl.Error = fmt.Sprintf("could not update cluster: %v", err)
@@ -379,7 +392,6 @@ func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{},
 }
 
 func (c *Controller) warnOnDeprecatedPostgreSQLSpecParameters(spec *acidv1.PostgresSpec) {
-
 	deprecate := func(deprecated, replacement string) {
 		c.logger.Warningf("parameter %q is deprecated. Consider setting %q instead", deprecated, replacement)
 	}
@@ -425,7 +437,7 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.
 		clusterError string
 	)
 
-	if informerOldSpec != nil { //update, delete
+	if informerOldSpec != nil { // update, delete
 		uid = informerOldSpec.GetUID()
 		clusterName = util.NameFromMeta(informerOldSpec.ObjectMeta)
 
@@ -440,7 +452,7 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.
 		} else {
 			clusterError = informerOldSpec.Error
 		}
-	} else { //add, sync
+	} else { // add, sync
 		uid = informerNewSpec.GetUID()
 		clusterName = util.NameFromMeta(informerNewSpec.ObjectMeta)
 		clusterError = informerNewSpec.Error
@@ -539,7 +551,19 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) {
 	pgOld := c.postgresqlCheck(prev)
 	pgNew := c.postgresqlCheck(cur)
 	if pgOld != nil && pgNew != nil {
-		// Avoid the inifinite recursion for status updates
+		clusterName := util.NameFromMeta(pgNew.ObjectMeta)
+
+		// Check if DeletionTimestamp was set (resource marked for deletion)
+		deletionTimestampChanged := pgOld.ObjectMeta.DeletionTimestamp.IsZero() && !pgNew.ObjectMeta.DeletionTimestamp.IsZero()
+		if deletionTimestampChanged {
+			c.logger.WithField("cluster-name", clusterName).Infof(
+				"UPDATE event: DeletionTimestamp set to %s, queueing event",
+				pgNew.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
+			c.queueClusterEvent(pgOld, pgNew, EventUpdate)
+			return
+		}
+
+		// Avoid the infinite recursion for status updates
 		if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) {
 			if reflect.DeepEqual(pgNew.Annotations, pgOld.Annotations) {
 				return
@@ -578,7 +602,6 @@ or config maps.
 The operator does not sync accounts/role bindings after creation.
 */
 func (c *Controller) submitRBACCredentials(event ClusterEvent) error {
-
 	namespace := event.NewSpec.GetNamespace()
 
 	if err := c.createPodServiceAccount(namespace); err != nil {
@@ -592,7 +615,6 @@ func (c *Controller) submitRBACCredentials(event ClusterEvent) error {
 }
 
 func (c *Controller) createPodServiceAccount(namespace string) error {
-
 	podServiceAccountName := c.opConfig.PodServiceAccountName
 	_, err := c.KubeClient.ServiceAccounts(namespace).Get(context.TODO(), podServiceAccountName, metav1.GetOptions{})
 	if k8sutil.ResourceNotFound(err) {
@@ -615,7 +637,6 @@ func (c *Controller) createPodServiceAccount(namespace string) error {
 }
 
 func (c *Controller) createRoleBindings(namespace string) error {
-
 	podServiceAccountName := c.opConfig.PodServiceAccountName
 	podServiceAccountRoleBindingName := c.PodServiceAccountRoleBinding.Name
 