diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go
index 805112e29..705d2b987 100644
--- a/pkg/cluster/pod.go
+++ b/pkg/cluster/pod.go
@@ -380,54 +380,10 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
 	return pod, nil
 }
 
-func (c *Cluster) isSafeToRecreatePods(pods []v1.Pod) bool {
-
-	/*
-	 Operator should not re-create pods if there is at least one replica being bootstrapped
-	 because Patroni might use other replicas to take basebackup from (see Patroni's "clonefrom" tag).
-
-	 XXX operator cannot forbid replica re-init, so we might still fail if re-init is started
-	 after this check succeeds but before a pod is re-created
-	*/
-	for _, pod := range pods {
-		c.logger.Debugf("name=%s phase=%s ip=%s", pod.Name, pod.Status.Phase, pod.Status.PodIP)
-	}
-
-	for _, pod := range pods {
-
-		var data patroni.MemberData
-
-		err := retryutil.Retry(1*time.Second, 5*time.Second,
-			func() (bool, error) {
-				var err error
-				data, err = c.patroni.GetMemberData(&pod)
-
-				if err != nil {
-					return false, err
-				}
-				return true, nil
-			},
-		)
-
-		if err != nil {
-			c.logger.Errorf("failed to get Patroni state for pod: %s", err)
-			return false
-		} else if data.State == "creating replica" {
-			c.logger.Warningf("cannot re-create replica %s: it is currently being initialized", pod.Name)
-			return false
-		}
-	}
-	return true
-}
-
 func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.NamespacedName) error {
 	c.setProcessName("starting to recreate pods")
 	c.logger.Infof("there are %d pods in the cluster to recreate", len(pods))
 
-	if !c.isSafeToRecreatePods(pods) {
-		return fmt.Errorf("postpone pod recreation until next Sync: recreation is unsafe because pods are being initialized")
-	}
-
 	var (
 		masterPod, newMasterPod *v1.Pod
 	)
diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go
index 2df168a6e..8daddae77 100644
--- a/pkg/cluster/sync.go
+++ b/pkg/cluster/sync.go
@@ -15,6 +15,8 @@ import (
 	"github.com/zalando/postgres-operator/pkg/util"
 	"github.com/zalando/postgres-operator/pkg/util/constants"
 	"github.com/zalando/postgres-operator/pkg/util/k8sutil"
+	"github.com/zalando/postgres-operator/pkg/util/patroni"
+	"github.com/zalando/postgres-operator/pkg/util/retryutil"
 	batchv1beta1 "k8s.io/api/batch/v1beta1"
 	v1 "k8s.io/api/core/v1"
 	policybeta1 "k8s.io/api/policy/v1beta1"
@@ -277,6 +279,7 @@ func (c *Cluster) syncStatefulSet() error {
 		restartMasterFirst bool
 	)
 	podsToRecreate := make([]v1.Pod, 0)
+	isSafeToRecreatePods := true
 	switchoverCandidates := make([]spec.NamespacedName, 0)
 
 	pods, err := c.listPods()
@@ -402,18 +405,33 @@ func (c *Cluster) syncStatefulSet() error {
 	// get Postgres config, compare with manifest and update via Patroni PATCH endpoint if it differs
 	// Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used.
 	for i, pod := range pods {
-		emptyPatroniConfig := acidv1.Patroni{}
+		var (
+			patroniConfig acidv1.Patroni
+			pgParameters  map[string]string
+			err           error
+		)
 		podName := util.NameFromMeta(pods[i].ObjectMeta)
-		patroniConfig, pgParameters, err := c.patroni.GetConfig(&pod)
+		err = retryutil.Retry(1*time.Second, 5*time.Second,
+			func() (bool, error) {
+				var err error
+				patroniConfig, pgParameters, err = c.patroni.GetConfig(&pod)
+
+				if err != nil {
+					return false, err
+				}
+				return true, nil
+			},
+		)
 		if err != nil {
 			c.logger.Warningf("could not get Postgres config from pod %s: %v", podName, err)
+			isSafeToRecreatePods = false
 			continue
 		}
 		restartWait = patroniConfig.LoopWait
 
 		// empty config probably means cluster is not fully initialized yet, e.g. restoring from backup
 		// do not attempt a restart
-		if !reflect.DeepEqual(patroniConfig, emptyPatroniConfig) || len(pgParameters) > 0 {
+		if !reflect.DeepEqual(patroniConfig, acidv1.Patroni{}) || len(pgParameters) > 0 {
 			restartMasterFirst, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, pgParameters)
 			if err != nil {
 				c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err)
@@ -437,50 +455,75 @@ func (c *Cluster) syncStatefulSet() error {
 			remainingPods = append(remainingPods, &pods[i])
 			continue
 		}
-		c.restartInstance(&pod, restartWait)
+		if err = c.restartInstance(&pod, restartWait); err != nil {
+			c.logger.Errorf("could not restart Postgres in %s pod %s: %v", role, pod.Name, err)
+			isSafeToRecreatePods = false
+		}
 	}
 	// in most cases only the master should be left to restart
 	if len(remainingPods) > 0 {
 		for _, remainingPod := range remainingPods {
-			c.restartInstance(remainingPod, restartWait)
+			role := PostgresRole(remainingPod.Labels[c.OpConfig.PodRoleLabel])
+			if err = c.restartInstance(remainingPod, restartWait); err != nil {
+				c.logger.Errorf("could not restart Postgres in %s pod %s: %v", role, remainingPod.Name, err)
+				isSafeToRecreatePods = false
+			}
 		}
 	}
 
 	// if we get here we also need to re-create the pods (either leftovers from the old
 	// statefulset or those that got their configuration from the outdated statefulset)
 	if len(podsToRecreate) > 0 {
-		c.logger.Debugln("performing rolling update")
-		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
-		if err := c.recreatePods(podsToRecreate, switchoverCandidates); err != nil {
-			return fmt.Errorf("could not recreate pods: %v", err)
+		if isSafeToRecreatePods {
+			c.logger.Debugln("performing rolling update")
+			c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
+			if err := c.recreatePods(podsToRecreate, switchoverCandidates); err != nil {
+				return fmt.Errorf("could not recreate pods: %v", err)
+			}
+			c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
+		} else {
+			c.logger.Warningf("postpone pod recreation until next sync")
 		}
-		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
 	}
 
 	return nil
 }
 
-func (c *Cluster) restartInstance(pod *v1.Pod, restartWait uint32) {
+func (c *Cluster) restartInstance(pod *v1.Pod, restartWait uint32) error {
+	// if the config update requires a restart, call Patroni restart
+	var memberData patroni.MemberData
+	err := retryutil.Retry(1*time.Second, 5*time.Second,
+		func() (bool, error) {
+			var err error
+			memberData, err = c.patroni.GetMemberData(pod)
+
+			if err != nil {
+				return false, err
+			}
+			return true, nil
+		},
+	)
+	if err != nil {
+		return fmt.Errorf("could not get member data: %v", err)
+	}
+	if memberData.State == "creating replica" {
+		return fmt.Errorf("replica currently being initialized")
+	}
+
 	podName := util.NameFromMeta(pod.ObjectMeta)
 	role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
 
-	// if the config update requires a restart, call Patroni restart
-	memberData, err := c.patroni.GetMemberData(pod)
-	if err != nil {
-		c.logger.Debugf("could not get member data of %s pod %s - skipping possible restart attempt: %v", role, podName, err)
-		return
-	}
-
 	// do restart only when it is pending
 	if memberData.PendingRestart {
-		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("restarting Postgres server within %s pod %s", role, pod.Name))
+		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("restarting Postgres server within %s pod %s", role, podName))
 		if err := c.patroni.Restart(pod); err != nil {
-			c.logger.Warningf("could not restart Postgres server within %s pod %s: %v", role, podName, err)
-			return
+			return err
 		}
 		time.Sleep(time.Duration(restartWait) * time.Second)
-		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("Postgres server restart done for %s pod %s", role, pod.Name))
+		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("Postgres server restart done for %s pod %s", role, podName))
 	}
+
+	return nil
 }
 
 // AnnotationsToPropagate get the annotations to update if required
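For illustration only, a minimal standalone sketch of the retry-and-gate pattern the patch applies around Patroni calls: retry a flaky read for a few seconds, and refuse to proceed if the member is still being bootstrapped. The retry helper, memberData struct, and function names below are simplified stand-ins, not the operator's own retryutil/patroni packages.

package main

import (
	"errors"
	"fmt"
	"time"
)

// memberData mirrors the two Patroni member fields the patch inspects.
type memberData struct {
	State          string
	PendingRestart bool
}

// retry calls f every interval until it returns true or timeout elapses.
func retry(interval, timeout time.Duration, f func() (bool, error)) error {
	deadline := time.Now().Add(timeout)
	for {
		ok, err := f()
		if ok {
			return nil
		}
		if time.Now().After(deadline) {
			if err != nil {
				return err
			}
			return errors.New("timeout reached")
		}
		time.Sleep(interval)
	}
}

// safeToProceed fetches member data with retries and rejects members that are
// still taking a basebackup, using the same "creating replica" state check as
// restartInstance in the patch.
func safeToProceed(getMember func() (memberData, error)) error {
	var md memberData
	err := retry(1*time.Second, 5*time.Second,
		func() (bool, error) {
			var err error
			md, err = getMember()
			if err != nil {
				return false, err
			}
			return true, nil
		},
	)
	if err != nil {
		return fmt.Errorf("could not get member data: %v", err)
	}
	if md.State == "creating replica" {
		return fmt.Errorf("replica currently being initialized")
	}
	return nil
}

func main() {
	// Simulate a member that is still being initialized; the caller would set
	// a flag like isSafeToRecreatePods = false and postpone the rolling update.
	err := safeToProceed(func() (memberData, error) {
		return memberData{State: "creating replica"}, nil
	})
	fmt.Println(err) // replica currently being initialized
}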