add unit test and documentation for finalizers (#2509)

* add unit test and documentation for finalizers
* error msg with lower case and cover sync case
* try to avoid adding json-patch dependency
* use Update to remove finalizer
* changing status and finalizer during create
* do not call Delete() twice
This commit is contained in:
Felix Kunde 2024-01-22 12:13:40 +01:00 committed by GitHub
parent 3bad9aaded
commit 4a0c483514
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 245 additions and 118 deletions

View File

@ -328,6 +328,14 @@ configuration they are grouped under the `kubernetes` key.
drained if the node_readiness_label is not used. This option if set to `false`
will not add the `spilo-role=master` selector to the PDB.
* **enable_finalizers**
By default, a deletion of the Postgresql resource will trigger a cleanup of
all child resources. However, if the database cluster is in a broken state
(e.g. failed initialization) and the operator cannot fully sync it, there can
be leftovers from a DELETE event. By enabling finalizers the Operator will
ensure all managed resources are deleted prior to the Postgresql resource.
The default is `false`.
* **enable_pod_disruption_budget**
PDB is enabled by default to protect the cluster from voluntarily disruptions
and hence unwanted DB downtime. However, on some cloud providers it could be

Binary file not shown.

View File

@ -13,7 +13,6 @@ import (
"sync"
"time"
jsonpatch "github.com/evanphx/json-patch"
"github.com/sirupsen/logrus"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
@ -248,6 +247,7 @@ func (c *Cluster) Create() (err error) {
defer c.mu.Unlock()
var (
pgCreateStatus *acidv1.Postgresql
service *v1.Service
ep *v1.Endpoints
ss *appsv1.StatefulSet
@ -261,11 +261,15 @@ func (c *Cluster) Create() (err error) {
}
}()
c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusCreating)
pgCreateStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusCreating)
if err != nil {
return fmt.Errorf("could not set cluster status: %v", err)
}
c.setSpec(pgCreateStatus)
if c.OpConfig.EnableFinalizers != nil && *c.OpConfig.EnableFinalizers {
c.logger.Info("Adding finalizer.")
if err = c.AddFinalizer(); err != nil {
return fmt.Errorf("could not add Finalizer: %v", err)
if err = c.addFinalizer(); err != nil {
return fmt.Errorf("could not add finalizer: %v", err)
}
}
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Create", "Started creation of new cluster resources")
@ -771,60 +775,45 @@ func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) {
return true, ""
}
// AddFinalizer patches the postgresql CR to add our finalizer.
func (c *Cluster) AddFinalizer() error {
if c.HasFinalizer() {
c.logger.Debugf("Finalizer %s already exists.", finalizerName)
// addFinalizer patches the postgresql CR to add finalizer
func (c *Cluster) addFinalizer() error {
if c.hasFinalizer() {
return nil
}
currentSpec := c.DeepCopy()
newSpec := c.DeepCopy()
newSpec.ObjectMeta.SetFinalizers(append(newSpec.ObjectMeta.Finalizers, finalizerName))
patchBytes, err := getPatchBytes(currentSpec, newSpec)
c.logger.Infof("adding finalizer %s", finalizerName)
finalizers := append(c.ObjectMeta.Finalizers, finalizerName)
newSpec, err := c.KubeClient.SetFinalizer(c.clusterName(), c.DeepCopy(), finalizers)
if err != nil {
return fmt.Errorf("Unable to produce patch to add finalizer: %v", err)
return fmt.Errorf("error adding finalizer: %v", err)
}
updatedSpec, err := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(c.clusterNamespace()).Patch(
context.TODO(), c.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
// update the spec, maintaining the new resourceVersion
c.setSpec(newSpec)
return nil
}
// removeFinalizer patches postgresql CR to remove finalizer
func (c *Cluster) removeFinalizer() error {
if !c.hasFinalizer() {
return nil
}
c.logger.Infof("removing finalizer %s", finalizerName)
finalizers := util.RemoveString(c.ObjectMeta.Finalizers, finalizerName)
newSpec, err := c.KubeClient.SetFinalizer(c.clusterName(), c.DeepCopy(), finalizers)
if err != nil {
return fmt.Errorf("Could not add finalizer: %v", err)
return fmt.Errorf("error removing finalizer: %v", err)
}
// update the spec, maintaining the new resourceVersion.
c.setSpec(updatedSpec)
return nil
}
// RemoveFinalizer patches postgresql CR to remove finalizer.
func (c *Cluster) RemoveFinalizer() error {
if !c.HasFinalizer() {
c.logger.Debugf("No finalizer %s exists to remove.", finalizerName)
return nil
}
currentSpec := c.DeepCopy()
newSpec := c.DeepCopy()
newSpec.ObjectMeta.SetFinalizers(removeString(newSpec.ObjectMeta.Finalizers, finalizerName))
patchBytes, err := getPatchBytes(currentSpec, newSpec)
if err != nil {
return fmt.Errorf("Unable to produce patch to remove finalizer: %v", err)
}
updatedSpec, err := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(c.clusterNamespace()).Patch(
context.TODO(), c.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
return fmt.Errorf("Could not remove finalizer: %v", err)
}
// update the spec, maintaining the new resourceVersion.
c.setSpec(updatedSpec)
c.setSpec(newSpec)
return nil
}
// HasFinalizer checks if our finalizer is currently set or not
func (c *Cluster) HasFinalizer() bool {
// hasFinalizer checks if finalizer is currently set or not
func (c *Cluster) hasFinalizer() bool {
for _, finalizer := range c.ObjectMeta.Finalizers {
if finalizer == finalizerName {
return true
@ -833,36 +822,6 @@ func (c *Cluster) HasFinalizer() bool {
return false
}
// Iterate through slice and remove certain string, then return cleaned slice
func removeString(slice []string, s string) (result []string) {
for _, item := range slice {
if item == s {
continue
}
result = append(result, item)
}
return result
}
// getPatchBytes will produce a JSONpatch between the two parameters of type acidv1.Postgresql
func getPatchBytes(oldSpec, newSpec *acidv1.Postgresql) ([]byte, error) {
oldData, err := json.Marshal(oldSpec)
if err != nil {
return nil, fmt.Errorf("failed to Marshal oldSpec for postgresql %s/%s: %v", oldSpec.Namespace, oldSpec.Name, err)
}
newData, err := json.Marshal(newSpec)
if err != nil {
return nil, fmt.Errorf("failed to Marshal newSpec for postgresql %s/%s: %v", newSpec.Namespace, newSpec.Name, err)
}
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return nil, fmt.Errorf("failed to CreateMergePatch for postgresl %s/%s: %v", oldSpec.Namespace, oldSpec.Name, err)
}
return patchBytes, nil
}
// Update changes Kubernetes objects according to the new specification. Unlike the sync case, the missing object
// (i.e. service) is treated as an error
// logical backup cron jobs are an exception: a user-initiated Update can enable a logical backup job
@ -1106,40 +1065,41 @@ func syncResources(a, b *v1.ResourceRequirements) bool {
// before the pods, it will be re-created by the current master pod and will remain, obstructing the
// creation of the new cluster with the same name. Therefore, the endpoints should be deleted last.
func (c *Cluster) Delete() error {
var anyErrors = false
c.mu.Lock()
defer c.mu.Unlock()
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources")
if err := c.deleteStreams(); err != nil {
anyErrors = true
c.logger.Warningf("could not delete event streams: %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not delete event streams: %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete event streams: %v", err)
}
var anyErrors = false
// delete the backup job before the stateful set of the cluster to prevent connections to non-existing pods
// deleting the cron job also removes pods and batch jobs it created
if err := c.deleteLogicalBackupJob(); err != nil {
anyErrors = true
c.logger.Warningf("could not remove the logical backup k8s cron job; %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not remove the logical backup k8s cron job; %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not remove the logical backup k8s cron job; %v", err)
}
if err := c.deleteStatefulSet(); err != nil {
anyErrors = true
c.logger.Warningf("could not delete statefulset: %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not delete statefulset: %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete statefulset: %v", err)
}
if err := c.deleteSecrets(); err != nil {
anyErrors = true
c.logger.Warningf("could not delete secrets: %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not delete secrets: %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete secrets: %v", err)
}
if err := c.deletePodDisruptionBudget(); err != nil {
anyErrors = true
c.logger.Warningf("could not delete pod disruption budget: %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not delete pod disruption budget: %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete pod disruption budget: %v", err)
}
for _, role := range []PostgresRole{Master, Replica} {
@ -1148,21 +1108,21 @@ func (c *Cluster) Delete() error {
if err := c.deleteEndpoint(role); err != nil {
anyErrors = true
c.logger.Warningf("could not delete %s endpoint: %v", role, err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not delete %s endpoint: %v", role, err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s endpoint: %v", role, err)
}
}
if err := c.deleteService(role); err != nil {
anyErrors = true
c.logger.Warningf("could not delete %s service: %v", role, err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not delete %s service: %v", role, err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s service: %v", role, err)
}
}
if err := c.deletePatroniClusterObjects(); err != nil {
anyErrors = true
c.logger.Warningf("could not remove leftover patroni objects; %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not remove leftover patroni objects; %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not remove leftover patroni objects; %v", err)
}
// Delete connection pooler objects anyway, even if it's not mentioned in the
@ -1172,20 +1132,19 @@ func (c *Cluster) Delete() error {
if err := c.deleteConnectionPooler(role); err != nil {
anyErrors = true
c.logger.Warningf("could not remove connection pooler: %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "Could not remove connection pooler: %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not remove connection pooler: %v", err)
}
}
// If we are done deleting our various resources we remove the finalizer to let K8S finally delete the Postgres CR
if anyErrors {
c.eventRecorder.Event(c.GetReference(), v1.EventTypeWarning, "Delete", "Some resources could be successfully deleted yet")
c.eventRecorder.Event(c.GetReference(), v1.EventTypeWarning, "Delete", "some resources could be successfully deleted yet")
return fmt.Errorf("some error(s) occured when deleting resources, NOT removing finalizer yet")
}
if err := c.RemoveFinalizer(); err != nil {
return fmt.Errorf("Done cleaning up, but error when trying to remove our finalizer: %v", err)
if err := c.removeFinalizer(); err != nil {
return fmt.Errorf("done cleaning up, but error when removing finalizer: %v", err)
}
c.logger.Info("Done cleaning up our resources, removed finalizer.")
return nil
}

View File

@ -1,11 +1,13 @@
package cluster
import (
"context"
"fmt"
"net/http"
"reflect"
"strings"
"testing"
"time"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
@ -33,7 +35,10 @@ const (
)
var logger = logrus.New().WithField("test", "cluster")
var eventRecorder = record.NewFakeRecorder(1)
// eventRecorder needs buffer for TestCreate which emit events for
// 1 cluster, primary endpoint, 2 services, the secrets, the statefulset and pods being ready
var eventRecorder = record.NewFakeRecorder(7)
var cl = New(
Config{
@ -79,6 +84,79 @@ var cl = New(
eventRecorder,
)
func TestCreate(t *testing.T) {
clientSet := fake.NewSimpleClientset()
acidClientSet := fakeacidv1.NewSimpleClientset()
clusterName := "cluster-with-finalizer"
clusterNamespace := "test"
client := k8sutil.KubernetesClient{
DeploymentsGetter: clientSet.AppsV1(),
EndpointsGetter: clientSet.CoreV1(),
PersistentVolumeClaimsGetter: clientSet.CoreV1(),
PodDisruptionBudgetsGetter: clientSet.PolicyV1(),
PodsGetter: clientSet.CoreV1(),
PostgresqlsGetter: acidClientSet.AcidV1(),
ServicesGetter: clientSet.CoreV1(),
SecretsGetter: clientSet.CoreV1(),
StatefulSetsGetter: clientSet.AppsV1(),
}
pg := acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName,
Namespace: clusterNamespace,
},
Spec: acidv1.PostgresSpec{
Volume: acidv1.Volume{
Size: "1Gi",
},
},
}
pod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-0", clusterName),
Namespace: clusterNamespace,
Labels: map[string]string{
"application": "spilo",
"cluster-name": clusterName,
"spilo-role": "master",
},
},
}
// manually create resources which must be found by further API calls and are not created by cluster.Create()
client.Postgresqls(clusterNamespace).Create(context.TODO(), &pg, metav1.CreateOptions{})
client.Pods(clusterNamespace).Create(context.TODO(), &pod, metav1.CreateOptions{})
var cluster = New(
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
Resources: config.Resources{
ClusterLabels: map[string]string{"application": "spilo"},
ClusterNameLabel: "cluster-name",
DefaultCPURequest: "300m",
DefaultCPULimit: "300m",
DefaultMemoryRequest: "300Mi",
DefaultMemoryLimit: "300Mi",
PodRoleLabel: "spilo-role",
ResourceCheckInterval: time.Duration(3),
ResourceCheckTimeout: time.Duration(10),
},
EnableFinalizers: util.True(),
},
}, client, pg, logger, eventRecorder)
err := cluster.Create()
assert.NoError(t, err)
if !cluster.hasFinalizer() {
t.Errorf("%s - expected finalizer not found on cluster", t.Name())
}
}
func TestStatefulSetAnnotations(t *testing.T) {
spec := acidv1.PostgresSpec{
TeamID: "myapp", NumberOfInstances: 1,

View File

@ -610,7 +610,7 @@ func (c *Cluster) deleteConnectionPooler(role PostgresRole) (err error) {
Delete(context.TODO(), deployment.Name, options)
if k8sutil.ResourceNotFound(err) {
c.logger.Debugf("connection pooler deployment was already deleted")
c.logger.Debugf("connection pooler deployment %s for role %s has already been deleted", deployment.Name, role)
} else if err != nil {
return fmt.Errorf("could not delete connection pooler deployment: %v", err)
}
@ -629,7 +629,7 @@ func (c *Cluster) deleteConnectionPooler(role PostgresRole) (err error) {
Delete(context.TODO(), service.Name, options)
if k8sutil.ResourceNotFound(err) {
c.logger.Debugf("connection pooler service was already deleted")
c.logger.Debugf("connection pooler service %s for role %s has already been already deleted", service.Name, role)
} else if err != nil {
return fmt.Errorf("could not delete connection pooler service: %v", err)
}

View File

@ -2741,7 +2741,7 @@ func TestGenerateResourceRequirements(t *testing.T) {
clusterNameLabel := "cluster-name"
sidecarName := "postgres-exporter"
// enforceMinResourceLimits will be called 2 twice emitting 4 events (2x cpu, 2x memory raise)
// enforceMinResourceLimits will be called 2 times emitting 4 events (2x cpu, 2x memory raise)
// enforceMaxResourceRequests will be called 4 times emitting 6 events (2x cpu, 4x memory cap)
// hence event bufferSize of 10 is required
newEventRecorder := record.NewFakeRecorder(10)

View File

@ -17,7 +17,6 @@ var VersionMap = map[string]int{
"13": 130000,
"14": 140000,
"15": 150000,
}
// IsBiggerPostgresVersion Compare two Postgres version numbers
@ -104,31 +103,31 @@ func (c *Cluster) majorVersionUpgrade() error {
if c.currentMajorVersion < desiredVersion {
podName := &spec.NamespacedName{Namespace: masterPod.Namespace, Name: masterPod.Name}
c.logger.Infof("triggering major version upgrade on pod %s of %d pods", masterPod.Name, numberOfPods)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Major Version Upgrade", "Starting major version upgrade on pod %s of %d pods", masterPod.Name, numberOfPods)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Major Version Upgrade", "starting major version upgrade on pod %s of %d pods", masterPod.Name, numberOfPods)
upgradeCommand := fmt.Sprintf("set -o pipefail && /usr/bin/python3 /scripts/inplace_upgrade.py %d 2>&1 | tee last_upgrade.log", numberOfPods)
c.logger.Debugf("checking if the spilo image runs with root or non-root (check for user id=0)")
resultIdCheck, errIdCheck := c.ExecCommand(podName, "/bin/bash", "-c", "/usr/bin/id -u")
if errIdCheck != nil {
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Major Version Upgrade", "Checking user id to run upgrade from %d to %d FAILED: %v", c.currentMajorVersion, desiredVersion, errIdCheck)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Major Version Upgrade", "checking user id to run upgrade from %d to %d FAILED: %v", c.currentMajorVersion, desiredVersion, errIdCheck)
}
resultIdCheck = strings.TrimSuffix(resultIdCheck, "\n")
var result string
if resultIdCheck != "0" {
c.logger.Infof("User id was identified as: %s, hence default user is non-root already", resultIdCheck)
c.logger.Infof("user id was identified as: %s, hence default user is non-root already", resultIdCheck)
result, err = c.ExecCommand(podName, "/bin/bash", "-c", upgradeCommand)
} else {
c.logger.Infof("User id was identified as: %s, using su to reach the postgres user", resultIdCheck)
c.logger.Infof("user id was identified as: %s, using su to reach the postgres user", resultIdCheck)
result, err = c.ExecCommand(podName, "/bin/su", "postgres", "-c", upgradeCommand)
}
if err != nil {
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Major Version Upgrade", "Upgrade from %d to %d FAILED: %v", c.currentMajorVersion, desiredVersion, err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Major Version Upgrade", "upgrade from %d to %d FAILED: %v", c.currentMajorVersion, desiredVersion, err)
return err
}
c.logger.Infof("upgrade action triggered and command completed: %s", result[:100])
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Major Version Upgrade", "Upgrade from %d to %d finished", c.currentMajorVersion, desiredVersion)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Major Version Upgrade", "upgrade from %d to %d finished", c.currentMajorVersion, desiredVersion)
}
}

View File

@ -248,7 +248,7 @@ func (c *Cluster) deleteStatefulSet() error {
err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Delete(context.TODO(), c.Statefulset.Name, c.deleteOptions)
if k8sutil.ResourceNotFound(err) {
c.logger.Debugf("StatefulSet was already deleted")
c.logger.Debugf("statefulset %q has already been deleted", util.NameFromMeta(c.Statefulset.ObjectMeta))
} else if err != nil {
return err
}
@ -346,7 +346,7 @@ func (c *Cluster) deleteService(role PostgresRole) error {
if err := c.KubeClient.Services(c.Services[role].Namespace).Delete(context.TODO(), c.Services[role].Name, c.deleteOptions); err != nil {
if k8sutil.ResourceNotFound(err) {
c.logger.Debugf("Service was already deleted")
c.logger.Debugf("%s service has already been deleted", role)
} else if err != nil {
return err
}
@ -455,7 +455,7 @@ func (c *Cluster) deletePodDisruptionBudget() error {
PodDisruptionBudgets(c.PodDisruptionBudget.Namespace).
Delete(context.TODO(), c.PodDisruptionBudget.Name, c.deleteOptions)
if k8sutil.ResourceNotFound(err) {
c.logger.Debugf("PodDisruptionBudget was already deleted")
c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta))
} else if err != nil {
return fmt.Errorf("could not delete PodDisruptionBudget: %v", err)
}
@ -490,13 +490,13 @@ func (c *Cluster) deleteEndpoint(role PostgresRole) error {
if err := c.KubeClient.Endpoints(c.Endpoints[role].Namespace).Delete(context.TODO(), c.Endpoints[role].Name, c.deleteOptions); err != nil {
if k8sutil.ResourceNotFound(err) {
c.logger.Debugf("Endpoint was already deleted")
c.logger.Debugf("%s endpoint has already been deleted", role)
} else if err != nil {
return fmt.Errorf("could not delete endpoint: %v", err)
}
}
c.logger.Infof("endpoint %q has been deleted", util.NameFromMeta(c.Endpoints[role].ObjectMeta))
c.logger.Infof("%s endpoint %q has been deleted", role, util.NameFromMeta(c.Endpoints[role].ObjectMeta))
delete(c.Endpoints, role)
return nil
@ -526,7 +526,7 @@ func (c *Cluster) deleteSecret(uid types.UID, secret v1.Secret) error {
c.logger.Debugf("deleting secret %q", secretName)
err := c.KubeClient.Secrets(secret.Namespace).Delete(context.TODO(), secret.Name, c.deleteOptions)
if k8sutil.ResourceNotFound(err) {
c.logger.Debugf("Secret was already deleted")
c.logger.Debugf("secret %q has already been deleted", secretName)
} else if err != nil {
return fmt.Errorf("could not delete secret %q: %v", secretName, err)
}
@ -588,7 +588,7 @@ func (c *Cluster) deleteLogicalBackupJob() error {
err := c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Delete(context.TODO(), c.getLogicalBackupJobName(), c.deleteOptions)
if k8sutil.ResourceNotFound(err) {
c.logger.Debugf("LogicalBackup CronJob was already deleted")
c.logger.Debugf("logical backup cron job %q has already been deleted", c.getLogicalBackupJobName())
} else if err != nil {
return err
}

View File

@ -48,6 +48,10 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
}
}()
if err = c.syncFinalizer(); err != nil {
c.logger.Debugf("could not sync finalizers: %v", err)
}
if err = c.initUsers(); err != nil {
err = fmt.Errorf("could not init users: %v", err)
return err
@ -144,6 +148,20 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
return err
}
func (c *Cluster) syncFinalizer() error {
var err error
if c.OpConfig.EnableFinalizers != nil && *c.OpConfig.EnableFinalizers {
err = c.addFinalizer()
} else {
err = c.removeFinalizer()
}
if err != nil {
return fmt.Errorf("could not sync finalizer: %v", err)
}
return nil
}
func (c *Cluster) syncServices() error {
for _, role := range []PostgresRole{Master, Replica} {
c.logger.Debugf("syncing %s service", role)

View File

@ -285,15 +285,18 @@ func (c *Controller) processEvent(event ClusterEvent) {
lg.Errorf("unknown cluster: %q", clusterName)
return
}
lg.Infoln("deletion of the cluster started")
teamName := strings.ToLower(cl.Spec.TeamID)
c.curWorkerCluster.Store(event.WorkerID, cl)
// when using finalizers the deletion already happened
if c.opConfig.EnableFinalizers == nil || !*c.opConfig.EnableFinalizers {
lg.Infoln("deletion of the cluster started")
if err := cl.Delete(); err != nil {
cl.Error = fmt.Sprintf("Could not delete cluster: %v", err)
cl.Error = fmt.Sprintf("could not delete cluster: %v", err)
c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
}
}
func() {
defer c.clustersMu.Unlock()
@ -329,9 +332,9 @@ func (c *Controller) processEvent(event ClusterEvent) {
// has this cluster been marked as deleted already, then we shall start cleaning up
if !cl.ObjectMeta.DeletionTimestamp.IsZero() {
lg.Infof("Cluster has a DeletionTimestamp of %s, starting deletion now.", cl.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
lg.Infof("cluster has a DeletionTimestamp of %s, starting deletion now.", cl.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
if err = cl.Delete(); err != nil {
cl.Error = fmt.Sprintf("Error deleting cluster and its resources: %v", err)
cl.Error = fmt.Sprintf("error deleting cluster and its resources: %v", err)
c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
lg.Error(cl.Error)
return

View File

@ -213,6 +213,37 @@ func (client *KubernetesClient) SetPostgresCRDStatus(clusterName spec.Namespaced
return pg, nil
}
// SetFinalizer of Postgres cluster
func (client *KubernetesClient) SetFinalizer(clusterName spec.NamespacedName, pg *apiacidv1.Postgresql, finalizers []string) (*apiacidv1.Postgresql, error) {
var (
updatedPg *apiacidv1.Postgresql
patch []byte
err error
)
pg.ObjectMeta.SetFinalizers(finalizers)
if len(finalizers) > 0 {
patch, err = json.Marshal(struct {
PgMetadata interface{} `json:"metadata"`
}{&pg.ObjectMeta})
if err != nil {
return pg, fmt.Errorf("could not marshal ObjectMeta: %v", err)
}
updatedPg, err = client.PostgresqlsGetter.Postgresqls(clusterName.Namespace).Patch(
context.TODO(), clusterName.Name, types.MergePatchType, patch, metav1.PatchOptions{})
} else {
// in case finalizers are empty and update is needed to remove
updatedPg, err = client.PostgresqlsGetter.Postgresqls(clusterName.Namespace).Update(
context.TODO(), pg, metav1.UpdateOptions{})
}
if err != nil {
return updatedPg, fmt.Errorf("could not set finalizer: %v", err)
}
return updatedPg, nil
}
// SamePDB compares the PodDisruptionBudgets
func SamePDB(cur, new *apipolicyv1.PodDisruptionBudget) (match bool, reason string) {
//TODO: improve comparison

View File

@ -152,6 +152,17 @@ func IsEqualIgnoreOrder(a, b []string) bool {
return reflect.DeepEqual(a_copy, b_copy)
}
// Iterate through slice and remove certain string, then return cleaned slice
func RemoveString(slice []string, s string) (result []string) {
for _, item := range slice {
if item == s {
continue
}
result = append(result, item)
}
return result
}
// SliceReplaceElement
func StringSliceReplaceElement(s []string, a, b string) (result []string) {
tmp := make([]string, 0, len(s))

View File

@ -66,6 +66,17 @@ var substractTest = []struct {
{[]string{"a"}, []string{""}, []string{"a"}, false},
}
var removeStringTest = []struct {
slice []string
item string
result []string
}{
{[]string{"a", "b", "c"}, "b", []string{"a", "c"}},
{[]string{"a"}, "b", []string{"a"}},
{[]string{"a"}, "a", []string{}},
{[]string{}, "a", []string{}},
}
var sliceContaintsTest = []struct {
slice []string
item string
@ -200,6 +211,15 @@ func TestFindNamedStringSubmatch(t *testing.T) {
}
}
func TestRemoveString(t *testing.T) {
for _, tt := range removeStringTest {
res := RemoveString(tt.slice, tt.item)
if !IsEqualIgnoreOrder(res, tt.result) {
t.Errorf("RemoveString expected: %#v, got: %#v", tt.result, res)
}
}
}
func TestSliceContains(t *testing.T) {
for _, tt := range sliceContaintsTest {
res := SliceContains(tt.slice, tt.item)