restart master first in some edge cases
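
Patroni-managed options such as max_connections, max_prepared_transactions, max_locks_per_transaction, max_worker_processes and max_wal_senders must not be lower on a standby than on the primary. Until now the operator always restarted replicas before the master when a config change required a restart; with this change it restarts the master first when the only pending changes are to such parameters. The commit also adds the postgresql-client-14 package and kubectl v1.22.0 to the images, pins the example manifests to Postgres version "13", and extends the e2e test to cover both restart orders.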
commit bf0c06cca4
parent 720b4575c7
@@ -23,6 +23,7 @@ RUN apt-get update     \
     && curl --silent https://www.postgresql.org/media/keys/ACCC4CF8.asc | apt-key add - \
     && apt-get update \
     && apt-get install --no-install-recommends -y  \
+        postgresql-client-14  \
         postgresql-client-13  \
         postgresql-client-12  \
         postgresql-client-11  \
@@ -16,7 +16,7 @@ RUN apt-get update \
            curl \
            vim \
     && pip3 install --no-cache-dir -r requirements.txt \
-    && curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.18.0/bin/linux/amd64/kubectl \
+    && curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.22.0/bin/linux/amd64/kubectl \
     && chmod +x ./kubectl \
     && mv ./kubectl /usr/local/bin/kubectl \
     && apt-get clean \
@@ -1064,12 +1064,13 @@ class EndToEndTestCase(unittest.TestCase):
             via restarting cluster through Patroni's rest api
         '''
         k8s = self.k8s
-        masterPod = k8s.get_cluster_leader_pod()
+        leader = k8s.get_cluster_leader_pod()
+        replica = k8s.get_cluster_replica_pod()
         labels = 'application=spilo,cluster-name=acid-minimal-cluster,spilo-role=master'
-        creationTimestamp = masterPod.metadata.creation_timestamp
+        creationTimestamp = leader.metadata.creation_timestamp
         new_max_connections_value = "50"
 
-        # adjust max_connection
+        # adjust Postgres config
         pg_patch_config = {
             "spec": {
                 "postgresql": {
@@ -1098,7 +1099,7 @@ class EndToEndTestCase(unittest.TestCase):
             self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
 
             def compare_config():
-                effective_config = k8s.patroni_rest(masterPod.metadata.name, "config")
+                effective_config = k8s.patroni_rest(leader.metadata.name, "config")
                 desired_config = pg_patch_config["spec"]["patroni"]
                 desired_parameters = pg_patch_config["spec"]["postgresql"]["parameters"]
                 effective_parameters = effective_config["postgresql"]["parameters"]
@@ -1122,13 +1123,40 @@ class EndToEndTestCase(unittest.TestCase):
                  FROM pg_settings
                 WHERE name = 'max_connections';
             """
-            self.eventuallyEqual(lambda: self.query_database(masterPod.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
-                "New max_connections setting not applied", 10, 5)
+            self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
+                "New max_connections setting not applied on master", 10, 5)
+            self.eventuallyNotEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
+                "Expected max_connections not to be updated on replica since Postgres was restarted there first", 10, 5)
 
             # make sure that pod wasn't recreated
-            self.assertEqual(creationTimestamp, masterPod.metadata.creation_timestamp,
+            self.assertEqual(creationTimestamp, leader.metadata.creation_timestamp,
                             "Master pod creation timestamp is updated")
 
+            # decrease max_connections again
+            lower_max_connections_value = "30"
+            pg_patch_max_connections = {
+                "spec": {
+                    "postgresql": {
+                        "parameters": {
+                            "max_connections": lower_max_connections_value
+                        }
+                    }
+                }
+            }
+
+            k8s.api.custom_objects_api.patch_namespaced_custom_object(
+                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_max_connections)
+
+            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+
+            pg_patch_config["spec"]["postgresql"]["parameters"]["max_connections"] = lower_max_connections_value
+            self.eventuallyTrue(compare_config, "Postgres config not applied")
+
+            self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", setting_query)[0], lower_max_connections_value,
+                "Lower max_connections setting not applied on master", 10, 5)
+            self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], lower_max_connections_value,
+                "Lower max_connections setting not applied on replica", 10, 5)
+
         except timeout_decorator.TimeoutError:
             print('Operator log: {}'.format(k8s.get_operator_log()))
             raise
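Taken together, the assertions encode the intended ordering: after the increase the leader must already report the new value while the replica may still show the old one, and after the decrease both instances have to settle on the lower value once the master has been restarted first.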
@@ -36,7 +36,7 @@ spec:
           defaultRoles: true
           defaultUsers: false
   postgresql:
-    version: "14"
+    version: "13"
     parameters:  # Expert section
       shared_buffers: "32MB"
       max_connections: "10"
@@ -18,4 +18,4 @@ spec:
   preparedDatabases:
     bar: {}
   postgresql:
-    version: "12"
+    version: "13"
@@ -18,4 +18,4 @@ spec:
   preparedDatabases:
     bar: {}
   postgresql:
-    version: "14"
+    version: "13"
@@ -9,7 +9,7 @@ spec:
     size: 1Gi
   numberOfInstances: 1
   postgresql:
-    version: "14"
+    version: "13"
 # Make this a standby cluster and provide the s3 bucket path of source cluster for continuous streaming.
   standby:
     s3_wal_path: "s3://path/to/bucket/containing/wal/of/source/cluster/"
@@ -20,6 +20,14 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
+var requireMasterRestartWhenDecreased = []string{
+	"max_connections",
+	"max_prepared_transactions",
+	"max_locks_per_transaction",
+	"max_worker_processes",
+	"max_wal_senders",
+}
+
 // Sync syncs the cluster, making sure the actual Kubernetes objects correspond to what is defined in the manifest.
 // Unlike the update, sync does not error out if some objects do not exist and takes care of creating them.
 func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
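The list above encodes a PostgreSQL replication constraint: these five settings are recorded in pg_control and streamed to standbys via WAL, and a hot standby must run with values at least as high as the primary's. A decrease therefore has to reach the primary first; a replica restarted first onto a lower value would refuse to come back up. A minimal, self-contained sketch of the resulting ordering rule (not operator code; sliceContains stands in for the operator's util.SliceContains helper, and the all-or-nothing aggregation mirrors the restartMaster bool slice further down):

```go
package main

import "fmt"

// Parameters a hot standby must keep at least as high as the primary;
// decreasing one of them has to reach the primary first.
var requireMasterRestartWhenDecreased = []string{
	"max_connections",
	"max_prepared_transactions",
	"max_locks_per_transaction",
	"max_worker_processes",
	"max_wal_senders",
}

// sliceContains stands in for the operator's util.SliceContains helper.
func sliceContains(haystack []string, needle string) bool {
	for _, s := range haystack {
		if s == needle {
			return true
		}
	}
	return false
}

// restartMasterFirst mirrors the aggregation in
// checkAndSetGlobalPostgreSQLConfiguration: the master goes first only if
// every changed parameter is decrease-sensitive and no other Patroni
// options changed in the same sync.
func restartMasterFirst(changedParams []string, otherConfigChanged bool) bool {
	if len(changedParams) == 0 || otherConfigChanged {
		return false
	}
	for _, p := range changedParams {
		if !sliceContains(requireMasterRestartWhenDecreased, p) {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(restartMasterFirst([]string{"max_connections"}, false))             // true: master first
	fmt.Println(restartMasterFirst([]string{"max_connections", "work_mem"}, false)) // false: replicas first
	fmt.Println(restartMasterFirst([]string{"max_connections"}, true))              // false: replicas first
}
```

Note that the rule keys off parameter membership alone and does not compare old and new values, so increases to these settings take the master-first path as well; the list's name describes the motivating case rather than the actual check.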
@@ -264,9 +272,9 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error {
 
 func (c *Cluster) syncStatefulSet() error {
 	var (
-		masterPod               *v1.Pod
-		postgresConfig          map[string]interface{}
+		restartTTL              uint32
 		instanceRestartRequired bool
+		restartMasterFirst      bool
 	)
 
 	podsToRecreate := make([]v1.Pod, 0)
@@ -406,34 +414,44 @@ func (c *Cluster) syncStatefulSet() error {
 		// empty config probably means cluster is not fully initialized yet, e.g. restoring from backup
 		// do not attempt a restart
 		if !reflect.DeepEqual(patroniConfig, emptyPatroniConfig) || len(pgParameters) > 0 {
-			instanceRestartRequired, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, pgParameters)
+			instanceRestartRequired, restartMasterFirst, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, pgParameters)
 			if err != nil {
 				c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err)
 				continue
 			}
+			restartTTL = patroniConfig.TTL
 			break
 		}
 	}
 
-	// if the config update requires a restart, call Patroni restart for replicas first, then master
+	// if the config update requires a restart, call Patroni restart
 	if instanceRestartRequired {
-		c.logger.Debug("restarting Postgres server within pods")
-		ttl, ok := postgresConfig["ttl"].(int32)
-		if !ok {
-			ttl = 30
-		}
+		remainingPods := make([]*v1.Pod, 0)
+		skipRole := Master
+		if restartMasterFirst {
+			skipRole = Replica
+		}
+		c.logger.Debug("restarting Postgres server within pods")
 		for i, pod := range pods {
 			role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
-			if role == Master {
-				masterPod = &pods[i]
+			if role == skipRole {
+				remainingPods = append(remainingPods, &pods[i])
 				continue
 			}
 			c.restartInstance(&pod)
-			time.Sleep(time.Duration(ttl) * time.Second)
+			if len(pods) > 1 {
+				time.Sleep(time.Duration(restartTTL) * time.Second)
+			}
 		}
 
-		if masterPod != nil {
-			c.restartInstance(masterPod)
+		// in most cases only the master should be left to restart
+		if len(remainingPods) > 0 {
+			for _, remainingPod := range remainingPods {
+				c.restartInstance(remainingPod)
+				if len(remainingPods) > 1 {
+					time.Sleep(time.Duration(restartTTL) * time.Second)
+				}
+			}
 		}
 	}
 
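In isolation, the new restart flow partitions the pods by role, restarts the non-skipped group first, and pauses for the Patroni TTL between restarts so the rollout itself does not trigger a failover. A standalone sketch under those assumptions (simplified pod and role types, a print in place of the Patroni restart call):

```go
package main

import (
	"fmt"
	"time"
)

type pod struct {
	name string
	role string // "master" or "replica"
}

// restartInOrder restarts the skipped role last: replicas first by
// default, master first when only decrease-sensitive parameters changed.
// The TTL-sized pause between restarts gives Patroni time to settle.
func restartInOrder(pods []pod, restartMasterFirst bool, ttl time.Duration) {
	skipRole := "master"
	if restartMasterFirst {
		skipRole = "replica"
	}
	var remaining []pod
	for _, p := range pods {
		if p.role == skipRole {
			remaining = append(remaining, p)
			continue
		}
		fmt.Println("restarting", p.name)
		if len(pods) > 1 {
			time.Sleep(ttl)
		}
	}
	// in most cases only the master is left at this point
	for _, p := range remaining {
		fmt.Println("restarting", p.name)
		if len(remaining) > 1 {
			time.Sleep(ttl)
		}
	}
}

func main() {
	cluster := []pod{{"pod-0", "master"}, {"pod-1", "replica"}}
	restartInOrder(cluster, true, time.Second) // master first
}
```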
@@ -499,24 +517,13 @@ func (c *Cluster) AnnotationsToPropagate(annotations map[string]string) map[string]string {
 
 // checkAndSetGlobalPostgreSQLConfiguration checks whether cluster-wide API parameters
 // (like max_connections) have changed and if necessary sets it via the Patroni API
-func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniConfig acidv1.Patroni, effectivePgParameters map[string]string) (bool, error) {
+func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniConfig acidv1.Patroni, effectivePgParameters map[string]string) (bool, bool, error) {
 	configToSet := make(map[string]interface{})
 	parametersToSet := make(map[string]string)
+	restartMaster := make([]bool, 0)
+	requiresMasterRestart := false
 
-	// compare parameters under postgresql section with c.Spec.Postgresql.Parameters from manifest
-	desiredPgParameters := c.Spec.Parameters
-	for desiredOption, desiredValue := range desiredPgParameters {
-		effectiveValue := effectivePgParameters[desiredOption]
-		if isBootstrapOnlyParameter(desiredOption) && (effectiveValue != desiredValue) {
-			parametersToSet[desiredOption] = desiredValue
-		}
-	}
-
-	if len(parametersToSet) > 0 {
-		configToSet["postgresql"] = map[string]interface{}{constants.PatroniPGParametersParameterName: parametersToSet}
-	}
-
-	// compare other options from config with c.Spec.Patroni from manifest
+	// compare options from config with c.Spec.Patroni from manifest
 	desiredPatroniConfig := c.Spec.Patroni
 	if desiredPatroniConfig.LoopWait > 0 && desiredPatroniConfig.LoopWait != patroniConfig.LoopWait {
 		configToSet["loop_wait"] = desiredPatroniConfig.LoopWait
@@ -554,8 +561,30 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniC
 		configToSet["slots"] = slotsToSet
 	}
 
+	// compare parameters under postgresql section with c.Spec.Postgresql.Parameters from manifest
+	desiredPgParameters := c.Spec.Parameters
+	for desiredOption, desiredValue := range desiredPgParameters {
+		effectiveValue := effectivePgParameters[desiredOption]
+		if isBootstrapOnlyParameter(desiredOption) && (effectiveValue != desiredValue) {
+			parametersToSet[desiredOption] = desiredValue
+			if util.SliceContains(requireMasterRestartWhenDecreased, desiredOption) {
+				restartMaster = append(restartMaster, true)
+			} else {
+				restartMaster = append(restartMaster, false)
+			}
+		}
+	}
+
+	if !util.SliceContains(restartMaster, false) && len(configToSet) == 0 {
+		requiresMasterRestart = true
+	}
+
+	if len(parametersToSet) > 0 {
+		configToSet["postgresql"] = map[string]interface{}{constants.PatroniPGParametersParameterName: parametersToSet}
+	}
+
 	if len(configToSet) == 0 {
-		return false, nil
+		return false, false, nil
 	}
 
 	configToSetJson, err := json.Marshal(configToSet)
@@ -569,10 +598,10 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniC
 	c.logger.Debugf("patching Postgres config via Patroni API on pod %s with following options: %s",
 		podName, configToSetJson)
 	if err = c.patroni.SetConfig(pod, configToSet); err != nil {
-		return true, fmt.Errorf("could not patch postgres parameters with a pod %s: %v", podName, err)
+		return true, requiresMasterRestart, fmt.Errorf("could not patch postgres parameters with a pod %s: %v", podName, err)
 	}
 
-	return true, nil
+	return true, requiresMasterRestart, nil
 }
 
 func (c *Cluster) syncSecrets() error {
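For context, c.patroni.SetConfig ultimately boils down to PATCHing the desired options to Patroni's REST API on the target pod; Patroni then propagates the dynamic configuration through the DCS and flags members with pending_restart where a restart is needed. A rough standalone sketch of that call (the host argument and the default Patroni port 8008 are assumptions here; the real operator resolves the pod address itself):

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// patchPatroniConfig serializes the desired options and PATCHes them to
// Patroni's /config endpoint, which is roughly what a SetConfig-style
// helper does under the hood.
func patchPatroniConfig(host string, configToSet map[string]interface{}) error {
	body, err := json.Marshal(configToSet)
	if err != nil {
		return fmt.Errorf("could not marshal Patroni config: %v", err)
	}
	req, err := http.NewRequest(http.MethodPatch,
		fmt.Sprintf("http://%s:8008/config", host), bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("Patroni /config returned %s", resp.Status)
	}
	return nil
}

func main() {
	cfg := map[string]interface{}{
		"postgresql": map[string]interface{}{
			"parameters": map[string]string{"max_connections": "30"},
		},
	}
	if err := patchPatroniConfig("10.2.0.5", cfg); err != nil {
		fmt.Println("patch failed:", err)
	}
}
```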