From 0618723a6118341ea46fb4b6ed1cc91f72a601dd Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Mon, 9 Apr 2018 18:07:24 +0200 Subject: [PATCH 1/5] Check rolling updates using controller revisions. Compare pods controller revisions with the one for the statefulset to determine whether the pod is running the latest revision and, therefore, no rolling update is necessary. This is performed only during the operator start, afterwards the rolling update status that is stored locally in the cluster structure is used for all rolling update decisions. --- pkg/cluster/cluster.go | 23 ++++++++------- pkg/cluster/cluster_test.go | 4 +-- pkg/cluster/sync.go | 47 +++++++++++++++++------------- pkg/cluster/util.go | 57 +++++++++++++++++++++++++++++++++++++ 4 files changed, 99 insertions(+), 32 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index bfb8e35d7..04a890c73 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -71,12 +71,13 @@ type Cluster struct { deleteOptions *metav1.DeleteOptions podEventsQueue *cache.FIFO - teamsAPIClient teams.Interface - oauthTokenGetter OAuthTokenGetter - KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place? - currentProcess spec.Process - processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex - specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex + teamsAPIClient teams.Interface + oauthTokenGetter OAuthTokenGetter + KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place? 
+ currentProcess spec.Process + processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex + specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex + pendingRollingUpdate *bool // indicates the cluster needs a rolling update } type compareStatefulsetResult struct { @@ -109,10 +110,11 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql Secrets: make(map[types.UID]*v1.Secret), Services: make(map[PostgresRole]*v1.Service), Endpoints: make(map[PostgresRole]*v1.Endpoints)}, - userSyncStrategy: users.DefaultUserSyncStrategy{}, - deleteOptions: &metav1.DeleteOptions{OrphanDependents: &orphanDependents}, - podEventsQueue: podEventsQueue, - KubeClient: kubeClient, + userSyncStrategy: users.DefaultUserSyncStrategy{}, + deleteOptions: &metav1.DeleteOptions{OrphanDependents: &orphanDependents}, + podEventsQueue: podEventsQueue, + KubeClient: kubeClient, + pendingRollingUpdate: nil, } cluster.logger = logger.WithField("pkg", "cluster").WithField("cluster-name", cluster.clusterName()) cluster.teamsAPIClient = teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger) @@ -215,6 +217,7 @@ func (c *Cluster) Create() error { }() c.setStatus(spec.ClusterStatusCreating) + c.setPendingRollingUpgrade(false) for _, role := range []PostgresRole{Master, Replica} { diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index 823d3baf9..34f64e655 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -34,8 +34,8 @@ func TestInitRobotUsers(t *testing.T) { { manifestUsers: map[string]spec.UserFlags{"foo": {"superuser", "createdb"}}, infraRoles: map[string]spec.PgUser{"foo": {Origin: spec.RoleOriginInfrastructure, Name: "foo", Password: "bar"}}, - result: map[string]spec.PgUser{"foo": {Origin: spec.RoleOriginInfrastructure, Name: "foo", Password: "bar"}}, - err: nil, + result: map[string]spec.PgUser{"foo": {Origin: spec.RoleOriginInfrastructure, Name: 
"foo", Password: "bar"}}, + err: nil, }, { manifestUsers: map[string]spec.UserFlags{"!fooBar": {"superuser", "createdb"}}, diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 5a77e658b..114eb91de 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -249,10 +249,17 @@ func (c *Cluster) syncStatefulSet() error { return nil } c.logger.Infof("found pods without the statefulset: trigger rolling update") + c.setPendingRollingUpgrade(true) } else { // statefulset is already there, make sure we use its definition in order to compare with the spec. c.Statefulset = sset + // resolve the pending rolling upgrade flags as soon as we read an actual statefulset from kubernetes. + // we must do it before updating statefulsets; after an update, the statfulset will receive a new + // updateRevision, different from the one the pods run with. + if err := c.resolvePendingRollingUpdate(sset); err != nil { + return fmt.Errorf("could not resolve the rolling upgrade status: %v", err) + } desiredSS, err := c.generateStatefulSet(&c.Spec) if err != nil { @@ -260,33 +267,33 @@ func (c *Cluster) syncStatefulSet() error { } cmp := c.compareStatefulSetWith(desiredSS) - if cmp.match { - return nil - } - c.logStatefulSetChanges(c.Statefulset, desiredSS, false, cmp.reasons) - - if !cmp.replace { - if err := c.updateStatefulSet(desiredSS); err != nil { - return fmt.Errorf("could not update statefulset: %v", err) + if !cmp.match { + if cmp.rollingUpdate { + c.setPendingRollingUpgrade(true) } - } else { - if err := c.replaceStatefulSet(desiredSS); err != nil { - return fmt.Errorf("could not replace statefulset: %v", err) - } - } + c.logStatefulSetChanges(c.Statefulset, desiredSS, false, cmp.reasons) - if !cmp.rollingUpdate { - c.logger.Debugln("no rolling update is needed") - return nil + if !cmp.replace { + if err := c.updateStatefulSet(desiredSS); err != nil { + return fmt.Errorf("could not update statefulset: %v", err) + } + } else { + if err := c.replaceStatefulSet(desiredSS); err 
!= nil { + return fmt.Errorf("could not replace statefulset: %v", err) + } + } } } // if we get here we also need to re-create the pods (either leftovers from the old // statefulset or those that got their configuration from the outdated statefulset) - c.logger.Debugln("performing rolling update") - if err := c.recreatePods(); err != nil { - return fmt.Errorf("could not recreate pods: %v", err) + if *c.pendingRollingUpdate { + c.logger.Debugln("performing rolling update") + if err := c.recreatePods(); err != nil { + return fmt.Errorf("could not recreate pods: %v", err) + } + c.setPendingRollingUpgrade(false) + c.logger.Infof("pods have been recreated") } - c.logger.Infof("pods have been recreated") return nil } diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 4a4f4e04a..5b6e8c66b 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -39,6 +39,10 @@ func NewSecretOauthTokenGetter(kubeClient *k8sutil.KubernetesClient, return &SecretOauthTokenGetter{kubeClient, OAuthTokenSecretName} } +const ( + podControllerRevisionHashLabel = "controller-revision-hash" +) + func (g *SecretOauthTokenGetter) getOAuthToken() (string, error) { //TODO: we can move this function to the Controller in case it will be needed there. 
As for now we use it only in the Cluster // Temporary getting postgresql-operator secret from the NamespaceDefault @@ -430,3 +434,56 @@ func (c *Cluster) GetSpec() (*spec.Postgresql, error) { func (c *Cluster) patroniUsesKubernetes() bool { return c.OpConfig.EtcdHost == "" } + +func (c *Cluster) setPendingRollingUpgrade(val bool) { + if c.pendingRollingUpdate == nil { + c.pendingRollingUpdate = new(bool) + } + *c.pendingRollingUpdate = val + c.logger.Debugf("pending rolling upgrade was set to %b", val) +} + +// resolvePendingRollingUpdate figures out if rolling upgrade is necessary +// based on the states of the cluster statefulset and pods +func (c *Cluster) resolvePendingRollingUpdate(sset *v1beta1.StatefulSet) error { + // XXX: it looks like we will always trigger a rolling update if the + // pods are on a different revision from a statefulset, even if the + // statefulset change that caused it didn't require a rolling update + // originally. + if c.pendingRollingUpdate != nil { + return nil + } + c.logger.Debugf("evaluating rolling upgrade requirement") + effectiveRevision := sset.Status.UpdateRevision + if effectiveRevision == "" { + if sset.Status.CurrentRevision == "" { + c.logger.Debugf("statefulset doesn't have a current revision, no rolling upgrade") + // the statefulset does not have a currentRevision, it must be new; hence, no rollingUpdate + c.setPendingRollingUpgrade(false) + return nil + } + effectiveRevision = sset.Status.CurrentRevision + } + + // fetch all pods related to this cluster + pods, err := c.listPods() + if err != nil { + return err + } + // check their revisions + for _, pod := range pods { + podRevision, present := pod.Labels[podControllerRevisionHashLabel] + // empty or missing revision indicates a new pod - doesn't need a rolling upgrade + if !present || podRevision == "" { + continue + } + c.logger.Debugf("observing pod revision %q vs statefulset revision %q", podRevision, effectiveRevision) + if podRevision != effectiveRevision { + 
// pod is on a different revision - trigger the rolling upgrade + c.setPendingRollingUpgrade(true) + return nil + } + } + c.setPendingRollingUpgrade(false) + return nil +} From 1a20362c5bf7c257e9fb58bce9b6c654095595fb Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Thu, 3 May 2018 19:19:52 +0200 Subject: [PATCH 2/5] Initial implementation for the statefulset annotations indicating rolling updates. --- .gitignore | 1 + pkg/cluster/cluster.go | 34 +++++++++++++----------- pkg/cluster/resources.go | 50 ++++++++++++++++++++++++++++++++--- pkg/cluster/sync.go | 47 ++++++++++++++++++++------------- pkg/cluster/util.go | 57 ---------------------------------------- 5 files changed, 95 insertions(+), 94 deletions(-) diff --git a/.gitignore b/.gitignore index 19fdc3bb1..ad08aa383 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ # Folders _obj _test +_manifests # Architecture specific extensions/prefixes *.[568vq] diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index db39e9068..65fca3417 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -72,13 +72,12 @@ type Cluster struct { deleteOptions *metav1.DeleteOptions podEventsQueue *cache.FIFO - teamsAPIClient teams.Interface - oauthTokenGetter OAuthTokenGetter - KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place? - currentProcess spec.Process - processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex - specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex - pendingRollingUpdate *bool // indicates the cluster needs a rolling update + teamsAPIClient teams.Interface + oauthTokenGetter OAuthTokenGetter + KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place? 
+ currentProcess spec.Process + processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex + specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex } type compareStatefulsetResult struct { @@ -111,11 +110,10 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql Secrets: make(map[types.UID]*v1.Secret), Services: make(map[PostgresRole]*v1.Service), Endpoints: make(map[PostgresRole]*v1.Endpoints)}, - userSyncStrategy: users.DefaultUserSyncStrategy{}, - deleteOptions: &metav1.DeleteOptions{OrphanDependents: &orphanDependents}, - podEventsQueue: podEventsQueue, - KubeClient: kubeClient, - pendingRollingUpdate: nil, + userSyncStrategy: users.DefaultUserSyncStrategy{}, + deleteOptions: &metav1.DeleteOptions{OrphanDependents: &orphanDependents}, + podEventsQueue: podEventsQueue, + KubeClient: kubeClient, } cluster.logger = logger.WithField("pkg", "cluster").WithField("cluster-name", cluster.clusterName()) cluster.teamsAPIClient = teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger) @@ -251,7 +249,6 @@ func (c *Cluster) Create() error { }() c.setStatus(spec.ClusterStatusCreating) - c.setPendingRollingUpgrade(false) for _, role := range []PostgresRole{Master, Replica} { @@ -301,7 +298,7 @@ func (c *Cluster) Create() error { if c.Statefulset != nil { return fmt.Errorf("statefulset already exists in the cluster") } - ss, err = c.createStatefulSet() + ss, err = c.createStatefulSet(false) if err != nil { return fmt.Errorf("could not create statefulset: %v", err) } @@ -345,6 +342,10 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp match = false reasons = append(reasons, "new statefulset's number of replicas doesn't match the current one") } + if !reflect.DeepEqual(c.Statefulset.Annotations, statefulSet.Annotations) { + match = false + reasons = append(reasons, "new statefulset's annotations doesn't match the current one") + } if 
len(c.Statefulset.Spec.Template.Spec.Containers) != len(statefulSet.Spec.Template.Spec.Containers) { needsRollUpdate = true reasons = append(reasons, "new statefulset's container specification doesn't match the current one") @@ -396,9 +397,10 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp } if !reflect.DeepEqual(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations) { - needsRollUpdate = true + match = false needsReplace = true - reasons = append(reasons, "new statefulset's metadata annotations doesn't match the current one") + needsRollUpdate = true + reasons = append(reasons, "new statefulset's pod template metadata annotations doesn't match the current one") } if len(c.Statefulset.Spec.VolumeClaimTemplates) != len(statefulSet.Spec.VolumeClaimTemplates) { needsReplace = true diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 7a602c73d..a4135a534 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -17,6 +17,10 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/util/retryutil" ) +const ( + RollingUpdateStatefulsetAnnotationKey = "zalando-postgres-operator-rolling-update" +) + func (c *Cluster) listResources() error { if c.PodDisruptionBudget != nil { c.logger.Infof("found pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta), c.PodDisruptionBudget.UID) @@ -59,9 +63,38 @@ func (c *Cluster) listResources() error { return nil } -func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) { +func setRollingUpdateFlag(sset *v1beta1.StatefulSet, val bool) { + anno := sset.GetAnnotations() + fmt.Printf("rolling upgrade flag has been set to %t", val) + if anno == nil { + anno = make(map[string]string) + } + anno[RollingUpdateStatefulsetAnnotationKey] = strconv.FormatBool(val) + sset.SetAnnotations(anno) +} + +func getRollingUpdateFlag(sset *v1beta1.StatefulSet, defaultValue bool) (flag bool) { + anno := 
sset.GetAnnotations() + flag = defaultValue + + stringFlag, exists := anno[RollingUpdateStatefulsetAnnotationKey] + if exists { + var err error + if flag, err = strconv.ParseBool(stringFlag); err != nil { + fmt.Printf("error when parsing %s annotation for the statefulset %s: expected boolean value, got %s\n", + RollingUpdateStatefulsetAnnotationKey, + types.NamespacedName{sset.Namespace, sset.Name}, + stringFlag) + flag = defaultValue + } + } + return flag +} + +func (c *Cluster) createStatefulSet(pendingRollingUpgrade bool) (*v1beta1.StatefulSet, error) { c.setProcessName("creating statefulset") statefulSetSpec, err := c.generateStatefulSet(&c.Spec) + setRollingUpdateFlag(statefulSetSpec, pendingRollingUpgrade) if err != nil { return nil, fmt.Errorf("could not generate statefulset: %v", err) } @@ -128,7 +161,7 @@ func (c *Cluster) preScaleDown(newStatefulSet *v1beta1.StatefulSet) error { return nil } -func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error { +func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet, includeAnnotations bool) error { c.setProcessName("updating statefulset") if c.Statefulset == nil { return fmt.Errorf("there is no statefulset in the cluster") @@ -153,8 +186,19 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error { types.MergePatchType, patchData, "") if err != nil { - return fmt.Errorf("could not patch statefulset %q: %v", statefulSetName, err) + return fmt.Errorf("could not patch statefulset spec %q: %v", statefulSetName, err) } + if includeAnnotations && newStatefulSet.Annotations != nil { + patchData := metadataAnnotationsPatch(newStatefulSet.Annotations) + statefulSet, err = c.KubeClient.StatefulSets(c.Statefulset.Namespace).Patch( + c.Statefulset.Name, + types.StrategicMergePatchType, + []byte(patchData), "") + if err != nil { + return fmt.Errorf("could not patch statefulset annotations %q: %v", patchData, err) + } + } + c.Statefulset = statefulSet return nil 
diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 1064f124c..1c1fcba61 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -220,7 +220,9 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { } func (c *Cluster) syncStatefulSet() error { - + var ( + cachedRollingUpdateFlag, podsRollingUpdateRequired bool + ) sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(c.statefulSetName(), metav1.GetOptions{}) if err != nil { if !k8sutil.ResourceNotFound(err) { @@ -234,7 +236,8 @@ func (c *Cluster) syncStatefulSet() error { return fmt.Errorf("could not list pods of the statefulset: %v", err) } - sset, err = c.createStatefulSet() + podsRollingUpdateRequired := (len(pods) > 0) + sset, err = c.createStatefulSet(podsRollingUpdateRequired) if err != nil { return fmt.Errorf("could not create missing statefulset: %v", err) } @@ -244,36 +247,42 @@ func (c *Cluster) syncStatefulSet() error { } c.logger.Infof("created missing statefulset %q", util.NameFromMeta(sset.ObjectMeta)) - if len(pods) <= 0 { - return nil - } - c.logger.Infof("found pods without the statefulset: trigger rolling update") - c.setPendingRollingUpgrade(true) } else { + if c.Statefulset != nil { + // if we reset the rolling update flag in the statefulset structure in memory but didn't manage to update + // the actual object in Kubernetes for some reason we want to avoid doing an unnecessary update by relying + // on the 'cached' in-memory flag. + cachedRollingUpdateFlag = getRollingUpdateFlag(c.Statefulset, true) + c.logger.Debugf("cached statefulset value exists, rollingUpdate flag is %t", cachedRollingUpdateFlag) + } // statefulset is already there, make sure we use its definition in order to compare with the spec. c.Statefulset = sset - // resolve the pending rolling upgrade flags as soon as we read an actual statefulset from kubernetes. 
- // we must do it before updating statefulsets; after an update, the statfulset will receive a new - // updateRevision, different from the one the pods run with. - if err := c.resolvePendingRollingUpdate(sset); err != nil { - return fmt.Errorf("could not resolve the rolling upgrade status: %v", err) + if podsRollingUpdateRequired = getRollingUpdateFlag(c.Statefulset, false); podsRollingUpdateRequired { + if cachedRollingUpdateFlag { + c.logger.Infof("found a statefulset with an unfinished pods rolling update") + } else { + c.logger.Infof("clearing the rolling update flag based on the cached information") + podsRollingUpdateRequired = false + } } desiredSS, err := c.generateStatefulSet(&c.Spec) if err != nil { return fmt.Errorf("could not generate statefulset: %v", err) } + setRollingUpdateFlag(desiredSS, podsRollingUpdateRequired) cmp := c.compareStatefulSetWith(desiredSS) if !cmp.match { - if cmp.rollingUpdate { - c.setPendingRollingUpgrade(true) + if cmp.rollingUpdate && !podsRollingUpdateRequired { + podsRollingUpdateRequired = true + setRollingUpdateFlag(desiredSS, podsRollingUpdateRequired) } c.logStatefulSetChanges(c.Statefulset, desiredSS, false, cmp.reasons) if !cmp.replace { - if err := c.updateStatefulSet(desiredSS); err != nil { + if err := c.updateStatefulSet(desiredSS, true); err != nil { return fmt.Errorf("could not update statefulset: %v", err) } } else { @@ -285,15 +294,17 @@ func (c *Cluster) syncStatefulSet() error { } // if we get here we also need to re-create the pods (either leftovers from the old // statefulset or those that got their configuration from the outdated statefulset) - if *c.pendingRollingUpdate { + if podsRollingUpdateRequired { c.logger.Debugln("performing rolling update") if err := c.recreatePods(); err != nil { return fmt.Errorf("could not recreate pods: %v", err) } - c.setPendingRollingUpgrade(false) c.logger.Infof("pods have been recreated") + setRollingUpdateFlag(c.Statefulset, false) + if err := 
c.updateStatefulSet(c.Statefulset, true); err != nil { + c.logger.Warningf("could not clear rolling update for the statefulset") + } } - return nil } diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 4a493c5d1..b72835311 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -39,10 +39,6 @@ func NewSecretOauthTokenGetter(kubeClient *k8sutil.KubernetesClient, return &SecretOauthTokenGetter{kubeClient, OAuthTokenSecretName} } -const ( - podControllerRevisionHashLabel = "controller-revision-hash" -) - func (g *SecretOauthTokenGetter) getOAuthToken() (string, error) { //TODO: we can move this function to the Controller in case it will be needed there. As for now we use it only in the Cluster // Temporary getting postgresql-operator secret from the NamespaceDefault @@ -462,56 +458,3 @@ func (c *Cluster) GetSpec() (*spec.Postgresql, error) { func (c *Cluster) patroniUsesKubernetes() bool { return c.OpConfig.EtcdHost == "" } - -func (c *Cluster) setPendingRollingUpgrade(val bool) { - if c.pendingRollingUpdate == nil { - c.pendingRollingUpdate = new(bool) - } - *c.pendingRollingUpdate = val - c.logger.Debugf("pending rolling upgrade was set to %b", val) -} - -// resolvePendingRollingUpdate figures out if rolling upgrade is necessary -// based on the states of the cluster statefulset and pods -func (c *Cluster) resolvePendingRollingUpdate(sset *v1beta1.StatefulSet) error { - // XXX: it looks like we will always trigger a rolling update if the - // pods are on a different revision from a statefulset, even if the - // statefulset change that caused it didn't require a rolling update - // originally. 
- if c.pendingRollingUpdate != nil { - return nil - } - c.logger.Debugf("evaluating rolling upgrade requirement") - effectiveRevision := sset.Status.UpdateRevision - if effectiveRevision == "" { - if sset.Status.CurrentRevision == "" { - c.logger.Debugf("statefulset doesn't have a current revision, no rolling upgrade") - // the statefulset does not have a currentRevision, it must be new; hence, no rollingUpdate - c.setPendingRollingUpgrade(false) - return nil - } - effectiveRevision = sset.Status.CurrentRevision - } - - // fetch all pods related to this cluster - pods, err := c.listPods() - if err != nil { - return err - } - // check their revisions - for _, pod := range pods { - podRevision, present := pod.Labels[podControllerRevisionHashLabel] - // empty or missing revision indicates a new pod - doesn't need a rolling upgrade - if !present || podRevision == "" { - continue - } - c.logger.Debugf("observing pod revision %q vs statefulset revision %q", podRevision, effectiveRevision) - if podRevision != effectiveRevision { - // pod is on a different revision - trigger the rolling upgrade - c.setPendingRollingUpgrade(true) - return nil - } - } - c.setPendingRollingUpgrade(false) - return nil -} From ce0d4af91ce8a2e8b52bc79febb71f9f2f4cbaed Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Thu, 3 May 2018 19:19:52 +0200 Subject: [PATCH 3/5] Initial implementation for the statefulset annotations indicating rolling updates. 
--- .gitignore | 1 + pkg/cluster/cluster.go | 34 +++++++++++++----------- pkg/cluster/resources.go | 50 ++++++++++++++++++++++++++++++++--- pkg/cluster/sync.go | 47 ++++++++++++++++++++------------- pkg/cluster/util.go | 57 ---------------------------------------- 5 files changed, 95 insertions(+), 94 deletions(-) diff --git a/.gitignore b/.gitignore index 19fdc3bb1..ad08aa383 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ # Folders _obj _test +_manifests # Architecture specific extensions/prefixes *.[568vq] diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index db39e9068..65fca3417 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -72,13 +72,12 @@ type Cluster struct { deleteOptions *metav1.DeleteOptions podEventsQueue *cache.FIFO - teamsAPIClient teams.Interface - oauthTokenGetter OAuthTokenGetter - KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place? - currentProcess spec.Process - processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex - specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex - pendingRollingUpdate *bool // indicates the cluster needs a rolling update + teamsAPIClient teams.Interface + oauthTokenGetter OAuthTokenGetter + KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place? 
+ currentProcess spec.Process + processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex + specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex } type compareStatefulsetResult struct { @@ -111,11 +110,10 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql Secrets: make(map[types.UID]*v1.Secret), Services: make(map[PostgresRole]*v1.Service), Endpoints: make(map[PostgresRole]*v1.Endpoints)}, - userSyncStrategy: users.DefaultUserSyncStrategy{}, - deleteOptions: &metav1.DeleteOptions{OrphanDependents: &orphanDependents}, - podEventsQueue: podEventsQueue, - KubeClient: kubeClient, - pendingRollingUpdate: nil, + userSyncStrategy: users.DefaultUserSyncStrategy{}, + deleteOptions: &metav1.DeleteOptions{OrphanDependents: &orphanDependents}, + podEventsQueue: podEventsQueue, + KubeClient: kubeClient, } cluster.logger = logger.WithField("pkg", "cluster").WithField("cluster-name", cluster.clusterName()) cluster.teamsAPIClient = teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger) @@ -251,7 +249,6 @@ func (c *Cluster) Create() error { }() c.setStatus(spec.ClusterStatusCreating) - c.setPendingRollingUpgrade(false) for _, role := range []PostgresRole{Master, Replica} { @@ -301,7 +298,7 @@ func (c *Cluster) Create() error { if c.Statefulset != nil { return fmt.Errorf("statefulset already exists in the cluster") } - ss, err = c.createStatefulSet() + ss, err = c.createStatefulSet(false) if err != nil { return fmt.Errorf("could not create statefulset: %v", err) } @@ -345,6 +342,10 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp match = false reasons = append(reasons, "new statefulset's number of replicas doesn't match the current one") } + if !reflect.DeepEqual(c.Statefulset.Annotations, statefulSet.Annotations) { + match = false + reasons = append(reasons, "new statefulset's annotations doesn't match the current one") + } if 
len(c.Statefulset.Spec.Template.Spec.Containers) != len(statefulSet.Spec.Template.Spec.Containers) { needsRollUpdate = true reasons = append(reasons, "new statefulset's container specification doesn't match the current one") @@ -396,9 +397,10 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp } if !reflect.DeepEqual(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations) { - needsRollUpdate = true + match = false needsReplace = true - reasons = append(reasons, "new statefulset's metadata annotations doesn't match the current one") + needsRollUpdate = true + reasons = append(reasons, "new statefulset's pod template metadata annotations doesn't match the current one") } if len(c.Statefulset.Spec.VolumeClaimTemplates) != len(statefulSet.Spec.VolumeClaimTemplates) { needsReplace = true diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 7a602c73d..a4135a534 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -17,6 +17,10 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/util/retryutil" ) +const ( + RollingUpdateStatefulsetAnnotationKey = "zalando-postgres-operator-rolling-update" +) + func (c *Cluster) listResources() error { if c.PodDisruptionBudget != nil { c.logger.Infof("found pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta), c.PodDisruptionBudget.UID) @@ -59,9 +63,38 @@ func (c *Cluster) listResources() error { return nil } -func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) { +func setRollingUpdateFlag(sset *v1beta1.StatefulSet, val bool) { + anno := sset.GetAnnotations() + fmt.Printf("rolling upgrade flag has been set to %t", val) + if anno == nil { + anno = make(map[string]string) + } + anno[RollingUpdateStatefulsetAnnotationKey] = strconv.FormatBool(val) + sset.SetAnnotations(anno) +} + +func getRollingUpdateFlag(sset *v1beta1.StatefulSet, defaultValue bool) (flag bool) { + anno := 
sset.GetAnnotations() + flag = defaultValue + + stringFlag, exists := anno[RollingUpdateStatefulsetAnnotationKey] + if exists { + var err error + if flag, err = strconv.ParseBool(stringFlag); err != nil { + fmt.Printf("error when parsing %s annotation for the statefulset %s: expected boolean value, got %s\n", + RollingUpdateStatefulsetAnnotationKey, + types.NamespacedName{sset.Namespace, sset.Name}, + stringFlag) + flag = defaultValue + } + } + return flag +} + +func (c *Cluster) createStatefulSet(pendingRollingUpgrade bool) (*v1beta1.StatefulSet, error) { c.setProcessName("creating statefulset") statefulSetSpec, err := c.generateStatefulSet(&c.Spec) + setRollingUpdateFlag(statefulSetSpec, pendingRollingUpgrade) if err != nil { return nil, fmt.Errorf("could not generate statefulset: %v", err) } @@ -128,7 +161,7 @@ func (c *Cluster) preScaleDown(newStatefulSet *v1beta1.StatefulSet) error { return nil } -func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error { +func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet, includeAnnotations bool) error { c.setProcessName("updating statefulset") if c.Statefulset == nil { return fmt.Errorf("there is no statefulset in the cluster") @@ -153,8 +186,19 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error { types.MergePatchType, patchData, "") if err != nil { - return fmt.Errorf("could not patch statefulset %q: %v", statefulSetName, err) + return fmt.Errorf("could not patch statefulset spec %q: %v", statefulSetName, err) } + if includeAnnotations && newStatefulSet.Annotations != nil { + patchData := metadataAnnotationsPatch(newStatefulSet.Annotations) + statefulSet, err = c.KubeClient.StatefulSets(c.Statefulset.Namespace).Patch( + c.Statefulset.Name, + types.StrategicMergePatchType, + []byte(patchData), "") + if err != nil { + return fmt.Errorf("could not patch statefulset annotations %q: %v", patchData, err) + } + } + c.Statefulset = statefulSet return nil 
diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 1064f124c..1c1fcba61 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -220,7 +220,9 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { } func (c *Cluster) syncStatefulSet() error { - + var ( + cachedRollingUpdateFlag, podsRollingUpdateRequired bool + ) sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(c.statefulSetName(), metav1.GetOptions{}) if err != nil { if !k8sutil.ResourceNotFound(err) { @@ -234,7 +236,8 @@ func (c *Cluster) syncStatefulSet() error { return fmt.Errorf("could not list pods of the statefulset: %v", err) } - sset, err = c.createStatefulSet() + podsRollingUpdateRequired := (len(pods) > 0) + sset, err = c.createStatefulSet(podsRollingUpdateRequired) if err != nil { return fmt.Errorf("could not create missing statefulset: %v", err) } @@ -244,36 +247,42 @@ func (c *Cluster) syncStatefulSet() error { } c.logger.Infof("created missing statefulset %q", util.NameFromMeta(sset.ObjectMeta)) - if len(pods) <= 0 { - return nil - } - c.logger.Infof("found pods without the statefulset: trigger rolling update") - c.setPendingRollingUpgrade(true) } else { + if c.Statefulset != nil { + // if we reset the rolling update flag in the statefulset structure in memory but didn't manage to update + // the actual object in Kubernetes for some reason we want to avoid doing an unnecessary update by relying + // on the 'cached' in-memory flag. + cachedRollingUpdateFlag = getRollingUpdateFlag(c.Statefulset, true) + c.logger.Debugf("cached statefulset value exists, rollingUpdate flag is %t", cachedRollingUpdateFlag) + } // statefulset is already there, make sure we use its definition in order to compare with the spec. c.Statefulset = sset - // resolve the pending rolling upgrade flags as soon as we read an actual statefulset from kubernetes. 
- // we must do it before updating statefulsets; after an update, the statfulset will receive a new - // updateRevision, different from the one the pods run with. - if err := c.resolvePendingRollingUpdate(sset); err != nil { - return fmt.Errorf("could not resolve the rolling upgrade status: %v", err) + if podsRollingUpdateRequired = getRollingUpdateFlag(c.Statefulset, false); podsRollingUpdateRequired { + if cachedRollingUpdateFlag { + c.logger.Infof("found a statefulset with an unfinished pods rolling update") + } else { + c.logger.Infof("clearing the rolling update flag based on the cached information") + podsRollingUpdateRequired = false + } } desiredSS, err := c.generateStatefulSet(&c.Spec) if err != nil { return fmt.Errorf("could not generate statefulset: %v", err) } + setRollingUpdateFlag(desiredSS, podsRollingUpdateRequired) cmp := c.compareStatefulSetWith(desiredSS) if !cmp.match { - if cmp.rollingUpdate { - c.setPendingRollingUpgrade(true) + if cmp.rollingUpdate && !podsRollingUpdateRequired { + podsRollingUpdateRequired = true + setRollingUpdateFlag(desiredSS, podsRollingUpdateRequired) } c.logStatefulSetChanges(c.Statefulset, desiredSS, false, cmp.reasons) if !cmp.replace { - if err := c.updateStatefulSet(desiredSS); err != nil { + if err := c.updateStatefulSet(desiredSS, true); err != nil { return fmt.Errorf("could not update statefulset: %v", err) } } else { @@ -285,15 +294,17 @@ func (c *Cluster) syncStatefulSet() error { } // if we get here we also need to re-create the pods (either leftovers from the old // statefulset or those that got their configuration from the outdated statefulset) - if *c.pendingRollingUpdate { + if podsRollingUpdateRequired { c.logger.Debugln("performing rolling update") if err := c.recreatePods(); err != nil { return fmt.Errorf("could not recreate pods: %v", err) } - c.setPendingRollingUpgrade(false) c.logger.Infof("pods have been recreated") + setRollingUpdateFlag(c.Statefulset, false) + if err := 
c.updateStatefulSet(c.Statefulset, true); err != nil {
+			c.logger.Warningf("could not clear rolling update for the statefulset: %v", err)
+		}
 	}
-
 	return nil
 }
diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go
index 4a493c5d1..b72835311 100644
--- a/pkg/cluster/util.go
+++ b/pkg/cluster/util.go
@@ -39,10 +39,6 @@ func NewSecretOauthTokenGetter(kubeClient *k8sutil.KubernetesClient,
 	return &SecretOauthTokenGetter{kubeClient, OAuthTokenSecretName}
 }
 
-const (
-	podControllerRevisionHashLabel = "controller-revision-hash"
-)
-
 func (g *SecretOauthTokenGetter) getOAuthToken() (string, error) {
 	//TODO: we can move this function to the Controller in case it will be needed there. As for now we use it only in the Cluster
 	// Temporary getting postgresql-operator secret from the NamespaceDefault
@@ -462,56 +458,3 @@ func (c *Cluster) GetSpec() (*spec.Postgresql, error) {
 func (c *Cluster) patroniUsesKubernetes() bool {
 	return c.OpConfig.EtcdHost == ""
 }
-
-func (c *Cluster) setPendingRollingUpgrade(val bool) {
-	if c.pendingRollingUpdate == nil {
-		c.pendingRollingUpdate = new(bool)
-	}
-	*c.pendingRollingUpdate = val
-	c.logger.Debugf("pending rolling upgrade was set to %b", val)
-}
-
-// resolvePendingRollingUpdate figures out if rolling upgrade is necessary
-// based on the states of the cluster statefulset and pods
-func (c *Cluster) resolvePendingRollingUpdate(sset *v1beta1.StatefulSet) error {
-	// XXX: it looks like we will always trigger a rolling update if the
-	// pods are on a different revision from a statefulset, even if the
-	// statefulset change that caused it didn't require a rolling update
-	// originally.
- if c.pendingRollingUpdate != nil { - return nil - } - c.logger.Debugf("evaluating rolling upgrade requirement") - effectiveRevision := sset.Status.UpdateRevision - if effectiveRevision == "" { - if sset.Status.CurrentRevision == "" { - c.logger.Debugf("statefulset doesn't have a current revision, no rolling upgrade") - // the statefulset does not have a currentRevision, it must be new; hence, no rollingUpdate - c.setPendingRollingUpgrade(false) - return nil - } - effectiveRevision = sset.Status.CurrentRevision - } - - // fetch all pods related to this cluster - pods, err := c.listPods() - if err != nil { - return err - } - // check their revisions - for _, pod := range pods { - podRevision, present := pod.Labels[podControllerRevisionHashLabel] - // empty or missing revision indicates a new pod - doesn't need a rolling upgrade - if !present || podRevision == "" { - continue - } - c.logger.Debugf("observing pod revision %q vs statefulset revision %q", podRevision, effectiveRevision) - if podRevision != effectiveRevision { - // pod is on a different revision - trigger the rolling upgrade - c.setPendingRollingUpgrade(true) - return nil - } - } - c.setPendingRollingUpgrade(false) - return nil -} From 11d568bf65ce83ba714a515d692ae4f64b3fdc98 Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Tue, 15 May 2018 16:50:03 +0200 Subject: [PATCH 4/5] Address code review by @zerg-junior - new info messages, rename the annotation flag. 
--- pkg/cluster/resources.go | 2 +- pkg/cluster/sync.go | 1 + pkg/cluster/util.go | 6 +++++- pkg/util/k8sutil/k8sutil.go | 9 +++++++-- 4 files changed, 14 insertions(+), 4 deletions(-) diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 256a7acb9..1a1f3c69a 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -18,7 +18,7 @@ import ( ) const ( - RollingUpdateStatefulsetAnnotationKey = "zalando-postgres-operator-rolling-update" + RollingUpdateStatefulsetAnnotationKey = "zalando-postgres-operator-rolling-update-required" ) func (c *Cluster) listResources() error { diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index d3bb567ff..48ceed992 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -247,6 +247,7 @@ func (c *Cluster) syncStatefulSet() error { podsRollingUpdateRequired = (len(pods) > 0) if podsRollingUpdateRequired { + c.logger.Warningf("found pods from the previous statefulset: trigger rolling update") c.applyRollingUpdateFlagforStatefulSet(podsRollingUpdateRequired) } c.logger.Infof("created missing statefulset %q", util.NameFromMeta(sset.ObjectMeta)) diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 5a5447918..516a59b9e 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -21,6 +21,7 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" "github.com/zalando-incubator/postgres-operator/pkg/util/retryutil" + "reflect" ) // OAuthTokenGetter provides the method for fetching OAuth tokens @@ -172,7 +173,10 @@ func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate util.NameFromMeta(old.ObjectMeta), ) } - c.logger.Debugf("diff\n%s\n", util.PrettyDiff(old.Spec, new.Spec)) + if !reflect.DeepEqual(old.Annotations, new.Annotations) { + c.logger.Debugf("metadata.annotation diff\n%s\n", util.PrettyDiff(old.Annotations, new.Annotations)) + } + c.logger.Debugf("spec diff\n%s\n", 
util.PrettyDiff(old.Spec, new.Spec))
 
 	if len(reasons) > 0 {
 		for _, reason := range reasons {
diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go
index e9c899b82..142d4f822 100644
--- a/pkg/util/k8sutil/k8sutil.go
+++ b/pkg/util/k8sutil/k8sutil.go
@@ -131,8 +131,13 @@ func SameService(cur, new *v1.Service) (match bool, reason string) {
 	oldELBAnnotation := cur.Annotations[constants.ElbTimeoutAnnotationName]
 	newELBAnnotation := new.Annotations[constants.ElbTimeoutAnnotationName]
 
-	if oldDNSAnnotation != newDNSAnnotation || oldELBAnnotation != newELBAnnotation {
-		return false, fmt.Sprintf("new service's %q annotation doesn't match the current one", constants.ZalandoDNSNameAnnotation)
+	if oldDNSAnnotation != newDNSAnnotation {
+		return false, fmt.Sprintf("new service's %q annotation value %q doesn't match the current one %q",
+			constants.ZalandoDNSNameAnnotation, newDNSAnnotation, oldDNSAnnotation)
+	}
+	if oldELBAnnotation != newELBAnnotation {
+		return false, fmt.Sprintf("new service's %q annotation value %q doesn't match the current one %q",
+			constants.ElbTimeoutAnnotationName, newELBAnnotation, oldELBAnnotation)
+	}
 
 	return true, ""
 
From cf800aef902677511ad42a96e3142f73c558e3c6 Mon Sep 17 00:00:00 2001
From: Oleksii Kliukin
Date: Tue, 15 May 2018 16:53:12 +0200
Subject: [PATCH 5/5] Minor import fix

---
 pkg/cluster/util.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go
index 516a59b9e..e7db26d82 100644
--- a/pkg/cluster/util.go
+++ b/pkg/cluster/util.go
@@ -6,6 +6,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"math/rand"
+	"reflect"
 	"sort"
 	"strings"
 	"time"
@@ -21,7 +22,6 @@ import (
 	"github.com/zalando-incubator/postgres-operator/pkg/util/constants"
 	"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
 	"github.com/zalando-incubator/postgres-operator/pkg/util/retryutil"
-	"reflect"
 )