diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 65fca3417..9c9ef131c 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -298,7 +298,7 @@ func (c *Cluster) Create() error { if c.Statefulset != nil { return fmt.Errorf("statefulset already exists in the cluster") } - ss, err = c.createStatefulSet(false) + ss, err = c.createStatefulSet() if err != nil { return fmt.Errorf("could not create statefulset: %v", err) } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 18350f526..89b47384c 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -564,9 +564,10 @@ func (c *Cluster) generateStatefulSet(spec *spec.PostgresSpec) (*v1beta1.Statefu statefulSet := &v1beta1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ - Name: c.statefulSetName(), - Namespace: c.Namespace, - Labels: c.labelsSet(true), + Name: c.statefulSetName(), + Namespace: c.Namespace, + Labels: c.labelsSet(true), + Annotations: map[string]string{RollingUpdateStatefulsetAnnotationKey: "false"}, }, Spec: v1beta1.StatefulSetSpec{ Replicas: &numberOfInstances, @@ -704,6 +705,11 @@ func (c *Cluster) shouldCreateLoadBalancerForService(role PostgresRole, spec *sp return *spec.UseLoadBalancer } + // if the value is explicitly set in a Postgresql manifest, follow this setting + if spec.EnableMasterLoadBalancer != nil { + return *spec.EnableMasterLoadBalancer + } + // `enable_load_balancer`` governs LB for a master service // there is no equivalent deprecated operator option for the replica LB if c.OpConfig.EnableLoadBalancer != nil { diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index a4135a534..7d9c4020c 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -63,38 +63,9 @@ func (c *Cluster) listResources() error { return nil } -func setRollingUpdateFlag(sset *v1beta1.StatefulSet, val bool) { - anno := sset.GetAnnotations() - fmt.Printf("rolling upgrade flag has been set to %t", val) - if anno == nil { - anno = 
make(map[string]string) - } - anno[RollingUpdateStatefulsetAnnotationKey] = strconv.FormatBool(val) - sset.SetAnnotations(anno) -} - -func getRollingUpdateFlag(sset *v1beta1.StatefulSet, defaultValue bool) (flag bool) { - anno := sset.GetAnnotations() - flag = defaultValue - - stringFlag, exists := anno[RollingUpdateStatefulsetAnnotationKey] - if exists { - var err error - if flag, err = strconv.ParseBool(stringFlag); err != nil { - fmt.Printf("error when parsing %s annotation for the statefulset %s: expected boolean value, got %s\n", - RollingUpdateStatefulsetAnnotationKey, - types.NamespacedName{sset.Namespace, sset.Name}, - stringFlag) - flag = defaultValue - } - } - return flag -} - -func (c *Cluster) createStatefulSet(pendingRollingUpgrade bool) (*v1beta1.StatefulSet, error) { +func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) { c.setProcessName("creating statefulset") statefulSetSpec, err := c.generateStatefulSet(&c.Spec) - setRollingUpdateFlag(statefulSetSpec, pendingRollingUpgrade) if err != nil { return nil, fmt.Errorf("could not generate statefulset: %v", err) } @@ -161,7 +132,96 @@ func (c *Cluster) preScaleDown(newStatefulSet *v1beta1.StatefulSet) error { return nil } -func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet, includeAnnotations bool) error { +// setRollingUpdateFlagForStatefulSet sets the indicator or the rolling upgrade requirement +// in the StatefulSet annotation. +func (c *Cluster) setRollingUpdateFlagForStatefulSet(sset *v1beta1.StatefulSet, val bool) { + anno := sset.GetAnnotations() + c.logger.Debugf("rolling upgrade flag has been set to %t", val) + if anno == nil { + anno = make(map[string]string) + } + anno[RollingUpdateStatefulsetAnnotationKey] = strconv.FormatBool(val) + sset.SetAnnotations(anno) +} + +// applyRollingUpdateFlagforStatefulSet sets the rolling update flag for the cluster's StatefulSet +// and applies that setting to the actual running cluster. 
+func (c *Cluster) applyRollingUpdateFlagforStatefulSet(val bool) error { + c.setRollingUpdateFlagForStatefulSet(c.Statefulset, val) + sset, err := c.updateStatefulSetAnnotations(c.Statefulset.GetAnnotations()) + if err != nil { + return err + } + c.Statefulset = sset + return nil +} + +// getRollingUpdateFlagFromStatefulSet returns the value of the rollingUpdate flag from the passed +// StatefulSet, reverting to the default value in case of errors +func (c *Cluster) getRollingUpdateFlagFromStatefulSet(sset *v1beta1.StatefulSet, defaultValue bool) (flag bool) { + anno := sset.GetAnnotations() + flag = defaultValue + + stringFlag, exists := anno[RollingUpdateStatefulsetAnnotationKey] + if exists { + var err error + if flag, err = strconv.ParseBool(stringFlag); err != nil { + c.logger.Warnf("error when parsing %q annotation for the statefulset %q: expected boolean value, got %q\n", + RollingUpdateStatefulsetAnnotationKey, + types.NamespacedName{sset.Namespace, sset.Name}, + stringFlag) + flag = defaultValue + } + } + return flag +} + +// mergeRollingUpdateFlagUsingCache returns the value of the rollingUpdate flag from the passed +// statefulset, however, the value can be cleared if there is a cached flag in the cluster that +// is set to false (the discrepancy could be a result of a failed StatefulSet update). +func (c *Cluster) mergeRollingUpdateFlagUsingCache(runningStatefulSet *v1beta1.StatefulSet) bool { + var ( + cachedStatefulsetExists, clearRollingUpdateFromCache, podsRollingUpdateRequired bool + ) + + if c.Statefulset != nil { + // if we reset the rolling update flag in the statefulset structure in memory but didn't manage to update + // the actual object in Kubernetes for some reason we want to avoid doing an unnecessary update by relying + // on the 'cached' in-memory flag. 
+ cachedStatefulsetExists = true + clearRollingUpdateFromCache = !c.getRollingUpdateFlagFromStatefulSet(c.Statefulset, true) + c.logger.Debugf("cached StatefulSet value exists, rollingUpdate flag is %t", clearRollingUpdateFromCache) + } + + if podsRollingUpdateRequired = c.getRollingUpdateFlagFromStatefulSet(runningStatefulSet, false); podsRollingUpdateRequired { + if cachedStatefulsetExists && clearRollingUpdateFromCache { + c.logger.Infof("clearing the rolling update flag based on the cached information") + podsRollingUpdateRequired = false + } else { + c.logger.Infof("found a statefulset with an unfinished pods rolling update") + + } + } + return podsRollingUpdateRequired +} + +func (c *Cluster) updateStatefulSetAnnotations(annotations map[string]string) (*v1beta1.StatefulSet, error) { + c.logger.Debugf("updating statefulset annotations") + patchData, err := metaAnnotationsPatch(annotations) + if err != nil { + return nil, fmt.Errorf("could not form patch for the statefulset metadata: %v", err) + } + result, err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Patch( + c.Statefulset.Name, + types.MergePatchType, + []byte(patchData), "") + if err != nil { + return nil, fmt.Errorf("could not patch statefulset annotations %q: %v", patchData, err) + } + return result, nil + +} +func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error { c.setProcessName("updating statefulset") if c.Statefulset == nil { return fmt.Errorf("there is no statefulset in the cluster") @@ -188,14 +248,11 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet, include if err != nil { return fmt.Errorf("could not patch statefulset spec %q: %v", statefulSetName, err) } - if includeAnnotations && newStatefulSet.Annotations != nil { - patchData := metadataAnnotationsPatch(newStatefulSet.Annotations) - statefulSet, err = c.KubeClient.StatefulSets(c.Statefulset.Namespace).Patch( - c.Statefulset.Name, - types.StrategicMergePatchType, - 
[]byte(patchData), "") + + if newStatefulSet.Annotations != nil { + statefulSet, err = c.updateStatefulSetAnnotations(newStatefulSet.Annotations) if err != nil { - return fmt.Errorf("could not patch statefulset annotations %q: %v", patchData, err) + return err } } @@ -342,16 +399,19 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error return nil } + // update the service annotation in order to propagate ELB notation. if len(newService.ObjectMeta.Annotations) > 0 { - annotationsPatchData := metadataAnnotationsPatch(newService.ObjectMeta.Annotations) + if annotationsPatchData, err := metaAnnotationsPatch(newService.ObjectMeta.Annotations); err == nil { + _, err = c.KubeClient.Services(serviceName.Namespace).Patch( + serviceName.Name, + types.MergePatchType, + []byte(annotationsPatchData), "") - _, err := c.KubeClient.Services(serviceName.Namespace).Patch( - serviceName.Name, - types.StrategicMergePatchType, - []byte(annotationsPatchData), "") - - if err != nil { - return fmt.Errorf("could not replace annotations for the service %q: %v", serviceName, err) + if err != nil { + return fmt.Errorf("could not replace annotations for the service %q: %v", serviceName, err) + } + } else { + return fmt.Errorf("could not form patch for the service metadata: %v", err) } } @@ -360,6 +420,7 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error return fmt.Errorf("could not form patch for the service %q: %v", serviceName, err) } + // update the service spec svc, err := c.KubeClient.Services(serviceName.Namespace).Patch( serviceName.Name, types.MergePatchType, diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 1c1fcba61..d3bb567ff 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -221,7 +221,7 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { func (c *Cluster) syncStatefulSet() error { var ( - cachedRollingUpdateFlag, podsRollingUpdateRequired bool + podsRollingUpdateRequired bool ) 
sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(c.statefulSetName(), metav1.GetOptions{}) if err != nil { @@ -236,8 +236,7 @@ func (c *Cluster) syncStatefulSet() error { return fmt.Errorf("could not list pods of the statefulset: %v", err) } - podsRollingUpdateRequired := (len(pods) > 0) - sset, err = c.createStatefulSet(podsRollingUpdateRequired) + sset, err = c.createStatefulSet() if err != nil { return fmt.Errorf("could not create missing statefulset: %v", err) } @@ -246,43 +245,33 @@ func (c *Cluster) syncStatefulSet() error { return fmt.Errorf("cluster is not ready: %v", err) } + podsRollingUpdateRequired = (len(pods) > 0) + if podsRollingUpdateRequired { + c.applyRollingUpdateFlagforStatefulSet(podsRollingUpdateRequired) + } c.logger.Infof("created missing statefulset %q", util.NameFromMeta(sset.ObjectMeta)) } else { - if c.Statefulset != nil { - // if we reset the rolling update flag in the statefulset structure in memory but didn't manage to update - // the actual object in Kubernetes for some reason we want to avoid doing an unnecessary update by relying - // on the 'cached' in-memory flag. - cachedRollingUpdateFlag = getRollingUpdateFlag(c.Statefulset, true) - c.logger.Debugf("cached statefulset value exists, rollingUpdate flag is %t", cachedRollingUpdateFlag) - } + podsRollingUpdateRequired = c.mergeRollingUpdateFlagUsingCache(sset) // statefulset is already there, make sure we use its definition in order to compare with the spec. 
c.Statefulset = sset - if podsRollingUpdateRequired = getRollingUpdateFlag(c.Statefulset, false); podsRollingUpdateRequired { - if cachedRollingUpdateFlag { - c.logger.Infof("found a statefulset with an unfinished pods rolling update") - } else { - c.logger.Infof("clearing the rolling update flag based on the cached information") - podsRollingUpdateRequired = false - } - } desiredSS, err := c.generateStatefulSet(&c.Spec) if err != nil { return fmt.Errorf("could not generate statefulset: %v", err) } - setRollingUpdateFlag(desiredSS, podsRollingUpdateRequired) + c.setRollingUpdateFlagForStatefulSet(desiredSS, podsRollingUpdateRequired) cmp := c.compareStatefulSetWith(desiredSS) if !cmp.match { if cmp.rollingUpdate && !podsRollingUpdateRequired { podsRollingUpdateRequired = true - setRollingUpdateFlag(desiredSS, podsRollingUpdateRequired) + c.setRollingUpdateFlagForStatefulSet(desiredSS, podsRollingUpdateRequired) } c.logStatefulSetChanges(c.Statefulset, desiredSS, false, cmp.reasons) if !cmp.replace { - if err := c.updateStatefulSet(desiredSS, true); err != nil { + if err := c.updateStatefulSet(desiredSS); err != nil { return fmt.Errorf("could not update statefulset: %v", err) } } else { @@ -300,9 +289,8 @@ func (c *Cluster) syncStatefulSet() error { return fmt.Errorf("could not recreate pods: %v", err) } c.logger.Infof("pods have been recreated") - setRollingUpdateFlag(c.Statefulset, false) - if err := c.updateStatefulSet(c.Statefulset, true); err != nil { - c.logger.Warningf("could not clear rolling update for the statefulset") + if err := c.applyRollingUpdateFlagforStatefulSet(false); err != nil { + c.logger.Warningf("could not clear rolling update for the statefulset: %v", err) } } return nil diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index b72835311..5a5447918 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -133,21 +133,23 @@ func normalizeUserFlags(userFlags []string) ([]string, error) { return flags, nil } +// specPatch produces a 
JSON of the Kubernetes object specification passed (typically service or +// statefulset) to use it in a MergePatch. func specPatch(spec interface{}) ([]byte, error) { return json.Marshal(struct { Spec interface{} `json:"spec"` }{spec}) } -func metadataAnnotationsPatch(annotations map[string]string) string { - annotationsList := make([]string, 0, len(annotations)) - - for name, value := range annotations { - annotationsList = append(annotationsList, fmt.Sprintf(`"%s":"%s"`, name, value)) - } - annotationsString := strings.Join(annotationsList, ",") - // TODO: perhaps use patchStrategy:action json annotation instead of constructing the patch literally. - return fmt.Sprintf(constants.ServiceMetadataAnnotationReplaceFormat, annotationsString) +// metaAnnotationsPatch produces a JSON of the object metadata that has only the annotation +// field in order to use it in a MergePatch. Note that we don't patch the complete metadata, since +// it contains the current revision of the object that could be outdated at the time we patch. 
+func metaAnnotationsPatch(annotations map[string]string) ([]byte, error) { + var meta metav1.ObjectMeta + meta.Annotations = annotations + return json.Marshal(struct { + ObjMeta interface{} `json:"metadata"` + }{&meta}) } func (c *Cluster) logPDBChanges(old, new *policybeta1.PodDisruptionBudget, isUpdate bool, reason string) { diff --git a/pkg/util/constants/annotations.go b/pkg/util/constants/annotations.go index 48e41cb16..0b93fc2e1 100644 --- a/pkg/util/constants/annotations.go +++ b/pkg/util/constants/annotations.go @@ -2,10 +2,9 @@ package constants // Names and values in Kubernetes annotation for services, statefulsets and volumes const ( - ZalandoDNSNameAnnotation = "external-dns.alpha.kubernetes.io/hostname" - ElbTimeoutAnnotationName = "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout" - ElbTimeoutAnnotationValue = "3600" - KubeIAmAnnotation = "iam.amazonaws.com/role" - VolumeStorateProvisionerAnnotation = "pv.kubernetes.io/provisioned-by" - ServiceMetadataAnnotationReplaceFormat = `{"metadata":{"annotations": {"$patch":"replace", %s}}}` + ZalandoDNSNameAnnotation = "external-dns.alpha.kubernetes.io/hostname" + ElbTimeoutAnnotationName = "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout" + ElbTimeoutAnnotationValue = "3600" + KubeIAmAnnotation = "iam.amazonaws.com/role" + VolumeStorateProvisionerAnnotation = "pv.kubernetes.io/provisioned-by" ) diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 1e50bd034..e9c899b82 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -120,16 +120,18 @@ func SameService(cur, new *v1.Service) (match bool, reason string) { newSourceRanges := new.Spec.LoadBalancerSourceRanges /* work around Kubernetes 1.6 serializing [] as nil. 
See https://github.com/kubernetes/kubernetes/issues/43203 */ - if (len(oldSourceRanges) == 0) && (len(newSourceRanges) == 0) { - return true, "" - } - if !reflect.DeepEqual(oldSourceRanges, newSourceRanges) { - return false, "new service's LoadBalancerSourceRange doesn't match the current one" + if (len(oldSourceRanges) != 0) || (len(newSourceRanges) != 0) { + if !reflect.DeepEqual(oldSourceRanges, newSourceRanges) { + return false, "new service's LoadBalancerSourceRange doesn't match the current one" + } } oldDNSAnnotation := cur.Annotations[constants.ZalandoDNSNameAnnotation] newDNSAnnotation := new.Annotations[constants.ZalandoDNSNameAnnotation] - if oldDNSAnnotation != newDNSAnnotation { + oldELBAnnotation := cur.Annotations[constants.ElbTimeoutAnnotationName] + newELBAnnotation := new.Annotations[constants.ElbTimeoutAnnotationName] + + if oldDNSAnnotation != newDNSAnnotation || oldELBAnnotation != newELBAnnotation { return false, fmt.Sprintf("new service's %q annotation doesn't match the current one", constants.ZalandoDNSNameAnnotation) }