Improve the pod moving behavior during the Kubernetes cluster upgrade. (#281)
* Improve the pod moving behavior during the Kubernetes cluster upgrade. Fix an issue of not waiting for at least one replica to become ready (if the StatefulSet indicates there are replicas) when moving the master pod off the decommissioned node. Resolves the first part of #279. Small fixes to error messages.

* Eliminate a race condition during the switchover. When the operator initiates a failover (switchover) that fails and then retries it a second time, the previous waitForPodLabel call may still be active. As a result, the operator subscribes to the former master pod twice, causing a panic. The original code did not cancel the waitForPodLabel wait for the new master pod when the failover failed; this commit fixes that by adding a stop channel to the function.

Code review by @zerg-junior
parent ebff820fcc
commit fe47f9ebea
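The race-condition fix in the second bullet boils down to making the pod-label wait cancellable. Below is a minimal, self-contained Go sketch of that pattern; the names (podEvent, waitForPodLabel, manualFailover) and the timeout are placeholders rather than the operator's real types, but the select on a stop channel mirrors what the diff below adds to waitForPodLabel and its callers.

// A minimal sketch (not the operator's actual code) of the stop-channel
// pattern: the goroutine waiting for the pod label can be cancelled, so a
// failed failover attempt never leaves a stale subscriber behind when the
// switchover is retried.
package main

import (
    "errors"
    "fmt"
    "time"
)

// podEvent stands in for the operator's pod event type.
type podEvent struct {
    role string
}

// waitForPodLabel returns when an event with the expected role arrives, the
// timeout fires, or stopCh is closed.
func waitForPodLabel(podEvents <-chan podEvent, stopCh <-chan struct{}, role string) (podEvent, error) {
    timeout := time.After(2 * time.Second)
    for {
        select {
        case ev := <-podEvents:
            if ev.role == role {
                return ev, nil
            }
        case <-timeout:
            return podEvent{}, errors.New("pod label wait timeout")
        case <-stopCh:
            return podEvent{}, errors.New("pod label wait cancelled")
        }
    }
}

// manualFailover shows the caller side: stopCh is closed on every exit path,
// so the label-waiting goroutine is torn down even if the failover fails and
// a retry subscribes again.
func manualFailover(podEvents <-chan podEvent) error {
    podLabelErr := make(chan error, 1) // buffered so the goroutine never blocks on send
    stopCh := make(chan struct{})
    defer close(stopCh)

    go func() {
        _, err := waitForPodLabel(podEvents, stopCh, "master")
        podLabelErr <- err
    }()

    // ... the failover itself would be triggered here; returning early on a
    // failure still runs close(stopCh) and unblocks the goroutine ...
    return <-podLabelErr
}

func main() {
    events := make(chan podEvent, 1)
    events <- podEvent{role: "master"}
    fmt.Println(manualFailover(events)) // prints <nil>
}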
@@ -858,6 +858,7 @@ func (c *Cluster) GetStatus() *spec.ClusterStatus {
 // ManualFailover does manual failover to a candidate pod
 func (c *Cluster) ManualFailover(curMaster *v1.Pod, candidate spec.NamespacedName) error {
     c.logger.Debugf("failing over from %q to %q", curMaster.Name, candidate)
+
     podLabelErr := make(chan error)
     stopCh := make(chan struct{})
     defer close(podLabelErr)
@@ -868,11 +869,12 @@ func (c *Cluster) ManualFailover(curMaster *v1.Pod, candidate spec.NamespacedNam
 
         role := Master
 
-        _, err := c.waitForPodLabel(ch, &role)
-
         select {
         case <-stopCh:
-        case podLabelErr <- err:
+        case podLabelErr <- func() error {
+            _, err := c.waitForPodLabel(ch, stopCh, &role)
+            return err
+        }():
         }
     }()
 
@@ -34,8 +34,8 @@ func TestInitRobotUsers(t *testing.T) {
         {
             manifestUsers: map[string]spec.UserFlags{"foo": {"superuser", "createdb"}},
             infraRoles:    map[string]spec.PgUser{"foo": {Origin: spec.RoleOriginInfrastructure, Name: "foo", Password: "bar"}},
             result:        map[string]spec.PgUser{"foo": {Origin: spec.RoleOriginInfrastructure, Name: "foo", Password: "bar"}},
             err:           nil,
         },
         {
             manifestUsers: map[string]spec.UserFlags{"!fooBar": {"superuser", "createdb"}},
@@ -707,7 +707,7 @@ func (c *Cluster) shouldCreateLoadBalancerForService(role PostgresRole, spec *sp
     // `enable_load_balancer`` governs LB for a master service
     // there is no equivalent deprecated operator option for the replica LB
     if c.OpConfig.EnableLoadBalancer != nil {
-        c.logger.Debugf("The operator configmap sets the deprecated `enable_load_balancer` param. Consider using the `enable_master_load_balancer` or `enable_replica_load_balancer` instead.", c.Name)
+        c.logger.Debugf("The operator configmap sets the deprecated `enable_load_balancer` param. Consider using the `enable_master_load_balancer` or `enable_replica_load_balancer` instead.")
         return *c.OpConfig.EnableLoadBalancer
     }
 
@@ -149,13 +149,19 @@ func (c *Cluster) movePodFromEndOfLifeNode(pod *v1.Pod) (*v1.Pod, error) {
 }
 
 func (c *Cluster) masterCandidate(oldNodeName string) (*v1.Pod, error) {
+
+    // Wait until at least one replica pod will come up
+    if err := c.waitForAnyReplicaLabelReady(); err != nil {
+        c.logger.Warningf("could not find at least one ready replica: %v", err)
+    }
+
     replicas, err := c.getRolePods(Replica)
     if err != nil {
         return nil, fmt.Errorf("could not get replica pods: %v", err)
     }
 
     if len(replicas) == 0 {
-        c.logger.Warningf("single master pod for cluster %q, migration will cause disruption of the service")
+        c.logger.Warningf("no available master candidates, migration will cause longer downtime of the master instance")
         return nil, nil
     }
 
@@ -168,12 +174,16 @@ func (c *Cluster) masterCandidate(oldNodeName string) (*v1.Pod, error) {
             }
         }
     }
-    c.logger.Debug("no available master candidates on live nodes")
+    c.logger.Warningf("no available master candidates on live nodes")
     return &replicas[rand.Intn(len(replicas))], nil
 }
 
 // MigrateMasterPod migrates master pod via failover to a replica
 func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error {
+    var (
+        masterCandidatePod *v1.Pod
+    )
+
     oldMaster, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name, metav1.GetOptions{})
 
     if err != nil {
@@ -193,10 +203,13 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error {
         c.logger.Warningf("pod %q is not a master", podName)
         return nil
     }
-
-    masterCandidatePod, err := c.masterCandidate(oldMaster.Spec.NodeName)
-    if err != nil {
-        return fmt.Errorf("could not get new master candidate: %v", err)
+    if *c.Statefulset.Spec.Replicas == 1 {
+        c.logger.Warningf("single master pod for cluster %q, migration will cause longer downtime of the master instance", c.clusterName())
+    } else {
+        masterCandidatePod, err = c.masterCandidate(oldMaster.Spec.NodeName)
+        if err != nil {
+            return fmt.Errorf("could not get new master candidate: %v", err)
+        }
     }
 
     // there are two cases for each postgres cluster that has its master pod on the node to migrate from:
@@ -250,6 +263,7 @@ func (c *Cluster) MigrateReplicaPod(podName spec.NamespacedName, fromNodeName st
 func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
     ch := c.registerPodSubscriber(podName)
     defer c.unregisterPodSubscriber(podName)
+    stopChan := make(chan struct{})
 
     if err := c.KubeClient.Pods(podName.Namespace).Delete(podName.Name, c.deleteOptions); err != nil {
         return nil, fmt.Errorf("could not delete pod: %v", err)
@@ -258,7 +272,7 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
     if err := c.waitForPodDeletion(ch); err != nil {
         return nil, err
     }
-    if pod, err := c.waitForPodLabel(ch, nil); err != nil {
+    if pod, err := c.waitForPodLabel(ch, stopChan, nil); err != nil {
         return nil, err
     } else {
         c.logger.Infof("pod %q has been recreated", podName)
@@ -225,7 +225,7 @@ func (c *Cluster) getTeamMembers() ([]string, error) {
     return teamInfo.Members, nil
 }
 
-func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent, role *PostgresRole) (*v1.Pod, error) {
+func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent, stopChan chan struct{}, role *PostgresRole) (*v1.Pod, error) {
     timeout := time.After(c.OpConfig.PodLabelWaitTimeout)
     for {
         select {
@@ -241,6 +241,8 @@ func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent, role *PostgresRo
             }
         case <-timeout:
             return nil, fmt.Errorf("pod label wait timeout")
+        case <-stopChan:
+            return nil, fmt.Errorf("pod label wait cancelled")
         }
     }
 }
@@ -278,7 +280,10 @@ func (c *Cluster) waitStatefulsetReady() error {
     })
 }
 
-func (c *Cluster) waitPodLabelsReady() error {
+func (c *Cluster) _waitPodLabelsReady(anyReplica bool) error {
+    var (
+        podsNumber int
+    )
     ls := c.labelsSet(false)
     namespace := c.Namespace
 
@@ -295,35 +300,56 @@ func (c *Cluster) waitPodLabelsReady() error {
             c.OpConfig.PodRoleLabel: string(Replica),
         }).String(),
     }
-    pods, err := c.KubeClient.Pods(namespace).List(listOptions)
-    if err != nil {
-        return err
+    podsNumber = 1
+    if !anyReplica {
+        pods, err := c.KubeClient.Pods(namespace).List(listOptions)
+        if err != nil {
+            return err
+        }
+        podsNumber = len(pods.Items)
+        c.logger.Debugf("Waiting for %d pods to become ready", podsNumber)
+    } else {
+        c.logger.Debugf("Waiting for any replica pod to become ready")
     }
-    podsNumber := len(pods.Items)
 
-    err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
+    err := retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
         func() (bool, error) {
-            masterPods, err2 := c.KubeClient.Pods(namespace).List(masterListOption)
-            if err2 != nil {
-                return false, err2
+            masterCount := 0
+            if !anyReplica {
+                masterPods, err2 := c.KubeClient.Pods(namespace).List(masterListOption)
+                if err2 != nil {
+                    return false, err2
+                }
+                if len(masterPods.Items) > 1 {
+                    return false, fmt.Errorf("too many masters (%d pods with the master label found)",
+                        len(masterPods.Items))
+                }
+                masterCount = len(masterPods.Items)
             }
             replicaPods, err2 := c.KubeClient.Pods(namespace).List(replicaListOption)
             if err2 != nil {
                 return false, err2
             }
-            if len(masterPods.Items) > 1 {
-                return false, fmt.Errorf("too many masters")
-            }
-            if len(replicaPods.Items) == podsNumber {
+            replicaCount := len(replicaPods.Items)
+            if anyReplica && replicaCount > 0 {
+                c.logger.Debugf("Found %d running replica pods", replicaCount)
                 return true, nil
             }
 
-            return len(masterPods.Items)+len(replicaPods.Items) == podsNumber, nil
+            return masterCount+replicaCount >= podsNumber, nil
         })
 
     return err
 }
 
+func (c *Cluster) waitForAnyReplicaLabelReady() error {
+    return c._waitPodLabelsReady(true)
+}
+
+func (c *Cluster) waitForAllPodsLabelReady() error {
+    return c._waitPodLabelsReady(false)
+}
+
 func (c *Cluster) waitStatefulsetPodsReady() error {
     c.setProcessName("waiting for the pods of the statefulset")
     // TODO: wait for the first Pod only
@@ -332,7 +358,7 @@ func (c *Cluster) waitStatefulsetPodsReady() error {
     }
 
     // TODO: wait only for master
-    if err := c.waitPodLabelsReady(); err != nil {
+    if err := c.waitForAllPodsLabelReady(); err != nil {
         return fmt.Errorf("pod labels error: %v", err)
     }
 