Add topologySpreadConstraints configuration to pod spec.
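The change threads the upstream Kubernetes TopologySpreadConstraint type through the PostgresSpec API type, the CRD validation schema, statefulset comparison, and pod template generation, plus an e2e test and a unit test. As a minimal sketch of how the new field could be populated — assuming only the PostgresSpec field and element type introduced in this diff, with import paths inferred from the operator's module layout and purely illustrative constraint values — LabelSelector is left empty because the operator injects it from the cluster labels (see generateTopologySpreadConstraints further down):

	// Illustrative only, not part of this commit.
	package main

	import (
		acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
		v1 "k8s.io/api/core/v1"
	)

	func main() {
		spec := acidv1.PostgresSpec{
			NumberOfInstances: 2,
			TopologySpreadConstraints: []v1.TopologySpreadConstraint{
				{
					// Spread the cluster pods evenly across zones.
					MaxSkew:           1,
					TopologyKey:       "topology.kubernetes.io/zone",
					WhenUnsatisfiable: v1.DoNotSchedule,
					// LabelSelector is filled in by the operator from the cluster labels.
				},
			},
		}
		_ = spec // consumed by generateStatefulSet in the operator
	}

In a YAML manifest the same data would sit under spec.topologySpreadConstraints, which the CRD schema below admits as a nullable array of objects with x-kubernetes-preserve-unknown-fields.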
parent fa4bc21538
commit 263242a5e1
@@ -559,7 +559,7 @@ class EndToEndTestCase(unittest.TestCase):
 
             pg_patch_config["spec"]["patroni"]["slots"][slot_to_change]["database"] = "bar"
             del pg_patch_config["spec"]["patroni"]["slots"][slot_to_remove]
-            
+
             k8s.api.custom_objects_api.patch_namespaced_custom_object(
                 "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_delete_slot_patch)
 
@@ -576,7 +576,7 @@ class EndToEndTestCase(unittest.TestCase):
 
             self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", get_slot_query%("database", slot_to_change))[0], "bar",
                 "The replication slot cannot be updated", 10, 5)
-            
+
             # make sure slot from Patroni didn't get deleted
             self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_slot_query%("slot_name", patroni_slot))), 1,
                 "The replication slot from Patroni gets deleted", 10, 5)
@@ -932,7 +932,7 @@ class EndToEndTestCase(unittest.TestCase):
                     },
                 }
             }
-            
+
             old_sts_creation_timestamp = sts.metadata.creation_timestamp
             k8s.api.apps_v1.patch_namespaced_stateful_set(sts.metadata.name, sts.metadata.namespace, annotation_patch)
             old_svc_creation_timestamp = svc.metadata.creation_timestamp
@@ -1369,7 +1369,7 @@ class EndToEndTestCase(unittest.TestCase):
         }
         k8s.update_config(patch_scaled_policy_retain)
         self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
- 
+
         # decrease the number of instances
         k8s.api.custom_objects_api.patch_namespaced_custom_object(
             'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', pg_patch_scale_down_instances)
@@ -1646,7 +1646,6 @@ class EndToEndTestCase(unittest.TestCase):
         # toggle pod anti affinity to move replica away from master node
         self.assert_distributed_pods(master_nodes)
 
-
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     def test_overwrite_pooler_deployment(self):
         pooler_name = 'acid-minimal-cluster-pooler'
@@ -1799,7 +1798,7 @@ class EndToEndTestCase(unittest.TestCase):
             },
         }
         k8s.api.core_v1.patch_namespaced_secret(
-            name="foo-user.acid-minimal-cluster.credentials.postgresql.acid.zalan.do", 
+            name="foo-user.acid-minimal-cluster.credentials.postgresql.acid.zalan.do",
             namespace="default",
             body=secret_fake_rotation)
 
@@ -1816,7 +1815,7 @@ class EndToEndTestCase(unittest.TestCase):
                 "enable_password_rotation": "true",
                 "inherited_annotations": "environment",
                 "password_rotation_interval": "30",
-                "password_rotation_user_retention": "30",  # should be set to 60 
+                "password_rotation_user_retention": "30",  # should be set to 60
             },
         }
         k8s.update_config(enable_password_rotation)
@@ -1885,7 +1884,7 @@ class EndToEndTestCase(unittest.TestCase):
         self.assertTrue("environment" in db_user_secret.metadata.annotations, "Added annotation was not propagated to secret")
 
         # disable password rotation for all other users (foo_user)
-        # and pick smaller intervals to see if the third fake rotation user is dropped 
+        # and pick smaller intervals to see if the third fake rotation user is dropped
         enable_password_rotation = {
             "data": {
                 "enable_password_rotation": "false",
@@ -2385,6 +2384,56 @@ class EndToEndTestCase(unittest.TestCase):
         # toggle pod anti affinity to move replica away from master node
         self.assert_distributed_pods(master_nodes)
 
+    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
+    def test_topology_spread_constraints(self):
+        '''
+            Enable topologySpreadConstraints for pods
+        '''
+        k8s = self.k8s
+        cluster_labels = "application=spilo,cluster-name=acid-minimal-cluster"
+
+        # Verify we are in good state from potential previous tests
+        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running")
+
+        master_nodes, replica_nodes = k8s.get_cluster_nodes()
+        self.assertNotEqual(master_nodes, [])
+        self.assertNotEqual(replica_nodes, [])
+
+        # Patch label to nodes for topologySpreadConstraints
+        patch_node_label = {
+            "metadata": {
+                "labels": {
+                    "topology.kubernetes.io/zone": "zalando"
+                }
+            }
+        }
+        k8s.api.core_v1.patch_node(master_nodes[0], patch_node_label)
+        k8s.api.core_v1.patch_node(replica_nodes[0], patch_node_label)
+
+        # Scale-out postgresql pods
+        k8s.api.custom_objects_api.patch_namespaced_custom_object("acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster",
+            {"spec": {"numberOfInstances": 6}})
+        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+        self.eventuallyEqual(lambda: k8s.count_pods_with_label(cluster_labels), 6, "Postgresql StatefulSet are scale to 6")
+        self.eventuallyEqual(lambda: k8s.count_running_pods(), 6, "All pods are running")
+
+        worker_node_1 = 0
+        worker_node_2 = 0
+        pods = k8s.api.core_v1.list_namespaced_pod('default', label_selector=cluster_labels)
+        for pod in pods.items:
+            if pod.spec.node_name == 'postgres-operator-e2e-tests-worker':
+                worker_node_1 += 1
+            elif pod.spec.node_name == 'postgres-operator-e2e-tests-worker2':
+                worker_node_2 += 1
+
+        self.assertEqual(worker_node_1, worker_node_2)
+        self.assertEqual(worker_node_1, 3)
+        self.assertEqual(worker_node_2, 3)
+
+        # Scale-it postgresql pods to previous replicas
+        k8s.api.custom_objects_api.patch_namespaced_custom_object("acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster",
+            {"spec": {"numberOfInstances": 2}})
+
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     def test_zz_cluster_deletion(self):
         '''
@@ -2460,7 +2509,7 @@ class EndToEndTestCase(unittest.TestCase):
             self.eventuallyEqual(lambda: k8s.count_deployments_with_label(cluster_label), 0, "Deployments not deleted")
             self.eventuallyEqual(lambda: k8s.count_pdbs_with_label(cluster_label), 0, "Pod disruption budget not deleted")
             self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 8, "Secrets were deleted although disabled in config")
-            self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 3, "PVCs were deleted although disabled in config")
+            self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 6, "PVCs were deleted although disabled in config")
 
         except timeout_decorator.TimeoutError:
             print('Operator log: {}'.format(k8s.get_operator_log()))
@@ -2502,7 +2551,7 @@ class EndToEndTestCase(unittest.TestCase):
 
         # if nodes are different we can quit here
         if master_nodes[0] not in replica_nodes:
-            return True             
+            return True
 
         # enable pod anti affintiy in config map which should trigger movement of replica
         patch_enable_antiaffinity = {
@@ -2526,7 +2575,7 @@ class EndToEndTestCase(unittest.TestCase):
             }
             k8s.update_config(patch_disable_antiaffinity, "disable antiaffinity")
             self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
-            
+
             k8s.wait_for_pod_start('spilo-role=replica,' + cluster_labels)
             k8s.wait_for_running_pods(cluster_labels, 2)
 
@@ -2537,7 +2586,7 @@ class EndToEndTestCase(unittest.TestCase):
             # if nodes are different we can quit here
             for target_node in target_nodes:
                 if (target_node not in master_nodes or target_node not in replica_nodes) and master_nodes[0] in replica_nodes:
-                    print('Pods run on the same node') 
+                    print('Pods run on the same node')
                     return False
 
         except timeout_decorator.TimeoutError:

@@ -582,6 +582,12 @@ spec:
                         - PreferNoSchedule
                     tolerationSeconds:
                       type: integer
+              topologySpreadConstraints:
+                type: array
+                nullable: true
+                items:
+                  type: object
+                  x-kubernetes-preserve-unknown-fields: true
               useLoadBalancer:
                 type: boolean
                 description: deprecated

@@ -895,6 +895,16 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
 							},
 						},
 					},
+					"topologySpreadConstraints": {
+						Type:     "array",
+						Nullable: true,
+						Items: &apiextv1.JSONSchemaPropsOrArray{
+							Schema: &apiextv1.JSONSchemaProps{
+								Type:                   "object",
+								XPreserveUnknownFields: util.True(),
+							},
+						},
+					},
 					"useLoadBalancer": {
 						Type:        "boolean",
 						Description: "deprecated",

@@ -63,24 +63,25 @@ type PostgresSpec struct {
 	UsersWithSecretRotation        []string             `json:"usersWithSecretRotation,omitempty"`
 	UsersWithInPlaceSecretRotation []string             `json:"usersWithInPlaceSecretRotation,omitempty"`
 
-	NumberOfInstances      int32                       `json:"numberOfInstances"`
-	MaintenanceWindows     []MaintenanceWindow         `json:"maintenanceWindows,omitempty"`
-	Clone                  *CloneDescription           `json:"clone,omitempty"`
-	Databases              map[string]string           `json:"databases,omitempty"`
-	PreparedDatabases      map[string]PreparedDatabase `json:"preparedDatabases,omitempty"`
-	SchedulerName          *string                     `json:"schedulerName,omitempty"`
-	NodeAffinity           *v1.NodeAffinity            `json:"nodeAffinity,omitempty"`
-	Tolerations            []v1.Toleration             `json:"tolerations,omitempty"`
-	Sidecars               []Sidecar                   `json:"sidecars,omitempty"`
-	InitContainers         []v1.Container              `json:"initContainers,omitempty"`
-	PodPriorityClassName   string                      `json:"podPriorityClassName,omitempty"`
-	ShmVolume              *bool                       `json:"enableShmVolume,omitempty"`
-	EnableLogicalBackup    bool                        `json:"enableLogicalBackup,omitempty"`
-	LogicalBackupRetention string                      `json:"logicalBackupRetention,omitempty"`
-	LogicalBackupSchedule  string                      `json:"logicalBackupSchedule,omitempty"`
-	StandbyCluster         *StandbyDescription         `json:"standby,omitempty"`
-	PodAnnotations         map[string]string           `json:"podAnnotations,omitempty"`
-	ServiceAnnotations     map[string]string           `json:"serviceAnnotations,omitempty"`
+	NumberOfInstances         int32                         `json:"numberOfInstances"`
+	MaintenanceWindows        []MaintenanceWindow           `json:"maintenanceWindows,omitempty"`
+	Clone                     *CloneDescription             `json:"clone,omitempty"`
+	Databases                 map[string]string             `json:"databases,omitempty"`
+	PreparedDatabases         map[string]PreparedDatabase   `json:"preparedDatabases,omitempty"`
+	SchedulerName             *string                       `json:"schedulerName,omitempty"`
+	NodeAffinity              *v1.NodeAffinity              `json:"nodeAffinity,omitempty"`
+	TopologySpreadConstraints []v1.TopologySpreadConstraint `json:"topologySpreadConstraints,omitempty"`
+	Tolerations               []v1.Toleration               `json:"tolerations,omitempty"`
+	Sidecars                  []Sidecar                     `json:"sidecars,omitempty"`
+	InitContainers            []v1.Container                `json:"initContainers,omitempty"`
+	PodPriorityClassName      string                        `json:"podPriorityClassName,omitempty"`
+	ShmVolume                 *bool                         `json:"enableShmVolume,omitempty"`
+	EnableLogicalBackup       bool                          `json:"enableLogicalBackup,omitempty"`
+	LogicalBackupRetention    string                        `json:"logicalBackupRetention,omitempty"`
+	LogicalBackupSchedule     string                        `json:"logicalBackupSchedule,omitempty"`
+	StandbyCluster            *StandbyDescription           `json:"standby,omitempty"`
+	PodAnnotations            map[string]string             `json:"podAnnotations,omitempty"`
+	ServiceAnnotations        map[string]string             `json:"serviceAnnotations,omitempty"`
 	// MasterServiceAnnotations takes precedence over ServiceAnnotations for master role if not empty
 	MasterServiceAnnotations map[string]string `json:"masterServiceAnnotations,omitempty"`
 	// ReplicaServiceAnnotations takes precedence over ServiceAnnotations for replica role if not empty

@@ -499,6 +499,11 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
 		needsRollUpdate = true
 		reasons = append(reasons, "new statefulset's pod affinity does not match the current one")
 	}
+	if !reflect.DeepEqual(c.Statefulset.Spec.Template.Spec.TopologySpreadConstraints, statefulSet.Spec.Template.Spec.TopologySpreadConstraints) {
+		needsReplace = true
+		needsRollUpdate = true
+		reasons = append(reasons, "new statefulset's pod topologySpreadConstraints does not match the current one")
+	}
 	if len(c.Statefulset.Spec.Template.Spec.Tolerations) != len(statefulSet.Spec.Template.Spec.Tolerations) {
 		needsReplace = true
 		needsRollUpdate = true

@@ -604,6 +604,13 @@ func generatePodAntiAffinity(podAffinityTerm v1.PodAffinityTerm, preferredDuring
 	return podAntiAffinity
 }
 
+func generateTopologySpreadConstraints(labels labels.Set, topologySpreadConstraints []v1.TopologySpreadConstraint) []v1.TopologySpreadConstraint {
+	for _, topologySpreadConstraint := range topologySpreadConstraints {
+		topologySpreadConstraint.LabelSelector = &metav1.LabelSelector{MatchLabels: labels}
+	}
+	return topologySpreadConstraints
+}
+
 func tolerations(tolerationsSpec *[]v1.Toleration, podToleration map[string]string) []v1.Toleration {
 	// allow to override tolerations by postgresql manifest
 	if len(*tolerationsSpec) > 0 {
@@ -809,6 +816,7 @@ func (c *Cluster) generatePodTemplate(
 	initContainers []v1.Container,
 	sidecarContainers []v1.Container,
 	sharePgSocketWithSidecars *bool,
+	topologySpreadConstraintsSpec []v1.TopologySpreadConstraint,
 	tolerationsSpec *[]v1.Toleration,
 	spiloRunAsUser *int64,
 	spiloRunAsGroup *int64,
@@ -878,6 +886,10 @@ func (c *Cluster) generatePodTemplate(
 		podSpec.PriorityClassName = priorityClassName
 	}
 
+	if len(topologySpreadConstraintsSpec) > 0 {
+		podSpec.TopologySpreadConstraints = generateTopologySpreadConstraints(labels, topologySpreadConstraintsSpec)
+	}
+
 	if sharePgSocketWithSidecars != nil && *sharePgSocketWithSidecars {
 		addVarRunVolume(&podSpec)
 	}
@@ -1469,6 +1481,7 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
 		initContainers,
 		sidecarContainers,
 		c.OpConfig.SharePgSocketWithSidecars,
+		spec.TopologySpreadConstraints,
 		&tolerationSpec,
 		effectiveRunAsUser,
 		effectiveRunAsGroup,
@@ -2356,6 +2369,7 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1.CronJob, error) {
 		[]v1.Container{},
 		[]v1.Container{},
 		util.False(),
+		[]v1.TopologySpreadConstraint{},
 		&tolerationsSpec,
 		nil,
 		nil,

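A side note on the generateTopologySpreadConstraints hunk above: range copies each element into the loop variable, so assigning to that copy's LabelSelector does not change the slice that is returned. A sketch of an index-based variant that writes through to the slice elements, assuming the same imports as the surrounding file (labels, metav1, v1); this is an illustration, not part of the commit:

	// Hypothetical variant for illustration only.
	func generateTopologySpreadConstraintsIndexed(labels labels.Set, constraints []v1.TopologySpreadConstraint) []v1.TopologySpreadConstraint {
		for i := range constraints {
			// Indexing mutates the element in place rather than a loop-local copy.
			constraints[i].LabelSelector = &metav1.LabelSelector{MatchLabels: labels}
		}
		return constraints
	}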
@@ -3984,3 +3984,46 @@ func TestGenerateCapabilities(t *testing.T) {
 		}
 	}
 }
+
+func TestTopologySpreadConstraints(t *testing.T) {
+	clusterName := "acid-test-cluster"
+	namespace := "default"
+
+	pg := acidv1.Postgresql{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      clusterName,
+			Namespace: namespace,
+		},
+		Spec: acidv1.PostgresSpec{
+			NumberOfInstances: 1,
+			Resources: &acidv1.Resources{
+				ResourceRequests: acidv1.ResourceDescription{CPU: k8sutil.StringToPointer("1"), Memory: k8sutil.StringToPointer("10")},
+				ResourceLimits:   acidv1.ResourceDescription{CPU: k8sutil.StringToPointer("1"), Memory: k8sutil.StringToPointer("10")},
+			},
+			Volume: acidv1.Volume{
+				Size: "1G",
+			},
+		},
+	}
+
+	cluster := New(
+		Config{
+			OpConfig: config.Config{
+				PodManagementPolicy: "ordered_ready",
+			},
+		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
+	cluster.Name = clusterName
+	cluster.Namespace = namespace
+	cluster.labelsSet(true)
+
+	s, err := cluster.generateStatefulSet(&pg.Spec)
+	assert.NoError(t, err)
+	assert.Contains(t, s.Spec.Template.Spec.TopologySpreadConstraints, v1.TopologySpreadConstraint{
+		MaxSkew:           int32(1),
+		TopologyKey:       "topology.kubernetes.io/zone",
+		WhenUnsatisfiable: v1.DoNotSchedule,
+		LabelSelector: &metav1.LabelSelector{
+			MatchLabels: cluster.labelsSet(true),
+		},
+	})
+}