some more refactoring

This commit is contained in:
Felix Kunde 2021-02-04 12:36:11 +01:00
parent 1f1fcd8657
commit b0f394ff53
4 changed files with 19 additions and 20 deletions

View File

@ -4,7 +4,7 @@ watch -c "
kubectl get postgresql --all-namespaces kubectl get postgresql --all-namespaces
echo echo
echo -n 'Rolling upgrade pending: ' echo -n 'Rolling upgrade pending: '
kubectl get statefulset -o jsonpath='{.items..metadata.annotations.zalando-postgres-operator-rolling-update-required}' kubectl get pods -o jsonpath='{.items..metadata.annotations.zalando-postgres-operator-rolling-update-required}'
echo echo
echo echo
echo 'Pods' echo 'Pods'

View File

@ -1300,7 +1300,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
err = fmt.Errorf("could not get master pod label: %v", err) err = fmt.Errorf("could not get master pod label: %v", err)
} }
} else { } else {
err = fmt.Errorf("could not switch over: %v", err) err = fmt.Errorf("could not switch over from %q to %q: %v", curMaster.Name, candidate, err)
} }
// signal the role label waiting goroutine to close the shop and go home // signal the role label waiting goroutine to close the shop and go home

View File

@ -332,6 +332,8 @@ func (c *Cluster) MigrateReplicaPod(podName spec.NamespacedName, fromNodeName st
} }
func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
// TODO due to delays in sync, should double check if recreate annotation is still present
ch := c.registerPodSubscriber(podName) ch := c.registerPodSubscriber(podName)
defer c.unregisterPodSubscriber(podName) defer c.unregisterPodSubscriber(podName)
stopChan := make(chan struct{}) stopChan := make(chan struct{})
@ -431,7 +433,9 @@ func (c *Cluster) recreatePods(pods []v1.Pod) error {
if err != nil { if err != nil {
return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err) return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err)
} }
if newRole := PostgresRole(newPod.Labels[c.OpConfig.PodRoleLabel]); newRole == Replica {
newRole := PostgresRole(newPod.Labels[c.OpConfig.PodRoleLabel])
if newRole == Replica {
replicas = append(replicas, util.NameFromMeta(pod.ObjectMeta)) replicas = append(replicas, util.NameFromMeta(pod.ObjectMeta))
} else if newRole == Master { } else if newRole == Master {
newMasterPod = newPod newMasterPod = newPod
@ -439,11 +443,12 @@ func (c *Cluster) recreatePods(pods []v1.Pod) error {
} }
if masterPod != nil { if masterPod != nil {
// failover if we have not observed a master pod when re-creating former replicas // switchover if we have observed a master pod and replicas
if newMasterPod == nil && len(replicas) > 0 { if newMasterPod == nil && len(replicas) > 0 {
if err := c.Switchover(masterPod, masterCandidate(replicas)); err != nil { if err := c.Switchover(masterPod, masterCandidate(replicas)); err != nil {
c.logger.Warningf("could not perform switch over: %v", err) c.logger.Warningf("could not perform switch over: %v", err)
} }
// TODO if only the master pod came for recreate, we do not do switchover currently
} else if newMasterPod == nil && len(replicas) == 0 { } else if newMasterPod == nil && len(replicas) == 0 {
c.logger.Warningf("cannot perform switch over before re-creating the pod: no replicas") c.logger.Warningf("cannot perform switch over before re-creating the pod: no replicas")
} }

View File

@ -277,10 +277,7 @@ func (c *Cluster) mustUpdatePodsAfterLazyUpdate(desiredSset *appsv1.StatefulSet)
} }
func (c *Cluster) syncStatefulSet() error { func (c *Cluster) syncStatefulSet() error {
var ( var podsToRecreate []v1.Pod
podsRollingUpdateRequired bool
podsToRecreate []v1.Pod
)
pods, err := c.listPods() pods, err := c.listPods()
if err != nil { if err != nil {
@ -291,11 +288,11 @@ func (c *Cluster) syncStatefulSet() error {
sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(context.TODO(), c.statefulSetName(), metav1.GetOptions{}) sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(context.TODO(), c.statefulSetName(), metav1.GetOptions{})
if err != nil { if err != nil {
if !k8sutil.ResourceNotFound(err) { if !k8sutil.ResourceNotFound(err) {
return fmt.Errorf("could not get statefulset: %v", err) return fmt.Errorf("error during reading of statefulset: %v", err)
} }
// statefulset does not exist, try to re-create it // statefulset does not exist, try to re-create it
c.Statefulset = nil c.Statefulset = nil
c.logger.Infof("could not find the cluster's statefulset") c.logger.Infof("cluster's statefulset does not exist")
sset, err = c.createStatefulSet() sset, err = c.createStatefulSet()
if err != nil { if err != nil {
@ -306,11 +303,10 @@ func (c *Cluster) syncStatefulSet() error {
return fmt.Errorf("cluster is not ready: %v", err) return fmt.Errorf("cluster is not ready: %v", err)
} }
podsRollingUpdateRequired = (len(pods) > 0) if len(pods) > 0 {
if podsRollingUpdateRequired {
for _, pod := range pods { for _, pod := range pods {
if err = c.markRollingUpdateFlagForPod(&pod, "pod from previous statefulset"); err != nil { if err = c.markRollingUpdateFlagForPod(&pod, "pod from previous statefulset"); err != nil {
c.logger.Warnf("updating rolling update flag for existing pod failed: %v", err) c.logger.Warnf("marking old pod for rolling update failed: %v", err)
} }
podsToRecreate = append(podsToRecreate, pod) podsToRecreate = append(podsToRecreate, pod)
} }
@ -327,7 +323,6 @@ func (c *Cluster) syncStatefulSet() error {
if len(podsToRecreate) > 0 { if len(podsToRecreate) > 0 {
c.logger.Debugf("%d / %d pod(s) still need to be rotated", len(podsToRecreate), len(pods)) c.logger.Debugf("%d / %d pod(s) still need to be rotated", len(podsToRecreate), len(pods))
podsRollingUpdateRequired = true
} }
// statefulset is already there, make sure we use its definition in order to compare with the spec. // statefulset is already there, make sure we use its definition in order to compare with the spec.
@ -340,8 +335,8 @@ func (c *Cluster) syncStatefulSet() error {
cmp := c.compareStatefulSetWith(desiredSts) cmp := c.compareStatefulSetWith(desiredSts)
if !cmp.match { if !cmp.match {
if cmp.rollingUpdate && !podsRollingUpdateRequired { if cmp.rollingUpdate {
podsRollingUpdateRequired = cmp.rollingUpdate podsToRecreate = make([]v1.Pod, 0)
for _, pod := range pods { for _, pod := range pods {
if err = c.markRollingUpdateFlagForPod(&pod, "pod changes"); err != nil { if err = c.markRollingUpdateFlagForPod(&pod, "pod changes"); err != nil {
return fmt.Errorf("updating rolling update flag for pod failed: %v", err) return fmt.Errorf("updating rolling update flag for pod failed: %v", err)
@ -364,9 +359,9 @@ func (c *Cluster) syncStatefulSet() error {
} }
// TODO why is this necessary? // TODO why is this necessary?
c.updateStatefulSetAnnotations(c.AnnotationsToPropagate(c.annotationsSet(c.Statefulset.Annotations))) // c.updateStatefulSetAnnotations(c.AnnotationsToPropagate(c.annotationsSet(c.Statefulset.Annotations)))
if !podsRollingUpdateRequired && !c.OpConfig.EnableLazySpiloUpgrade { if len(podsToRecreate) == 0 && !c.OpConfig.EnableLazySpiloUpgrade {
// even if the desired and the running statefulsets match // even if the desired and the running statefulsets match
// there still may be not up-to-date pods on condition // there still may be not up-to-date pods on condition
// (a) the lazy update was just disabled // (a) the lazy update was just disabled
@ -377,7 +372,6 @@ func (c *Cluster) syncStatefulSet() error {
stsImage := desiredSts.Spec.Template.Spec.Containers[0].Image stsImage := desiredSts.Spec.Template.Spec.Containers[0].Image
if stsImage != effectivePodImage { if stsImage != effectivePodImage {
podsRollingUpdateRequired = true
if err = c.markRollingUpdateFlagForPod(&pod, "pod not yet restarted due to lazy update"); err != nil { if err = c.markRollingUpdateFlagForPod(&pod, "pod not yet restarted due to lazy update"); err != nil {
c.logger.Warnf("updating rolling update flag failed for pod %q: %v", pod.Name, err) c.logger.Warnf("updating rolling update flag failed for pod %q: %v", pod.Name, err)
} }
@ -396,7 +390,7 @@ func (c *Cluster) syncStatefulSet() error {
// if we get here we also need to re-create the pods (either leftovers from the old // if we get here we also need to re-create the pods (either leftovers from the old
// statefulset or those that got their configuration from the outdated statefulset) // statefulset or those that got their configuration from the outdated statefulset)
if podsRollingUpdateRequired { if len(podsToRecreate) > 0 {
c.logger.Debugln("performing rolling update") c.logger.Debugln("performing rolling update")
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update") c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
if err := c.recreatePods(podsToRecreate); err != nil { if err := c.recreatePods(podsToRecreate); err != nil {