Ensure podAnnotations are removed from pods if reset in the config (#2826)
parent b0cfeb30ea
commit f49b4f1e97
@@ -105,10 +105,17 @@ type Cluster struct {
 }
 
 type compareStatefulsetResult struct {
-	match         bool
-	replace       bool
-	rollingUpdate bool
-	reasons       []string
+	match                 bool
+	replace               bool
+	rollingUpdate         bool
+	reasons               []string
+	deletedPodAnnotations []string
 }
 
+type compareLogicalBackupJobResult struct {
+	match                 bool
+	reasons               []string
+	deletedPodAnnotations []string
+}
+
 // New creates a new cluster. This function should be called from a controller.
@@ -431,6 +438,7 @@ func (c *Cluster) Create() (err error) {
 }
 
 func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compareStatefulsetResult {
+	deletedPodAnnotations := []string{}
 	reasons := make([]string, 0)
 	var match, needsRollUpdate, needsReplace bool
 
@@ -445,7 +453,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
 		needsReplace = true
 		reasons = append(reasons, "new statefulset's ownerReferences do not match")
 	}
-	if changed, reason := c.compareAnnotations(c.Statefulset.Annotations, statefulSet.Annotations); changed {
+	if changed, reason := c.compareAnnotations(c.Statefulset.Annotations, statefulSet.Annotations, nil); changed {
 		match = false
 		needsReplace = true
 		reasons = append(reasons, "new statefulset's annotations do not match: "+reason)
@@ -519,7 +527,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
 		}
 	}
 
-	if changed, reason := c.compareAnnotations(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations); changed {
+	if changed, reason := c.compareAnnotations(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations, &deletedPodAnnotations); changed {
 		match = false
 		needsReplace = true
 		reasons = append(reasons, "new statefulset's pod template metadata annotations does not match "+reason)
@@ -541,7 +549,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
 				reasons = append(reasons, fmt.Sprintf("new statefulset's name for volume %d does not match the current one", i))
 				continue
 			}
-			if changed, reason := c.compareAnnotations(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations); changed {
+			if changed, reason := c.compareAnnotations(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations, nil); changed {
				needsReplace = true
 				reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q do not match the current ones: %s", name, reason))
 			}
@@ -579,7 +587,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
 		match = false
 	}
 
-	return &compareStatefulsetResult{match: match, reasons: reasons, rollingUpdate: needsRollUpdate, replace: needsReplace}
+	return &compareStatefulsetResult{match: match, reasons: reasons, rollingUpdate: needsRollUpdate, replace: needsReplace, deletedPodAnnotations: deletedPodAnnotations}
 }
 
 type containerCondition func(a, b v1.Container) bool
@@ -781,7 +789,7 @@ func volumeMountExists(mount v1.VolumeMount, mounts []v1.VolumeMount) bool {
 	return false
 }
 
-func (c *Cluster) compareAnnotations(old, new map[string]string) (bool, string) {
+func (c *Cluster) compareAnnotations(old, new map[string]string, removedList *[]string) (bool, string) {
 	reason := ""
 	ignoredAnnotations := make(map[string]bool)
 	for _, ignore := range c.OpConfig.IgnoredAnnotations {
@@ -794,6 +802,9 @@ func (c *Cluster) compareAnnotations(old, new map[string]string) (bool, string)
 		}
 		if _, ok := new[key]; !ok {
 			reason += fmt.Sprintf(" Removed %q.", key)
+			if removedList != nil {
+				*removedList = append(*removedList, key)
+			}
 		}
 	}
 
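Why removed keys have to be tracked at all: a merge patch only deletes a map entry when the key is present with an explicit `null` value, so simply re-applying the desired annotations leaves stale keys on the live objects. Below is a minimal, self-contained sketch (with a simplified stand-in for the operator's `compareAnnotations` and hypothetical annotation keys) of how the collected keys translate into such a patch body:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Simplified stand-in for the operator's compareAnnotations: reports whether
// the maps differ and, if removedList is non-nil, records keys that exist in
// old but are gone from new.
func compareAnnotations(old, new map[string]string, removedList *[]string) bool {
	changed := false
	for key, v := range new {
		if old[key] != v {
			changed = true
		}
	}
	for key := range old {
		if _, ok := new[key]; !ok {
			changed = true
			if removedList != nil {
				*removedList = append(*removedList, key)
			}
		}
	}
	return changed
}

func main() {
	current := map[string]string{"foo": "true", "no-scale-down": "true"}
	desired := map[string]string{} // pod annotations were reset in the config

	removed := []string{}
	if compareAnnotations(current, desired, &removed) {
		// Keys set to nil marshal as JSON null, which tells the API server
		// to delete those annotations instead of leaving them untouched.
		annotations := map[string]*string{}
		for _, key := range removed {
			annotations[key] = nil
		}
		patch, _ := json.Marshal(map[string]map[string]map[string]*string{
			"metadata": {"annotations": annotations}})
		fmt.Println(string(patch))
		// {"metadata":{"annotations":{"foo":null,"no-scale-down":null}}}
	}
}
```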
@@ -836,41 +847,46 @@ func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) {
 	return true, ""
 }
 
-func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool, reason string) {
+func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) *compareLogicalBackupJobResult {
+	deletedPodAnnotations := []string{}
+	reasons := make([]string, 0)
+	match := true
 
 	if cur.Spec.Schedule != new.Spec.Schedule {
-		return false, fmt.Sprintf("new job's schedule %q does not match the current one %q",
-			new.Spec.Schedule, cur.Spec.Schedule)
+		match = false
+		reasons = append(reasons, fmt.Sprintf("new job's schedule %q does not match the current one %q", new.Spec.Schedule, cur.Spec.Schedule))
 	}
 
 	newImage := new.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image
 	curImage := cur.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image
 	if newImage != curImage {
-		return false, fmt.Sprintf("new job's image %q does not match the current one %q",
-			newImage, curImage)
+		match = false
+		reasons = append(reasons, fmt.Sprintf("new job's image %q does not match the current one %q", newImage, curImage))
 	}
 
 	newPodAnnotation := new.Spec.JobTemplate.Spec.Template.Annotations
 	curPodAnnotation := cur.Spec.JobTemplate.Spec.Template.Annotations
-	if changed, reason := c.compareAnnotations(curPodAnnotation, newPodAnnotation); changed {
-		return false, fmt.Sprintf("new job's pod template metadata annotations does not match " + reason)
+	if changed, reason := c.compareAnnotations(curPodAnnotation, newPodAnnotation, &deletedPodAnnotations); changed {
+		match = false
+		reasons = append(reasons, fmt.Sprint("new job's pod template metadata annotations do not match "+reason))
 	}
 
 	newPgVersion := getPgVersion(new)
 	curPgVersion := getPgVersion(cur)
 	if newPgVersion != curPgVersion {
-		return false, fmt.Sprintf("new job's env PG_VERSION %q does not match the current one %q",
-			newPgVersion, curPgVersion)
+		match = false
+		reasons = append(reasons, fmt.Sprintf("new job's env PG_VERSION %q does not match the current one %q", newPgVersion, curPgVersion))
 	}
 
 	needsReplace := false
-	reasons := make([]string, 0)
-	needsReplace, reasons = c.compareContainers("cronjob container", cur.Spec.JobTemplate.Spec.Template.Spec.Containers, new.Spec.JobTemplate.Spec.Template.Spec.Containers, needsReplace, reasons)
+	contReasons := make([]string, 0)
+	needsReplace, contReasons = c.compareContainers("cronjob container", cur.Spec.JobTemplate.Spec.Template.Spec.Containers, new.Spec.JobTemplate.Spec.Template.Spec.Containers, needsReplace, contReasons)
 	if needsReplace {
-		return false, fmt.Sprintf("logical backup container specs do not match: %v", strings.Join(reasons, `', '`))
+		match = false
+		reasons = append(reasons, fmt.Sprintf("logical backup container specs do not match: %v", strings.Join(contReasons, `', '`)))
 	}
 
-	return true, ""
+	return &compareLogicalBackupJobResult{match: match, reasons: reasons, deletedPodAnnotations: deletedPodAnnotations}
 }
 
 func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBudget) (bool, string) {
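Callers move from the old two-value return to the result struct; a small illustrative sketch (a local copy of the type and made-up reason strings, not the operator's package) of how the accumulated output is consumed:

```go
package main

import "fmt"

// Local copy of the result shape, for illustration only.
type compareLogicalBackupJobResult struct {
	match                 bool
	reasons               []string
	deletedPodAnnotations []string
}

func main() {
	cmp := compareLogicalBackupJobResult{
		match: false,
		reasons: []string{
			`new job's schedule "30 0 * * *" does not match the current one "0 1 * * *"`,
			`new job's pod template metadata annotations do not match  Removed "foo".`,
		},
		deletedPodAnnotations: []string{"foo"},
	}

	if !cmp.match {
		// Every mismatch is reported, not just the first one found.
		for _, reason := range cmp.reasons {
			fmt.Println("reason:", reason)
		}
		// Keys listed here need an explicit null in the follow-up patch,
		// otherwise they survive on the CronJob's pod template.
		fmt.Println("annotations to remove:", cmp.deletedPodAnnotations)
	}
}
```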
@@ -881,7 +897,7 @@ func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBud
 	if !reflect.DeepEqual(new.ObjectMeta.OwnerReferences, cur.ObjectMeta.OwnerReferences) {
 		return false, "new PDB's owner references do not match the current ones"
 	}
-	if changed, reason := c.compareAnnotations(cur.Annotations, new.Annotations); changed {
+	if changed, reason := c.compareAnnotations(cur.Annotations, new.Annotations, nil); changed {
 		return false, "new PDB's annotations do not match the current ones:" + reason
 	}
 	return true, ""
@@ -1021,7 +1037,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 		// only when streams were not specified in oldSpec but in newSpec
 		needStreamUser := len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0
 
-		annotationsChanged, _ := c.compareAnnotations(oldSpec.Annotations, newSpec.Annotations)
+		annotationsChanged, _ := c.compareAnnotations(oldSpec.Annotations, newSpec.Annotations, nil)
 
 		initUsers := !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser
 		if initUsers {

@@ -1680,12 +1680,20 @@ func TestCompareLogicalBackupJob(t *testing.T) {
 				}
 			}
 
-			match, reason := cluster.compareLogicalBackupJob(currentCronJob, desiredCronJob)
-			if match != tt.match {
-				t.Errorf("%s - unexpected match result %t when comparing cronjobs %#v and %#v", t.Name(), match, currentCronJob, desiredCronJob)
-			} else {
-				if !strings.HasPrefix(reason, tt.reason) {
-					t.Errorf("%s - expected reason prefix %s, found %s", t.Name(), tt.reason, reason)
+			cmp := cluster.compareLogicalBackupJob(currentCronJob, desiredCronJob)
+			if cmp.match != tt.match {
+				t.Errorf("%s - unexpected match result %t when comparing cronjobs %#v and %#v", t.Name(), cmp.match, currentCronJob, desiredCronJob)
+			} else if !cmp.match {
+				found := false
+				for _, reason := range cmp.reasons {
+					if strings.HasPrefix(reason, tt.reason) {
+						found = true
+						break
+					}
+					found = false
+				}
+				if !found {
+					t.Errorf("%s - expected reason prefix %s, not found in %#v", t.Name(), tt.reason, cmp.reasons)
 				}
 			}
 		})

@@ -2,6 +2,7 @@ package cluster
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"reflect"
 	"strings"
@@ -977,6 +978,7 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
 		err           error
 	)
 
+	updatedPodAnnotations := map[string]*string{}
 	syncReason := make([]string, 0)
 	deployment, err = c.KubeClient.
 		Deployments(c.Namespace).
@@ -1038,9 +1040,27 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
 		}
 
 		newPodAnnotations := c.annotationsSet(c.generatePodAnnotations(&c.Spec))
-		if changed, reason := c.compareAnnotations(deployment.Spec.Template.Annotations, newPodAnnotations); changed {
+		deletedPodAnnotations := []string{}
+		if changed, reason := c.compareAnnotations(deployment.Spec.Template.Annotations, newPodAnnotations, &deletedPodAnnotations); changed {
 			specSync = true
 			syncReason = append(syncReason, []string{"new connection pooler's pod template annotations do not match the current ones: " + reason}...)
+
+			for _, anno := range deletedPodAnnotations {
+				updatedPodAnnotations[anno] = nil
+			}
+			templateMetadataReq := map[string]map[string]map[string]map[string]map[string]*string{
+				"spec": {"template": {"metadata": {"annotations": updatedPodAnnotations}}}}
+			patch, err := json.Marshal(templateMetadataReq)
+			if err != nil {
+				return nil, fmt.Errorf("could not marshal ObjectMeta for %s connection pooler's pod template: %v", role, err)
+			}
+			deployment, err = c.KubeClient.Deployments(c.Namespace).Patch(context.TODO(),
+				deployment.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "")
+			if err != nil {
+				c.logger.Errorf("failed to patch %s connection pooler's pod template: %v", role, err)
+				return nil, err
+			}
+
+			deployment.Spec.Template.Annotations = newPodAnnotations
 		}
 
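A sketch of what that nested map literal marshals to; the annotation key here is hypothetical, but the shape is the strategic-merge-patch body sent to the Deployment, with deleted keys rendered as `null` (for a plain string map such as `metadata.annotations`, a null value removes the entry):

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Hypothetical content: one pod annotation was removed from the config.
	updatedPodAnnotations := map[string]*string{"no-scale-down": nil}

	templateMetadataReq := map[string]map[string]map[string]map[string]map[string]*string{
		"spec": {"template": {"metadata": {"annotations": updatedPodAnnotations}}}}

	patch, _ := json.Marshal(templateMetadataReq)
	fmt.Println(string(patch))
	// {"spec":{"template":{"metadata":{"annotations":{"no-scale-down":null}}}}}
}
```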
@@ -1064,7 +1084,7 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
 		}
 
 		newAnnotations := c.AnnotationsToPropagate(c.annotationsSet(nil)) // including the downscaling annotations
-		if changed, _ := c.compareAnnotations(deployment.Annotations, newAnnotations); changed {
+		if changed, _ := c.compareAnnotations(deployment.Annotations, newAnnotations, nil); changed {
 			deployment, err = patchConnectionPoolerAnnotations(c.KubeClient, deployment, newAnnotations)
 			if err != nil {
 				return nil, err
@@ -1098,14 +1118,20 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
 			if err != nil {
 				return nil, fmt.Errorf("could not delete pooler pod: %v", err)
 			}
-		} else if changed, _ := c.compareAnnotations(pod.Annotations, deployment.Spec.Template.Annotations); changed {
-			patchData, err := metaAnnotationsPatch(deployment.Spec.Template.Annotations)
-			if err != nil {
-				return nil, fmt.Errorf("could not form patch for pooler's pod annotations: %v", err)
-			}
-			_, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
-			if err != nil {
-				return nil, fmt.Errorf("could not patch annotations for pooler's pod %q: %v", pod.Name, err)
-			}
+		} else if changed, _ := c.compareAnnotations(pod.Annotations, deployment.Spec.Template.Annotations, nil); changed {
+			metadataReq := map[string]map[string]map[string]*string{"metadata": {}}
+
+			for anno, val := range deployment.Spec.Template.Annotations {
+				updatedPodAnnotations[anno] = &val
+			}
+			metadataReq["metadata"]["annotations"] = updatedPodAnnotations
+			patch, err := json.Marshal(metadataReq)
+			if err != nil {
+				return nil, fmt.Errorf("could not marshal ObjectMeta for %s connection pooler's pods: %v", role, err)
+			}
+			_, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
+			if err != nil {
+				return nil, fmt.Errorf("could not patch annotations for %s connection pooler's pod %q: %v", role, pod.Name, err)
+			}
 		}
 	}

@@ -329,7 +329,7 @@ func (c *Cluster) updateService(role PostgresRole, oldService *v1.Service, newSe
 		}
 	}
 
-	if changed, _ := c.compareAnnotations(oldService.Annotations, newService.Annotations); changed {
+	if changed, _ := c.compareAnnotations(oldService.Annotations, newService.Annotations, nil); changed {
 		patchData, err := metaAnnotationsPatch(newService.Annotations)
 		if err != nil {
 			return nil, fmt.Errorf("could not form patch for service %q annotations: %v", oldService.Name, err)

@@ -545,7 +545,7 @@ func (c *Cluster) compareStreams(curEventStreams, newEventStreams *zalandov1.Fab
 	for newKey, newValue := range newEventStreams.Annotations {
 		desiredAnnotations[newKey] = newValue
 	}
-	if changed, reason := c.compareAnnotations(curEventStreams.ObjectMeta.Annotations, desiredAnnotations); changed {
+	if changed, reason := c.compareAnnotations(curEventStreams.ObjectMeta.Annotations, desiredAnnotations, nil); changed {
 		match = false
 		reasons = append(reasons, fmt.Sprintf("new streams annotations do not match: %s", reason))
 	}

@@ -235,7 +235,7 @@ func (c *Cluster) syncPatroniConfigMap(suffix string) error {
 		maps.Copy(annotations, cm.Annotations)
 		// Patroni can add extra annotations so incl. current annotations in desired annotations
 		desiredAnnotations := c.annotationsSet(cm.Annotations)
-		if changed, _ := c.compareAnnotations(annotations, desiredAnnotations); changed {
+		if changed, _ := c.compareAnnotations(annotations, desiredAnnotations, nil); changed {
 			patchData, err := metaAnnotationsPatch(desiredAnnotations)
 			if err != nil {
 				return fmt.Errorf("could not form patch for %s config map: %v", configMapName, err)
@@ -280,7 +280,7 @@ func (c *Cluster) syncPatroniEndpoint(suffix string) error {
 		maps.Copy(annotations, ep.Annotations)
 		// Patroni can add extra annotations so incl. current annotations in desired annotations
 		desiredAnnotations := c.annotationsSet(ep.Annotations)
-		if changed, _ := c.compareAnnotations(annotations, desiredAnnotations); changed {
+		if changed, _ := c.compareAnnotations(annotations, desiredAnnotations, nil); changed {
 			patchData, err := metaAnnotationsPatch(desiredAnnotations)
 			if err != nil {
 				return fmt.Errorf("could not form patch for %s endpoint: %v", endpointName, err)
@@ -325,7 +325,7 @@ func (c *Cluster) syncPatroniService() error {
 		maps.Copy(annotations, svc.Annotations)
 		// Patroni can add extra annotations so incl. current annotations in desired annotations
 		desiredAnnotations := c.annotationsSet(svc.Annotations)
-		if changed, _ := c.compareAnnotations(annotations, desiredAnnotations); changed {
+		if changed, _ := c.compareAnnotations(annotations, desiredAnnotations, nil); changed {
 			patchData, err := metaAnnotationsPatch(desiredAnnotations)
 			if err != nil {
 				return fmt.Errorf("could not form patch for %s service: %v", serviceName, err)
@@ -417,7 +417,7 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error {
 				return fmt.Errorf("could not update %s endpoint: %v", role, err)
 			}
 		} else {
-			if changed, _ := c.compareAnnotations(ep.Annotations, desiredEp.Annotations); changed {
+			if changed, _ := c.compareAnnotations(ep.Annotations, desiredEp.Annotations, nil); changed {
 				patchData, err := metaAnnotationsPatch(desiredEp.Annotations)
 				if err != nil {
 					return fmt.Errorf("could not form patch for %s endpoint: %v", role, err)
@@ -567,13 +567,22 @@ func (c *Cluster) syncStatefulSet() error {
 
 		cmp := c.compareStatefulSetWith(desiredSts)
 		if !cmp.rollingUpdate {
+			updatedPodAnnotations := map[string]*string{}
+			for _, anno := range cmp.deletedPodAnnotations {
+				updatedPodAnnotations[anno] = nil
+			}
+			for anno, val := range desiredSts.Spec.Template.Annotations {
+				updatedPodAnnotations[anno] = &val
+			}
+			metadataReq := map[string]map[string]map[string]*string{"metadata": {"annotations": updatedPodAnnotations}}
+			patch, err := json.Marshal(metadataReq)
+			if err != nil {
+				return fmt.Errorf("could not form patch for pod annotations: %v", err)
+			}
+
 			for _, pod := range pods {
-				if changed, _ := c.compareAnnotations(pod.Annotations, desiredSts.Spec.Template.Annotations); changed {
-					patchData, err := metaAnnotationsPatch(desiredSts.Spec.Template.Annotations)
-					if err != nil {
-						return fmt.Errorf("could not form patch for pod %q annotations: %v", pod.Name, err)
-					}
-					_, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
+				if changed, _ := c.compareAnnotations(pod.Annotations, desiredSts.Spec.Template.Annotations, nil); changed {
+					_, err = c.KubeClient.Pods(c.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
 					if err != nil {
 						return fmt.Errorf("could not patch annotations for pod %q: %v", pod.Name, err)
 					}
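The pod-level patch built here mixes both cases in one body: annotations deleted from the spec map to `null`, annotations still desired map to their values, and the same marshaled patch is applied to every pod that differs. A small sketch with hypothetical annotation keys and values:

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	deletedPodAnnotations := []string{"no-scale-down"}        // stands in for cmp.deletedPodAnnotations
	desired := map[string]string{"environment": "production"} // stands in for desiredSts.Spec.Template.Annotations

	updatedPodAnnotations := map[string]*string{}
	for _, anno := range deletedPodAnnotations {
		updatedPodAnnotations[anno] = nil // null -> the API server deletes the key
	}
	for anno, val := range desired {
		val := val // take a stable copy before storing its address
		updatedPodAnnotations[anno] = &val
	}

	patch, _ := json.Marshal(map[string]map[string]map[string]*string{
		"metadata": {"annotations": updatedPodAnnotations}})
	fmt.Println(string(patch))
	// {"metadata":{"annotations":{"environment":"production","no-scale-down":null}}}
}
```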
@@ -1150,7 +1159,7 @@ func (c *Cluster) updateSecret(
 		c.Secrets[secret.UID] = secret
 	}
 
-	if changed, _ := c.compareAnnotations(secret.Annotations, generatedSecret.Annotations); changed {
+	if changed, _ := c.compareAnnotations(secret.Annotations, generatedSecret.Annotations, nil); changed {
 		patchData, err := metaAnnotationsPatch(generatedSecret.Annotations)
 		if err != nil {
 			return fmt.Errorf("could not form patch for secret %q annotations: %v", secret.Name, err)
@@ -1595,19 +1604,38 @@ func (c *Cluster) syncLogicalBackupJob() error {
 			}
 			c.logger.Infof("logical backup job %s updated", c.getLogicalBackupJobName())
 		}
-		if match, reason := c.compareLogicalBackupJob(job, desiredJob); !match {
+		if cmp := c.compareLogicalBackupJob(job, desiredJob); !cmp.match {
 			c.logger.Infof("logical job %s is not in the desired state and needs to be updated",
 				c.getLogicalBackupJobName(),
 			)
-			if reason != "" {
-				c.logger.Infof("reason: %s", reason)
+			if len(cmp.reasons) != 0 {
+				for _, reason := range cmp.reasons {
+					c.logger.Infof("reason: %s", reason)
+				}
+			}
+			if len(cmp.deletedPodAnnotations) != 0 {
+				templateMetadataReq := map[string]map[string]map[string]map[string]map[string]map[string]map[string]*string{
+					"spec": {"jobTemplate": {"spec": {"template": {"metadata": {"annotations": {}}}}}}}
+				for _, anno := range cmp.deletedPodAnnotations {
+					templateMetadataReq["spec"]["jobTemplate"]["spec"]["template"]["metadata"]["annotations"][anno] = nil
+				}
+				patch, err := json.Marshal(templateMetadataReq)
+				if err != nil {
+					return fmt.Errorf("could not marshal ObjectMeta for logical backup job %q pod template: %v", jobName, err)
+				}
+
+				job, err = c.KubeClient.CronJobs(c.Namespace).Patch(context.TODO(), jobName, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "")
+				if err != nil {
+					c.logger.Errorf("failed to remove annotations from the logical backup job %q pod template: %v", jobName, err)
+					return err
+				}
 			}
 			if err = c.patchLogicalBackupJob(desiredJob); err != nil {
 				return fmt.Errorf("could not update logical backup job to match desired state: %v", err)
 			}
 			c.logger.Info("the logical backup job is synced")
 		}
-		if changed, _ := c.compareAnnotations(job.Annotations, desiredJob.Annotations); changed {
+		if changed, _ := c.compareAnnotations(job.Annotations, desiredJob.Annotations, nil); changed {
 			patchData, err := metaAnnotationsPatch(desiredJob.Annotations)
 			if err != nil {
 				return fmt.Errorf("could not form patch for the logical backup job %q: %v", jobName, err)

@@ -142,6 +142,181 @@ func TestSyncStatefulSetsAnnotations(t *testing.T) {
 	}
 }
 
+func TestPodAnnotationsSync(t *testing.T) {
+	clusterName := "acid-test-cluster-2"
+	namespace := "default"
+	podAnnotation := "no-scale-down"
+	podAnnotations := map[string]string{podAnnotation: "true"}
+	customPodAnnotation := "foo"
+	customPodAnnotations := map[string]string{customPodAnnotation: "true"}
+
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+	mockClient := mocks.NewMockHTTPClient(ctrl)
+	client, _ := newFakeK8sAnnotationsClient()
+
+	pg := acidv1.Postgresql{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      clusterName,
+			Namespace: namespace,
+		},
+		Spec: acidv1.PostgresSpec{
+			Volume: acidv1.Volume{
+				Size: "1Gi",
+			},
+			EnableConnectionPooler:        boolToPointer(true),
+			EnableLogicalBackup:           true,
+			EnableReplicaConnectionPooler: boolToPointer(true),
+			PodAnnotations:                podAnnotations,
+			NumberOfInstances:             2,
+		},
+	}
+
+	var cluster = New(
+		Config{
+			OpConfig: config.Config{
+				PatroniAPICheckInterval: time.Duration(1),
+				PatroniAPICheckTimeout:  time.Duration(5),
+				PodManagementPolicy:     "ordered_ready",
+				CustomPodAnnotations:    customPodAnnotations,
+				ConnectionPooler: config.ConnectionPooler{
+					ConnectionPoolerDefaultCPURequest:    "100m",
+					ConnectionPoolerDefaultCPULimit:      "100m",
+					ConnectionPoolerDefaultMemoryRequest: "100Mi",
+					ConnectionPoolerDefaultMemoryLimit:   "100Mi",
+					NumberOfInstances:                    k8sutil.Int32ToPointer(1),
+				},
+				Resources: config.Resources{
+					ClusterLabels:         map[string]string{"application": "spilo"},
+					ClusterNameLabel:      "cluster-name",
+					DefaultCPURequest:     "300m",
+					DefaultCPULimit:       "300m",
+					DefaultMemoryRequest:  "300Mi",
+					DefaultMemoryLimit:    "300Mi",
+					MaxInstances:          -1,
+					PodRoleLabel:          "spilo-role",
+					ResourceCheckInterval: time.Duration(3),
+					ResourceCheckTimeout:  time.Duration(10),
+				},
+			},
+		}, client, pg, logger, eventRecorder)
+
+	configJson := `{"postgresql": {"parameters": {"log_min_duration_statement": 200, "max_connections": 50}}}, "ttl": 20}`
+	response := http.Response{
+		StatusCode: 200,
+		Body:       io.NopCloser(bytes.NewReader([]byte(configJson))),
+	}
+
+	mockClient.EXPECT().Do(gomock.Any()).Return(&response, nil).AnyTimes()
+	cluster.patroni = patroni.New(patroniLogger, mockClient)
+	cluster.Name = clusterName
+	cluster.Namespace = namespace
+	clusterOptions := clusterLabelsOptions(cluster)
+
+	// create a statefulset
+	_, err := cluster.createStatefulSet()
+	assert.NoError(t, err)
+	// create a pods
+	podsList := createPods(cluster)
+	for _, pod := range podsList {
+		_, err = cluster.KubeClient.Pods(namespace).Create(context.TODO(), &pod, metav1.CreateOptions{})
+		assert.NoError(t, err)
+	}
+	// create connection pooler
+	_, err = cluster.createConnectionPooler(mockInstallLookupFunction)
+	assert.NoError(t, err)
+
+	// create cron job
+	err = cluster.createLogicalBackupJob()
+	assert.NoError(t, err)
+
+	annotateResources(cluster)
+	err = cluster.Sync(&cluster.Postgresql)
+	assert.NoError(t, err)
+
+	// 1. PodAnnotations set
+	stsList, err := cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions)
+	assert.NoError(t, err)
+	for _, sts := range stsList.Items {
+		for _, annotation := range []string{podAnnotation, customPodAnnotation} {
+			assert.Contains(t, sts.Spec.Template.Annotations, annotation)
+		}
+	}
+
+	for _, role := range []PostgresRole{Master, Replica} {
+		deploy, err := cluster.KubeClient.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{})
+		assert.NoError(t, err)
+		for _, annotation := range []string{podAnnotation, customPodAnnotation} {
+			assert.Contains(t, deploy.Spec.Template.Annotations, annotation,
+				fmt.Sprintf("pooler deployment pod template %s should contain annotation %s, found %#v",
+					deploy.Name, annotation, deploy.Spec.Template.Annotations))
+		}
+	}
+
+	podList, err := cluster.KubeClient.Pods(namespace).List(context.TODO(), clusterOptions)
+	assert.NoError(t, err)
+	for _, pod := range podList.Items {
+		for _, annotation := range []string{podAnnotation, customPodAnnotation} {
+			assert.Contains(t, pod.Annotations, annotation,
+				fmt.Sprintf("pod %s should contain annotation %s, found %#v", pod.Name, annotation, pod.Annotations))
+		}
+	}
+
+	cronJobList, err := cluster.KubeClient.CronJobs(namespace).List(context.TODO(), clusterOptions)
+	assert.NoError(t, err)
+	for _, cronJob := range cronJobList.Items {
+		for _, annotation := range []string{podAnnotation, customPodAnnotation} {
+			assert.Contains(t, cronJob.Spec.JobTemplate.Spec.Template.Annotations, annotation,
+				fmt.Sprintf("logical backup cron job's pod template should contain annotation %s, found %#v",
+					annotation, cronJob.Spec.JobTemplate.Spec.Template.Annotations))
+		}
+	}
+
+	// 2. PodAnnotations removed
+	newSpec := cluster.Postgresql.DeepCopy()
+	newSpec.Spec.PodAnnotations = nil
+	cluster.OpConfig.CustomPodAnnotations = nil
+	err = cluster.Sync(newSpec)
+	assert.NoError(t, err)
+
+	stsList, err = cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions)
+	assert.NoError(t, err)
+	for _, sts := range stsList.Items {
+		for _, annotation := range []string{podAnnotation, customPodAnnotation} {
+			assert.NotContains(t, sts.Spec.Template.Annotations, annotation)
+		}
+	}
+
+	for _, role := range []PostgresRole{Master, Replica} {
+		deploy, err := cluster.KubeClient.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{})
+		assert.NoError(t, err)
+		for _, annotation := range []string{podAnnotation, customPodAnnotation} {
+			assert.NotContains(t, deploy.Spec.Template.Annotations, annotation,
+				fmt.Sprintf("pooler deployment pod template %s should not contain annotation %s, found %#v",
+					deploy.Name, annotation, deploy.Spec.Template.Annotations))
+		}
+	}
+
+	podList, err = cluster.KubeClient.Pods(namespace).List(context.TODO(), clusterOptions)
+	assert.NoError(t, err)
+	for _, pod := range podList.Items {
+		for _, annotation := range []string{podAnnotation, customPodAnnotation} {
+			assert.NotContains(t, pod.Annotations, annotation,
+				fmt.Sprintf("pod %s should not contain annotation %s, found %#v", pod.Name, annotation, pod.Annotations))
		}
+	}
+
+	cronJobList, err = cluster.KubeClient.CronJobs(namespace).List(context.TODO(), clusterOptions)
+	assert.NoError(t, err)
+	for _, cronJob := range cronJobList.Items {
+		for _, annotation := range []string{podAnnotation, customPodAnnotation} {
+			assert.NotContains(t, cronJob.Spec.JobTemplate.Spec.Template.Annotations, annotation,
+				fmt.Sprintf("logical backup cron job's pod template should not contain annotation %s, found %#v",
+					annotation, cronJob.Spec.JobTemplate.Spec.Template.Annotations))
+		}
+	}
+}
+
 func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {
 	testName := "test config comparison"
 	client, _ := newFakeK8sSyncClient()

@@ -247,18 +247,18 @@ func createPods(cluster *Cluster) []v1.Pod {
 	for i, role := range []PostgresRole{Master, Replica} {
 		podsList = append(podsList, v1.Pod{
 			ObjectMeta: metav1.ObjectMeta{
-				Name:      fmt.Sprintf("%s-%d", clusterName, i),
+				Name:      fmt.Sprintf("%s-%d", cluster.Name, i),
 				Namespace: namespace,
 				Labels: map[string]string{
 					"application":  "spilo",
-					"cluster-name": clusterName,
+					"cluster-name": cluster.Name,
 					"spilo-role":   string(role),
 				},
 			},
 		})
 		podsList = append(podsList, v1.Pod{
 			ObjectMeta: metav1.ObjectMeta{
-				Name:      fmt.Sprintf("%s-pooler-%s", clusterName, role),
+				Name:      fmt.Sprintf("%s-pooler-%s", cluster.Name, role),
 				Namespace: namespace,
 				Labels:    cluster.connectionPoolerLabels(role, true).MatchLabels,
 			},

@@ -225,7 +225,7 @@ func (c *Cluster) syncVolumeClaims() error {
 		}
 
 		newAnnotations := c.annotationsSet(nil)
-		if changed, _ := c.compareAnnotations(pvc.Annotations, newAnnotations); changed {
+		if changed, _ := c.compareAnnotations(pvc.Annotations, newAnnotations, nil); changed {
 			patchData, err := metaAnnotationsPatch(newAnnotations)
 			if err != nil {
 				return fmt.Errorf("could not form patch for the persistent volume claim for volume %q: %v", pvc.Name, err)