Merge pull request #286 from zalando-incubator/rolling_updates_with_statefulset_annotations

Annotate StatefulSets with pending rolling update status flag
zerg-junior 2018-05-15 17:16:16 +02:00 committed by GitHub
commit c99cdd7915
8 changed files with 193 additions and 62 deletions

.gitignore

@@ -6,6 +6,7 @@
 # Folders
 _obj
 _test
+_manifests

 # Architecture specific extensions/prefixes
 *.[568vq]


@@ -350,6 +350,10 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *compareStatefulsetResult {
 		match = false
 		reasons = append(reasons, "new statefulset's number of replicas doesn't match the current one")
 	}
+	if !reflect.DeepEqual(c.Statefulset.Annotations, statefulSet.Annotations) {
+		match = false
+		reasons = append(reasons, "new statefulset's annotations doesn't match the current one")
+	}
 	if len(c.Statefulset.Spec.Template.Spec.Containers) != len(statefulSet.Spec.Template.Spec.Containers) {
 		needsRollUpdate = true
 		reasons = append(reasons, "new statefulset's container specification doesn't match the current one")
@@ -401,9 +405,10 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *compareStatefulsetResult {
 	}
 	if !reflect.DeepEqual(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations) {
-		needsRollUpdate = true
-		reasons = append(reasons, "new statefulset's metadata annotations doesn't match the current one")
+		match = false
+		needsReplace = true
+		needsRollUpdate = true
+		reasons = append(reasons, "new statefulset's pod template metadata annotations doesn't match the current one")
 	}
 	if len(c.Statefulset.Spec.VolumeClaimTemplates) != len(statefulSet.Spec.VolumeClaimTemplates) {
 		needsReplace = true
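For orientation, the result these hunks populate is consumed by the sync code further down. A minimal sketch of that consumer logic, assuming the compareStatefulsetResult struct (its name is truncated to "*comp..." in the hunk headers above) bundles the match, replace, and rollingUpdate flags with the human-readable reasons:

package sketch

// compareStatefulsetResult is assumed to carry these fields, per the hunks above.
type compareStatefulsetResult struct {
	match         bool     // nothing differs; the sync is a no-op
	replace       bool     // an immutable field differs; delete and recreate the statefulset
	rollingUpdate bool     // the pod template differs; pods must be recreated
	reasons       []string // human-readable mismatch descriptions for logging
}

// decide maps the comparison outcome to the statefulset-level action the sync loop takes.
func decide(cmp *compareStatefulsetResult) string {
	switch {
	case cmp.match:
		return "no-op"
	case cmp.replace:
		return "replace statefulset"
	default:
		return "patch statefulset spec" // pod recreation is tracked separately via the annotation
	}
}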


@@ -564,9 +564,10 @@ func (c *Cluster) generateStatefulSet(spec *spec.PostgresSpec) (*v1beta1.StatefulSet, error) {
 	statefulSet := &v1beta1.StatefulSet{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      c.statefulSetName(),
-			Namespace: c.Namespace,
-			Labels:    c.labelsSet(true),
+			Name:        c.statefulSetName(),
+			Namespace:   c.Namespace,
+			Labels:      c.labelsSet(true),
+			Annotations: map[string]string{RollingUpdateStatefulsetAnnotationKey: "false"},
 		},
 		Spec: v1beta1.StatefulSetSpec{
 			Replicas: &numberOfInstances,
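Seeding the annotation with the string "false" keeps the flag parseable by strconv.ParseBool on every statefulset the operator generates. A standalone round-trip sketch (not operator code; the key string is the constant introduced below):

package main

import (
	"fmt"
	"strconv"
)

func main() {
	anno := map[string]string{
		"zalando-postgres-operator-rolling-update-required": strconv.FormatBool(false),
	}
	// ParseBool accepts "true"/"false" (plus 1/0, t/f, etc.); anything else
	// returns an error, and the operator then falls back to a caller-supplied default.
	flag, err := strconv.ParseBool(anno["zalando-postgres-operator-rolling-update-required"])
	fmt.Println(flag, err) // false <nil>
}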


@@ -17,6 +17,10 @@ import (
 	"github.com/zalando-incubator/postgres-operator/pkg/util/retryutil"
 )

+const (
+	RollingUpdateStatefulsetAnnotationKey = "zalando-postgres-operator-rolling-update-required"
+)
+
 func (c *Cluster) listResources() error {
 	if c.PodDisruptionBudget != nil {
 		c.logger.Infof("found pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta), c.PodDisruptionBudget.UID)
@@ -128,6 +132,95 @@ func (c *Cluster) preScaleDown(newStatefulSet *v1beta1.StatefulSet) error {
 	return nil
 }

+// setRollingUpdateFlagForStatefulSet sets the indicator of the rolling update requirement
+// in the StatefulSet annotation.
+func (c *Cluster) setRollingUpdateFlagForStatefulSet(sset *v1beta1.StatefulSet, val bool) {
+	anno := sset.GetAnnotations()
+	c.logger.Debugf("rolling upgrade flag has been set to %t", val)
+	if anno == nil {
+		anno = make(map[string]string)
+	}
+	anno[RollingUpdateStatefulsetAnnotationKey] = strconv.FormatBool(val)
+	sset.SetAnnotations(anno)
+}
+
+// applyRollingUpdateFlagforStatefulSet sets the rolling update flag for the cluster's StatefulSet
+// and applies that setting to the actual running cluster.
+func (c *Cluster) applyRollingUpdateFlagforStatefulSet(val bool) error {
+	c.setRollingUpdateFlagForStatefulSet(c.Statefulset, val)
+	sset, err := c.updateStatefulSetAnnotations(c.Statefulset.GetAnnotations())
+	if err != nil {
+		return err
+	}
+	c.Statefulset = sset
+	return nil
+}
+
+// getRollingUpdateFlagFromStatefulSet returns the value of the rollingUpdate flag from the passed
+// StatefulSet, reverting to the default value in case of errors.
+func (c *Cluster) getRollingUpdateFlagFromStatefulSet(sset *v1beta1.StatefulSet, defaultValue bool) (flag bool) {
+	anno := sset.GetAnnotations()
+	flag = defaultValue
+
+	stringFlag, exists := anno[RollingUpdateStatefulsetAnnotationKey]
+	if exists {
+		var err error
+		if flag, err = strconv.ParseBool(stringFlag); err != nil {
+			c.logger.Warnf("error when parsing %q annotation for the statefulset %q: expected boolean value, got %q\n",
+				RollingUpdateStatefulsetAnnotationKey,
+				types.NamespacedName{Namespace: sset.Namespace, Name: sset.Name},
+				stringFlag)
+			flag = defaultValue
+		}
+	}
+	return flag
+}
+
+// mergeRollingUpdateFlagUsingCache returns the value of the rollingUpdate flag from the passed
+// statefulset; however, the value can be cleared if there is a cached flag in the cluster that
+// is set to false (the discrepancy could be a result of a failed StatefulSet update).
+func (c *Cluster) mergeRollingUpdateFlagUsingCache(runningStatefulSet *v1beta1.StatefulSet) bool {
+	var (
+		cachedStatefulsetExists, clearRollingUpdateFromCache, podsRollingUpdateRequired bool
+	)
+
+	if c.Statefulset != nil {
+		// if we reset the rolling update flag in the statefulset structure in memory but didn't manage to update
+		// the actual object in Kubernetes for some reason we want to avoid doing an unnecessary update by relying
+		// on the 'cached' in-memory flag.
+		cachedStatefulsetExists = true
+		clearRollingUpdateFromCache = !c.getRollingUpdateFlagFromStatefulSet(c.Statefulset, true)
+		c.logger.Debugf("cached StatefulSet value exists, rollingUpdate flag is %t", clearRollingUpdateFromCache)
+	}
+
+	if podsRollingUpdateRequired = c.getRollingUpdateFlagFromStatefulSet(runningStatefulSet, false); podsRollingUpdateRequired {
+		if cachedStatefulsetExists && clearRollingUpdateFromCache {
+			c.logger.Infof("clearing the rolling update flag based on the cached information")
+			podsRollingUpdateRequired = false
+		} else {
+			c.logger.Infof("found a statefulset with an unfinished pods rolling update")
+		}
+	}
+	return podsRollingUpdateRequired
+}
+
+func (c *Cluster) updateStatefulSetAnnotations(annotations map[string]string) (*v1beta1.StatefulSet, error) {
+	c.logger.Debugf("updating statefulset annotations")
+	patchData, err := metaAnnotationsPatch(annotations)
+	if err != nil {
+		return nil, fmt.Errorf("could not form patch for the statefulset metadata: %v", err)
+	}
+	result, err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Patch(
+		c.Statefulset.Name,
+		types.MergePatchType,
+		[]byte(patchData), "")
+	if err != nil {
+		return nil, fmt.Errorf("could not patch statefulset annotations %q: %v", patchData, err)
+	}
+	return result, nil
+}
+
 func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error {
 	c.setProcessName("updating statefulset")
 	if c.Statefulset == nil {
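The cache-merge semantics above reduce to a small rule: the flag read from the running statefulset can be cleared by the in-memory (cached) copy, but never set by it. A standalone sketch of the same decision (not operator code):

package sketch

// mergeFlags mirrors mergeRollingUpdateFlagUsingCache with the two flag values as plain inputs.
func mergeFlags(cachedExists, cachedFlag, runningFlag bool) bool {
	if runningFlag && cachedExists && !cachedFlag {
		// a previous loop already finished the pods rolling update but
		// failed to patch the annotation on the live object
		return false
	}
	return runningFlag
}

So mergeFlags(true, false, true) yields false; every other combination returns the running object's flag unchanged.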
@@ -153,8 +246,16 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error {
 		types.MergePatchType,
 		patchData, "")
 	if err != nil {
-		return fmt.Errorf("could not patch statefulset %q: %v", statefulSetName, err)
+		return fmt.Errorf("could not patch statefulset spec %q: %v", statefulSetName, err)
 	}
+
+	if newStatefulSet.Annotations != nil {
+		statefulSet, err = c.updateStatefulSetAnnotations(newStatefulSet.Annotations)
+		if err != nil {
+			return err
+		}
+	}
 	c.Statefulset = statefulSet
 	return nil
@@ -298,16 +399,19 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error {
 		return nil
 	}

 	// update the service annotation in order to propagate ELB notation.
 	if len(newService.ObjectMeta.Annotations) > 0 {
-		annotationsPatchData := metadataAnnotationsPatch(newService.ObjectMeta.Annotations)
-		_, err := c.KubeClient.Services(serviceName.Namespace).Patch(
-			serviceName.Name,
-			types.StrategicMergePatchType,
-			[]byte(annotationsPatchData), "")
-		if err != nil {
-			return fmt.Errorf("could not replace annotations for the service %q: %v", serviceName, err)
-		}
+		if annotationsPatchData, err := metaAnnotationsPatch(newService.ObjectMeta.Annotations); err == nil {
+			_, err = c.KubeClient.Services(serviceName.Namespace).Patch(
+				serviceName.Name,
+				types.MergePatchType,
+				[]byte(annotationsPatchData), "")
+			if err != nil {
+				return fmt.Errorf("could not replace annotations for the service %q: %v", serviceName, err)
+			}
+		} else {
+			return fmt.Errorf("could not form patch for the service metadata: %v", err)
+		}
 	}
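The old code built a "$patch":"replace" directive (see the removed ServiceMetadataAnnotationReplaceFormat constant below), which is only understood by a strategic merge patch; the new metaAnnotationsPatch emits a plain JSON merge patch, so the patch type switches from StrategicMergePatchType to MergePatchType. A sketch of the two payload shapes (the hostname value is made up for illustration):

package sketch

const (
	// before: strategic merge patch; "$patch":"replace" swapped out the whole annotations map
	oldStylePatch = `{"metadata":{"annotations": {"$patch":"replace", "external-dns.alpha.kubernetes.io/hostname":"db.example.com"}}}`
	// after: plain JSON merge patch; the listed keys are merged into the existing map
	newStylePatch = `{"metadata":{"annotations":{"external-dns.alpha.kubernetes.io/hostname":"db.example.com"}}}`
)

One behavioral consequence worth noting: with a JSON merge patch, annotations absent from the payload stay untouched on the object (a key is only removed by explicitly setting it to null), whereas the old replace directive dropped them.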
@@ -316,6 +420,7 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error {
 		return fmt.Errorf("could not form patch for the service %q: %v", serviceName, err)
 	}

+	// update the service spec
 	svc, err := c.KubeClient.Services(serviceName.Namespace).Patch(
 		serviceName.Name,
 		types.MergePatchType,


@@ -220,7 +220,9 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error {
 }

 func (c *Cluster) syncStatefulSet() error {
+	var (
+		podsRollingUpdateRequired bool
+	)
 	sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(c.statefulSetName(), metav1.GetOptions{})
 	if err != nil {
 		if !k8sutil.ResourceNotFound(err) {
@@ -243,13 +245,15 @@ func (c *Cluster) syncStatefulSet() error {
 			return fmt.Errorf("cluster is not ready: %v", err)
 		}
-		c.logger.Infof("created missing statefulset %q", util.NameFromMeta(sset.ObjectMeta))
-		if len(pods) <= 0 {
-			return nil
+
+		podsRollingUpdateRequired = (len(pods) > 0)
+		if podsRollingUpdateRequired {
+			c.logger.Warningf("found pods from the previous statefulset: trigger rolling update")
+			c.applyRollingUpdateFlagforStatefulSet(podsRollingUpdateRequired)
 		}
-		c.logger.Infof("found pods without the statefulset: trigger rolling update")
+		c.logger.Infof("created missing statefulset %q", util.NameFromMeta(sset.ObjectMeta))
 	} else {
+		podsRollingUpdateRequired = c.mergeRollingUpdateFlagUsingCache(sset)
 		// statefulset is already there, make sure we use its definition in order to compare with the spec.
 		c.Statefulset = sset
@@ -257,36 +261,39 @@ func (c *Cluster) syncStatefulSet() error {
 	if err != nil {
 		return fmt.Errorf("could not generate statefulset: %v", err)
 	}
+	c.setRollingUpdateFlagForStatefulSet(desiredSS, podsRollingUpdateRequired)

 	cmp := c.compareStatefulSetWith(desiredSS)
-	if cmp.match {
-		return nil
-	}
-	c.logStatefulSetChanges(c.Statefulset, desiredSS, false, cmp.reasons)
-	if !cmp.replace {
-		if err := c.updateStatefulSet(desiredSS); err != nil {
-			return fmt.Errorf("could not update statefulset: %v", err)
-		}
-	} else {
-		if err := c.replaceStatefulSet(desiredSS); err != nil {
-			return fmt.Errorf("could not replace statefulset: %v", err)
-		}
-	}
-	if !cmp.rollingUpdate {
-		c.logger.Debugln("no rolling update is needed")
-		return nil
-	}
+	if !cmp.match {
+		if cmp.rollingUpdate && !podsRollingUpdateRequired {
+			podsRollingUpdateRequired = true
+			c.setRollingUpdateFlagForStatefulSet(desiredSS, podsRollingUpdateRequired)
+		}
+		c.logStatefulSetChanges(c.Statefulset, desiredSS, false, cmp.reasons)
+		if !cmp.replace {
+			if err := c.updateStatefulSet(desiredSS); err != nil {
+				return fmt.Errorf("could not update statefulset: %v", err)
+			}
+		} else {
+			if err := c.replaceStatefulSet(desiredSS); err != nil {
+				return fmt.Errorf("could not replace statefulset: %v", err)
+			}
+		}
+	}
 	// if we get here we also need to re-create the pods (either leftovers from the old
 	// statefulset or those that got their configuration from the outdated statefulset)
-	c.logger.Debugln("performing rolling update")
-	if err := c.recreatePods(); err != nil {
-		return fmt.Errorf("could not recreate pods: %v", err)
-	}
-	c.logger.Infof("pods have been recreated")
+	if podsRollingUpdateRequired {
+		c.logger.Debugln("performing rolling update")
+		if err := c.recreatePods(); err != nil {
+			return fmt.Errorf("could not recreate pods: %v", err)
+		}
+		c.logger.Infof("pods have been recreated")
+		if err := c.applyRollingUpdateFlagforStatefulSet(false); err != nil {
+			c.logger.Warningf("could not clear rolling update for the statefulset: %v", err)
+		}
+	}
 	return nil
 }
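Taken together, the new sync proceeds in three phases: detect whether pods must be recreated, persist that intent on the statefulset object itself, and only then act on it, clearing the annotation once the recreation succeeds. A condensed standalone model of that control flow (error handling trimmed; the closure parameters are hypothetical stand-ins for the operator methods above):

package sketch

type compareResult struct{ match, replace, rollingUpdate bool }

// syncFlow models syncStatefulSet: detectRolling covers leftover pods and the
// annotation merged with the cache; setFlag stands in for setRollingUpdateFlagForStatefulSet;
// clearFlag stands in for applyRollingUpdateFlagforStatefulSet(false).
func syncFlow(
	detectRolling func() bool,
	setFlag func(bool),
	compare func() compareResult,
	patch, replace, recreatePods, clearFlag func() error,
) error {
	rolling := detectRolling()
	setFlag(rolling) // phase 1: persist intent on the desired object
	if cmp := compare(); !cmp.match {
		if cmp.rollingUpdate && !rolling {
			rolling = true // phase 2: the spec diff itself can demand pod recreation
			setFlag(rolling)
		}
		if cmp.replace {
			if err := replace(); err != nil {
				return err
			}
		} else if err := patch(); err != nil {
			return err
		}
	}
	if rolling { // phase 3: recreate pods, then forget the flag
		if err := recreatePods(); err != nil {
			return err
		}
		return clearFlag()
	}
	return nil
}

Because the flag lives on the object, an operator restarted mid-way through a rolling update resumes it instead of forgetting that pods still need recreation.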


@@ -6,6 +6,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"math/rand"
+	"reflect"
 	"sort"
 	"strings"
 	"time"
@@ -133,21 +134,23 @@ func normalizeUserFlags(userFlags []string) ([]string, error) {
 	return flags, nil
 }

+// specPatch produces a JSON of the Kubernetes object specification passed (typically service or
+// statefulset) to use it in a MergePatch.
 func specPatch(spec interface{}) ([]byte, error) {
 	return json.Marshal(struct {
 		Spec interface{} `json:"spec"`
 	}{spec})
 }

-func metadataAnnotationsPatch(annotations map[string]string) string {
-	annotationsList := make([]string, 0, len(annotations))
-	for name, value := range annotations {
-		annotationsList = append(annotationsList, fmt.Sprintf(`"%s":"%s"`, name, value))
-	}
-	annotationsString := strings.Join(annotationsList, ",")
-	// TODO: perhaps use patchStrategy:action json annotation instead of constructing the patch literally.
-	return fmt.Sprintf(constants.ServiceMetadataAnnotationReplaceFormat, annotationsString)
-}
+// metaAnnotationsPatch produces a JSON of the object metadata that has only the annotation
+// field in order to use it in a MergePatch. Note that we don't patch the complete metadata, since
+// it contains the current revision of the object that could be outdated at the time we patch.
+func metaAnnotationsPatch(annotations map[string]string) ([]byte, error) {
+	var meta metav1.ObjectMeta
+	meta.Annotations = annotations
+	return json.Marshal(struct {
+		ObjMeta interface{} `json:"metadata"`
+	}{&meta})
+}

 func (c *Cluster) logPDBChanges(old, new *policybeta1.PodDisruptionBudget, isUpdate bool, reason string) {
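For reference, a quick way to see what this helper emits for the rolling-update flag (standalone sketch; note that marshalling ObjectMeta also emits a null creationTimestamp because metav1.Time does not honor omitempty, which a JSON merge patch tolerates in practice since the field is system-managed):

package main

import (
	"encoding/json"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	var meta metav1.ObjectMeta
	meta.Annotations = map[string]string{
		"zalando-postgres-operator-rolling-update-required": "true",
	}
	patch, _ := json.Marshal(struct {
		ObjMeta interface{} `json:"metadata"`
	}{&meta})
	fmt.Println(string(patch))
	// expected (assuming the current apimachinery field layout):
	// {"metadata":{"creationTimestamp":null,"annotations":{"zalando-postgres-operator-rolling-update-required":"true"}}}
}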
@@ -170,7 +173,10 @@ func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate bool, reasons []string) {
 			util.NameFromMeta(old.ObjectMeta),
 		)
 	}
-	c.logger.Debugf("diff\n%s\n", util.PrettyDiff(old.Spec, new.Spec))
+	if !reflect.DeepEqual(old.Annotations, new.Annotations) {
+		c.logger.Debugf("metadata.annotation diff\n%s\n", util.PrettyDiff(old.Annotations, new.Annotations))
+	}
+	c.logger.Debugf("spec diff\n%s\n", util.PrettyDiff(old.Spec, new.Spec))

 	if len(reasons) > 0 {
 		for _, reason := range reasons {


@@ -2,10 +2,9 @@ package constants

 // Names and values in Kubernetes annotation for services, statefulsets and volumes
 const (
-	ZalandoDNSNameAnnotation               = "external-dns.alpha.kubernetes.io/hostname"
-	ElbTimeoutAnnotationName               = "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout"
-	ElbTimeoutAnnotationValue              = "3600"
-	KubeIAmAnnotation                      = "iam.amazonaws.com/role"
-	VolumeStorateProvisionerAnnotation     = "pv.kubernetes.io/provisioned-by"
-	ServiceMetadataAnnotationReplaceFormat = `{"metadata":{"annotations": {"$patch":"replace", %s}}}`
+	ZalandoDNSNameAnnotation           = "external-dns.alpha.kubernetes.io/hostname"
+	ElbTimeoutAnnotationName           = "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout"
+	ElbTimeoutAnnotationValue          = "3600"
+	KubeIAmAnnotation                  = "iam.amazonaws.com/role"
+	VolumeStorateProvisionerAnnotation = "pv.kubernetes.io/provisioned-by"
 )


@@ -120,17 +120,24 @@ func SameService(cur, new *v1.Service) (match bool, reason string) {
 	newSourceRanges := new.Spec.LoadBalancerSourceRanges

 	/* work around Kubernetes 1.6 serializing [] as nil. See https://github.com/kubernetes/kubernetes/issues/43203 */
-	if (len(oldSourceRanges) == 0) && (len(newSourceRanges) == 0) {
-		return true, ""
-	}
-	if !reflect.DeepEqual(oldSourceRanges, newSourceRanges) {
-		return false, "new service's LoadBalancerSourceRange doesn't match the current one"
+	if (len(oldSourceRanges) != 0) || (len(newSourceRanges) != 0) {
+		if !reflect.DeepEqual(oldSourceRanges, newSourceRanges) {
+			return false, "new service's LoadBalancerSourceRange doesn't match the current one"
+		}
 	}

 	oldDNSAnnotation := cur.Annotations[constants.ZalandoDNSNameAnnotation]
 	newDNSAnnotation := new.Annotations[constants.ZalandoDNSNameAnnotation]
+	oldELBAnnotation := cur.Annotations[constants.ElbTimeoutAnnotationName]
+	newELBAnnotation := new.Annotations[constants.ElbTimeoutAnnotationName]
+
 	if oldDNSAnnotation != newDNSAnnotation {
-		return false, fmt.Sprintf("new service's %q annotation doesn't match the current one", constants.ZalandoDNSNameAnnotation)
+		return false, fmt.Sprintf("new service's %q annotation value %q doesn't match the current one %q",
+			constants.ZalandoDNSNameAnnotation, newDNSAnnotation, oldDNSAnnotation)
 	}
+	if oldELBAnnotation != newELBAnnotation {
+		return false, fmt.Sprintf("new service's %q annotation value %q doesn't match the current one %q",
+			constants.ElbTimeoutAnnotationName, newELBAnnotation, oldELBAnnotation)
+	}

 	return true, ""
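The guard around reflect.DeepEqual exists because a nil slice and an empty slice are not DeepEqual, even though both mean "no source ranges". A standalone illustration:

package main

import (
	"fmt"
	"reflect"
)

func main() {
	var nilRanges []string    // what Kubernetes 1.6 hands back after serializing []
	emptyRanges := []string{} // what the operator's own spec contains

	fmt.Println(reflect.DeepEqual(nilRanges, emptyRanges))    // false
	fmt.Println(len(nilRanges) == 0 && len(emptyRanges) == 0) // true: the guard treats them as equal
}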