Re-create pods only if all replicas are running (#903)
* adds a Get call to Patroni interface to fetch state of a Patroni member * postpones re-creating pods if at least one replica is currently being created Co-authored-by: Sergey Dudoladov <sergey.dudoladov@zalando.de> Co-authored-by: Felix Kunde <felix-kunde@gmx.de>
This commit is contained in:
		
							parent
							
								
									5014eebfb2
								
							
						
					
					
						commit
						3c91bdeffa
					
				|  | @ -28,6 +28,7 @@ _testmain.go | |||
| /vendor/ | ||||
| /build/ | ||||
| /docker/build/ | ||||
| /github.com/ | ||||
| .idea | ||||
| 
 | ||||
| scm-source.json | ||||
|  |  | |||
|  | @ -344,7 +344,6 @@ class EndToEndTestCase(unittest.TestCase): | |||
|         ''' | ||||
|         k8s = self.k8s | ||||
|         cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' | ||||
|         labels = 'spilo-role=master,' + cluster_label | ||||
|         readiness_label = 'lifecycle-status' | ||||
|         readiness_value = 'ready' | ||||
| 
 | ||||
|  | @ -709,14 +708,16 @@ class K8s: | |||
|     def wait_for_logical_backup_job_creation(self): | ||||
|         self.wait_for_logical_backup_job(expected_num_of_jobs=1) | ||||
| 
 | ||||
|     def update_config(self, config_map_patch): | ||||
|         self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch) | ||||
| 
 | ||||
|     def delete_operator_pod(self): | ||||
|         operator_pod = self.api.core_v1.list_namespaced_pod( | ||||
|             'default', label_selector="name=postgres-operator").items[0].metadata.name | ||||
|         self.api.core_v1.delete_namespaced_pod(operator_pod, "default")  # restart reloads the conf | ||||
|         self.wait_for_operator_pod_start() | ||||
| 
 | ||||
|     def update_config(self, config_map_patch): | ||||
|         self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch) | ||||
|         self.delete_operator_pod() | ||||
| 
 | ||||
|     def create_with_kubectl(self, path): | ||||
|         return subprocess.run( | ||||
|             ["kubectl", "create", "-f", path], | ||||
|  |  | |||
|  | @ -294,6 +294,27 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { | |||
| 	return pod, nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool { | ||||
| 
 | ||||
| 	/* | ||||
| 	 Operator should not re-create pods if there is at least one replica being bootstrapped | ||||
| 	 because Patroni might use other replicas to take basebackup from (see Patroni's "clonefrom" tag). | ||||
| 
 | ||||
| 	 XXX operator cannot forbid replica re-init, so we might still fail if re-init is started | ||||
| 	 after this check succeeds but before a pod is re-created | ||||
| 	*/ | ||||
| 
 | ||||
| 	for _, pod := range pods.Items { | ||||
| 		state, err := c.patroni.GetPatroniMemberState(&pod) | ||||
| 		if err != nil || state == "creating replica" { | ||||
| 			c.logger.Warningf("cannot re-create replica %s: it is currently being initialized", pod.Name) | ||||
| 			return false | ||||
| 		} | ||||
| 
 | ||||
| 	} | ||||
| 	return true | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) recreatePods() error { | ||||
| 	c.setProcessName("starting to recreate pods") | ||||
| 	ls := c.labelsSet(false) | ||||
|  | @ -309,6 +330,10 @@ func (c *Cluster) recreatePods() error { | |||
| 	} | ||||
| 	c.logger.Infof("there are %d pods in the cluster to recreate", len(pods.Items)) | ||||
| 
 | ||||
| 	if !c.isSafeToRecreatePods(pods) { | ||||
| 		return fmt.Errorf("postpone pod recreation until next Sync: recreation is unsafe because pods are being initilalized") | ||||
| 	} | ||||
| 
 | ||||
| 	var ( | ||||
| 		masterPod, newMasterPod, newPod *v1.Pod | ||||
| 	) | ||||
|  |  | |||
|  | @ -3,6 +3,7 @@ package patroni | |||
| import ( | ||||
| 	"bytes" | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"io/ioutil" | ||||
| 	"net" | ||||
|  | @ -11,7 +12,7 @@ import ( | |||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/sirupsen/logrus" | ||||
| 	"k8s.io/api/core/v1" | ||||
| 	v1 "k8s.io/api/core/v1" | ||||
| ) | ||||
| 
 | ||||
| const ( | ||||
|  | @ -25,6 +26,7 @@ const ( | |||
| type Interface interface { | ||||
| 	Switchover(master *v1.Pod, candidate string) error | ||||
| 	SetPostgresParameters(server *v1.Pod, options map[string]string) error | ||||
| 	GetPatroniMemberState(pod *v1.Pod) (string, error) | ||||
| } | ||||
| 
 | ||||
| // Patroni API client
 | ||||
|  | @ -123,3 +125,36 @@ func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]st | |||
| 	} | ||||
| 	return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf) | ||||
| } | ||||
| 
 | ||||
| //GetPatroniMemberState returns a state of member of a Patroni cluster
 | ||||
| func (p *Patroni) GetPatroniMemberState(server *v1.Pod) (string, error) { | ||||
| 
 | ||||
| 	apiURLString, err := apiURL(server) | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
| 	response, err := p.httpClient.Get(apiURLString) | ||||
| 	if err != nil { | ||||
| 		return "", fmt.Errorf("could not perform Get request: %v", err) | ||||
| 	} | ||||
| 	defer response.Body.Close() | ||||
| 
 | ||||
| 	body, err := ioutil.ReadAll(response.Body) | ||||
| 	if err != nil { | ||||
| 		return "", fmt.Errorf("could not read response: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	data := make(map[string]interface{}) | ||||
| 	err = json.Unmarshal(body, &data) | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
| 
 | ||||
| 	state, ok := data["state"].(string) | ||||
| 	if !ok { | ||||
| 		return "", errors.New("Patroni Get call response contains wrong type for 'state' field") | ||||
| 	} | ||||
| 
 | ||||
| 	return state, nil | ||||
| 
 | ||||
| } | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue