From b0f394ff53d9c27c92ca7fd7e3f99f6d4b2178c6 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Thu, 4 Feb 2021 12:36:11 +0100 Subject: [PATCH] some more refactoring --- e2e/scripts/watch_objects.sh | 2 +- pkg/cluster/cluster.go | 2 +- pkg/cluster/pod.go | 9 +++++++-- pkg/cluster/sync.go | 26 ++++++++++---------------- 4 files changed, 19 insertions(+), 20 deletions(-) diff --git a/e2e/scripts/watch_objects.sh b/e2e/scripts/watch_objects.sh index 4c9b82404..eeb5f4a1f 100755 --- a/e2e/scripts/watch_objects.sh +++ b/e2e/scripts/watch_objects.sh @@ -4,7 +4,7 @@ watch -c " kubectl get postgresql --all-namespaces echo echo -n 'Rolling upgrade pending: ' -kubectl get statefulset -o jsonpath='{.items..metadata.annotations.zalando-postgres-operator-rolling-update-required}' +kubectl get pods -o jsonpath='{.items..metadata.annotations.zalando-postgres-operator-rolling-update-required}' echo echo echo 'Pods' diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 16d399865..e7df6522e 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -1300,7 +1300,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e err = fmt.Errorf("could not get master pod label: %v", err) } } else { - err = fmt.Errorf("could not switch over: %v", err) + err = fmt.Errorf("could not switch over from %q to %q: %v", curMaster.Name, candidate, err) } // signal the role label waiting goroutine to close the shop and go home diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index 9ee8ea8ae..f73221959 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -332,6 +332,8 @@ func (c *Cluster) MigrateReplicaPod(podName spec.NamespacedName, fromNodeName st } func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { + // TODO due to delays in sync, should double check if recreate annotation is still present + ch := c.registerPodSubscriber(podName) defer c.unregisterPodSubscriber(podName) stopChan := make(chan struct{}) @@ -431,7 +433,9 @@ func (c *Cluster) recreatePods(pods []v1.Pod) error { if err != nil { return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err) } - if newRole := PostgresRole(newPod.Labels[c.OpConfig.PodRoleLabel]); newRole == Replica { + + newRole := PostgresRole(newPod.Labels[c.OpConfig.PodRoleLabel]) + if newRole == Replica { replicas = append(replicas, util.NameFromMeta(pod.ObjectMeta)) } else if newRole == Master { newMasterPod = newPod @@ -439,11 +443,12 @@ func (c *Cluster) recreatePods(pods []v1.Pod) error { } if masterPod != nil { - // failover if we have not observed a master pod when re-creating former replicas + // switchover if we have observed a master pod and replicas if newMasterPod == nil && len(replicas) > 0 { if err := c.Switchover(masterPod, masterCandidate(replicas)); err != nil { c.logger.Warningf("could not perform switch over: %v", err) } + // TODO if only the master pod came for recreate, we do not do switchover currently } else if newMasterPod == nil && len(replicas) == 0 { c.logger.Warningf("cannot perform switch over before re-creating the pod: no replicas") } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 94763369d..acc6e7835 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -277,10 +277,7 @@ func (c *Cluster) mustUpdatePodsAfterLazyUpdate(desiredSset *appsv1.StatefulSet) } func (c *Cluster) syncStatefulSet() error { - var ( - podsRollingUpdateRequired bool - podsToRecreate []v1.Pod - ) + var podsToRecreate []v1.Pod pods, err := c.listPods() if err != nil { @@ -291,11 +288,11 @@ func (c *Cluster) syncStatefulSet() error { sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(context.TODO(), c.statefulSetName(), metav1.GetOptions{}) if err != nil { if !k8sutil.ResourceNotFound(err) { - return fmt.Errorf("could not get statefulset: %v", err) + return fmt.Errorf("error during reading of statefulset: %v", err) } // statefulset does not exist, try to re-create it c.Statefulset = nil - c.logger.Infof("could not find the cluster's statefulset") + c.logger.Infof("cluster's statefulset does not exist") sset, err = c.createStatefulSet() if err != nil { @@ -306,11 +303,10 @@ func (c *Cluster) syncStatefulSet() error { return fmt.Errorf("cluster is not ready: %v", err) } - podsRollingUpdateRequired = (len(pods) > 0) - if podsRollingUpdateRequired { + if len(pods) > 0 { for _, pod := range pods { if err = c.markRollingUpdateFlagForPod(&pod, "pod from previous statefulset"); err != nil { - c.logger.Warnf("updating rolling update flag for existing pod failed: %v", err) + c.logger.Warnf("marking old pod for rolling update failed: %v", err) } podsToRecreate = append(podsToRecreate, pod) } @@ -327,7 +323,6 @@ func (c *Cluster) syncStatefulSet() error { if len(podsToRecreate) > 0 { c.logger.Debugf("%d / %d pod(s) still need to be rotated", len(podsToRecreate), len(pods)) - podsRollingUpdateRequired = true } // statefulset is already there, make sure we use its definition in order to compare with the spec. @@ -340,8 +335,8 @@ func (c *Cluster) syncStatefulSet() error { cmp := c.compareStatefulSetWith(desiredSts) if !cmp.match { - if cmp.rollingUpdate && !podsRollingUpdateRequired { - podsRollingUpdateRequired = cmp.rollingUpdate + if cmp.rollingUpdate { + podsToRecreate = make([]v1.Pod, 0) for _, pod := range pods { if err = c.markRollingUpdateFlagForPod(&pod, "pod changes"); err != nil { return fmt.Errorf("updating rolling update flag for pod failed: %v", err) @@ -364,9 +359,9 @@ func (c *Cluster) syncStatefulSet() error { } // TODO why is this necessary? - c.updateStatefulSetAnnotations(c.AnnotationsToPropagate(c.annotationsSet(c.Statefulset.Annotations))) + // c.updateStatefulSetAnnotations(c.AnnotationsToPropagate(c.annotationsSet(c.Statefulset.Annotations))) - if !podsRollingUpdateRequired && !c.OpConfig.EnableLazySpiloUpgrade { + if len(podsToRecreate) == 0 && !c.OpConfig.EnableLazySpiloUpgrade { // even if the desired and the running statefulsets match // there still may be not up-to-date pods on condition // (a) the lazy update was just disabled @@ -377,7 +372,6 @@ func (c *Cluster) syncStatefulSet() error { stsImage := desiredSts.Spec.Template.Spec.Containers[0].Image if stsImage != effectivePodImage { - podsRollingUpdateRequired = true if err = c.markRollingUpdateFlagForPod(&pod, "pod not yet restarted due to lazy update"); err != nil { c.logger.Warnf("updating rolling update flag failed for pod %q: %v", pod.Name, err) } @@ -396,7 +390,7 @@ func (c *Cluster) syncStatefulSet() error { // if we get here we also need to re-create the pods (either leftovers from the old // statefulset or those that got their configuration from the outdated statefulset) - if podsRollingUpdateRequired { + if len(podsToRecreate) > 0 { c.logger.Debugln("performing rolling update") c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update") if err := c.recreatePods(podsToRecreate); err != nil {