do not recreate pods if previous Patroni API calls fail

parent a78a619e90
commit 88a263db13
@@ -380,54 +380,10 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
     return pod, nil
 }
 
-func (c *Cluster) isSafeToRecreatePods(pods []v1.Pod) bool {
-
-    /*
-     Operator should not re-create pods if there is at least one replica being bootstrapped
-     because Patroni might use other replicas to take basebackup from (see Patroni's "clonefrom" tag).
-
-     XXX operator cannot forbid replica re-init, so we might still fail if re-init is started
-     after this check succeeds but before a pod is re-created
-    */
-    for _, pod := range pods {
-        c.logger.Debugf("name=%s phase=%s ip=%s", pod.Name, pod.Status.Phase, pod.Status.PodIP)
-    }
-
-    for _, pod := range pods {
-
-        var data patroni.MemberData
-
-        err := retryutil.Retry(1*time.Second, 5*time.Second,
-            func() (bool, error) {
-                var err error
-                data, err = c.patroni.GetMemberData(&pod)
-
-                if err != nil {
-                    return false, err
-                }
-                return true, nil
-            },
-        )
-
-        if err != nil {
-            c.logger.Errorf("failed to get Patroni state for pod: %s", err)
-            return false
-        } else if data.State == "creating replica" {
-            c.logger.Warningf("cannot re-create replica %s: it is currently being initialized", pod.Name)
-            return false
-        }
-    }
-    return true
-}
-
 func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.NamespacedName) error {
     c.setProcessName("starting to recreate pods")
     c.logger.Infof("there are %d pods in the cluster to recreate", len(pods))
 
-    if !c.isSafeToRecreatePods(pods) {
-        return fmt.Errorf("postpone pod recreation until next Sync: recreation is unsafe because pods are being initialized")
-    }
-
     var (
         masterPod, newMasterPod *v1.Pod
     )
@@ -15,6 +15,8 @@ import (
     "github.com/zalando/postgres-operator/pkg/util"
     "github.com/zalando/postgres-operator/pkg/util/constants"
     "github.com/zalando/postgres-operator/pkg/util/k8sutil"
+    "github.com/zalando/postgres-operator/pkg/util/patroni"
+    "github.com/zalando/postgres-operator/pkg/util/retryutil"
     batchv1beta1 "k8s.io/api/batch/v1beta1"
     v1 "k8s.io/api/core/v1"
     policybeta1 "k8s.io/api/policy/v1beta1"
@@ -277,6 +279,7 @@ func (c *Cluster) syncStatefulSet() error {
         restartMasterFirst bool
     )
     podsToRecreate := make([]v1.Pod, 0)
+    isSafeToRecreatePods := true
     switchoverCandidates := make([]spec.NamespacedName, 0)
 
     pods, err := c.listPods()
@@ -402,18 +405,33 @@ func (c *Cluster) syncStatefulSet() error {
     // get Postgres config, compare with manifest and update via Patroni PATCH endpoint if it differs
     // Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used.
     for i, pod := range pods {
-        emptyPatroniConfig := acidv1.Patroni{}
+        var (
+            patroniConfig acidv1.Patroni
+            pgParameters  map[string]string
+            err           error
+        )
         podName := util.NameFromMeta(pods[i].ObjectMeta)
-        patroniConfig, pgParameters, err := c.patroni.GetConfig(&pod)
+        err = retryutil.Retry(1*time.Second, 5*time.Second,
+            func() (bool, error) {
+                var err error
+                patroniConfig, pgParameters, err = c.patroni.GetConfig(&pod)
+
+                if err != nil {
+                    return false, err
+                }
+                return true, nil
+            },
+        )
         if err != nil {
             c.logger.Warningf("could not get Postgres config from pod %s: %v", podName, err)
+            isSafeToRecreatePods = false
             continue
         }
         restartWait = patroniConfig.LoopWait
 
         // empty config probably means cluster is not fully initialized yet, e.g. restoring from backup
         // do not attempt a restart
-        if !reflect.DeepEqual(patroniConfig, emptyPatroniConfig) || len(pgParameters) > 0 {
+        if !reflect.DeepEqual(patroniConfig, acidv1.Patroni{}) || len(pgParameters) > 0 {
             restartMasterFirst, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, pgParameters)
             if err != nil {
                 c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err)
@@ -437,50 +455,75 @@ func (c *Cluster) syncStatefulSet() error {
             remainingPods = append(remainingPods, &pods[i])
             continue
         }
-        c.restartInstance(&pod, restartWait)
+        if err = c.restartInstance(&pod, restartWait); err != nil {
+            c.logger.Errorf("could not restart Postgres in %s pod %s: %v", role, pod.Name, err)
+            isSafeToRecreatePods = false
+        }
     }
 
     // in most cases only the master should be left to restart
     if len(remainingPods) > 0 {
         for _, remainingPod := range remainingPods {
-            c.restartInstance(remainingPod, restartWait)
+            role := PostgresRole(remainingPod.Labels[c.OpConfig.PodRoleLabel])
+            if err = c.restartInstance(remainingPod, restartWait); err != nil {
+                c.logger.Errorf("could not restart Postgres in %s pod %s: %v", role, remainingPod.Name, err)
+                isSafeToRecreatePods = false
+            }
         }
     }
 
     // if we get here we also need to re-create the pods (either leftovers from the old
     // statefulset or those that got their configuration from the outdated statefulset)
     if len(podsToRecreate) > 0 {
+        if isSafeToRecreatePods {
             c.logger.Debugln("performing rolling update")
             c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
             if err := c.recreatePods(podsToRecreate, switchoverCandidates); err != nil {
                 return fmt.Errorf("could not recreate pods: %v", err)
             }
             c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
+        } else {
+            c.logger.Warningf("postpone pod recreation until next sync")
+        }
     }
     return nil
 }
 
-func (c *Cluster) restartInstance(pod *v1.Pod, restartWait uint32) {
+func (c *Cluster) restartInstance(pod *v1.Pod, restartWait uint32) error {
+    // if the config update requires a restart, call Patroni restart
+    var memberData patroni.MemberData
+    err := retryutil.Retry(1*time.Second, 5*time.Second,
+        func() (bool, error) {
+            var err error
+            memberData, err = c.patroni.GetMemberData(pod)
+
+            if err != nil {
+                return false, err
+            }
+            return true, nil
+        },
+    )
+    if err != nil {
+        return fmt.Errorf("could not get member data: %v", err)
+    }
+    if memberData.State == "creating replica" {
+        return fmt.Errorf("replica currently being initialized")
+    }
+
     podName := util.NameFromMeta(pod.ObjectMeta)
     role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
 
-    // if the config update requires a restart, call Patroni restart
-    memberData, err := c.patroni.GetMemberData(pod)
-    if err != nil {
-        c.logger.Debugf("could not get member data of %s pod %s - skipping possible restart attempt: %v", role, podName, err)
-        return
-    }
     // do restart only when it is pending
     if memberData.PendingRestart {
-        c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("restarting Postgres server within %s pod %s", role, pod.Name))
+        c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("restarting Postgres server within %s pod %s", role, podName))
         if err := c.patroni.Restart(pod); err != nil {
-            c.logger.Warningf("could not restart Postgres server within %s pod %s: %v", role, podName, err)
-            return
+            return err
         }
         time.Sleep(time.Duration(restartWait) * time.Second)
-        c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("Postgres server restart done for %s pod %s", role, pod.Name))
+        c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("Postgres server restart done for %s pod %s", role, podName))
     }
 
+    return nil
 }
 
 // AnnotationsToPropagate get the annotations to update if required
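The pattern this commit applies throughout, shown as a minimal, self-contained sketch: wrap the Patroni call in a bounded retry and, if it still fails, mark the sync as unsafe and postpone pod recreation instead of proceeding on missing member state. The retry helper, memberData struct, getMemberData stub, pod names, and timings below are illustrative assumptions, not the operator's retryutil.Retry or Patroni client API.

package main

import (
    "errors"
    "fmt"
    "time"
)

// retry calls f every interval until it reports success or the timeout expires.
// Simplified stand-in for the operator's retryutil.Retry helper; the real
// helper's exact semantics may differ.
func retry(interval, timeout time.Duration, f func() (bool, error)) error {
    deadline := time.Now().Add(timeout)
    for {
        ok, err := f()
        if ok && err == nil {
            return nil
        }
        if time.Now().After(deadline) {
            if err == nil {
                err = errors.New("timeout expired")
            }
            return err
        }
        time.Sleep(interval)
    }
}

// memberData mimics the subset of Patroni member state the operator inspects.
type memberData struct {
    State          string
    PendingRestart bool
}

// getMemberData stands in for a Patroni REST call that can fail transiently.
func getMemberData(pod string) (memberData, error) {
    if pod == "demo-cluster-1" {
        return memberData{}, errors.New("connection refused")
    }
    return memberData{State: "running"}, nil
}

func main() {
    isSafeToRecreatePods := true
    for _, pod := range []string{"demo-cluster-0", "demo-cluster-1"} {
        var data memberData
        err := retry(100*time.Millisecond, 500*time.Millisecond, func() (bool, error) {
            var err error
            data, err = getMemberData(pod)
            if err != nil {
                return false, err
            }
            return true, nil
        })
        if err != nil {
            // mirror the commit's behaviour: log the failure and keep going,
            // but no longer treat recreation as safe
            fmt.Printf("could not get member data for pod %s: %v\n", pod, err)
            isSafeToRecreatePods = false
            continue
        }
        fmt.Printf("pod %s: state=%s pending_restart=%v\n", pod, data.State, data.PendingRestart)
    }
    if !isSafeToRecreatePods {
        fmt.Println("postpone pod recreation until next sync")
        return
    }
    fmt.Println("performing rolling update")
}

With this shape, a transient Patroni outage degrades to a skipped rolling update on the current sync cycle rather than a pod recreation based on stale or missing member state.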