diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 3ca6d79f0..6fb2b7eee 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -668,3 +668,37 @@ func (c *Cluster) GetStatus() *spec.ClusterStatus { Error: c.Error, } } + +// ManualFailover does manual failover to a candidate pod +func (c *Cluster) ManualFailover(curMaster *v1.Pod, candidate spec.NamespacedName) error { + c.logger.Debugf("failing over from %q to %q", curMaster.Name, candidate) + podLabelErr := make(chan error) + stopCh := make(chan struct{}) + defer close(podLabelErr) + + go func() { + ch := c.registerPodSubscriber(candidate) + defer c.unregisterPodSubscriber(candidate) + + role := Master + + select { + case <-stopCh: + case podLabelErr <- c.waitForPodLabel(ch, &role): + } + }() + + if err := c.patroni.Failover(curMaster, candidate.Name); err != nil { + close(stopCh) + return fmt.Errorf("could not failover: %v", err) + } + c.logger.Debugf("successfully failed over from %q to %q", curMaster.Name, candidate) + + defer close(stopCh) + + if err := <-podLabelErr; err != nil { + return fmt.Errorf("could not get master pod label: %v", err) + } + + return nil +} diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index aa595d914..e48739ae2 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -104,7 +104,7 @@ func (c *Cluster) recreatePod(pod v1.Pod) error { if err := c.waitForPodDeletion(ch); err != nil { return err } - if err := c.waitForPodLabel(ch); err != nil { + if err := c.waitForPodLabel(ch, nil); err != nil { return err } c.logger.Infof("pod %q is ready", podName) @@ -127,6 +127,7 @@ func (c *Cluster) recreatePods() error { c.logger.Infof("there are %d pods in the cluster to recreate", len(pods.Items)) var masterPod v1.Pod + replicas := make([]spec.NamespacedName, 0) for _, pod := range pods.Items { role := c.podSpiloRole(&pod) @@ -138,11 +139,17 @@ func (c *Cluster) recreatePods() error { if err := c.recreatePod(pod); err != nil { return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err) } + + replicas = append(replicas, util.NameFromMeta(pod.ObjectMeta)) } + if masterPod.Name == "" { c.logger.Warningln("no master pod in the cluster") } else { - //TODO: do manual failover + err := c.ManualFailover(&masterPod, masterCandidate(replicas)) + if err != nil { + return fmt.Errorf("could not perform manual failover: %v", err) + } //TODO: specify master, leave new master empty c.logger.Infof("recreating master pod %q", util.NameFromMeta(masterPod.ObjectMeta)) diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 1e79c1ae5..7f5c1cd6b 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -3,6 +3,7 @@ package cluster import ( "encoding/json" "fmt" + "math/rand" "strings" "time" @@ -182,15 +183,17 @@ func (c *Cluster) getTeamMembers() ([]string, error) { return teamInfo.Members, nil } -func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent) error { +func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent, role *PostgresRole) error { for { select { case podEvent := <-podEvents: - role := c.podSpiloRole(podEvent.CurPod) - // We cannot assume any role of the newly created pod. Normally, for a multi-pod cluster - // we should observe the 'replica' value, but it could be that some pods are not allowed - // to promote, therefore, the new pod could be a master as well. - if role == constants.PodRoleMaster || role == constants.PodRoleReplica { + podRole := PostgresRole(podEvent.CurPod.Labels[c.OpConfig.PodRoleLabel]) + + if role == nil { + if podRole == Master || podRole == Replica { + return nil + } + } else if *role == podRole { return nil } case <-time.After(c.OpConfig.PodLabelWaitTimeout): @@ -342,3 +345,7 @@ func (c *Cluster) credentialSecretNameForCluster(username string, clusterName st func (c *Cluster) podSpiloRole(pod *v1.Pod) string { return pod.Labels[c.OpConfig.PodRoleLabel] } + +func masterCandidate(replicas []spec.NamespacedName) spec.NamespacedName { + return replicas[rand.Intn(len(replicas))] +}