diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index b6251eafc..2260f0e96 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -858,6 +858,7 @@ func (c *Cluster) GetStatus() *spec.ClusterStatus { // ManualFailover does manual failover to a candidate pod func (c *Cluster) ManualFailover(curMaster *v1.Pod, candidate spec.NamespacedName) error { c.logger.Debugf("failing over from %q to %q", curMaster.Name, candidate) + podLabelErr := make(chan error) stopCh := make(chan struct{}) defer close(podLabelErr) @@ -868,11 +869,12 @@ func (c *Cluster) ManualFailover(curMaster *v1.Pod, candidate spec.NamespacedNam role := Master - _, err := c.waitForPodLabel(ch, &role) - select { case <-stopCh: - case podLabelErr <- err: + case podLabelErr <- func() error { + _, err := c.waitForPodLabel(ch, stopCh, &role) + return err + }(): } }() diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index 823d3baf9..34f64e655 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -34,8 +34,8 @@ func TestInitRobotUsers(t *testing.T) { { manifestUsers: map[string]spec.UserFlags{"foo": {"superuser", "createdb"}}, infraRoles: map[string]spec.PgUser{"foo": {Origin: spec.RoleOriginInfrastructure, Name: "foo", Password: "bar"}}, - result: map[string]spec.PgUser{"foo": {Origin: spec.RoleOriginInfrastructure, Name: "foo", Password: "bar"}}, - err: nil, + result: map[string]spec.PgUser{"foo": {Origin: spec.RoleOriginInfrastructure, Name: "foo", Password: "bar"}}, + err: nil, }, { manifestUsers: map[string]spec.UserFlags{"!fooBar": {"superuser", "createdb"}}, diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 4e500de29..18350f526 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -707,7 +707,7 @@ func (c *Cluster) shouldCreateLoadBalancerForService(role PostgresRole, spec *sp // `enable_load_balancer`` governs LB for a master service // there is no equivalent deprecated operator option for the replica LB if c.OpConfig.EnableLoadBalancer != nil { - c.logger.Debugf("The operator configmap sets the deprecated `enable_load_balancer` param. Consider using the `enable_master_load_balancer` or `enable_replica_load_balancer` instead.", c.Name) + c.logger.Debugf("The operator configmap sets the deprecated `enable_load_balancer` param. Consider using the `enable_master_load_balancer` or `enable_replica_load_balancer` instead.") return *c.OpConfig.EnableLoadBalancer } diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index 28960d9c1..432597f7f 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -149,13 +149,19 @@ func (c *Cluster) movePodFromEndOfLifeNode(pod *v1.Pod) (*v1.Pod, error) { } func (c *Cluster) masterCandidate(oldNodeName string) (*v1.Pod, error) { + + // Wait until at least one replica pod will come up + if err := c.waitForAnyReplicaLabelReady(); err != nil { + c.logger.Warningf("could not find at least one ready replica: %v", err) + } + replicas, err := c.getRolePods(Replica) if err != nil { return nil, fmt.Errorf("could not get replica pods: %v", err) } if len(replicas) == 0 { - c.logger.Warningf("single master pod for cluster %q, migration will cause disruption of the service") + c.logger.Warningf("no available master candidates, migration will cause longer downtime of the master instance") return nil, nil } @@ -168,12 +174,16 @@ func (c *Cluster) masterCandidate(oldNodeName string) (*v1.Pod, error) { } } } - c.logger.Debug("no available master candidates on live nodes") + c.logger.Warningf("no available master candidates on live nodes") return &replicas[rand.Intn(len(replicas))], nil } // MigrateMasterPod migrates master pod via failover to a replica func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error { + var ( + masterCandidatePod *v1.Pod + ) + oldMaster, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name, metav1.GetOptions{}) if err != nil { @@ -193,10 +203,13 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error { c.logger.Warningf("pod %q is not a master", podName) return nil } - - masterCandidatePod, err := c.masterCandidate(oldMaster.Spec.NodeName) - if err != nil { - return fmt.Errorf("could not get new master candidate: %v", err) + if *c.Statefulset.Spec.Replicas == 1 { + c.logger.Warningf("single master pod for cluster %q, migration will cause longer downtime of the master instance", c.clusterName()) + } else { + masterCandidatePod, err = c.masterCandidate(oldMaster.Spec.NodeName) + if err != nil { + return fmt.Errorf("could not get new master candidate: %v", err) + } } // there are two cases for each postgres cluster that has its master pod on the node to migrate from: @@ -250,6 +263,7 @@ func (c *Cluster) MigrateReplicaPod(podName spec.NamespacedName, fromNodeName st func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { ch := c.registerPodSubscriber(podName) defer c.unregisterPodSubscriber(podName) + stopChan := make(chan struct{}) if err := c.KubeClient.Pods(podName.Namespace).Delete(podName.Name, c.deleteOptions); err != nil { return nil, fmt.Errorf("could not delete pod: %v", err) @@ -258,7 +272,7 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { if err := c.waitForPodDeletion(ch); err != nil { return nil, err } - if pod, err := c.waitForPodLabel(ch, nil); err != nil { + if pod, err := c.waitForPodLabel(ch, stopChan, nil); err != nil { return nil, err } else { c.logger.Infof("pod %q has been recreated", podName) diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 1220aeb86..b72835311 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -225,7 +225,7 @@ func (c *Cluster) getTeamMembers() ([]string, error) { return teamInfo.Members, nil } -func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent, role *PostgresRole) (*v1.Pod, error) { +func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent, stopChan chan struct{}, role *PostgresRole) (*v1.Pod, error) { timeout := time.After(c.OpConfig.PodLabelWaitTimeout) for { select { @@ -241,6 +241,8 @@ func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent, role *PostgresRo } case <-timeout: return nil, fmt.Errorf("pod label wait timeout") + case <-stopChan: + return nil, fmt.Errorf("pod label wait cancelled") } } } @@ -278,7 +280,10 @@ func (c *Cluster) waitStatefulsetReady() error { }) } -func (c *Cluster) waitPodLabelsReady() error { +func (c *Cluster) _waitPodLabelsReady(anyReplica bool) error { + var ( + podsNumber int + ) ls := c.labelsSet(false) namespace := c.Namespace @@ -295,35 +300,56 @@ func (c *Cluster) waitPodLabelsReady() error { c.OpConfig.PodRoleLabel: string(Replica), }).String(), } - pods, err := c.KubeClient.Pods(namespace).List(listOptions) - if err != nil { - return err + podsNumber = 1 + if !anyReplica { + pods, err := c.KubeClient.Pods(namespace).List(listOptions) + if err != nil { + return err + } + podsNumber = len(pods.Items) + c.logger.Debugf("Waiting for %d pods to become ready", podsNumber) + } else { + c.logger.Debugf("Waiting for any replica pod to become ready") } - podsNumber := len(pods.Items) - err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, + err := retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, func() (bool, error) { - masterPods, err2 := c.KubeClient.Pods(namespace).List(masterListOption) - if err2 != nil { - return false, err2 + masterCount := 0 + if !anyReplica { + masterPods, err2 := c.KubeClient.Pods(namespace).List(masterListOption) + if err2 != nil { + return false, err2 + } + if len(masterPods.Items) > 1 { + return false, fmt.Errorf("too many masters (%d pods with the master label found)", + len(masterPods.Items)) + } + masterCount = len(masterPods.Items) } replicaPods, err2 := c.KubeClient.Pods(namespace).List(replicaListOption) if err2 != nil { return false, err2 } - if len(masterPods.Items) > 1 { - return false, fmt.Errorf("too many masters") - } - if len(replicaPods.Items) == podsNumber { + replicaCount := len(replicaPods.Items) + if anyReplica && replicaCount > 0 { + c.logger.Debugf("Found %d running replica pods", replicaCount) return true, nil } - return len(masterPods.Items)+len(replicaPods.Items) == podsNumber, nil + return masterCount+replicaCount >= podsNumber, nil }) return err } +func (c *Cluster) waitForAnyReplicaLabelReady() error { + return c._waitPodLabelsReady(true) +} + +func (c *Cluster) waitForAllPodsLabelReady() error { + return c._waitPodLabelsReady(false) +} + func (c *Cluster) waitStatefulsetPodsReady() error { c.setProcessName("waiting for the pods of the statefulset") // TODO: wait for the first Pod only @@ -332,7 +358,7 @@ func (c *Cluster) waitStatefulsetPodsReady() error { } // TODO: wait only for master - if err := c.waitPodLabelsReady(); err != nil { + if err := c.waitForAllPodsLabelReady(); err != nil { return fmt.Errorf("pod labels error: %v", err) }