diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py
index 77c14e272..d1db4991b 100644
--- a/e2e/tests/test_e2e.py
+++ b/e2e/tests/test_e2e.py
@@ -205,7 +205,7 @@ class EndToEndTestCase(unittest.TestCase):
                 "enable_team_member_deprecation": "true",
                 "role_deletion_suffix": "_delete_me",
                 "resync_period": "15s",
-                "repair_period": "10s",
+                "repair_period": "15s",
             },
         }
         k8s.update_config(enable_postgres_team_crd)
@@ -296,6 +296,133 @@ class EndToEndTestCase(unittest.TestCase):
         self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"},
                              "Operator does not get in sync")
 
+    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
+    def test_config_update(self):
+        '''
+            Change Postgres config under Spec.Postgresql.Parameters and Spec.Patroni
+            and query Patroni config endpoint to check if manifest changes got applied
+            via restarting cluster through Patroni's rest api
+        '''
+        k8s = self.k8s
+        leader = k8s.get_cluster_leader_pod()
+        replica = k8s.get_cluster_replica_pod()
+        masterCreationTimestamp = leader.metadata.creation_timestamp
+        replicaCreationTimestamp = replica.metadata.creation_timestamp
+        new_max_connections_value = "50"
+
+        # adjust Postgres config
+        pg_patch_config = {
+            "spec": {
+                "postgresql": {
+                    "parameters": {
+                        "max_connections": new_max_connections_value
+                    }
+                },
+                "patroni": {
+                    "slots": {
+                        "test_slot": {
+                            "type": "physical"
+                        }
+                    },
+                    "ttl": 29,
+                    "loop_wait": 9,
+                    "retry_timeout": 9,
+                    "synchronous_mode": True
+                }
+            }
+        }
+
+        try:
+            k8s.api.custom_objects_api.patch_namespaced_custom_object(
+                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_config)
+
+            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+
+            def compare_config():
+                effective_config = k8s.patroni_rest(leader.metadata.name, "config")
+                desired_config = pg_patch_config["spec"]["patroni"]
+                desired_parameters = pg_patch_config["spec"]["postgresql"]["parameters"]
+                effective_parameters = effective_config["postgresql"]["parameters"]
+                self.assertEqual(desired_parameters["max_connections"], effective_parameters["max_connections"],
+                            "max_connections not updated")
+                self.assertTrue(effective_config["slots"] is not None, "physical replication slot not added")
+                self.assertEqual(desired_config["ttl"], effective_config["ttl"],
+                            "ttl not updated")
+                self.assertEqual(desired_config["loop_wait"], effective_config["loop_wait"],
+                            "loop_wait not updated")
+                self.assertEqual(desired_config["retry_timeout"], effective_config["retry_timeout"],
+                            "retry_timeout not updated")
+                self.assertEqual(desired_config["synchronous_mode"], effective_config["synchronous_mode"],
+                            "synchronous_mode not updated")
+                return True
+
+            # check if Patroni config has been updated
+            self.eventuallyTrue(compare_config, "Postgres config not applied")
+
+            # make sure that pods were not recreated
+            leader = k8s.get_cluster_leader_pod()
+            replica = k8s.get_cluster_replica_pod()
+            self.assertEqual(masterCreationTimestamp, leader.metadata.creation_timestamp,
+                            "Master pod creation timestamp is updated")
+            self.assertEqual(replicaCreationTimestamp, replica.metadata.creation_timestamp,
+                            "Master pod creation timestamp is updated")
+
+            # query max_connections setting
+            setting_query = """
+               SELECT setting
+                 FROM pg_settings
+                WHERE name = 'max_connections';
+            """
+            self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
+                "New max_connections setting not applied on master", 10, 5)
+            self.eventuallyNotEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
+                "Expected max_connections not to be updated on replica since Postgres was restarted there first", 10, 5)
+
+            # the next sync should restart the replica because it has pending_restart flag set
+            # force next sync by deleting the operator pod
+            k8s.delete_operator_pod()
+            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+
+            self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
+                "New max_connections setting not applied on replica", 10, 5)
+
+            # decrease max_connections again
+            # this time restart will be correct and new value should appear on both instances
+            lower_max_connections_value = "30"
+            pg_patch_max_connections = {
+                "spec": {
+                    "postgresql": {
+                        "parameters": {
+                            "max_connections": lower_max_connections_value
+                        }
+                    }
+                }
+            }
+
+            k8s.api.custom_objects_api.patch_namespaced_custom_object(
+                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_max_connections)
+
+            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+
+            # check Patroni config again
+            pg_patch_config["spec"]["postgresql"]["parameters"]["max_connections"] = lower_max_connections_value
+            self.eventuallyTrue(compare_config, "Postgres config not applied")
+
+            # and query max_connections setting again
+            self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", setting_query)[0], lower_max_connections_value,
+                "Previous max_connections setting not applied on master", 10, 5)
+            self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], lower_max_connections_value,
+                "Previous max_connections setting not applied on replica", 10, 5)
+
+        except timeout_decorator.TimeoutError:
+            print('Operator log: {}'.format(k8s.get_operator_log()))
+            raise
+
+        # make sure cluster is in a good state for further tests
+        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2,
+                             "No 2 pods running")
+
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     def test_cross_namespace_secrets(self):
         '''
@@ -794,7 +921,11 @@ class EndToEndTestCase(unittest.TestCase):
            Lower resource limits below configured minimum and let operator fix it
         '''
         k8s = self.k8s
-        # self.eventuallyEqual(lambda: k8s.pg_get_status(), "Running", "Cluster not healthy at start")
+        cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
+
+        # get nodes of master and replica(s) (expected target of new master)
+        _, replica_nodes = k8s.get_pg_nodes(cluster_label)
+        self.assertNotEqual(replica_nodes, [])
 
         # configure minimum boundaries for CPU and memory limits
         minCPULimit = '503m'
@@ -827,7 +958,9 @@ class EndToEndTestCase(unittest.TestCase):
             "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_resources)
         self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
 
-        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No two pods running after lazy rolling upgrade")
+        # wait for switched over
+        k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=master,' + cluster_label)
+        k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
         self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members()), 2, "Postgres status did not enter running")
 
         def verify_pod_limits():
@@ -968,15 +1101,15 @@ class EndToEndTestCase(unittest.TestCase):
             self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
 
             # node affinity change should cause another rolling update and relocation of replica
-            k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=replica,' + cluster_label)
             k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
+            k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
 
         except timeout_decorator.TimeoutError:
             print('Operator log: {}'.format(k8s.get_operator_log()))
             raise
 
         # toggle pod anti affinity to make sure replica and master run on separate nodes
-        self.assert_distributed_pods(replica_nodes)
+        self.assert_distributed_pods(master_nodes)
 
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     def test_node_readiness_label(self):
         '''
@@ -1192,133 +1325,6 @@ class EndToEndTestCase(unittest.TestCase):
         self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", user_query)), 2,
                              "Found incorrect number of rotation users", 10, 5)
 
-    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
-    def test_patroni_config_update(self):
-        '''
-            Change Postgres config under Spec.Postgresql.Parameters and Spec.Patroni
-            and query Patroni config endpoint to check if manifest changes got applied
-            via restarting cluster through Patroni's rest api
-        '''
-        k8s = self.k8s
-        leader = k8s.get_cluster_leader_pod()
-        replica = k8s.get_cluster_replica_pod()
-        masterCreationTimestamp = leader.metadata.creation_timestamp
-        replicaCreationTimestamp = replica.metadata.creation_timestamp
-        new_max_connections_value = "50"
-
-        # adjust Postgres config
-        pg_patch_config = {
-            "spec": {
-                "postgresql": {
-                    "parameters": {
-                        "max_connections": new_max_connections_value
-                    }
-                },
-                "patroni": {
-                    "slots": {
-                        "test_slot": {
-                            "type": "physical"
-                        }
-                    },
-                    "ttl": 29,
-                    "loop_wait": 9,
-                    "retry_timeout": 9,
-                    "synchronous_mode": True
-                }
-            }
-        }
-
-        try:
-            k8s.api.custom_objects_api.patch_namespaced_custom_object(
-                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_config)
-
-            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
-
-            def compare_config():
-                effective_config = k8s.patroni_rest(leader.metadata.name, "config")
-                desired_config = pg_patch_config["spec"]["patroni"]
-                desired_parameters = pg_patch_config["spec"]["postgresql"]["parameters"]
-                effective_parameters = effective_config["postgresql"]["parameters"]
-                self.assertEqual(desired_parameters["max_connections"], effective_parameters["max_connections"],
-                            "max_connections not updated")
-                self.assertTrue(effective_config["slots"] is not None, "physical replication slot not added")
-                self.assertEqual(desired_config["ttl"], effective_config["ttl"],
-                            "ttl not updated")
-                self.assertEqual(desired_config["loop_wait"], effective_config["loop_wait"],
-                            "loop_wait not updated")
-                self.assertEqual(desired_config["retry_timeout"], effective_config["retry_timeout"],
-                            "retry_timeout not updated")
-                self.assertEqual(desired_config["synchronous_mode"], effective_config["synchronous_mode"],
-                            "synchronous_mode not updated")
-                return True
-
-            # check if Patroni config has been updated
-            self.eventuallyTrue(compare_config, "Postgres config not applied")
-
-            # make sure that pods were not recreated
-            leader = k8s.get_cluster_leader_pod()
-            replica = k8s.get_cluster_replica_pod()
-            self.assertEqual(masterCreationTimestamp, leader.metadata.creation_timestamp,
-                            "Master pod creation timestamp is updated")
-            self.assertEqual(replicaCreationTimestamp, replica.metadata.creation_timestamp,
-                            "Master pod creation timestamp is updated")
-
-            # query max_connections setting
-            setting_query = """
-               SELECT setting
-                 FROM pg_settings
-                WHERE name = 'max_connections';
-            """
-            self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
-                "New max_connections setting not applied on master", 10, 5)
-            self.eventuallyNotEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
-                "Expected max_connections not to be updated on replica since Postgres was restarted there first", 10, 5)
-
-            # the next sync should restart the replica because it has pending_restart flag set
-            # force next sync by deleting the operator pod
-            k8s.delete_operator_pod()
-            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
-
-            self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
-                "New max_connections setting not applied on replica", 10, 5)
-
-            # decrease max_connections again
-            # this time restart will be correct and new value should appear on both instances
-            lower_max_connections_value = "30"
-            pg_patch_max_connections = {
-                "spec": {
-                    "postgresql": {
-                        "parameters": {
-                            "max_connections": lower_max_connections_value
-                        }
-                    }
-                }
-            }
-
-            k8s.api.custom_objects_api.patch_namespaced_custom_object(
-                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_max_connections)
-
-            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
-
-            # check Patroni config again
-            pg_patch_config["spec"]["postgresql"]["parameters"]["max_connections"] = lower_max_connections_value
-            self.eventuallyTrue(compare_config, "Postgres config not applied")
-
-            # and query max_connections setting again
-            self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", setting_query)[0], lower_max_connections_value,
-                "Previous max_connections setting not applied on master", 10, 5)
-            self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], lower_max_connections_value,
-                "Previous max_connections setting not applied on replica", 10, 5)
-
-        except timeout_decorator.TimeoutError:
-            print('Operator log: {}'.format(k8s.get_operator_log()))
-            raise
-
-        # make sure cluster is in a good state for further tests
-        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
-        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2,
-                             "No 2 pods running")
-
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     def test_rolling_update_flag(self):
         '''
@@ -1405,7 +1411,7 @@ class EndToEndTestCase(unittest.TestCase):
             "data": {
                 "pod_label_wait_timeout": "2s",
                 "resync_period": "30s",
-                "repair_period": "10s",
+                "repair_period": "30s",
             }
         }
 
diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go
index 805112e29..9e8ded844 100644
--- a/pkg/cluster/pod.go
+++ b/pkg/cluster/pod.go
@@ -13,6 +13,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 
+	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
 	"github.com/zalando/postgres-operator/pkg/spec"
 	"github.com/zalando/postgres-operator/pkg/util"
 	"github.com/zalando/postgres-operator/pkg/util/patroni"
@@ -349,6 +350,54 @@ func (c *Cluster) MigrateReplicaPod(podName spec.NamespacedName, fromNodeName st
 	return nil
 }
 
+func (c *Cluster) getPatroniConfig(pod *v1.Pod) (acidv1.Patroni, map[string]string, error) {
+	var (
+		patroniConfig acidv1.Patroni
+		pgParameters  map[string]string
+	)
+	podName := util.NameFromMeta(pod.ObjectMeta)
+	err := retryutil.Retry(1*time.Second, 5*time.Second,
+		func() (bool, error) {
+			var err error
+			patroniConfig, pgParameters, err = c.patroni.GetConfig(pod)
+
+			if err != nil {
+				return false, err
+			}
+			return true, nil
+		},
+	)
+
+	if err != nil {
+		return acidv1.Patroni{}, nil, fmt.Errorf("could not get Postgres config from pod %s: %v", podName, err)
+	}
+
+	return patroniConfig, pgParameters, nil
+}
+
+func (c *Cluster) getPatroniMemberData(pod *v1.Pod) (patroni.MemberData, error) {
+	var memberData patroni.MemberData
+	err := retryutil.Retry(1*time.Second, 5*time.Second,
+		func() (bool, error) {
+			var err error
+			memberData, err = c.patroni.GetMemberData(pod)
+
+			if err != nil {
+				return false, err
+			}
+			return true, nil
+		},
+	)
+	if err != nil {
+		return patroni.MemberData{}, fmt.Errorf("could not get member data: %v", err)
+	}
+	if memberData.State == "creating replica" {
+		return patroni.MemberData{}, fmt.Errorf("replica currently being initialized")
+	}
+
+	return memberData, nil
+}
+
 func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
 	ch := c.registerPodSubscriber(podName)
 	defer c.unregisterPodSubscriber(podName)
@@ -380,54 +429,10 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
 	return pod, nil
 }
 
-func (c *Cluster) isSafeToRecreatePods(pods []v1.Pod) bool {
-
-	/*
-	 Operator should not re-create pods if there is at least one replica being bootstrapped
-	 because Patroni might use other replicas to take basebackup from (see Patroni's "clonefrom" tag).
-
-	 XXX operator cannot forbid replica re-init, so we might still fail if re-init is started
-	 after this check succeeds but before a pod is re-created
-	*/
-	for _, pod := range pods {
-		c.logger.Debugf("name=%s phase=%s ip=%s", pod.Name, pod.Status.Phase, pod.Status.PodIP)
-	}
-
-	for _, pod := range pods {
-
-		var data patroni.MemberData
-
-		err := retryutil.Retry(1*time.Second, 5*time.Second,
-			func() (bool, error) {
-				var err error
-				data, err = c.patroni.GetMemberData(&pod)
-
-				if err != nil {
-					return false, err
-				}
-				return true, nil
-			},
-		)
-
-		if err != nil {
-			c.logger.Errorf("failed to get Patroni state for pod: %s", err)
-			return false
-		} else if data.State == "creating replica" {
-			c.logger.Warningf("cannot re-create replica %s: it is currently being initialized", pod.Name)
-			return false
-		}
-	}
-	return true
-}
-
 func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.NamespacedName) error {
 	c.setProcessName("starting to recreate pods")
 	c.logger.Infof("there are %d pods in the cluster to recreate", len(pods))
 
-	if !c.isSafeToRecreatePods(pods) {
-		return fmt.Errorf("postpone pod recreation until next Sync: recreation is unsafe because pods are being initialized")
-	}
-
 	var (
 		masterPod, newMasterPod *v1.Pod
 	)
diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go
index 9f84ef3da..bbf023764 100644
--- a/pkg/cluster/sync.go
+++ b/pkg/cluster/sync.go
@@ -277,6 +277,7 @@ func (c *Cluster) syncStatefulSet() error {
 		restartMasterFirst bool
 	)
 	podsToRecreate := make([]v1.Pod, 0)
+	isSafeToRecreatePods := true
 	switchoverCandidates := make([]spec.NamespacedName, 0)
 
 	pods, err := c.listPods()
@@ -402,21 +403,20 @@ func (c *Cluster) syncStatefulSet() error {
 	// get Postgres config, compare with manifest and update via Patroni PATCH endpoint if it differs
 	// Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used.
 	for i, pod := range pods {
-		emptyPatroniConfig := acidv1.Patroni{}
-		podName := util.NameFromMeta(pods[i].ObjectMeta)
-		patroniConfig, pgParameters, err := c.patroni.GetConfig(&pod)
+		patroniConfig, pgParameters, err := c.getPatroniConfig(&pod)
 		if err != nil {
-			c.logger.Warningf("could not get Postgres config from pod %s: %v", podName, err)
+			c.logger.Warningf("%v", err)
+			isSafeToRecreatePods = false
 			continue
 		}
 		restartWait = patroniConfig.LoopWait
 
 		// empty config probably means cluster is not fully initialized yet, e.g. restoring from backup
 		// do not attempt a restart
-		if !reflect.DeepEqual(patroniConfig, emptyPatroniConfig) || len(pgParameters) > 0 {
+		if !reflect.DeepEqual(patroniConfig, acidv1.Patroni{}) || len(pgParameters) > 0 {
 			restartMasterFirst, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, pgParameters)
 			if err != nil {
-				c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err)
+				c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", pods[i].Name, err)
 				continue
 			}
 			// it could take up to LoopWait to apply the config
@@ -437,50 +437,59 @@ func (c *Cluster) syncStatefulSet() error {
 			remainingPods = append(remainingPods, &pods[i])
 			continue
 		}
-		c.restartInstance(&pod, restartWait)
+		if err = c.restartInstance(&pod, restartWait); err != nil {
+			c.logger.Errorf("%v", err)
+			isSafeToRecreatePods = false
+		}
 	}
 
 	// in most cases only the master should be left to restart
 	if len(remainingPods) > 0 {
 		for _, remainingPod := range remainingPods {
-			c.restartInstance(remainingPod, restartWait)
+			if err = c.restartInstance(remainingPod, restartWait); err != nil {
+				c.logger.Errorf("%v", err)
+				isSafeToRecreatePods = false
+			}
 		}
 	}
 
 	// if we get here we also need to re-create the pods (either leftovers from the old
 	// statefulset or those that got their configuration from the outdated statefulset)
 	if len(podsToRecreate) > 0 {
-		c.logger.Debugln("performing rolling update")
-		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
-		if err := c.recreatePods(podsToRecreate, switchoverCandidates); err != nil {
-			return fmt.Errorf("could not recreate pods: %v", err)
+		if isSafeToRecreatePods {
+			c.logger.Debugln("performing rolling update")
+			c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
+			if err := c.recreatePods(podsToRecreate, switchoverCandidates); err != nil {
+				return fmt.Errorf("could not recreate pods: %v", err)
+			}
+			c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
+		} else {
+			c.logger.Warningf("postpone pod recreation until next sync")
 		}
-		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
 	}
 
 	return nil
 }
 
-func (c *Cluster) restartInstance(pod *v1.Pod, restartWait uint32) {
+func (c *Cluster) restartInstance(pod *v1.Pod, restartWait uint32) error {
+	// if the config update requires a restart, call Patroni restart
 	podName := util.NameFromMeta(pod.ObjectMeta)
 	role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
-
-	// if the config update requires a restart, call Patroni restart
-	memberData, err := c.patroni.GetMemberData(pod)
+	memberData, err := c.getPatroniMemberData(pod)
 	if err != nil {
-		c.logger.Debugf("could not get member data of %s pod %s - skipping possible restart attempt: %v", role, podName, err)
-		return
+		return fmt.Errorf("could not restart Postgres in %s pod %s: %v", role, podName, err)
 	}
 
 	// do restart only when it is pending
 	if memberData.PendingRestart {
-		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("restarting Postgres server within %s pod %s", role, pod.Name))
+		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("restarting Postgres server within %s pod %s", role, podName))
 		if err := c.patroni.Restart(pod); err != nil {
-			c.logger.Warningf("could not restart Postgres server within %s pod %s: %v", role, podName, err)
-			return
+			return err
 		}
 		time.Sleep(time.Duration(restartWait) * time.Second)
-		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("Postgres server restart done for %s pod %s", role, pod.Name))
+		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("Postgres server restart done for %s pod %s", role, podName))
 	}
+
+	return nil
 }
 
 // AnnotationsToPropagate get the annotations to update if required