diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go
index 229648dd1..b5415dc4e 100644
--- a/pkg/cluster/pod.go
+++ b/pkg/cluster/pod.go
@@ -4,6 +4,7 @@ import (
     "context"
     "fmt"
     "math/rand"
+    "sort"
     "strconv"
     "time"
 
@@ -459,8 +460,13 @@ func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.Namesp
     // 1. we have not observed a new master pod when re-creating former replicas
     // 2. we know possible switchover targets even when no replicas were recreated
     if newMasterPod == nil && len(replicas) > 0 {
-        if err := c.Switchover(masterPod, masterCandidate(replicas)); err != nil {
-            c.logger.Warningf("could not perform switch over: %v", err)
+        masterCandidate, err := c.getSwitchoverCandidate(masterPod)
+        if err != nil {
+            // do not recreate the master now so it keeps the update flag and the switchover is retried on the next sync
+            return fmt.Errorf("skipping switchover: %v", err)
+        }
+        if err := c.Switchover(masterPod, masterCandidate); err != nil {
+            return fmt.Errorf("could not perform switchover: %v", err)
         }
     } else if newMasterPod == nil && len(replicas) == 0 {
         c.logger.Warningf("cannot perform switch over before re-creating the pod: no replicas")
@@ -475,6 +481,50 @@ func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.Namesp
     return nil
 }
 
+func (c *Cluster) getSwitchoverCandidate(master *v1.Pod) (spec.NamespacedName, error) {
+
+    var members []patroni.ClusterMember
+    candidates := make([]patroni.ClusterMember, 0)
+    syncCandidates := make([]patroni.ClusterMember, 0)
+
+    err := retryutil.Retry(1*time.Second, 5*time.Second,
+        func() (bool, error) {
+            var err error
+            members, err = c.patroni.GetClusterMembers(master)
+
+            if err != nil {
+                return false, err
+            }
+            return true, nil
+        },
+    )
+    if err != nil {
+        return spec.NamespacedName{}, fmt.Errorf("failed to get Patroni cluster members: %s", err)
+    }
+
+    for _, member := range members {
+        if PostgresRole(member.Role) != Leader && PostgresRole(member.Role) != StandbyLeader && member.State == "running" {
+            candidates = append(candidates, member)
+            if PostgresRole(member.Role) == SyncStandby {
+                syncCandidates = append(syncCandidates, member)
+            }
+        }
+    }
+
+    // pick the candidate with the lowest replication lag;
+    // if sync_standby members were found, assume synchronous_mode is enabled and consider only them
+    if len(syncCandidates) > 0 {
+        sort.Slice(syncCandidates, func(i, j int) bool { return syncCandidates[i].LagInMb < syncCandidates[j].LagInMb })
+        return spec.NamespacedName{Namespace: master.Namespace, Name: syncCandidates[0].Name}, nil
+    }
+    if len(candidates) > 0 {
+        sort.Slice(candidates, func(i, j int) bool { return candidates[i].LagInMb < candidates[j].LagInMb })
+        return spec.NamespacedName{Namespace: master.Namespace, Name: candidates[0].Name}, nil
+    }
+
+    return spec.NamespacedName{}, fmt.Errorf("no switchover candidate found")
+}
+
 func (c *Cluster) podIsEndOfLife(pod *v1.Pod) (bool, error) {
     node, err := c.KubeClient.Nodes().Get(context.TODO(), pod.Spec.NodeName, metav1.GetOptions{})
     if err != nil {
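The selection rule added above reads more clearly in isolation. Below is a self-contained sketch of the same preference order (running non-leader members only; sync_standby members win outright; lowest reported lag breaks ties). The reduced member struct and the sample data are illustrative, not the operator's actual types:

package main

import (
	"fmt"
	"sort"
)

// member is a reduced record carrying only the fields the rule inspects.
type member struct {
	Name    string
	Role    string
	State   string
	LagInMb int
}

// pickCandidate mirrors getSwitchoverCandidate: filter to running
// non-leader members, prefer the sync_standby members as a group, and
// take the lowest replication lag within the preferred group.
func pickCandidate(members []member) (string, error) {
	var candidates, syncCandidates []member
	for _, m := range members {
		if m.Role != "leader" && m.Role != "standby_leader" && m.State == "running" {
			candidates = append(candidates, m)
			if m.Role == "sync_standby" {
				syncCandidates = append(syncCandidates, m)
			}
		}
	}
	if len(syncCandidates) > 0 {
		candidates = syncCandidates
	}
	if len(candidates) == 0 {
		return "", fmt.Errorf("no switchover candidate found")
	}
	sort.Slice(candidates, func(i, j int) bool { return candidates[i].LagInMb < candidates[j].LagInMb })
	return candidates[0].Name, nil
}

func main() {
	name, _ := pickCandidate([]member{
		{Name: "acid-test-cluster-0", Role: "leader", State: "running"},
		{Name: "acid-test-cluster-1", Role: "replica", State: "running", LagInMb: 5},
		{Name: "acid-test-cluster-2", Role: "replica", State: "running", LagInMb: 2},
	})
	fmt.Println(name) // prints "acid-test-cluster-2": without sync standbys, lowest lag wins
}

Collapsing the two sorted lists into one group-then-sort step is a simplification for the sketch; the behavior matches because a non-empty sync_standby group fully shadows the plain replicas.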
"github.com/zalando/postgres-operator/pkg/util/k8sutil" + "github.com/zalando/postgres-operator/pkg/util/patroni" +) + +func TestGetSwitchoverCandidate(t *testing.T) { + testName := "test getting right switchover candidate" + namespace := "default" + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var cluster = New(Config{}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) + + // simulate different member scenarios + tests := []struct { + subtest string + clusterJson string + expectedCandidate spec.NamespacedName + expectedError error + }{ + { + subtest: "choose sync_standby over replica", + clusterJson: `{"members": [{"name": "acid-test-cluster-0", "role": "leader", "state": "running", "api_url": "http://192.168.100.1:8008/patroni", "host": "192.168.100.1", "port": 5432, "timeline": 1}, {"name": "acid-test-cluster-1", "role": "sync_standby", "state": "running", "api_url": "http://192.168.100.2:8008/patroni", "host": "192.168.100.2", "port": 5432, "timeline": 1, "lag": 0}, {"name": "acid-test-cluster-2", "role": "replica", "state": "running", "api_url": "http://192.168.100.3:8008/patroni", "host": "192.168.100.3", "port": 5432, "timeline": 1, "lag": 0}]}`, + expectedCandidate: spec.NamespacedName{Namespace: namespace, Name: "acid-test-cluster-1"}, + expectedError: nil, + }, + { + subtest: "choose replica with lowest lag", + clusterJson: `{"members": [{"name": "acid-test-cluster-0", "role": "leader", "state": "running", "api_url": "http://192.168.100.1:8008/patroni", "host": "192.168.100.1", "port": 5432, "timeline": 1}, {"name": "acid-test-cluster-1", "role": "replica", "state": "running", "api_url": "http://192.168.100.2:8008/patroni", "host": "192.168.100.2", "port": 5432, "timeline": 1, "lag": 5}, {"name": "acid-test-cluster-2", "role": "replica", "state": "running", "api_url": "http://192.168.100.3:8008/patroni", "host": "192.168.100.3", "port": 5432, "timeline": 1, "lag": 2}]}`, + expectedCandidate: spec.NamespacedName{Namespace: namespace, Name: "acid-test-cluster-2"}, + expectedError: nil, + }, + { + subtest: "choose first replica when lag is equal evrywhere", + clusterJson: `{"members": [{"name": "acid-test-cluster-0", "role": "leader", "state": "running", "api_url": "http://192.168.100.1:8008/patroni", "host": "192.168.100.1", "port": 5432, "timeline": 1}, {"name": "acid-test-cluster-1", "role": "replica", "state": "running", "api_url": "http://192.168.100.2:8008/patroni", "host": "192.168.100.2", "port": 5432, "timeline": 1, "lag": 5}, {"name": "acid-test-cluster-2", "role": "replica", "state": "running", "api_url": "http://192.168.100.3:8008/patroni", "host": "192.168.100.3", "port": 5432, "timeline": 1, "lag": 5}]}`, + expectedCandidate: spec.NamespacedName{Namespace: namespace, Name: "acid-test-cluster-1"}, + expectedError: nil, + }, + { + subtest: "no running replica available", + clusterJson: `{"members": [{"name": "acid-test-cluster-0", "role": "leader", "state": "running", "api_url": "http://192.168.100.1:8008/patroni", "host": "192.168.100.1", "port": 5432, "timeline": 2}, {"name": "acid-test-cluster-1", "role": "replica", "state": "starting", "api_url": "http://192.168.100.2:8008/patroni", "host": "192.168.100.2", "port": 5432, "timeline": 2}]}`, + expectedCandidate: spec.NamespacedName{}, + expectedError: fmt.Errorf("no switchover candidate found"), + }, + } + + for _, tt := range tests { + // mocking cluster members + r := ioutil.NopCloser(bytes.NewReader([]byte(tt.clusterJson))) + + response := http.Response{ + StatusCode: 200, + 
diff --git a/pkg/cluster/types.go b/pkg/cluster/types.go
index 199914ccc..67b4ee395 100644
--- a/pkg/cluster/types.go
+++ b/pkg/cluster/types.go
@@ -14,11 +14,14 @@ import (
 type PostgresRole string
 
 const (
-    // Master role
-    Master PostgresRole = "master"
-
-    // Replica role
+    // spilo roles
+    Master  PostgresRole = "master"
     Replica PostgresRole = "replica"
+
+    // roles returned by Patroni cluster endpoint
+    Leader        PostgresRole = "leader"
+    StandbyLeader PostgresRole = "standby_leader"
+    SyncStandby   PostgresRole = "sync_standby"
 )
 
 // PodEventType represents the type of a pod-related event
diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go
index 4350f9d56..43c3282d4 100644
--- a/pkg/cluster/util.go
+++ b/pkg/cluster/util.go
@@ -6,7 +6,6 @@ import (
     "encoding/gob"
     "encoding/json"
     "fmt"
-    "math/rand"
     "reflect"
     "sort"
     "strings"
@@ -525,10 +524,6 @@ func (c *Cluster) credentialSecretNameForCluster(username string, clusterName st
         "tprgroup", acidzalando.GroupName)
 }
 
-func masterCandidate(replicas []spec.NamespacedName) spec.NamespacedName {
-    return replicas[rand.Intn(len(replicas))]
-}
-
 func cloneSpec(from *acidv1.Postgresql) (*acidv1.Postgresql, error) {
     var (
         buf bytes.Buffer
diff --git a/pkg/util/patroni/patroni.go b/pkg/util/patroni/patroni.go
index 4de3b8201..f030659ca 100644
--- a/pkg/util/patroni/patroni.go
+++ b/pkg/util/patroni/patroni.go
@@ -21,6 +21,7 @@ import (
 const (
     failoverPath = "/failover"
     configPath   = "/config"
+    clusterPath  = "/cluster"
     statusPath   = "/patroni"
     restartPath  = "/restart"
     apiPort      = 8008
@@ -29,6 +30,7 @@ const (
 
 // Interface describe patroni methods
 type Interface interface {
+    GetClusterMembers(master *v1.Pod) ([]ClusterMember, error)
     Switchover(master *v1.Pod, candidate string) error
     SetPostgresParameters(server *v1.Pod, options map[string]string) error
     GetMemberData(server *v1.Pod) (MemberData, error)
@@ -175,6 +177,20 @@ func (p *Patroni) SetConfig(server *v1.Pod, config map[string]interface{}) error
     return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf)
 }
 
+// ClusterMembers array of cluster members from Patroni API
+type ClusterMembers struct {
+    Members []ClusterMember `json:"members"`
+}
+
+// ClusterMember cluster member data from Patroni API
+type ClusterMember struct {
+    Name     string `json:"name"`
+    Role     string `json:"role"`
+    State    string `json:"state"`
+    Timeline int    `json:"timeline"`
+    LagInMb  int    `json:"lag"`
+}
+
 // MemberDataPatroni child element
 type MemberDataPatroni struct {
     Version string `json:"version"`
@@ -246,6 +262,27 @@ func (p *Patroni) Restart(server *v1.Pod) error {
     return nil
 }
 
+// GetClusterMembers reads cluster data from the Patroni API
+func (p *Patroni) GetClusterMembers(server *v1.Pod) ([]ClusterMember, error) {
+
+    apiURLString, err := apiURL(server)
+    if err != nil {
+        return []ClusterMember{}, err
+    }
+    body, err := p.httpGet(apiURLString + clusterPath)
+    if err != nil {
+        return []ClusterMember{}, err
+    }
+
+    data := ClusterMembers{}
+    err = json.Unmarshal([]byte(body), &data)
+    if err != nil {
+        return []ClusterMember{}, err
+    }
+
+    return data.Members, nil
+}
+
 // GetMemberData read member data from patroni API
 func (p *Patroni) GetMemberData(server *v1.Pod) (MemberData, error) {
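A detail of the new types that is easy to miss: LagInMb is decoded from the payload's "lag" key, and members that omit the key (the leader entries in the fixtures) keep the int zero value. A minimal decoding sketch, with the struct definitions copied from the hunk above and a trimmed, illustrative payload:

package main

import (
	"encoding/json"
	"fmt"
)

// ClusterMember cluster member data from Patroni API (copied from the hunk above)
type ClusterMember struct {
	Name     string `json:"name"`
	Role     string `json:"role"`
	State    string `json:"state"`
	Timeline int    `json:"timeline"`
	LagInMb  int    `json:"lag"`
}

// ClusterMembers array of cluster members from Patroni API
type ClusterMembers struct {
	Members []ClusterMember `json:"members"`
}

func main() {
	// trimmed payload in the shape served at GET <member-api-url>/cluster;
	// note the leader entry carries no "lag" key
	payload := `{"members": [
		{"name": "acid-test-cluster-0", "role": "leader", "state": "running", "timeline": 1},
		{"name": "acid-test-cluster-1", "role": "sync_standby", "state": "running", "timeline": 1, "lag": 0}]}`

	var data ClusterMembers
	if err := json.Unmarshal([]byte(payload), &data); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", data.Members) // the leader's LagInMb stays 0
}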
diff --git a/pkg/util/patroni/patroni_test.go b/pkg/util/patroni/patroni_test.go
index 5a6b2657c..48d27c8dc 100644
--- a/pkg/util/patroni/patroni_test.go
+++ b/pkg/util/patroni/patroni_test.go
@@ -85,6 +85,55 @@ func TestApiURL(t *testing.T) {
     }
 }
 
+func TestGetClusterMembers(t *testing.T) {
+    ctrl := gomock.NewController(t)
+    defer ctrl.Finish()
+
+    expectedClusterMemberData := []ClusterMember{
+        {
+            Name:     "acid-test-cluster-0",
+            Role:     "leader",
+            State:    "running",
+            Timeline: 1,
+            LagInMb:  0,
+        }, {
+            Name:     "acid-test-cluster-1",
+            Role:     "sync_standby",
+            State:    "running",
+            Timeline: 1,
+            LagInMb:  0,
+        }, {
+            Name:     "acid-test-cluster-2",
+            Role:     "replica",
+            State:    "running",
+            Timeline: 1,
+            LagInMb:  0,
+        }}
+
+    json := `{"members": [{"name": "acid-test-cluster-0", "role": "leader", "state": "running", "api_url": "http://192.168.100.1:8008/patroni", "host": "192.168.100.1", "port": 5432, "timeline": 1}, {"name": "acid-test-cluster-1", "role": "sync_standby", "state": "running", "api_url": "http://192.168.100.2:8008/patroni", "host": "192.168.100.2", "port": 5432, "timeline": 1, "lag": 0}, {"name": "acid-test-cluster-2", "role": "replica", "state": "running", "api_url": "http://192.168.100.3:8008/patroni", "host": "192.168.100.3", "port": 5432, "timeline": 1, "lag": 0}]}`
+    r := ioutil.NopCloser(bytes.NewReader([]byte(json)))
+
+    response := http.Response{
+        StatusCode: 200,
+        Body:       r,
+    }
+
+    mockClient := mocks.NewMockHTTPClient(ctrl)
+    mockClient.EXPECT().Get(gomock.Any()).Return(&response, nil)
+
+    p := New(logger, mockClient)
+
+    clusterMemberData, err := p.GetClusterMembers(newMockPod("192.168.100.1"))
+
+    if !reflect.DeepEqual(expectedClusterMemberData, clusterMemberData) {
+        t.Errorf("Patroni cluster members differ: expected: %#v, got: %#v", expectedClusterMemberData, clusterMemberData)
+    }
+
+    if err != nil {
+        t.Errorf("Could not read Patroni data: %v", err)
+    }
+}
+
 func TestGetMemberData(t *testing.T) {
     ctrl := gomock.NewController(t)
     defer ctrl.Finish()
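One last note on test setup: both new tests stub Patroni's HTTP layer with a gomock-generated HTTPClient mock from the repository's mocks package. If a checkout lacks the generated file, regeneration would look roughly like the line below; the destination path and the interface's home package are assumptions about the repository layout, not taken from this diff:

    mockgen -package mocks -destination mocks/mocks.go github.com/zalando/postgres-operator/pkg/util/patroni HTTPClient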