move retry reads against Patroni API to pod.go
This commit is contained in:
parent
88a263db13
commit
37b3db9df7
|
|
@ -13,6 +13,7 @@ import (
|
|||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
|
||||
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
|
||||
"github.com/zalando/postgres-operator/pkg/spec"
|
||||
"github.com/zalando/postgres-operator/pkg/util"
|
||||
"github.com/zalando/postgres-operator/pkg/util/patroni"
|
||||
|
|
@ -349,6 +350,54 @@ func (c *Cluster) MigrateReplicaPod(podName spec.NamespacedName, fromNodeName st
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *Cluster) getPatroniConfig(pod *v1.Pod) (acidv1.Patroni, map[string]string, error) {
|
||||
var (
|
||||
patroniConfig acidv1.Patroni
|
||||
pgParameters map[string]string
|
||||
)
|
||||
podName := util.NameFromMeta(pod.ObjectMeta)
|
||||
err := retryutil.Retry(1*time.Second, 5*time.Second,
|
||||
func() (bool, error) {
|
||||
var err error
|
||||
patroniConfig, pgParameters, err = c.patroni.GetConfig(pod)
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
},
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return acidv1.Patroni{}, nil, fmt.Errorf("could not get Postgres config from pod %s: %v", podName, err)
|
||||
}
|
||||
|
||||
return patroniConfig, pgParameters, nil
|
||||
}
|
||||
|
||||
func (c *Cluster) getPatroniMemberData(pod *v1.Pod) (patroni.MemberData, error) {
|
||||
var memberData patroni.MemberData
|
||||
err := retryutil.Retry(1*time.Second, 5*time.Second,
|
||||
func() (bool, error) {
|
||||
var err error
|
||||
memberData, err = c.patroni.GetMemberData(pod)
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return patroni.MemberData{}, fmt.Errorf("could not get member data: %v", err)
|
||||
}
|
||||
if memberData.State == "creating replica" {
|
||||
return patroni.MemberData{}, fmt.Errorf("replica currently being initialized")
|
||||
}
|
||||
|
||||
return memberData, nil
|
||||
}
|
||||
|
||||
func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
|
||||
ch := c.registerPodSubscriber(podName)
|
||||
defer c.unregisterPodSubscriber(podName)
|
||||
|
|
|
|||
|
|
@ -15,8 +15,6 @@ import (
|
|||
"github.com/zalando/postgres-operator/pkg/util"
|
||||
"github.com/zalando/postgres-operator/pkg/util/constants"
|
||||
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
|
||||
"github.com/zalando/postgres-operator/pkg/util/patroni"
|
||||
"github.com/zalando/postgres-operator/pkg/util/retryutil"
|
||||
batchv1beta1 "k8s.io/api/batch/v1beta1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
policybeta1 "k8s.io/api/policy/v1beta1"
|
||||
|
|
@ -405,25 +403,9 @@ func (c *Cluster) syncStatefulSet() error {
|
|||
// get Postgres config, compare with manifest and update via Patroni PATCH endpoint if it differs
|
||||
// Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used.
|
||||
for i, pod := range pods {
|
||||
var (
|
||||
patroniConfig acidv1.Patroni
|
||||
pgParameters map[string]string
|
||||
err error
|
||||
)
|
||||
podName := util.NameFromMeta(pods[i].ObjectMeta)
|
||||
err = retryutil.Retry(1*time.Second, 5*time.Second,
|
||||
func() (bool, error) {
|
||||
var err error
|
||||
patroniConfig, pgParameters, err = c.patroni.GetConfig(&pod)
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
},
|
||||
)
|
||||
patroniConfig, pgParameters, err := c.getPatroniConfig(&pod)
|
||||
if err != nil {
|
||||
c.logger.Warningf("could not get Postgres config from pod %s: %v", podName, err)
|
||||
c.logger.Warningf("%v", err)
|
||||
isSafeToRecreatePods = false
|
||||
continue
|
||||
}
|
||||
|
|
@ -434,7 +416,7 @@ func (c *Cluster) syncStatefulSet() error {
|
|||
if !reflect.DeepEqual(patroniConfig, acidv1.Patroni{}) || len(pgParameters) > 0 {
|
||||
restartMasterFirst, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, pgParameters)
|
||||
if err != nil {
|
||||
c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err)
|
||||
c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", pods[i].Name, err)
|
||||
continue
|
||||
}
|
||||
// it could take up to LoopWait to apply the config
|
||||
|
|
@ -456,7 +438,7 @@ func (c *Cluster) syncStatefulSet() error {
|
|||
continue
|
||||
}
|
||||
if err = c.restartInstance(&pod, restartWait); err != nil {
|
||||
c.logger.Errorf("could not restart Postgres in %s pod %s: %v", role, pod.Name, err)
|
||||
c.logger.Errorf("%v", err)
|
||||
isSafeToRecreatePods = false
|
||||
}
|
||||
}
|
||||
|
|
@ -464,9 +446,8 @@ func (c *Cluster) syncStatefulSet() error {
|
|||
// in most cases only the master should be left to restart
|
||||
if len(remainingPods) > 0 {
|
||||
for _, remainingPod := range remainingPods {
|
||||
role := PostgresRole(remainingPod.Labels[c.OpConfig.PodRoleLabel])
|
||||
if err = c.restartInstance(remainingPod, restartWait); err != nil {
|
||||
c.logger.Errorf("could not restart Postgres in %s pod %s: %v", role, remainingPod.Name, err)
|
||||
c.logger.Errorf("%v", err)
|
||||
isSafeToRecreatePods = false
|
||||
}
|
||||
}
|
||||
|
|
@ -491,27 +472,12 @@ func (c *Cluster) syncStatefulSet() error {
|
|||
|
||||
func (c *Cluster) restartInstance(pod *v1.Pod, restartWait uint32) error {
|
||||
// if the config update requires a restart, call Patroni restart
|
||||
var memberData patroni.MemberData
|
||||
err := retryutil.Retry(1*time.Second, 5*time.Second,
|
||||
func() (bool, error) {
|
||||
var err error
|
||||
memberData, err = c.patroni.GetMemberData(pod)
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not get member data: %v", err)
|
||||
}
|
||||
if memberData.State == "creating replica" {
|
||||
return fmt.Errorf("replica currently being initialized")
|
||||
}
|
||||
|
||||
podName := util.NameFromMeta(pod.ObjectMeta)
|
||||
role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
|
||||
memberData, err := c.getPatroniMemberData(pod)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not restart Postgres in %s pod %s: %v", role, podName, err)
|
||||
}
|
||||
|
||||
// do restart only when it is pending
|
||||
if memberData.PendingRestart {
|
||||
|
|
|
|||
Loading…
Reference in New Issue