From 4421724848f9ac17f843fc298bb1cc8e72fd6ace Mon Sep 17 00:00:00 2001
From: Felix Kunde
Date: Wed, 3 Feb 2021 14:07:58 +0100
Subject: [PATCH] steer rolling update via pod annotation

---
 e2e/tests/test_e2e.py    |   2 +-
 pkg/cluster/k8sres.go    |   3 -
 pkg/cluster/pod.go       | 121 ++++++++++++++++++++++++++++++++++-----
 pkg/cluster/resources.go |  70 +++-------------------
 pkg/cluster/sync.go      |  62 ++++++++++++--------
 5 files changed, 154 insertions(+), 104 deletions(-)

diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py
index 87bed3baa..dd99fd135 100644
--- a/e2e/tests/test_e2e.py
+++ b/e2e/tests/test_e2e.py
@@ -588,7 +588,7 @@ class EndToEndTestCase(unittest.TestCase):
         k8s.update_config(unpatch_lazy_spilo_upgrade, step="patch lazy upgrade")
 
         # at this point operator will complete the normal rolling upgrade
-        # so we additonally test if disabling the lazy upgrade - forcing the normal rolling upgrade - works
+        # so we additionally test if disabling the lazy upgrade - forcing the normal rolling upgrade - works
         self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0),
                              conf_image, "Rolling upgrade was not executed",
                              50, 3)
diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go
index 89d541a59..2ac5cd31c 100644
--- a/pkg/cluster/k8sres.go
+++ b/pkg/cluster/k8sres.go
@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"path"
 	"sort"
-	"strconv"
 	"strings"
 
 	"github.com/sirupsen/logrus"
@@ -1279,8 +1278,6 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
 	}
 
 	stsAnnotations := make(map[string]string)
-	// TODO remove from sts
-	stsAnnotations[rollingUpdateStatefulsetAnnotationKey] = strconv.FormatBool(false)
 	stsAnnotations = c.AnnotationsToPropagate(c.annotationsSet(nil))
 
 	statefulSet := &appsv1.StatefulSet{
diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go
index 39f29ff28..f79cb8e80 100644
--- a/pkg/cluster/pod.go
+++ b/pkg/cluster/pod.go
@@ -4,11 +4,13 @@ import (
 	"context"
 	"fmt"
 	"math/rand"
+	"strconv"
 	"time"
 
 	appsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 
 	"github.com/zalando/postgres-operator/pkg/spec"
 	"github.com/zalando/postgres-operator/pkg/util"
@@ -45,6 +47,93 @@ func (c *Cluster) getRolePods(role PostgresRole) ([]v1.Pod, error) {
 	return pods.Items, nil
 }
 
+// enableRollingUpdateFlagForPod sets the indicator for the rolling update requirement
+// in the Pod annotation.
+func (c *Cluster) enableRollingUpdateFlagForPod(pod v1.Pod, msg string) error {
+	c.logger.Debugf("enable rolling update annotation for %s: reason %s", pod.Name, msg)
+	flag := make(map[string]string)
+	flag[rollingUpdatePodAnnotationKey] = strconv.FormatBool(true)
+
+	patchData, err := metaAnnotationsPatch(flag)
+	if err != nil {
+		return fmt.Errorf("could not form patch for pod's rolling update flag: %v", err)
+	}
+
+	_, err = c.KubeClient.Pods(pod.Namespace).Patch(
+		context.TODO(),
+		pod.Name,
+		types.MergePatchType,
+		[]byte(patchData),
+		metav1.PatchOptions{},
+		"")
+	if err != nil {
+		return fmt.Errorf("could not patch pod rolling update flag %q: %v", patchData, err)
+	}
+
+	return nil
+}
+
+// enableRollingUpdateFlagForPods sets the indicator for the rolling update requirement
+// on pods that do not have it yet (pods whose flag is already set to false are skipped)
+func (c *Cluster) enableRollingUpdateFlagForPods(pods []v1.Pod, msg string) error {
+	for _, pod := range pods {
+		if c.getRollingUpdateFlagFromPod(&pod, true) {
+			if err := c.enableRollingUpdateFlagForPod(pod, msg); err != nil {
+				return fmt.Errorf("enabling rolling update flag failed for pod %q: %v", pod.Name, err)
+			}
+		}
+	}
+
+	return nil
+}
+
+// getRollingUpdateFlagFromPod returns the value of the rollingUpdate flag from the passed pod,
+// reverting to the default value in case of errors
+func (c *Cluster) getRollingUpdateFlagFromPod(pod *v1.Pod, defaultValue bool) (flag bool) {
+	anno := pod.GetAnnotations()
+	flag = defaultValue
+
+	stringFlag, exists := anno[rollingUpdatePodAnnotationKey]
+	if exists {
+		var err error
+		if flag, err = strconv.ParseBool(stringFlag); err != nil {
+			c.logger.Warnf("error when parsing %q annotation for the pod %q: expected boolean value, got %q\n",
+				rollingUpdatePodAnnotationKey,
+				types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name},
+				stringFlag)
+			flag = defaultValue
+		}
+	}
+
+	return flag
+}
+
+// countPodsWithRollingUpdateFlag returns the number of pods that carry the rolling update flag
+// (plus pods missing compared to the desired number of instances) and the number of pods found
+func (c *Cluster) countPodsWithRollingUpdateFlag(defaultValue bool) (int, int) {
+
+	pods, err := c.listPods()
+	if err != nil {
+		c.logger.Warnf("could not count pods with rolling update flag: %v", err)
+		return 0, 0
+	}
+
+	desiredPodCount := c.Spec.NumberOfInstances
+	actualPodCount := len(pods)
+	podsToRollCount := int(desiredPodCount) - actualPodCount
+	if podsToRollCount < 0 {
+		podsToRollCount = 0
+	}
+
+	for _, pod := range pods {
+		if c.getRollingUpdateFlagFromPod(&pod, defaultValue) {
+			podsToRollCount++
+		}
+	}
+
+	return podsToRollCount, actualPodCount
+}
+
 func (c *Cluster) deletePods() error {
 	c.logger.Debugln("deleting pods")
 	pods, err := c.listPods()
@@ -364,27 +453,29 @@ func (c *Cluster) recreatePods() error {
 	)
 	replicas := make([]spec.NamespacedName, 0)
 	for i, pod := range pods.Items {
-		role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
+		// only recreate pod if rolling update flag is true
+		if c.getRollingUpdateFlagFromPod(&pod, false) {
+			role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
 
-		if role == Master {
-			masterPod = &pods.Items[i]
-			continue
-		}
+			if role == Master {
+				masterPod = &pods.Items[i]
+				continue
+			}
 
-		podName := util.NameFromMeta(pods.Items[i].ObjectMeta)
-		if newPod, err = c.recreatePod(podName); err != nil {
-			return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err)
-		}
-		if newRole := PostgresRole(newPod.Labels[c.OpConfig.PodRoleLabel]); newRole == Replica {
-			replicas = append(replicas, util.NameFromMeta(pod.ObjectMeta))
-		} else if newRole == Master {
-			newMasterPod = newPod
+			podName := util.NameFromMeta(pods.Items[i].ObjectMeta)
+			if newPod, err = c.recreatePod(podName); err != nil {
+				return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err)
+			}
+			if newRole := PostgresRole(newPod.Labels[c.OpConfig.PodRoleLabel]); newRole == Replica {
+				replicas = append(replicas, util.NameFromMeta(pod.ObjectMeta))
+			} else if newRole == Master {
+				newMasterPod = newPod
+			}
 		}
 	}
 
 	if masterPod != nil {
-		// failover if we have not observed a master pod when re-creating former replicas.
-		// TODO if masterPod has no rolling update label anymore skip switchover too
+		// failover if we have not observed a master pod when re-creating former replicas
 		if newMasterPod == nil && len(replicas) > 0 {
 			if err := c.Switchover(masterPod, masterCandidate(replicas)); err != nil {
 				c.logger.Warningf("could not perform switch over: %v", err)
diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go
index 712cf0430..a3110d2eb 100644
--- a/pkg/cluster/resources.go
+++ b/pkg/cluster/resources.go
@@ -20,7 +20,7 @@ import (
 
 const (
 	// TODO rollingUpdatePodAnnotationKey = "zalando-postgres-operator-rolling-update-required"
-	rollingUpdateStatefulsetAnnotationKey = "zalando-postgres-operator-rolling-update-required"
+	rollingUpdatePodAnnotationKey = "zalando-postgres-operator-rolling-update-required"
 )
 
 func (c *Cluster) listResources() error {
@@ -148,84 +148,32 @@ func (c *Cluster) preScaleDown(newStatefulSet *appsv1.StatefulSet) error {
 	return nil
 }
 
-// TODO setRollingUpdateFlagForPod
-// setRollingUpdateFlagForStatefulSet sets the indicator or the rolling update requirement
-// in the StatefulSet annotation.
-func (c *Cluster) setRollingUpdateFlagForStatefulSet(sset *appsv1.StatefulSet, val bool, msg string) {
-	anno := sset.GetAnnotations()
-	if anno == nil {
-		anno = make(map[string]string)
-	}
-
-	anno[rollingUpdateStatefulsetAnnotationKey] = strconv.FormatBool(val)
-	sset.SetAnnotations(anno)
-	c.logger.Debugf("set statefulset's rolling update annotation to %t: caller/reason %s", val, msg)
-}
-
-// TODO applyRollingUpdateFlagForPod
-// applyRollingUpdateFlagforStatefulSet sets the rolling update flag for the cluster's StatefulSet
-// and applies that setting to the actual running cluster.
-func (c *Cluster) applyRollingUpdateFlagforStatefulSet(val bool) error {
-	c.setRollingUpdateFlagForStatefulSet(c.Statefulset, val, "applyRollingUpdateFlag")
-	sset, err := c.updateStatefulSetAnnotations(c.Statefulset.GetAnnotations())
-	if err != nil {
-		return err
-	}
-	c.Statefulset = sset
-	return nil
-}
-
-// TODO getRollingUpdateFlagFromPod
-// getRollingUpdateFlagFromStatefulSet returns the value of the rollingUpdate flag from the passed
-// StatefulSet, reverting to the default value in case of errors
-func (c *Cluster) getRollingUpdateFlagFromStatefulSet(sset *appsv1.StatefulSet, defaultValue bool) (flag bool) {
-	anno := sset.GetAnnotations()
-	flag = defaultValue
-
-	stringFlag, exists := anno[rollingUpdateStatefulsetAnnotationKey]
-	if exists {
-		var err error
-		if flag, err = strconv.ParseBool(stringFlag); err != nil {
-			c.logger.Warnf("error when parsing %q annotation for the statefulset %q: expected boolean value, got %q\n",
-				rollingUpdateStatefulsetAnnotationKey,
-				types.NamespacedName{Namespace: sset.Namespace, Name: sset.Name},
-				stringFlag)
-			flag = defaultValue
-		}
-	}
-	return flag
-}
-
-// TODO mergeRollingUpdateFlagUsingCache(runningPod *v1.Pod) bool {
 // mergeRollingUpdateFlagUsingCache returns the value of the rollingUpdate flag from the passed
 // statefulset, however, the value can be cleared if there is a cached flag in the cluster that
 // is set to false (the discrepancy could be a result of a failed StatefulSet update)
 func (c *Cluster) mergeRollingUpdateFlagUsingCache(runningStatefulSet *appsv1.StatefulSet) bool {
 	var (
-		cachedStatefulsetExists, clearRollingUpdateFromCache, podsRollingUpdateRequired bool
+		cachedStatefulsetExists, podsRollingUpdateRequired bool
 	)
 
 	if c.Statefulset != nil {
 		// if we reset the rolling update flag in the statefulset structure in memory but didn't manage to update
 		// the actual object in Kubernetes for some reason we want to avoid doing an unnecessary update by relying
 		// on the 'cached' in-memory flag.
+		c.logger.Debugf("cached StatefulSet value exists")
 		cachedStatefulsetExists = true
-		clearRollingUpdateFromCache = !c.getRollingUpdateFlagFromStatefulSet(c.Statefulset, true)
-		c.logger.Debugf("cached StatefulSet value exists, rollingUpdate flag is %t", clearRollingUpdateFromCache)
 	}
 
-	if podsRollingUpdateRequired = c.getRollingUpdateFlagFromStatefulSet(runningStatefulSet, false); podsRollingUpdateRequired {
-		if cachedStatefulsetExists && clearRollingUpdateFromCache {
-			c.logger.Infof("clearing the rolling update flag based on the cached information")
-			podsRollingUpdateRequired = false
-		} else {
-			c.logger.Infof("found a statefulset with an unfinished rolling update of the pods")
-		}
+	podsToRollCount, podCount := c.countPodsWithRollingUpdateFlag(cachedStatefulsetExists)
+
+	if podsToRollCount > 0 {
+		c.logger.Debugf("%d / %d pods still need to be rotated", podsToRollCount, podCount)
+		podsRollingUpdateRequired = true
 	}
+
 	return podsRollingUpdateRequired
 }
 
-// updatePodAnnotations
 func (c *Cluster) updateStatefulSetAnnotations(annotations map[string]string) (*appsv1.StatefulSet, error) {
 	c.logger.Debugf("patching statefulset annotations")
 	patchData, err := metaAnnotationsPatch(annotations)
diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go
index 5c0f6ce84..6e9ecb32b 100644
--- a/pkg/cluster/sync.go
+++ b/pkg/cluster/sync.go
@@ -280,6 +280,12 @@ func (c *Cluster) syncStatefulSet() error {
 	var (
 		podsRollingUpdateRequired bool
 	)
+
+	pods, err := c.listPods()
+	if err != nil {
+		c.logger.Warnf("could not list pods of the statefulset: %v", err)
+	}
+
 	// NB: Be careful to consider the codepath that acts on podsRollingUpdateRequired before returning early.
 	sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(context.TODO(), c.statefulSetName(), metav1.GetOptions{})
 	if err != nil {
@@ -289,10 +295,6 @@
 		// statefulset does not exist, try to re-create it
 		c.Statefulset = nil
 		c.logger.Infof("could not find the cluster's statefulset")
-		pods, err := c.listPods()
-		if err != nil {
-			return fmt.Errorf("could not list pods of the statefulset: %v", err)
-		}
 
 		sset, err = c.createStatefulSet()
 		if err != nil {
@@ -305,58 +307,73 @@
 
 		podsRollingUpdateRequired = (len(pods) > 0)
 		if podsRollingUpdateRequired {
-			c.logger.Warningf("found pods from the previous statefulset: trigger rolling update")
-			if err := c.applyRollingUpdateFlagforStatefulSet(podsRollingUpdateRequired); err != nil {
-				return fmt.Errorf("could not set rolling update flag for the statefulset: %v", err)
+			if err = c.enableRollingUpdateFlagForPods(pods, "pods from previous statefulset"); err != nil {
+				c.logger.Warnf("updating rolling update flag for existing pods failed: %v", err)
 			}
 		}
 		c.logger.Infof("created missing statefulset %q", util.NameFromMeta(sset.ObjectMeta))
 
 	} else {
-		podsRollingUpdateRequired = c.mergeRollingUpdateFlagUsingCache(sset)
+		// check if there are still pods with a rolling update flag
+		// default value for flag depends on a potentially cached StatefulSet
+		podsToRollCount, podCount := c.countPodsWithRollingUpdateFlag(c.Statefulset != nil)
+
+		if podsToRollCount > 0 {
+			c.logger.Debugf("%d / %d pods still need to be rotated", podsToRollCount, podCount)
+			podsRollingUpdateRequired = true
+		}
+
 		// statefulset is already there, make sure we use its definition in order to compare with the spec.
 		c.Statefulset = sset
 
-		desiredSS, err := c.generateStatefulSet(&c.Spec)
+		desiredSts, err := c.generateStatefulSet(&c.Spec)
 		if err != nil {
 			return fmt.Errorf("could not generate statefulset: %v", err)
 		}
-		c.setRollingUpdateFlagForStatefulSet(desiredSS, podsRollingUpdateRequired, "from cache")
 
-		cmp := c.compareStatefulSetWith(desiredSS)
+		cmp := c.compareStatefulSetWith(desiredSts)
 		if !cmp.match {
 			if cmp.rollingUpdate && !podsRollingUpdateRequired {
-				podsRollingUpdateRequired = true
-				c.setRollingUpdateFlagForStatefulSet(desiredSS, podsRollingUpdateRequired, "statefulset changes")
+				podsRollingUpdateRequired = cmp.rollingUpdate
+				if err = c.enableRollingUpdateFlagForPods(pods, "pod changes"); err != nil {
+					return fmt.Errorf("updating rolling update flag for pods failed: %v", err)
+				}
 			}
 
-			c.logStatefulSetChanges(c.Statefulset, desiredSS, false, cmp.reasons)
+			c.logStatefulSetChanges(c.Statefulset, desiredSts, false, cmp.reasons)
 
 			if !cmp.replace {
-				if err := c.updateStatefulSet(desiredSS); err != nil {
+				if err := c.updateStatefulSet(desiredSts); err != nil {
 					return fmt.Errorf("could not update statefulset: %v", err)
 				}
 			} else {
-				if err := c.replaceStatefulSet(desiredSS); err != nil {
+				if err := c.replaceStatefulSet(desiredSts); err != nil {
 					return fmt.Errorf("could not replace statefulset: %v", err)
 				}
 			}
 		}
 
+		// TODO why is this necessary?
 		c.updateStatefulSetAnnotations(c.AnnotationsToPropagate(c.annotationsSet(c.Statefulset.Annotations)))
 
 		if !podsRollingUpdateRequired && !c.OpConfig.EnableLazySpiloUpgrade {
-			// even if desired and actual statefulsets match
+			// even if the desired and the running statefulsets match
 			// there still may be not up-to-date pods on condition
 			//  (a) the lazy update was just disabled
 			// and
 			//  (b) some of the pods were not restarted when the lazy update was still in place
-			podsRollingUpdateRequired, err = c.mustUpdatePodsAfterLazyUpdate(desiredSS)
-			if err != nil {
-				return fmt.Errorf("could not list pods of the statefulset: %v", err)
+			for _, pod := range pods {
+				effectivePodImage := pod.Spec.Containers[0].Image
+				stsImage := desiredSts.Spec.Template.Spec.Containers[0].Image
+
+				if stsImage != effectivePodImage {
+					podsRollingUpdateRequired = true
+					if err = c.enableRollingUpdateFlagForPod(pod, "pod not yet restarted due to lazy update"); err != nil {
+						c.logger.Warnf("updating rolling update flag failed for pod %q: %v", pod.Name, err)
+					}
+				}
 			}
 		}
-
 	}
 
 	// Apply special PostgreSQL parameters that can only be set via the Patroni API.
@@ -376,9 +393,6 @@
 		}
 		c.logger.Infof("pods have been recreated")
 		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
-		if err := c.applyRollingUpdateFlagforStatefulSet(false); err != nil {
-			c.logger.Warningf("could not clear rolling update for the statefulset: %v", err)
-		}
 	}
 	return nil
 }
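
Not part of the patch, for illustration only: a minimal, self-contained Go sketch of the JSON merge patch that enableRollingUpdateFlagForPod sends when it marks a pod. The local metaAnnotationsPatch below is an assumption about what the operator's helper of the same name produces (a merge patch limited to metadata.annotations); only the payload shape matters here.

package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// same annotation key as declared in pkg/cluster/resources.go
const rollingUpdatePodAnnotationKey = "zalando-postgres-operator-rolling-update-required"

// metaAnnotationsPatch mimics (as an assumption) the operator helper: it builds a
// JSON merge patch that only touches metadata.annotations of the target object.
func metaAnnotationsPatch(annotations map[string]string) (string, error) {
	patch := map[string]map[string]map[string]string{
		"metadata": {"annotations": annotations},
	}
	data, err := json.Marshal(patch)
	return string(data), err
}

func main() {
	flag := map[string]string{rollingUpdatePodAnnotationKey: strconv.FormatBool(true)}
	patchData, err := metaAnnotationsPatch(flag)
	if err != nil {
		panic(err)
	}
	// With the assumed helper above, the printed body is what would be sent with
	// types.MergePatchType, i.e.
	// {"metadata":{"annotations":{"zalando-postgres-operator-rolling-update-required":"true"}}}
	fmt.Println(patchData)
}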