Check rolling updates using controller revisions.
Compare pods controller revisions with the one for the statefulset to determine whether the pod is running the latest revision and, therefore, no rolling update is necessary. This is performed only during the operator start, afterwards the rolling update status that is stored locally in the cluster structure is used for all rolling update decisions.
This commit is contained in:
parent
88c68712b6
commit
0618723a61
|
|
@ -71,12 +71,13 @@ type Cluster struct {
|
||||||
deleteOptions *metav1.DeleteOptions
|
deleteOptions *metav1.DeleteOptions
|
||||||
podEventsQueue *cache.FIFO
|
podEventsQueue *cache.FIFO
|
||||||
|
|
||||||
teamsAPIClient teams.Interface
|
teamsAPIClient teams.Interface
|
||||||
oauthTokenGetter OAuthTokenGetter
|
oauthTokenGetter OAuthTokenGetter
|
||||||
KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place?
|
KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place?
|
||||||
currentProcess spec.Process
|
currentProcess spec.Process
|
||||||
processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
|
processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
|
||||||
specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
|
specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
|
||||||
|
pendingRollingUpdate *bool // indicates the cluster needs a rolling update
|
||||||
}
|
}
|
||||||
|
|
||||||
type compareStatefulsetResult struct {
|
type compareStatefulsetResult struct {
|
||||||
|
|
@ -109,10 +110,11 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql
|
||||||
Secrets: make(map[types.UID]*v1.Secret),
|
Secrets: make(map[types.UID]*v1.Secret),
|
||||||
Services: make(map[PostgresRole]*v1.Service),
|
Services: make(map[PostgresRole]*v1.Service),
|
||||||
Endpoints: make(map[PostgresRole]*v1.Endpoints)},
|
Endpoints: make(map[PostgresRole]*v1.Endpoints)},
|
||||||
userSyncStrategy: users.DefaultUserSyncStrategy{},
|
userSyncStrategy: users.DefaultUserSyncStrategy{},
|
||||||
deleteOptions: &metav1.DeleteOptions{OrphanDependents: &orphanDependents},
|
deleteOptions: &metav1.DeleteOptions{OrphanDependents: &orphanDependents},
|
||||||
podEventsQueue: podEventsQueue,
|
podEventsQueue: podEventsQueue,
|
||||||
KubeClient: kubeClient,
|
KubeClient: kubeClient,
|
||||||
|
pendingRollingUpdate: nil,
|
||||||
}
|
}
|
||||||
cluster.logger = logger.WithField("pkg", "cluster").WithField("cluster-name", cluster.clusterName())
|
cluster.logger = logger.WithField("pkg", "cluster").WithField("cluster-name", cluster.clusterName())
|
||||||
cluster.teamsAPIClient = teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger)
|
cluster.teamsAPIClient = teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger)
|
||||||
|
|
@ -215,6 +217,7 @@ func (c *Cluster) Create() error {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
c.setStatus(spec.ClusterStatusCreating)
|
c.setStatus(spec.ClusterStatusCreating)
|
||||||
|
c.setPendingRollingUpgrade(false)
|
||||||
|
|
||||||
for _, role := range []PostgresRole{Master, Replica} {
|
for _, role := range []PostgresRole{Master, Replica} {
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -34,8 +34,8 @@ func TestInitRobotUsers(t *testing.T) {
|
||||||
{
|
{
|
||||||
manifestUsers: map[string]spec.UserFlags{"foo": {"superuser", "createdb"}},
|
manifestUsers: map[string]spec.UserFlags{"foo": {"superuser", "createdb"}},
|
||||||
infraRoles: map[string]spec.PgUser{"foo": {Origin: spec.RoleOriginInfrastructure, Name: "foo", Password: "bar"}},
|
infraRoles: map[string]spec.PgUser{"foo": {Origin: spec.RoleOriginInfrastructure, Name: "foo", Password: "bar"}},
|
||||||
result: map[string]spec.PgUser{"foo": {Origin: spec.RoleOriginInfrastructure, Name: "foo", Password: "bar"}},
|
result: map[string]spec.PgUser{"foo": {Origin: spec.RoleOriginInfrastructure, Name: "foo", Password: "bar"}},
|
||||||
err: nil,
|
err: nil,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
manifestUsers: map[string]spec.UserFlags{"!fooBar": {"superuser", "createdb"}},
|
manifestUsers: map[string]spec.UserFlags{"!fooBar": {"superuser", "createdb"}},
|
||||||
|
|
|
||||||
|
|
@ -249,10 +249,17 @@ func (c *Cluster) syncStatefulSet() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
c.logger.Infof("found pods without the statefulset: trigger rolling update")
|
c.logger.Infof("found pods without the statefulset: trigger rolling update")
|
||||||
|
c.setPendingRollingUpgrade(true)
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
// statefulset is already there, make sure we use its definition in order to compare with the spec.
|
// statefulset is already there, make sure we use its definition in order to compare with the spec.
|
||||||
c.Statefulset = sset
|
c.Statefulset = sset
|
||||||
|
// resolve the pending rolling upgrade flags as soon as we read an actual statefulset from kubernetes.
|
||||||
|
// we must do it before updating statefulsets; after an update, the statfulset will receive a new
|
||||||
|
// updateRevision, different from the one the pods run with.
|
||||||
|
if err := c.resolvePendingRollingUpdate(sset); err != nil {
|
||||||
|
return fmt.Errorf("could not resolve the rolling upgrade status: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
desiredSS, err := c.generateStatefulSet(&c.Spec)
|
desiredSS, err := c.generateStatefulSet(&c.Spec)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -260,33 +267,33 @@ func (c *Cluster) syncStatefulSet() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
cmp := c.compareStatefulSetWith(desiredSS)
|
cmp := c.compareStatefulSetWith(desiredSS)
|
||||||
if cmp.match {
|
if !cmp.match {
|
||||||
return nil
|
if cmp.rollingUpdate {
|
||||||
}
|
c.setPendingRollingUpgrade(true)
|
||||||
c.logStatefulSetChanges(c.Statefulset, desiredSS, false, cmp.reasons)
|
|
||||||
|
|
||||||
if !cmp.replace {
|
|
||||||
if err := c.updateStatefulSet(desiredSS); err != nil {
|
|
||||||
return fmt.Errorf("could not update statefulset: %v", err)
|
|
||||||
}
|
}
|
||||||
} else {
|
c.logStatefulSetChanges(c.Statefulset, desiredSS, false, cmp.reasons)
|
||||||
if err := c.replaceStatefulSet(desiredSS); err != nil {
|
|
||||||
return fmt.Errorf("could not replace statefulset: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !cmp.rollingUpdate {
|
if !cmp.replace {
|
||||||
c.logger.Debugln("no rolling update is needed")
|
if err := c.updateStatefulSet(desiredSS); err != nil {
|
||||||
return nil
|
return fmt.Errorf("could not update statefulset: %v", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err := c.replaceStatefulSet(desiredSS); err != nil {
|
||||||
|
return fmt.Errorf("could not replace statefulset: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// if we get here we also need to re-create the pods (either leftovers from the old
|
// if we get here we also need to re-create the pods (either leftovers from the old
|
||||||
// statefulset or those that got their configuration from the outdated statefulset)
|
// statefulset or those that got their configuration from the outdated statefulset)
|
||||||
c.logger.Debugln("performing rolling update")
|
if *c.pendingRollingUpdate {
|
||||||
if err := c.recreatePods(); err != nil {
|
c.logger.Debugln("performing rolling update")
|
||||||
return fmt.Errorf("could not recreate pods: %v", err)
|
if err := c.recreatePods(); err != nil {
|
||||||
|
return fmt.Errorf("could not recreate pods: %v", err)
|
||||||
|
}
|
||||||
|
c.setPendingRollingUpgrade(false)
|
||||||
|
c.logger.Infof("pods have been recreated")
|
||||||
}
|
}
|
||||||
c.logger.Infof("pods have been recreated")
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -39,6 +39,10 @@ func NewSecretOauthTokenGetter(kubeClient *k8sutil.KubernetesClient,
|
||||||
return &SecretOauthTokenGetter{kubeClient, OAuthTokenSecretName}
|
return &SecretOauthTokenGetter{kubeClient, OAuthTokenSecretName}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
podControllerRevisionHashLabel = "controller-revision-hash"
|
||||||
|
)
|
||||||
|
|
||||||
func (g *SecretOauthTokenGetter) getOAuthToken() (string, error) {
|
func (g *SecretOauthTokenGetter) getOAuthToken() (string, error) {
|
||||||
//TODO: we can move this function to the Controller in case it will be needed there. As for now we use it only in the Cluster
|
//TODO: we can move this function to the Controller in case it will be needed there. As for now we use it only in the Cluster
|
||||||
// Temporary getting postgresql-operator secret from the NamespaceDefault
|
// Temporary getting postgresql-operator secret from the NamespaceDefault
|
||||||
|
|
@ -430,3 +434,56 @@ func (c *Cluster) GetSpec() (*spec.Postgresql, error) {
|
||||||
func (c *Cluster) patroniUsesKubernetes() bool {
|
func (c *Cluster) patroniUsesKubernetes() bool {
|
||||||
return c.OpConfig.EtcdHost == ""
|
return c.OpConfig.EtcdHost == ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Cluster) setPendingRollingUpgrade(val bool) {
|
||||||
|
if c.pendingRollingUpdate == nil {
|
||||||
|
c.pendingRollingUpdate = new(bool)
|
||||||
|
}
|
||||||
|
*c.pendingRollingUpdate = val
|
||||||
|
c.logger.Debugf("pending rolling upgrade was set to %b", val)
|
||||||
|
}
|
||||||
|
|
||||||
|
// resolvePendingRollingUpdate figures out if rolling upgrade is necessary
|
||||||
|
// based on the states of the cluster statefulset and pods
|
||||||
|
func (c *Cluster) resolvePendingRollingUpdate(sset *v1beta1.StatefulSet) error {
|
||||||
|
// XXX: it looks like we will always trigger a rolling update if the
|
||||||
|
// pods are on a different revision from a statefulset, even if the
|
||||||
|
// statefulset change that caused it didn't require a rolling update
|
||||||
|
// originally.
|
||||||
|
if c.pendingRollingUpdate != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
c.logger.Debugf("evaluating rolling upgrade requirement")
|
||||||
|
effectiveRevision := sset.Status.UpdateRevision
|
||||||
|
if effectiveRevision == "" {
|
||||||
|
if sset.Status.CurrentRevision == "" {
|
||||||
|
c.logger.Debugf("statefulset doesn't have a current revision, no rolling upgrade")
|
||||||
|
// the statefulset does not have a currentRevision, it must be new; hence, no rollingUpdate
|
||||||
|
c.setPendingRollingUpgrade(false)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
effectiveRevision = sset.Status.CurrentRevision
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetch all pods related to this cluster
|
||||||
|
pods, err := c.listPods()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// check their revisions
|
||||||
|
for _, pod := range pods {
|
||||||
|
podRevision, present := pod.Labels[podControllerRevisionHashLabel]
|
||||||
|
// empty or missing revision indicates a new pod - doesn't need a rolling upgrade
|
||||||
|
if !present || podRevision == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
c.logger.Debugf("observing pod revision %q vs statefulset revision %q", podRevision, effectiveRevision)
|
||||||
|
if podRevision != effectiveRevision {
|
||||||
|
// pod is on a different revision - trigger the rolling upgrade
|
||||||
|
c.setPendingRollingUpgrade(true)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
c.setPendingRollingUpgrade(false)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue