Improve inherited annotations (#2657)

* Annotate PVC on Sync/Update, not only change PVC template
* Don't rotate pods when only annotations changed
* Annotate Logical Backup's and Pooler's pods
* Annotate PDB, Endpoints created by the Operator, Secrets, Logical Backup jobs

Inherited annotations are only added/updated, not removed
This commit is contained in:
Polina Bungina 2024-06-26 13:10:37 +02:00 committed by GitHub
parent 2ef7d58578
commit 47efca33c9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 645 additions and 492 deletions

View File

@ -909,22 +909,8 @@ class EndToEndTestCase(unittest.TestCase):
'''
k8s = self.k8s
annotation_patch = {
"metadata": {
"annotations": {
"k8s-status": "healthy"
},
}
}
try:
sts = k8s.api.apps_v1.read_namespaced_stateful_set('acid-minimal-cluster', 'default')
old_sts_creation_timestamp = sts.metadata.creation_timestamp
k8s.api.apps_v1.patch_namespaced_stateful_set(sts.metadata.name, sts.metadata.namespace, annotation_patch)
svc = k8s.api.core_v1.read_namespaced_service('acid-minimal-cluster', 'default')
old_svc_creation_timestamp = svc.metadata.creation_timestamp
k8s.api.core_v1.patch_namespaced_service(svc.metadata.name, svc.metadata.namespace, annotation_patch)
patch_config_ignored_annotations = {
"data": {
"ignored_annotations": "k8s-status",
@ -933,6 +919,25 @@ class EndToEndTestCase(unittest.TestCase):
k8s.update_config(patch_config_ignored_annotations)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
sts = k8s.api.apps_v1.read_namespaced_stateful_set('acid-minimal-cluster', 'default')
svc = k8s.api.core_v1.read_namespaced_service('acid-minimal-cluster', 'default')
annotation_patch = {
"metadata": {
"annotations": {
"k8s-status": "healthy"
},
}
}
old_sts_creation_timestamp = sts.metadata.creation_timestamp
k8s.api.apps_v1.patch_namespaced_stateful_set(sts.metadata.name, sts.metadata.namespace, annotation_patch)
old_svc_creation_timestamp = svc.metadata.creation_timestamp
k8s.api.core_v1.patch_namespaced_service(svc.metadata.name, svc.metadata.namespace, annotation_patch)
k8s.delete_operator_pod()
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
sts = k8s.api.apps_v1.read_namespaced_stateful_set('acid-minimal-cluster', 'default')
new_sts_creation_timestamp = sts.metadata.creation_timestamp
svc = k8s.api.core_v1.read_namespaced_service('acid-minimal-cluster', 'default')

View File

@ -102,6 +102,7 @@ rules:
- delete
- get
- update
- patch
# to check nodes for node readiness label
- apiGroups:
- ""

View File

@ -30,6 +30,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
apipolicyv1 "k8s.io/api/policy/v1"
policyv1 "k8s.io/api/policy/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -433,6 +434,12 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
reasons = append(reasons, "new statefulset's pod management policy do not match")
}
if c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy == nil {
c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy = &appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy{
WhenDeleted: appsv1.RetainPersistentVolumeClaimRetentionPolicyType,
WhenScaled: appsv1.RetainPersistentVolumeClaimRetentionPolicyType,
}
}
if !reflect.DeepEqual(c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy, statefulSet.Spec.PersistentVolumeClaimRetentionPolicy) {
match = false
needsReplace = true
@ -493,7 +500,6 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
if changed, reason := c.compareAnnotations(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations); changed {
match = false
needsReplace = true
needsRollUpdate = true
reasons = append(reasons, "new statefulset's pod template metadata annotations does not match "+reason)
}
if !reflect.DeepEqual(c.Statefulset.Spec.Template.Spec.SecurityContext, statefulSet.Spec.Template.Spec.SecurityContext) {
@ -513,9 +519,9 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
reasons = append(reasons, fmt.Sprintf("new statefulset's name for volume %d does not match the current one", i))
continue
}
if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations) {
if changed, reason := c.compareAnnotations(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations); changed {
needsReplace = true
reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q does not match the current one", name))
reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q does not match the current one: %s", name, reason))
}
if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Spec, statefulSet.Spec.VolumeClaimTemplates[i].Spec) {
name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name
@ -780,10 +786,6 @@ func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) {
}
}
if changed, reason := c.compareAnnotations(old.Annotations, new.Annotations); changed {
return !changed, "new service's annotations does not match the current one:" + reason
}
return true, ""
}
@ -801,6 +803,12 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool
newImage, curImage)
}
newPodAnnotation := new.Spec.JobTemplate.Spec.Template.Annotations
curPodAnnotation := cur.Spec.JobTemplate.Spec.Template.Annotations
if changed, reason := c.compareAnnotations(curPodAnnotation, newPodAnnotation); changed {
return false, fmt.Sprintf("new job's pod template metadata annotations does not match " + reason)
}
newPgVersion := getPgVersion(new)
curPgVersion := getPgVersion(cur)
if newPgVersion != curPgVersion {
@ -818,6 +826,17 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool
return true, ""
}
func (c *Cluster) comparePodDisruptionBudget(cur, new *apipolicyv1.PodDisruptionBudget) (bool, string) {
//TODO: improve comparison
if match := reflect.DeepEqual(new.Spec, cur.Spec); !match {
return false, "new PDB spec does not match the current one"
}
if changed, reason := c.compareAnnotations(cur.Annotations, new.Annotations); changed {
return false, "new PDB's annotations does not match the current one:" + reason
}
return true, ""
}
func getPgVersion(cronJob *batchv1.CronJob) string {
envs := cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Env
for _, env := range envs {
@ -883,7 +902,6 @@ func (c *Cluster) hasFinalizer() bool {
func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
updateFailed := false
userInitFailed := false
syncStatefulSet := false
c.mu.Lock()
defer c.mu.Unlock()
@ -914,7 +932,6 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
if IsBiggerPostgresVersion(oldSpec.Spec.PostgresqlParam.PgVersion, c.GetDesiredMajorVersion()) {
c.logger.Infof("postgresql version increased (%s -> %s), depending on config manual upgrade needed",
oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
syncStatefulSet = true
} else {
c.logger.Infof("postgresql major version unchanged or smaller, no changes needed")
// sticking with old version, this will also advance GetDesiredVersion next time.
@ -922,12 +939,9 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
}
// Service
if !reflect.DeepEqual(c.generateService(Master, &oldSpec.Spec), c.generateService(Master, &newSpec.Spec)) ||
!reflect.DeepEqual(c.generateService(Replica, &oldSpec.Spec), c.generateService(Replica, &newSpec.Spec)) {
if err := c.syncServices(); err != nil {
c.logger.Errorf("could not sync services: %v", err)
updateFailed = true
}
if err := c.syncServices(); err != nil {
c.logger.Errorf("could not sync services: %v", err)
updateFailed = true
}
// Users
@ -946,7 +960,10 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
// only when streams were not specified in oldSpec but in newSpec
needStreamUser := len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0
if !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser {
annotationsChanged, _ := c.compareAnnotations(oldSpec.Annotations, newSpec.Annotations)
initUsers := !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser
if initUsers {
c.logger.Debugf("initialize users")
if err := c.initUsers(); err != nil {
c.logger.Errorf("could not init users - skipping sync of secrets and databases: %v", err)
@ -954,7 +971,8 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
updateFailed = true
return
}
}
if initUsers || annotationsChanged {
c.logger.Debugf("syncing secrets")
//TODO: mind the secrets of the deleted/new users
if err := c.syncSecrets(); err != nil {
@ -968,38 +986,14 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
if c.OpConfig.StorageResizeMode != "off" {
c.syncVolumes()
} else {
c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume sync.")
}
// streams configuration
if len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0 {
syncStatefulSet = true
c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume size sync.")
}
// Statefulset
func() {
oldSs, err := c.generateStatefulSet(&oldSpec.Spec)
if err != nil {
c.logger.Errorf("could not generate old statefulset spec: %v", err)
if err := c.syncStatefulSet(); err != nil {
c.logger.Errorf("could not sync statefulsets: %v", err)
updateFailed = true
return
}
newSs, err := c.generateStatefulSet(&newSpec.Spec)
if err != nil {
c.logger.Errorf("could not generate new statefulset spec: %v", err)
updateFailed = true
return
}
if syncStatefulSet || !reflect.DeepEqual(oldSs, newSs) {
c.logger.Debugf("syncing statefulsets")
syncStatefulSet = false
// TODO: avoid generating the StatefulSet object twice by passing it to syncStatefulSet
if err := c.syncStatefulSet(); err != nil {
c.logger.Errorf("could not sync statefulsets: %v", err)
updateFailed = true
}
}
}()
@ -1011,12 +1005,9 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
}
// pod disruption budget
if oldSpec.Spec.NumberOfInstances != newSpec.Spec.NumberOfInstances {
c.logger.Debug("syncing pod disruption budgets")
if err := c.syncPodDisruptionBudget(true); err != nil {
c.logger.Errorf("could not sync pod disruption budget: %v", err)
updateFailed = true
}
if err := c.syncPodDisruptionBudget(true); err != nil {
c.logger.Errorf("could not sync pod disruption budget: %v", err)
updateFailed = true
}
// logical backup job

View File

@ -1443,205 +1443,6 @@ func TestCompareServices(t *testing.T) {
match: false,
reason: `new service's LoadBalancerSourceRange does not match the current one`,
},
{
about: "services differ on DNS annotation",
current: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
},
v1.ServiceTypeLoadBalancer,
[]string{"128.141.0.0/16", "137.138.0.0/16"}),
new: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "new_clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
},
v1.ServiceTypeLoadBalancer,
[]string{"128.141.0.0/16", "137.138.0.0/16"}),
match: false,
reason: `new service's annotations does not match the current one: "external-dns.alpha.kubernetes.io/hostname" changed from "clstr.acid.zalan.do" to "new_clstr.acid.zalan.do".`,
},
{
about: "services differ on AWS ELB annotation",
current: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
},
v1.ServiceTypeLoadBalancer,
[]string{"128.141.0.0/16", "137.138.0.0/16"}),
new: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: "1800",
},
v1.ServiceTypeLoadBalancer,
[]string{"128.141.0.0/16", "137.138.0.0/16"}),
match: false,
reason: `new service's annotations does not match the current one: "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout" changed from "3600" to "1800".`,
},
{
about: "service changes existing annotation",
current: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
"foo": "bar",
},
v1.ServiceTypeLoadBalancer,
[]string{"128.141.0.0/16", "137.138.0.0/16"}),
new: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
"foo": "baz",
},
v1.ServiceTypeLoadBalancer,
[]string{"128.141.0.0/16", "137.138.0.0/16"}),
match: false,
reason: `new service's annotations does not match the current one: "foo" changed from "bar" to "baz".`,
},
{
about: "service changes multiple existing annotations",
current: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
"foo": "bar",
"bar": "foo",
},
v1.ServiceTypeLoadBalancer,
[]string{"128.141.0.0/16", "137.138.0.0/16"}),
new: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
"foo": "baz",
"bar": "fooz",
},
v1.ServiceTypeLoadBalancer,
[]string{"128.141.0.0/16", "137.138.0.0/16"}),
match: false,
// Test just the prefix to avoid flakiness and map sorting
reason: `new service's annotations does not match the current one:`,
},
{
about: "service adds a new custom annotation",
current: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
},
v1.ServiceTypeLoadBalancer,
[]string{"128.141.0.0/16", "137.138.0.0/16"}),
new: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
"foo": "bar",
},
v1.ServiceTypeLoadBalancer,
[]string{"128.141.0.0/16", "137.138.0.0/16"}),
match: false,
reason: `new service's annotations does not match the current one: Added "foo" with value "bar".`,
},
{
about: "service removes a custom annotation",
current: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
"foo": "bar",
},
v1.ServiceTypeLoadBalancer,
[]string{"128.141.0.0/16", "137.138.0.0/16"}),
new: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
},
v1.ServiceTypeLoadBalancer,
[]string{"128.141.0.0/16", "137.138.0.0/16"}),
match: false,
reason: `new service's annotations does not match the current one: Removed "foo".`,
},
{
about: "service removes a custom annotation and adds a new one",
current: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
"foo": "bar",
},
v1.ServiceTypeLoadBalancer,
[]string{"128.141.0.0/16", "137.138.0.0/16"}),
new: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
"bar": "foo",
},
v1.ServiceTypeLoadBalancer,
[]string{"128.141.0.0/16", "137.138.0.0/16"}),
match: false,
reason: `new service's annotations does not match the current one: Removed "foo". Added "bar" with value "foo".`,
},
{
about: "service removes a custom annotation, adds a new one and change another",
current: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
"foo": "bar",
"zalan": "do",
},
v1.ServiceTypeLoadBalancer,
[]string{"128.141.0.0/16", "137.138.0.0/16"}),
new: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
"bar": "foo",
"zalan": "do.com",
},
v1.ServiceTypeLoadBalancer,
[]string{"128.141.0.0/16", "137.138.0.0/16"}),
match: false,
// Test just the prefix to avoid flakiness and map sorting
reason: `new service's annotations does not match the current one: Removed "foo".`,
},
{
about: "service add annotations",
current: newService(
map[string]string{},
v1.ServiceTypeLoadBalancer,
[]string{"128.141.0.0/16", "137.138.0.0/16"}),
new: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
},
v1.ServiceTypeLoadBalancer,
[]string{"128.141.0.0/16", "137.138.0.0/16"}),
match: false,
// Test just the prefix to avoid flakiness and map sorting
reason: `new service's annotations does not match the current one: Added `,
},
{
about: "ignored annotations",
current: newService(
map[string]string{},
v1.ServiceTypeLoadBalancer,
[]string{"128.141.0.0/16", "137.138.0.0/16"}),
new: newService(
map[string]string{
"k8s.v1.cni.cncf.io/network-status": "up",
},
v1.ServiceTypeLoadBalancer,
[]string{"128.141.0.0/16", "137.138.0.0/16"}),
match: true,
},
}
for _, tt := range tests {

View File

@ -691,8 +691,8 @@ func updateConnectionPoolerDeployment(KubeClient k8sutil.KubernetesClient, newDe
return deployment, nil
}
// updateConnectionPoolerAnnotations updates the annotations of connection pooler deployment
func updateConnectionPoolerAnnotations(KubeClient k8sutil.KubernetesClient, deployment *appsv1.Deployment, annotations map[string]string) (*appsv1.Deployment, error) {
// patchConnectionPoolerAnnotations updates the annotations of connection pooler deployment
func patchConnectionPoolerAnnotations(KubeClient k8sutil.KubernetesClient, deployment *appsv1.Deployment, annotations map[string]string) (*appsv1.Deployment, error) {
patchData, err := metaAnnotationsPatch(annotations)
if err != nil {
return nil, fmt.Errorf("could not form patch for the connection pooler deployment metadata: %v", err)
@ -1022,6 +1022,13 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
syncReason = append(syncReason, specReason...)
}
newPodAnnotations := c.annotationsSet(c.generatePodAnnotations(&c.Spec))
if changed, reason := c.compareAnnotations(deployment.Spec.Template.Annotations, newPodAnnotations); changed {
specSync = true
syncReason = append(syncReason, []string{"new connection pooler's pod template annotations do not match the current one: " + reason}...)
deployment.Spec.Template.Annotations = newPodAnnotations
}
defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(&c.Config, newConnectionPooler, deployment)
syncReason = append(syncReason, defaultsReason...)
@ -1040,15 +1047,15 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
}
c.ConnectionPooler[role].Deployment = deployment
}
}
newAnnotations := c.AnnotationsToPropagate(c.annotationsSet(c.ConnectionPooler[role].Deployment.Annotations))
if newAnnotations != nil {
deployment, err = updateConnectionPoolerAnnotations(c.KubeClient, c.ConnectionPooler[role].Deployment, newAnnotations)
if err != nil {
return nil, err
newAnnotations := c.AnnotationsToPropagate(c.annotationsSet(nil)) // including the downscaling annotations
if changed, _ := c.compareAnnotations(deployment.Annotations, newAnnotations); changed {
deployment, err = patchConnectionPoolerAnnotations(c.KubeClient, deployment, newAnnotations)
if err != nil {
return nil, err
}
c.ConnectionPooler[role].Deployment = deployment
}
c.ConnectionPooler[role].Deployment = deployment
}
// check if pooler pods must be replaced due to secret update
@ -1076,22 +1083,27 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
if err != nil {
return nil, fmt.Errorf("could not delete pooler pod: %v", err)
}
} else if changed, _ := c.compareAnnotations(pod.Annotations, deployment.Spec.Template.Annotations); changed {
patchData, err := metaAnnotationsPatch(deployment.Spec.Template.Annotations)
if err != nil {
return nil, fmt.Errorf("could not form patch for pooler's pod annotations: %v", err)
}
_, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
if err != nil {
return nil, fmt.Errorf("could not patch annotations for pooler's pod %q: %v", pod.Name, err)
}
}
}
if service, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}); err == nil {
c.ConnectionPooler[role].Service = service
desiredSvc := c.generateConnectionPoolerService(c.ConnectionPooler[role])
if match, reason := c.compareServices(service, desiredSvc); !match {
syncReason = append(syncReason, reason)
c.logServiceChanges(role, service, desiredSvc, false, reason)
newService, err = c.updateService(role, service, desiredSvc)
if err != nil {
return syncReason, fmt.Errorf("could not update %s service to match desired state: %v", role, err)
}
c.ConnectionPooler[role].Service = newService
c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta))
newService, err = c.updateService(role, service, desiredSvc)
if err != nil {
return syncReason, fmt.Errorf("could not update %s service to match desired state: %v", role, err)
}
c.ConnectionPooler[role].Service = newService
c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta))
return NoSync, nil
}

View File

@ -2061,9 +2061,10 @@ func (c *Cluster) getCustomServiceAnnotations(role PostgresRole, spec *acidv1.Po
func (c *Cluster) generateEndpoint(role PostgresRole, subsets []v1.EndpointSubset) *v1.Endpoints {
endpoints := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: c.endpointName(role),
Namespace: c.Namespace,
Labels: c.roleLabelsSet(true, role),
Name: c.endpointName(role),
Namespace: c.Namespace,
Annotations: c.annotationsSet(nil),
Labels: c.roleLabelsSet(true, role),
},
}
if len(subsets) > 0 {

View File

@ -286,55 +286,37 @@ func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) {
}
func (c *Cluster) updateService(role PostgresRole, oldService *v1.Service, newService *v1.Service) (*v1.Service, error) {
var (
svc *v1.Service
err error
)
c.setProcessName("updating %v service", role)
var err error
svc := oldService
serviceName := util.NameFromMeta(oldService.ObjectMeta)
match, reason := c.compareServices(oldService, newService)
if !match {
c.logServiceChanges(role, oldService, newService, false, reason)
c.setProcessName("updating %v service", role)
// update the service annotation in order to propagate ELB notation.
if len(newService.ObjectMeta.Annotations) > 0 {
if annotationsPatchData, err := metaAnnotationsPatch(newService.ObjectMeta.Annotations); err == nil {
_, err = c.KubeClient.Services(serviceName.Namespace).Patch(
context.TODO(),
serviceName.Name,
types.MergePatchType,
[]byte(annotationsPatchData),
metav1.PatchOptions{},
"")
if err != nil {
return nil, fmt.Errorf("could not replace annotations for the service %q: %v", serviceName, err)
}
} else {
return nil, fmt.Errorf("could not form patch for the service metadata: %v", err)
// now, patch the service spec, but when disabling LoadBalancers do update instead
// patch does not work because of LoadBalancerSourceRanges field (even if set to nil)
oldServiceType := oldService.Spec.Type
newServiceType := newService.Spec.Type
if newServiceType == "ClusterIP" && newServiceType != oldServiceType {
newService.ResourceVersion = oldService.ResourceVersion
newService.Spec.ClusterIP = oldService.Spec.ClusterIP
}
}
// now, patch the service spec, but when disabling LoadBalancers do update instead
// patch does not work because of LoadBalancerSourceRanges field (even if set to nil)
oldServiceType := oldService.Spec.Type
newServiceType := newService.Spec.Type
if newServiceType == "ClusterIP" && newServiceType != oldServiceType {
newService.ResourceVersion = oldService.ResourceVersion
newService.Spec.ClusterIP = oldService.Spec.ClusterIP
svc, err = c.KubeClient.Services(serviceName.Namespace).Update(context.TODO(), newService, metav1.UpdateOptions{})
if err != nil {
return nil, fmt.Errorf("could not update service %q: %v", serviceName, err)
}
} else {
patchData, err := specPatch(newService.Spec)
if err != nil {
return nil, fmt.Errorf("could not form patch for the service %q: %v", serviceName, err)
}
}
svc, err = c.KubeClient.Services(serviceName.Namespace).Patch(
context.TODO(), serviceName.Name, types.MergePatchType, patchData, metav1.PatchOptions{}, "")
if changed, _ := c.compareAnnotations(oldService.Annotations, newService.Annotations); changed {
patchData, err := metaAnnotationsPatch(newService.Annotations)
if err != nil {
return nil, fmt.Errorf("could not patch service %q: %v", serviceName, err)
return nil, fmt.Errorf("could not form patch for service %q annotations: %v", oldService.Name, err)
}
svc, err = c.KubeClient.Services(serviceName.Namespace).Patch(context.TODO(), newService.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
if err != nil {
return nil, fmt.Errorf("could not patch annotations for service %q: %v", oldService.Name, err)
}
}

View File

@ -20,6 +20,7 @@ import (
v1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
var requirePrimaryRestartWhenDecreased = []string{
@ -91,7 +92,6 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
}
}
c.logger.Debug("syncing statefulsets")
if err = c.syncStatefulSet(); err != nil {
if !k8sutil.ResourceAlreadyExists(err) {
err = fmt.Errorf("could not sync statefulsets: %v", err)
@ -200,15 +200,12 @@ func (c *Cluster) syncService(role PostgresRole) error {
if svc, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.serviceName(role), metav1.GetOptions{}); err == nil {
c.Services[role] = svc
desiredSvc := c.generateService(role, &c.Spec)
if match, reason := c.compareServices(svc, desiredSvc); !match {
c.logServiceChanges(role, svc, desiredSvc, false, reason)
updatedSvc, err := c.updateService(role, svc, desiredSvc)
if err != nil {
return fmt.Errorf("could not update %s service to match desired state: %v", role, err)
}
c.Services[role] = updatedSvc
c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta))
updatedSvc, err := c.updateService(role, svc, desiredSvc)
if err != nil {
return fmt.Errorf("could not update %s service to match desired state: %v", role, err)
}
c.Services[role] = updatedSvc
c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta))
return nil
}
if !k8sutil.ResourceNotFound(err) {
@ -241,7 +238,17 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error {
c.setProcessName("syncing %s endpoint", role)
if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), c.endpointName(role), metav1.GetOptions{}); err == nil {
// TODO: No syncing of endpoints here, is this covered completely by updateService?
desiredEp := c.generateEndpoint(role, ep.Subsets)
if changed, _ := c.compareAnnotations(ep.Annotations, desiredEp.Annotations); changed {
patchData, err := metaAnnotationsPatch(desiredEp.Annotations)
if err != nil {
return fmt.Errorf("could not form patch for %s endpoint: %v", role, err)
}
ep, err = c.KubeClient.Endpoints(c.Namespace).Patch(context.TODO(), c.endpointName(role), types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
if err != nil {
return fmt.Errorf("could not patch annotations of %s endpoint: %v", role, err)
}
}
c.Endpoints[role] = ep
return nil
}
@ -275,7 +282,8 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error {
if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.podDisruptionBudgetName(), metav1.GetOptions{}); err == nil {
c.PodDisruptionBudget = pdb
newPDB := c.generatePodDisruptionBudget()
if match, reason := k8sutil.SamePDB(pdb, newPDB); !match {
match, reason := c.comparePodDisruptionBudget(pdb, newPDB)
if !match {
c.logPDBChanges(pdb, newPDB, isUpdate, reason)
if err = c.updatePodDisruptionBudget(newPDB); err != nil {
return err
@ -326,10 +334,11 @@ func (c *Cluster) syncStatefulSet() error {
// NB: Be careful to consider the codepath that acts on podsRollingUpdateRequired before returning early.
sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(context.TODO(), c.statefulSetName(), metav1.GetOptions{})
if err != nil && !k8sutil.ResourceNotFound(err) {
return fmt.Errorf("error during reading of statefulset: %v", err)
}
if err != nil {
if !k8sutil.ResourceNotFound(err) {
return fmt.Errorf("error during reading of statefulset: %v", err)
}
// statefulset does not exist, try to re-create it
c.Statefulset = nil
c.logger.Infof("cluster's statefulset does not exist")
@ -354,6 +363,11 @@ func (c *Cluster) syncStatefulSet() error {
c.logger.Infof("created missing statefulset %q", util.NameFromMeta(sset.ObjectMeta))
} else {
desiredSts, err := c.generateStatefulSet(&c.Spec)
if err != nil {
return fmt.Errorf("could not generate statefulset: %v", err)
}
c.logger.Debugf("syncing statefulsets")
// check if there are still pods with a rolling update flag
for _, pod := range pods {
if c.getRollingUpdateFlagFromPod(&pod) {
@ -374,12 +388,21 @@ func (c *Cluster) syncStatefulSet() error {
// statefulset is already there, make sure we use its definition in order to compare with the spec.
c.Statefulset = sset
desiredSts, err := c.generateStatefulSet(&c.Spec)
if err != nil {
return fmt.Errorf("could not generate statefulset: %v", err)
}
cmp := c.compareStatefulSetWith(desiredSts)
if !cmp.rollingUpdate {
for _, pod := range pods {
if changed, _ := c.compareAnnotations(pod.Annotations, desiredSts.Spec.Template.Annotations); changed {
patchData, err := metaAnnotationsPatch(desiredSts.Spec.Template.Annotations)
if err != nil {
return fmt.Errorf("could not form patch for pod %q annotations: %v", pod.Name, err)
}
_, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
if err != nil {
return fmt.Errorf("could not patch annotations for pod %q: %v", pod.Name, err)
}
}
}
}
if !cmp.match {
if cmp.rollingUpdate {
podsToRecreate = make([]v1.Pod, 0)
@ -942,6 +965,17 @@ func (c *Cluster) updateSecret(
c.Secrets[secret.UID] = secret
}
if changed, _ := c.compareAnnotations(secret.Annotations, generatedSecret.Annotations); changed {
patchData, err := metaAnnotationsPatch(generatedSecret.Annotations)
if err != nil {
return fmt.Errorf("could not form patch for secret %q annotations: %v", secret.Name, err)
}
_, err = c.KubeClient.Secrets(secret.Namespace).Patch(context.TODO(), secret.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
if err != nil {
return fmt.Errorf("could not patch annotations for secret %q: %v", secret.Name, err)
}
}
return nil
}
@ -1379,6 +1413,16 @@ func (c *Cluster) syncLogicalBackupJob() error {
}
c.logger.Info("the logical backup job is synced")
}
if changed, _ := c.compareAnnotations(job.Annotations, desiredJob.Annotations); changed {
patchData, err := metaAnnotationsPatch(desiredJob.Annotations)
if err != nil {
return fmt.Errorf("could not form patch for the logical backup job %q: %v", jobName, err)
}
_, err = c.KubeClient.CronJobs(c.Namespace).Patch(context.TODO(), jobName, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
if err != nil {
return fmt.Errorf("could not patch annotations of the logical backup job %q: %v", jobName, err)
}
}
return nil
}
if !k8sutil.ResourceNotFound(err) {

View File

@ -1,57 +1,259 @@
package cluster
import (
"bytes"
"context"
"fmt"
"io"
"maps"
"net/http"
"reflect"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/zalando/postgres-operator/mocks"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
fakeacidv1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/fake"
"github.com/zalando/postgres-operator/pkg/util"
"github.com/zalando/postgres-operator/pkg/util/config"
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
"github.com/zalando/postgres-operator/pkg/util/patroni"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
k8sFake "k8s.io/client-go/kubernetes/fake"
)
var externalAnnotations = map[string]string{"existing": "annotation"}
func newFakeK8sAnnotationsClient() (k8sutil.KubernetesClient, *k8sFake.Clientset) {
clientSet := k8sFake.NewSimpleClientset()
acidClientSet := fakeacidv1.NewSimpleClientset()
return k8sutil.KubernetesClient{
PodDisruptionBudgetsGetter: clientSet.PolicyV1(),
ServicesGetter: clientSet.CoreV1(),
StatefulSetsGetter: clientSet.AppsV1(),
PostgresqlsGetter: acidClientSet.AcidV1(),
PodDisruptionBudgetsGetter: clientSet.PolicyV1(),
SecretsGetter: clientSet.CoreV1(),
ServicesGetter: clientSet.CoreV1(),
StatefulSetsGetter: clientSet.AppsV1(),
PostgresqlsGetter: acidClientSet.AcidV1(),
PersistentVolumeClaimsGetter: clientSet.CoreV1(),
PersistentVolumesGetter: clientSet.CoreV1(),
EndpointsGetter: clientSet.CoreV1(),
PodsGetter: clientSet.CoreV1(),
DeploymentsGetter: clientSet.AppsV1(),
}, clientSet
}
func TestInheritedAnnotations(t *testing.T) {
testName := "test inheriting annotations from manifest"
client, _ := newFakeK8sAnnotationsClient()
clusterName := "acid-test-cluster"
namespace := "default"
annotationValue := "acid"
role := Master
func clusterLabelsOptions(cluster *Cluster) metav1.ListOptions {
clusterLabel := labels.Set(map[string]string{cluster.OpConfig.ClusterNameLabel: cluster.Name})
return metav1.ListOptions{
LabelSelector: clusterLabel.String(),
}
}
func checkResourcesInheritedAnnotations(cluster *Cluster, resultAnnotations map[string]string) error {
clusterOptions := clusterLabelsOptions(cluster)
// helper functions
containsAnnotations := func(expected map[string]string, actual map[string]string, objName string, objType string) error {
if expected == nil {
if len(actual) != 0 {
return fmt.Errorf("%s %v expected not to have any annotations, got: %#v", objType, objName, actual)
}
} else if !(reflect.DeepEqual(expected, actual)) {
return fmt.Errorf("%s %v expected annotations: %#v, got: %#v", objType, objName, expected, actual)
}
return nil
}
updateAnnotations := func(annotations map[string]string) map[string]string {
result := make(map[string]string, 0)
for anno := range annotations {
if _, ok := externalAnnotations[anno]; !ok {
result[anno] = annotations[anno]
}
}
return result
}
checkSts := func(annotations map[string]string) error {
stsList, err := cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions)
if err != nil {
return err
}
stsAnnotations := updateAnnotations(annotations)
for _, sts := range stsList.Items {
if err := containsAnnotations(stsAnnotations, sts.Annotations, sts.ObjectMeta.Name, "StatefulSet"); err != nil {
return err
}
// pod template
if err := containsAnnotations(stsAnnotations, sts.Spec.Template.Annotations, sts.ObjectMeta.Name, "StatefulSet pod template"); err != nil {
return err
}
// pvc template
if err := containsAnnotations(stsAnnotations, sts.Spec.VolumeClaimTemplates[0].Annotations, sts.ObjectMeta.Name, "StatefulSet pvc template"); err != nil {
return err
}
}
return nil
}
checkPods := func(annotations map[string]string) error {
podList, err := cluster.KubeClient.Pods(namespace).List(context.TODO(), clusterOptions)
if err != nil {
return err
}
for _, pod := range podList.Items {
if err := containsAnnotations(annotations, pod.Annotations, pod.ObjectMeta.Name, "Pod"); err != nil {
return err
}
}
return nil
}
checkSvc := func(annotations map[string]string) error {
svcList, err := cluster.KubeClient.Services(namespace).List(context.TODO(), clusterOptions)
if err != nil {
return err
}
for _, svc := range svcList.Items {
if err := containsAnnotations(annotations, svc.Annotations, svc.ObjectMeta.Name, "Service"); err != nil {
return err
}
}
return nil
}
checkPdb := func(annotations map[string]string) error {
pdbList, err := cluster.KubeClient.PodDisruptionBudgets(namespace).List(context.TODO(), clusterOptions)
if err != nil {
return err
}
for _, pdb := range pdbList.Items {
if err := containsAnnotations(updateAnnotations(annotations), pdb.Annotations, pdb.ObjectMeta.Name, "Pod Disruption Budget"); err != nil {
return err
}
}
return nil
}
checkPvc := func(annotations map[string]string) error {
pvcList, err := cluster.KubeClient.PersistentVolumeClaims(namespace).List(context.TODO(), clusterOptions)
if err != nil {
return err
}
for _, pvc := range pvcList.Items {
if err := containsAnnotations(annotations, pvc.Annotations, pvc.ObjectMeta.Name, "Volume claim"); err != nil {
return err
}
}
return nil
}
checkPooler := func(annotations map[string]string) error {
for _, role := range []PostgresRole{Master, Replica} {
deploy, err := cluster.KubeClient.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{})
if err != nil {
return err
}
if err := containsAnnotations(annotations, deploy.Annotations, deploy.Name, "Deployment"); err != nil {
return err
}
if err := containsAnnotations(updateAnnotations(annotations), deploy.Spec.Template.Annotations, deploy.Name, "Pooler pod template"); err != nil {
return err
}
}
return nil
}
checkSecrets := func(annotations map[string]string) error {
secretList, err := cluster.KubeClient.Secrets(namespace).List(context.TODO(), clusterOptions)
if err != nil {
return err
}
for _, secret := range secretList.Items {
if err := containsAnnotations(annotations, secret.Annotations, secret.Name, "Secret"); err != nil {
return err
}
}
return nil
}
checkEndpoints := func(annotations map[string]string) error {
endpointsList, err := cluster.KubeClient.Endpoints(namespace).List(context.TODO(), clusterOptions)
if err != nil {
return err
}
for _, ep := range endpointsList.Items {
if err := containsAnnotations(annotations, ep.Annotations, ep.Name, "Endpoints"); err != nil {
return err
}
}
return nil
}
checkFuncs := []func(map[string]string) error{
checkSts, checkPods, checkSvc, checkPdb, checkPooler, checkPvc, checkSecrets, checkEndpoints,
}
for _, f := range checkFuncs {
if err := f(resultAnnotations); err != nil {
return err
}
}
return nil
}
func createPods(cluster *Cluster) []v1.Pod {
podsList := make([]v1.Pod, 0)
for i, role := range []PostgresRole{Master, Replica} {
podsList = append(podsList, v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%d", clusterName, i),
Namespace: namespace,
Labels: map[string]string{
"application": "spilo",
"cluster-name": clusterName,
"spilo-role": string(role),
},
},
})
podsList = append(podsList, v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-pooler-%s", clusterName, role),
Namespace: namespace,
Labels: cluster.connectionPoolerLabels(role, true).MatchLabels,
},
})
}
return podsList
}
func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster, error) {
pg := acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName,
Annotations: map[string]string{
"owned-by": annotationValue,
"owned-by": "acid",
"foo": "bar", // should not be inherited
},
},
Spec: acidv1.PostgresSpec{
EnableConnectionPooler: boolToPointer(true),
EnableReplicaConnectionPooler: boolToPointer(true),
Volume: acidv1.Volume{
Size: "1Gi",
},
NumberOfInstances: 2,
},
}
var cluster = New(
cluster := New(
Config{
OpConfig: config.Config{
PatroniAPICheckInterval: time.Duration(1),
PatroniAPICheckTimeout: time.Duration(5),
ConnectionPooler: config.ConnectionPooler{
ConnectionPoolerDefaultCPURequest: "100m",
ConnectionPoolerDefaultCPULimit: "100m",
@ -59,85 +261,225 @@ func TestInheritedAnnotations(t *testing.T) {
ConnectionPoolerDefaultMemoryLimit: "100Mi",
NumberOfInstances: k8sutil.Int32ToPointer(1),
},
PDBNameFormat: "postgres-{cluster}-pdb",
PodManagementPolicy: "ordered_ready",
Resources: config.Resources{
ClusterLabels: map[string]string{"application": "spilo"},
ClusterNameLabel: "cluster-name",
DefaultCPURequest: "300m",
DefaultCPULimit: "300m",
DefaultMemoryRequest: "300Mi",
DefaultMemoryLimit: "300Mi",
InheritedAnnotations: []string{"owned-by"},
PodRoleLabel: "spilo-role",
ClusterLabels: map[string]string{"application": "spilo"},
ClusterNameLabel: "cluster-name",
DefaultCPURequest: "300m",
DefaultCPULimit: "300m",
DefaultMemoryRequest: "300Mi",
DefaultMemoryLimit: "300Mi",
InheritedAnnotations: []string{"owned-by"},
PodRoleLabel: "spilo-role",
ResourceCheckInterval: time.Duration(testResourceCheckInterval),
ResourceCheckTimeout: time.Duration(testResourceCheckTimeout),
MinInstances: -1,
MaxInstances: -1,
},
},
}, client, pg, logger, eventRecorder)
cluster.Name = clusterName
cluster.Namespace = namespace
// test annotationsSet function
inheritedAnnotations := cluster.annotationsSet(nil)
listOptions := metav1.ListOptions{
LabelSelector: cluster.labelsSet(false).String(),
}
// check statefulset annotations
_, err := cluster.createStatefulSet()
assert.NoError(t, err)
if err != nil {
return nil, err
}
_, err = cluster.createService(Master)
if err != nil {
return nil, err
}
_, err = cluster.createPodDisruptionBudget()
if err != nil {
return nil, err
}
_, err = cluster.createConnectionPooler(mockInstallLookupFunction)
if err != nil {
return nil, err
}
pvcList := CreatePVCs(namespace, clusterName, cluster.labelsSet(false), 2, "1Gi")
for _, pvc := range pvcList.Items {
_, err = cluster.KubeClient.PersistentVolumeClaims(namespace).Create(context.TODO(), &pvc, metav1.CreateOptions{})
if err != nil {
return nil, err
}
}
podsList := createPods(cluster)
for _, pod := range podsList {
_, err = cluster.KubeClient.Pods(namespace).Create(context.TODO(), &pod, metav1.CreateOptions{})
if err != nil {
return nil, err
}
}
stsList, err := client.StatefulSets(namespace).List(context.TODO(), listOptions)
assert.NoError(t, err)
return cluster, nil
}
func annotateResources(cluster *Cluster) error {
clusterOptions := clusterLabelsOptions(cluster)
stsList, err := cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions)
if err != nil {
return err
}
for _, sts := range stsList.Items {
if !(util.MapContains(sts.ObjectMeta.Annotations, inheritedAnnotations)) {
t.Errorf("%s: StatefulSet %v not inherited annotations %#v, got %#v", testName, sts.ObjectMeta.Name, inheritedAnnotations, sts.ObjectMeta.Annotations)
}
// pod template
if !(util.MapContains(sts.Spec.Template.ObjectMeta.Annotations, inheritedAnnotations)) {
t.Errorf("%s: pod template %v not inherited annotations %#v, got %#v", testName, sts.ObjectMeta.Name, inheritedAnnotations, sts.ObjectMeta.Annotations)
}
// pvc template
if !(util.MapContains(sts.Spec.VolumeClaimTemplates[0].Annotations, inheritedAnnotations)) {
t.Errorf("%s: PVC template %v not inherited annotations %#v, got %#v", testName, sts.ObjectMeta.Name, inheritedAnnotations, sts.ObjectMeta.Annotations)
sts.Annotations = externalAnnotations
if _, err = cluster.KubeClient.StatefulSets(namespace).Update(context.TODO(), &sts, metav1.UpdateOptions{}); err != nil {
return err
}
}
// check service annotations
cluster.createService(Master)
svcList, err := client.Services(namespace).List(context.TODO(), listOptions)
assert.NoError(t, err)
podList, err := cluster.KubeClient.Pods(namespace).List(context.TODO(), clusterOptions)
if err != nil {
return err
}
for _, pod := range podList.Items {
pod.Annotations = externalAnnotations
if _, err = cluster.KubeClient.Pods(namespace).Update(context.TODO(), &pod, metav1.UpdateOptions{}); err != nil {
return err
}
}
svcList, err := cluster.KubeClient.Services(namespace).List(context.TODO(), clusterOptions)
if err != nil {
return err
}
for _, svc := range svcList.Items {
if !(util.MapContains(svc.ObjectMeta.Annotations, inheritedAnnotations)) {
t.Errorf("%s: Service %v not inherited annotations %#v, got %#v", testName, svc.ObjectMeta.Name, inheritedAnnotations, svc.ObjectMeta.Annotations)
svc.Annotations = externalAnnotations
if _, err = cluster.KubeClient.Services(namespace).Update(context.TODO(), &svc, metav1.UpdateOptions{}); err != nil {
return err
}
}
// check pod disruption budget annotations
cluster.createPodDisruptionBudget()
pdbList, err := client.PodDisruptionBudgets(namespace).List(context.TODO(), listOptions)
assert.NoError(t, err)
pdbList, err := cluster.KubeClient.PodDisruptionBudgets(namespace).List(context.TODO(), clusterOptions)
if err != nil {
return err
}
for _, pdb := range pdbList.Items {
if !(util.MapContains(pdb.ObjectMeta.Annotations, inheritedAnnotations)) {
t.Errorf("%s: Pod Disruption Budget %v not inherited annotations %#v, got %#v", testName, pdb.ObjectMeta.Name, inheritedAnnotations, pdb.ObjectMeta.Annotations)
pdb.Annotations = externalAnnotations
_, err = cluster.KubeClient.PodDisruptionBudgets(namespace).Update(context.TODO(), &pdb, metav1.UpdateOptions{})
if err != nil {
return err
}
}
// check pooler deployment annotations
cluster.ConnectionPooler = map[PostgresRole]*ConnectionPoolerObjects{}
cluster.ConnectionPooler[role] = &ConnectionPoolerObjects{
Name: cluster.connectionPoolerName(role),
ClusterName: cluster.Name,
Namespace: cluster.Namespace,
Role: role,
pvcList, err := cluster.KubeClient.PersistentVolumeClaims(namespace).List(context.TODO(), clusterOptions)
if err != nil {
return err
}
deploy, err := cluster.generateConnectionPoolerDeployment(cluster.ConnectionPooler[role])
for _, pvc := range pvcList.Items {
pvc.Annotations = externalAnnotations
if _, err = cluster.KubeClient.PersistentVolumeClaims(namespace).Update(context.TODO(), &pvc, metav1.UpdateOptions{}); err != nil {
return err
}
}
for _, role := range []PostgresRole{Master, Replica} {
deploy, err := cluster.KubeClient.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{})
if err != nil {
return err
}
deploy.Annotations = externalAnnotations
if _, err = cluster.KubeClient.Deployments(namespace).Update(context.TODO(), deploy, metav1.UpdateOptions{}); err != nil {
return err
}
}
secrets, err := cluster.KubeClient.Secrets(namespace).List(context.TODO(), clusterOptions)
if err != nil {
return err
}
for _, secret := range secrets.Items {
secret.Annotations = externalAnnotations
if _, err = cluster.KubeClient.Secrets(namespace).Update(context.TODO(), &secret, metav1.UpdateOptions{}); err != nil {
return err
}
}
endpoints, err := cluster.KubeClient.Endpoints(namespace).List(context.TODO(), clusterOptions)
if err != nil {
return err
}
for _, ep := range endpoints.Items {
ep.Annotations = externalAnnotations
if _, err = cluster.KubeClient.Endpoints(namespace).Update(context.TODO(), &ep, metav1.UpdateOptions{}); err != nil {
return err
}
}
return nil
}
func TestInheritedAnnotations(t *testing.T) {
// mocks
ctrl := gomock.NewController(t)
defer ctrl.Finish()
client, _ := newFakeK8sAnnotationsClient()
mockClient := mocks.NewMockHTTPClient(ctrl)
cluster, err := newInheritedAnnotationsCluster(client)
assert.NoError(t, err)
if !(util.MapContains(deploy.ObjectMeta.Annotations, inheritedAnnotations)) {
t.Errorf("%s: Deployment %v not inherited annotations %#v, got %#v", testName, deploy.ObjectMeta.Name, inheritedAnnotations, deploy.ObjectMeta.Annotations)
configJson := `{"postgresql": {"parameters": {"log_min_duration_statement": 200, "max_connections": 50}}}, "ttl": 20}`
response := http.Response{
StatusCode: 200,
Body: io.NopCloser(bytes.NewReader([]byte(configJson))),
}
mockClient.EXPECT().Do(gomock.Any()).Return(&response, nil).AnyTimes()
cluster.patroni = patroni.New(patroniLogger, mockClient)
err = cluster.Sync(&cluster.Postgresql)
assert.NoError(t, err)
filterLabels := cluster.labelsSet(false)
// Finally, tests!
result := map[string]string{"owned-by": "acid"}
assert.True(t, reflect.DeepEqual(result, cluster.annotationsSet(nil)))
// 1. Check initial state
err = checkResourcesInheritedAnnotations(cluster, result)
assert.NoError(t, err)
// 2. Check annotation value change
// 2.1 Sync event
newSpec := cluster.Postgresql.DeepCopy()
newSpec.Annotations["owned-by"] = "fooSync"
result["owned-by"] = "fooSync"
err = cluster.Sync(newSpec)
assert.NoError(t, err)
err = checkResourcesInheritedAnnotations(cluster, result)
assert.NoError(t, err)
// + existing PVC without annotations
cluster.KubeClient.PersistentVolumeClaims(namespace).Create(context.TODO(), &CreatePVCs(namespace, clusterName, filterLabels, 3, "1Gi").Items[2], metav1.CreateOptions{})
err = cluster.Sync(newSpec)
assert.NoError(t, err)
err = checkResourcesInheritedAnnotations(cluster, result)
assert.NoError(t, err)
// 2.2 Update event
newSpec = cluster.Postgresql.DeepCopy()
newSpec.Annotations["owned-by"] = "fooUpdate"
result["owned-by"] = "fooUpdate"
// + new PVC
cluster.KubeClient.PersistentVolumeClaims(namespace).Create(context.TODO(), &CreatePVCs(namespace, clusterName, filterLabels, 4, "1Gi").Items[3], metav1.CreateOptions{})
err = cluster.Update(cluster.Postgresql.DeepCopy(), newSpec)
assert.NoError(t, err)
err = checkResourcesInheritedAnnotations(cluster, result)
assert.NoError(t, err)
// 3. Existing annotations (should not be removed)
err = annotateResources(cluster)
assert.NoError(t, err)
maps.Copy(result, externalAnnotations)
err = cluster.Sync(newSpec.DeepCopy())
assert.NoError(t, err)
err = checkResourcesInheritedAnnotations(cluster, result)
assert.NoError(t, err)
}
func Test_trimCronjobName(t *testing.T) {

View File

@ -9,9 +9,9 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"github.com/aws/aws-sdk-go/aws"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
"github.com/zalando/postgres-operator/pkg/spec"
"github.com/zalando/postgres-operator/pkg/util"
"github.com/zalando/postgres-operator/pkg/util/constants"
@ -42,18 +42,14 @@ func (c *Cluster) syncVolumes() error {
c.logger.Errorf("errors occured during EBS volume adjustments: %v", err)
}
}
}
// resize pvc to adjust filesystem size until better K8s support
if err = c.syncVolumeClaims(); err != nil {
err = fmt.Errorf("could not sync persistent volume claims: %v", err)
return err
}
} else if c.OpConfig.StorageResizeMode == "pvc" {
if err = c.syncVolumeClaims(); err != nil {
err = fmt.Errorf("could not sync persistent volume claims: %v", err)
return err
}
} else if c.OpConfig.StorageResizeMode == "ebs" {
if err = c.syncVolumeClaims(); err != nil {
err = fmt.Errorf("could not sync persistent volume claims: %v", err)
return err
}
if c.OpConfig.StorageResizeMode == "ebs" {
// potentially enlarge volumes before changing the statefulset. By doing that
// in this order we make sure the operator is not stuck waiting for a pod that
// cannot start because it ran out of disk space.
@ -64,8 +60,6 @@ func (c *Cluster) syncVolumes() error {
err = fmt.Errorf("could not sync persistent volumes: %v", err)
return err
}
} else {
c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume sync.")
}
return nil
@ -187,18 +181,58 @@ func (c *Cluster) populateVolumeMetaData() error {
func (c *Cluster) syncVolumeClaims() error {
c.setProcessName("syncing volume claims")
needsResizing, err := c.volumeClaimsNeedResizing(c.Spec.Volume)
ignoreResize := false
if c.OpConfig.StorageResizeMode == "off" || c.OpConfig.StorageResizeMode == "ebs" {
ignoreResize = true
c.logger.Debugf("Storage resize mode is set to %q. Skipping volume size sync of PVCs.", c.OpConfig.StorageResizeMode)
}
newSize, err := resource.ParseQuantity(c.Spec.Volume.Size)
if err != nil {
return fmt.Errorf("could not compare size of the volume claims: %v", err)
return fmt.Errorf("could not parse volume size from the manifest: %v", err)
}
manifestSize := quantityToGigabyte(newSize)
if !needsResizing {
c.logger.Infof("volume claims do not require changes")
return nil
pvcs, err := c.listPersistentVolumeClaims()
if err != nil {
return fmt.Errorf("could not receive persistent volume claims: %v", err)
}
for _, pvc := range pvcs {
needsUpdate := false
currentSize := quantityToGigabyte(pvc.Spec.Resources.Requests[v1.ResourceStorage])
if !ignoreResize && currentSize != manifestSize {
if currentSize < manifestSize {
pvc.Spec.Resources.Requests[v1.ResourceStorage] = newSize
needsUpdate = true
c.logger.Debugf("persistent volume claim for volume %q needs to be resized", pvc.Name)
} else {
c.logger.Warningf("cannot shrink persistent volume")
}
}
if err := c.resizeVolumeClaims(c.Spec.Volume); err != nil {
return fmt.Errorf("could not sync volume claims: %v", err)
if needsUpdate {
c.logger.Debugf("updating persistent volume claim definition for volume %q", pvc.Name)
if _, err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Update(context.TODO(), &pvc, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("could not update persistent volume claim: %q", err)
}
c.logger.Debugf("successfully updated persistent volume claim %q", pvc.Name)
} else {
c.logger.Debugf("volume claim for volume %q do not require updates", pvc.Name)
}
newAnnotations := c.annotationsSet(nil)
if changed, _ := c.compareAnnotations(pvc.Annotations, newAnnotations); changed {
patchData, err := metaAnnotationsPatch(newAnnotations)
if err != nil {
return fmt.Errorf("could not form patch for the persistent volume claim for volume %q: %v", pvc.Name, err)
}
_, err = c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Patch(context.TODO(), pvc.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
if err != nil {
return fmt.Errorf("could not patch annotations of the persistent volume claim for volume %q: %v", pvc.Name, err)
}
}
}
c.logger.Infof("volume claims have been synced successfully")
@ -261,35 +295,6 @@ func (c *Cluster) deletePersistentVolumeClaims() error {
return nil
}
func (c *Cluster) resizeVolumeClaims(newVolume acidv1.Volume) error {
c.logger.Debugln("resizing PVCs")
pvcs, err := c.listPersistentVolumeClaims()
if err != nil {
return err
}
newQuantity, err := resource.ParseQuantity(newVolume.Size)
if err != nil {
return fmt.Errorf("could not parse volume size: %v", err)
}
newSize := quantityToGigabyte(newQuantity)
for _, pvc := range pvcs {
volumeSize := quantityToGigabyte(pvc.Spec.Resources.Requests[v1.ResourceStorage])
if volumeSize >= newSize {
if volumeSize > newSize {
c.logger.Warningf("cannot shrink persistent volume")
}
continue
}
pvc.Spec.Resources.Requests[v1.ResourceStorage] = newQuantity
c.logger.Debugf("updating persistent volume claim definition for volume %q", pvc.Name)
if _, err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Update(context.TODO(), &pvc, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("could not update persistent volume claim: %q", err)
}
c.logger.Debugf("successfully updated persistent volume claim %q", pvc.Name)
}
return nil
}
func (c *Cluster) listPersistentVolumes() ([]*v1.PersistentVolume, error) {
result := make([]*v1.PersistentVolume, 0)
@ -406,25 +411,6 @@ func (c *Cluster) resizeVolumes() error {
return nil
}
func (c *Cluster) volumeClaimsNeedResizing(newVolume acidv1.Volume) (bool, error) {
newSize, err := resource.ParseQuantity(newVolume.Size)
manifestSize := quantityToGigabyte(newSize)
if err != nil {
return false, fmt.Errorf("could not parse volume size from the manifest: %v", err)
}
pvcs, err := c.listPersistentVolumeClaims()
if err != nil {
return false, fmt.Errorf("could not receive persistent volume claims: %v", err)
}
for _, pvc := range pvcs {
currentSize := quantityToGigabyte(pvc.Spec.Resources.Requests[v1.ResourceStorage])
if currentSize != manifestSize {
return true, nil
}
}
return false, nil
}
func (c *Cluster) volumesNeedResizing() (bool, error) {
newQuantity, _ := resource.ParseQuantity(c.Spec.Volume.Size)
newSize := quantityToGigabyte(newQuantity)

View File

@ -74,6 +74,7 @@ func TestResizeVolumeClaim(t *testing.T) {
cluster.Name = clusterName
cluster.Namespace = namespace
filterLabels := cluster.labelsSet(false)
cluster.Spec.Volume.Size = newVolumeSize
// define and create PVCs for 1Gi volumes
pvcList := CreatePVCs(namespace, clusterName, filterLabels, 2, "1Gi")
@ -85,7 +86,7 @@ func TestResizeVolumeClaim(t *testing.T) {
}
// test resizing
cluster.resizeVolumeClaims(acidv1.Volume{Size: newVolumeSize})
cluster.syncVolumes()
pvcs, err := cluster.listPersistentVolumeClaims()
assert.NoError(t, err)

View File

@ -3,7 +3,6 @@ package k8sutil
import (
"context"
"fmt"
"reflect"
b64 "encoding/base64"
"encoding/json"
@ -17,7 +16,6 @@ import (
"github.com/zalando/postgres-operator/pkg/spec"
apiappsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
apipolicyv1 "k8s.io/api/policy/v1"
apiextclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apiextv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -242,17 +240,6 @@ func (client *KubernetesClient) SetFinalizer(clusterName spec.NamespacedName, pg
return updatedPg, nil
}
// SamePDB compares the PodDisruptionBudgets
func SamePDB(cur, new *apipolicyv1.PodDisruptionBudget) (match bool, reason string) {
//TODO: improve comparison
match = reflect.DeepEqual(new.Spec, cur.Spec)
if !match {
reason = "new PDB spec does not match the current one"
}
return
}
func (c *mockSecret) Get(ctx context.Context, name string, options metav1.GetOptions) (*v1.Secret, error) {
oldFormatSecret := &v1.Secret{}
oldFormatSecret.Name = "testcluster"