diff --git a/e2e/scripts/watch_objects.sh b/e2e/scripts/watch_objects.sh
index eeb5f4a1f..5005f88e2 100755
--- a/e2e/scripts/watch_objects.sh
+++ b/e2e/scripts/watch_objects.sh
@@ -4,7 +4,7 @@ watch -c "
 kubectl get postgresql --all-namespaces
 echo
 echo -n 'Rolling upgrade pending: '
-kubectl get pods -o jsonpath='{.items..metadata.annotations.zalando-postgres-operator-rolling-update-required}'
+kubectl get pods -o jsonpath='{.items[].metadata.annotations.zalando-postgres-operator-rolling-update-required}'
 echo
 echo
 echo 'Pods'
diff --git a/e2e/tests/k8s_api.py b/e2e/tests/k8s_api.py
index 21e2a16e9..0cb40535c 100644
--- a/e2e/tests/k8s_api.py
+++ b/e2e/tests/k8s_api.py
@@ -211,7 +211,7 @@ class K8s:
         self.wait_for_logical_backup_job(expected_num_of_jobs=1)
 
     def delete_operator_pod(self, step="Delete operator pod"):
-        # patching the pod template in the deployment restarts the operator pod 
+        # patching the pod template in the deployment restarts the operator pod
         self.api.apps_v1.patch_namespaced_deployment("postgres-operator", "default", {"spec": {"template": {"metadata": {"annotations": {"step": "{}-{}".format(step, time.time())}}}}})
         self.wait_for_operator_pod_start()
 
@@ -219,8 +219,8 @@ class K8s:
         self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch)
         self.delete_operator_pod(step=step)
 
-    def patch_statefulset(self, data, name="acid-minimal-cluster", namespace="default"):
-        self.api.apps_v1.patch_namespaced_stateful_set(name, namespace, data)
+    def patch_pod(self, data, pod_name, namespace="default"):
+        self.api.core_v1.patch_namespaced_pod(pod_name, namespace, data)
 
     def create_with_kubectl(self, path):
         return subprocess.run(
diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py
index dd99fd135..d6d916f0d 100644
--- a/e2e/tests/test_e2e.py
+++ b/e2e/tests/test_e2e.py
@@ -709,7 +709,6 @@ class EndToEndTestCase(unittest.TestCase):
         k8s.api.custom_objects_api.patch_namespaced_custom_object(
             "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_resources)
 
-        k8s.patch_statefulset({"metadata": {"annotations": {"zalando-postgres-operator-rolling-update-required": "False"}}})
         k8s.update_config(patch_min_resource_limits, "Minimum resource test")
 
         self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No two pods running after lazy rolling upgrade")
@@ -730,8 +729,6 @@ class EndToEndTestCase(unittest.TestCase):
 
     @classmethod
     def setUp(cls):
-        # cls.k8s.update_config({}, step="Setup")
-        cls.k8s.patch_statefulset({"meta": {"annotations": {"zalando-postgres-operator-rolling-update-required": False}}})
         pass
 
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
@@ -762,6 +759,44 @@ class EndToEndTestCase(unittest.TestCase):
             "acid.zalan.do", "v1", self.test_namespace, "postgresqls", "acid-test-cluster")
         time.sleep(5)
 
+    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
+    def test_rolling_update_flag(self):
+        '''
+        Add rolling update flag to only the master and see it failing over
+        '''
+        k8s = self.k8s
+        cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
+
+        # verify we are in good state from potential previous tests
+        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running")
+
+        # rolling update annotation
+        flag = {
+            "metadata": {
+                "annotations": {
+                    "zalando-postgres-operator-rolling-update-required": "true",
+                }
+            }
+        }
+
+        podsList = k8s.api.core_v1.list_namespaced_pod('default', label_selector=cluster_label)
+        for pod in podsList.items:
+            # add flag only to the master to make it appear to the operator as a leftover from a rolling update
+            if pod.metadata.labels.get('spilo-role') == 'master':
+                k8s.patch_pod(flag, pod.metadata.name, pod.metadata.namespace)
+            # operator will perform a switchover to an existing replica before recreating the master pod
+            else:
+                switchover_target = pod.metadata.name
+
+        # do not wait until the next sync
+        k8s.delete_operator_pod()
+
+        # wait for the both pods to be up and running
+        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running")
+
+        # check if the former replica is now the new master
+        leader = k8s.get_cluster_leader_pod('acid-minimal-cluster')
+        self.eventuallyEqual(lambda: leader.metadata.name, switchover_target, "Rolling update flag did not trigger switchover")
 
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     def test_zz_node_readiness_label(self):
diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go
index 98ddfd544..377e5deab 100644
--- a/pkg/cluster/pod.go
+++ b/pkg/cluster/pod.go
@@ -373,15 +373,14 @@ func (c *Cluster) isSafeToRecreatePods(pods []v1.Pod) bool {
 	 XXX operator cannot forbid replica re-init, so we might still fail if re-init is started
 	 after this check succeeds but before a pod is re-created
 	*/
-	for i, pod := range pods {
+	for _, pod := range pods {
 		c.logger.Debugf("name=%s phase=%s ip=%s", pod.Name, pod.Status.Phase, pod.Status.PodIP)
 
 		var state string
 		err := retryutil.Retry(1*time.Second, 5*time.Second,
 			func() (bool, error) {
 				var err error
-				state, err = c.patroni.GetPatroniMemberState(&pods[i])
-
+				state, err = c.patroni.GetPatroniMemberState(&pod)
 				if err != nil {
 					return false, err
 				}
@@ -400,7 +399,7 @@ func (c *Cluster) isSafeToRecreatePods(pods []v1.Pod) bool {
 	return true
 }
 
-func (c *Cluster) recreatePods(pods []v1.Pod) error {
+func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.NamespacedName) error {
 	c.setProcessName("starting to recreate pods")
 	c.logger.Infof("there are %d pods in the cluster to recreate", len(pods))
 
@@ -411,7 +410,7 @@ func (c *Cluster) recreatePods(pods []v1.Pod) error {
 	var (
 		masterPod, newMasterPod *v1.Pod
 	)
-	replicas := make([]spec.NamespacedName, 0)
+	replicas := switchoverCandidates
 
 	for i, pod := range pods {
 		role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go
index acc6e7835..14a4da662 100644
--- a/pkg/cluster/sync.go
+++ b/pkg/cluster/sync.go
@@ -277,7 +277,9 @@ func (c *Cluster) mustUpdatePodsAfterLazyUpdate(desiredSset *appsv1.StatefulSet)
 }
 
 func (c *Cluster) syncStatefulSet() error {
-	var podsToRecreate []v1.Pod
+
+	podsToRecreate := make([]v1.Pod, 0)
+	switchoverCandidates := make([]spec.NamespacedName, 0)
 
 	pods, err := c.listPods()
 	if err != nil {
@@ -318,6 +320,8 @@ func (c *Cluster) syncStatefulSet() error {
 	for _, pod := range pods {
 		if c.getRollingUpdateFlagFromPod(&pod) {
 			podsToRecreate = append(podsToRecreate, pod)
+		} else {
+			switchoverCandidates = append(switchoverCandidates, util.NameFromMeta(pod.ObjectMeta))
 		}
 	}
 
@@ -376,6 +380,8 @@ func (c *Cluster) syncStatefulSet() error {
 					c.logger.Warnf("updating rolling update flag failed for pod %q: %v", pod.Name, err)
 				}
 				podsToRecreate = append(podsToRecreate, pod)
+			} else {
+				switchoverCandidates = append(switchoverCandidates, util.NameFromMeta(pod.ObjectMeta))
 			}
 		}
 	}
@@ -393,7 +399,7 @@ func (c *Cluster) syncStatefulSet() error {
 	if len(podsToRecreate) > 0 {
 		c.logger.Debugln("performing rolling update")
 		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
-		if err := c.recreatePods(podsToRecreate); err != nil {
+		if err := c.recreatePods(podsToRecreate, switchoverCandidates); err != nil {
 			return fmt.Errorf("could not recreate pods: %v", err)
 		}
 		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")