diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 10eeca7bf..43dd467b5 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -909,22 +909,8 @@ class EndToEndTestCase(unittest.TestCase): ''' k8s = self.k8s - annotation_patch = { - "metadata": { - "annotations": { - "k8s-status": "healthy" - }, - } - } try: - sts = k8s.api.apps_v1.read_namespaced_stateful_set('acid-minimal-cluster', 'default') - old_sts_creation_timestamp = sts.metadata.creation_timestamp - k8s.api.apps_v1.patch_namespaced_stateful_set(sts.metadata.name, sts.metadata.namespace, annotation_patch) - svc = k8s.api.core_v1.read_namespaced_service('acid-minimal-cluster', 'default') - old_svc_creation_timestamp = svc.metadata.creation_timestamp - k8s.api.core_v1.patch_namespaced_service(svc.metadata.name, svc.metadata.namespace, annotation_patch) - patch_config_ignored_annotations = { "data": { "ignored_annotations": "k8s-status", @@ -933,6 +919,25 @@ class EndToEndTestCase(unittest.TestCase): k8s.update_config(patch_config_ignored_annotations) self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + sts = k8s.api.apps_v1.read_namespaced_stateful_set('acid-minimal-cluster', 'default') + svc = k8s.api.core_v1.read_namespaced_service('acid-minimal-cluster', 'default') + + annotation_patch = { + "metadata": { + "annotations": { + "k8s-status": "healthy" + }, + } + } + + old_sts_creation_timestamp = sts.metadata.creation_timestamp + k8s.api.apps_v1.patch_namespaced_stateful_set(sts.metadata.name, sts.metadata.namespace, annotation_patch) + old_svc_creation_timestamp = svc.metadata.creation_timestamp + k8s.api.core_v1.patch_namespaced_service(svc.metadata.name, svc.metadata.namespace, annotation_patch) + + k8s.delete_operator_pod() + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + sts = k8s.api.apps_v1.read_namespaced_stateful_set('acid-minimal-cluster', 'default') new_sts_creation_timestamp = sts.metadata.creation_timestamp svc = k8s.api.core_v1.read_namespaced_service('acid-minimal-cluster', 'default') diff --git a/manifests/operator-service-account-rbac.yaml b/manifests/operator-service-account-rbac.yaml index c10dc5fd7..97629ee95 100644 --- a/manifests/operator-service-account-rbac.yaml +++ b/manifests/operator-service-account-rbac.yaml @@ -102,6 +102,7 @@ rules: - delete - get - update + - patch # to check nodes for node readiness label - apiGroups: - "" diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index fe8f3195b..23004ef9b 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -30,6 +30,7 @@ import ( appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" + apipolicyv1 "k8s.io/api/policy/v1" policyv1 "k8s.io/api/policy/v1" rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -433,6 +434,12 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa reasons = append(reasons, "new statefulset's pod management policy do not match") } + if c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy == nil { + c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy = &appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy{ + WhenDeleted: appsv1.RetainPersistentVolumeClaimRetentionPolicyType, + WhenScaled: appsv1.RetainPersistentVolumeClaimRetentionPolicyType, + } + } if !reflect.DeepEqual(c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy, statefulSet.Spec.PersistentVolumeClaimRetentionPolicy) { match = false needsReplace = true @@ -493,7 +500,6 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa if changed, reason := c.compareAnnotations(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations); changed { match = false needsReplace = true - needsRollUpdate = true reasons = append(reasons, "new statefulset's pod template metadata annotations does not match "+reason) } if !reflect.DeepEqual(c.Statefulset.Spec.Template.Spec.SecurityContext, statefulSet.Spec.Template.Spec.SecurityContext) { @@ -513,9 +519,9 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa reasons = append(reasons, fmt.Sprintf("new statefulset's name for volume %d does not match the current one", i)) continue } - if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations) { + if changed, reason := c.compareAnnotations(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations); changed { needsReplace = true - reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q does not match the current one", name)) + reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q does not match the current one: %s", name, reason)) } if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Spec, statefulSet.Spec.VolumeClaimTemplates[i].Spec) { name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name @@ -780,10 +786,6 @@ func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) { } } - if changed, reason := c.compareAnnotations(old.Annotations, new.Annotations); changed { - return !changed, "new service's annotations does not match the current one:" + reason - } - return true, "" } @@ -801,6 +803,12 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool newImage, curImage) } + newPodAnnotation := new.Spec.JobTemplate.Spec.Template.Annotations + curPodAnnotation := cur.Spec.JobTemplate.Spec.Template.Annotations + if changed, reason := c.compareAnnotations(curPodAnnotation, newPodAnnotation); changed { + return false, fmt.Sprintf("new job's pod template metadata annotations does not match " + reason) + } + newPgVersion := getPgVersion(new) curPgVersion := getPgVersion(cur) if newPgVersion != curPgVersion { @@ -818,6 +826,17 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool return true, "" } +func (c *Cluster) comparePodDisruptionBudget(cur, new *apipolicyv1.PodDisruptionBudget) (bool, string) { + //TODO: improve comparison + if match := reflect.DeepEqual(new.Spec, cur.Spec); !match { + return false, "new PDB spec does not match the current one" + } + if changed, reason := c.compareAnnotations(cur.Annotations, new.Annotations); changed { + return false, "new PDB's annotations does not match the current one:" + reason + } + return true, "" +} + func getPgVersion(cronJob *batchv1.CronJob) string { envs := cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Env for _, env := range envs { @@ -883,7 +902,6 @@ func (c *Cluster) hasFinalizer() bool { func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { updateFailed := false userInitFailed := false - syncStatefulSet := false c.mu.Lock() defer c.mu.Unlock() @@ -914,7 +932,6 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { if IsBiggerPostgresVersion(oldSpec.Spec.PostgresqlParam.PgVersion, c.GetDesiredMajorVersion()) { c.logger.Infof("postgresql version increased (%s -> %s), depending on config manual upgrade needed", oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion) - syncStatefulSet = true } else { c.logger.Infof("postgresql major version unchanged or smaller, no changes needed") // sticking with old version, this will also advance GetDesiredVersion next time. @@ -922,12 +939,9 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { } // Service - if !reflect.DeepEqual(c.generateService(Master, &oldSpec.Spec), c.generateService(Master, &newSpec.Spec)) || - !reflect.DeepEqual(c.generateService(Replica, &oldSpec.Spec), c.generateService(Replica, &newSpec.Spec)) { - if err := c.syncServices(); err != nil { - c.logger.Errorf("could not sync services: %v", err) - updateFailed = true - } + if err := c.syncServices(); err != nil { + c.logger.Errorf("could not sync services: %v", err) + updateFailed = true } // Users @@ -946,7 +960,10 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { // only when streams were not specified in oldSpec but in newSpec needStreamUser := len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0 - if !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser { + annotationsChanged, _ := c.compareAnnotations(oldSpec.Annotations, newSpec.Annotations) + + initUsers := !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser + if initUsers { c.logger.Debugf("initialize users") if err := c.initUsers(); err != nil { c.logger.Errorf("could not init users - skipping sync of secrets and databases: %v", err) @@ -954,7 +971,8 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { updateFailed = true return } - + } + if initUsers || annotationsChanged { c.logger.Debugf("syncing secrets") //TODO: mind the secrets of the deleted/new users if err := c.syncSecrets(); err != nil { @@ -968,38 +986,14 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { if c.OpConfig.StorageResizeMode != "off" { c.syncVolumes() } else { - c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume sync.") - } - - // streams configuration - if len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0 { - syncStatefulSet = true + c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume size sync.") } // Statefulset func() { - oldSs, err := c.generateStatefulSet(&oldSpec.Spec) - if err != nil { - c.logger.Errorf("could not generate old statefulset spec: %v", err) + if err := c.syncStatefulSet(); err != nil { + c.logger.Errorf("could not sync statefulsets: %v", err) updateFailed = true - return - } - - newSs, err := c.generateStatefulSet(&newSpec.Spec) - if err != nil { - c.logger.Errorf("could not generate new statefulset spec: %v", err) - updateFailed = true - return - } - - if syncStatefulSet || !reflect.DeepEqual(oldSs, newSs) { - c.logger.Debugf("syncing statefulsets") - syncStatefulSet = false - // TODO: avoid generating the StatefulSet object twice by passing it to syncStatefulSet - if err := c.syncStatefulSet(); err != nil { - c.logger.Errorf("could not sync statefulsets: %v", err) - updateFailed = true - } } }() @@ -1011,12 +1005,9 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { } // pod disruption budget - if oldSpec.Spec.NumberOfInstances != newSpec.Spec.NumberOfInstances { - c.logger.Debug("syncing pod disruption budgets") - if err := c.syncPodDisruptionBudget(true); err != nil { - c.logger.Errorf("could not sync pod disruption budget: %v", err) - updateFailed = true - } + if err := c.syncPodDisruptionBudget(true); err != nil { + c.logger.Errorf("could not sync pod disruption budget: %v", err) + updateFailed = true } // logical backup job diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index 9c6587746..e7d38928b 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -1443,205 +1443,6 @@ func TestCompareServices(t *testing.T) { match: false, reason: `new service's LoadBalancerSourceRange does not match the current one`, }, - { - about: "services differ on DNS annotation", - current: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - new: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "new_clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - match: false, - reason: `new service's annotations does not match the current one: "external-dns.alpha.kubernetes.io/hostname" changed from "clstr.acid.zalan.do" to "new_clstr.acid.zalan.do".`, - }, - { - about: "services differ on AWS ELB annotation", - current: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - new: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: "1800", - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - match: false, - reason: `new service's annotations does not match the current one: "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout" changed from "3600" to "1800".`, - }, - { - about: "service changes existing annotation", - current: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - "foo": "bar", - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - new: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - "foo": "baz", - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - match: false, - reason: `new service's annotations does not match the current one: "foo" changed from "bar" to "baz".`, - }, - { - about: "service changes multiple existing annotations", - current: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - "foo": "bar", - "bar": "foo", - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - new: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - "foo": "baz", - "bar": "fooz", - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - match: false, - // Test just the prefix to avoid flakiness and map sorting - reason: `new service's annotations does not match the current one:`, - }, - { - about: "service adds a new custom annotation", - current: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - new: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - "foo": "bar", - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - match: false, - reason: `new service's annotations does not match the current one: Added "foo" with value "bar".`, - }, - { - about: "service removes a custom annotation", - current: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - "foo": "bar", - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - new: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - match: false, - reason: `new service's annotations does not match the current one: Removed "foo".`, - }, - { - about: "service removes a custom annotation and adds a new one", - current: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - "foo": "bar", - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - new: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - "bar": "foo", - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - match: false, - reason: `new service's annotations does not match the current one: Removed "foo". Added "bar" with value "foo".`, - }, - { - about: "service removes a custom annotation, adds a new one and change another", - current: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - "foo": "bar", - "zalan": "do", - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - new: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - "bar": "foo", - "zalan": "do.com", - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - match: false, - // Test just the prefix to avoid flakiness and map sorting - reason: `new service's annotations does not match the current one: Removed "foo".`, - }, - { - about: "service add annotations", - current: newService( - map[string]string{}, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - new: newService( - map[string]string{ - constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do", - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - match: false, - // Test just the prefix to avoid flakiness and map sorting - reason: `new service's annotations does not match the current one: Added `, - }, - { - about: "ignored annotations", - current: newService( - map[string]string{}, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - new: newService( - map[string]string{ - "k8s.v1.cni.cncf.io/network-status": "up", - }, - v1.ServiceTypeLoadBalancer, - []string{"128.141.0.0/16", "137.138.0.0/16"}), - match: true, - }, } for _, tt := range tests { diff --git a/pkg/cluster/connection_pooler.go b/pkg/cluster/connection_pooler.go index 7a97497d7..48f4ea849 100644 --- a/pkg/cluster/connection_pooler.go +++ b/pkg/cluster/connection_pooler.go @@ -691,8 +691,8 @@ func updateConnectionPoolerDeployment(KubeClient k8sutil.KubernetesClient, newDe return deployment, nil } -// updateConnectionPoolerAnnotations updates the annotations of connection pooler deployment -func updateConnectionPoolerAnnotations(KubeClient k8sutil.KubernetesClient, deployment *appsv1.Deployment, annotations map[string]string) (*appsv1.Deployment, error) { +// patchConnectionPoolerAnnotations updates the annotations of connection pooler deployment +func patchConnectionPoolerAnnotations(KubeClient k8sutil.KubernetesClient, deployment *appsv1.Deployment, annotations map[string]string) (*appsv1.Deployment, error) { patchData, err := metaAnnotationsPatch(annotations) if err != nil { return nil, fmt.Errorf("could not form patch for the connection pooler deployment metadata: %v", err) @@ -1022,6 +1022,13 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql syncReason = append(syncReason, specReason...) } + newPodAnnotations := c.annotationsSet(c.generatePodAnnotations(&c.Spec)) + if changed, reason := c.compareAnnotations(deployment.Spec.Template.Annotations, newPodAnnotations); changed { + specSync = true + syncReason = append(syncReason, []string{"new connection pooler's pod template annotations do not match the current one: " + reason}...) + deployment.Spec.Template.Annotations = newPodAnnotations + } + defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(&c.Config, newConnectionPooler, deployment) syncReason = append(syncReason, defaultsReason...) @@ -1040,15 +1047,15 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql } c.ConnectionPooler[role].Deployment = deployment } - } - newAnnotations := c.AnnotationsToPropagate(c.annotationsSet(c.ConnectionPooler[role].Deployment.Annotations)) - if newAnnotations != nil { - deployment, err = updateConnectionPoolerAnnotations(c.KubeClient, c.ConnectionPooler[role].Deployment, newAnnotations) - if err != nil { - return nil, err + newAnnotations := c.AnnotationsToPropagate(c.annotationsSet(nil)) // including the downscaling annotations + if changed, _ := c.compareAnnotations(deployment.Annotations, newAnnotations); changed { + deployment, err = patchConnectionPoolerAnnotations(c.KubeClient, deployment, newAnnotations) + if err != nil { + return nil, err + } + c.ConnectionPooler[role].Deployment = deployment } - c.ConnectionPooler[role].Deployment = deployment } // check if pooler pods must be replaced due to secret update @@ -1076,22 +1083,27 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql if err != nil { return nil, fmt.Errorf("could not delete pooler pod: %v", err) } + } else if changed, _ := c.compareAnnotations(pod.Annotations, deployment.Spec.Template.Annotations); changed { + patchData, err := metaAnnotationsPatch(deployment.Spec.Template.Annotations) + if err != nil { + return nil, fmt.Errorf("could not form patch for pooler's pod annotations: %v", err) + } + _, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + if err != nil { + return nil, fmt.Errorf("could not patch annotations for pooler's pod %q: %v", pod.Name, err) + } } } if service, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}); err == nil { c.ConnectionPooler[role].Service = service desiredSvc := c.generateConnectionPoolerService(c.ConnectionPooler[role]) - if match, reason := c.compareServices(service, desiredSvc); !match { - syncReason = append(syncReason, reason) - c.logServiceChanges(role, service, desiredSvc, false, reason) - newService, err = c.updateService(role, service, desiredSvc) - if err != nil { - return syncReason, fmt.Errorf("could not update %s service to match desired state: %v", role, err) - } - c.ConnectionPooler[role].Service = newService - c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta)) + newService, err = c.updateService(role, service, desiredSvc) + if err != nil { + return syncReason, fmt.Errorf("could not update %s service to match desired state: %v", role, err) } + c.ConnectionPooler[role].Service = newService + c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta)) return NoSync, nil } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 8a86de8c1..5a2ce6600 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -2061,9 +2061,10 @@ func (c *Cluster) getCustomServiceAnnotations(role PostgresRole, spec *acidv1.Po func (c *Cluster) generateEndpoint(role PostgresRole, subsets []v1.EndpointSubset) *v1.Endpoints { endpoints := &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ - Name: c.endpointName(role), - Namespace: c.Namespace, - Labels: c.roleLabelsSet(true, role), + Name: c.endpointName(role), + Namespace: c.Namespace, + Annotations: c.annotationsSet(nil), + Labels: c.roleLabelsSet(true, role), }, } if len(subsets) > 0 { diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 1d4758c02..8c97dc6a2 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -286,55 +286,37 @@ func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) { } func (c *Cluster) updateService(role PostgresRole, oldService *v1.Service, newService *v1.Service) (*v1.Service, error) { - var ( - svc *v1.Service - err error - ) - - c.setProcessName("updating %v service", role) + var err error + svc := oldService serviceName := util.NameFromMeta(oldService.ObjectMeta) + match, reason := c.compareServices(oldService, newService) + if !match { + c.logServiceChanges(role, oldService, newService, false, reason) + c.setProcessName("updating %v service", role) - // update the service annotation in order to propagate ELB notation. - if len(newService.ObjectMeta.Annotations) > 0 { - if annotationsPatchData, err := metaAnnotationsPatch(newService.ObjectMeta.Annotations); err == nil { - _, err = c.KubeClient.Services(serviceName.Namespace).Patch( - context.TODO(), - serviceName.Name, - types.MergePatchType, - []byte(annotationsPatchData), - metav1.PatchOptions{}, - "") - - if err != nil { - return nil, fmt.Errorf("could not replace annotations for the service %q: %v", serviceName, err) - } - } else { - return nil, fmt.Errorf("could not form patch for the service metadata: %v", err) + // now, patch the service spec, but when disabling LoadBalancers do update instead + // patch does not work because of LoadBalancerSourceRanges field (even if set to nil) + oldServiceType := oldService.Spec.Type + newServiceType := newService.Spec.Type + if newServiceType == "ClusterIP" && newServiceType != oldServiceType { + newService.ResourceVersion = oldService.ResourceVersion + newService.Spec.ClusterIP = oldService.Spec.ClusterIP } - } - - // now, patch the service spec, but when disabling LoadBalancers do update instead - // patch does not work because of LoadBalancerSourceRanges field (even if set to nil) - oldServiceType := oldService.Spec.Type - newServiceType := newService.Spec.Type - if newServiceType == "ClusterIP" && newServiceType != oldServiceType { - newService.ResourceVersion = oldService.ResourceVersion - newService.Spec.ClusterIP = oldService.Spec.ClusterIP svc, err = c.KubeClient.Services(serviceName.Namespace).Update(context.TODO(), newService, metav1.UpdateOptions{}) if err != nil { return nil, fmt.Errorf("could not update service %q: %v", serviceName, err) } - } else { - patchData, err := specPatch(newService.Spec) - if err != nil { - return nil, fmt.Errorf("could not form patch for the service %q: %v", serviceName, err) - } + } - svc, err = c.KubeClient.Services(serviceName.Namespace).Patch( - context.TODO(), serviceName.Name, types.MergePatchType, patchData, metav1.PatchOptions{}, "") + if changed, _ := c.compareAnnotations(oldService.Annotations, newService.Annotations); changed { + patchData, err := metaAnnotationsPatch(newService.Annotations) if err != nil { - return nil, fmt.Errorf("could not patch service %q: %v", serviceName, err) + return nil, fmt.Errorf("could not form patch for service %q annotations: %v", oldService.Name, err) + } + svc, err = c.KubeClient.Services(serviceName.Namespace).Patch(context.TODO(), newService.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + if err != nil { + return nil, fmt.Errorf("could not patch annotations for service %q: %v", oldService.Name, err) } } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 4c89c98fe..b106fc722 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -20,6 +20,7 @@ import ( v1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ) var requirePrimaryRestartWhenDecreased = []string{ @@ -91,7 +92,6 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { } } - c.logger.Debug("syncing statefulsets") if err = c.syncStatefulSet(); err != nil { if !k8sutil.ResourceAlreadyExists(err) { err = fmt.Errorf("could not sync statefulsets: %v", err) @@ -200,15 +200,12 @@ func (c *Cluster) syncService(role PostgresRole) error { if svc, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.serviceName(role), metav1.GetOptions{}); err == nil { c.Services[role] = svc desiredSvc := c.generateService(role, &c.Spec) - if match, reason := c.compareServices(svc, desiredSvc); !match { - c.logServiceChanges(role, svc, desiredSvc, false, reason) - updatedSvc, err := c.updateService(role, svc, desiredSvc) - if err != nil { - return fmt.Errorf("could not update %s service to match desired state: %v", role, err) - } - c.Services[role] = updatedSvc - c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta)) + updatedSvc, err := c.updateService(role, svc, desiredSvc) + if err != nil { + return fmt.Errorf("could not update %s service to match desired state: %v", role, err) } + c.Services[role] = updatedSvc + c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta)) return nil } if !k8sutil.ResourceNotFound(err) { @@ -241,7 +238,17 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error { c.setProcessName("syncing %s endpoint", role) if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), c.endpointName(role), metav1.GetOptions{}); err == nil { - // TODO: No syncing of endpoints here, is this covered completely by updateService? + desiredEp := c.generateEndpoint(role, ep.Subsets) + if changed, _ := c.compareAnnotations(ep.Annotations, desiredEp.Annotations); changed { + patchData, err := metaAnnotationsPatch(desiredEp.Annotations) + if err != nil { + return fmt.Errorf("could not form patch for %s endpoint: %v", role, err) + } + ep, err = c.KubeClient.Endpoints(c.Namespace).Patch(context.TODO(), c.endpointName(role), types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + if err != nil { + return fmt.Errorf("could not patch annotations of %s endpoint: %v", role, err) + } + } c.Endpoints[role] = ep return nil } @@ -275,7 +282,8 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.podDisruptionBudgetName(), metav1.GetOptions{}); err == nil { c.PodDisruptionBudget = pdb newPDB := c.generatePodDisruptionBudget() - if match, reason := k8sutil.SamePDB(pdb, newPDB); !match { + match, reason := c.comparePodDisruptionBudget(pdb, newPDB) + if !match { c.logPDBChanges(pdb, newPDB, isUpdate, reason) if err = c.updatePodDisruptionBudget(newPDB); err != nil { return err @@ -326,10 +334,11 @@ func (c *Cluster) syncStatefulSet() error { // NB: Be careful to consider the codepath that acts on podsRollingUpdateRequired before returning early. sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(context.TODO(), c.statefulSetName(), metav1.GetOptions{}) + if err != nil && !k8sutil.ResourceNotFound(err) { + return fmt.Errorf("error during reading of statefulset: %v", err) + } + if err != nil { - if !k8sutil.ResourceNotFound(err) { - return fmt.Errorf("error during reading of statefulset: %v", err) - } // statefulset does not exist, try to re-create it c.Statefulset = nil c.logger.Infof("cluster's statefulset does not exist") @@ -354,6 +363,11 @@ func (c *Cluster) syncStatefulSet() error { c.logger.Infof("created missing statefulset %q", util.NameFromMeta(sset.ObjectMeta)) } else { + desiredSts, err := c.generateStatefulSet(&c.Spec) + if err != nil { + return fmt.Errorf("could not generate statefulset: %v", err) + } + c.logger.Debugf("syncing statefulsets") // check if there are still pods with a rolling update flag for _, pod := range pods { if c.getRollingUpdateFlagFromPod(&pod) { @@ -374,12 +388,21 @@ func (c *Cluster) syncStatefulSet() error { // statefulset is already there, make sure we use its definition in order to compare with the spec. c.Statefulset = sset - desiredSts, err := c.generateStatefulSet(&c.Spec) - if err != nil { - return fmt.Errorf("could not generate statefulset: %v", err) - } - cmp := c.compareStatefulSetWith(desiredSts) + if !cmp.rollingUpdate { + for _, pod := range pods { + if changed, _ := c.compareAnnotations(pod.Annotations, desiredSts.Spec.Template.Annotations); changed { + patchData, err := metaAnnotationsPatch(desiredSts.Spec.Template.Annotations) + if err != nil { + return fmt.Errorf("could not form patch for pod %q annotations: %v", pod.Name, err) + } + _, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + if err != nil { + return fmt.Errorf("could not patch annotations for pod %q: %v", pod.Name, err) + } + } + } + } if !cmp.match { if cmp.rollingUpdate { podsToRecreate = make([]v1.Pod, 0) @@ -942,6 +965,17 @@ func (c *Cluster) updateSecret( c.Secrets[secret.UID] = secret } + if changed, _ := c.compareAnnotations(secret.Annotations, generatedSecret.Annotations); changed { + patchData, err := metaAnnotationsPatch(generatedSecret.Annotations) + if err != nil { + return fmt.Errorf("could not form patch for secret %q annotations: %v", secret.Name, err) + } + _, err = c.KubeClient.Secrets(secret.Namespace).Patch(context.TODO(), secret.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + if err != nil { + return fmt.Errorf("could not patch annotations for secret %q: %v", secret.Name, err) + } + } + return nil } @@ -1379,6 +1413,16 @@ func (c *Cluster) syncLogicalBackupJob() error { } c.logger.Info("the logical backup job is synced") } + if changed, _ := c.compareAnnotations(job.Annotations, desiredJob.Annotations); changed { + patchData, err := metaAnnotationsPatch(desiredJob.Annotations) + if err != nil { + return fmt.Errorf("could not form patch for the logical backup job %q: %v", jobName, err) + } + _, err = c.KubeClient.CronJobs(c.Namespace).Patch(context.TODO(), jobName, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + if err != nil { + return fmt.Errorf("could not patch annotations of the logical backup job %q: %v", jobName, err) + } + } return nil } if !k8sutil.ResourceNotFound(err) { diff --git a/pkg/cluster/util_test.go b/pkg/cluster/util_test.go index 5d8b92f2c..3bd23f4b4 100644 --- a/pkg/cluster/util_test.go +++ b/pkg/cluster/util_test.go @@ -1,57 +1,259 @@ package cluster import ( + "bytes" "context" + "fmt" + "io" + "maps" + "net/http" + "reflect" "testing" + "time" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "github.com/zalando/postgres-operator/mocks" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" fakeacidv1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/fake" - "github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util/config" "github.com/zalando/postgres-operator/pkg/util/k8sutil" + "github.com/zalando/postgres-operator/pkg/util/patroni" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" k8sFake "k8s.io/client-go/kubernetes/fake" ) +var externalAnnotations = map[string]string{"existing": "annotation"} + func newFakeK8sAnnotationsClient() (k8sutil.KubernetesClient, *k8sFake.Clientset) { clientSet := k8sFake.NewSimpleClientset() acidClientSet := fakeacidv1.NewSimpleClientset() return k8sutil.KubernetesClient{ - PodDisruptionBudgetsGetter: clientSet.PolicyV1(), - ServicesGetter: clientSet.CoreV1(), - StatefulSetsGetter: clientSet.AppsV1(), - PostgresqlsGetter: acidClientSet.AcidV1(), + PodDisruptionBudgetsGetter: clientSet.PolicyV1(), + SecretsGetter: clientSet.CoreV1(), + ServicesGetter: clientSet.CoreV1(), + StatefulSetsGetter: clientSet.AppsV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + PersistentVolumeClaimsGetter: clientSet.CoreV1(), + PersistentVolumesGetter: clientSet.CoreV1(), + EndpointsGetter: clientSet.CoreV1(), + PodsGetter: clientSet.CoreV1(), + DeploymentsGetter: clientSet.AppsV1(), }, clientSet } -func TestInheritedAnnotations(t *testing.T) { - testName := "test inheriting annotations from manifest" - client, _ := newFakeK8sAnnotationsClient() - clusterName := "acid-test-cluster" - namespace := "default" - annotationValue := "acid" - role := Master +func clusterLabelsOptions(cluster *Cluster) metav1.ListOptions { + clusterLabel := labels.Set(map[string]string{cluster.OpConfig.ClusterNameLabel: cluster.Name}) + return metav1.ListOptions{ + LabelSelector: clusterLabel.String(), + } +} +func checkResourcesInheritedAnnotations(cluster *Cluster, resultAnnotations map[string]string) error { + clusterOptions := clusterLabelsOptions(cluster) + // helper functions + containsAnnotations := func(expected map[string]string, actual map[string]string, objName string, objType string) error { + if expected == nil { + if len(actual) != 0 { + return fmt.Errorf("%s %v expected not to have any annotations, got: %#v", objType, objName, actual) + } + } else if !(reflect.DeepEqual(expected, actual)) { + return fmt.Errorf("%s %v expected annotations: %#v, got: %#v", objType, objName, expected, actual) + } + return nil + } + + updateAnnotations := func(annotations map[string]string) map[string]string { + result := make(map[string]string, 0) + for anno := range annotations { + if _, ok := externalAnnotations[anno]; !ok { + result[anno] = annotations[anno] + } + } + return result + } + + checkSts := func(annotations map[string]string) error { + stsList, err := cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } + stsAnnotations := updateAnnotations(annotations) + + for _, sts := range stsList.Items { + if err := containsAnnotations(stsAnnotations, sts.Annotations, sts.ObjectMeta.Name, "StatefulSet"); err != nil { + return err + } + // pod template + if err := containsAnnotations(stsAnnotations, sts.Spec.Template.Annotations, sts.ObjectMeta.Name, "StatefulSet pod template"); err != nil { + return err + } + // pvc template + if err := containsAnnotations(stsAnnotations, sts.Spec.VolumeClaimTemplates[0].Annotations, sts.ObjectMeta.Name, "StatefulSet pvc template"); err != nil { + return err + } + } + return nil + } + + checkPods := func(annotations map[string]string) error { + podList, err := cluster.KubeClient.Pods(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } + for _, pod := range podList.Items { + if err := containsAnnotations(annotations, pod.Annotations, pod.ObjectMeta.Name, "Pod"); err != nil { + return err + } + } + return nil + } + + checkSvc := func(annotations map[string]string) error { + svcList, err := cluster.KubeClient.Services(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } + for _, svc := range svcList.Items { + if err := containsAnnotations(annotations, svc.Annotations, svc.ObjectMeta.Name, "Service"); err != nil { + return err + } + } + return nil + } + + checkPdb := func(annotations map[string]string) error { + pdbList, err := cluster.KubeClient.PodDisruptionBudgets(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } + for _, pdb := range pdbList.Items { + if err := containsAnnotations(updateAnnotations(annotations), pdb.Annotations, pdb.ObjectMeta.Name, "Pod Disruption Budget"); err != nil { + return err + } + } + return nil + } + + checkPvc := func(annotations map[string]string) error { + pvcList, err := cluster.KubeClient.PersistentVolumeClaims(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } + for _, pvc := range pvcList.Items { + if err := containsAnnotations(annotations, pvc.Annotations, pvc.ObjectMeta.Name, "Volume claim"); err != nil { + return err + } + } + return nil + } + + checkPooler := func(annotations map[string]string) error { + for _, role := range []PostgresRole{Master, Replica} { + deploy, err := cluster.KubeClient.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{}) + if err != nil { + return err + } + if err := containsAnnotations(annotations, deploy.Annotations, deploy.Name, "Deployment"); err != nil { + return err + } + if err := containsAnnotations(updateAnnotations(annotations), deploy.Spec.Template.Annotations, deploy.Name, "Pooler pod template"); err != nil { + return err + } + } + return nil + } + + checkSecrets := func(annotations map[string]string) error { + secretList, err := cluster.KubeClient.Secrets(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } + for _, secret := range secretList.Items { + if err := containsAnnotations(annotations, secret.Annotations, secret.Name, "Secret"); err != nil { + return err + } + } + return nil + } + + checkEndpoints := func(annotations map[string]string) error { + endpointsList, err := cluster.KubeClient.Endpoints(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } + for _, ep := range endpointsList.Items { + if err := containsAnnotations(annotations, ep.Annotations, ep.Name, "Endpoints"); err != nil { + return err + } + } + return nil + } + + checkFuncs := []func(map[string]string) error{ + checkSts, checkPods, checkSvc, checkPdb, checkPooler, checkPvc, checkSecrets, checkEndpoints, + } + for _, f := range checkFuncs { + if err := f(resultAnnotations); err != nil { + return err + } + } + return nil +} + +func createPods(cluster *Cluster) []v1.Pod { + podsList := make([]v1.Pod, 0) + for i, role := range []PostgresRole{Master, Replica} { + podsList = append(podsList, v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%d", clusterName, i), + Namespace: namespace, + Labels: map[string]string{ + "application": "spilo", + "cluster-name": clusterName, + "spilo-role": string(role), + }, + }, + }) + podsList = append(podsList, v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-pooler-%s", clusterName, role), + Namespace: namespace, + Labels: cluster.connectionPoolerLabels(role, true).MatchLabels, + }, + }) + } + + return podsList +} + +func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster, error) { pg := acidv1.Postgresql{ ObjectMeta: metav1.ObjectMeta{ Name: clusterName, Annotations: map[string]string{ - "owned-by": annotationValue, + "owned-by": "acid", + "foo": "bar", // should not be inherited }, }, Spec: acidv1.PostgresSpec{ + EnableConnectionPooler: boolToPointer(true), EnableReplicaConnectionPooler: boolToPointer(true), Volume: acidv1.Volume{ Size: "1Gi", }, + NumberOfInstances: 2, }, } - var cluster = New( + cluster := New( Config{ OpConfig: config.Config{ + PatroniAPICheckInterval: time.Duration(1), + PatroniAPICheckTimeout: time.Duration(5), ConnectionPooler: config.ConnectionPooler{ ConnectionPoolerDefaultCPURequest: "100m", ConnectionPoolerDefaultCPULimit: "100m", @@ -59,85 +261,225 @@ func TestInheritedAnnotations(t *testing.T) { ConnectionPoolerDefaultMemoryLimit: "100Mi", NumberOfInstances: k8sutil.Int32ToPointer(1), }, + PDBNameFormat: "postgres-{cluster}-pdb", PodManagementPolicy: "ordered_ready", Resources: config.Resources{ - ClusterLabels: map[string]string{"application": "spilo"}, - ClusterNameLabel: "cluster-name", - DefaultCPURequest: "300m", - DefaultCPULimit: "300m", - DefaultMemoryRequest: "300Mi", - DefaultMemoryLimit: "300Mi", - InheritedAnnotations: []string{"owned-by"}, - PodRoleLabel: "spilo-role", + ClusterLabels: map[string]string{"application": "spilo"}, + ClusterNameLabel: "cluster-name", + DefaultCPURequest: "300m", + DefaultCPULimit: "300m", + DefaultMemoryRequest: "300Mi", + DefaultMemoryLimit: "300Mi", + InheritedAnnotations: []string{"owned-by"}, + PodRoleLabel: "spilo-role", + ResourceCheckInterval: time.Duration(testResourceCheckInterval), + ResourceCheckTimeout: time.Duration(testResourceCheckTimeout), + MinInstances: -1, + MaxInstances: -1, }, }, }, client, pg, logger, eventRecorder) - cluster.Name = clusterName cluster.Namespace = namespace - - // test annotationsSet function - inheritedAnnotations := cluster.annotationsSet(nil) - - listOptions := metav1.ListOptions{ - LabelSelector: cluster.labelsSet(false).String(), - } - - // check statefulset annotations _, err := cluster.createStatefulSet() - assert.NoError(t, err) + if err != nil { + return nil, err + } + _, err = cluster.createService(Master) + if err != nil { + return nil, err + } + _, err = cluster.createPodDisruptionBudget() + if err != nil { + return nil, err + } + _, err = cluster.createConnectionPooler(mockInstallLookupFunction) + if err != nil { + return nil, err + } + pvcList := CreatePVCs(namespace, clusterName, cluster.labelsSet(false), 2, "1Gi") + for _, pvc := range pvcList.Items { + _, err = cluster.KubeClient.PersistentVolumeClaims(namespace).Create(context.TODO(), &pvc, metav1.CreateOptions{}) + if err != nil { + return nil, err + } + } + podsList := createPods(cluster) + for _, pod := range podsList { + _, err = cluster.KubeClient.Pods(namespace).Create(context.TODO(), &pod, metav1.CreateOptions{}) + if err != nil { + return nil, err + } + } - stsList, err := client.StatefulSets(namespace).List(context.TODO(), listOptions) - assert.NoError(t, err) + return cluster, nil +} + +func annotateResources(cluster *Cluster) error { + clusterOptions := clusterLabelsOptions(cluster) + + stsList, err := cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } for _, sts := range stsList.Items { - if !(util.MapContains(sts.ObjectMeta.Annotations, inheritedAnnotations)) { - t.Errorf("%s: StatefulSet %v not inherited annotations %#v, got %#v", testName, sts.ObjectMeta.Name, inheritedAnnotations, sts.ObjectMeta.Annotations) - } - // pod template - if !(util.MapContains(sts.Spec.Template.ObjectMeta.Annotations, inheritedAnnotations)) { - t.Errorf("%s: pod template %v not inherited annotations %#v, got %#v", testName, sts.ObjectMeta.Name, inheritedAnnotations, sts.ObjectMeta.Annotations) - } - // pvc template - if !(util.MapContains(sts.Spec.VolumeClaimTemplates[0].Annotations, inheritedAnnotations)) { - t.Errorf("%s: PVC template %v not inherited annotations %#v, got %#v", testName, sts.ObjectMeta.Name, inheritedAnnotations, sts.ObjectMeta.Annotations) + sts.Annotations = externalAnnotations + if _, err = cluster.KubeClient.StatefulSets(namespace).Update(context.TODO(), &sts, metav1.UpdateOptions{}); err != nil { + return err } } - // check service annotations - cluster.createService(Master) - svcList, err := client.Services(namespace).List(context.TODO(), listOptions) - assert.NoError(t, err) + podList, err := cluster.KubeClient.Pods(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } + for _, pod := range podList.Items { + pod.Annotations = externalAnnotations + if _, err = cluster.KubeClient.Pods(namespace).Update(context.TODO(), &pod, metav1.UpdateOptions{}); err != nil { + return err + } + } + + svcList, err := cluster.KubeClient.Services(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } for _, svc := range svcList.Items { - if !(util.MapContains(svc.ObjectMeta.Annotations, inheritedAnnotations)) { - t.Errorf("%s: Service %v not inherited annotations %#v, got %#v", testName, svc.ObjectMeta.Name, inheritedAnnotations, svc.ObjectMeta.Annotations) + svc.Annotations = externalAnnotations + if _, err = cluster.KubeClient.Services(namespace).Update(context.TODO(), &svc, metav1.UpdateOptions{}); err != nil { + return err } } - // check pod disruption budget annotations - cluster.createPodDisruptionBudget() - pdbList, err := client.PodDisruptionBudgets(namespace).List(context.TODO(), listOptions) - assert.NoError(t, err) + pdbList, err := cluster.KubeClient.PodDisruptionBudgets(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } for _, pdb := range pdbList.Items { - if !(util.MapContains(pdb.ObjectMeta.Annotations, inheritedAnnotations)) { - t.Errorf("%s: Pod Disruption Budget %v not inherited annotations %#v, got %#v", testName, pdb.ObjectMeta.Name, inheritedAnnotations, pdb.ObjectMeta.Annotations) + pdb.Annotations = externalAnnotations + _, err = cluster.KubeClient.PodDisruptionBudgets(namespace).Update(context.TODO(), &pdb, metav1.UpdateOptions{}) + if err != nil { + return err } } - // check pooler deployment annotations - cluster.ConnectionPooler = map[PostgresRole]*ConnectionPoolerObjects{} - cluster.ConnectionPooler[role] = &ConnectionPoolerObjects{ - Name: cluster.connectionPoolerName(role), - ClusterName: cluster.Name, - Namespace: cluster.Namespace, - Role: role, + pvcList, err := cluster.KubeClient.PersistentVolumeClaims(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err } - deploy, err := cluster.generateConnectionPoolerDeployment(cluster.ConnectionPooler[role]) + for _, pvc := range pvcList.Items { + pvc.Annotations = externalAnnotations + if _, err = cluster.KubeClient.PersistentVolumeClaims(namespace).Update(context.TODO(), &pvc, metav1.UpdateOptions{}); err != nil { + return err + } + } + + for _, role := range []PostgresRole{Master, Replica} { + deploy, err := cluster.KubeClient.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{}) + if err != nil { + return err + } + deploy.Annotations = externalAnnotations + if _, err = cluster.KubeClient.Deployments(namespace).Update(context.TODO(), deploy, metav1.UpdateOptions{}); err != nil { + return err + } + } + + secrets, err := cluster.KubeClient.Secrets(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } + for _, secret := range secrets.Items { + secret.Annotations = externalAnnotations + if _, err = cluster.KubeClient.Secrets(namespace).Update(context.TODO(), &secret, metav1.UpdateOptions{}); err != nil { + return err + } + } + + endpoints, err := cluster.KubeClient.Endpoints(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } + for _, ep := range endpoints.Items { + ep.Annotations = externalAnnotations + if _, err = cluster.KubeClient.Endpoints(namespace).Update(context.TODO(), &ep, metav1.UpdateOptions{}); err != nil { + return err + } + } + return nil +} + +func TestInheritedAnnotations(t *testing.T) { + // mocks + ctrl := gomock.NewController(t) + defer ctrl.Finish() + client, _ := newFakeK8sAnnotationsClient() + mockClient := mocks.NewMockHTTPClient(ctrl) + + cluster, err := newInheritedAnnotationsCluster(client) assert.NoError(t, err) - if !(util.MapContains(deploy.ObjectMeta.Annotations, inheritedAnnotations)) { - t.Errorf("%s: Deployment %v not inherited annotations %#v, got %#v", testName, deploy.ObjectMeta.Name, inheritedAnnotations, deploy.ObjectMeta.Annotations) + configJson := `{"postgresql": {"parameters": {"log_min_duration_statement": 200, "max_connections": 50}}}, "ttl": 20}` + response := http.Response{ + StatusCode: 200, + Body: io.NopCloser(bytes.NewReader([]byte(configJson))), } + mockClient.EXPECT().Do(gomock.Any()).Return(&response, nil).AnyTimes() + cluster.patroni = patroni.New(patroniLogger, mockClient) + err = cluster.Sync(&cluster.Postgresql) + assert.NoError(t, err) + + filterLabels := cluster.labelsSet(false) + + // Finally, tests! + result := map[string]string{"owned-by": "acid"} + assert.True(t, reflect.DeepEqual(result, cluster.annotationsSet(nil))) + + // 1. Check initial state + err = checkResourcesInheritedAnnotations(cluster, result) + assert.NoError(t, err) + + // 2. Check annotation value change + + // 2.1 Sync event + newSpec := cluster.Postgresql.DeepCopy() + newSpec.Annotations["owned-by"] = "fooSync" + result["owned-by"] = "fooSync" + + err = cluster.Sync(newSpec) + assert.NoError(t, err) + err = checkResourcesInheritedAnnotations(cluster, result) + assert.NoError(t, err) + + // + existing PVC without annotations + cluster.KubeClient.PersistentVolumeClaims(namespace).Create(context.TODO(), &CreatePVCs(namespace, clusterName, filterLabels, 3, "1Gi").Items[2], metav1.CreateOptions{}) + err = cluster.Sync(newSpec) + assert.NoError(t, err) + err = checkResourcesInheritedAnnotations(cluster, result) + assert.NoError(t, err) + + // 2.2 Update event + newSpec = cluster.Postgresql.DeepCopy() + newSpec.Annotations["owned-by"] = "fooUpdate" + result["owned-by"] = "fooUpdate" + // + new PVC + cluster.KubeClient.PersistentVolumeClaims(namespace).Create(context.TODO(), &CreatePVCs(namespace, clusterName, filterLabels, 4, "1Gi").Items[3], metav1.CreateOptions{}) + + err = cluster.Update(cluster.Postgresql.DeepCopy(), newSpec) + assert.NoError(t, err) + + err = checkResourcesInheritedAnnotations(cluster, result) + assert.NoError(t, err) + + // 3. Existing annotations (should not be removed) + err = annotateResources(cluster) + assert.NoError(t, err) + maps.Copy(result, externalAnnotations) + err = cluster.Sync(newSpec.DeepCopy()) + assert.NoError(t, err) + err = checkResourcesInheritedAnnotations(cluster, result) + assert.NoError(t, err) } func Test_trimCronjobName(t *testing.T) { diff --git a/pkg/cluster/volumes.go b/pkg/cluster/volumes.go index 1a4c7c73f..7d8bd1753 100644 --- a/pkg/cluster/volumes.go +++ b/pkg/cluster/volumes.go @@ -9,9 +9,9 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "github.com/aws/aws-sdk-go/aws" - acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" "github.com/zalando/postgres-operator/pkg/spec" "github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util/constants" @@ -42,18 +42,14 @@ func (c *Cluster) syncVolumes() error { c.logger.Errorf("errors occured during EBS volume adjustments: %v", err) } } + } - // resize pvc to adjust filesystem size until better K8s support - if err = c.syncVolumeClaims(); err != nil { - err = fmt.Errorf("could not sync persistent volume claims: %v", err) - return err - } - } else if c.OpConfig.StorageResizeMode == "pvc" { - if err = c.syncVolumeClaims(); err != nil { - err = fmt.Errorf("could not sync persistent volume claims: %v", err) - return err - } - } else if c.OpConfig.StorageResizeMode == "ebs" { + if err = c.syncVolumeClaims(); err != nil { + err = fmt.Errorf("could not sync persistent volume claims: %v", err) + return err + } + + if c.OpConfig.StorageResizeMode == "ebs" { // potentially enlarge volumes before changing the statefulset. By doing that // in this order we make sure the operator is not stuck waiting for a pod that // cannot start because it ran out of disk space. @@ -64,8 +60,6 @@ func (c *Cluster) syncVolumes() error { err = fmt.Errorf("could not sync persistent volumes: %v", err) return err } - } else { - c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume sync.") } return nil @@ -187,18 +181,58 @@ func (c *Cluster) populateVolumeMetaData() error { func (c *Cluster) syncVolumeClaims() error { c.setProcessName("syncing volume claims") - needsResizing, err := c.volumeClaimsNeedResizing(c.Spec.Volume) + ignoreResize := false + + if c.OpConfig.StorageResizeMode == "off" || c.OpConfig.StorageResizeMode == "ebs" { + ignoreResize = true + c.logger.Debugf("Storage resize mode is set to %q. Skipping volume size sync of PVCs.", c.OpConfig.StorageResizeMode) + + } + + newSize, err := resource.ParseQuantity(c.Spec.Volume.Size) if err != nil { - return fmt.Errorf("could not compare size of the volume claims: %v", err) + return fmt.Errorf("could not parse volume size from the manifest: %v", err) } + manifestSize := quantityToGigabyte(newSize) - if !needsResizing { - c.logger.Infof("volume claims do not require changes") - return nil + pvcs, err := c.listPersistentVolumeClaims() + if err != nil { + return fmt.Errorf("could not receive persistent volume claims: %v", err) } + for _, pvc := range pvcs { + needsUpdate := false + currentSize := quantityToGigabyte(pvc.Spec.Resources.Requests[v1.ResourceStorage]) + if !ignoreResize && currentSize != manifestSize { + if currentSize < manifestSize { + pvc.Spec.Resources.Requests[v1.ResourceStorage] = newSize + needsUpdate = true + c.logger.Debugf("persistent volume claim for volume %q needs to be resized", pvc.Name) + } else { + c.logger.Warningf("cannot shrink persistent volume") + } + } - if err := c.resizeVolumeClaims(c.Spec.Volume); err != nil { - return fmt.Errorf("could not sync volume claims: %v", err) + if needsUpdate { + c.logger.Debugf("updating persistent volume claim definition for volume %q", pvc.Name) + if _, err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Update(context.TODO(), &pvc, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("could not update persistent volume claim: %q", err) + } + c.logger.Debugf("successfully updated persistent volume claim %q", pvc.Name) + } else { + c.logger.Debugf("volume claim for volume %q do not require updates", pvc.Name) + } + + newAnnotations := c.annotationsSet(nil) + if changed, _ := c.compareAnnotations(pvc.Annotations, newAnnotations); changed { + patchData, err := metaAnnotationsPatch(newAnnotations) + if err != nil { + return fmt.Errorf("could not form patch for the persistent volume claim for volume %q: %v", pvc.Name, err) + } + _, err = c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Patch(context.TODO(), pvc.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + if err != nil { + return fmt.Errorf("could not patch annotations of the persistent volume claim for volume %q: %v", pvc.Name, err) + } + } } c.logger.Infof("volume claims have been synced successfully") @@ -261,35 +295,6 @@ func (c *Cluster) deletePersistentVolumeClaims() error { return nil } -func (c *Cluster) resizeVolumeClaims(newVolume acidv1.Volume) error { - c.logger.Debugln("resizing PVCs") - pvcs, err := c.listPersistentVolumeClaims() - if err != nil { - return err - } - newQuantity, err := resource.ParseQuantity(newVolume.Size) - if err != nil { - return fmt.Errorf("could not parse volume size: %v", err) - } - newSize := quantityToGigabyte(newQuantity) - for _, pvc := range pvcs { - volumeSize := quantityToGigabyte(pvc.Spec.Resources.Requests[v1.ResourceStorage]) - if volumeSize >= newSize { - if volumeSize > newSize { - c.logger.Warningf("cannot shrink persistent volume") - } - continue - } - pvc.Spec.Resources.Requests[v1.ResourceStorage] = newQuantity - c.logger.Debugf("updating persistent volume claim definition for volume %q", pvc.Name) - if _, err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Update(context.TODO(), &pvc, metav1.UpdateOptions{}); err != nil { - return fmt.Errorf("could not update persistent volume claim: %q", err) - } - c.logger.Debugf("successfully updated persistent volume claim %q", pvc.Name) - } - return nil -} - func (c *Cluster) listPersistentVolumes() ([]*v1.PersistentVolume, error) { result := make([]*v1.PersistentVolume, 0) @@ -406,25 +411,6 @@ func (c *Cluster) resizeVolumes() error { return nil } -func (c *Cluster) volumeClaimsNeedResizing(newVolume acidv1.Volume) (bool, error) { - newSize, err := resource.ParseQuantity(newVolume.Size) - manifestSize := quantityToGigabyte(newSize) - if err != nil { - return false, fmt.Errorf("could not parse volume size from the manifest: %v", err) - } - pvcs, err := c.listPersistentVolumeClaims() - if err != nil { - return false, fmt.Errorf("could not receive persistent volume claims: %v", err) - } - for _, pvc := range pvcs { - currentSize := quantityToGigabyte(pvc.Spec.Resources.Requests[v1.ResourceStorage]) - if currentSize != manifestSize { - return true, nil - } - } - return false, nil -} - func (c *Cluster) volumesNeedResizing() (bool, error) { newQuantity, _ := resource.ParseQuantity(c.Spec.Volume.Size) newSize := quantityToGigabyte(newQuantity) diff --git a/pkg/cluster/volumes_test.go b/pkg/cluster/volumes_test.go index 4ef94fcfb..329224893 100644 --- a/pkg/cluster/volumes_test.go +++ b/pkg/cluster/volumes_test.go @@ -74,6 +74,7 @@ func TestResizeVolumeClaim(t *testing.T) { cluster.Name = clusterName cluster.Namespace = namespace filterLabels := cluster.labelsSet(false) + cluster.Spec.Volume.Size = newVolumeSize // define and create PVCs for 1Gi volumes pvcList := CreatePVCs(namespace, clusterName, filterLabels, 2, "1Gi") @@ -85,7 +86,7 @@ func TestResizeVolumeClaim(t *testing.T) { } // test resizing - cluster.resizeVolumeClaims(acidv1.Volume{Size: newVolumeSize}) + cluster.syncVolumes() pvcs, err := cluster.listPersistentVolumeClaims() assert.NoError(t, err) diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 66c30dede..3db1122a9 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -3,7 +3,6 @@ package k8sutil import ( "context" "fmt" - "reflect" b64 "encoding/base64" "encoding/json" @@ -17,7 +16,6 @@ import ( "github.com/zalando/postgres-operator/pkg/spec" apiappsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" - apipolicyv1 "k8s.io/api/policy/v1" apiextclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" apiextv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -242,17 +240,6 @@ func (client *KubernetesClient) SetFinalizer(clusterName spec.NamespacedName, pg return updatedPg, nil } -// SamePDB compares the PodDisruptionBudgets -func SamePDB(cur, new *apipolicyv1.PodDisruptionBudget) (match bool, reason string) { - //TODO: improve comparison - match = reflect.DeepEqual(new.Spec, cur.Spec) - if !match { - reason = "new PDB spec does not match the current one" - } - - return -} - func (c *mockSecret) Get(ctx context.Context, name string, options metav1.GetOptions) (*v1.Secret, error) { oldFormatSecret := &v1.Secret{} oldFormatSecret.Name = "testcluster"