diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 638cd05b2..39fd45323 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -395,12 +395,13 @@ class EndToEndTestCase(unittest.TestCase): "spec": { "postgresql": { "parameters": { - "max_connections": new_max_connections_value + "max_connections": new_max_connections_value, + "wal_level": "logical" } }, "patroni": { "slots": { - "test_slot": { + "first_slot": { "type": "physical" } }, @@ -437,6 +438,8 @@ class EndToEndTestCase(unittest.TestCase): "synchronous_mode not updated") self.assertEqual(desired_config["failsafe_mode"], effective_config["failsafe_mode"], "failsafe_mode not updated") + self.assertEqual(desired_config["slots"], effective_config["slots"], + "slots not updated") return True # check if Patroni config has been updated @@ -497,6 +500,84 @@ class EndToEndTestCase(unittest.TestCase): self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], lower_max_connections_value, "Previous max_connections setting not applied on replica", 10, 5) + # patch new slot via Patroni REST + patroni_slot = "test_patroni_slot" + patch_slot_command = """curl -s -XPATCH -d '{"slots": {"test_patroni_slot": {"type": "physical"}}}' localhost:8008/config""" + pg_patch_config["spec"]["patroni"]["slots"][patroni_slot] = {"type": "physical"} + + k8s.exec_with_kubectl(leader.metadata.name, patch_slot_command) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + self.eventuallyTrue(compare_config, "Postgres config not applied") + + # test adding new slots + pg_add_new_slots_patch = { + "spec": { + "patroni": { + "slots": { + "test_slot": { + "type": "logical", + "database": "foo", + "plugin": "pgoutput" + }, + "test_slot_2": { + "type": "physical" + } + } + } + } + } + + for slot_name, slot_details in pg_add_new_slots_patch["spec"]["patroni"]["slots"].items(): + pg_patch_config["spec"]["patroni"]["slots"][slot_name] = slot_details + + k8s.api.custom_objects_api.patch_namespaced_custom_object( + "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_add_new_slots_patch) + + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + self.eventuallyTrue(compare_config, "Postgres config not applied") + + # delete test_slot_2 from config and change the database type for test_slot + slot_to_change = "test_slot" + slot_to_remove = "test_slot_2" + pg_delete_slot_patch = { + "spec": { + "patroni": { + "slots": { + "test_slot": { + "type": "logical", + "database": "bar", + "plugin": "pgoutput" + }, + "test_slot_2": None + } + } + } + } + + pg_patch_config["spec"]["patroni"]["slots"][slot_to_change]["database"] = "bar" + del pg_patch_config["spec"]["patroni"]["slots"][slot_to_remove] + + k8s.api.custom_objects_api.patch_namespaced_custom_object( + "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_delete_slot_patch) + + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + self.eventuallyTrue(compare_config, "Postgres config not applied") + + get_slot_query = """ + SELECT %s + FROM pg_replication_slots + WHERE slot_name = '%s'; + """ + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_slot_query%("slot_name", slot_to_remove))), 0, + "The replication slot cannot be deleted", 10, 5) + + self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", get_slot_query%("database", slot_to_change))[0], "bar", + "The replication slot cannot be updated", 10, 5) + + # make sure slot from Patroni didn't get deleted + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_slot_query%("slot_name", patroni_slot))), 1, + "The replication slot from Patroni gets deleted", 10, 5) + except timeout_decorator.TimeoutError: print('Operator log: {}'.format(k8s.get_operator_log())) raise diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index f18cf8053..723fa4171 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -84,6 +84,7 @@ type Cluster struct { userSyncStrategy spec.UserSyncer deleteOptions metav1.DeleteOptions podEventsQueue *cache.FIFO + replicationSlots map[string]interface{} teamsAPIClient teams.Interface oauthTokenGetter OAuthTokenGetter @@ -140,6 +141,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres podEventsQueue: podEventsQueue, KubeClient: kubeClient, currentMajorVersion: 0, + replicationSlots: make(map[string]interface{}), } cluster.logger = logger.WithField("pkg", "cluster").WithField("cluster-name", cluster.clusterName()) cluster.teamsAPIClient = teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger) @@ -374,6 +376,10 @@ func (c *Cluster) Create() error { } } + for slotName, desiredSlot := range c.Spec.Patroni.Slots { + c.replicationSlots[slotName] = desiredSlot + } + return nil } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 76c9fd12a..af85eb076 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -579,8 +579,18 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv } } + slotsToSet := make(map[string]interface{}) + // check if there is any slot deletion + for slotName, effectiveSlot := range c.replicationSlots { + if desiredSlot, exists := desiredPatroniConfig.Slots[slotName]; exists { + if reflect.DeepEqual(effectiveSlot, desiredSlot) { + continue + } + } + slotsToSet[slotName] = nil + delete(c.replicationSlots, slotName) + } // check if specified slots exist in config and if they differ - slotsToSet := make(map[string]map[string]string) for slotName, desiredSlot := range desiredPatroniConfig.Slots { if effectiveSlot, exists := effectivePatroniConfig.Slots[slotName]; exists { if reflect.DeepEqual(desiredSlot, effectiveSlot) { @@ -588,6 +598,7 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv } } slotsToSet[slotName] = desiredSlot + c.replicationSlots[slotName] = desiredSlot } if len(slotsToSet) > 0 { configToSet["slots"] = slotsToSet @@ -614,7 +625,7 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv } // check if there exist only config updates that require a restart of the primary - if !util.SliceContains(restartPrimary, false) && len(configToSet) == 0 { + if len(restartPrimary) > 0 && !util.SliceContains(restartPrimary, false) && len(configToSet) == 0 { requiresMasterRestart = true } diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go index 4d50b791f..3cd3d3f28 100644 --- a/pkg/cluster/sync_test.go +++ b/pkg/cluster/sync_test.go @@ -144,6 +144,13 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { client, _ := newFakeK8sSyncClient() clusterName := "acid-test-cluster" namespace := "default" + testSlots := map[string]map[string]string{ + "slot1": { + "type": "logical", + "plugin": "wal2json", + "database": "foo", + }, + } ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -208,11 +215,26 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { // simulate existing config that differs from cluster.Spec tests := []struct { - subtest string - patroni acidv1.Patroni - pgParams map[string]string - restartPrimary bool + subtest string + patroni acidv1.Patroni + desiredSlots map[string]map[string]string + removedSlots map[string]map[string]string + pgParams map[string]string + shouldBePatched bool + restartPrimary bool }{ + { + subtest: "Patroni and Postgresql.Parameters do not differ", + patroni: acidv1.Patroni{ + TTL: 20, + }, + pgParams: map[string]string{ + "log_min_duration_statement": "200", + "max_connections": "50", + }, + shouldBePatched: false, + restartPrimary: false, + }, { subtest: "Patroni and Postgresql.Parameters differ - restart replica first", patroni: acidv1.Patroni{ @@ -222,7 +244,8 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { "log_min_duration_statement": "500", // desired 200 "max_connections": "100", // desired 50 }, - restartPrimary: false, + shouldBePatched: true, + restartPrimary: false, }, { subtest: "multiple Postgresql.Parameters differ - restart replica first", @@ -231,7 +254,8 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { "log_min_duration_statement": "500", // desired 200 "max_connections": "100", // desired 50 }, - restartPrimary: false, + shouldBePatched: true, + restartPrimary: false, }, { subtest: "desired max_connections bigger - restart replica first", @@ -240,7 +264,8 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { "log_min_duration_statement": "200", "max_connections": "30", // desired 50 }, - restartPrimary: false, + shouldBePatched: true, + restartPrimary: false, }, { subtest: "desired max_connections smaller - restart master first", @@ -249,19 +274,105 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { "log_min_duration_statement": "200", "max_connections": "100", // desired 50 }, - restartPrimary: true, + shouldBePatched: true, + restartPrimary: true, + }, + { + subtest: "slot does not exist but is desired", + patroni: acidv1.Patroni{ + TTL: 20, + }, + desiredSlots: testSlots, + pgParams: map[string]string{ + "log_min_duration_statement": "200", + "max_connections": "50", + }, + shouldBePatched: true, + restartPrimary: false, + }, + { + subtest: "slot exist, nothing specified in manifest", + patroni: acidv1.Patroni{ + TTL: 20, + Slots: map[string]map[string]string{ + "slot1": { + "type": "logical", + "plugin": "pgoutput", + "database": "foo", + }, + }, + }, + pgParams: map[string]string{ + "log_min_duration_statement": "200", + "max_connections": "50", + }, + shouldBePatched: false, + restartPrimary: false, + }, + { + subtest: "slot is removed from manifest", + patroni: acidv1.Patroni{ + TTL: 20, + Slots: map[string]map[string]string{ + "slot1": { + "type": "logical", + "plugin": "pgoutput", + "database": "foo", + }, + }, + }, + removedSlots: testSlots, + pgParams: map[string]string{ + "log_min_duration_statement": "200", + "max_connections": "50", + }, + shouldBePatched: true, + restartPrimary: false, + }, + { + subtest: "slot plugin differs", + patroni: acidv1.Patroni{ + TTL: 20, + Slots: map[string]map[string]string{ + "slot1": { + "type": "logical", + "plugin": "pgoutput", + "database": "foo", + }, + }, + }, + desiredSlots: testSlots, + pgParams: map[string]string{ + "log_min_duration_statement": "200", + "max_connections": "50", + }, + shouldBePatched: true, + restartPrimary: false, }, } for _, tt := range tests { + if len(tt.desiredSlots) > 0 { + cluster.Spec.Patroni.Slots = tt.desiredSlots + } + if len(tt.removedSlots) > 0 { + for slotName, removedSlot := range tt.removedSlots { + cluster.replicationSlots[slotName] = removedSlot + } + } + configPatched, requirePrimaryRestart, err := cluster.checkAndSetGlobalPostgreSQLConfiguration(mockPod, tt.patroni, cluster.Spec.Patroni, tt.pgParams, cluster.Spec.Parameters) assert.NoError(t, err) - if configPatched != true { + if configPatched != tt.shouldBePatched { t.Errorf("%s - %s: expected config update did not happen", testName, tt.subtest) } if requirePrimaryRestart != tt.restartPrimary { t.Errorf("%s - %s: wrong master restart strategy, got restart %v, expected restart %v", testName, tt.subtest, requirePrimaryRestart, tt.restartPrimary) } + + // reset slots for next tests + cluster.Spec.Patroni.Slots = nil + cluster.replicationSlots = make(map[string]interface{}) } testsFailsafe := []struct { @@ -342,7 +453,7 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { effectiveVal: util.True(), desiredVal: true, shouldBePatched: false, // should not require patching - restartPrimary: true, + restartPrimary: false, }, }