From fe47f9ebeadd54639913296735158b42d17ee012 Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Thu, 3 May 2018 10:20:24 +0200 Subject: [PATCH] Improve the pod moving behavior during the Kubernetes cluster upgrade. (#281) * Improve the pod moving behavior during the Kubernetes cluster upgrade. Fix an issue of not waiting for at least one replica to become ready (if the Statefulset indicates there are replicas) when moving the master pod off the decomissioned node. Resolves the first part of #279. Small fixes to error messages. * Eliminate a race condition during the swithover. When the operator initiates the failover (switchover) that fails and then retries it for a second time it may happen that the previous waitForPodChannel is still active. As a result, the operator subscribes to the former master pod two times, causing a panic. The problem was that the original code didn't bother to cancel the waitForPodLalbel for the new master pod in the case when the failover fails. This commit fixes it by adding a stop channel to that function. Code review by @zerg-junior --- pkg/cluster/cluster.go | 8 +++-- pkg/cluster/cluster_test.go | 4 +-- pkg/cluster/k8sres.go | 2 +- pkg/cluster/pod.go | 28 +++++++++++++----- pkg/cluster/util.go | 58 +++++++++++++++++++++++++++---------- 5 files changed, 71 insertions(+), 29 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index b6251eafc..2260f0e96 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -858,6 +858,7 @@ func (c *Cluster) GetStatus() *spec.ClusterStatus { // ManualFailover does manual failover to a candidate pod func (c *Cluster) ManualFailover(curMaster *v1.Pod, candidate spec.NamespacedName) error { c.logger.Debugf("failing over from %q to %q", curMaster.Name, candidate) + podLabelErr := make(chan error) stopCh := make(chan struct{}) defer close(podLabelErr) @@ -868,11 +869,12 @@ func (c *Cluster) ManualFailover(curMaster *v1.Pod, candidate spec.NamespacedNam role := Master - _, err := c.waitForPodLabel(ch, &role) - select { case <-stopCh: - case podLabelErr <- err: + case podLabelErr <- func() error { + _, err := c.waitForPodLabel(ch, stopCh, &role) + return err + }(): } }() diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index 823d3baf9..34f64e655 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -34,8 +34,8 @@ func TestInitRobotUsers(t *testing.T) { { manifestUsers: map[string]spec.UserFlags{"foo": {"superuser", "createdb"}}, infraRoles: map[string]spec.PgUser{"foo": {Origin: spec.RoleOriginInfrastructure, Name: "foo", Password: "bar"}}, - result: map[string]spec.PgUser{"foo": {Origin: spec.RoleOriginInfrastructure, Name: "foo", Password: "bar"}}, - err: nil, + result: map[string]spec.PgUser{"foo": {Origin: spec.RoleOriginInfrastructure, Name: "foo", Password: "bar"}}, + err: nil, }, { manifestUsers: map[string]spec.UserFlags{"!fooBar": {"superuser", "createdb"}}, diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 4e500de29..18350f526 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -707,7 +707,7 @@ func (c *Cluster) shouldCreateLoadBalancerForService(role PostgresRole, spec *sp // `enable_load_balancer`` governs LB for a master service // there is no equivalent deprecated operator option for the replica LB if c.OpConfig.EnableLoadBalancer != nil { - c.logger.Debugf("The operator configmap sets the deprecated `enable_load_balancer` param. Consider using the `enable_master_load_balancer` or `enable_replica_load_balancer` instead.", c.Name) + c.logger.Debugf("The operator configmap sets the deprecated `enable_load_balancer` param. Consider using the `enable_master_load_balancer` or `enable_replica_load_balancer` instead.") return *c.OpConfig.EnableLoadBalancer } diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index 28960d9c1..432597f7f 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -149,13 +149,19 @@ func (c *Cluster) movePodFromEndOfLifeNode(pod *v1.Pod) (*v1.Pod, error) { } func (c *Cluster) masterCandidate(oldNodeName string) (*v1.Pod, error) { + + // Wait until at least one replica pod will come up + if err := c.waitForAnyReplicaLabelReady(); err != nil { + c.logger.Warningf("could not find at least one ready replica: %v", err) + } + replicas, err := c.getRolePods(Replica) if err != nil { return nil, fmt.Errorf("could not get replica pods: %v", err) } if len(replicas) == 0 { - c.logger.Warningf("single master pod for cluster %q, migration will cause disruption of the service") + c.logger.Warningf("no available master candidates, migration will cause longer downtime of the master instance") return nil, nil } @@ -168,12 +174,16 @@ func (c *Cluster) masterCandidate(oldNodeName string) (*v1.Pod, error) { } } } - c.logger.Debug("no available master candidates on live nodes") + c.logger.Warningf("no available master candidates on live nodes") return &replicas[rand.Intn(len(replicas))], nil } // MigrateMasterPod migrates master pod via failover to a replica func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error { + var ( + masterCandidatePod *v1.Pod + ) + oldMaster, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name, metav1.GetOptions{}) if err != nil { @@ -193,10 +203,13 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error { c.logger.Warningf("pod %q is not a master", podName) return nil } - - masterCandidatePod, err := c.masterCandidate(oldMaster.Spec.NodeName) - if err != nil { - return fmt.Errorf("could not get new master candidate: %v", err) + if *c.Statefulset.Spec.Replicas == 1 { + c.logger.Warningf("single master pod for cluster %q, migration will cause longer downtime of the master instance", c.clusterName()) + } else { + masterCandidatePod, err = c.masterCandidate(oldMaster.Spec.NodeName) + if err != nil { + return fmt.Errorf("could not get new master candidate: %v", err) + } } // there are two cases for each postgres cluster that has its master pod on the node to migrate from: @@ -250,6 +263,7 @@ func (c *Cluster) MigrateReplicaPod(podName spec.NamespacedName, fromNodeName st func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { ch := c.registerPodSubscriber(podName) defer c.unregisterPodSubscriber(podName) + stopChan := make(chan struct{}) if err := c.KubeClient.Pods(podName.Namespace).Delete(podName.Name, c.deleteOptions); err != nil { return nil, fmt.Errorf("could not delete pod: %v", err) @@ -258,7 +272,7 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { if err := c.waitForPodDeletion(ch); err != nil { return nil, err } - if pod, err := c.waitForPodLabel(ch, nil); err != nil { + if pod, err := c.waitForPodLabel(ch, stopChan, nil); err != nil { return nil, err } else { c.logger.Infof("pod %q has been recreated", podName) diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 1220aeb86..b72835311 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -225,7 +225,7 @@ func (c *Cluster) getTeamMembers() ([]string, error) { return teamInfo.Members, nil } -func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent, role *PostgresRole) (*v1.Pod, error) { +func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent, stopChan chan struct{}, role *PostgresRole) (*v1.Pod, error) { timeout := time.After(c.OpConfig.PodLabelWaitTimeout) for { select { @@ -241,6 +241,8 @@ func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent, role *PostgresRo } case <-timeout: return nil, fmt.Errorf("pod label wait timeout") + case <-stopChan: + return nil, fmt.Errorf("pod label wait cancelled") } } } @@ -278,7 +280,10 @@ func (c *Cluster) waitStatefulsetReady() error { }) } -func (c *Cluster) waitPodLabelsReady() error { +func (c *Cluster) _waitPodLabelsReady(anyReplica bool) error { + var ( + podsNumber int + ) ls := c.labelsSet(false) namespace := c.Namespace @@ -295,35 +300,56 @@ func (c *Cluster) waitPodLabelsReady() error { c.OpConfig.PodRoleLabel: string(Replica), }).String(), } - pods, err := c.KubeClient.Pods(namespace).List(listOptions) - if err != nil { - return err + podsNumber = 1 + if !anyReplica { + pods, err := c.KubeClient.Pods(namespace).List(listOptions) + if err != nil { + return err + } + podsNumber = len(pods.Items) + c.logger.Debugf("Waiting for %d pods to become ready", podsNumber) + } else { + c.logger.Debugf("Waiting for any replica pod to become ready") } - podsNumber := len(pods.Items) - err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, + err := retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, func() (bool, error) { - masterPods, err2 := c.KubeClient.Pods(namespace).List(masterListOption) - if err2 != nil { - return false, err2 + masterCount := 0 + if !anyReplica { + masterPods, err2 := c.KubeClient.Pods(namespace).List(masterListOption) + if err2 != nil { + return false, err2 + } + if len(masterPods.Items) > 1 { + return false, fmt.Errorf("too many masters (%d pods with the master label found)", + len(masterPods.Items)) + } + masterCount = len(masterPods.Items) } replicaPods, err2 := c.KubeClient.Pods(namespace).List(replicaListOption) if err2 != nil { return false, err2 } - if len(masterPods.Items) > 1 { - return false, fmt.Errorf("too many masters") - } - if len(replicaPods.Items) == podsNumber { + replicaCount := len(replicaPods.Items) + if anyReplica && replicaCount > 0 { + c.logger.Debugf("Found %d running replica pods", replicaCount) return true, nil } - return len(masterPods.Items)+len(replicaPods.Items) == podsNumber, nil + return masterCount+replicaCount >= podsNumber, nil }) return err } +func (c *Cluster) waitForAnyReplicaLabelReady() error { + return c._waitPodLabelsReady(true) +} + +func (c *Cluster) waitForAllPodsLabelReady() error { + return c._waitPodLabelsReady(false) +} + func (c *Cluster) waitStatefulsetPodsReady() error { c.setProcessName("waiting for the pods of the statefulset") // TODO: wait for the first Pod only @@ -332,7 +358,7 @@ func (c *Cluster) waitStatefulsetPodsReady() error { } // TODO: wait only for master - if err := c.waitPodLabelsReady(); err != nil { + if err := c.waitForAllPodsLabelReady(); err != nil { return fmt.Errorf("pod labels error: %v", err) }