Replace the statefulset if it cannot be updated. (#18)
Updates to statefulset spec for fields other than 'replicas' and containers' are forbidden. However, it is possible to delete the old statefulset without deleting its pods and create the new one, using the changed specs. The new statefulset shall pick up the orphaned pods. Change the statefulset's comparison to return the combined effect of all checks, not just the first non-matching field.
This commit is contained in:
parent
356be8f0f1
commit
4457ce4e47
|
|
@ -267,7 +267,7 @@ func (c Cluster) sameVolumeWith(volume spec.Volume) (match bool, reason string)
|
|||
return
|
||||
}
|
||||
|
||||
func (c Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) (match, needsRollUpdate bool, reason string) {
|
||||
func (c Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) (match, needsReplace bool, needsRollUpdate bool, reason string) {
|
||||
match = true
|
||||
//TODO: improve me
|
||||
if *c.Statefulset.Spec.Replicas != *statefulSet.Spec.Replicas {
|
||||
|
|
@ -275,44 +275,91 @@ func (c Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) (match
|
|||
reason = "new statefulset's number of replicas doesn't match the current one"
|
||||
}
|
||||
if len(c.Statefulset.Spec.Template.Spec.Containers) != len(statefulSet.Spec.Template.Spec.Containers) {
|
||||
match = false
|
||||
needsRollUpdate = true
|
||||
reason = "new statefulset's container specification doesn't match the current one"
|
||||
return
|
||||
}
|
||||
if len(c.Statefulset.Spec.Template.Spec.Containers) == 0 {
|
||||
c.logger.Warnf("StatefulSet '%s' has no container", util.NameFromMeta(c.Statefulset.ObjectMeta))
|
||||
return
|
||||
}
|
||||
// In the comparisons below, the needsReplace and needsRollUpdate flags are never reset, since checks fall through
|
||||
// and the combined effect of all the changes should be applied.
|
||||
// TODO: log all reasons for changing the statefulset, not just the last one.
|
||||
// TODO: make sure this is in sync with genPodTemplate, ideally by using the same list of fields to generate
|
||||
// the template and the diff
|
||||
if c.Statefulset.Spec.Template.Spec.ServiceAccountName != statefulSet.Spec.Template.Spec.ServiceAccountName {
|
||||
needsReplace = true
|
||||
needsRollUpdate = true
|
||||
reason = "new statefulset's serviceAccountName service asccount name doesn't match the current one"
|
||||
}
|
||||
if *c.Statefulset.Spec.Template.Spec.TerminationGracePeriodSeconds != *statefulSet.Spec.Template.Spec.TerminationGracePeriodSeconds {
|
||||
needsReplace = true
|
||||
needsRollUpdate = true
|
||||
reason = "new statefulset's terminationGracePeriodSeconds doesn't match the current one"
|
||||
}
|
||||
// Some generated fields like creationTimestamp make it not possible to use DeepCompare on Spec.Template.ObjectMeta
|
||||
if !reflect.DeepEqual(c.Statefulset.Spec.Template.Labels, statefulSet.Spec.Template.Labels) {
|
||||
needsReplace = true
|
||||
needsRollUpdate = true
|
||||
reason = "new statefulset's metadata labels doesn't match the current one"
|
||||
}
|
||||
if !reflect.DeepEqual(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations) {
|
||||
needsRollUpdate = true
|
||||
needsReplace = true
|
||||
reason = "new statefulset's metadata annotations doesn't match the current one"
|
||||
}
|
||||
if len(c.Statefulset.Spec.VolumeClaimTemplates) != len(statefulSet.Spec.VolumeClaimTemplates) {
|
||||
needsReplace = true
|
||||
needsRollUpdate = true
|
||||
reason = "new statefulset's volumeClaimTemplates contains different number of volumes to the old one"
|
||||
}
|
||||
for i := 0; i < len(c.Statefulset.Spec.VolumeClaimTemplates); i++ {
|
||||
name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name
|
||||
// Some generated fields like creationTimestamp make it not possible to use DeepCompare on ObjectMeta
|
||||
if name != statefulSet.Spec.VolumeClaimTemplates[i].Name {
|
||||
needsReplace = true
|
||||
needsRollUpdate = true
|
||||
reason = fmt.Sprintf("new statefulset's name for volume %d doesn't match the current one", i)
|
||||
continue
|
||||
}
|
||||
if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations) {
|
||||
needsReplace = true
|
||||
needsRollUpdate = true
|
||||
reason = fmt.Sprintf("new statefulset's annotations for volume %s doesn't match the current one", name)
|
||||
}
|
||||
if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Spec, statefulSet.Spec.VolumeClaimTemplates[i].Spec) {
|
||||
name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name
|
||||
needsReplace = true
|
||||
needsRollUpdate = true
|
||||
reason = fmt.Sprintf("new statefulset's volumeClaimTemplates specification for volume %s doesn't match the current one", name)
|
||||
}
|
||||
}
|
||||
|
||||
container1 := c.Statefulset.Spec.Template.Spec.Containers[0]
|
||||
container2 := statefulSet.Spec.Template.Spec.Containers[0]
|
||||
if container1.Image != container2.Image {
|
||||
match = false
|
||||
needsRollUpdate = true
|
||||
reason = "new statefulset's container image doesn't match the current one"
|
||||
return
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(container1.Ports, container2.Ports) {
|
||||
match = false
|
||||
needsRollUpdate = true
|
||||
reason = "new statefulset's container ports don't match the current one"
|
||||
return
|
||||
}
|
||||
|
||||
if !compareResources(&container1.Resources, &container2.Resources) {
|
||||
match = false
|
||||
needsRollUpdate = true
|
||||
reason = "new statefulset's container resources don't match the current ones"
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(container1.Env, container2.Env) {
|
||||
match = false
|
||||
needsRollUpdate = true
|
||||
reason = "new statefulset's container environment doesn't match the current one"
|
||||
}
|
||||
|
||||
if needsRollUpdate || needsReplace {
|
||||
match = false
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -373,15 +420,23 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("Can't generate StatefulSet: %s", err)
|
||||
}
|
||||
sameSS, rollingUpdate, reason := c.compareStatefulSetWith(newStatefulSet)
|
||||
|
||||
sameSS, needsReplace, rollingUpdate, reason := c.compareStatefulSetWith(newStatefulSet)
|
||||
|
||||
if !sameSS {
|
||||
c.logStatefulSetChanges(c.Statefulset, newStatefulSet, true, reason)
|
||||
//TODO: mind the case of updating allowedSourceRanges
|
||||
if !needsReplace {
|
||||
if err := c.updateStatefulSet(newStatefulSet); err != nil {
|
||||
c.setStatus(spec.ClusterStatusUpdateFailed)
|
||||
return fmt.Errorf("Can't upate StatefulSet: %s", err)
|
||||
}
|
||||
} else {
|
||||
if err := c.replaceStatefulSet(newStatefulSet); err != nil {
|
||||
c.setStatus(spec.ClusterStatusUpdateFailed)
|
||||
return fmt.Errorf("Can't replace StatefulSet: %s", err)
|
||||
}
|
||||
}
|
||||
//TODO: if there is a change in numberOfInstances, make sure Pods have been created/deleted
|
||||
c.logger.Infof("StatefulSet '%s' has been updated", util.NameFromMeta(c.Statefulset.ObjectMeta))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,9 +1,10 @@
|
|||
package cluster
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sort"
|
||||
|
||||
"encoding/json"
|
||||
"k8s.io/client-go/pkg/api/resource"
|
||||
"k8s.io/client-go/pkg/api/v1"
|
||||
"k8s.io/client-go/pkg/apis/apps/v1beta1"
|
||||
|
|
@ -107,13 +108,22 @@ func (c *Cluster) generateSpiloJSONConfiguration(pg *spec.PostgresqlParam, patro
|
|||
config.Bootstrap.Initdb = []interface{}{map[string]string{"auth-host": "md5"},
|
||||
map[string]string{"auth-local": "trust"}}
|
||||
|
||||
initdbOptionNames := []string{}
|
||||
|
||||
for k := range patroni.InitDB {
|
||||
initdbOptionNames = append(initdbOptionNames, k)
|
||||
}
|
||||
/* We need to sort the user-defined options to more easily compare the resulting specs */
|
||||
sort.Strings(initdbOptionNames)
|
||||
|
||||
// Initdb parameters in the manifest take priority over the default ones
|
||||
// The whole type switch dance is caused by the ability to specify both
|
||||
// maps and normal string items in the array of initdb options. We need
|
||||
// both to convert the initial key-value to strings when necessary, and
|
||||
// to de-duplicate the options supplied.
|
||||
PATRONI_INITDB_PARAMS:
|
||||
for k, v := range patroni.InitDB {
|
||||
for _, k := range initdbOptionNames {
|
||||
v := patroni.InitDB[k]
|
||||
for i, defaultParam := range config.Bootstrap.Initdb {
|
||||
switch defaultParam.(type) {
|
||||
case map[string]string:
|
||||
|
|
@ -127,7 +137,8 @@ PATRONI_INITDB_PARAMS:
|
|||
}
|
||||
case string:
|
||||
{
|
||||
if k == v {
|
||||
/* if the option already occurs in the list */
|
||||
if defaultParam.(string) == v {
|
||||
continue PATRONI_INITDB_PARAMS
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import (
|
|||
"k8s.io/client-go/pkg/api/v1"
|
||||
"k8s.io/client-go/pkg/apis/apps/v1beta1"
|
||||
|
||||
"github.com/zalando-incubator/postgres-operator/pkg/util/retryutil"
|
||||
"github.com/zalando-incubator/postgres-operator/pkg/spec"
|
||||
"github.com/zalando-incubator/postgres-operator/pkg/util"
|
||||
"github.com/zalando-incubator/postgres-operator/pkg/util/constants"
|
||||
|
|
@ -129,6 +130,8 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error {
|
|||
}
|
||||
statefulSetName := util.NameFromMeta(c.Statefulset.ObjectMeta)
|
||||
|
||||
c.logger.Debugf("Updating StatefulSet")
|
||||
|
||||
patchData, err := specPatch(newStatefulSet.Spec)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Can't form patch for the StatefulSet '%s': %s", statefulSetName, err)
|
||||
|
|
@ -146,6 +149,53 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// replaceStatefulSet deletes an old StatefulSet and creates the new using spec in the PostgreSQL TPR.
|
||||
func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error {
|
||||
if c.Statefulset == nil {
|
||||
return fmt.Errorf("There is no StatefulSet in the cluster")
|
||||
}
|
||||
|
||||
statefulSetName := util.NameFromMeta(c.Statefulset.ObjectMeta)
|
||||
c.logger.Debugf("Replacing StatefulSet")
|
||||
|
||||
// Delete the current statefulset without deleting the pods
|
||||
orphanDepencies := true
|
||||
oldStatefulset := c.Statefulset
|
||||
|
||||
options := v1.DeleteOptions{OrphanDependents: &orphanDepencies}
|
||||
if err := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Delete(oldStatefulset.Name, &options); err != nil {
|
||||
return fmt.Errorf("Can't delete statefulset '%s': %s", statefulSetName, err)
|
||||
}
|
||||
// make sure we clear the stored statefulset status if the subsequent create fails.
|
||||
c.Statefulset = nil
|
||||
// wait until the statefulset is truly deleted
|
||||
c.logger.Debugf("Waiting for the statefulset to be deleted")
|
||||
|
||||
err := retryutil.Retry(constants.StatefulsetDeletionInterval, constants.StatefulsetDeletionTimeout,
|
||||
func() (bool, error) {
|
||||
_, err := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Get(oldStatefulset.Name)
|
||||
return err != nil, nil
|
||||
})
|
||||
if err != nil {
|
||||
fmt.Errorf("could not delete statefulset: %s", err)
|
||||
}
|
||||
|
||||
// create the new statefulset with the desired spec. It would take over the remaining pods.
|
||||
createdStatefulset, err := c.KubeClient.StatefulSets(newStatefulSet.Namespace).Create(newStatefulSet)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Can't create statefulset '%s': %s", statefulSetName, err)
|
||||
} else {
|
||||
// check that all the previous replicas were picked up.
|
||||
if newStatefulSet.Spec.Replicas == oldStatefulset.Spec.Replicas &&
|
||||
createdStatefulset.Status.Replicas != oldStatefulset.Status.Replicas {
|
||||
c.logger.Warnf("Number of pods for the old and updated Statefulsets is not identical")
|
||||
}
|
||||
}
|
||||
c.Statefulset = createdStatefulset
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
func (c *Cluster) deleteStatefulSet() error {
|
||||
c.logger.Debugln("Deleting StatefulSet")
|
||||
if c.Statefulset == nil {
|
||||
|
|
|
|||
|
|
@ -107,7 +107,7 @@ func (c *Cluster) syncEndpoint() error {
|
|||
|
||||
func (c *Cluster) syncStatefulSet() error {
|
||||
cSpec := c.Spec
|
||||
var rollUpdate bool
|
||||
var rollUpdate, needsReplace bool
|
||||
if c.Statefulset == nil {
|
||||
c.logger.Infof("Can't find the cluster's StatefulSet")
|
||||
pods, err := c.listPods()
|
||||
|
|
@ -137,20 +137,27 @@ func (c *Cluster) syncStatefulSet() error {
|
|||
match bool
|
||||
reason string
|
||||
)
|
||||
|
||||
desiredSS, err := c.genStatefulSet(cSpec)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Can't generate StatefulSet: %s", err)
|
||||
}
|
||||
|
||||
match, rollUpdate, reason = c.compareStatefulSetWith(desiredSS)
|
||||
match, needsReplace, rollUpdate, reason = c.compareStatefulSetWith(desiredSS)
|
||||
if match {
|
||||
return nil
|
||||
}
|
||||
c.logStatefulSetChanges(c.Statefulset, desiredSS, false, reason)
|
||||
|
||||
if !needsReplace {
|
||||
if err := c.updateStatefulSet(desiredSS); err != nil {
|
||||
return fmt.Errorf("Can't update StatefulSet: %s", err)
|
||||
}
|
||||
} else {
|
||||
if err := c.replaceStatefulSet(desiredSS); err != nil {
|
||||
return fmt.Errorf("Can't replace StatefulSet: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
if !rollUpdate {
|
||||
c.logger.Debugln("No rolling update is needed")
|
||||
|
|
|
|||
|
|
@ -1,5 +1,7 @@
|
|||
package constants
|
||||
|
||||
import "time"
|
||||
|
||||
const (
|
||||
//Constants
|
||||
TPRName = "postgresql"
|
||||
|
|
@ -22,6 +24,9 @@ const (
|
|||
PodRoleReplica = "replica"
|
||||
SuperuserKeyName = "superuser"
|
||||
ReplicationUserKeyName = "replication"
|
||||
StatefulsetDeletionInterval = 1 * time.Second
|
||||
StatefulsetDeletionTimeout = 30 * time.Second
|
||||
|
||||
RoleFlagSuperuser = "SUPERUSER"
|
||||
RoleFlagInherit = "INHERIT"
|
||||
RoleFlagLogin = "LOGIN"
|
||||
|
|
|
|||
Loading…
Reference in New Issue