Merge branch 'master' into pw-rotation-post-changes
commit a9793dd5cb
@@ -205,7 +205,7 @@ class EndToEndTestCase(unittest.TestCase):
                "enable_team_member_deprecation": "true",
                "role_deletion_suffix": "_delete_me",
                "resync_period": "15s",
                "repair_period": "10s",
                "repair_period": "15s",
            },
        }
        k8s.update_config(enable_postgres_team_crd)
@@ -296,6 +296,133 @@ class EndToEndTestCase(unittest.TestCase):
        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"},
                             "Operator does not get in sync")

    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
    def test_config_update(self):
        '''
            Change Postgres config under Spec.Postgresql.Parameters and Spec.Patroni
            and query Patroni config endpoint to check if manifest changes got applied
            via restarting cluster through Patroni's rest api
        '''
        k8s = self.k8s
        leader = k8s.get_cluster_leader_pod()
        replica = k8s.get_cluster_replica_pod()
        masterCreationTimestamp = leader.metadata.creation_timestamp
        replicaCreationTimestamp = replica.metadata.creation_timestamp
        new_max_connections_value = "50"

        # adjust Postgres config
        pg_patch_config = {
            "spec": {
                "postgresql": {
                    "parameters": {
                        "max_connections": new_max_connections_value
                    }
                },
                "patroni": {
                    "slots": {
                        "test_slot": {
                            "type": "physical"
                        }
                    },
                    "ttl": 29,
                    "loop_wait": 9,
                    "retry_timeout": 9,
                    "synchronous_mode": True
                }
            }
        }

        try:
            k8s.api.custom_objects_api.patch_namespaced_custom_object(
                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_config)

            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

            def compare_config():
                effective_config = k8s.patroni_rest(leader.metadata.name, "config")
                desired_config = pg_patch_config["spec"]["patroni"]
                desired_parameters = pg_patch_config["spec"]["postgresql"]["parameters"]
                effective_parameters = effective_config["postgresql"]["parameters"]
                self.assertEqual(desired_parameters["max_connections"], effective_parameters["max_connections"],
                            "max_connections not updated")
                self.assertTrue(effective_config["slots"] is not None, "physical replication slot not added")
                self.assertEqual(desired_config["ttl"], effective_config["ttl"],
                            "ttl not updated")
                self.assertEqual(desired_config["loop_wait"], effective_config["loop_wait"],
                            "loop_wait not updated")
                self.assertEqual(desired_config["retry_timeout"], effective_config["retry_timeout"],
                            "retry_timeout not updated")
                self.assertEqual(desired_config["synchronous_mode"], effective_config["synchronous_mode"],
                            "synchronous_mode not updated")
                return True

            # check if Patroni config has been updated
            self.eventuallyTrue(compare_config, "Postgres config not applied")

            # make sure that pods were not recreated
            leader = k8s.get_cluster_leader_pod()
            replica = k8s.get_cluster_replica_pod()
            self.assertEqual(masterCreationTimestamp, leader.metadata.creation_timestamp,
                            "Master pod creation timestamp is updated")
            self.assertEqual(replicaCreationTimestamp, replica.metadata.creation_timestamp,
                            "Replica pod creation timestamp is updated")

            # query max_connections setting
            setting_query = """
               SELECT setting
                 FROM pg_settings
                WHERE name = 'max_connections';
            """
            self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
                "New max_connections setting not applied on master", 10, 5)
            self.eventuallyNotEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
                "Expected max_connections not to be updated on replica since Postgres was restarted there first", 10, 5)

            # the next sync should restart the replica because it has pending_restart flag set
            # force next sync by deleting the operator pod
            k8s.delete_operator_pod()
            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

            self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
                "New max_connections setting not applied on replica", 10, 5)

            # decrease max_connections again
            # this time restart will be correct and new value should appear on both instances
            lower_max_connections_value = "30"
            pg_patch_max_connections = {
                "spec": {
                    "postgresql": {
                        "parameters": {
                            "max_connections": lower_max_connections_value
                        }
                    }
                }
            }

            k8s.api.custom_objects_api.patch_namespaced_custom_object(
                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_max_connections)

            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

            # check Patroni config again
            pg_patch_config["spec"]["postgresql"]["parameters"]["max_connections"] = lower_max_connections_value
            self.eventuallyTrue(compare_config, "Postgres config not applied")

            # and query max_connections setting again
            self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", setting_query)[0], lower_max_connections_value,
                "Previous max_connections setting not applied on master", 10, 5)
            self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], lower_max_connections_value,
                "Previous max_connections setting not applied on replica", 10, 5)

        except timeout_decorator.TimeoutError:
            print('Operator log: {}'.format(k8s.get_operator_log()))
            raise

        # make sure cluster is in a good state for further tests
        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2,
                             "No 2 pods running")

    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
    def test_cross_namespace_secrets(self):
        '''
@@ -794,7 +921,11 @@ class EndToEndTestCase(unittest.TestCase):
        Lower resource limits below configured minimum and let operator fix it
        '''
        k8s = self.k8s
        # self.eventuallyEqual(lambda: k8s.pg_get_status(), "Running", "Cluster not healthy at start")
        cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'

        # get nodes of master and replica(s) (expected target of new master)
        _, replica_nodes = k8s.get_pg_nodes(cluster_label)
        self.assertNotEqual(replica_nodes, [])

        # configure minimum boundaries for CPU and memory limits
        minCPULimit = '503m'
@@ -827,7 +958,9 @@ class EndToEndTestCase(unittest.TestCase):
            "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_resources)
        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No two pods running after lazy rolling upgrade")
        # wait for switched over
        k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=master,' + cluster_label)
        k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
        self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members()), 2, "Postgres status did not enter running")

        def verify_pod_limits():
@@ -968,15 +1101,15 @@ class EndToEndTestCase(unittest.TestCase):
            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

            # node affinity change should cause another rolling update and relocation of replica
            k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=replica,' + cluster_label)
            k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
            k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)

        except timeout_decorator.TimeoutError:
            print('Operator log: {}'.format(k8s.get_operator_log()))
            raise

        # toggle pod anti affinity to make sure replica and master run on separate nodes
        self.assert_distributed_pods(replica_nodes)
        self.assert_distributed_pods(master_nodes)

    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
    def test_node_readiness_label(self):
@@ -1192,133 +1325,6 @@ class EndToEndTestCase(unittest.TestCase):
        self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", user_query)), 2,
            "Found incorrect number of rotation users", 10, 5)

    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
    def test_patroni_config_update(self):
        '''
            Change Postgres config under Spec.Postgresql.Parameters and Spec.Patroni
            and query Patroni config endpoint to check if manifest changes got applied
            via restarting cluster through Patroni's rest api
        '''
        k8s = self.k8s
        leader = k8s.get_cluster_leader_pod()
        replica = k8s.get_cluster_replica_pod()
        masterCreationTimestamp = leader.metadata.creation_timestamp
        replicaCreationTimestamp = replica.metadata.creation_timestamp
        new_max_connections_value = "50"

        # adjust Postgres config
        pg_patch_config = {
            "spec": {
                "postgresql": {
                    "parameters": {
                        "max_connections": new_max_connections_value
                    }
                },
                "patroni": {
                    "slots": {
                        "test_slot": {
                            "type": "physical"
                        }
                    },
                    "ttl": 29,
                    "loop_wait": 9,
                    "retry_timeout": 9,
                    "synchronous_mode": True
                }
            }
        }

        try:
            k8s.api.custom_objects_api.patch_namespaced_custom_object(
                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_config)

            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

            def compare_config():
                effective_config = k8s.patroni_rest(leader.metadata.name, "config")
                desired_config = pg_patch_config["spec"]["patroni"]
                desired_parameters = pg_patch_config["spec"]["postgresql"]["parameters"]
                effective_parameters = effective_config["postgresql"]["parameters"]
                self.assertEqual(desired_parameters["max_connections"], effective_parameters["max_connections"],
                            "max_connections not updated")
                self.assertTrue(effective_config["slots"] is not None, "physical replication slot not added")
                self.assertEqual(desired_config["ttl"], effective_config["ttl"],
                            "ttl not updated")
                self.assertEqual(desired_config["loop_wait"], effective_config["loop_wait"],
                            "loop_wait not updated")
                self.assertEqual(desired_config["retry_timeout"], effective_config["retry_timeout"],
                            "retry_timeout not updated")
                self.assertEqual(desired_config["synchronous_mode"], effective_config["synchronous_mode"],
                            "synchronous_mode not updated")
                return True

            # check if Patroni config has been updated
            self.eventuallyTrue(compare_config, "Postgres config not applied")

            # make sure that pods were not recreated
            leader = k8s.get_cluster_leader_pod()
            replica = k8s.get_cluster_replica_pod()
            self.assertEqual(masterCreationTimestamp, leader.metadata.creation_timestamp,
                            "Master pod creation timestamp is updated")
            self.assertEqual(replicaCreationTimestamp, replica.metadata.creation_timestamp,
                            "Master pod creation timestamp is updated")

            # query max_connections setting
            setting_query = """
               SELECT setting
                 FROM pg_settings
                WHERE name = 'max_connections';
            """
            self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
                "New max_connections setting not applied on master", 10, 5)
            self.eventuallyNotEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
                "Expected max_connections not to be updated on replica since Postgres was restarted there first", 10, 5)

            # the next sync should restart the replica because it has pending_restart flag set
            # force next sync by deleting the operator pod
            k8s.delete_operator_pod()
            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

            self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
                "New max_connections setting not applied on replica", 10, 5)

            # decrease max_connections again
            # this time restart will be correct and new value should appear on both instances
            lower_max_connections_value = "30"
            pg_patch_max_connections = {
                "spec": {
                    "postgresql": {
                        "parameters": {
                            "max_connections": lower_max_connections_value
                        }
                    }
                }
            }

            k8s.api.custom_objects_api.patch_namespaced_custom_object(
                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_max_connections)

            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

            # check Patroni config again
            pg_patch_config["spec"]["postgresql"]["parameters"]["max_connections"] = lower_max_connections_value
            self.eventuallyTrue(compare_config, "Postgres config not applied")

            # and query max_connections setting again
            self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", setting_query)[0], lower_max_connections_value,
                "Previous max_connections setting not applied on master", 10, 5)
            self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], lower_max_connections_value,
                "Previous max_connections setting not applied on replica", 10, 5)

        except timeout_decorator.TimeoutError:
            print('Operator log: {}'.format(k8s.get_operator_log()))
            raise

        # make sure cluster is in a good state for further tests
        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2,
                             "No 2 pods running")

    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
    def test_rolling_update_flag(self):
        '''
@@ -1405,7 +1411,7 @@ class EndToEndTestCase(unittest.TestCase):
            "data": {
                "pod_label_wait_timeout": "2s",
                "resync_period": "30s",
                "repair_period": "10s",
                "repair_period": "30s",
            }
        }

@@ -13,6 +13,7 @@ import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"

	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
	"github.com/zalando/postgres-operator/pkg/spec"
	"github.com/zalando/postgres-operator/pkg/util"
	"github.com/zalando/postgres-operator/pkg/util/patroni"
@@ -349,6 +350,54 @@ func (c *Cluster) MigrateReplicaPod(podName spec.NamespacedName, fromNodeName st
	return nil
}

func (c *Cluster) getPatroniConfig(pod *v1.Pod) (acidv1.Patroni, map[string]string, error) {
	var (
		patroniConfig acidv1.Patroni
		pgParameters  map[string]string
	)
	podName := util.NameFromMeta(pod.ObjectMeta)
	err := retryutil.Retry(1*time.Second, 5*time.Second,
		func() (bool, error) {
			var err error
			patroniConfig, pgParameters, err = c.patroni.GetConfig(pod)

			if err != nil {
				return false, err
			}
			return true, nil
		},
	)

	if err != nil {
		return acidv1.Patroni{}, nil, fmt.Errorf("could not get Postgres config from pod %s: %v", podName, err)
	}

	return patroniConfig, pgParameters, nil
}

func (c *Cluster) getPatroniMemberData(pod *v1.Pod) (patroni.MemberData, error) {
	var memberData patroni.MemberData
	err := retryutil.Retry(1*time.Second, 5*time.Second,
		func() (bool, error) {
			var err error
			memberData, err = c.patroni.GetMemberData(pod)

			if err != nil {
				return false, err
			}
			return true, nil
		},
	)
	if err != nil {
		return patroni.MemberData{}, fmt.Errorf("could not get member data: %v", err)
	}
	if memberData.State == "creating replica" {
		return patroni.MemberData{}, fmt.Errorf("replica currently being initialized")
	}

	return memberData, nil
}

func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
	ch := c.registerPodSubscriber(podName)
	defer c.unregisterPodSubscriber(podName)
@@ -380,54 +429,10 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
	return pod, nil
}

func (c *Cluster) isSafeToRecreatePods(pods []v1.Pod) bool {

	/*
	 Operator should not re-create pods if there is at least one replica being bootstrapped
	 because Patroni might use other replicas to take basebackup from (see Patroni's "clonefrom" tag).

	 XXX operator cannot forbid replica re-init, so we might still fail if re-init is started
	 after this check succeeds but before a pod is re-created
	*/
	for _, pod := range pods {
		c.logger.Debugf("name=%s phase=%s ip=%s", pod.Name, pod.Status.Phase, pod.Status.PodIP)
	}

	for _, pod := range pods {

		var data patroni.MemberData

		err := retryutil.Retry(1*time.Second, 5*time.Second,
			func() (bool, error) {
				var err error
				data, err = c.patroni.GetMemberData(&pod)

				if err != nil {
					return false, err
				}
				return true, nil
			},
		)

		if err != nil {
			c.logger.Errorf("failed to get Patroni state for pod: %s", err)
			return false
		} else if data.State == "creating replica" {
			c.logger.Warningf("cannot re-create replica %s: it is currently being initialized", pod.Name)
			return false
		}
	}
	return true
}

func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.NamespacedName) error {
	c.setProcessName("starting to recreate pods")
	c.logger.Infof("there are %d pods in the cluster to recreate", len(pods))

	if !c.isSafeToRecreatePods(pods) {
		return fmt.Errorf("postpone pod recreation until next Sync: recreation is unsafe because pods are being initialized")
	}

	var (
		masterPod, newMasterPod *v1.Pod
	)

@@ -277,6 +277,7 @@ func (c *Cluster) syncStatefulSet() error {
		restartMasterFirst bool
	)
	podsToRecreate := make([]v1.Pod, 0)
	isSafeToRecreatePods := true
	switchoverCandidates := make([]spec.NamespacedName, 0)

	pods, err := c.listPods()
@@ -402,21 +403,20 @@ func (c *Cluster) syncStatefulSet() error {
	// get Postgres config, compare with manifest and update via Patroni PATCH endpoint if it differs
	// Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used.
	for i, pod := range pods {
		emptyPatroniConfig := acidv1.Patroni{}
		podName := util.NameFromMeta(pods[i].ObjectMeta)
		patroniConfig, pgParameters, err := c.patroni.GetConfig(&pod)
		patroniConfig, pgParameters, err := c.getPatroniConfig(&pod)
		if err != nil {
			c.logger.Warningf("could not get Postgres config from pod %s: %v", podName, err)
			c.logger.Warningf("%v", err)
			isSafeToRecreatePods = false
			continue
		}
		restartWait = patroniConfig.LoopWait

		// empty config probably means cluster is not fully initialized yet, e.g. restoring from backup
		// do not attempt a restart
		if !reflect.DeepEqual(patroniConfig, emptyPatroniConfig) || len(pgParameters) > 0 {
		if !reflect.DeepEqual(patroniConfig, acidv1.Patroni{}) || len(pgParameters) > 0 {
			restartMasterFirst, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, pgParameters)
			if err != nil {
				c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err)
				c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", pods[i].Name, err)
				continue
			}
			// it could take up to LoopWait to apply the config
@@ -437,50 +437,59 @@ func (c *Cluster) syncStatefulSet() error {
			remainingPods = append(remainingPods, &pods[i])
			continue
		}
		c.restartInstance(&pod, restartWait)
		if err = c.restartInstance(&pod, restartWait); err != nil {
			c.logger.Errorf("%v", err)
			isSafeToRecreatePods = false
		}
	}

	// in most cases only the master should be left to restart
	if len(remainingPods) > 0 {
		for _, remainingPod := range remainingPods {
			c.restartInstance(remainingPod, restartWait)
			if err = c.restartInstance(remainingPod, restartWait); err != nil {
				c.logger.Errorf("%v", err)
				isSafeToRecreatePods = false
			}
		}
	}

	// if we get here we also need to re-create the pods (either leftovers from the old
	// statefulset or those that got their configuration from the outdated statefulset)
	if len(podsToRecreate) > 0 {
		if isSafeToRecreatePods {
			c.logger.Debugln("performing rolling update")
			c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
			if err := c.recreatePods(podsToRecreate, switchoverCandidates); err != nil {
				return fmt.Errorf("could not recreate pods: %v", err)
			}
			c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
		} else {
			c.logger.Warningf("postpone pod recreation until next sync")
		}
	}
	return nil
}

func (c *Cluster) restartInstance(pod *v1.Pod, restartWait uint32) {
func (c *Cluster) restartInstance(pod *v1.Pod, restartWait uint32) error {
	// if the config update requires a restart, call Patroni restart
	podName := util.NameFromMeta(pod.ObjectMeta)
	role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])

	// if the config update requires a restart, call Patroni restart
	memberData, err := c.patroni.GetMemberData(pod)
	memberData, err := c.getPatroniMemberData(pod)
	if err != nil {
		c.logger.Debugf("could not get member data of %s pod %s - skipping possible restart attempt: %v", role, podName, err)
		return
		return fmt.Errorf("could not restart Postgres in %s pod %s: %v", role, podName, err)
	}

	// do restart only when it is pending
	if memberData.PendingRestart {
		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("restarting Postgres server within %s pod %s", role, pod.Name))
		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("restarting Postgres server within %s pod %s", role, podName))
		if err := c.patroni.Restart(pod); err != nil {
			c.logger.Warningf("could not restart Postgres server within %s pod %s: %v", role, podName, err)
			return
			return err
		}
		time.Sleep(time.Duration(restartWait) * time.Second)
		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("Postgres server restart done for %s pod %s", role, pod.Name))
		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("Postgres server restart done for %s pod %s", role, podName))
	}

	return nil
}

// AnnotationsToPropagate get the annotations to update if required