From 1f1fcd86573fcdd8440912c6c0934204e1aa779c Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Thu, 4 Feb 2021 09:49:01 +0100 Subject: [PATCH] pass only pods to recreatePods function --- pkg/cluster/pod.go | 108 ++++++++++++++------------------------------ pkg/cluster/sync.go | 31 +++++++++---- 2 files changed, 56 insertions(+), 83 deletions(-) diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index 396878268..9ee8ea8ae 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -49,7 +49,7 @@ func (c *Cluster) getRolePods(role PostgresRole) ([]v1.Pod, error) { // markRollingUpdateFlagForPod sets the indicator for the rolling update requirement // in the Pod annotation. -func (c *Cluster) markRollingUpdateFlagForPod(pod v1.Pod, msg string) error { +func (c *Cluster) markRollingUpdateFlagForPod(pod *v1.Pod, msg string) error { c.logger.Debugf("mark rolling update annotation for %s: reason %s", pod.Name, msg) flag := make(map[string]string) flag[rollingUpdatePodAnnotationKey] = strconv.FormatBool(true) @@ -80,18 +80,6 @@ func (c *Cluster) markRollingUpdateFlagForPod(pod v1.Pod, msg string) error { return nil } -// markRollingUpdateFlagForPods sets the indicator for the rolling update requirement -// on pods that do not have it yet -func (c *Cluster) markRollingUpdateFlagForPods(pods []v1.Pod, msg string) error { - for _, pod := range pods { - if err := c.markRollingUpdateFlagForPod(pod, msg); err != nil { - return fmt.Errorf("marking rolling update flag failed for pod %q: %v", pod.Name, err) - } - } - - return nil -} - // getRollingUpdateFlagFromPod returns the value of the rollingUpdate flag from the passed func (c *Cluster) getRollingUpdateFlagFromPod(pod *v1.Pod) (flag bool) { anno := pod.GetAnnotations() @@ -100,6 +88,7 @@ func (c *Cluster) getRollingUpdateFlagFromPod(pod *v1.Pod) (flag bool) { stringFlag, exists := anno[rollingUpdatePodAnnotationKey] if exists { var err error + c.logger.Debugf("found rolling update flag on pod %q", pod.Name) if flag, err = strconv.ParseBool(stringFlag); err != nil { c.logger.Warnf("error when parsing %q annotation for the pod %q: expected boolean value, got %q\n", rollingUpdatePodAnnotationKey, @@ -111,31 +100,6 @@ func (c *Cluster) getRollingUpdateFlagFromPod(pod *v1.Pod) (flag bool) { return flag } -// countPodsWithRollingUpdateFlag returns the value of the rollingUpdate flag from the passed pod -func (c *Cluster) countPodsWithRollingUpdateFlag() (int, int) { - - pods, err := c.listPods() - if err != nil { - c.logger.Warnf("could not count pods with rolling update flag") - return 0, 0 - } - - desiredPodCount := c.Spec.NumberOfInstances - actualPodCount := len(pods) - podsToRollCount := int(desiredPodCount) - actualPodCount - if podsToRollCount < 0 { - podsToRollCount = 0 - } - - for _, pod := range pods { - if c.getRollingUpdateFlagFromPod(&pod) { - podsToRollCount++ - } - } - - return podsToRollCount, actualPodCount -} - func (c *Cluster) deletePods() error { c.logger.Debugln("deleting pods") pods, err := c.listPods() @@ -372,7 +336,18 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { defer c.unregisterPodSubscriber(podName) stopChan := make(chan struct{}) - if err := c.KubeClient.Pods(podName.Namespace).Delete(context.TODO(), podName.Name, c.deleteOptions); err != nil { + err := retryutil.Retry(1*time.Second, 5*time.Second, + func() (bool, error) { + err2 := c.KubeClient.Pods(podName.Namespace).Delete( + context.TODO(), + podName.Name, + c.deleteOptions) + if err2 != nil { + return false, err2 + } + return true, nil + }) + if err != nil { return nil, fmt.Errorf("could not delete pod: %v", err) } @@ -387,7 +362,7 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { return pod, nil } -func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool { +func (c *Cluster) isSafeToRecreatePods(pods []v1.Pod) bool { /* Operator should not re-create pods if there is at least one replica being bootstrapped @@ -397,11 +372,11 @@ func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool { after this check succeeds but before a pod is re-created */ - for _, pod := range pods.Items { + for _, pod := range pods { c.logger.Debugf("name=%s phase=%s ip=%s", pod.Name, pod.Status.Phase, pod.Status.PodIP) } - for _, pod := range pods.Items { + for _, pod := range pods { var state string @@ -431,48 +406,35 @@ func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool { return true } -func (c *Cluster) recreatePods() error { +func (c *Cluster) recreatePods(pods []v1.Pod) error { c.setProcessName("starting to recreate pods") - ls := c.labelsSet(false) - namespace := c.Namespace - - listOptions := metav1.ListOptions{ - LabelSelector: ls.String(), - } - - pods, err := c.KubeClient.Pods(namespace).List(context.TODO(), listOptions) - if err != nil { - return fmt.Errorf("could not get the list of pods: %v", err) - } - c.logger.Infof("there are %d pods in the cluster to recreate", len(pods.Items)) + c.logger.Infof("there are %d pods in the cluster to recreate", len(pods)) if !c.isSafeToRecreatePods(pods) { return fmt.Errorf("postpone pod recreation until next Sync: recreation is unsafe because pods are being initialized") } var ( - masterPod, newMasterPod, newPod *v1.Pod + masterPod, newMasterPod *v1.Pod ) replicas := make([]spec.NamespacedName, 0) - for i, pod := range pods.Items { - // only recreate pod if rolling update flag is true - if c.getRollingUpdateFlagFromPod(&pod) { - role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel]) + for _, pod := range pods { + role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel]) - if role == Master { - masterPod = &pods.Items[i] - continue - } + if role == Master { + masterPod = &pod + continue + } - podName := util.NameFromMeta(pods.Items[i].ObjectMeta) - if newPod, err = c.recreatePod(podName); err != nil { - return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err) - } - if newRole := PostgresRole(newPod.Labels[c.OpConfig.PodRoleLabel]); newRole == Replica { - replicas = append(replicas, util.NameFromMeta(pod.ObjectMeta)) - } else if newRole == Master { - newMasterPod = newPod - } + podName := util.NameFromMeta(pod.ObjectMeta) + newPod, err := c.recreatePod(podName) + if err != nil { + return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err) + } + if newRole := PostgresRole(newPod.Labels[c.OpConfig.PodRoleLabel]); newRole == Replica { + replicas = append(replicas, util.NameFromMeta(pod.ObjectMeta)) + } else if newRole == Master { + newMasterPod = newPod } } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 55c19e15b..94763369d 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -279,6 +279,7 @@ func (c *Cluster) mustUpdatePodsAfterLazyUpdate(desiredSset *appsv1.StatefulSet) func (c *Cluster) syncStatefulSet() error { var ( podsRollingUpdateRequired bool + podsToRecreate []v1.Pod ) pods, err := c.listPods() @@ -307,19 +308,25 @@ func (c *Cluster) syncStatefulSet() error { podsRollingUpdateRequired = (len(pods) > 0) if podsRollingUpdateRequired { - if err = c.markRollingUpdateFlagForPods(pods, "pods from previous statefulset"); err != nil { - c.logger.Warnf("updating rolling update flag for existing pods failed: %v", err) + for _, pod := range pods { + if err = c.markRollingUpdateFlagForPod(&pod, "pod from previous statefulset"); err != nil { + c.logger.Warnf("updating rolling update flag for existing pod failed: %v", err) + } + podsToRecreate = append(podsToRecreate, pod) } } c.logger.Infof("created missing statefulset %q", util.NameFromMeta(sset.ObjectMeta)) } else { // check if there are still pods with a rolling update flag - // default value for flag depends on a potentially cached StatefulSet - podsToRollCount, podCount := c.countPodsWithRollingUpdateFlag() + for _, pod := range pods { + if c.getRollingUpdateFlagFromPod(&pod) { + podsToRecreate = append(podsToRecreate, pod) + } + } - if podsToRollCount > 0 { - c.logger.Debugf("%d / %d pods still need to be rotated", podsToRollCount, podCount) + if len(podsToRecreate) > 0 { + c.logger.Debugf("%d / %d pod(s) still need to be rotated", len(podsToRecreate), len(pods)) podsRollingUpdateRequired = true } @@ -335,8 +342,11 @@ func (c *Cluster) syncStatefulSet() error { if !cmp.match { if cmp.rollingUpdate && !podsRollingUpdateRequired { podsRollingUpdateRequired = cmp.rollingUpdate - if err = c.markRollingUpdateFlagForPods(pods, "pod changes"); err != nil { - return fmt.Errorf("updating rolling update flag for pods failed: %v", err) + for _, pod := range pods { + if err = c.markRollingUpdateFlagForPod(&pod, "pod changes"); err != nil { + return fmt.Errorf("updating rolling update flag for pod failed: %v", err) + } + podsToRecreate = append(podsToRecreate, pod) } } @@ -368,9 +378,10 @@ func (c *Cluster) syncStatefulSet() error { if stsImage != effectivePodImage { podsRollingUpdateRequired = true - if err = c.markRollingUpdateFlagForPod(pod, "pod not yet restarted due to lazy update"); err != nil { + if err = c.markRollingUpdateFlagForPod(&pod, "pod not yet restarted due to lazy update"); err != nil { c.logger.Warnf("updating rolling update flag failed for pod %q: %v", pod.Name, err) } + podsToRecreate = append(podsToRecreate, pod) } } } @@ -388,7 +399,7 @@ func (c *Cluster) syncStatefulSet() error { if podsRollingUpdateRequired { c.logger.Debugln("performing rolling update") c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update") - if err := c.recreatePods(); err != nil { + if err := c.recreatePods(podsToRecreate); err != nil { return fmt.Errorf("could not recreate pods: %v", err) } c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")