do not recreate pods if previous Patroni API calls fail (#1767)

* do not recreate pods if previous Patroni API calls fail
* move retry reads against Patroni API to pod.go
* remove final failover check in node affinity test
* make test_min_resource_limits more robust
parent 2d2386f519
commit 46547c4088
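The gist of the change: reads against the Patroni API (GetConfig, GetMemberData) are wrapped in a bounded retry that now lives in pod.go, and syncStatefulSet() performs the rolling update only if all of those calls eventually succeeded; otherwise pod recreation is postponed to the next sync instead of acting on missing or stale Patroni state. Below is a minimal, self-contained Go sketch of that pattern; retry mirrors the signature of the repo's retryutil.Retry helper, while fetchMemberData is a hypothetical stand-in for c.patroni.GetMemberData, so treat this as an illustration rather than operator code.

package main

import (
	"errors"
	"fmt"
	"time"
)

// retry calls f every interval until it reports success or timeout elapses,
// mirroring the retryutil.Retry(interval, timeout, func() (bool, error)) shape
// used in the diff below.
func retry(interval, timeout time.Duration, f func() (bool, error)) error {
	deadline := time.Now().Add(timeout)
	for {
		ok, err := f()
		if ok {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("still failing after %v: %v", timeout, err)
		}
		time.Sleep(interval)
	}
}

// fetchMemberData is a hypothetical stand-in for a Patroni read that fails
// a couple of times before succeeding.
func fetchMemberData(attempts *int) (string, error) {
	*attempts++
	if *attempts < 3 {
		return "", errors.New("connection refused")
	}
	return "running", nil
}

func main() {
	isSafeToRecreatePods := true
	attempts := 0
	var state string

	err := retry(1*time.Second, 5*time.Second, func() (bool, error) {
		var err error
		state, err = fetchMemberData(&attempts)
		if err != nil {
			return false, err
		}
		return true, nil
	})
	if err != nil {
		// instead of recreating pods without knowing the member state,
		// flip the flag and postpone recreation to the next sync
		isSafeToRecreatePods = false
	}

	if isSafeToRecreatePods {
		fmt.Printf("member state %q, safe to perform rolling update\n", state)
	} else {
		fmt.Println("postpone pod recreation until next sync")
	}
}

Downgrading individual failures to a flag, rather than aborting the sync, keeps config comparison and restarts running for the healthy pods while still blocking the rolling update; that is exactly how the syncStatefulSet() changes below behave.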
@@ -205,7 +205,7 @@ class EndToEndTestCase(unittest.TestCase):
                 "enable_team_member_deprecation": "true",
                 "role_deletion_suffix": "_delete_me",
                 "resync_period": "15s",
-                "repair_period": "10s",
+                "repair_period": "15s",
             },
         }
         k8s.update_config(enable_postgres_team_crd)
@@ -296,6 +296,133 @@ class EndToEndTestCase(unittest.TestCase):
         self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"},
                              "Operator does not get in sync")
 
+    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
+    def test_config_update(self):
+        '''
+            Change Postgres config under Spec.Postgresql.Parameters and Spec.Patroni
+            and query the Patroni config endpoint to check if manifest changes got applied
+            via restarting the cluster through Patroni's REST API
+        '''
+        k8s = self.k8s
+        leader = k8s.get_cluster_leader_pod()
+        replica = k8s.get_cluster_replica_pod()
+        masterCreationTimestamp = leader.metadata.creation_timestamp
+        replicaCreationTimestamp = replica.metadata.creation_timestamp
+        new_max_connections_value = "50"
+
+        # adjust Postgres config
+        pg_patch_config = {
+            "spec": {
+                "postgresql": {
+                    "parameters": {
+                        "max_connections": new_max_connections_value
+                    }
+                },
+                "patroni": {
+                    "slots": {
+                        "test_slot": {
+                            "type": "physical"
+                        }
+                    },
+                    "ttl": 29,
+                    "loop_wait": 9,
+                    "retry_timeout": 9,
+                    "synchronous_mode": True
+                }
+            }
+        }
+
+        try:
+            k8s.api.custom_objects_api.patch_namespaced_custom_object(
+                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_config)
+
+            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+
+            def compare_config():
+                effective_config = k8s.patroni_rest(leader.metadata.name, "config")
+                desired_config = pg_patch_config["spec"]["patroni"]
+                desired_parameters = pg_patch_config["spec"]["postgresql"]["parameters"]
+                effective_parameters = effective_config["postgresql"]["parameters"]
+                self.assertEqual(desired_parameters["max_connections"], effective_parameters["max_connections"],
+                            "max_connections not updated")
+                self.assertTrue(effective_config["slots"] is not None, "physical replication slot not added")
+                self.assertEqual(desired_config["ttl"], effective_config["ttl"],
+                            "ttl not updated")
+                self.assertEqual(desired_config["loop_wait"], effective_config["loop_wait"],
+                            "loop_wait not updated")
+                self.assertEqual(desired_config["retry_timeout"], effective_config["retry_timeout"],
+                            "retry_timeout not updated")
+                self.assertEqual(desired_config["synchronous_mode"], effective_config["synchronous_mode"],
+                            "synchronous_mode not updated")
+                return True
+
+            # check if Patroni config has been updated
+            self.eventuallyTrue(compare_config, "Postgres config not applied")
+
+            # make sure that pods were not recreated
+            leader = k8s.get_cluster_leader_pod()
+            replica = k8s.get_cluster_replica_pod()
+            self.assertEqual(masterCreationTimestamp, leader.metadata.creation_timestamp,
+                            "Master pod creation timestamp is updated")
+            self.assertEqual(replicaCreationTimestamp, replica.metadata.creation_timestamp,
+                            "Replica pod creation timestamp is updated")
+
+            # query max_connections setting
+            setting_query = """
+               SELECT setting
+                 FROM pg_settings
+                WHERE name = 'max_connections';
+            """
+            self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
+                "New max_connections setting not applied on master", 10, 5)
+            self.eventuallyNotEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
+                "Expected max_connections not to be updated on replica since Postgres was restarted there first", 10, 5)
+
+            # the next sync should restart the replica because it has the pending_restart flag set
+            # force the next sync by deleting the operator pod
+            k8s.delete_operator_pod()
+            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+
+            self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
+                "New max_connections setting not applied on replica", 10, 5)
+
+            # decrease max_connections again
+            # this time the restart happens in the correct order and the new value should appear on both instances
+            lower_max_connections_value = "30"
+            pg_patch_max_connections = {
+                "spec": {
+                    "postgresql": {
+                        "parameters": {
+                            "max_connections": lower_max_connections_value
+                        }
+                    }
+                }
+            }
+
+            k8s.api.custom_objects_api.patch_namespaced_custom_object(
+                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_max_connections)
+
+            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+
+            # check Patroni config again
+            pg_patch_config["spec"]["postgresql"]["parameters"]["max_connections"] = lower_max_connections_value
+            self.eventuallyTrue(compare_config, "Postgres config not applied")
+
+            # and query max_connections setting again
+            self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", setting_query)[0], lower_max_connections_value,
+                "Previous max_connections setting not applied on master", 10, 5)
+            self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], lower_max_connections_value,
+                "Previous max_connections setting not applied on replica", 10, 5)
+
+        except timeout_decorator.TimeoutError:
+            print('Operator log: {}'.format(k8s.get_operator_log()))
+            raise
+
+        # make sure the cluster is in a good state for further tests
+        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2,
+                             "No 2 pods running")
+
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     def test_cross_namespace_secrets(self):
         '''
@@ -794,7 +921,11 @@ class EndToEndTestCase(unittest.TestCase):
         Lower resource limits below configured minimum and let operator fix it
         '''
         k8s = self.k8s
-        # self.eventuallyEqual(lambda: k8s.pg_get_status(), "Running", "Cluster not healthy at start")
+        cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
+
+        # get nodes of master and replica(s) (expected target of the new master)
+        _, replica_nodes = k8s.get_pg_nodes(cluster_label)
+        self.assertNotEqual(replica_nodes, [])
 
         # configure minimum boundaries for CPU and memory limits
         minCPULimit = '503m'
@@ -827,7 +958,9 @@ class EndToEndTestCase(unittest.TestCase):
             "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_resources)
         self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
 
-        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No two pods running after lazy rolling upgrade")
+        # wait for the switchover to finish
+        k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=master,' + cluster_label)
+        k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
         self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members()), 2, "Postgres status did not enter running")
 
         def verify_pod_limits():
@@ -968,15 +1101,15 @@ class EndToEndTestCase(unittest.TestCase):
             self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
 
             # node affinity change should cause another rolling update and relocation of replica
-            k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=replica,' + cluster_label)
             k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
+            k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
 
         except timeout_decorator.TimeoutError:
            print('Operator log: {}'.format(k8s.get_operator_log()))
            raise
 
         # toggle pod anti affinity to make sure replica and master run on separate nodes
-        self.assert_distributed_pods(replica_nodes)
+        self.assert_distributed_pods(master_nodes)
 
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     def test_node_readiness_label(self):
@@ -1192,133 +1325,6 @@ class EndToEndTestCase(unittest.TestCase):
         self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", user_query)), 2,
             "Found incorrect number of rotation users", 10, 5)
 
-    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
-    def test_patroni_config_update(self):
-        '''
-            Change Postgres config under Spec.Postgresql.Parameters and Spec.Patroni
-            and query Patroni config endpoint to check if manifest changes got applied
-            via restarting cluster through Patroni's rest api
-        '''
-        k8s = self.k8s
-        leader = k8s.get_cluster_leader_pod()
-        replica = k8s.get_cluster_replica_pod()
-        masterCreationTimestamp = leader.metadata.creation_timestamp
-        replicaCreationTimestamp = replica.metadata.creation_timestamp
-        new_max_connections_value = "50"
-
-        # adjust Postgres config
-        pg_patch_config = {
-            "spec": {
-                "postgresql": {
-                    "parameters": {
-                        "max_connections": new_max_connections_value
-                     }
-                 },
-                 "patroni": {
-                    "slots": {
-                        "test_slot": {
-                            "type": "physical"
-                        }
-                    },
-                    "ttl": 29,
-                    "loop_wait": 9,
-                    "retry_timeout": 9,
-                    "synchronous_mode": True
-                 }
-            }
-        }
-
-        try:
-            k8s.api.custom_objects_api.patch_namespaced_custom_object(
-                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_config)
-
-            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
-
-            def compare_config():
-                effective_config = k8s.patroni_rest(leader.metadata.name, "config")
-                desired_config = pg_patch_config["spec"]["patroni"]
-                desired_parameters = pg_patch_config["spec"]["postgresql"]["parameters"]
-                effective_parameters = effective_config["postgresql"]["parameters"]
-                self.assertEqual(desired_parameters["max_connections"], effective_parameters["max_connections"],
-                            "max_connections not updated")
-                self.assertTrue(effective_config["slots"] is not None, "physical replication slot not added")
-                self.assertEqual(desired_config["ttl"], effective_config["ttl"],
-                            "ttl not updated")
-                self.assertEqual(desired_config["loop_wait"], effective_config["loop_wait"],
-                            "loop_wait not updated")
-                self.assertEqual(desired_config["retry_timeout"], effective_config["retry_timeout"],
-                            "retry_timeout not updated")
-                self.assertEqual(desired_config["synchronous_mode"], effective_config["synchronous_mode"],
-                            "synchronous_mode not updated")
-                return True
-
-            # check if Patroni config has been updated
-            self.eventuallyTrue(compare_config, "Postgres config not applied")
-
-            # make sure that pods were not recreated
-            leader = k8s.get_cluster_leader_pod()
-            replica = k8s.get_cluster_replica_pod()
-            self.assertEqual(masterCreationTimestamp, leader.metadata.creation_timestamp,
-                            "Master pod creation timestamp is updated")
-            self.assertEqual(replicaCreationTimestamp, replica.metadata.creation_timestamp,
-                            "Master pod creation timestamp is updated")
-
-            # query max_connections setting
-            setting_query = """
-               SELECT setting
-                 FROM pg_settings
-                WHERE name = 'max_connections';
-            """
-            self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
-                "New max_connections setting not applied on master", 10, 5)
-            self.eventuallyNotEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
-                "Expected max_connections not to be updated on replica since Postgres was restarted there first", 10, 5)
-
-            # the next sync should restart the replica because it has pending_restart flag set
-            # force next sync by deleting the operator pod
-            k8s.delete_operator_pod()
-            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
-
-            self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
-                "New max_connections setting not applied on replica", 10, 5)
-
-            # decrease max_connections again
-            # this time restart will be correct and new value should appear on both instances
-            lower_max_connections_value = "30"
-            pg_patch_max_connections = {
-                "spec": {
-                    "postgresql": {
-                        "parameters": {
-                            "max_connections": lower_max_connections_value
-                        }
-                    }
-                }
-            }
-
-            k8s.api.custom_objects_api.patch_namespaced_custom_object(
-                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_max_connections)
-
-            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
-
-            # check Patroni config again
-            pg_patch_config["spec"]["postgresql"]["parameters"]["max_connections"] = lower_max_connections_value
-            self.eventuallyTrue(compare_config, "Postgres config not applied")
-
-            # and query max_connections setting again
-            self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", setting_query)[0], lower_max_connections_value,
-                "Previous max_connections setting not applied on master", 10, 5)
-            self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], lower_max_connections_value,
-                "Previous max_connections setting not applied on replica", 10, 5)
-
-        except timeout_decorator.TimeoutError:
-            print('Operator log: {}'.format(k8s.get_operator_log()))
-            raise
-
-        # make sure cluster is in a good state for further tests
-        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
-        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2,
-                             "No 2 pods running")
-
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     def test_rolling_update_flag(self):
         '''
@@ -1405,7 +1411,7 @@ class EndToEndTestCase(unittest.TestCase):
             "data": {
                 "pod_label_wait_timeout": "2s",
                 "resync_period": "30s",
-                "repair_period": "10s",
+                "repair_period": "30s",
             }
         }
 

@@ -13,6 +13,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 
+	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
 	"github.com/zalando/postgres-operator/pkg/spec"
 	"github.com/zalando/postgres-operator/pkg/util"
 	"github.com/zalando/postgres-operator/pkg/util/patroni"
@@ -349,6 +350,54 @@ func (c *Cluster) MigrateReplicaPod(podName spec.NamespacedName, fromNodeName st
 	return nil
 }
 
+func (c *Cluster) getPatroniConfig(pod *v1.Pod) (acidv1.Patroni, map[string]string, error) {
+	var (
+		patroniConfig acidv1.Patroni
+		pgParameters  map[string]string
+	)
+	podName := util.NameFromMeta(pod.ObjectMeta)
+	err := retryutil.Retry(1*time.Second, 5*time.Second,
+		func() (bool, error) {
+			var err error
+			patroniConfig, pgParameters, err = c.patroni.GetConfig(pod)
+
+			if err != nil {
+				return false, err
+			}
+			return true, nil
+		},
+	)
+
+	if err != nil {
+		return acidv1.Patroni{}, nil, fmt.Errorf("could not get Postgres config from pod %s: %v", podName, err)
+	}
+
+	return patroniConfig, pgParameters, nil
+}
+
+func (c *Cluster) getPatroniMemberData(pod *v1.Pod) (patroni.MemberData, error) {
+	var memberData patroni.MemberData
+	err := retryutil.Retry(1*time.Second, 5*time.Second,
+		func() (bool, error) {
+			var err error
+			memberData, err = c.patroni.GetMemberData(pod)
+
+			if err != nil {
+				return false, err
+			}
+			return true, nil
+		},
+	)
+	if err != nil {
+		return patroni.MemberData{}, fmt.Errorf("could not get member data: %v", err)
+	}
+	if memberData.State == "creating replica" {
+		return patroni.MemberData{}, fmt.Errorf("replica currently being initialized")
+	}
+
+	return memberData, nil
+}
+
 func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
 	ch := c.registerPodSubscriber(podName)
 	defer c.unregisterPodSubscriber(podName)
@@ -380,54 +429,10 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
 	return pod, nil
 }
 
-func (c *Cluster) isSafeToRecreatePods(pods []v1.Pod) bool {
-
-	/*
-	 Operator should not re-create pods if there is at least one replica being bootstrapped
-	 because Patroni might use other replicas to take basebackup from (see Patroni's "clonefrom" tag).
-
-	 XXX operator cannot forbid replica re-init, so we might still fail if re-init is started
-	 after this check succeeds but before a pod is re-created
-	*/
-	for _, pod := range pods {
-		c.logger.Debugf("name=%s phase=%s ip=%s", pod.Name, pod.Status.Phase, pod.Status.PodIP)
-	}
-
-	for _, pod := range pods {
-
-		var data patroni.MemberData
-
-		err := retryutil.Retry(1*time.Second, 5*time.Second,
-			func() (bool, error) {
-				var err error
-				data, err = c.patroni.GetMemberData(&pod)
-
-				if err != nil {
-					return false, err
-				}
-				return true, nil
-			},
-		)
-
-		if err != nil {
-			c.logger.Errorf("failed to get Patroni state for pod: %s", err)
-			return false
-		} else if data.State == "creating replica" {
-			c.logger.Warningf("cannot re-create replica %s: it is currently being initialized", pod.Name)
-			return false
-		}
-	}
-	return true
-}
-
 func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.NamespacedName) error {
 	c.setProcessName("starting to recreate pods")
 	c.logger.Infof("there are %d pods in the cluster to recreate", len(pods))
 
-	if !c.isSafeToRecreatePods(pods) {
-		return fmt.Errorf("postpone pod recreation until next Sync: recreation is unsafe because pods are being initialized")
-	}
-
 	var (
 		masterPod, newMasterPod *v1.Pod
 	)

@@ -277,6 +277,7 @@ func (c *Cluster) syncStatefulSet() error {
 		restartMasterFirst bool
 	)
 	podsToRecreate := make([]v1.Pod, 0)
+	isSafeToRecreatePods := true
 	switchoverCandidates := make([]spec.NamespacedName, 0)
 
 	pods, err := c.listPods()
@@ -402,21 +403,20 @@ func (c *Cluster) syncStatefulSet() error {
 	// get Postgres config, compare with manifest and update via Patroni PATCH endpoint if it differs
 	// Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used.
 	for i, pod := range pods {
-		emptyPatroniConfig := acidv1.Patroni{}
-		podName := util.NameFromMeta(pods[i].ObjectMeta)
-		patroniConfig, pgParameters, err := c.patroni.GetConfig(&pod)
+		patroniConfig, pgParameters, err := c.getPatroniConfig(&pod)
 		if err != nil {
-			c.logger.Warningf("could not get Postgres config from pod %s: %v", podName, err)
+			c.logger.Warningf("%v", err)
+			isSafeToRecreatePods = false
 			continue
 		}
 		restartWait = patroniConfig.LoopWait
 
 		// empty config probably means cluster is not fully initialized yet, e.g. restoring from backup
 		// do not attempt a restart
		if !reflect.DeepEqual(patroniConfig, emptyPatroniConfig) || len(pgParameters) > 0 {
-		if !reflect.DeepEqual(patroniConfig, emptyPatroniConfig) || len(pgParameters) > 0 {
+		if !reflect.DeepEqual(patroniConfig, acidv1.Patroni{}) || len(pgParameters) > 0 {
 			restartMasterFirst, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, pgParameters)
 			if err != nil {
-				c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err)
+				c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", pods[i].Name, err)
 				continue
 			}
 			// it could take up to LoopWait to apply the config
@@ -437,50 +437,59 @@ func (c *Cluster) syncStatefulSet() error {
 			remainingPods = append(remainingPods, &pods[i])
 			continue
 		}
-		c.restartInstance(&pod, restartWait)
+		if err = c.restartInstance(&pod, restartWait); err != nil {
+			c.logger.Errorf("%v", err)
+			isSafeToRecreatePods = false
+		}
 	}
 
 	// in most cases only the master should be left to restart
 	if len(remainingPods) > 0 {
 		for _, remainingPod := range remainingPods {
-			c.restartInstance(remainingPod, restartWait)
+			if err = c.restartInstance(remainingPod, restartWait); err != nil {
+				c.logger.Errorf("%v", err)
+				isSafeToRecreatePods = false
+			}
 		}
 	}
 
 	// if we get here we also need to re-create the pods (either leftovers from the old
 	// statefulset or those that got their configuration from the outdated statefulset)
 	if len(podsToRecreate) > 0 {
-		c.logger.Debugln("performing rolling update")
-		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
-		if err := c.recreatePods(podsToRecreate, switchoverCandidates); err != nil {
-			return fmt.Errorf("could not recreate pods: %v", err)
-		}
-		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
+		if isSafeToRecreatePods {
+			c.logger.Debugln("performing rolling update")
+			c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
+			if err := c.recreatePods(podsToRecreate, switchoverCandidates); err != nil {
+				return fmt.Errorf("could not recreate pods: %v", err)
+			}
+			c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
+		} else {
+			c.logger.Warningf("postpone pod recreation until next sync")
+		}
 	}
 	return nil
 }
 
-func (c *Cluster) restartInstance(pod *v1.Pod, restartWait uint32) {
+func (c *Cluster) restartInstance(pod *v1.Pod, restartWait uint32) error {
+	// if the config update requires a restart, call Patroni restart
 	podName := util.NameFromMeta(pod.ObjectMeta)
 	role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
-
-	// if the config update requires a restart, call Patroni restart
-	memberData, err := c.patroni.GetMemberData(pod)
+	memberData, err := c.getPatroniMemberData(pod)
 	if err != nil {
-		c.logger.Debugf("could not get member data of %s pod %s - skipping possible restart attempt: %v", role, podName, err)
-		return
+		return fmt.Errorf("could not restart Postgres in %s pod %s: %v", role, podName, err)
 	}
 
 	// do restart only when it is pending
 	if memberData.PendingRestart {
-		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("restarting Postgres server within %s pod %s", role, pod.Name))
+		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("restarting Postgres server within %s pod %s", role, podName))
 		if err := c.patroni.Restart(pod); err != nil {
-			c.logger.Warningf("could not restart Postgres server within %s pod %s: %v", role, podName, err)
-			return
+			return err
 		}
 		time.Sleep(time.Duration(restartWait) * time.Second)
-		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("Postgres server restart done for %s pod %s", role, pod.Name))
+		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("Postgres server restart done for %s pod %s", role, podName))
 	}
+
+	return nil
 }
 
 // AnnotationsToPropagate get the annotations to update if required