wait after config patch and restart on sync whenever we see pending_restart
commit 49668e95eb
parent d3fe4264d4
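In brief: after patching the Postgres configuration through the Patroni API, the operator now waits out Patroni's LoopWait before attempting any restart, and on every sync it restarts only those members whose Patroni status reports pending_restart. A minimal sketch, not operator code, of reading that flag from Patroni's REST API (the GET /patroni endpoint and the pending_restart field are Patroni's; the pod IP and default port 8008 are illustrative):

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// memberData models the subset of Patroni's GET /patroni response used here.
type memberData struct {
	State          string `json:"state"`
	PendingRestart bool   `json:"pending_restart"`
}

// pendingRestart queries a member's Patroni API and reports whether a
// configuration change is still waiting for a Postgres restart.
func pendingRestart(host string) (bool, error) {
	resp, err := http.Get(fmt.Sprintf("http://%s:8008/patroni", host))
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()

	var md memberData
	if err := json.NewDecoder(resp.Body).Decode(&md); err != nil {
		return false, err
	}
	return md.PendingRestart, nil
}

func main() {
	// 10.2.0.5 is a hypothetical pod IP; 8008 is Patroni's default API port.
	restart, err := pendingRestart("10.2.0.5")
	if err != nil {
		fmt.Println("could not query member:", err)
		return
	}
	fmt.Println("pending_restart:", restart)
}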
e2e/tests/test_e2e.py

@@ -1066,8 +1066,8 @@ class EndToEndTestCase(unittest.TestCase):
         k8s = self.k8s
         leader = k8s.get_cluster_leader_pod()
         replica = k8s.get_cluster_replica_pod()
-        labels = 'application=spilo,cluster-name=acid-minimal-cluster,spilo-role=master'
-        creationTimestamp = leader.metadata.creation_timestamp
+        masterCreationTimestamp = leader.metadata.creation_timestamp
+        replicaCreationTimestamp = replica.metadata.creation_timestamp
         new_max_connections_value = "50"

         # adjust Postgres config
@@ -1116,8 +1116,18 @@ class EndToEndTestCase(unittest.TestCase):
                             "synchronous_mode not updated")
                 return True

+            # check if Patroni config has been updated
             self.eventuallyTrue(compare_config, "Postgres config not applied")

+            # make sure that pods were not recreated
+            leader = k8s.get_cluster_leader_pod()
+            replica = k8s.get_cluster_replica_pod()
+            self.assertEqual(masterCreationTimestamp, leader.metadata.creation_timestamp,
+                            "Master pod creation timestamp is updated")
+            self.assertEqual(replicaCreationTimestamp, replica.metadata.creation_timestamp,
+                            "Replica pod creation timestamp is updated")
+
+            # query max_connections setting
             setting_query = """
                SELECT setting
                  FROM pg_settings
@@ -1128,11 +1138,16 @@ class EndToEndTestCase(unittest.TestCase):
             self.eventuallyNotEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
                 "Expected max_connections not to be updated on replica since Postgres was restarted there first", 10, 5)

-            # make sure that pod wasn't recreated
-            self.assertEqual(creationTimestamp, leader.metadata.creation_timestamp,
-                            "Master pod creation timestamp is updated")
+            # the next sync should restart the replica because it has the pending_restart flag set
+            # force the next sync by deleting the operator pod
+            k8s.delete_operator_pod()
+            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+
+            self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
+                "New max_connections setting not applied on replica", 10, 5)

             # decrease max_connections again
+            # this time the restart will be correct and the new value should appear on both instances
             lower_max_connections_value = "30"
             pg_patch_max_connections = {
                 "spec": {
@@ -1149,12 +1164,14 @@ class EndToEndTestCase(unittest.TestCase):

             self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

+            # check Patroni config again
             pg_patch_config["spec"]["postgresql"]["parameters"]["max_connections"] = lower_max_connections_value
             self.eventuallyTrue(compare_config, "Postgres config not applied")

+            # and query max_connections setting again
-            self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
+            self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", setting_query)[0], lower_max_connections_value,
                 "Previous max_connections setting not applied on master", 10, 5)
-            self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
+            self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], lower_max_connections_value,
                 "Previous max_connections setting not applied on replica", 10, 5)

         except timeout_decorator.TimeoutError:
pkg/cluster/sync.go

@@ -272,9 +272,8 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error {

 func (c *Cluster) syncStatefulSet() error {
 	var (
-		restartTTL              uint32
-		instanceRestartRequired bool
-		restartMasterFirst      bool
+		restartTTL         uint32
+		restartMasterFirst bool
 	)

 	podsToRecreate := make([]v1.Pod, 0)
@@ -414,43 +413,42 @@ func (c *Cluster) syncStatefulSet() error {
 		// empty config probably means cluster is not fully initialized yet, e.g. restoring from backup
 		// do not attempt a restart
 		if !reflect.DeepEqual(patroniConfig, emptyPatroniConfig) || len(pgParameters) > 0 {
-			instanceRestartRequired, restartMasterFirst, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, pgParameters)
+			restartMasterFirst, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, pgParameters)
 			if err != nil {
 				c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err)
 				continue
 			}
+			// it could take up to LoopWait to apply the config
+			time.Sleep(time.Duration(patroniConfig.LoopWait)*time.Second + time.Second*2)
 			restartTTL = patroniConfig.TTL
 			break
 		}
 	}

-	// if the config update requires a restart, call Patroni restart
-	if instanceRestartRequired {
-		remainingPods := make([]*v1.Pod, 0)
-		skipRole := Master
-		if restartMasterFirst {
-			skipRole = Replica
-		}
-		c.logger.Debug("restarting Postgres server within pods")
-		for i, pod := range pods {
-			role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
-			if role == skipRole {
-				remainingPods = append(remainingPods, &pods[i])
-				continue
-			}
-			c.restartInstance(&pod)
-			if len(pods) > 1 {
-				time.Sleep(time.Duration(restartTTL) * time.Second)
-			}
-		}
+	// restart instances if required
+	remainingPods := make([]*v1.Pod, 0)
+	skipRole := Master
+	if restartMasterFirst {
+		skipRole = Replica
+	}
+	for i, pod := range pods {
+		role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
+		if role == skipRole {
+			remainingPods = append(remainingPods, &pods[i])
+			continue
+		}
+		c.restartInstance(&pod)
+		if len(pods) > 1 {
+			time.Sleep(time.Duration(restartTTL) * time.Second)
+		}
+	}

-		// in most cases only the master should be left to restart
-		if len(remainingPods) > 0 {
-			for _, remainingPod := range remainingPods {
-				c.restartInstance(remainingPod)
-				if len(remainingPods) > 1 {
-					time.Sleep(time.Duration(restartTTL) * time.Second)
-				}
-			}
-		}
-	}
+	// in most cases only the master should be left to restart
+	if len(remainingPods) > 0 {
+		for _, remainingPod := range remainingPods {
+			c.restartInstance(remainingPod)
+			if len(remainingPods) > 1 {
+				time.Sleep(time.Duration(restartTTL) * time.Second)
+			}
+		}
+	}
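The rewritten loop restarts one role at a time: replicas first by default, the master first when checkAndSetGlobalPostgreSQLConfiguration asks for it, with a TTL-long pause between restarts. A minimal standalone sketch of that partitioning rule, with simplified pod and role types (these names are hypothetical, not the operator's types):

package main

import "fmt"

type role string

const (
	master  role = "master"
	replica role = "replica"
)

type pod struct {
	name string
	role role
}

// restartOrder returns pods in the order the sync loop above visits them:
// everything except the deferred role first, then the deferred role.
func restartOrder(pods []pod, restartMasterFirst bool) []pod {
	skip := master
	if restartMasterFirst {
		skip = replica
	}
	ordered := make([]pod, 0, len(pods))
	deferred := make([]pod, 0)
	for _, p := range pods {
		if p.role == skip {
			deferred = append(deferred, p)
			continue
		}
		ordered = append(ordered, p)
	}
	return append(ordered, deferred...)
}

func main() {
	pods := []pod{{"pg-0", master}, {"pg-1", replica}, {"pg-2", replica}}
	fmt.Println(restartOrder(pods, false)) // replicas first, then master
	fmt.Println(restartOrder(pods, true))  // master first, then replicas
}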
@@ -472,15 +470,22 @@ func (c *Cluster) restartInstance(pod *v1.Pod) {
 	podName := util.NameFromMeta(pod.ObjectMeta)
 	role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])

-	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("restarting Postgres server within %s pod %s", role, pod.Name))
-
-	if err := c.patroni.Restart(pod); err != nil {
-		c.logger.Warningf("could not restart Postgres server within %s pod %s: %v", role, podName, err)
+	// if the config update requires a restart, call Patroni restart
+	memberData, err := c.patroni.GetMemberData(pod)
+	if err != nil {
+		c.logger.Debugf("could not get member data of %s pod %s - skipping possible restart attempt: %v", role, podName, err)
 		return
 	}

-	c.logger.Debugf("Postgres server successfuly restarted in %s pod %s", role, podName)
-	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("Postgres server restart done for %s pod %s", role, pod.Name))
+	// do restart only when it is pending
+	if memberData.PendingRestart {
+		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("restarting Postgres server within %s pod %s", role, pod.Name))
+		if err := c.patroni.Restart(pod); err != nil {
+			c.logger.Warningf("could not restart Postgres server within %s pod %s: %v", role, podName, err)
+			return
+		}
+		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("Postgres server restart done for %s pod %s", role, pod.Name))
+	}
 }

 // AnnotationsToPropagate get the annotations to update if required
@@ -517,7 +522,7 @@ func (c *Cluster) AnnotationsToPropagate(annotations map[string]string) map[stri

 // checkAndSetGlobalPostgreSQLConfiguration checks whether cluster-wide API parameters
 // (like max_connections) have changed and if necessary sets it via the Patroni API
-func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniConfig acidv1.Patroni, effectivePgParameters map[string]string) (bool, bool, error) {
+func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniConfig acidv1.Patroni, effectivePgParameters map[string]string) (bool, error) {
 	configToSet := make(map[string]interface{})
 	parametersToSet := make(map[string]string)
 	restartMaster := make([]bool, 0)
@@ -584,7 +589,7 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniC
 	}

 	if len(configToSet) == 0 {
-		return false, false, nil
+		return false, nil
 	}

 	configToSetJson, err := json.Marshal(configToSet)
@@ -598,10 +603,10 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniC
 	c.logger.Debugf("patching Postgres config via Patroni API on pod %s with following options: %s",
 		podName, configToSetJson)
 	if err = c.patroni.SetConfig(pod, configToSet); err != nil {
-		return true, requiresMasterRestart, fmt.Errorf("could not patch postgres parameters with a pod %s: %v", podName, err)
+		return requiresMasterRestart, fmt.Errorf("could not patch postgres parameters with a pod %s: %v", podName, err)
 	}

-	return true, requiresMasterRestart, nil
+	return requiresMasterRestart, nil
 }

 func (c *Cluster) syncSecrets() error {
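checkAndSetGlobalPostgreSQLConfiguration assembles configToSet and ships it via SetConfig, which patches Patroni's dynamic configuration. A hedged sketch of what such a payload can look like once marshaled (the shape follows Patroni's /config document; the values are illustrative, matching the e2e test above):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Illustrative payload in the shape Patroni's PATCH /config endpoint
	// accepts: top-level HA settings plus postgresql.parameters.
	configToSet := map[string]interface{}{
		"ttl":       30,
		"loop_wait": 10,
		"postgresql": map[string]interface{}{
			"parameters": map[string]string{
				"max_connections": "50", // the value the e2e test patches in
			},
		},
	}

	body, err := json.Marshal(configToSet)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
	// A client would PATCH this body to http://<pod-ip>:8008/config.
}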
pkg/util/patroni/patroni.go

@@ -238,16 +238,12 @@ func (p *Patroni) Restart(server *v1.Pod) error {
 	if err != nil {
 		return err
 	}
-	memberData, err := p.GetMemberData(server)
-	if err != nil {
-		return err
-	}
-
-	// do restart only when it is pending
-	if !memberData.PendingRestart {
-		return nil
-	}
-	return p.httpPostOrPatch(http.MethodPost, apiURLString+restartPath, buf)
+	if err := p.httpPostOrPatch(http.MethodPost, apiURLString+restartPath, buf); err != nil {
+		return err
+	}
+	p.logger.Infof("Postgres server successfully restarted in pod %s", server.Name)
+
+	return nil
 }

 // GetMemberData read member data from patroni API
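With the pending_restart check moved into the caller, Restart is now an unconditional POST against Patroni's restart endpoint. A minimal sketch of that call outside the operator (POST /restart per Patroni's REST API; the empty JSON body and pod address are assumptions for illustration):

package main

import (
	"fmt"
	"net/http"
	"strings"
)

// restartMember asks a member's Patroni API to restart the local Postgres.
func restartMember(host string) error {
	resp, err := http.Post(
		fmt.Sprintf("http://%s:8008/restart", host),
		"application/json",
		strings.NewReader("{}"), // no scheduling options
	)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("restart returned status %s", resp.Status)
	}
	return nil
}

func main() {
	// Hypothetical pod IP; Patroni's API listens on 8008 by default.
	if err := restartMember("10.2.0.5"); err != nil {
		fmt.Println("restart failed:", err)
	}
}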