Perform manual failover during scale-down
parent 5b29576a8e
commit 6c4cb4e9da
@@ -316,7 +316,6 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp
 		reasons = append(reasons, "new statefulset's container specification doesn't match the current one")
 	}
 	if len(c.Statefulset.Spec.Template.Spec.Containers) == 0 {
-
 		c.logger.Warningf("statefulset %q has no container", util.NameFromMeta(c.Statefulset.ObjectMeta))
 		return &compareStatefulsetResult{}
 	}
@@ -432,10 +431,8 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
 	defer c.mu.Unlock()
 
 	c.setStatus(spec.ClusterStatusUpdating)
-	c.logger.Debugf("cluster update from version %q to %q",
-		c.ResourceVersion, newSpec.ResourceVersion)
 
-	/* Make sure we update when this function exists */
+	/* Make sure we update when this function exits */
 	defer func() {
 		c.Postgresql = *newSpec
 	}()
@@ -11,12 +11,11 @@ import (
 )
 
 func (c *Cluster) listPods() ([]v1.Pod, error) {
-	ns := c.Namespace
 	listOptions := metav1.ListOptions{
 		LabelSelector: c.labelsSet().String(),
 	}
 
-	pods, err := c.KubeClient.Pods(ns).List(listOptions)
+	pods, err := c.KubeClient.Pods(c.Namespace).List(listOptions)
 	if err != nil {
 		return nil, fmt.Errorf("could not get list of pods: %v", err)
 	}
@@ -24,6 +23,27 @@ func (c *Cluster) listPods() ([]v1.Pod, error) {
 	return pods.Items, nil
 }
 
+func (c *Cluster) getRolePods(role PostgresRole) ([]v1.Pod, error) {
+	listOptions := metav1.ListOptions{
+		LabelSelector: c.roleLabelsSet(role).String(),
+	}
+
+	pods, err := c.KubeClient.Pods(c.Namespace).List(listOptions)
+	if err != nil {
+		return nil, fmt.Errorf("could not get list of pods: %v", err)
+	}
+
+	if len(pods.Items) == 0 {
+		return nil, fmt.Errorf("no pods")
+	}
+
+	if role == Master && len(pods.Items) > 1 {
+		return nil, fmt.Errorf("too many masters")
+	}
+
+	return pods.Items, nil
+}
+
 func (c *Cluster) deletePods() error {
 	c.logger.Debugln("deleting pods")
 	pods, err := c.listPods()
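Note: the new getRolePods narrows the pod list with a role-aware label selector and insists that the master role resolves to exactly one pod. A minimal sketch of how such a selector string can be built, assuming hypothetical label keys ("application", "cluster-name", "spilo-role") that stand in for whatever roleLabelsSet actually returns:

// Minimal sketch of a role-scoped label selector, assuming hypothetical
// label keys; the operator's roleLabelsSet(role) builds something similar.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

// roleSelector renders a "key=value,key=value" selector string the API
// server understands; passing it as ListOptions.LabelSelector narrows the
// pod list to one role of one cluster.
func roleSelector(clusterName, role string) string {
	return labels.Set{
		"application":  "spilo",     // assumed cluster-wide label
		"cluster-name": clusterName, // assumed cluster-name label key
		"spilo-role":   role,        // assumed role label key ("master" or "replica")
	}.String()
}

func main() {
	fmt.Println(roleSelector("acid-minimal", "master"))
	// e.g. application=spilo,cluster-name=acid-minimal,spilo-role=master
}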
@@ -2,6 +2,8 @@ package cluster
 
 import (
 	"fmt"
+	"strconv"
+	"strings"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -119,6 +121,56 @@ func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) {
 	return statefulSet, nil
 }
 
+func getPodIndex(podName string) (int32, error) {
+	parts := strings.Split(podName, "-")
+	if len(parts) == 0 {
+		return 0, fmt.Errorf("pod has no index part")
+	}
+
+	postfix := parts[len(parts)-1]
+	res, err := strconv.ParseInt(postfix, 10, 32)
+	if err != nil {
+		return 0, fmt.Errorf("could not parse pod index: %v", err)
+	}
+
+	return int32(res), nil
+}
+
+func (c *Cluster) preScaleDown(newStatefulSet *v1beta1.StatefulSet) error {
+	masterPod, err := c.getRolePods(Master)
+	if err != nil {
+		return fmt.Errorf("could not get master pod: %v", err)
+	}
+
+	podNum, err := getPodIndex(masterPod[0].Name)
+	if err != nil {
+		return fmt.Errorf("could not get pod number: %v", err)
+	}
+
+	//Check if scale down affects current master pod
+	if *newStatefulSet.Spec.Replicas >= podNum+1 {
+		return nil
+	}
+
+	podName := fmt.Sprintf("%s-0", c.Statefulset.Name)
+	masterCandidatePod, err := c.KubeClient.Pods(c.OpConfig.Namespace).Get(podName, metav1.GetOptions{})
+	if err != nil {
+		return fmt.Errorf("could not get master candidate pod: %v", err)
+	}
+
+	// some sanity check
+	if !util.MapContains(masterCandidatePod.Labels, c.OpConfig.ClusterLabels) ||
+		!util.MapContains(masterCandidatePod.Labels, map[string]string{c.OpConfig.ClusterNameLabel: c.Name}) {
+		return fmt.Errorf("pod %q does not belong to cluster", podName)
+	}
+
+	if err := c.patroni.Failover(&masterPod[0], masterCandidatePod.Name); err != nil {
+		return fmt.Errorf("could not failover: %v", err)
+	}
+
+	return nil
+}
+
 func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error {
 	c.setProcessName("updating statefulset")
 	if c.Statefulset == nil {
@@ -126,6 +178,12 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error {
 	}
 	statefulSetName := util.NameFromMeta(c.Statefulset.ObjectMeta)
 
+	//scale down
+	if *c.Statefulset.Spec.Replicas > *newStatefulSet.Spec.Replicas {
+		if err := c.preScaleDown(newStatefulSet); err != nil {
+			c.logger.Warningf("could not scale down: %v", err)
+		}
+	}
 	c.logger.Debugf("updating statefulset")
 
 	patchData, err := specPatch(newStatefulSet.Spec)
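Note: getPodIndex parses the trailing ordinal out of the master pod's name, and preScaleDown compares it with the new replica count. Since a StatefulSet removes the highest ordinals first, a master whose ordinal would no longer exist after the update is failed over to pod 0 before the spec is patched. A standalone sketch of that decision, with made-up pod names:

// Standalone sketch of the scale-down decision: StatefulSets remove the
// highest ordinals first, so the master needs a failover only when its
// ordinal would no longer exist at the new replica count. Pod names are
// made up for illustration.
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// podIndex extracts the trailing ordinal from a StatefulSet pod name,
// e.g. "acid-minimal-2" -> 2.
func podIndex(podName string) (int32, error) {
	parts := strings.Split(podName, "-")
	n, err := strconv.ParseInt(parts[len(parts)-1], 10, 32)
	if err != nil {
		return 0, fmt.Errorf("could not parse pod index: %v", err)
	}
	return int32(n), nil
}

// needsFailover reports whether shrinking to newReplicas would delete the
// pod that currently runs the master (ordinals >= newReplicas disappear).
func needsFailover(masterPodName string, newReplicas int32) (bool, error) {
	idx, err := podIndex(masterPodName)
	if err != nil {
		return false, err
	}
	return idx >= newReplicas, nil
}

func main() {
	// Scaling from 3 replicas to 1: pods "-1" and "-2" go away, so a master
	// on "acid-minimal-2" must first be failed over to "acid-minimal-0".
	ok, _ := needsFailover("acid-minimal-2", 1)
	fmt.Println(ok) // true
}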
@@ -45,10 +45,6 @@ func (c *Controller) dispatchPodEvent(clusterName spec.NamespacedName, event spe
 	cluster, ok := c.clusters[clusterName]
 	c.clustersMu.RUnlock()
 	if ok {
-		c.logger.WithField("cluster-name", clusterName).
-			Debugf("sending %q event of pod %q to the cluster channel",
-				event.EventType,
-				event.PodName)
 		cluster.ReceivePodEvent(event)
 	}
 }
@@ -41,7 +41,7 @@ func New(logger *logrus.Entry) *Patroni {
 	}
 }
 
-func (p *Patroni) apiURL(masterPod *v1.Pod) string {
+func apiURL(masterPod *v1.Pod) string {
 	return fmt.Sprintf("http://%s:%d", masterPod.Status.PodIP, apiPort)
 }
 
@@ -54,7 +54,7 @@ func (p *Patroni) Failover(master *v1.Pod, candidate string) error {
 		return fmt.Errorf("could not encode json: %v", err)
 	}
 
-	request, err := http.NewRequest(http.MethodPost, p.apiURL(master)+failoverPath, buf)
+	request, err := http.NewRequest(http.MethodPost, apiURL(master)+failoverPath, buf)
 	if err != nil {
 		return fmt.Errorf("could not create request: %v", err)
 	}
@@ -77,4 +77,4 @@ func (p *Patroni) Failover(master *v1.Pod, candidate string) error {
 	}
 
 	return nil
-}
\ No newline at end of file
+}
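Note: Failover posts to the Patroni REST API on the master pod; apiURL (now receiver-free) derives the endpoint from the pod IP. A rough sketch of such a call, where the JSON field names, port, and path are assumptions for illustration and the real values live in the surrounding patroni package:

// Rough sketch of a failover request against Patroni's REST API on the
// master pod. The JSON field names, port, and path are assumptions for
// illustration; the operator derives the URL from the pod IP via apiURL.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

const (
	apiPort      = 8008        // assumed Patroni REST port
	failoverPath = "/failover" // assumed endpoint path
)

func failover(podIP, leader, candidate string) error {
	body, err := json.Marshal(map[string]string{
		"leader":    leader,    // assumed field name
		"candidate": candidate, // assumed field name
	})
	if err != nil {
		return fmt.Errorf("could not encode json: %v", err)
	}

	url := fmt.Sprintf("http://%s:%d%s", podIP, apiPort, failoverPath)
	resp, err := http.Post(url, "application/json", bytes.NewReader(body))
	if err != nil {
		return fmt.Errorf("could not make request: %v", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("patroni returned %s", resp.Status)
	}
	return nil
}

func main() {
	// Hypothetical pod IP and member names.
	if err := failover("10.2.3.4", "acid-minimal-2", "acid-minimal-0"); err != nil {
		fmt.Println("failover failed:", err)
	}
}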
@@ -103,3 +103,19 @@ func FindNamedStringSubmatch(r *regexp.Regexp, s string) map[string]string {
 
 	return res
 }
+
+// MapContains returns true if and only if haystack contains all the keys from the needle with matching corresponding values
+func MapContains(haystack, needle map[string]string) bool {
+	if len(haystack) < len(needle) {
+		return false
+	}
+
+	for k, v := range needle {
+		v2, ok := haystack[k]
+		if !ok || v2 != v {
+			return false
+		}
+	}
+
+	return true
+}
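Note: MapContains is the subset check preScaleDown runs against the candidate pod's labels before triggering the failover. A short usage sketch, with invented label keys and values:

// Usage sketch for MapContains: the same subset check preScaleDown runs to
// verify the candidate pod carries the cluster's labels. Label keys and
// values are invented; MapContains is copied from the diff above.
package main

import "fmt"

// MapContains reports whether haystack has every key/value pair of needle.
func MapContains(haystack, needle map[string]string) bool {
	if len(haystack) < len(needle) {
		return false
	}
	for k, v := range needle {
		if v2, ok := haystack[k]; !ok || v2 != v {
			return false
		}
	}
	return true
}

func main() {
	podLabels := map[string]string{
		"application":  "spilo",
		"cluster-name": "acid-minimal",
		"spilo-role":   "replica",
	}

	// Pod belongs to the cluster: all required labels match.
	fmt.Println(MapContains(podLabels, map[string]string{
		"application":  "spilo",
		"cluster-name": "acid-minimal",
	})) // true

	// Wrong cluster name: the check fails.
	fmt.Println(MapContains(podLabels, map[string]string{
		"cluster-name": "other",
	})) // false
}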
@@ -46,6 +46,18 @@ var substractTest = []struct {
 	{[]string{"a", "b", "c", "d"}, []string{"a", "bb", "c", "d"}, []string{"b"}, false},
 }
 
+var mapContainsTest = []struct {
+	inA map[string]string
+	inB map[string]string
+	out bool
+}{
+	{map[string]string{"1": "a", "2": "b", "3": "c", "4": "c"}, map[string]string{"1": "a", "2": "b", "3": "c"}, true},
+	{map[string]string{"1": "a", "2": "b", "3": "c", "4": "c"}, map[string]string{"1": "a", "2": "b", "3": "d"}, false},
+	{map[string]string{}, map[string]string{}, true},
+	{map[string]string{"3": "c", "4": "c"}, map[string]string{"1": "a", "2": "b", "3": "c"}, false},
+	{map[string]string{"3": "c", "4": "c"}, map[string]string{}, true},
+}
+
 var substringMatch = []struct {
 	inRegex *regexp.Regexp
 	inStr   string
@@ -122,3 +134,12 @@ func TestFindNamedStringSubmatch(t *testing.T) {
 		}
 	}
 }
+
+func TestMapContains(t *testing.T) {
+	for _, tt := range mapContainsTest {
+		res := MapContains(tt.inA, tt.inB)
+		if res != tt.out {
+			t.Errorf("MapContains expected: %#v, got: %#v", tt.out, res)
+		}
+	}
+}