steer rolling update via pod annotation
This commit is contained in:
		
							parent
							
								
									d150a477f0
								
							
						
					
					
						commit
						4421724848
					
				| 
						 | 
					@ -588,7 +588,7 @@ class EndToEndTestCase(unittest.TestCase):
 | 
				
			||||||
            k8s.update_config(unpatch_lazy_spilo_upgrade, step="patch lazy upgrade")
 | 
					            k8s.update_config(unpatch_lazy_spilo_upgrade, step="patch lazy upgrade")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # at this point operator will complete the normal rolling upgrade
 | 
					            # at this point operator will complete the normal rolling upgrade
 | 
				
			||||||
            # so we additonally test if disabling the lazy upgrade - forcing the normal rolling upgrade - works
 | 
					            # so we additionally test if disabling the lazy upgrade - forcing the normal rolling upgrade - works
 | 
				
			||||||
            self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0),
 | 
					            self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0),
 | 
				
			||||||
                                 conf_image, "Rolling upgrade was not executed",
 | 
					                                 conf_image, "Rolling upgrade was not executed",
 | 
				
			||||||
                                 50, 3)
 | 
					                                 50, 3)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -6,7 +6,6 @@ import (
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"path"
 | 
						"path"
 | 
				
			||||||
	"sort"
 | 
						"sort"
 | 
				
			||||||
	"strconv"
 | 
					 | 
				
			||||||
	"strings"
 | 
						"strings"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/sirupsen/logrus"
 | 
						"github.com/sirupsen/logrus"
 | 
				
			||||||
| 
						 | 
					@ -1279,8 +1278,6 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	stsAnnotations := make(map[string]string)
 | 
						stsAnnotations := make(map[string]string)
 | 
				
			||||||
	// TODO remove from sts
 | 
					 | 
				
			||||||
	stsAnnotations[rollingUpdateStatefulsetAnnotationKey] = strconv.FormatBool(false)
 | 
					 | 
				
			||||||
	stsAnnotations = c.AnnotationsToPropagate(c.annotationsSet(nil))
 | 
						stsAnnotations = c.AnnotationsToPropagate(c.annotationsSet(nil))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	statefulSet := &appsv1.StatefulSet{
 | 
						statefulSet := &appsv1.StatefulSet{
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -4,11 +4,13 @@ import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"math/rand"
 | 
						"math/rand"
 | 
				
			||||||
 | 
						"strconv"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	appsv1 "k8s.io/api/apps/v1"
 | 
						appsv1 "k8s.io/api/apps/v1"
 | 
				
			||||||
	v1 "k8s.io/api/core/v1"
 | 
						v1 "k8s.io/api/core/v1"
 | 
				
			||||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/types"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/zalando/postgres-operator/pkg/spec"
 | 
						"github.com/zalando/postgres-operator/pkg/spec"
 | 
				
			||||||
	"github.com/zalando/postgres-operator/pkg/util"
 | 
						"github.com/zalando/postgres-operator/pkg/util"
 | 
				
			||||||
| 
						 | 
					@ -45,6 +47,93 @@ func (c *Cluster) getRolePods(role PostgresRole) ([]v1.Pod, error) {
 | 
				
			||||||
	return pods.Items, nil
 | 
						return pods.Items, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// enableRollingUpdateFlagForPod sets the indicator for the rolling update requirement
 | 
				
			||||||
 | 
					// in the Pod annotation.
 | 
				
			||||||
 | 
					func (c *Cluster) enableRollingUpdateFlagForPod(pod v1.Pod, msg string) error {
 | 
				
			||||||
 | 
						c.logger.Debugf("enable rolling update annotation for %s: reason %s", pod.Name, msg)
 | 
				
			||||||
 | 
						flag := make(map[string]string)
 | 
				
			||||||
 | 
						flag[rollingUpdatePodAnnotationKey] = strconv.FormatBool(true)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						patchData, err := metaAnnotationsPatch(flag)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return fmt.Errorf("could not form patch for pod's rolling update flag: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						_, err = c.KubeClient.Pods(pod.Namespace).Patch(
 | 
				
			||||||
 | 
							context.TODO(),
 | 
				
			||||||
 | 
							pod.Name,
 | 
				
			||||||
 | 
							types.MergePatchType,
 | 
				
			||||||
 | 
							[]byte(patchData),
 | 
				
			||||||
 | 
							metav1.PatchOptions{},
 | 
				
			||||||
 | 
							"")
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return fmt.Errorf("could not patch pod rolling update flag %q: %v", patchData, err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// enableRollingUpdateFlagForPods sets the indicator for the rolling update requirement
 | 
				
			||||||
 | 
					// on pods that do not have it yet
 | 
				
			||||||
 | 
					func (c *Cluster) enableRollingUpdateFlagForPods(pods []v1.Pod, msg string) error {
 | 
				
			||||||
 | 
						for _, pod := range pods {
 | 
				
			||||||
 | 
							if c.getRollingUpdateFlagFromPod(&pod, true) {
 | 
				
			||||||
 | 
								if err := c.enableRollingUpdateFlagForPod(pod, msg); err != nil {
 | 
				
			||||||
 | 
									return fmt.Errorf("enabling rolling update flag failed for pod %q: %v", pod.Name, err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// getRollingUpdateFlagFromPod returns the value of the rollingUpdate flag from the passed
 | 
				
			||||||
 | 
					// reverting to the default value in case of errors
 | 
				
			||||||
 | 
					func (c *Cluster) getRollingUpdateFlagFromPod(pod *v1.Pod, defaultValue bool) (flag bool) {
 | 
				
			||||||
 | 
						anno := pod.GetAnnotations()
 | 
				
			||||||
 | 
						flag = defaultValue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						stringFlag, exists := anno[rollingUpdatePodAnnotationKey]
 | 
				
			||||||
 | 
						if exists {
 | 
				
			||||||
 | 
							var err error
 | 
				
			||||||
 | 
							if flag, err = strconv.ParseBool(stringFlag); err != nil {
 | 
				
			||||||
 | 
								c.logger.Warnf("error when parsing %q annotation for the pod %q: expected boolean value, got %q\n",
 | 
				
			||||||
 | 
									rollingUpdatePodAnnotationKey,
 | 
				
			||||||
 | 
									types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name},
 | 
				
			||||||
 | 
									stringFlag)
 | 
				
			||||||
 | 
								flag = defaultValue
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return flag
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// countPodsWithRollingUpdateFlag returns the value of the rollingUpdate flag from the passed pod
 | 
				
			||||||
 | 
					// reverting to the default value in case of errors
 | 
				
			||||||
 | 
					func (c *Cluster) countPodsWithRollingUpdateFlag(defaultValue bool) (int, int) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						pods, err := c.listPods()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							c.logger.Warnf("could not count pods with rolling update flag")
 | 
				
			||||||
 | 
							return 0, 0
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						desiredPodCount := c.Spec.NumberOfInstances
 | 
				
			||||||
 | 
						actualPodCount := len(pods)
 | 
				
			||||||
 | 
						podsToRollCount := int(desiredPodCount) - actualPodCount
 | 
				
			||||||
 | 
						if podsToRollCount < 0 {
 | 
				
			||||||
 | 
							podsToRollCount = 0
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for _, pod := range pods {
 | 
				
			||||||
 | 
							if c.getRollingUpdateFlagFromPod(&pod, defaultValue) {
 | 
				
			||||||
 | 
								podsToRollCount++
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return podsToRollCount, actualPodCount
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *Cluster) deletePods() error {
 | 
					func (c *Cluster) deletePods() error {
 | 
				
			||||||
	c.logger.Debugln("deleting pods")
 | 
						c.logger.Debugln("deleting pods")
 | 
				
			||||||
	pods, err := c.listPods()
 | 
						pods, err := c.listPods()
 | 
				
			||||||
| 
						 | 
					@ -364,27 +453,29 @@ func (c *Cluster) recreatePods() error {
 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
	replicas := make([]spec.NamespacedName, 0)
 | 
						replicas := make([]spec.NamespacedName, 0)
 | 
				
			||||||
	for i, pod := range pods.Items {
 | 
						for i, pod := range pods.Items {
 | 
				
			||||||
		role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
 | 
							// only recreate pod if rolling update flag is true
 | 
				
			||||||
 | 
							if c.getRollingUpdateFlagFromPod(&pod, false) {
 | 
				
			||||||
 | 
								role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if role == Master {
 | 
								if role == Master {
 | 
				
			||||||
			masterPod = &pods.Items[i]
 | 
									masterPod = &pods.Items[i]
 | 
				
			||||||
			continue
 | 
									continue
 | 
				
			||||||
		}
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		podName := util.NameFromMeta(pods.Items[i].ObjectMeta)
 | 
								podName := util.NameFromMeta(pods.Items[i].ObjectMeta)
 | 
				
			||||||
		if newPod, err = c.recreatePod(podName); err != nil {
 | 
								if newPod, err = c.recreatePod(podName); err != nil {
 | 
				
			||||||
			return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err)
 | 
									return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err)
 | 
				
			||||||
		}
 | 
								}
 | 
				
			||||||
		if newRole := PostgresRole(newPod.Labels[c.OpConfig.PodRoleLabel]); newRole == Replica {
 | 
								if newRole := PostgresRole(newPod.Labels[c.OpConfig.PodRoleLabel]); newRole == Replica {
 | 
				
			||||||
			replicas = append(replicas, util.NameFromMeta(pod.ObjectMeta))
 | 
									replicas = append(replicas, util.NameFromMeta(pod.ObjectMeta))
 | 
				
			||||||
		} else if newRole == Master {
 | 
								} else if newRole == Master {
 | 
				
			||||||
			newMasterPod = newPod
 | 
									newMasterPod = newPod
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if masterPod != nil {
 | 
						if masterPod != nil {
 | 
				
			||||||
		// failover if we have not observed a master pod when re-creating former replicas.
 | 
							// failover if we have not observed a master pod when re-creating former replicas
 | 
				
			||||||
		// TODO if masterPod has no rolling update label anymore skip switchover too
 | 
					 | 
				
			||||||
		if newMasterPod == nil && len(replicas) > 0 {
 | 
							if newMasterPod == nil && len(replicas) > 0 {
 | 
				
			||||||
			if err := c.Switchover(masterPod, masterCandidate(replicas)); err != nil {
 | 
								if err := c.Switchover(masterPod, masterCandidate(replicas)); err != nil {
 | 
				
			||||||
				c.logger.Warningf("could not perform switch over: %v", err)
 | 
									c.logger.Warningf("could not perform switch over: %v", err)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -20,7 +20,7 @@ import (
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const (
 | 
					const (
 | 
				
			||||||
	// TODO rollingUpdatePodAnnotationKey = "zalando-postgres-operator-rolling-update-required"
 | 
						// TODO rollingUpdatePodAnnotationKey = "zalando-postgres-operator-rolling-update-required"
 | 
				
			||||||
	rollingUpdateStatefulsetAnnotationKey = "zalando-postgres-operator-rolling-update-required"
 | 
						rollingUpdatePodAnnotationKey = "zalando-postgres-operator-rolling-update-required"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *Cluster) listResources() error {
 | 
					func (c *Cluster) listResources() error {
 | 
				
			||||||
| 
						 | 
					@ -148,84 +148,32 @@ func (c *Cluster) preScaleDown(newStatefulSet *appsv1.StatefulSet) error {
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// TODO setRollingUpdateFlagForPod
 | 
					 | 
				
			||||||
// setRollingUpdateFlagForStatefulSet sets the indicator or the rolling update requirement
 | 
					 | 
				
			||||||
// in the StatefulSet annotation.
 | 
					 | 
				
			||||||
func (c *Cluster) setRollingUpdateFlagForStatefulSet(sset *appsv1.StatefulSet, val bool, msg string) {
 | 
					 | 
				
			||||||
	anno := sset.GetAnnotations()
 | 
					 | 
				
			||||||
	if anno == nil {
 | 
					 | 
				
			||||||
		anno = make(map[string]string)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	anno[rollingUpdateStatefulsetAnnotationKey] = strconv.FormatBool(val)
 | 
					 | 
				
			||||||
	sset.SetAnnotations(anno)
 | 
					 | 
				
			||||||
	c.logger.Debugf("set statefulset's rolling update annotation to %t: caller/reason %s", val, msg)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// TODO applyRollingUpdateFlagForPod
 | 
					 | 
				
			||||||
// applyRollingUpdateFlagforStatefulSet sets the rolling update flag for the cluster's StatefulSet
 | 
					 | 
				
			||||||
// and applies that setting to the actual running cluster.
 | 
					 | 
				
			||||||
func (c *Cluster) applyRollingUpdateFlagforStatefulSet(val bool) error {
 | 
					 | 
				
			||||||
	c.setRollingUpdateFlagForStatefulSet(c.Statefulset, val, "applyRollingUpdateFlag")
 | 
					 | 
				
			||||||
	sset, err := c.updateStatefulSetAnnotations(c.Statefulset.GetAnnotations())
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	c.Statefulset = sset
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// TODO getRollingUpdateFlagFromPod
 | 
					 | 
				
			||||||
// getRollingUpdateFlagFromStatefulSet returns the value of the rollingUpdate flag from the passed
 | 
					 | 
				
			||||||
// StatefulSet, reverting to the default value in case of errors
 | 
					 | 
				
			||||||
func (c *Cluster) getRollingUpdateFlagFromStatefulSet(sset *appsv1.StatefulSet, defaultValue bool) (flag bool) {
 | 
					 | 
				
			||||||
	anno := sset.GetAnnotations()
 | 
					 | 
				
			||||||
	flag = defaultValue
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	stringFlag, exists := anno[rollingUpdateStatefulsetAnnotationKey]
 | 
					 | 
				
			||||||
	if exists {
 | 
					 | 
				
			||||||
		var err error
 | 
					 | 
				
			||||||
		if flag, err = strconv.ParseBool(stringFlag); err != nil {
 | 
					 | 
				
			||||||
			c.logger.Warnf("error when parsing %q annotation for the statefulset %q: expected boolean value, got %q\n",
 | 
					 | 
				
			||||||
				rollingUpdateStatefulsetAnnotationKey,
 | 
					 | 
				
			||||||
				types.NamespacedName{Namespace: sset.Namespace, Name: sset.Name},
 | 
					 | 
				
			||||||
				stringFlag)
 | 
					 | 
				
			||||||
			flag = defaultValue
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return flag
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// TODO mergeRollingUpdateFlagUsingCache(runningPod *v1.Pod) bool {
 | 
					 | 
				
			||||||
// mergeRollingUpdateFlagUsingCache returns the value of the rollingUpdate flag from the passed
 | 
					// mergeRollingUpdateFlagUsingCache returns the value of the rollingUpdate flag from the passed
 | 
				
			||||||
// statefulset, however, the value can be cleared if there is a cached flag in the cluster that
 | 
					// statefulset, however, the value can be cleared if there is a cached flag in the cluster that
 | 
				
			||||||
// is set to false (the discrepancy could be a result of a failed StatefulSet update)
 | 
					// is set to false (the discrepancy could be a result of a failed StatefulSet update)
 | 
				
			||||||
func (c *Cluster) mergeRollingUpdateFlagUsingCache(runningStatefulSet *appsv1.StatefulSet) bool {
 | 
					func (c *Cluster) mergeRollingUpdateFlagUsingCache(runningStatefulSet *appsv1.StatefulSet) bool {
 | 
				
			||||||
	var (
 | 
						var (
 | 
				
			||||||
		cachedStatefulsetExists, clearRollingUpdateFromCache, podsRollingUpdateRequired bool
 | 
							cachedStatefulsetExists, podsRollingUpdateRequired bool
 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if c.Statefulset != nil {
 | 
						if c.Statefulset != nil {
 | 
				
			||||||
		// if we reset the rolling update flag in the statefulset structure in memory but didn't manage to update
 | 
							// if we reset the rolling update flag in the statefulset structure in memory but didn't manage to update
 | 
				
			||||||
		// the actual object in Kubernetes for some reason we want to avoid doing an unnecessary update by relying
 | 
							// the actual object in Kubernetes for some reason we want to avoid doing an unnecessary update by relying
 | 
				
			||||||
		// on the 'cached' in-memory flag.
 | 
							// on the 'cached' in-memory flag.
 | 
				
			||||||
 | 
							c.logger.Debugf("cached StatefulSet value exists")
 | 
				
			||||||
		cachedStatefulsetExists = true
 | 
							cachedStatefulsetExists = true
 | 
				
			||||||
		clearRollingUpdateFromCache = !c.getRollingUpdateFlagFromStatefulSet(c.Statefulset, true)
 | 
					 | 
				
			||||||
		c.logger.Debugf("cached StatefulSet value exists, rollingUpdate flag is %t", clearRollingUpdateFromCache)
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if podsRollingUpdateRequired = c.getRollingUpdateFlagFromStatefulSet(runningStatefulSet, false); podsRollingUpdateRequired {
 | 
						podsToRollCount, podCount := c.countPodsWithRollingUpdateFlag(cachedStatefulsetExists)
 | 
				
			||||||
		if cachedStatefulsetExists && clearRollingUpdateFromCache {
 | 
					
 | 
				
			||||||
			c.logger.Infof("clearing the rolling update flag based on the cached information")
 | 
						if podsToRollCount > 0 {
 | 
				
			||||||
			podsRollingUpdateRequired = false
 | 
							c.logger.Debugf("%d / %d pods still need to be rotated", podsToRollCount, podCount)
 | 
				
			||||||
		} else {
 | 
							podsRollingUpdateRequired = true
 | 
				
			||||||
			c.logger.Infof("found a statefulset with an unfinished rolling update of the pods")
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return podsRollingUpdateRequired
 | 
						return podsRollingUpdateRequired
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// updatePodAnnotations
 | 
					 | 
				
			||||||
func (c *Cluster) updateStatefulSetAnnotations(annotations map[string]string) (*appsv1.StatefulSet, error) {
 | 
					func (c *Cluster) updateStatefulSetAnnotations(annotations map[string]string) (*appsv1.StatefulSet, error) {
 | 
				
			||||||
	c.logger.Debugf("patching statefulset annotations")
 | 
						c.logger.Debugf("patching statefulset annotations")
 | 
				
			||||||
	patchData, err := metaAnnotationsPatch(annotations)
 | 
						patchData, err := metaAnnotationsPatch(annotations)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -280,6 +280,12 @@ func (c *Cluster) syncStatefulSet() error {
 | 
				
			||||||
	var (
 | 
						var (
 | 
				
			||||||
		podsRollingUpdateRequired bool
 | 
							podsRollingUpdateRequired bool
 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						pods, err := c.listPods()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							c.logger.Infof("could not list pods of the statefulset: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// NB: Be careful to consider the codepath that acts on podsRollingUpdateRequired before returning early.
 | 
						// NB: Be careful to consider the codepath that acts on podsRollingUpdateRequired before returning early.
 | 
				
			||||||
	sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(context.TODO(), c.statefulSetName(), metav1.GetOptions{})
 | 
						sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(context.TODO(), c.statefulSetName(), metav1.GetOptions{})
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
| 
						 | 
					@ -289,10 +295,6 @@ func (c *Cluster) syncStatefulSet() error {
 | 
				
			||||||
		// statefulset does not exist, try to re-create it
 | 
							// statefulset does not exist, try to re-create it
 | 
				
			||||||
		c.Statefulset = nil
 | 
							c.Statefulset = nil
 | 
				
			||||||
		c.logger.Infof("could not find the cluster's statefulset")
 | 
							c.logger.Infof("could not find the cluster's statefulset")
 | 
				
			||||||
		pods, err := c.listPods()
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			return fmt.Errorf("could not list pods of the statefulset: %v", err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
		sset, err = c.createStatefulSet()
 | 
							sset, err = c.createStatefulSet()
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
| 
						 | 
					@ -305,58 +307,73 @@ func (c *Cluster) syncStatefulSet() error {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		podsRollingUpdateRequired = (len(pods) > 0)
 | 
							podsRollingUpdateRequired = (len(pods) > 0)
 | 
				
			||||||
		if podsRollingUpdateRequired {
 | 
							if podsRollingUpdateRequired {
 | 
				
			||||||
			c.logger.Warningf("found pods from the previous statefulset: trigger rolling update")
 | 
								if err = c.enableRollingUpdateFlagForPods(pods, "pods from previous statefulset"); err != nil {
 | 
				
			||||||
			if err := c.applyRollingUpdateFlagforStatefulSet(podsRollingUpdateRequired); err != nil {
 | 
									c.logger.Warnf("updating rolling update flag for existing pods failed: %v", err)
 | 
				
			||||||
				return fmt.Errorf("could not set rolling update flag for the statefulset: %v", err)
 | 
					 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		c.logger.Infof("created missing statefulset %q", util.NameFromMeta(sset.ObjectMeta))
 | 
							c.logger.Infof("created missing statefulset %q", util.NameFromMeta(sset.ObjectMeta))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		podsRollingUpdateRequired = c.mergeRollingUpdateFlagUsingCache(sset)
 | 
							// check if there are still pods with a rolling update flag
 | 
				
			||||||
 | 
							// default value for flag depends on a potentially cached StatefulSet
 | 
				
			||||||
 | 
							podsToRollCount, podCount := c.countPodsWithRollingUpdateFlag(c.Statefulset != nil)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if podsToRollCount > 0 {
 | 
				
			||||||
 | 
								c.logger.Debugf("%d / %d pods still need to be rotated", podsToRollCount, podCount)
 | 
				
			||||||
 | 
								podsRollingUpdateRequired = true
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// statefulset is already there, make sure we use its definition in order to compare with the spec.
 | 
							// statefulset is already there, make sure we use its definition in order to compare with the spec.
 | 
				
			||||||
		c.Statefulset = sset
 | 
							c.Statefulset = sset
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		desiredSS, err := c.generateStatefulSet(&c.Spec)
 | 
							desiredSts, err := c.generateStatefulSet(&c.Spec)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			return fmt.Errorf("could not generate statefulset: %v", err)
 | 
								return fmt.Errorf("could not generate statefulset: %v", err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		c.setRollingUpdateFlagForStatefulSet(desiredSS, podsRollingUpdateRequired, "from cache")
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
		cmp := c.compareStatefulSetWith(desiredSS)
 | 
							cmp := c.compareStatefulSetWith(desiredSts)
 | 
				
			||||||
		if !cmp.match {
 | 
							if !cmp.match {
 | 
				
			||||||
			if cmp.rollingUpdate && !podsRollingUpdateRequired {
 | 
								if cmp.rollingUpdate && !podsRollingUpdateRequired {
 | 
				
			||||||
				podsRollingUpdateRequired = true
 | 
									podsRollingUpdateRequired = cmp.rollingUpdate
 | 
				
			||||||
				c.setRollingUpdateFlagForStatefulSet(desiredSS, podsRollingUpdateRequired, "statefulset changes")
 | 
									if err = c.enableRollingUpdateFlagForPods(pods, "pod changes"); err != nil {
 | 
				
			||||||
 | 
										return fmt.Errorf("updating rolling update flag for pods failed: %v", err)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			c.logStatefulSetChanges(c.Statefulset, desiredSS, false, cmp.reasons)
 | 
								c.logStatefulSetChanges(c.Statefulset, desiredSts, false, cmp.reasons)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			if !cmp.replace {
 | 
								if !cmp.replace {
 | 
				
			||||||
				if err := c.updateStatefulSet(desiredSS); err != nil {
 | 
									if err := c.updateStatefulSet(desiredSts); err != nil {
 | 
				
			||||||
					return fmt.Errorf("could not update statefulset: %v", err)
 | 
										return fmt.Errorf("could not update statefulset: %v", err)
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
			} else {
 | 
								} else {
 | 
				
			||||||
				if err := c.replaceStatefulSet(desiredSS); err != nil {
 | 
									if err := c.replaceStatefulSet(desiredSts); err != nil {
 | 
				
			||||||
					return fmt.Errorf("could not replace statefulset: %v", err)
 | 
										return fmt.Errorf("could not replace statefulset: %v", err)
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// TODO why is this necessary?
 | 
				
			||||||
		c.updateStatefulSetAnnotations(c.AnnotationsToPropagate(c.annotationsSet(c.Statefulset.Annotations)))
 | 
							c.updateStatefulSetAnnotations(c.AnnotationsToPropagate(c.annotationsSet(c.Statefulset.Annotations)))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if !podsRollingUpdateRequired && !c.OpConfig.EnableLazySpiloUpgrade {
 | 
							if !podsRollingUpdateRequired && !c.OpConfig.EnableLazySpiloUpgrade {
 | 
				
			||||||
			// even if desired and actual statefulsets match
 | 
								// even if the desired and the running statefulsets match
 | 
				
			||||||
			// there still may be not up-to-date pods on condition
 | 
								// there still may be not up-to-date pods on condition
 | 
				
			||||||
			//  (a) the lazy update was just disabled
 | 
								//  (a) the lazy update was just disabled
 | 
				
			||||||
			// and
 | 
								// and
 | 
				
			||||||
			//  (b) some of the pods were not restarted when the lazy update was still in place
 | 
								//  (b) some of the pods were not restarted when the lazy update was still in place
 | 
				
			||||||
			podsRollingUpdateRequired, err = c.mustUpdatePodsAfterLazyUpdate(desiredSS)
 | 
								for _, pod := range pods {
 | 
				
			||||||
			if err != nil {
 | 
									effectivePodImage := pod.Spec.Containers[0].Image
 | 
				
			||||||
				return fmt.Errorf("could not list pods of the statefulset: %v", err)
 | 
									stsImage := desiredSts.Spec.Template.Spec.Containers[0].Image
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
									if stsImage != effectivePodImage {
 | 
				
			||||||
 | 
										podsRollingUpdateRequired = true
 | 
				
			||||||
 | 
										if err = c.enableRollingUpdateFlagForPod(pod, "pod not yet restarted due to lazy update"); err != nil {
 | 
				
			||||||
 | 
											c.logger.Warnf("updating rolling update flag failed for pod %q: %v", pod.Name, err)
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Apply special PostgreSQL parameters that can only be set via the Patroni API.
 | 
						// Apply special PostgreSQL parameters that can only be set via the Patroni API.
 | 
				
			||||||
| 
						 | 
					@ -376,9 +393,6 @@ func (c *Cluster) syncStatefulSet() error {
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		c.logger.Infof("pods have been recreated")
 | 
							c.logger.Infof("pods have been recreated")
 | 
				
			||||||
		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
 | 
							c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
 | 
				
			||||||
		if err := c.applyRollingUpdateFlagforStatefulSet(false); err != nil {
 | 
					 | 
				
			||||||
			c.logger.Warningf("could not clear rolling update for the statefulset: %v", err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue