diff --git a/charts/postgres-operator/crds/operatorconfigurations.yaml b/charts/postgres-operator/crds/operatorconfigurations.yaml
index a8025c212..4c5f7a7bf 100644
--- a/charts/postgres-operator/crds/operatorconfigurations.yaml
+++ b/charts/postgres-operator/crds/operatorconfigurations.yaml
@@ -205,6 +205,9 @@ spec:
                   enable_cross_namespace_secret:
                     type: boolean
                     default: false
+                  enable_finalizers:
+                    type: boolean
+                    default: false
                   enable_init_containers:
                     type: boolean
                     default: true
diff --git a/charts/postgres-operator/values.yaml b/charts/postgres-operator/values.yaml
index 85adb2428..bf44b765f 100644
--- a/charts/postgres-operator/values.yaml
+++ b/charts/postgres-operator/values.yaml
@@ -123,6 +123,10 @@ configKubernetes:
   # allow user secrets in other namespaces than the Postgres cluster
   enable_cross_namespace_secret: false
+  # use finalizers to ensure all managed resources are deleted prior to the postgresql CR
+  # this avoids stale resources in case the operator misses a delete event or is not running
+  # during deletion
+  enable_finalizers: false
   # enables initContainers to run actions before Spilo is started
   enable_init_containers: true
   # toggles pod anti affinity on the Postgres pods
@@ -166,7 +170,7 @@ configKubernetes:
   # defines the template for PDB (Pod Disruption Budget) names
   pdb_name_format: "postgres-{cluster}-pdb"
   # specify the PVC retention policy when scaling down and/or deleting
-  persistent_volume_claim_retention_policy: 
+  persistent_volume_claim_retention_policy:
     when_deleted: "retain"
     when_scaled: "retain"
   # switches pod anti affinity type to `preferredDuringSchedulingIgnoredDuringExecution`
diff --git a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go
index e5a089978..69d591e42 100644
--- a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go
+++ b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go
@@ -104,6 +104,7 @@ type KubernetesMetaConfiguration struct {
     PersistentVolumeClaimRetentionPolicy map[string]string `json:"persistent_volume_claim_retention_policy,omitempty"`
     EnableReadinessProbe                 bool              `json:"enable_readiness_probe,omitempty"`
     EnableCrossNamespaceSecret           bool              `json:"enable_cross_namespace_secret,omitempty"`
+    EnableFinalizers                     *bool             `json:"enable_finalizers,omitempty"`
 }
 
 // PostgresPodResourcesDefaults defines the spec of default resources
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index db947802c..dca1f2eee 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -13,6 +13,7 @@ import (
     "sync"
     "time"
 
+    jsonpatch "github.com/evanphx/json-patch"
     "github.com/sirupsen/logrus"
 
     acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
@@ -44,6 +45,7 @@ var (
     databaseNameRegexp    = regexp.MustCompile("^[a-zA-Z_][a-zA-Z0-9_]*$")
     userRegexp            = regexp.MustCompile(`^[a-z0-9]([-_a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-_a-z0-9]*[a-z0-9])?)*$`)
     patroniObjectSuffixes = []string{"leader", "config", "sync", "failover"}
+    finalizerName         = "postgres-operator.acid.zalan.do"
 )
 
 // Config contains operator-wide clients and configuration used from a cluster. TODO: remove struct duplication.
@@ -260,6 +262,12 @@ func (c *Cluster) Create() (err error) {
     }()
 
     c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusCreating)
+    if c.OpConfig.EnableFinalizers != nil && *c.OpConfig.EnableFinalizers {
+        c.logger.Info("Adding finalizer.")
+        if err = c.AddFinalizer(); err != nil {
+            return fmt.Errorf("could not add Finalizer: %v", err)
+        }
+    }
     c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Create", "Started creation of new cluster resources")
 
     for _, role := range []PostgresRole{Master, Replica} {
@@ -763,6 +771,98 @@ func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) {
     return true, ""
 }
 
+// AddFinalizer patches the postgresql CR to add our finalizer.
+func (c *Cluster) AddFinalizer() error {
+    if c.HasFinalizer() {
+        c.logger.Debugf("Finalizer %s already exists.", finalizerName)
+        return nil
+    }
+
+    currentSpec := c.DeepCopy()
+    newSpec := c.DeepCopy()
+    newSpec.ObjectMeta.SetFinalizers(append(newSpec.ObjectMeta.Finalizers, finalizerName))
+    patchBytes, err := getPatchBytes(currentSpec, newSpec)
+    if err != nil {
+        return fmt.Errorf("Unable to produce patch to add finalizer: %v", err)
+    }
+
+    updatedSpec, err := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(c.clusterNamespace()).Patch(
+        context.TODO(), c.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
+    if err != nil {
+        return fmt.Errorf("Could not add finalizer: %v", err)
+    }
+
+    // update the spec, maintaining the new resourceVersion.
+    c.setSpec(updatedSpec)
+    return nil
+}
+
+// RemoveFinalizer patches the postgresql CR to remove the finalizer.
+func (c *Cluster) RemoveFinalizer() error {
+    if !c.HasFinalizer() {
+        c.logger.Debugf("No finalizer %s exists to remove.", finalizerName)
+        return nil
+    }
+    currentSpec := c.DeepCopy()
+    newSpec := c.DeepCopy()
+    newSpec.ObjectMeta.SetFinalizers(removeString(newSpec.ObjectMeta.Finalizers, finalizerName))
+    patchBytes, err := getPatchBytes(currentSpec, newSpec)
+    if err != nil {
+        return fmt.Errorf("Unable to produce patch to remove finalizer: %v", err)
+    }
+
+    updatedSpec, err := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(c.clusterNamespace()).Patch(
+        context.TODO(), c.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
+    if err != nil {
+        return fmt.Errorf("Could not remove finalizer: %v", err)
+    }
+
+    // update the spec, maintaining the new resourceVersion.
+    c.setSpec(updatedSpec)
+
+    return nil
+}
+
+// HasFinalizer checks if our finalizer is currently set or not
+func (c *Cluster) HasFinalizer() bool {
+    for _, finalizer := range c.ObjectMeta.Finalizers {
+        if finalizer == finalizerName {
+            return true
+        }
+    }
+    return false
+}
+
+// removeString iterates through the slice, drops every occurrence of the given string and returns the cleaned slice
+func removeString(slice []string, s string) (result []string) {
+    for _, item := range slice {
+        if item == s {
+            continue
+        }
+        result = append(result, item)
+    }
+    return result
+}
+
+// getPatchBytes produces a JSON merge patch between the two parameters of type acidv1.Postgresql
+func getPatchBytes(oldSpec, newSpec *acidv1.Postgresql) ([]byte, error) {
+    oldData, err := json.Marshal(oldSpec)
+    if err != nil {
+        return nil, fmt.Errorf("failed to Marshal oldSpec for postgresql %s/%s: %v", oldSpec.Namespace, oldSpec.Name, err)
+    }
+
+    newData, err := json.Marshal(newSpec)
+    if err != nil {
+        return nil, fmt.Errorf("failed to Marshal newSpec for postgresql %s/%s: %v", newSpec.Namespace, newSpec.Name, err)
+    }
+
+    patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
+    if err != nil {
+        return nil, fmt.Errorf("failed to CreateMergePatch for postgresql %s/%s: %v", oldSpec.Namespace, oldSpec.Name, err)
+    }
+    return patchBytes, nil
+}
+
 // Update changes Kubernetes objects according to the new specification. Unlike the sync case, the missing object
 // (i.e. service) is treated as an error
 // logical backup cron jobs are an exception: a user-initiated Update can enable a logical backup job
@@ -1005,48 +1105,64 @@ func syncResources(a, b *v1.ResourceRequirements) bool {
 // DCS, reuses the master's endpoint to store the leader related metadata. If we remove the endpoint
 // before the pods, it will be re-created by the current master pod and will remain, obstructing the
 // creation of the new cluster with the same name. Therefore, the endpoints should be deleted last.
-func (c *Cluster) Delete() {
+func (c *Cluster) Delete() error {
     c.mu.Lock()
     defer c.mu.Unlock()
-    c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of new cluster resources")
+    c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources")
 
     if err := c.deleteStreams(); err != nil {
         c.logger.Warningf("could not delete event streams: %v", err)
+        c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not delete event streams: %v", err)
     }
 
+    var anyErrors = false
+
     // delete the backup job before the stateful set of the cluster to prevent connections to non-existing pods
     // deleting the cron job also removes pods and batch jobs it created
     if err := c.deleteLogicalBackupJob(); err != nil {
+        anyErrors = true
         c.logger.Warningf("could not remove the logical backup k8s cron job; %v", err)
+        c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not remove the logical backup k8s cron job; %v", err)
     }
 
     if err := c.deleteStatefulSet(); err != nil {
+        anyErrors = true
         c.logger.Warningf("could not delete statefulset: %v", err)
+        c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not delete statefulset: %v", err)
     }
 
     if err := c.deleteSecrets(); err != nil {
+        anyErrors = true
         c.logger.Warningf("could not delete secrets: %v", err)
+        c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not delete secrets: %v", err)
     }
 
     if err := c.deletePodDisruptionBudget(); err != nil {
+        anyErrors = true
         c.logger.Warningf("could not delete pod disruption budget: %v", err)
+        c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not delete pod disruption budget: %v", err)
     }
 
     for _, role := range []PostgresRole{Master, Replica} {
         if !c.patroniKubernetesUseConfigMaps() {
             if err := c.deleteEndpoint(role); err != nil {
+                anyErrors = true
                 c.logger.Warningf("could not delete %s endpoint: %v", role, err)
+                c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not delete %s endpoint: %v", role, err)
             }
         }
 
         if err := c.deleteService(role); err != nil {
+            anyErrors = true
             c.logger.Warningf("could not delete %s service: %v", role, err)
+            c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not delete %s service: %v", role, err)
         }
     }
 
     if err := c.deletePatroniClusterObjects(); err != nil {
+        anyErrors = true
         c.logger.Warningf("could not remove leftover patroni objects; %v", err)
+        c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not remove leftover patroni objects; %v", err)
     }
 
     // Delete connection pooler objects anyway, even if it's not mentioned in the
@@ -1054,10 +1170,23 @@ func (c *Cluster) Delete() {
     // wrong
     for _, role := range [2]PostgresRole{Master, Replica} {
         if err := c.deleteConnectionPooler(role); err != nil {
+            anyErrors = true
             c.logger.Warningf("could not remove connection pooler: %v", err)
+            c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not remove connection pooler: %v", err)
         }
     }
 
+    // If we are done deleting our various resources, we remove the finalizer to let K8s finally delete the Postgres CR
+    if anyErrors {
+        c.eventRecorder.Event(c.GetReference(), v1.EventTypeWarning, "Delete", "Some resources could not be deleted yet")
+        return fmt.Errorf("some error(s) occurred when deleting resources, NOT removing finalizer yet")
+    }
+
+    if err := c.RemoveFinalizer(); err != nil {
+        return fmt.Errorf("Done cleaning up, but error when trying to remove our finalizer: %v", err)
+    }
+
+    c.logger.Info("Done cleaning up our resources, removed finalizer.")
+    return nil
 }
 
 // NeedsRepair returns true if the cluster should be included in the repair scan (based on its in-memory status).
diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go
index eb68e9fb2..13c7125f5 100644
--- a/pkg/cluster/resources.go
+++ b/pkg/cluster/resources.go
@@ -247,9 +247,12 @@ func (c *Cluster) deleteStatefulSet() error {
     }
 
     err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Delete(context.TODO(), c.Statefulset.Name, c.deleteOptions)
-    if err != nil {
+    if k8sutil.ResourceNotFound(err) {
+        c.logger.Debugf("StatefulSet was already deleted")
+    } else if err != nil {
         return err
     }
+
     c.logger.Infof("statefulset %q has been deleted", util.NameFromMeta(c.Statefulset.ObjectMeta))
     c.Statefulset = nil
 
@@ -336,18 +339,21 @@ func (c *Cluster) updateService(role PostgresRole, oldService *v1.Service, newSe
 func (c *Cluster) deleteService(role PostgresRole) error {
     c.logger.Debugf("deleting service %s", role)
 
-    service, ok := c.Services[role]
-    if !ok {
+    if c.Services[role] == nil {
         c.logger.Debugf("No service for %s role was found, nothing to delete", role)
         return nil
     }
 
-    if err := c.KubeClient.Services(service.Namespace).Delete(context.TODO(), service.Name, c.deleteOptions); err != nil {
-        return err
+    if err := c.KubeClient.Services(c.Services[role].Namespace).Delete(context.TODO(), c.Services[role].Name, c.deleteOptions); err != nil {
+        if k8sutil.ResourceNotFound(err) {
+            c.logger.Debugf("Service was already deleted")
+        } else if err != nil {
+            return err
+        }
     }
 
-    c.logger.Infof("%s service %q has been deleted", role, util.NameFromMeta(service.ObjectMeta))
-    c.Services[role] = nil
+    c.logger.Infof("%s service %q has been deleted", role, util.NameFromMeta(c.Services[role].ObjectMeta))
+    delete(c.Services, role)
 
     return nil
 }
 
@@ -448,9 +454,12 @@ func (c *Cluster) deletePodDisruptionBudget() error {
     err := c.KubeClient.
         PodDisruptionBudgets(c.PodDisruptionBudget.Namespace).
        Delete(context.TODO(), c.PodDisruptionBudget.Name, c.deleteOptions)
-    if err != nil {
-        return fmt.Errorf("could not delete pod disruption budget: %v", err)
+    if k8sutil.ResourceNotFound(err) {
+        c.logger.Debugf("PodDisruptionBudget was already deleted")
+    } else if err != nil {
+        return fmt.Errorf("could not delete PodDisruptionBudget: %v", err)
     }
+
     c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta))
     c.PodDisruptionBudget = nil
 
@@ -479,14 +488,16 @@ func (c *Cluster) deleteEndpoint(role PostgresRole) error {
         return fmt.Errorf("there is no %s endpoint in the cluster", role)
     }
 
-    if err := c.KubeClient.Endpoints(c.Endpoints[role].Namespace).Delete(
-        context.TODO(), c.Endpoints[role].Name, c.deleteOptions); err != nil {
-        return fmt.Errorf("could not delete endpoint: %v", err)
+    if err := c.KubeClient.Endpoints(c.Endpoints[role].Namespace).Delete(context.TODO(), c.Endpoints[role].Name, c.deleteOptions); err != nil {
+        if k8sutil.ResourceNotFound(err) {
+            c.logger.Debugf("Endpoint was already deleted")
+        } else if err != nil {
+            return fmt.Errorf("could not delete endpoint: %v", err)
+        }
     }
 
     c.logger.Infof("endpoint %q has been deleted", util.NameFromMeta(c.Endpoints[role].ObjectMeta))
-
-    c.Endpoints[role] = nil
+    delete(c.Endpoints, role)
 
     return nil
 }
 
@@ -514,7 +525,9 @@ func (c *Cluster) deleteSecret(uid types.UID, secret v1.Secret) error {
     secretName := util.NameFromMeta(secret.ObjectMeta)
     c.logger.Debugf("deleting secret %q", secretName)
     err := c.KubeClient.Secrets(secret.Namespace).Delete(context.TODO(), secret.Name, c.deleteOptions)
-    if err != nil {
+    if k8sutil.ResourceNotFound(err) {
+        c.logger.Debugf("Secret was already deleted")
+    } else if err != nil {
         return fmt.Errorf("could not delete secret %q: %v", secretName, err)
     }
     c.logger.Infof("secret %q has been deleted", secretName)
@@ -573,7 +586,14 @@ func (c *Cluster) deleteLogicalBackupJob() error {
 
     c.logger.Info("removing the logical backup job")
 
-    return c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Delete(context.TODO(), c.getLogicalBackupJobName(), c.deleteOptions)
+    err := c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Delete(context.TODO(), c.getLogicalBackupJobName(), c.deleteOptions)
+    if k8sutil.ResourceNotFound(err) {
+        c.logger.Debugf("LogicalBackup CronJob was already deleted")
+    } else if err != nil {
+        return err
+    }
+
+    return nil
 }
 
 // GetServiceMaster returns cluster's kubernetes master Service
diff --git a/pkg/controller/operator_config.go b/pkg/controller/operator_config.go
index 346bf27dd..7c964e292 100644
--- a/pkg/controller/operator_config.go
+++ b/pkg/controller/operator_config.go
@@ -91,6 +91,7 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur
     result.SecretNameTemplate = fromCRD.Kubernetes.SecretNameTemplate
     result.OAuthTokenSecretName = fromCRD.Kubernetes.OAuthTokenSecretName
     result.EnableCrossNamespaceSecret = fromCRD.Kubernetes.EnableCrossNamespaceSecret
+    result.EnableFinalizers = util.CoalesceBool(fromCRD.Kubernetes.EnableFinalizers, util.False())
     result.InfrastructureRolesSecretName = fromCRD.Kubernetes.InfrastructureRolesSecretName
 
     if fromCRD.Kubernetes.InfrastructureRolesDefs != nil {
diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go
index ede7a99a3..541378ce7 100644
--- a/pkg/controller/postgresql.go
+++ b/pkg/controller/postgresql.go
@@ -290,9 +290,10 @@ func (c *Controller) processEvent(event ClusterEvent) {
         teamName := strings.ToLower(cl.Spec.TeamID)
 
        c.curWorkerCluster.Store(event.WorkerID, cl)
-        cl.Delete()
-        // Fixme - no error handling for delete ?
-        // c.eventRecorder.Eventf(cl.GetReference, v1.EventTypeWarning, "Delete", "%v", cl.Error)
+        if err := cl.Delete(); err != nil {
+            cl.Error = fmt.Sprintf("Could not delete cluster: %v", err)
+            c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
+        }
 
         func() {
             defer c.clustersMu.Unlock()
@@ -325,16 +326,27 @@ func (c *Controller) processEvent(event ClusterEvent) {
         }
 
         c.curWorkerCluster.Store(event.WorkerID, cl)
-        err = cl.Sync(event.NewSpec)
-        if err != nil {
-            cl.Error = fmt.Sprintf("could not sync cluster: %v", err)
-            c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Sync", "%v", cl.Error)
-            lg.Error(cl.Error)
-            return
+
+        // if this cluster has already been marked for deletion, start cleaning up
+        if !cl.ObjectMeta.DeletionTimestamp.IsZero() {
+            lg.Infof("Cluster has a DeletionTimestamp of %s, starting deletion now.", cl.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
+            if err = cl.Delete(); err != nil {
+                cl.Error = fmt.Sprintf("Error deleting cluster and its resources: %v", err)
+                c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
+                lg.Error(cl.Error)
+                return
+            }
+            lg.Infof("cluster has been deleted")
+        } else {
+            if err = cl.Sync(event.NewSpec); err != nil {
+                cl.Error = fmt.Sprintf("could not sync cluster: %v", err)
+                c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Sync", "%v", cl.Error)
+                lg.Error(cl.Error)
+                return
+            }
+            lg.Infof("cluster has been synced")
         }
         cl.Error = ""
-
-        lg.Infof("cluster has been synced")
     }
 }
diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go
index 82df74e9c..c0fda940b 100644
--- a/pkg/util/config/config.go
+++ b/pkg/util/config/config.go
@@ -238,6 +238,7 @@ type Config struct {
     SetMemoryRequestToLimit       bool  `name:"set_memory_request_to_limit" default:"false"`
     EnableLazySpiloUpgrade        bool  `name:"enable_lazy_spilo_upgrade" default:"false"`
     EnableCrossNamespaceSecret    bool  `name:"enable_cross_namespace_secret" default:"false"`
+    EnableFinalizers              *bool `name:"enable_finalizers" default:"false"`
     EnablePgVersionEnvVar         bool  `name:"enable_pgversion_env_var" default:"true"`
     EnableSpiloWalPathCompat      bool  `name:"enable_spilo_wal_path_compat" default:"false"`
     EnableTeamIdClusternamePrefix bool  `name:"enable_team_id_clustername_prefix" default:"false"`
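Usage note (not part of the diff): a minimal sketch of how the new toggle could be switched on via the chart values changed above; the CRD-based OperatorConfiguration exposes the same flag as `enable_finalizers` in its `kubernetes` section, defaulting to `false`. The override below is an illustrative assumption, not taken from the diff.

```yaml
# Helm values override for charts/postgres-operator/values.yaml (sketch)
configKubernetes:
  # keep the operator's finalizer on postgresql CRs until all managed resources are gone
  enable_finalizers: true
```

With the flag enabled, Create() adds the `postgres-operator.acid.zalan.do` finalizer and Delete() only removes it once every managed resource has been cleaned up, so a delete event missed while the operator is down no longer leaves stale resources behind.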