Initial implementation of the statefulset annotations indicating rolling updates.

This commit is contained in:
Oleksii Kliukin 2018-05-03 19:19:52 +02:00 committed by Oleksii Kliukin
parent 43a1db2128
commit ce0d4af91c
5 changed files with 95 additions and 94 deletions

1
.gitignore vendored
View File

@ -6,6 +6,7 @@
# Folders
_obj
_test
_manifests
# Architecture specific extensions/prefixes
*.[568vq]

View File

@ -72,13 +72,12 @@ type Cluster struct {
deleteOptions *metav1.DeleteOptions
podEventsQueue *cache.FIFO
teamsAPIClient teams.Interface
oauthTokenGetter OAuthTokenGetter
KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place?
currentProcess spec.Process
processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
pendingRollingUpdate *bool // indicates the cluster needs a rolling update
teamsAPIClient teams.Interface
oauthTokenGetter OAuthTokenGetter
KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place?
currentProcess spec.Process
processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
}
type compareStatefulsetResult struct {
@ -111,11 +110,10 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql
Secrets: make(map[types.UID]*v1.Secret),
Services: make(map[PostgresRole]*v1.Service),
Endpoints: make(map[PostgresRole]*v1.Endpoints)},
userSyncStrategy: users.DefaultUserSyncStrategy{},
deleteOptions: &metav1.DeleteOptions{OrphanDependents: &orphanDependents},
podEventsQueue: podEventsQueue,
KubeClient: kubeClient,
pendingRollingUpdate: nil,
userSyncStrategy: users.DefaultUserSyncStrategy{},
deleteOptions: &metav1.DeleteOptions{OrphanDependents: &orphanDependents},
podEventsQueue: podEventsQueue,
KubeClient: kubeClient,
}
cluster.logger = logger.WithField("pkg", "cluster").WithField("cluster-name", cluster.clusterName())
cluster.teamsAPIClient = teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger)
@ -251,7 +249,6 @@ func (c *Cluster) Create() error {
}()
c.setStatus(spec.ClusterStatusCreating)
c.setPendingRollingUpgrade(false)
for _, role := range []PostgresRole{Master, Replica} {
@ -301,7 +298,7 @@ func (c *Cluster) Create() error {
if c.Statefulset != nil {
return fmt.Errorf("statefulset already exists in the cluster")
}
ss, err = c.createStatefulSet()
ss, err = c.createStatefulSet(false)
if err != nil {
return fmt.Errorf("could not create statefulset: %v", err)
}
@ -345,6 +342,10 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp
match = false
reasons = append(reasons, "new statefulset's number of replicas doesn't match the current one")
}
if !reflect.DeepEqual(c.Statefulset.Annotations, statefulSet.Annotations) {
match = false
reasons = append(reasons, "new statefulset's annotations doesn't match the current one")
}
if len(c.Statefulset.Spec.Template.Spec.Containers) != len(statefulSet.Spec.Template.Spec.Containers) {
needsRollUpdate = true
reasons = append(reasons, "new statefulset's container specification doesn't match the current one")
@ -396,9 +397,10 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp
}
if !reflect.DeepEqual(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations) {
needsRollUpdate = true
match = false
needsReplace = true
reasons = append(reasons, "new statefulset's metadata annotations doesn't match the current one")
needsRollUpdate = true
reasons = append(reasons, "new statefulset's pod template metadata annotations doesn't match the current one")
}
if len(c.Statefulset.Spec.VolumeClaimTemplates) != len(statefulSet.Spec.VolumeClaimTemplates) {
needsReplace = true

View File

@ -17,6 +17,10 @@ import (
"github.com/zalando-incubator/postgres-operator/pkg/util/retryutil"
)
const (
	// RollingUpdateStatefulsetAnnotationKey is the statefulset annotation that
	// marks a pending rolling update of the cluster's pods; its value is a
	// stringified boolean written by setRollingUpdateFlag.
	RollingUpdateStatefulsetAnnotationKey = "zalando-postgres-operator-rolling-update"
)
func (c *Cluster) listResources() error {
if c.PodDisruptionBudget != nil {
c.logger.Infof("found pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta), c.PodDisruptionBudget.UID)
@ -59,9 +63,38 @@ func (c *Cluster) listResources() error {
return nil
}
func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) {
// setRollingUpdateFlag records the rolling-update marker on the statefulset by
// writing the boolean val into the RollingUpdateStatefulsetAnnotationKey
// annotation, creating the annotation map if the object has none yet.
func setRollingUpdateFlag(sset *v1beta1.StatefulSet, val bool) {
	anno := sset.GetAnnotations()
	if anno == nil {
		anno = make(map[string]string)
	}
	anno[RollingUpdateStatefulsetAnnotationKey] = strconv.FormatBool(val)
	sset.SetAnnotations(anno)
	// log only after the annotation is actually in place; the previous
	// version printed before mutating and was missing the trailing newline.
	fmt.Printf("rolling upgrade flag has been set to %t\n", val)
}
// getRollingUpdateFlag reads the rolling-update marker annotation from the
// statefulset. It returns defaultValue when the annotation is absent or does
// not parse as a boolean.
func getRollingUpdateFlag(sset *v1beta1.StatefulSet, defaultValue bool) (flag bool) {
	anno := sset.GetAnnotations()
	flag = defaultValue

	stringFlag, exists := anno[RollingUpdateStatefulsetAnnotationKey]
	if exists {
		var err error
		if flag, err = strconv.ParseBool(stringFlag); err != nil {
			// keyed fields: unkeyed composite literals of external structs
			// are rejected by `go vet` (composites check).
			fmt.Printf("error when parsing %s annotation for the statefulset %s: expected boolean value, got %s\n",
				RollingUpdateStatefulsetAnnotationKey,
				types.NamespacedName{Namespace: sset.Namespace, Name: sset.Name},
				stringFlag)
			flag = defaultValue
		}
	}
	return flag
}
func (c *Cluster) createStatefulSet(pendingRollingUpgrade bool) (*v1beta1.StatefulSet, error) {
c.setProcessName("creating statefulset")
statefulSetSpec, err := c.generateStatefulSet(&c.Spec)
setRollingUpdateFlag(statefulSetSpec, pendingRollingUpgrade)
if err != nil {
return nil, fmt.Errorf("could not generate statefulset: %v", err)
}
@ -128,7 +161,7 @@ func (c *Cluster) preScaleDown(newStatefulSet *v1beta1.StatefulSet) error {
return nil
}
func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error {
func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet, includeAnnotations bool) error {
c.setProcessName("updating statefulset")
if c.Statefulset == nil {
return fmt.Errorf("there is no statefulset in the cluster")
@ -153,8 +186,19 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error {
types.MergePatchType,
patchData, "")
if err != nil {
return fmt.Errorf("could not patch statefulset %q: %v", statefulSetName, err)
return fmt.Errorf("could not patch statefulset spec %q: %v", statefulSetName, err)
}
if includeAnnotations && newStatefulSet.Annotations != nil {
patchData := metadataAnnotationsPatch(newStatefulSet.Annotations)
statefulSet, err = c.KubeClient.StatefulSets(c.Statefulset.Namespace).Patch(
c.Statefulset.Name,
types.StrategicMergePatchType,
[]byte(patchData), "")
if err != nil {
return fmt.Errorf("could not patch statefulset annotations %q: %v", patchData, err)
}
}
c.Statefulset = statefulSet
return nil

View File

@ -220,7 +220,9 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error {
}
func (c *Cluster) syncStatefulSet() error {
var (
cachedRollingUpdateFlag, podsRollingUpdateRequired bool
)
sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(c.statefulSetName(), metav1.GetOptions{})
if err != nil {
if !k8sutil.ResourceNotFound(err) {
@ -234,7 +236,8 @@ func (c *Cluster) syncStatefulSet() error {
return fmt.Errorf("could not list pods of the statefulset: %v", err)
}
sset, err = c.createStatefulSet()
podsRollingUpdateRequired := (len(pods) > 0)
sset, err = c.createStatefulSet(podsRollingUpdateRequired)
if err != nil {
return fmt.Errorf("could not create missing statefulset: %v", err)
}
@ -244,36 +247,42 @@ func (c *Cluster) syncStatefulSet() error {
}
c.logger.Infof("created missing statefulset %q", util.NameFromMeta(sset.ObjectMeta))
if len(pods) <= 0 {
return nil
}
c.logger.Infof("found pods without the statefulset: trigger rolling update")
c.setPendingRollingUpgrade(true)
} else {
if c.Statefulset != nil {
// if we reset the rolling update flag in the statefulset structure in memory but didn't manage to update
// the actual object in Kubernetes for some reason we want to avoid doing an unnecessary update by relying
// on the 'cached' in-memory flag.
cachedRollingUpdateFlag = getRollingUpdateFlag(c.Statefulset, true)
c.logger.Debugf("cached statefulset value exists, rollingUpdate flag is %t", cachedRollingUpdateFlag)
}
// statefulset is already there, make sure we use its definition in order to compare with the spec.
c.Statefulset = sset
// resolve the pending rolling upgrade flags as soon as we read an actual statefulset from kubernetes.
// we must do it before updating statefulsets; after an update, the statfulset will receive a new
// updateRevision, different from the one the pods run with.
if err := c.resolvePendingRollingUpdate(sset); err != nil {
return fmt.Errorf("could not resolve the rolling upgrade status: %v", err)
if podsRollingUpdateRequired = getRollingUpdateFlag(c.Statefulset, false); podsRollingUpdateRequired {
if cachedRollingUpdateFlag {
c.logger.Infof("found a statefulset with an unfinished pods rolling update")
} else {
c.logger.Infof("clearing the rolling update flag based on the cached information")
podsRollingUpdateRequired = false
}
}
desiredSS, err := c.generateStatefulSet(&c.Spec)
if err != nil {
return fmt.Errorf("could not generate statefulset: %v", err)
}
setRollingUpdateFlag(desiredSS, podsRollingUpdateRequired)
cmp := c.compareStatefulSetWith(desiredSS)
if !cmp.match {
if cmp.rollingUpdate {
c.setPendingRollingUpgrade(true)
if cmp.rollingUpdate && !podsRollingUpdateRequired {
podsRollingUpdateRequired = true
setRollingUpdateFlag(desiredSS, podsRollingUpdateRequired)
}
c.logStatefulSetChanges(c.Statefulset, desiredSS, false, cmp.reasons)
if !cmp.replace {
if err := c.updateStatefulSet(desiredSS); err != nil {
if err := c.updateStatefulSet(desiredSS, true); err != nil {
return fmt.Errorf("could not update statefulset: %v", err)
}
} else {
@ -285,15 +294,17 @@ func (c *Cluster) syncStatefulSet() error {
}
// if we get here we also need to re-create the pods (either leftovers from the old
// statefulset or those that got their configuration from the outdated statefulset)
if *c.pendingRollingUpdate {
if podsRollingUpdateRequired {
c.logger.Debugln("performing rolling update")
if err := c.recreatePods(); err != nil {
return fmt.Errorf("could not recreate pods: %v", err)
}
c.setPendingRollingUpgrade(false)
c.logger.Infof("pods have been recreated")
setRollingUpdateFlag(c.Statefulset, false)
if err := c.updateStatefulSet(c.Statefulset, true); err != nil {
c.logger.Warningf("could not clear rolling update for the statefulset")
}
}
return nil
}

View File

@ -39,10 +39,6 @@ func NewSecretOauthTokenGetter(kubeClient *k8sutil.KubernetesClient,
return &SecretOauthTokenGetter{kubeClient, OAuthTokenSecretName}
}
const (
	// podControllerRevisionHashLabel is the label the statefulset controller
	// sets on every pod it manages; it holds the hash of the controller
	// revision the pod was created from and is compared against the
	// statefulset's revision to detect pods needing a rolling upgrade.
	podControllerRevisionHashLabel = "controller-revision-hash"
)
func (g *SecretOauthTokenGetter) getOAuthToken() (string, error) {
//TODO: we can move this function to the Controller in case it will be needed there. As for now we use it only in the Cluster
// Temporary getting postgresql-operator secret from the NamespaceDefault
@ -462,56 +458,3 @@ func (c *Cluster) GetSpec() (*spec.Postgresql, error) {
func (c *Cluster) patroniUsesKubernetes() bool {
return c.OpConfig.EtcdHost == ""
}
// setPendingRollingUpgrade sets the in-memory pending-rolling-update flag,
// lazily allocating the pointer so that nil can still mean "undecided".
func (c *Cluster) setPendingRollingUpgrade(val bool) {
	if c.pendingRollingUpdate == nil {
		c.pendingRollingUpdate = new(bool)
	}
	*c.pendingRollingUpdate = val
	// %t is the fmt verb for booleans; %b (binary integer) is a vet error here.
	c.logger.Debugf("pending rolling upgrade was set to %t", val)
}
// resolvePendingRollingUpdate figures out if rolling upgrade is necessary
// based on the states of the cluster statefulset and pods. It only computes
// the decision when the in-memory flag is still undecided (nil); once set,
// the cached value is left untouched and the function returns immediately.
func (c *Cluster) resolvePendingRollingUpdate(sset *v1beta1.StatefulSet) error {
	// XXX: it looks like we will always trigger a rolling update if the
	// pods are on a different revision from a statefulset, even if the
	// statefulset change that caused it didn't require a rolling update
	// originally.
	if c.pendingRollingUpdate != nil {
		return nil
	}
	c.logger.Debugf("evaluating rolling upgrade requirement")

	// prefer the update revision; fall back to the current revision when no
	// update has been recorded on the statefulset yet.
	effectiveRevision := sset.Status.UpdateRevision
	if effectiveRevision == "" {
		if sset.Status.CurrentRevision == "" {
			c.logger.Debugf("statefulset doesn't have a current revision, no rolling upgrade")
			// the statefulset does not have a currentRevision, it must be new; hence, no rollingUpdate
			c.setPendingRollingUpgrade(false)
			return nil
		}
		effectiveRevision = sset.Status.CurrentRevision
	}

	// fetch all pods related to this cluster
	pods, err := c.listPods()
	if err != nil {
		return err
	}
	// check their revisions
	for _, pod := range pods {
		podRevision, present := pod.Labels[podControllerRevisionHashLabel]
		// empty or missing revision indicates a new pod - doesn't need a rolling upgrade
		if !present || podRevision == "" {
			continue
		}
		c.logger.Debugf("observing pod revision %q vs statefulset revision %q", podRevision, effectiveRevision)
		if podRevision != effectiveRevision {
			// pod is on a different revision - trigger the rolling upgrade
			c.setPendingRollingUpgrade(true)
			return nil
		}
	}
	c.setPendingRollingUpgrade(false)
	return nil
}