diff --git a/e2e/scripts/watch_objects.sh b/e2e/scripts/watch_objects.sh index 4c9b82404..5005f88e2 100755 --- a/e2e/scripts/watch_objects.sh +++ b/e2e/scripts/watch_objects.sh @@ -4,7 +4,7 @@ watch -c " kubectl get postgresql --all-namespaces echo echo -n 'Rolling upgrade pending: ' -kubectl get statefulset -o jsonpath='{.items..metadata.annotations.zalando-postgres-operator-rolling-update-required}' +kubectl get pods -o jsonpath='{.items[].metadata.annotations.zalando-postgres-operator-rolling-update-required}' echo echo echo 'Pods' diff --git a/e2e/tests/k8s_api.py b/e2e/tests/k8s_api.py index 21e2a16e9..85bcb6245 100644 --- a/e2e/tests/k8s_api.py +++ b/e2e/tests/k8s_api.py @@ -211,7 +211,7 @@ class K8s: self.wait_for_logical_backup_job(expected_num_of_jobs=1) def delete_operator_pod(self, step="Delete operator pod"): - # patching the pod template in the deployment restarts the operator pod + # patching the pod template in the deployment restarts the operator pod self.api.apps_v1.patch_namespaced_deployment("postgres-operator", "default", {"spec": {"template": {"metadata": {"annotations": {"step": "{}-{}".format(step, time.time())}}}}}) self.wait_for_operator_pod_start() @@ -219,8 +219,8 @@ class K8s: self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch) self.delete_operator_pod(step=step) - def patch_statefulset(self, data, name="acid-minimal-cluster", namespace="default"): - self.api.apps_v1.patch_namespaced_stateful_set(name, namespace, data) + def patch_pod(self, data, pod_name, namespace="default"): + self.api.core_v1.patch_namespaced_pod(pod_name, namespace, data) def create_with_kubectl(self, path): return subprocess.run( @@ -280,19 +280,21 @@ class K8s: return None return pod.items[0].spec.containers[0].image - def get_cluster_leader_pod(self, pg_cluster_name, namespace='default'): - labels = { - 'application': 'spilo', - 'cluster-name': pg_cluster_name, - 'spilo-role': 'master', - } + def get_cluster_pod(self, role, labels='application=spilo,cluster-name=acid-minimal-cluster', namespace='default'): + labels = labels + ',spilo-role=' + role pods = self.api.core_v1.list_namespaced_pod( - namespace, label_selector=to_selector(labels)).items + namespace, label_selector=labels).items if pods: return pods[0] + def get_cluster_leader_pod(self, labels='application=spilo,cluster-name=acid-minimal-cluster', namespace='default'): + return self.get_cluster_pod('master', labels, namespace) + + def get_cluster_replica_pod(self, labels='application=spilo,cluster-name=acid-minimal-cluster', namespace='default'): + return self.get_cluster_pod('replica', labels, namespace) + class K8sBase: ''' diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 1bf723f12..114f881c4 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -168,12 +168,25 @@ class EndToEndTestCase(unittest.TestCase): "additional_pod_capabilities": ','.join(capabilities), }, } - self.k8s.update_config(patch_capabilities) - self.eventuallyEqual(lambda: self.k8s.get_operator_state(), {"0": "idle"}, - "Operator does not get in sync") - - self.eventuallyEqual(lambda: self.k8s.count_pods_with_container_capabilities(capabilities, cluster_label), - 2, "Container capabilities not updated") + + # get node and replica (expected target of new master) + _, replica_nodes = self.k8s.get_pg_nodes(cluster_label) + + try: + self.k8s.update_config(patch_capabilities) + self.eventuallyEqual(lambda: self.k8s.get_operator_state(), {"0": "idle"}, + "Operator does not get in sync") + + # 
changed security context of postrges container should trigger a rolling update + self.k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=master,' + cluster_label) + self.k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) + + self.eventuallyEqual(lambda: self.k8s.count_pods_with_container_capabilities(capabilities, cluster_label), + 2, "Container capabilities not updated") + + except timeout_decorator.TimeoutError: + print('Operator log: {}'.format(k8s.get_operator_log())) + raise @timeout_decorator.timeout(TEST_TIMEOUT_SEC) def test_additional_teams_and_members(self): @@ -212,7 +225,7 @@ class EndToEndTestCase(unittest.TestCase): # make sure we let one sync pass and the new user being added time.sleep(15) - leader = self.k8s.get_cluster_leader_pod('acid-minimal-cluster') + leader = self.k8s.get_cluster_leader_pod() user_query = """ SELECT usename FROM pg_catalog.pg_user @@ -392,7 +405,7 @@ class EndToEndTestCase(unittest.TestCase): # credentials. db_list = [] - leader = k8s.get_cluster_leader_pod('acid-minimal-cluster') + leader = k8s.get_cluster_leader_pod() schemas_query = """ select schema_name from information_schema.schemata @@ -611,7 +624,7 @@ class EndToEndTestCase(unittest.TestCase): k8s.update_config(unpatch_lazy_spilo_upgrade, step="patch lazy upgrade") # at this point operator will complete the normal rolling upgrade - # so we additonally test if disabling the lazy upgrade - forcing the normal rolling upgrade - works + # so we additionally test if disabling the lazy upgrade - forcing the normal rolling upgrade - works self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0), conf_image, "Rolling upgrade was not executed", 50, 3) @@ -750,12 +763,6 @@ class EndToEndTestCase(unittest.TestCase): self.eventuallyTrue(verify_pod_limits, "Pod limits where not adjusted") - @classmethod - def setUp(cls): - # cls.k8s.update_config({}, step="Setup") - cls.k8s.patch_statefulset({"meta": {"annotations": {"zalando-postgres-operator-rolling-update-required": False}}}) - pass - @timeout_decorator.timeout(TEST_TIMEOUT_SEC) def test_multi_namespace_support(self): ''' @@ -784,6 +791,139 @@ class EndToEndTestCase(unittest.TestCase): "acid.zalan.do", "v1", self.test_namespace, "postgresqls", "acid-test-cluster") time.sleep(5) + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) + def test_rolling_update_flag(self): + ''' + Add rolling update flag to only the master and see it failing over + ''' + k8s = self.k8s + cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' + + # verify we are in good state from potential previous tests + self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running") + + # get node and replica (expected target of new master) + _, replica_nodes = k8s.get_pg_nodes(cluster_label) + + # rolling update annotation + flag = { + "metadata": { + "annotations": { + "zalando-postgres-operator-rolling-update-required": "true", + } + } + } + + try: + podsList = k8s.api.core_v1.list_namespaced_pod('default', label_selector=cluster_label) + for pod in podsList.items: + # add flag only to the master to make it appear to the operator as a leftover from a rolling update + if pod.metadata.labels.get('spilo-role') == 'master': + old_creation_timestamp = pod.metadata.creation_timestamp + k8s.patch_pod(flag, pod.metadata.name, pod.metadata.namespace) + else: + # remember replica name to check if operator does a switchover + switchover_target = pod.metadata.name + + # do not wait until the next sync + k8s.delete_operator_pod() + + # operator should now 
recreate the master pod and do a switchover before + k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=master,' + cluster_label) + + # check if the former replica is now the new master + leader = k8s.get_cluster_leader_pod() + self.eventuallyEqual(lambda: leader.metadata.name, switchover_target, "Rolling update flag did not trigger switchover") + + # check that the old master has been recreated + k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) + replica = k8s.get_cluster_replica_pod() + self.assertTrue(replica.metadata.creation_timestamp > old_creation_timestamp, "Old master pod was not recreated") + + + except timeout_decorator.TimeoutError: + print('Operator log: {}'.format(k8s.get_operator_log())) + raise + + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) + def test_rolling_update_label_timeout(self): + ''' + Simulate case when replica does not receive label in time and rolling update does not finish + ''' + k8s = self.k8s + cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' + flag = "zalando-postgres-operator-rolling-update-required" + + # verify we are in good state from potential previous tests + self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running") + + # get node and replica (expected target of new master) + _, replica_nodes = k8s.get_pg_nodes(cluster_label) + + # rolling update annotation + rolling_update_patch = { + "metadata": { + "annotations": { + flag: "true", + } + } + } + + # make pod_label_wait_timeout so short that rolling update fails on first try + # temporarily lower resync interval to reduce waiting for further tests + # pods should get healthy in the meantime + patch_resync_config = { + "data": { + "pod_label_wait_timeout": "2s", + "resync_period": "20s", + } + } + + try: + # patch both pods for rolling update + podList = k8s.api.core_v1.list_namespaced_pod('default', label_selector=cluster_label) + for pod in podList.items: + k8s.patch_pod(rolling_update_patch, pod.metadata.name, pod.metadata.namespace) + if pod.metadata.labels.get('spilo-role') == 'replica': + switchover_target = pod.metadata.name + + # update config and restart operator + k8s.update_config(patch_resync_config, "update resync interval and pod_label_wait_timeout") + + # operator should now recreate the replica pod first and do a switchover after + k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) + + # pod_label_wait_timeout should have been exceeded hence the rolling update is continued on next sync + # check if the cluster state is "SyncFailed" + self.eventuallyEqual(lambda: k8s.pg_get_status(), "SyncFailed", "Expected SYNC event to fail") + + # wait for next sync, replica should be running normally by now and be ready for switchover + k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=master,' + cluster_label) + + # check if the former replica is now the new master + leader = k8s.get_cluster_leader_pod() + self.eventuallyEqual(lambda: leader.metadata.name, switchover_target, "Rolling update flag did not trigger switchover") + + # wait for the old master to get restarted + k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) + + # status should again be "SyncFailed" but turn into "Running" on the next sync + time.sleep(10) + self.eventuallyEqual(lambda: k8s.pg_get_status(), "Running", "Expected running cluster after two syncs") + + # revert config changes + patch_resync_config = { + "data": { + "pod_label_wait_timeout": "10m", + "resync_period": "30m", + } + } + k8s.update_config(patch_resync_config, "revert resync 
interval and pod_label_wait_timeout") + + + except timeout_decorator.TimeoutError: + print('Operator log: {}'.format(k8s.get_operator_log())) + raise @timeout_decorator.timeout(TEST_TIMEOUT_SEC) def test_zz_node_readiness_label(self): diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index c3e3ec905..986f747f9 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -1309,7 +1309,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e err = fmt.Errorf("could not get master pod label: %v", err) } } else { - err = fmt.Errorf("could not switch over: %v", err) + err = fmt.Errorf("could not switch over from %q to %q: %v", curMaster.Name, candidate, err) } // signal the role label waiting goroutine to close the shop and go home diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index e3ee322d5..ae689d0f9 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -6,7 +6,6 @@ import ( "fmt" "path" "sort" - "strconv" "strings" "github.com/sirupsen/logrus" @@ -1279,7 +1278,6 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef } stsAnnotations := make(map[string]string) - stsAnnotations[rollingUpdateStatefulsetAnnotationKey] = strconv.FormatBool(false) stsAnnotations = c.AnnotationsToPropagate(c.annotationsSet(nil)) statefulSet := &appsv1.StatefulSet{ diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index 94e79d186..c8dfe2a13 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -4,11 +4,13 @@ import ( "context" "fmt" "math/rand" + "strconv" "time" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "github.com/zalando/postgres-operator/pkg/spec" "github.com/zalando/postgres-operator/pkg/util" @@ -46,6 +48,64 @@ func (c *Cluster) getRolePods(role PostgresRole) ([]v1.Pod, error) { return pods.Items, nil } +// markRollingUpdateFlagForPod sets the indicator for the rolling update requirement +// in the Pod annotation. 
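+// The flag is written with a retried JSON merge patch and skipped when the pod
+// already carries the annotation; sync reads it back via getRollingUpdateFlagFromPod.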
+func (c *Cluster) markRollingUpdateFlagForPod(pod *v1.Pod, msg string) error { + // no need to patch pod if annotation is already there + if c.getRollingUpdateFlagFromPod(pod) { + return nil + } + + c.logger.Debugf("mark rolling update annotation for %s: reason %s", pod.Name, msg) + flag := make(map[string]string) + flag[rollingUpdatePodAnnotationKey] = strconv.FormatBool(true) + + patchData, err := metaAnnotationsPatch(flag) + if err != nil { + return fmt.Errorf("could not form patch for pod's rolling update flag: %v", err) + } + + err = retryutil.Retry(1*time.Second, 5*time.Second, + func() (bool, error) { + _, err2 := c.KubeClient.Pods(pod.Namespace).Patch( + context.TODO(), + pod.Name, + types.MergePatchType, + []byte(patchData), + metav1.PatchOptions{}, + "") + if err2 != nil { + return false, err2 + } + return true, nil + }) + if err != nil { + return fmt.Errorf("could not patch pod rolling update flag %q: %v", patchData, err) + } + + return nil +} + +// getRollingUpdateFlagFromPod returns the value of the rollingUpdate flag from the given pod +func (c *Cluster) getRollingUpdateFlagFromPod(pod *v1.Pod) (flag bool) { + anno := pod.GetAnnotations() + flag = false + + stringFlag, exists := anno[rollingUpdatePodAnnotationKey] + if exists { + var err error + c.logger.Debugf("found rolling update flag on pod %q", pod.Name) + if flag, err = strconv.ParseBool(stringFlag); err != nil { + c.logger.Warnf("error when parsing %q annotation for the pod %q: expected boolean value, got %q\n", + rollingUpdatePodAnnotationKey, + types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, + stringFlag) + } + } + + return flag +} + func (c *Cluster) deletePods() error { c.logger.Debugln("deleting pods") pods, err := c.listPods() @@ -282,7 +342,18 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { defer c.unregisterPodSubscriber(podName) stopChan := make(chan struct{}) - if err := c.KubeClient.Pods(podName.Namespace).Delete(context.TODO(), podName.Name, c.deleteOptions); err != nil { + err := retryutil.Retry(1*time.Second, 5*time.Second, + func() (bool, error) { + err2 := c.KubeClient.Pods(podName.Namespace).Delete( + context.TODO(), + podName.Name, + c.deleteOptions) + if err2 != nil { + return false, err2 + } + return true, nil + }) + if err != nil { return nil, fmt.Errorf("could not delete pod: %v", err) } @@ -297,7 +368,7 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { return pod, nil } -func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool { +func (c *Cluster) isSafeToRecreatePods(pods []v1.Pod) bool { /* Operator should not re-create pods if there is at least one replica being bootstrapped @@ -306,20 +377,17 @@ func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool { XXX operator cannot forbid replica re-init, so we might still fail if re-init is started after this check succeeds but before a pod is re-created */ - - for _, pod := range pods.Items { + for _, pod := range pods { c.logger.Debugf("name=%s phase=%s ip=%s", pod.Name, pod.Status.Phase, pod.Status.PodIP) } - for _, pod := range pods.Items { + for _, pod := range pods { var data patroni.MemberData err := retryutil.Retry(1*time.Second, 5*time.Second, func() (bool, error) { - var err error - data, err = c.patroni.GetMemberData(&pod) if err != nil { @@ -336,47 +404,39 @@ func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool { c.logger.Warningf("cannot re-create replica %s: it is currently being initialized", pod.Name) return false } - } return true } 
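The markRollingUpdateFlagForPod / getRollingUpdateFlagFromPod pair above boils down to writing and reading one pod annotation through a JSON merge patch. A minimal standalone sketch of that round trip, using only the standard library (the helper names here are illustrative and not part of the operator):

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

const rollingUpdatePodAnnotationKey = "zalando-postgres-operator-rolling-update-required"

// rollingUpdatePatch builds a JSON merge-patch body that sets the
// rolling-update annotation on a pod's metadata, mirroring the payload
// the operator sends via types.MergePatchType.
func rollingUpdatePatch() ([]byte, error) {
	patch := map[string]interface{}{
		"metadata": map[string]interface{}{
			"annotations": map[string]string{
				rollingUpdatePodAnnotationKey: strconv.FormatBool(true),
			},
		},
	}
	return json.Marshal(patch)
}

// rollingUpdateRequired reads the flag back; a missing or unparsable
// annotation counts as "no rolling update required", as in the operator.
func rollingUpdateRequired(annotations map[string]string) bool {
	value, ok := annotations[rollingUpdatePodAnnotationKey]
	if !ok {
		return false
	}
	flag, err := strconv.ParseBool(value)
	if err != nil {
		return false
	}
	return flag
}

func main() {
	patch, err := rollingUpdatePatch()
	if err != nil {
		panic(err)
	}
	fmt.Println(string(patch))
	// {"metadata":{"annotations":{"zalando-postgres-operator-rolling-update-required":"true"}}}

	fmt.Println(rollingUpdateRequired(map[string]string{rollingUpdatePodAnnotationKey: "true"})) // true
	fmt.Println(rollingUpdateRequired(map[string]string{}))                                      // false
}
```

The same payload shape is what the e2e tests apply through k8s.patch_pod to simulate a leftover flag on a running pod.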
-func (c *Cluster) recreatePods() error { +func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.NamespacedName) error { c.setProcessName("starting to recreate pods") - ls := c.labelsSet(false) - namespace := c.Namespace - - listOptions := metav1.ListOptions{ - LabelSelector: ls.String(), - } - - pods, err := c.KubeClient.Pods(namespace).List(context.TODO(), listOptions) - if err != nil { - return fmt.Errorf("could not get the list of pods: %v", err) - } - c.logger.Infof("there are %d pods in the cluster to recreate", len(pods.Items)) + c.logger.Infof("there are %d pods in the cluster to recreate", len(pods)) if !c.isSafeToRecreatePods(pods) { return fmt.Errorf("postpone pod recreation until next Sync: recreation is unsafe because pods are being initialized") } var ( - masterPod, newMasterPod, newPod *v1.Pod + masterPod, newMasterPod *v1.Pod ) - replicas := make([]spec.NamespacedName, 0) - for i, pod := range pods.Items { + replicas := switchoverCandidates + + for i, pod := range pods { role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel]) if role == Master { - masterPod = &pods.Items[i] + masterPod = &pods[i] continue } - podName := util.NameFromMeta(pods.Items[i].ObjectMeta) - if newPod, err = c.recreatePod(podName); err != nil { + podName := util.NameFromMeta(pod.ObjectMeta) + newPod, err := c.recreatePod(podName) + if err != nil { return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err) } - if newRole := PostgresRole(newPod.Labels[c.OpConfig.PodRoleLabel]); newRole == Replica { + + newRole := PostgresRole(newPod.Labels[c.OpConfig.PodRoleLabel]) + if newRole == Replica { replicas = append(replicas, util.NameFromMeta(pod.ObjectMeta)) } else if newRole == Master { newMasterPod = newPod @@ -384,7 +444,9 @@ func (c *Cluster) recreatePods() error { } if masterPod != nil { - // failover if we have not observed a master pod when re-creating former replicas. + // switchover if + // 1. we have not observed a new master pod when re-creating former replicas + // 2. we know possible switchover targets even when no replicas were recreated if newMasterPod == nil && len(replicas) > 0 { if err := c.Switchover(masterPod, masterCandidate(replicas)); err != nil { c.logger.Warningf("could not perform switch over: %v", err) diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index bcc568adc..ed8b4099c 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -19,7 +19,7 @@ import ( ) const ( - rollingUpdateStatefulsetAnnotationKey = "zalando-postgres-operator-rolling-update-required" + rollingUpdatePodAnnotationKey = "zalando-postgres-operator-rolling-update-required" ) func (c *Cluster) listResources() error { @@ -147,79 +147,6 @@ func (c *Cluster) preScaleDown(newStatefulSet *appsv1.StatefulSet) error { return nil } -// setRollingUpdateFlagForStatefulSet sets the indicator or the rolling update requirement -// in the StatefulSet annotation. -func (c *Cluster) setRollingUpdateFlagForStatefulSet(sset *appsv1.StatefulSet, val bool, msg string) { - anno := sset.GetAnnotations() - if anno == nil { - anno = make(map[string]string) - } - - anno[rollingUpdateStatefulsetAnnotationKey] = strconv.FormatBool(val) - sset.SetAnnotations(anno) - c.logger.Debugf("set statefulset's rolling update annotation to %t: caller/reason %s", val, msg) -} - -// applyRollingUpdateFlagforStatefulSet sets the rolling update flag for the cluster's StatefulSet -// and applies that setting to the actual running cluster. 
-func (c *Cluster) applyRollingUpdateFlagforStatefulSet(val bool) error { - c.setRollingUpdateFlagForStatefulSet(c.Statefulset, val, "applyRollingUpdateFlag") - sset, err := c.updateStatefulSetAnnotations(c.Statefulset.GetAnnotations()) - if err != nil { - return err - } - c.Statefulset = sset - return nil -} - -// getRollingUpdateFlagFromStatefulSet returns the value of the rollingUpdate flag from the passed -// StatefulSet, reverting to the default value in case of errors -func (c *Cluster) getRollingUpdateFlagFromStatefulSet(sset *appsv1.StatefulSet, defaultValue bool) (flag bool) { - anno := sset.GetAnnotations() - flag = defaultValue - - stringFlag, exists := anno[rollingUpdateStatefulsetAnnotationKey] - if exists { - var err error - if flag, err = strconv.ParseBool(stringFlag); err != nil { - c.logger.Warnf("error when parsing %q annotation for the statefulset %q: expected boolean value, got %q\n", - rollingUpdateStatefulsetAnnotationKey, - types.NamespacedName{Namespace: sset.Namespace, Name: sset.Name}, - stringFlag) - flag = defaultValue - } - } - return flag -} - -// mergeRollingUpdateFlagUsingCache returns the value of the rollingUpdate flag from the passed -// statefulset, however, the value can be cleared if there is a cached flag in the cluster that -// is set to false (the discrepancy could be a result of a failed StatefulSet update) -func (c *Cluster) mergeRollingUpdateFlagUsingCache(runningStatefulSet *appsv1.StatefulSet) bool { - var ( - cachedStatefulsetExists, clearRollingUpdateFromCache, podsRollingUpdateRequired bool - ) - - if c.Statefulset != nil { - // if we reset the rolling update flag in the statefulset structure in memory but didn't manage to update - // the actual object in Kubernetes for some reason we want to avoid doing an unnecessary update by relying - // on the 'cached' in-memory flag. 
- cachedStatefulsetExists = true - clearRollingUpdateFromCache = !c.getRollingUpdateFlagFromStatefulSet(c.Statefulset, true) - c.logger.Debugf("cached StatefulSet value exists, rollingUpdate flag is %t", clearRollingUpdateFromCache) - } - - if podsRollingUpdateRequired = c.getRollingUpdateFlagFromStatefulSet(runningStatefulSet, false); podsRollingUpdateRequired { - if cachedStatefulsetExists && clearRollingUpdateFromCache { - c.logger.Infof("clearing the rolling update flag based on the cached information") - podsRollingUpdateRequired = false - } else { - c.logger.Infof("found a statefulset with an unfinished rolling update of the pods") - } - } - return podsRollingUpdateRequired -} - func (c *Cluster) updateStatefulSetAnnotations(annotations map[string]string) (*appsv1.StatefulSet, error) { c.logger.Debugf("patching statefulset annotations") patchData, err := metaAnnotationsPatch(annotations) @@ -237,8 +164,8 @@ func (c *Cluster) updateStatefulSetAnnotations(annotations map[string]string) (* return nil, fmt.Errorf("could not patch statefulset annotations %q: %v", patchData, err) } return result, nil - } + func (c *Cluster) updateStatefulSet(newStatefulSet *appsv1.StatefulSet) error { c.setProcessName("updating statefulset") if c.Statefulset == nil { diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index d104fd6e0..62d9c1864 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -283,22 +283,24 @@ func (c *Cluster) mustUpdatePodsAfterLazyUpdate(desiredSset *appsv1.StatefulSet) } func (c *Cluster) syncStatefulSet() error { - var ( - podsRollingUpdateRequired bool - ) + + podsToRecreate := make([]v1.Pod, 0) + switchoverCandidates := make([]spec.NamespacedName, 0) + + pods, err := c.listPods() + if err != nil { + c.logger.Infof("could not list pods of the statefulset: %v", err) + } + // NB: Be careful to consider the codepath that acts on podsRollingUpdateRequired before returning early. 
sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(context.TODO(), c.statefulSetName(), metav1.GetOptions{}) if err != nil { if !k8sutil.ResourceNotFound(err) { - return fmt.Errorf("could not get statefulset: %v", err) + return fmt.Errorf("error during reading of statefulset: %v", err) } // statefulset does not exist, try to re-create it c.Statefulset = nil - c.logger.Infof("could not find the cluster's statefulset") - pods, err := c.listPods() - if err != nil { - return fmt.Errorf("could not list pods of the statefulset: %v", err) - } + c.logger.Infof("cluster's statefulset does not exist") sset, err = c.createStatefulSet() if err != nil { @@ -309,41 +311,63 @@ func (c *Cluster) syncStatefulSet() error { return fmt.Errorf("cluster is not ready: %v", err) } - podsRollingUpdateRequired = (len(pods) > 0) - if podsRollingUpdateRequired { - c.logger.Warningf("found pods from the previous statefulset: trigger rolling update") - if err := c.applyRollingUpdateFlagforStatefulSet(podsRollingUpdateRequired); err != nil { - return fmt.Errorf("could not set rolling update flag for the statefulset: %v", err) + if len(pods) > 0 { + for _, pod := range pods { + if err = c.markRollingUpdateFlagForPod(&pod, "pod from previous statefulset"); err != nil { + c.logger.Warnf("marking old pod for rolling update failed: %v", err) + } + podsToRecreate = append(podsToRecreate, pod) } } c.logger.Infof("created missing statefulset %q", util.NameFromMeta(sset.ObjectMeta)) } else { - podsRollingUpdateRequired = c.mergeRollingUpdateFlagUsingCache(sset) + // check if there are still pods with a rolling update flag + for _, pod := range pods { + if c.getRollingUpdateFlagFromPod(&pod) { + podsToRecreate = append(podsToRecreate, pod) + } else { + role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel]) + if role == Master { + continue + } + switchoverCandidates = append(switchoverCandidates, util.NameFromMeta(pod.ObjectMeta)) + } + } + + if len(podsToRecreate) > 0 { + c.logger.Debugf("%d / %d pod(s) still need to be rotated", len(podsToRecreate), len(pods)) + } + // statefulset is already there, make sure we use its definition in order to compare with the spec. 
c.Statefulset = sset - desiredSS, err := c.generateStatefulSet(&c.Spec) + desiredSts, err := c.generateStatefulSet(&c.Spec) if err != nil { return fmt.Errorf("could not generate statefulset: %v", err) } - c.setRollingUpdateFlagForStatefulSet(desiredSS, podsRollingUpdateRequired, "from cache") - cmp := c.compareStatefulSetWith(desiredSS) + cmp := c.compareStatefulSetWith(desiredSts) if !cmp.match { - if cmp.rollingUpdate && !podsRollingUpdateRequired { - podsRollingUpdateRequired = true - c.setRollingUpdateFlagForStatefulSet(desiredSS, podsRollingUpdateRequired, "statefulset changes") + if cmp.rollingUpdate { + podsToRecreate = make([]v1.Pod, 0) + switchoverCandidates = make([]spec.NamespacedName, 0) + for _, pod := range pods { + if err = c.markRollingUpdateFlagForPod(&pod, "pod changes"); err != nil { + return fmt.Errorf("updating rolling update flag for pod failed: %v", err) + } + podsToRecreate = append(podsToRecreate, pod) + } } - c.logStatefulSetChanges(c.Statefulset, desiredSS, false, cmp.reasons) + c.logStatefulSetChanges(c.Statefulset, desiredSts, false, cmp.reasons) if !cmp.replace { - if err := c.updateStatefulSet(desiredSS); err != nil { + if err := c.updateStatefulSet(desiredSts); err != nil { return fmt.Errorf("could not update statefulset: %v", err) } } else { - if err := c.replaceStatefulSet(desiredSS); err != nil { + if err := c.replaceStatefulSet(desiredSts); err != nil { return fmt.Errorf("could not replace statefulset: %v", err) } } @@ -351,18 +375,30 @@ func (c *Cluster) syncStatefulSet() error { c.updateStatefulSetAnnotations(c.AnnotationsToPropagate(c.annotationsSet(c.Statefulset.Annotations))) - if !podsRollingUpdateRequired && !c.OpConfig.EnableLazySpiloUpgrade { - // even if desired and actual statefulsets match + if len(podsToRecreate) == 0 && !c.OpConfig.EnableLazySpiloUpgrade { + // even if the desired and the running statefulsets match // there still may be not up-to-date pods on condition // (a) the lazy update was just disabled // and // (b) some of the pods were not restarted when the lazy update was still in place - podsRollingUpdateRequired, err = c.mustUpdatePodsAfterLazyUpdate(desiredSS) - if err != nil { - return fmt.Errorf("could not list pods of the statefulset: %v", err) + for _, pod := range pods { + effectivePodImage := pod.Spec.Containers[0].Image + stsImage := desiredSts.Spec.Template.Spec.Containers[0].Image + + if stsImage != effectivePodImage { + if err = c.markRollingUpdateFlagForPod(&pod, "pod not yet restarted due to lazy update"); err != nil { + c.logger.Warnf("updating rolling update flag failed for pod %q: %v", pod.Name, err) + } + podsToRecreate = append(podsToRecreate, pod) + } else { + role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel]) + if role == Master { + continue + } + switchoverCandidates = append(switchoverCandidates, util.NameFromMeta(pod.ObjectMeta)) + } } } - } // Apply special PostgreSQL parameters that can only be set via the Patroni API. 
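With the flag now living on pods, syncStatefulSet splits the listed pods into two buckets: pods that still carry the annotation (or, with lazy upgrade disabled, still run an outdated image) go into podsToRecreate, while flag-free replicas are remembered as switchoverCandidates. A rough standalone model of that classification (types and names are simplified stand-ins, not the operator's own):

```go
package main

import "fmt"

// pod is a stand-in for the handful of v1.Pod fields the sync loop inspects.
type pod struct {
	name            string
	role            string // value of the pod role label: "master" or "replica"
	rollingRequired bool   // parsed from the rolling-update annotation
}

// classify mirrors the bucketing in syncStatefulSet: flagged pods must be
// recreated; unflagged replicas are kept as possible switchover targets;
// an unflagged master needs neither.
func classify(pods []pod) (toRecreate []pod, switchoverCandidates []string) {
	for _, p := range pods {
		if p.rollingRequired {
			toRecreate = append(toRecreate, p)
			continue
		}
		if p.role == "master" {
			continue
		}
		switchoverCandidates = append(switchoverCandidates, p.name)
	}
	return toRecreate, switchoverCandidates
}

func main() {
	toRecreate, candidates := classify([]pod{
		{name: "acid-minimal-cluster-0", role: "master", rollingRequired: true},
		{name: "acid-minimal-cluster-1", role: "replica", rollingRequired: false},
	})
	fmt.Println(len(toRecreate), candidates) // 1 [acid-minimal-cluster-1]
}
```

Keeping the candidate list is what allows a switchover even when only the master is flagged, which is the scenario exercised by test_rolling_update_flag.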
@@ -374,17 +410,13 @@ func (c *Cluster) syncStatefulSet() error { // if we get here we also need to re-create the pods (either leftovers from the old // statefulset or those that got their configuration from the outdated statefulset) - if podsRollingUpdateRequired { + if len(podsToRecreate) > 0 { c.logger.Debugln("performing rolling update") c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update") - if err := c.recreatePods(); err != nil { + if err := c.recreatePods(podsToRecreate, switchoverCandidates); err != nil { return fmt.Errorf("could not recreate pods: %v", err) } - c.logger.Infof("pods have been recreated") c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated") - if err := c.applyRollingUpdateFlagforStatefulSet(false); err != nil { - c.logger.Warningf("could not clear rolling update for the statefulset: %v", err) - } } return nil }
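recreatePods then works through those buckets in a fixed order: replicas are rotated first, and if the master itself has to be rotated, the operator switches over to one of the candidates before deleting the old master pod. A compact sketch of that ordering under those assumptions (illustrative names only; the real code recreates pods via the Kubernetes API and performs the switchover through Patroni):

```go
package main

import (
	"fmt"
	"math/rand"
)

type member struct {
	name string
	role string // "master" or "replica"
}

// recreationPlan sketches the order used by recreatePods: rotate replicas
// first (each recreated replica also becomes a switchover candidate), then
// switch over from the master to a randomly picked candidate, and only
// afterwards recreate the former master pod.
func recreationPlan(toRecreate []member, knownCandidates []string) []string {
	var steps []string
	candidates := append([]string{}, knownCandidates...)
	var master *member

	for i := range toRecreate {
		if toRecreate[i].role == "master" {
			master = &toRecreate[i]
			continue
		}
		steps = append(steps, "recreate replica "+toRecreate[i].name)
		candidates = append(candidates, toRecreate[i].name)
	}

	if master != nil {
		if len(candidates) > 0 {
			target := candidates[rand.Intn(len(candidates))]
			steps = append(steps, "switch over from "+master.name+" to "+target)
		}
		steps = append(steps, "recreate former master "+master.name)
	}
	return steps
}

func main() {
	// only the master carries the flag; the replica is a known candidate,
	// as in test_rolling_update_flag
	plan := recreationPlan(
		[]member{{name: "acid-minimal-cluster-0", role: "master"}},
		[]string{"acid-minimal-cluster-1"},
	)
	for _, step := range plan {
		fmt.Println(step)
	}
}
```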