From ce0d4af91ce8a2e8b52bc79febb71f9f2f4cbaed Mon Sep 17 00:00:00 2001
From: Oleksii Kliukin
Date: Thu, 3 May 2018 19:19:52 +0200
Subject: [PATCH] Initial implementation of statefulset annotations indicating rolling updates.

---
 .gitignore               |  1 +
 pkg/cluster/cluster.go   | 34 +++++++++++++-----------
 pkg/cluster/resources.go | 50 ++++++++++++++++++++++++++++++++---
 pkg/cluster/sync.go      | 47 ++++++++++++++++++++------------
 pkg/cluster/util.go      | 57 ----------------------------------------
 5 files changed, 95 insertions(+), 94 deletions(-)

diff --git a/.gitignore b/.gitignore
index 19fdc3bb1..ad08aa383 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,6 +6,7 @@
 # Folders
 _obj
 _test
+_manifests
 
 # Architecture specific extensions/prefixes
 *.[568vq]
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index db39e9068..65fca3417 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -72,13 +72,12 @@ type Cluster struct {
 	deleteOptions  *metav1.DeleteOptions
 	podEventsQueue *cache.FIFO
 
-	teamsAPIClient       teams.Interface
-	oauthTokenGetter     OAuthTokenGetter
-	KubeClient           k8sutil.KubernetesClient //TODO: move clients to the better place?
-	currentProcess       spec.Process
-	processMu            sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
-	specMu               sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
-	pendingRollingUpdate *bool        // indicates the cluster needs a rolling update
+	teamsAPIClient   teams.Interface
+	oauthTokenGetter OAuthTokenGetter
+	KubeClient       k8sutil.KubernetesClient //TODO: move clients to a better place?
+	currentProcess   spec.Process
+	processMu        sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
+	specMu           sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
 }
 
 type compareStatefulsetResult struct {
@@ -111,11 +110,10 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql
 			Secrets:   make(map[types.UID]*v1.Secret),
 			Services:  make(map[PostgresRole]*v1.Service),
 			Endpoints: make(map[PostgresRole]*v1.Endpoints)},
-		userSyncStrategy:     users.DefaultUserSyncStrategy{},
-		deleteOptions:        &metav1.DeleteOptions{OrphanDependents: &orphanDependents},
-		podEventsQueue:       podEventsQueue,
-		KubeClient:           kubeClient,
-		pendingRollingUpdate: nil,
+		userSyncStrategy: users.DefaultUserSyncStrategy{},
+		deleteOptions:    &metav1.DeleteOptions{OrphanDependents: &orphanDependents},
+		podEventsQueue:   podEventsQueue,
+		KubeClient:       kubeClient,
 	}
 	cluster.logger = logger.WithField("pkg", "cluster").WithField("cluster-name", cluster.clusterName())
 	cluster.teamsAPIClient = teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger)
@@ -251,7 +249,6 @@ func (c *Cluster) Create() error {
 	}()
 
 	c.setStatus(spec.ClusterStatusCreating)
-	c.setPendingRollingUpgrade(false)
 
 	for _, role := range []PostgresRole{Master, Replica} {
 
@@ -301,7 +298,7 @@ func (c *Cluster) Create() error {
 	if c.Statefulset != nil {
 		return fmt.Errorf("statefulset already exists in the cluster")
 	}
-	ss, err = c.createStatefulSet()
+	ss, err = c.createStatefulSet(false)
 	if err != nil {
 		return fmt.Errorf("could not create statefulset: %v", err)
 	}
@@ -345,6 +342,10 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp
 		match = false
 		reasons = append(reasons, "new statefulset's number of replicas doesn't match the current one")
 	}
+	if !reflect.DeepEqual(c.Statefulset.Annotations, statefulSet.Annotations) {
+		match = false
+		reasons = append(reasons, "new statefulset's annotations don't match the current one")
+	}
 	if len(c.Statefulset.Spec.Template.Spec.Containers) != len(statefulSet.Spec.Template.Spec.Containers) {
 		needsRollUpdate = true
 		reasons = append(reasons, "new statefulset's container specification doesn't match the current one")
@@ -396,9 +397,10 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp
 	}
 
 	if !reflect.DeepEqual(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations) {
-		needsRollUpdate = true
+		match = false
 		needsReplace = true
-		reasons = append(reasons, "new statefulset's metadata annotations doesn't match the current one")
+		needsRollUpdate = true
+		reasons = append(reasons, "new statefulset's pod template metadata annotations don't match the current one")
 	}
 	if len(c.Statefulset.Spec.VolumeClaimTemplates) != len(statefulSet.Spec.VolumeClaimTemplates) {
 		needsReplace = true
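A note on the new top-level annotations check in compareStatefulSetWith: reflect.DeepEqual treats a nil map and an empty map as different, so a statefulset created without any annotations can be reported as mismatching one that carries an empty (but non-nil) annotations map. A minimal illustration, not part of the patch:

    package main

    import (
        "fmt"
        "reflect"
    )

    func main() {
        var nilMap map[string]string    // e.g. an object created with no annotations at all
        emptyMap := map[string]string{} // empty but non-nil
        fmt.Println(reflect.DeepEqual(nilMap, emptyMap)) // false: DeepEqual distinguishes nil from empty
        fmt.Println(len(nilMap) == len(emptyMap))        // true: both hold zero entries
    }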
diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go
index 7a602c73d..a4135a534 100644
--- a/pkg/cluster/resources.go
+++ b/pkg/cluster/resources.go
@@ -17,6 +17,13 @@ import (
+	"strconv"
+
 	"github.com/zalando-incubator/postgres-operator/pkg/util/retryutil"
 )
 
+const (
+	// RollingUpdateStatefulsetAnnotationKey is the annotation indicating a pending rolling update of the statefulset's pods.
+	RollingUpdateStatefulsetAnnotationKey = "zalando-postgres-operator-rolling-update"
+)
+
 func (c *Cluster) listResources() error {
 	if c.PodDisruptionBudget != nil {
 		c.logger.Infof("found pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta), c.PodDisruptionBudget.UID)
@@ -59,9 +66,38 @@ func (c *Cluster) listResources() error {
 	return nil
 }
 
-func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) {
+func setRollingUpdateFlag(sset *v1beta1.StatefulSet, val bool) {
+	anno := sset.GetAnnotations()
+	if anno == nil {
+		anno = make(map[string]string)
+	}
+	anno[RollingUpdateStatefulsetAnnotationKey] = strconv.FormatBool(val)
+	sset.SetAnnotations(anno)
+	fmt.Printf("rolling update flag has been set to %t\n", val)
+}
+
+func getRollingUpdateFlag(sset *v1beta1.StatefulSet, defaultValue bool) (flag bool) {
+	anno := sset.GetAnnotations()
+	flag = defaultValue
+
+	stringFlag, exists := anno[RollingUpdateStatefulsetAnnotationKey]
+	if exists {
+		var err error
+		if flag, err = strconv.ParseBool(stringFlag); err != nil {
+			fmt.Printf("error when parsing %s annotation for the statefulset %s: expected boolean value, got %s\n",
+				RollingUpdateStatefulsetAnnotationKey,
+				types.NamespacedName{Namespace: sset.Namespace, Name: sset.Name},
+				stringFlag)
+			flag = defaultValue
+		}
+	}
+	return flag
+}
+
+func (c *Cluster) createStatefulSet(pendingRollingUpgrade bool) (*v1beta1.StatefulSet, error) {
 	c.setProcessName("creating statefulset")
 	statefulSetSpec, err := c.generateStatefulSet(&c.Spec)
 	if err != nil {
 		return nil, fmt.Errorf("could not generate statefulset: %v", err)
 	}
+	setRollingUpdateFlag(statefulSetSpec, pendingRollingUpgrade)
@@ -128,7 +164,7 @@ func (c *Cluster) preScaleDown(newStatefulSet *v1beta1.StatefulSet) error {
 	return nil
 }
 
-func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error {
+func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet, includeAnnotations bool) error {
 	c.setProcessName("updating statefulset")
 	if c.Statefulset == nil {
 		return fmt.Errorf("there is no statefulset in the cluster")
@@ -153,8 +189,19 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error {
 		types.MergePatchType,
 		patchData, "")
 	if err != nil {
-		return fmt.Errorf("could not patch statefulset %q: %v", statefulSetName, err)
+		return fmt.Errorf("could not patch statefulset spec %q: %v", statefulSetName, err)
 	}
+	if includeAnnotations && newStatefulSet.Annotations != nil {
+		patchData := metadataAnnotationsPatch(newStatefulSet.Annotations)
+		statefulSet, err = c.KubeClient.StatefulSets(c.Statefulset.Namespace).Patch(
+			c.Statefulset.Name,
+			types.StrategicMergePatchType,
+			[]byte(patchData), "")
+		if err != nil {
+			return fmt.Errorf("could not patch annotations of the statefulset %q: %v", statefulSetName, err)
+		}
+	}
+
 	c.Statefulset = statefulSet
 
 	return nil
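updateStatefulSet above relies on a metadataAnnotationsPatch helper that is not part of this diff. A minimal sketch of what such a helper could look like, assuming it only has to render a strategic merge patch for metadata.annotations; the name and shape are taken from the call site, not from the repository:

    // metadataAnnotationsPatch builds a patch document that replaces only the
    // object's metadata.annotations (uses encoding/json and fmt).
    func metadataAnnotationsPatch(annotations map[string]string) string {
        encoded, err := json.Marshal(annotations)
        if err != nil {
            // marshalling a map[string]string cannot realistically fail
            return `{"metadata":{"annotations":{}}}`
        }
        return fmt.Sprintf(`{"metadata":{"annotations":%s}}`, encoded)
    }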
diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go
index 1064f124c..1c1fcba61 100644
--- a/pkg/cluster/sync.go
+++ b/pkg/cluster/sync.go
@@ -220,7 +220,8 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error {
 }
 
 func (c *Cluster) syncStatefulSet() error {
-
+	var podsRollingUpdateRequired bool
+
 	sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(c.statefulSetName(), metav1.GetOptions{})
 	if err != nil {
 		if !k8sutil.ResourceNotFound(err) {
@@ -234,7 +235,8 @@ func (c *Cluster) syncStatefulSet() error {
 			return fmt.Errorf("could not list pods of the statefulset: %v", err)
 		}
 
-		sset, err = c.createStatefulSet()
+		podsRollingUpdateRequired = len(pods) > 0
+		sset, err = c.createStatefulSet(podsRollingUpdateRequired)
 		if err != nil {
 			return fmt.Errorf("could not create missing statefulset: %v", err)
 		}
@@ -244,36 +246,46 @@ func (c *Cluster) syncStatefulSet() error {
 		}
 		c.logger.Infof("created missing statefulset %q", util.NameFromMeta(sset.ObjectMeta))
 
-		if len(pods) <= 0 {
-			return nil
-		}
-		c.logger.Infof("found pods without the statefulset: trigger rolling update")
-		c.setPendingRollingUpgrade(true)
 	} else {
+		// if we reset the rolling update flag in the statefulset structure in memory but didn't manage to
+		// update the actual object in Kubernetes for some reason, we want to avoid triggering an unnecessary
+		// rolling update by relying on the 'cached' in-memory flag.
+		cachedStatefulsetExists := c.Statefulset != nil
+		cachedRollingUpdateFlag := false
+		if cachedStatefulsetExists {
+			cachedRollingUpdateFlag = getRollingUpdateFlag(c.Statefulset, true)
+			c.logger.Debugf("cached statefulset value exists, rollingUpdate flag is %t", cachedRollingUpdateFlag)
+		}
 		// statefulset is already there, make sure we use its definition in order to compare with the spec.
 		c.Statefulset = sset
 
-		// resolve the pending rolling upgrade flags as soon as we read an actual statefulset from kubernetes.
-		// we must do it before updating statefulsets; after an update, the statfulset will receive a new
-		// updateRevision, different from the one the pods run with.
-		if err := c.resolvePendingRollingUpdate(sset); err != nil {
-			return fmt.Errorf("could not resolve the rolling upgrade status: %v", err)
+		if podsRollingUpdateRequired = getRollingUpdateFlag(c.Statefulset, false); podsRollingUpdateRequired {
+			if cachedStatefulsetExists && !cachedRollingUpdateFlag {
+				// the flag on the actual object is stale: the in-memory copy shows the rolling update
+				// already finished (e.g. a previous attempt to clear the flag in Kubernetes failed).
+				c.logger.Infof("clearing the rolling update flag based on the cached information")
+				podsRollingUpdateRequired = false
+			} else {
+				c.logger.Infof("found a statefulset with an unfinished rolling update of the pods")
+			}
 		}
 
 		desiredSS, err := c.generateStatefulSet(&c.Spec)
 		if err != nil {
 			return fmt.Errorf("could not generate statefulset: %v", err)
 		}
+		setRollingUpdateFlag(desiredSS, podsRollingUpdateRequired)
 
 		cmp := c.compareStatefulSetWith(desiredSS)
 		if !cmp.match {
-			if cmp.rollingUpdate {
-				c.setPendingRollingUpgrade(true)
+			if cmp.rollingUpdate && !podsRollingUpdateRequired {
+				podsRollingUpdateRequired = true
+				setRollingUpdateFlag(desiredSS, podsRollingUpdateRequired)
 			}
 			c.logStatefulSetChanges(c.Statefulset, desiredSS, false, cmp.reasons)
 
 			if !cmp.replace {
-				if err := c.updateStatefulSet(desiredSS); err != nil {
+				if err := c.updateStatefulSet(desiredSS, true); err != nil {
 					return fmt.Errorf("could not update statefulset: %v", err)
 				}
 			} else {
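The cached-versus-running flag merge above boils down to a small decision rule; distilled into a standalone function for readability (illustration only, not part of the patch):

    // decideRollingUpdate mirrors the merge logic in syncStatefulSet: the flag read
    // from the running statefulset wins, unless an in-memory copy exists whose flag
    // was already cleared, in which case the running flag is considered stale.
    func decideRollingUpdate(cachedExists, cachedFlag, runningFlag bool) bool {
        if runningFlag && cachedExists && !cachedFlag {
            return false
        }
        return runningFlag
    }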
@@ -285,15 +297,17 @@ func (c *Cluster) syncStatefulSet() error {
 	}
 
 	// if we get here we also need to re-create the pods (either leftovers from the old
 	// statefulset or those that got their configuration from the outdated statefulset)
-	if *c.pendingRollingUpdate {
+	if podsRollingUpdateRequired {
 		c.logger.Debugln("performing rolling update")
 		if err := c.recreatePods(); err != nil {
 			return fmt.Errorf("could not recreate pods: %v", err)
 		}
-		c.setPendingRollingUpgrade(false)
 		c.logger.Infof("pods have been recreated")
+		setRollingUpdateFlag(c.Statefulset, false)
+		if err := c.updateStatefulSet(c.Statefulset, true); err != nil {
+			c.logger.Warningf("could not clear the rolling update flag on the statefulset: %v", err)
+		}
 	}
-
 	return nil
 }
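Because the flag is cleared only after recreatePods has succeeded, an operator crash in the middle of a rolling update leaves the annotation set to "true" on the object in Kubernetes, and the next sync picks the update up again; that persistence is what moving the flag from the in-memory pendingRollingUpdate field to an annotation buys. A round trip through the new helpers (illustrative):

    ss := &v1beta1.StatefulSet{}                 // k8s.io/api/apps/v1beta1
    setRollingUpdateFlag(ss, true)               // annotation set to "true"
    fmt.Println(getRollingUpdateFlag(ss, false)) // true: the flag lives on the object, not in operator memory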
diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go
index 4a493c5d1..b72835311 100644
--- a/pkg/cluster/util.go
+++ b/pkg/cluster/util.go
@@ -39,10 +39,6 @@ func NewSecretOauthTokenGetter(kubeClient *k8sutil.KubernetesClient,
 	return &SecretOauthTokenGetter{kubeClient, OAuthTokenSecretName}
 }
 
-const (
-	podControllerRevisionHashLabel = "controller-revision-hash"
-)
-
 func (g *SecretOauthTokenGetter) getOAuthToken() (string, error) {
 	//TODO: we can move this function to the Controller in case it will be needed there. As for now we use it only in the Cluster
 	// Temporary getting postgresql-operator secret from the NamespaceDefault
@@ -462,56 +458,3 @@ func (c *Cluster) GetSpec() (*spec.Postgresql, error) {
 func (c *Cluster) patroniUsesKubernetes() bool {
 	return c.OpConfig.EtcdHost == ""
 }
-
-func (c *Cluster) setPendingRollingUpgrade(val bool) {
-	if c.pendingRollingUpdate == nil {
-		c.pendingRollingUpdate = new(bool)
-	}
-	*c.pendingRollingUpdate = val
-	c.logger.Debugf("pending rolling upgrade was set to %b", val)
-}
-
-// resolvePendingRollingUpdate figures out if rolling upgrade is necessary
-// based on the states of the cluster statefulset and pods
-func (c *Cluster) resolvePendingRollingUpdate(sset *v1beta1.StatefulSet) error {
-	// XXX: it looks like we will always trigger a rolling update if the
-	// pods are on a different revision from a statefulset, even if the
-	// statefulset change that caused it didn't require a rolling update
-	// originally.
-	if c.pendingRollingUpdate != nil {
-		return nil
-	}
-	c.logger.Debugf("evaluating rolling upgrade requirement")
-	effectiveRevision := sset.Status.UpdateRevision
-	if effectiveRevision == "" {
-		if sset.Status.CurrentRevision == "" {
-			c.logger.Debugf("statefulset doesn't have a current revision, no rolling upgrade")
-			// the statefulset does not have a currentRevision, it must be new; hence, no rollingUpdate
-			c.setPendingRollingUpgrade(false)
-			return nil
-		}
-		effectiveRevision = sset.Status.CurrentRevision
-	}
-
-	// fetch all pods related to this cluster
-	pods, err := c.listPods()
-	if err != nil {
-		return err
-	}
-	// check their revisions
-	for _, pod := range pods {
-		podRevision, present := pod.Labels[podControllerRevisionHashLabel]
-		// empty or missing revision indicates a new pod - doesn't need a rolling upgrade
-		if !present || podRevision == "" {
-			continue
-		}
-		c.logger.Debugf("observing pod revision %q vs statefulset revision %q", podRevision, effectiveRevision)
-		if podRevision != effectiveRevision {
-			// pod is on a different revision - trigger the rolling upgrade
-			c.setPendingRollingUpgrade(true)
-			return nil
-		}
-	}
-	c.setPendingRollingUpgrade(false)
-	return nil
-}
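One more detail on the parsing side: getRollingUpdateFlag falls back to the supplied default whenever the annotation value does not parse, and strconv.ParseBool only accepts 1, t, T, TRUE, true, True, 0, f, F, FALSE, false and False. For example (illustration only):

    for _, s := range []string{"true", "TRUE", "1", "t", "yes", ""} {
        v, err := strconv.ParseBool(s)
        fmt.Printf("%q -> %v (err: %v)\n", s, v, err) // "yes" and "" fail, so the default wins
    }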