laiminhtrung1997 2025-10-21 14:43:16 +03:00 committed by GitHub
commit fe0fbcee22
11 changed files with 268 additions and 32 deletions

View File

@ -584,6 +584,23 @@ spec:
- PreferNoSchedule
tolerationSeconds:
type: integer
topologySpreadConstraints:
type: array
nullable: true
items:
type: object
properties:
maxSkew:
type: integer
format: int32
minimum: 1
topologyKey:
type: string
whenUnsatisfiable:
type: string
enum:
- DoNotSchedule
- ScheduleAnyway
useLoadBalancer:
type: boolean
description: deprecated

View File

@ -714,7 +714,7 @@ but Kubernetes will not spin up the pod if the requested HugePages cannot be all
For more information on HugePages in Kubernetes, see also
[https://kubernetes.io/docs/tasks/manage-hugepages/scheduling-hugepages/](https://kubernetes.io/docs/tasks/manage-hugepages/scheduling-hugepages/)
## Use taints, tolerations and node affinity for dedicated PostgreSQL nodes
## Use taints, tolerations, node affinity and topology spread constraints for dedicated PostgreSQL nodes
To ensure Postgres pods are running on nodes without any other application pods,
you can use [taints and tolerations](https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/)
@ -755,6 +755,23 @@ spec:
If you need to define a `nodeAffinity` for all your Postgres clusters use the
`node_readiness_label` [configuration](administrator.md#node-readiness-labels).
If you want Postgres pods to be evenly distributed across failure domains such as
regions, zones, nodes, or other user-defined topology domains, you can define
[topologySpreadConstraints](https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/)
in the cluster manifest:
```yaml
apiVersion: "acid.zalan.do/v1"
kind: postgresql
metadata:
  name: acid-minimal-cluster
spec:
  topologySpreadConstraints:
  - maxSkew: 1
    topologyKey: topology.kubernetes.io/zone
    whenUnsatisfiable: DoNotSchedule
```

The operator automatically adds a `labelSelector` matching the cluster's pods to
each constraint, so you do not have to specify one yourself.
## In-place major version upgrade
Starting with Spilo 13, operator supports in-place major version upgrade to a
@ -1053,7 +1070,7 @@ spec:
- all
volumeSource:
emptyDir: {}
sidecars:
- name: "container-name"
image: "company/image:tag"
volumeMounts:

View File

@ -559,7 +559,7 @@ class EndToEndTestCase(unittest.TestCase):
pg_patch_config["spec"]["patroni"]["slots"][slot_to_change]["database"] = "bar"
del pg_patch_config["spec"]["patroni"]["slots"][slot_to_remove]
k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_delete_slot_patch)
@ -576,7 +576,7 @@ class EndToEndTestCase(unittest.TestCase):
self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", get_slot_query%("database", slot_to_change))[0], "bar",
"The replication slot cannot be updated", 10, 5)
# make sure slot from Patroni didn't get deleted
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_slot_query%("slot_name", patroni_slot))), 1,
"The replication slot from Patroni gets deleted", 10, 5)
@ -932,7 +932,7 @@ class EndToEndTestCase(unittest.TestCase):
},
}
}
old_sts_creation_timestamp = sts.metadata.creation_timestamp
k8s.api.apps_v1.patch_namespaced_stateful_set(sts.metadata.name, sts.metadata.namespace, annotation_patch)
old_svc_creation_timestamp = svc.metadata.creation_timestamp
@ -1369,7 +1369,7 @@ class EndToEndTestCase(unittest.TestCase):
}
k8s.update_config(patch_scaled_policy_retain)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
# decrease the number of instances
k8s.api.custom_objects_api.patch_namespaced_custom_object(
'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', pg_patch_scale_down_instances)
@ -1646,7 +1646,6 @@ class EndToEndTestCase(unittest.TestCase):
# toggle pod anti affinity to move replica away from master node
self.assert_distributed_pods(master_nodes)
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_overwrite_pooler_deployment(self):
pooler_name = 'acid-minimal-cluster-pooler'
@ -1799,7 +1798,7 @@ class EndToEndTestCase(unittest.TestCase):
},
}
k8s.api.core_v1.patch_namespaced_secret(
name="foo-user.acid-minimal-cluster.credentials.postgresql.acid.zalan.do",
name="foo-user.acid-minimal-cluster.credentials.postgresql.acid.zalan.do",
namespace="default",
body=secret_fake_rotation)
@ -1816,7 +1815,7 @@ class EndToEndTestCase(unittest.TestCase):
"enable_password_rotation": "true",
"inherited_annotations": "environment",
"password_rotation_interval": "30",
"password_rotation_user_retention": "30", # should be set to 60
"password_rotation_user_retention": "30", # should be set to 60
},
}
k8s.update_config(enable_password_rotation)
@ -1885,7 +1884,7 @@ class EndToEndTestCase(unittest.TestCase):
self.assertTrue("environment" in db_user_secret.metadata.annotations, "Added annotation was not propagated to secret")
# disable password rotation for all other users (foo_user)
# and pick smaller intervals to see if the third fake rotation user is dropped
enable_password_rotation = {
"data": {
"enable_password_rotation": "false",
@ -2385,6 +2384,78 @@ class EndToEndTestCase(unittest.TestCase):
# toggle pod anti affinity to move replica away from master node
self.assert_distributed_pods(master_nodes)
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_topology_spread_constraints(self):
'''
Enable topologySpreadConstraints and verify pods are spread evenly across the labeled worker nodes
'''
k8s = self.k8s
cluster_labels = "application=spilo,cluster-name=acid-minimal-cluster"
# Verify we are in good state from potential previous tests
self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running")
master_nodes, replica_nodes = k8s.get_cluster_nodes()
self.assertNotEqual(master_nodes, [])
self.assertNotEqual(replica_nodes, [])
# Label the two worker nodes with distinct zones so the constraint spans two topology domains
patch_node_label_1 = {
"metadata": {
"labels": {
"topology.kubernetes.io/zone": "zone-1"
}
}
}
patch_node_label_2 = {
"metadata": {
"labels": {
"topology.kubernetes.io/zone": "zone-2"
}
}
}
k8s.api.core_v1.patch_node(master_nodes[0], patch_node_label_1)
k8s.api.core_v1.patch_node(replica_nodes[0], patch_node_label_2)
# Patch the postgresql manifest with a topologySpreadConstraint and scale out to 6 pods
patch_topologySpreadConstraint_config = {
"spec": {
"numberOfInstances": 6,
"topologySpreadConstraints": [
{
"maxSkew": 1,
"topologyKey": "topology.kubernetes.io/zone",
"whenUnsatisfiable": "DoNotSchedule"
}
]
}
}
k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default",
"postgresqls", "acid-minimal-cluster",
patch_topologySpreadConstraint_config)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
self.eventuallyEqual(lambda: k8s.count_pods_with_label(cluster_labels), 6, "Postgresql StatefulSet is not scaled to 6 pods")
self.eventuallyEqual(lambda: k8s.count_running_pods(), 6, "All pods are running")
worker_node_1 = 0
worker_node_2 = 0
pods = k8s.api.core_v1.list_namespaced_pod('default', label_selector=cluster_labels)
for pod in pods.items:
if pod.spec.node_name == 'postgres-operator-e2e-tests-worker':
worker_node_1 += 1
elif pod.spec.node_name == 'postgres-operator-e2e-tests-worker2':
worker_node_2 += 1
self.assertEqual(worker_node_1, 3, "Expected 3 pods on the first worker node")
self.assertEqual(worker_node_2, 3, "Expected 3 pods on the second worker node")
# Reset the cluster to its initial size and drop the constraint
patch_topologySpreadConstraint_config = {
"spec": {
"numberOfInstances": 2,
"topologySpreadConstraints": []
}
}
k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default",
"postgresqls", "acid-minimal-cluster",
patch_topologySpreadConstraint_config)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_zz_cluster_deletion(self):
'''
@ -2460,7 +2531,7 @@ class EndToEndTestCase(unittest.TestCase):
self.eventuallyEqual(lambda: k8s.count_deployments_with_label(cluster_label), 0, "Deployments not deleted")
self.eventuallyEqual(lambda: k8s.count_pdbs_with_label(cluster_label), 0, "Pod disruption budget not deleted")
self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 8, "Secrets were deleted although disabled in config")
self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 3, "PVCs were deleted although disabled in config")
self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 6, "PVCs were deleted although disabled in config")
except timeout_decorator.TimeoutError:
print('Operator log: {}'.format(k8s.get_operator_log()))
@ -2502,7 +2573,7 @@ class EndToEndTestCase(unittest.TestCase):
# if nodes are different we can quit here
if master_nodes[0] not in replica_nodes:
return True
# enable pod anti affintiy in config map which should trigger movement of replica
patch_enable_antiaffinity = {
@ -2526,7 +2597,7 @@ class EndToEndTestCase(unittest.TestCase):
}
k8s.update_config(patch_disable_antiaffinity, "disable antiaffinity")
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_labels)
k8s.wait_for_running_pods(cluster_labels, 2)
@ -2537,7 +2608,7 @@ class EndToEndTestCase(unittest.TestCase):
# if nodes are different we can quit here
for target_node in target_nodes:
if (target_node not in master_nodes or target_node not in replica_nodes) and master_nodes[0] in replica_nodes:
print('Pods run on the same node')
return False
except timeout_decorator.TimeoutError:

View File

@ -232,6 +232,12 @@ spec:
# values:
# - enabled
# Add topology spread constraints to distribute PostgreSQL pods across zones (nodes labeled with "topology.kubernetes.io/zone").
# topologySpreadConstraints:
# - maxSkew: 1
# topologyKey: topology.kubernetes.io/zone
# whenUnsatisfiable: DoNotSchedule
# Enables change data capture streams for defined database tables
# streams:
# - applicationId: test-app

View File

@ -582,6 +582,22 @@ spec:
- PreferNoSchedule
tolerationSeconds:
type: integer
topologySpreadConstraints:
type: array
nullable: true
items:
type: object
properties:
maxSkew:
type: integer
format: int32
minimum: 1
topologyKey:
type: string
whenUnsatisfiable:
type: string
enum:
- DoNotSchedule
- ScheduleAnyway
useLoadBalancer:
type: boolean
description: deprecated

View File

@ -111,6 +111,7 @@ var OperatorConfigCRDResourceColumns = []apiextv1.CustomResourceColumnDefinition
var min0 = 0.0
var min1 = 1.0
var minLength1 int64 = 1
var minDisable = -1.0
// PostgresCRDResourceValidation to check applied manifest parameters
@ -895,6 +896,34 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
},
},
},
"topologySpreadConstraints": {
Type: "array",
Nullable: true,
Items: &apiextv1.JSONSchemaPropsOrArray{
Schema: &apiextv1.JSONSchemaProps{
Type: "object",
Properties: map[string]apiextv1.JSONSchemaProps{
"maxSkew": {
Type: "integer",
Format: "int32",
Minimum: &min1,
},
"topologyKey": {
Type: "string",
MinLength: &minLength1,
},
"whenUnsatisfiable": {
Type: "string",
Enum: []apiextv1.JSON{
{Raw: []byte(`"DoNotSchedule"`)},
{Raw: []byte(`"ScheduleAnyway"`)},
},
},
},
Required: []string{"maxSkew", "topologyKey", "whenUnsatisfiable"},
},
},
},
"useLoadBalancer": {
Type: "boolean",
Description: "deprecated",

View File

@ -63,24 +63,25 @@ type PostgresSpec struct {
UsersWithSecretRotation []string `json:"usersWithSecretRotation,omitempty"`
UsersWithInPlaceSecretRotation []string `json:"usersWithInPlaceSecretRotation,omitempty"`
NumberOfInstances int32 `json:"numberOfInstances"`
MaintenanceWindows []MaintenanceWindow `json:"maintenanceWindows,omitempty"`
Clone *CloneDescription `json:"clone,omitempty"`
Databases map[string]string `json:"databases,omitempty"`
PreparedDatabases map[string]PreparedDatabase `json:"preparedDatabases,omitempty"`
SchedulerName *string `json:"schedulerName,omitempty"`
NodeAffinity *v1.NodeAffinity `json:"nodeAffinity,omitempty"`
Tolerations []v1.Toleration `json:"tolerations,omitempty"`
Sidecars []Sidecar `json:"sidecars,omitempty"`
InitContainers []v1.Container `json:"initContainers,omitempty"`
PodPriorityClassName string `json:"podPriorityClassName,omitempty"`
ShmVolume *bool `json:"enableShmVolume,omitempty"`
EnableLogicalBackup bool `json:"enableLogicalBackup,omitempty"`
LogicalBackupRetention string `json:"logicalBackupRetention,omitempty"`
LogicalBackupSchedule string `json:"logicalBackupSchedule,omitempty"`
StandbyCluster *StandbyDescription `json:"standby,omitempty"`
PodAnnotations map[string]string `json:"podAnnotations,omitempty"`
ServiceAnnotations map[string]string `json:"serviceAnnotations,omitempty"`
NumberOfInstances int32 `json:"numberOfInstances"`
MaintenanceWindows []MaintenanceWindow `json:"maintenanceWindows,omitempty"`
Clone *CloneDescription `json:"clone,omitempty"`
Databases map[string]string `json:"databases,omitempty"`
PreparedDatabases map[string]PreparedDatabase `json:"preparedDatabases,omitempty"`
SchedulerName *string `json:"schedulerName,omitempty"`
NodeAffinity *v1.NodeAffinity `json:"nodeAffinity,omitempty"`
TopologySpreadConstraints []v1.TopologySpreadConstraint `json:"topologySpreadConstraints,omitempty"`
Tolerations []v1.Toleration `json:"tolerations,omitempty"`
Sidecars []Sidecar `json:"sidecars,omitempty"`
InitContainers []v1.Container `json:"initContainers,omitempty"`
PodPriorityClassName string `json:"podPriorityClassName,omitempty"`
ShmVolume *bool `json:"enableShmVolume,omitempty"`
EnableLogicalBackup bool `json:"enableLogicalBackup,omitempty"`
LogicalBackupRetention string `json:"logicalBackupRetention,omitempty"`
LogicalBackupSchedule string `json:"logicalBackupSchedule,omitempty"`
StandbyCluster *StandbyDescription `json:"standby,omitempty"`
PodAnnotations map[string]string `json:"podAnnotations,omitempty"`
ServiceAnnotations map[string]string `json:"serviceAnnotations,omitempty"`
// MasterServiceAnnotations takes precedence over ServiceAnnotations for master role if not empty
MasterServiceAnnotations map[string]string `json:"masterServiceAnnotations,omitempty"`
// ReplicaServiceAnnotations takes precedence over ServiceAnnotations for replica role if not empty

View File

@ -789,6 +789,13 @@ func (in *PostgresSpec) DeepCopyInto(out *PostgresSpec) {
*out = new(corev1.NodeAffinity)
(*in).DeepCopyInto(*out)
}
if in.TopologySpreadConstraints != nil {
in, out := &in.TopologySpreadConstraints, &out.TopologySpreadConstraints
*out = make([]corev1.TopologySpreadConstraint, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.Tolerations != nil {
in, out := &in.Tolerations, &out.Tolerations
*out = make([]corev1.Toleration, len(*in))
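
The element-wise `DeepCopyInto` above matters because each `TopologySpreadConstraint` carries a `*metav1.LabelSelector`: copying only the slice would leave the original and the copy sharing that pointer. A small self-contained sketch of the difference (illustrative only; it assumes the `k8s.io/api` and `k8s.io/apimachinery` modules are available):

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	orig := []corev1.TopologySpreadConstraint{{
		MaxSkew:           1,
		TopologyKey:       "topology.kubernetes.io/zone",
		WhenUnsatisfiable: corev1.DoNotSchedule,
		LabelSelector:     &metav1.LabelSelector{MatchLabels: map[string]string{"application": "spilo"}},
	}}

	// shallow copy of the slice: the *LabelSelector pointer is still shared
	shallow := make([]corev1.TopologySpreadConstraint, len(orig))
	copy(shallow, orig)

	// element-wise DeepCopyInto, as in the generated code above: fully independent
	deep := make([]corev1.TopologySpreadConstraint, len(orig))
	for i := range orig {
		orig[i].DeepCopyInto(&deep[i])
	}

	orig[0].LabelSelector.MatchLabels["application"] = "changed"
	fmt.Println(shallow[0].LabelSelector.MatchLabels["application"]) // "changed" (aliased)
	fmt.Println(deep[0].LabelSelector.MatchLabels["application"])    // "spilo" (independent)
}
```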

View File

@ -499,6 +499,11 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
needsRollUpdate = true
reasons = append(reasons, "new statefulset's pod affinity does not match the current one")
}
if !reflect.DeepEqual(c.Statefulset.Spec.Template.Spec.TopologySpreadConstraints, statefulSet.Spec.Template.Spec.TopologySpreadConstraints) {
needsReplace = true
needsRollUpdate = true
reasons = append(reasons, "new statefulset's pod topologySpreadConstraints does not match the current one")
}
if len(c.Statefulset.Spec.Template.Spec.Tolerations) != len(statefulSet.Spec.Template.Spec.Tolerations) {
needsReplace = true
needsRollUpdate = true

View File

@ -604,6 +604,13 @@ func generatePodAntiAffinity(podAffinityTerm v1.PodAffinityTerm, preferredDuring
return podAntiAffinity
}
func generateTopologySpreadConstraints(labels labels.Set, topologySpreadConstraints []v1.TopologySpreadConstraint) []v1.TopologySpreadConstraint {
if len(topologySpreadConstraints) == 0 {
return nil
}
// copy each constraint so the selector is set on the result, not on the range variable or the spec
constraints := make([]v1.TopologySpreadConstraint, 0, len(topologySpreadConstraints))
for _, constraint := range topologySpreadConstraints {
constraint.LabelSelector = &metav1.LabelSelector{MatchLabels: labels}
constraints = append(constraints, constraint)
}
return constraints
}
func tolerations(tolerationsSpec *[]v1.Toleration, podToleration map[string]string) []v1.Toleration {
// allow to override tolerations by postgresql manifest
if len(*tolerationsSpec) > 0 {
@ -809,6 +816,7 @@ func (c *Cluster) generatePodTemplate(
initContainers []v1.Container,
sidecarContainers []v1.Container,
sharePgSocketWithSidecars *bool,
topologySpreadConstraintsSpec []v1.TopologySpreadConstraint,
tolerationsSpec *[]v1.Toleration,
spiloRunAsUser *int64,
spiloRunAsGroup *int64,
@ -878,6 +886,8 @@ func (c *Cluster) generatePodTemplate(
podSpec.PriorityClassName = priorityClassName
}
podSpec.TopologySpreadConstraints = generateTopologySpreadConstraints(labels, topologySpreadConstraintsSpec)
if sharePgSocketWithSidecars != nil && *sharePgSocketWithSidecars {
addVarRunVolume(&podSpec)
}
@ -1469,6 +1479,7 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
initContainers,
sidecarContainers,
c.OpConfig.SharePgSocketWithSidecars,
spec.TopologySpreadConstraints,
&tolerationSpec,
effectiveRunAsUser,
effectiveRunAsGroup,
@ -2348,6 +2359,8 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1.CronJob, error) {
tolerationsSpec := tolerations(&spec.Tolerations, c.OpConfig.PodToleration)
topologySpreadConstraintsSpec := generateTopologySpreadConstraints(labels, spec.TopologySpreadConstraints)
// re-use the method that generates DB pod templates
if podTemplate, err = c.generatePodTemplate(
c.Namespace,
@ -2357,6 +2370,7 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1.CronJob, error) {
[]v1.Container{},
[]v1.Container{},
util.False(),
topologySpreadConstraintsSpec,
&tolerationsSpec,
nil,
nil,
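
To make the selector injection above concrete: before the constraints reach the pod template, the cluster's label set is stamped onto each of them, so skew is computed only over this cluster's pods. A standalone sketch of that behaviour (not the operator's actual code path; the label set and constraint literal are illustrative):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

// injectSelector returns copies of the given constraints with the cluster's
// label selector set, mirroring the idea of generateTopologySpreadConstraints.
func injectSelector(lbls labels.Set, in []v1.TopologySpreadConstraint) []v1.TopologySpreadConstraint {
	if len(in) == 0 {
		return nil
	}
	out := make([]v1.TopologySpreadConstraint, 0, len(in))
	for _, c := range in {
		c.LabelSelector = &metav1.LabelSelector{MatchLabels: lbls}
		out = append(out, c)
	}
	return out
}

func main() {
	// illustrative cluster labels and constraint
	clusterLabels := labels.Set{"application": "spilo", "cluster-name": "acid-minimal-cluster"}
	constraints := injectSelector(clusterLabels, []v1.TopologySpreadConstraint{
		{
			MaxSkew:           1,
			TopologyKey:       "topology.kubernetes.io/zone",
			WhenUnsatisfiable: v1.DoNotSchedule,
		},
	})
	for _, c := range constraints {
		fmt.Printf("%s maxSkew=%d selector=%v\n", c.TopologyKey, c.MaxSkew, c.LabelSelector.MatchLabels)
	}
}
```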

View File

@ -3984,3 +3984,56 @@ func TestGenerateCapabilities(t *testing.T) {
}
}
}
func TestTopologySpreadConstraints(t *testing.T) {
clusterName := "acid-test-cluster"
namespace := "default"

cluster := New(
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
},
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
cluster.Name = clusterName
cluster.Namespace = namespace

// the operator is expected to inject this selector into every constraint
labelSelector := &metav1.LabelSelector{
MatchLabels: cluster.labelsSet(true),
}

pg := acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName,
Namespace: namespace,
},
Spec: acidv1.PostgresSpec{
NumberOfInstances: 1,
Resources: &acidv1.Resources{
ResourceRequests: acidv1.ResourceDescription{CPU: k8sutil.StringToPointer("1"), Memory: k8sutil.StringToPointer("10")},
ResourceLimits: acidv1.ResourceDescription{CPU: k8sutil.StringToPointer("1"), Memory: k8sutil.StringToPointer("10")},
},
Volume: acidv1.Volume{
Size: "1G",
},
// no labelSelector in the manifest: the operator adds it while generating the pod template
TopologySpreadConstraints: []v1.TopologySpreadConstraint{
{
MaxSkew: 1,
TopologyKey: "topology.kubernetes.io/zone",
WhenUnsatisfiable: v1.DoNotSchedule,
},
},
},
}

s, err := cluster.generateStatefulSet(&pg.Spec)
assert.NoError(t, err)
assert.Contains(t, s.Spec.Template.Spec.TopologySpreadConstraints, v1.TopologySpreadConstraint{
MaxSkew: int32(1),
TopologyKey: "topology.kubernetes.io/zone",
WhenUnsatisfiable: v1.DoNotSchedule,
LabelSelector: labelSelector,
},
)
}