add a call to Patroni Api to fetch node state
This commit is contained in:
parent
eb3f7fcf1e
commit
8811dfd278
|
|
@ -294,6 +294,19 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
|
|||
return pod, nil
|
||||
}
|
||||
|
||||
func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool {
|
||||
|
||||
for _, pod := range pods.Items {
|
||||
state, err := c.patroni.GetNodeState(&pod)
|
||||
if err != nil || state != "running" {
|
||||
c.logger.Warningf("cannot re-create pod %s: patroni not in 'running' state", pod.Name)
|
||||
return false
|
||||
}
|
||||
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *Cluster) recreatePods() error {
|
||||
c.setProcessName("starting to recreate pods")
|
||||
ls := c.labelsSet(false)
|
||||
|
|
@ -309,6 +322,10 @@ func (c *Cluster) recreatePods() error {
|
|||
}
|
||||
c.logger.Infof("there are %d pods in the cluster to recreate", len(pods.Items))
|
||||
|
||||
if !c.isSafeToRecreatePods(pods) {
|
||||
return fmt.Errorf("postpone pod recreation until next Sync: some pods are being initilalized and recreation is unsafe")
|
||||
}
|
||||
|
||||
var (
|
||||
masterPod, newMasterPod, newPod *v1.Pod
|
||||
)
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
@ -25,6 +25,31 @@ const (
|
|||
type Interface interface {
|
||||
Switchover(master *v1.Pod, candidate string) error
|
||||
SetPostgresParameters(server *v1.Pod, options map[string]string) error
|
||||
GetNodeState(pod *v1.Pod) (string, error)
|
||||
}
|
||||
|
||||
// HttpGetResponse contains data returned by Get to host/8008
|
||||
type HttpGetResponse struct {
|
||||
State string `json:"type,omitempty"`
|
||||
PostmasterStartTime string `json:"type,omitempty"`
|
||||
Role string `json:"type,omitempty"`
|
||||
ServerVersion int `json:"type,omitempty"`
|
||||
ClusterUnlocked bool `json:"type,omitempty"`
|
||||
Timeline int `json:"type,omitempty"`
|
||||
Xlog Xlog `json:"type,omitempty"`
|
||||
DatabaseSystemIdentifier string `json:"type,omitempty"`
|
||||
patroniInfo Info `json:"type,omitempty"`
|
||||
}
|
||||
|
||||
// Xlog contains wal locaiton
|
||||
type Xlog struct {
|
||||
location int `json:"type,omitempty"`
|
||||
}
|
||||
|
||||
// Info cotains Patroni version and cluser scope
|
||||
type Info struct {
|
||||
version string `json:"type,omitempty"`
|
||||
scope string `json:"type,omitempty"`
|
||||
}
|
||||
|
||||
// Patroni API client
|
||||
|
|
@ -123,3 +148,29 @@ func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]st
|
|||
}
|
||||
return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf)
|
||||
}
|
||||
|
||||
//GetNodeState returns node state reported by Patroni API call.
|
||||
func (p *Patroni) GetNodeState(server *v1.Pod) (string, error) {
|
||||
|
||||
var pResponse HttpGetResponse
|
||||
|
||||
apiURLString, err := apiURL(server)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
response, err := p.httpClient.Get(apiURLString)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("could not perform Get request: %v", err)
|
||||
}
|
||||
defer response.Body.Close()
|
||||
|
||||
body, err := ioutil.ReadAll(response.Body)
|
||||
|
||||
err = json.Unmarshal(body, &pResponse)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("could not unmarshal response: %v", err)
|
||||
}
|
||||
|
||||
return pResponse.State, nil
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue