change merging nodeAffinity expression

Felix Kunde 2022-01-05 16:53:23 +01:00
parent ced0eae14a
commit 541a484264
4 changed files with 61 additions and 16 deletions

View File

@ -343,8 +343,13 @@ configuration they are grouped under the `kubernetes` key.
considered `ready`. The operator uses values of those labels to detect the
start of the Kubernetes cluster upgrade procedure and move master pods off
the nodes to be decommissioned. When the set is not empty, the operator also
assigns the `Affinity` clause to the Postgres pods to be scheduled only on
`ready` nodes. The default is empty.
assigns the `nodeAffinity` clause to the Postgres pods to be scheduled only
on `ready` nodes. If a `nodeAffinity` is also specified in the Postgres cluster
manifest, the `nodeSelectorTerms` will get merged. If the `nodeAffinity` of the
manifest contains only one `matchExpressions` slice, the node readiness label
expressions are moved into it (AND condition). When the manifest defines multiple
selector terms, an extra `matchExpressions` section is appended (OR condition);
see the sketch after this file excerpt for both results. The default is empty.
* **toleration**
a dictionary that should contain `key`, `operator`, `value` and

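To make the two merge modes described in the excerpt above concrete, here is a minimal sketch (not part of the commit) that builds both possible results with the `k8s.io/api/core/v1` types. The `lifecycle-status: ready` readiness label and the `environment in (pci)` manifest expression are placeholder values.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

func main() {
	// Placeholder requirement derived from node_readiness_label, e.g. "lifecycle-status:ready".
	readiness := v1.NodeSelectorRequirement{
		Key:      "lifecycle-status",
		Operator: v1.NodeSelectorOpIn,
		Values:   []string{"ready"},
	}

	// Manifest defines exactly one nodeSelectorTerm: the readiness expression is
	// merged into that term, so a node must satisfy both requirements (AND).
	andMerge := v1.NodeSelector{
		NodeSelectorTerms: []v1.NodeSelectorTerm{
			{MatchExpressions: []v1.NodeSelectorRequirement{
				{Key: "environment", Operator: v1.NodeSelectorOpIn, Values: []string{"pci"}},
				readiness,
			}},
		},
	}

	// Manifest defines several nodeSelectorTerms: the readiness expression is
	// appended as an extra term, so a node may satisfy either one (OR).
	orMerge := v1.NodeSelector{
		NodeSelectorTerms: []v1.NodeSelectorTerm{
			{MatchExpressions: []v1.NodeSelectorRequirement{
				{Key: "environment", Operator: v1.NodeSelectorOpIn, Values: []string{"pci"}},
			}},
			{MatchExpressions: []v1.NodeSelectorRequirement{
				{Key: "environment", Operator: v1.NodeSelectorOpIn, Values: []string{"test"}},
			}},
			{MatchExpressions: []v1.NodeSelectorRequirement{readiness}},
		},
	}

	fmt.Printf("AND merge: %+v\n\nOR merge: %+v\n", andMerge, orMerge)
}
```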
View File

@ -703,6 +703,10 @@ spec:
- pci
```
If you need to define a `nodeAffinity` for all your Postgres clusters, use the
`node_readiness_label` configuration option, which allows you to define a list
of key-value pairs.
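As a rough sketch (not the operator's actual code), each configured key-value pair can be thought of as one `In` node selector requirement, so pods only land on nodes that carry all of the configured labels:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// readinessRequirements mirrors the idea behind node_readiness_label: every
// configured key-value pair becomes one "In" requirement. Illustrative only,
// not taken from the operator source.
func readinessRequirements(nodeReadinessLabel map[string]string) []v1.NodeSelectorRequirement {
	requirements := make([]v1.NodeSelectorRequirement, 0, len(nodeReadinessLabel))
	for key, value := range nodeReadinessLabel {
		requirements = append(requirements, v1.NodeSelectorRequirement{
			Key:      key,
			Operator: v1.NodeSelectorOpIn,
			Values:   []string{value},
		})
	}
	return requirements
}

func main() {
	// Placeholder label; use whatever your nodes actually carry.
	fmt.Printf("%+v\n", readinessRequirements(map[string]string{"lifecycle-status": "ready"}))
}
```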
## In-place major version upgrade
Starting with Spilo 13, the operator supports in-place major version upgrade to a

View File

@ -880,11 +880,9 @@ class EndToEndTestCase(unittest.TestCase):
# verify we are in good state from potential previous tests
self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running")
self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members("acid-minimal-cluster-0")), 2, "Postgres status did not enter running")
# get nodes of master and replica(s)
master_nodes, replica_nodes = k8s.get_cluster_nodes()
self.assertNotEqual(master_nodes, [])
self.assertNotEqual(replica_nodes, [])
@ -975,6 +973,9 @@ class EndToEndTestCase(unittest.TestCase):
print('Operator log: {}'.format(k8s.get_operator_log()))
raise
# toggle pod anti affinity to make sure replica and master run on separate nodes
self.assert_distributed_pods(replica_nodes)
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_node_readiness_label(self):
'''
@ -987,7 +988,6 @@ class EndToEndTestCase(unittest.TestCase):
# verify we are in good state from potential previous tests
self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running")
self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members("acid-minimal-cluster-0")), 2, "Postgres status did not enter running")
# get nodes of master and replica(s) (expected target of new master)
master_nodes, replica_nodes = k8s.get_cluster_nodes()
@ -1009,7 +1009,7 @@ class EndToEndTestCase(unittest.TestCase):
for failover_target in failover_targets:
k8s.api.core_v1.patch_node(failover_target, patch_readiness_label)
# define node_readiness_label in config map which should trigger a failover of the master
# define node_readiness_label in config map which should trigger a rolling update
patch_readiness_label_config = {
"data": {
"node_readiness_label": readiness_label + ':' + readiness_value,
@ -1018,17 +1018,19 @@ class EndToEndTestCase(unittest.TestCase):
k8s.update_config(patch_readiness_label_config, "setting readiness label")
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
# node affinity change should cause replica to relocate from replica node to master node due to node affinity requirement
# first replica will be replaced and get the new affinity
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
# next switchover of the master
k8s.wait_for_pod_failover(failover_targets, 'spilo-role=master,' + cluster_label)
# the replica however will not start due to a volume node affinity conflict
# the old master is replaced. However, it might not start due to a volume node affinity conflict
# it can only be scheduled again once the PVC and pod are deleted
replica = k8s.get_cluster_replica_pod()
if replica.status.phase == 'Pending':
k8s.api.core_v1.delete_namespaced_persistent_volume_claim('pgdata-' + replica.metadata.name, 'default')
k8s.api.core_v1.delete_namespaced_pod(replica.metadata.name, 'default')
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
# patch also node where master ran before
k8s.api.core_v1.patch_node(master_nodes[0], patch_readiness_label)
@ -1038,7 +1040,7 @@ class EndToEndTestCase(unittest.TestCase):
raise
# toggle pod anti affinity to move replica away from master node
self.eventuallyTrue(lambda: self.assert_distributed_pods(master_nodes), "Pods are redistributed")
self.assert_distributed_pods(master_nodes)
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
@ -1481,7 +1483,7 @@ class EndToEndTestCase(unittest.TestCase):
raise
# toggle pod anti affinity to move replica away from master node
self.assert_distributed_pods(replica_nodes)
self.assert_distributed_pods(master_nodes)
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_zz_cluster_deletion(self):
@ -1602,6 +1604,16 @@ class EndToEndTestCase(unittest.TestCase):
Toggle pod anti affinity to distribute pods across nodes (replica in particular).
'''
k8s = self.k8s
cluster_labels = 'application=spilo,cluster-name=acid-minimal-cluster'
# get nodes of master and replica(s)
master_nodes, replica_nodes = k8s.get_cluster_nodes()
self.assertNotEqual(master_nodes, [])
self.assertNotEqual(replica_nodes, [])
# if master and replica already run on different nodes we can quit here
if master_nodes[0] not in replica_nodes:
return True
# enable pod anti affinity in config map which should trigger movement of the replica
patch_enable_antiaffinity = {
@ -1614,8 +1626,8 @@ class EndToEndTestCase(unittest.TestCase):
k8s.update_config(patch_enable_antiaffinity, "enable antiaffinity")
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
k8s.wait_for_pod_failover(target_nodes, 'spilo-role=replica,' + cluster_labels)
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_labels)
k8s.wait_for_running_pods(cluster_labels, 2)
# now disable pod anti affinity again which will cause yet another failover
patch_disable_antiaffinity = {
@ -1626,8 +1638,18 @@ class EndToEndTestCase(unittest.TestCase):
k8s.update_config(patch_disable_antiaffinity, "disable antiaffinity")
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
k8s.wait_for_pod_start('spilo-role=master,' + cluster_labels)
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_labels)
k8s.wait_for_running_pods(cluster_labels, 2)
master_nodes, replica_nodes = k8s.get_cluster_nodes()
self.assertNotEqual(master_nodes, [])
self.assertNotEqual(replica_nodes, [])
# fail if master and replica still run on the same node
for target_node in target_nodes:
if (target_node not in master_nodes or target_node not in replica_nodes) and master_nodes[0] in replica_nodes:
print('Pods run on the same node')
return False
except timeout_decorator.TimeoutError:
print('Operator log: {}'.format(k8s.get_operator_log()))

View File

@ -352,8 +352,22 @@ func nodeAffinity(nodeReadinessLabel map[string]string, nodeAffinity *v1.NodeAff
				},
			}
		} else {
			nodeAffinityCopy.RequiredDuringSchedulingIgnoredDuringExecution = &v1.NodeSelector{
				NodeSelectorTerms: append(nodeAffinityCopy.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, nodeReadinessSelectorTerm),
			// if there are multiple node selector terms specified, append the node readiness label expressions (OR condition)
			if len(nodeAffinityCopy.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) > 1 {
				manifestTerms := nodeAffinityCopy.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
				manifestTerms = append(manifestTerms, nodeReadinessSelectorTerm)
				nodeAffinityCopy.RequiredDuringSchedulingIgnoredDuringExecution = &v1.NodeSelector{
					NodeSelectorTerms: manifestTerms,
				}
				// if there's just one term defined, merge it with the readiness label term (AND condition)
			} else {
				manifestExpressions := nodeAffinityCopy.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions
				manifestExpressions = append(manifestExpressions, matchExpressions...)
				nodeAffinityCopy.RequiredDuringSchedulingIgnoredDuringExecution = &v1.NodeSelector{
					NodeSelectorTerms: []v1.NodeSelectorTerm{
						v1.NodeSelectorTerm{MatchExpressions: manifestExpressions},
					},
				}
			}
		}
	}
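For completeness, a minimal unit-test sketch for the single-term (AND) branch above. It assumes the helper lives in the `cluster` package and returns a `*v1.Affinity` wrapping the merged node affinity; neither is visible in this hunk, so verify the actual signature before reusing this. The `lifecycle-status:ready` and `environment in (pci)` labels are placeholders.

```go
package cluster

import (
	"testing"

	v1 "k8s.io/api/core/v1"
)

// Assumes nodeAffinity(map[string]string, *v1.NodeAffinity) returns *v1.Affinity.
func TestNodeAffinityMergesSingleTerm(t *testing.T) {
	manifestAffinity := &v1.NodeAffinity{
		RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
			NodeSelectorTerms: []v1.NodeSelectorTerm{
				{MatchExpressions: []v1.NodeSelectorRequirement{
					{Key: "environment", Operator: v1.NodeSelectorOpIn, Values: []string{"pci"}},
				}},
			},
		},
	}

	merged := nodeAffinity(map[string]string{"lifecycle-status": "ready"}, manifestAffinity)

	terms := merged.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
	if len(terms) != 1 {
		t.Fatalf("expected a single merged nodeSelectorTerm, got %d", len(terms))
	}
	if len(terms[0].MatchExpressions) != 2 {
		t.Errorf("expected manifest and readiness expressions in one term (AND), got %+v", terms[0].MatchExpressions)
	}
}
```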