diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go
index 091a3907d..396878268 100644
--- a/pkg/cluster/pod.go
+++ b/pkg/cluster/pod.go
@@ -47,10 +47,10 @@ func (c *Cluster) getRolePods(role PostgresRole) ([]v1.Pod, error) {
 	return pods.Items, nil
 }
 
-// enableRollingUpdateFlagForPod sets the indicator for the rolling update requirement
+// markRollingUpdateFlagForPod sets the indicator for the rolling update requirement
 // in the Pod annotation.
-func (c *Cluster) enableRollingUpdateFlagForPod(pod v1.Pod, msg string) error {
-	c.logger.Debugf("enable rolling update annotation for %s: reason %s", pod.Name, msg)
+func (c *Cluster) markRollingUpdateFlagForPod(pod v1.Pod, msg string) error {
+	c.logger.Debugf("mark rolling update annotation for %s: reason %s", pod.Name, msg)
 
 	flag := make(map[string]string)
 	flag[rollingUpdatePodAnnotationKey] = strconv.FormatBool(true)
@@ -59,13 +59,20 @@ func (c *Cluster) enableRollingUpdateFlagForPod(pod v1.Pod, msg string) error {
 		return fmt.Errorf("could not form patch for pod's rolling update flag: %v", err)
 	}
 
-	_, err = c.KubeClient.Pods(pod.Namespace).Patch(
-		context.TODO(),
-		pod.Name,
-		types.MergePatchType,
-		[]byte(patchData),
-		metav1.PatchOptions{},
-		"")
+	err = retryutil.Retry(1*time.Second, 5*time.Second,
+		func() (bool, error) {
+			_, err2 := c.KubeClient.Pods(pod.Namespace).Patch(
+				context.TODO(),
+				pod.Name,
+				types.MergePatchType,
+				[]byte(patchData),
+				metav1.PatchOptions{},
+				"")
+			if err2 != nil {
+				return false, err2
+			}
+			return true, nil
+		})
 	if err != nil {
 		return fmt.Errorf("could not patch pod rolling update flag %q: %v", patchData, err)
 	}
@@ -73,12 +80,12 @@ func (c *Cluster) enableRollingUpdateFlagForPod(pod v1.Pod, msg string) error {
 	return nil
 }
 
-// enableRollingUpdateFlagForPods sets the indicator for the rolling update requirement
+// markRollingUpdateFlagForPods sets the indicator for the rolling update requirement
 // on pods that do not have it yet
-func (c *Cluster) enableRollingUpdateFlagForPods(pods []v1.Pod, msg string) error {
+func (c *Cluster) markRollingUpdateFlagForPods(pods []v1.Pod, msg string) error {
 	for _, pod := range pods {
-		if err := c.enableRollingUpdateFlagForPod(pod, msg); err != nil {
-			return fmt.Errorf("enabling rolling update flag failed for pod %q: %v", pod.Name, err)
+		if err := c.markRollingUpdateFlagForPod(pod, msg); err != nil {
+			return fmt.Errorf("marking rolling update flag failed for pod %q: %v", pod.Name, err)
 		}
 	}
 
@@ -86,10 +93,9 @@ func (c *Cluster) enableRollingUpdateFlagForPods(pods []v1.Pod, msg string) erro
 }
 
 // getRollingUpdateFlagFromPod returns the value of the rollingUpdate flag from the passed
-// reverting to the default value in case of errors
-func (c *Cluster) getRollingUpdateFlagFromPod(pod *v1.Pod, defaultValue bool) (flag bool) {
+func (c *Cluster) getRollingUpdateFlagFromPod(pod *v1.Pod) (flag bool) {
 	anno := pod.GetAnnotations()
-	flag = defaultValue
+	flag = false
 
 	stringFlag, exists := anno[rollingUpdatePodAnnotationKey]
 	if exists {
@@ -99,7 +105,6 @@ func (c *Cluster) getRollingUpdateFlagFromPod(pod *v1.Pod, defaultValue bool) (f
 				rollingUpdatePodAnnotationKey,
 				types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name},
 				stringFlag)
-			flag = defaultValue
 		}
 	}
 
@@ -107,8 +112,7 @@
 }
 
 // countPodsWithRollingUpdateFlag returns the value of the rollingUpdate flag from the passed pod
-// reverting to the default value in case of errors
-func (c *Cluster) countPodsWithRollingUpdateFlag(defaultValue bool) (int, int) {
+func (c *Cluster) countPodsWithRollingUpdateFlag() (int, int) {
 	pods, err := c.listPods()
 	if err != nil {
@@ -124,7 +128,7 @@ func (c *Cluster) countPodsWithRollingUpdateFlag(defaultValue bool) (int, int) {
 	}
 
 	for _, pod := range pods {
-		if c.getRollingUpdateFlagFromPod(&pod, defaultValue) {
+		if c.getRollingUpdateFlagFromPod(&pod) {
 			podsToRollCount++
 		}
 	}
@@ -452,7 +456,7 @@ func (c *Cluster) recreatePods() error {
 	replicas := make([]spec.NamespacedName, 0)
 
 	for i, pod := range pods.Items {
 		// only recreate pod if rolling update flag is true
-		if c.getRollingUpdateFlagFromPod(&pod, false) {
+		if c.getRollingUpdateFlagFromPod(&pod) {
 			role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
 			if role == Master {
diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go
index 8bd42bb68..ed8b4099c 100644
--- a/pkg/cluster/resources.go
+++ b/pkg/cluster/resources.go
@@ -147,32 +147,6 @@ func (c *Cluster) preScaleDown(newStatefulSet *appsv1.StatefulSet) error {
 	return nil
 }
 
-// mergeRollingUpdateFlagUsingCache returns the value of the rollingUpdate flag from the passed
-// statefulset, however, the value can be cleared if there is a cached flag in the cluster that
-// is set to false (the discrepancy could be a result of a failed StatefulSet update)
-func (c *Cluster) mergeRollingUpdateFlagUsingCache(runningStatefulSet *appsv1.StatefulSet) bool {
-	var (
-		cachedStatefulsetExists, podsRollingUpdateRequired bool
-	)
-
-	if c.Statefulset != nil {
-		// if we reset the rolling update flag in the statefulset structure in memory but didn't manage to update
-		// the actual object in Kubernetes for some reason we want to avoid doing an unnecessary update by relying
-		// on the 'cached' in-memory flag.
-		c.logger.Debugf("cached StatefulSet value exists")
-		cachedStatefulsetExists = true
-	}
-
-	podsToRollCount, podCount := c.countPodsWithRollingUpdateFlag(cachedStatefulsetExists)
-
-	if podsToRollCount > 0 {
-		c.logger.Debugf("%d / %d pods still need to be rotated", podsToRollCount, podCount)
-		podsRollingUpdateRequired = true
-	}
-
-	return podsRollingUpdateRequired
-}
-
 func (c *Cluster) updateStatefulSetAnnotations(annotations map[string]string) (*appsv1.StatefulSet, error) {
 	c.logger.Debugf("patching statefulset annotations")
 	patchData, err := metaAnnotationsPatch(annotations)
diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go
index 6e9ecb32b..55c19e15b 100644
--- a/pkg/cluster/sync.go
+++ b/pkg/cluster/sync.go
@@ -307,7 +307,7 @@ func (c *Cluster) syncStatefulSet() error {
 
 		podsRollingUpdateRequired = (len(pods) > 0)
 		if podsRollingUpdateRequired {
-			if err = c.enableRollingUpdateFlagForPods(pods, "pods from previous statefulset"); err != nil {
+			if err = c.markRollingUpdateFlagForPods(pods, "pods from previous statefulset"); err != nil {
 				c.logger.Warnf("updating rolling update flag for existing pods failed: %v", err)
 			}
 		}
@@ -316,7 +316,7 @@
 	} else {
 		// check if there are still pods with a rolling update flag
 		// default value for flag depends on a potentially cached StatefulSet
-		podsToRollCount, podCount := c.countPodsWithRollingUpdateFlag(c.Statefulset != nil)
+		podsToRollCount, podCount := c.countPodsWithRollingUpdateFlag()
 
 		if podsToRollCount > 0 {
 			c.logger.Debugf("%d / %d pods still need to be rotated", podsToRollCount, podCount)
@@ -335,7 +335,7 @@ func (c *Cluster) syncStatefulSet() error {
 		if !cmp.match {
 			if cmp.rollingUpdate && !podsRollingUpdateRequired {
 				podsRollingUpdateRequired = cmp.rollingUpdate
-				if err = c.enableRollingUpdateFlagForPods(pods, "pod changes"); err != nil {
+				if err = c.markRollingUpdateFlagForPods(pods, "pod changes"); err != nil {
 					return fmt.Errorf("updating rolling update flag for pods failed: %v", err)
 				}
 			}
@@ -368,7 +368,7 @@ func (c *Cluster) syncStatefulSet() error {
 
 			if stsImage != effectivePodImage {
 				podsRollingUpdateRequired = true
-				if err = c.enableRollingUpdateFlagForPod(pod, "pod not yet restarted due to lazy update"); err != nil {
+				if err = c.markRollingUpdateFlagForPod(pod, "pod not yet restarted due to lazy update"); err != nil {
 					c.logger.Warnf("updating rolling update flag failed for pod %q: %v", pod.Name, err)
 				}
 			}
@@ -391,7 +391,6 @@ func (c *Cluster) syncStatefulSet() error {
 		if err := c.recreatePods(); err != nil {
 			return fmt.Errorf("could not recreate pods: %v", err)
 		}
-		c.logger.Infof("pods have been recreated")
 		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
 	}
 	return nil
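Note on the retry introduced in markRollingUpdateFlagForPod: the closure handed to retryutil.Retry follows the usual condition-function contract, returning (true, nil) to stop, (false, nil) to poll again, and a non-nil error to surface a failure. Below is a minimal, self-contained Go sketch of that contract; it is an illustrative stand-in only, not the operator's pkg/util/retryutil implementation, whose exact error and timeout semantics are authoritative.

package main

import (
	"errors"
	"fmt"
	"time"
)

// retry polls cond every interval until it reports done, returns an error,
// or the timeout elapses. Sketch only; the real retryutil.Retry may differ.
func retry(interval, timeout time.Duration, cond func() (bool, error)) error {
	deadline := time.Now().Add(timeout)
	for {
		done, err := cond()
		if err != nil {
			return err // hard failure: stop polling and surface the error
		}
		if done {
			return nil // condition satisfied
		}
		if time.Now().After(deadline) {
			return errors.New("timed out waiting for condition")
		}
		time.Sleep(interval)
	}
}

func main() {
	attempts := 0
	// Simulate a patch that only succeeds on the third attempt.
	err := retry(100*time.Millisecond, 1*time.Second, func() (bool, error) {
		attempts++
		if attempts < 3 {
			return false, nil // not done yet: poll again
		}
		return true, nil
	})
	fmt.Printf("attempts=%d err=%v\n", attempts, err)
}

Wrapping the merge patch this way trades a bounded delay in the sync loop (up to the 5-second timeout at 1-second intervals) for resilience against transient conditions while setting the rolling update annotation.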