From 2f7e3ee847bcf910a71b1a01a99350a3cbededeb Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Tue, 20 Aug 2024 14:38:07 +0200 Subject: [PATCH] fix stream duplication on operator restart (#2733) * fix stream duplication on operator restart * add try except to streams e2e test --- e2e/tests/test_e2e.py | 216 ++++++++++++++++++------------------ pkg/cluster/streams.go | 69 ++++++++---- pkg/cluster/streams_test.go | 90 +++++++++++---- 3 files changed, 224 insertions(+), 151 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index bd7dfef57..06e5c5231 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2131,130 +2131,136 @@ class EndToEndTestCase(unittest.TestCase): verbs=["create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"] ) cluster_role.rules.append(fes_cluster_role_rule) - k8s.api.rbac_api.patch_cluster_role("postgres-operator", cluster_role) - # create a table in one of the database of acid-minimal-cluster - create_stream_table = """ - CREATE TABLE test_table (id int, payload jsonb); - """ - self.query_database(leader.metadata.name, "foo", create_stream_table) + try: + k8s.api.rbac_api.patch_cluster_role("postgres-operator", cluster_role) - # update the manifest with the streams section - patch_streaming_config = { - "spec": { - "patroni": { - "slots": { - "manual_slot": { - "type": "physical" - } - } - }, - "streams": [ - { - "applicationId": "test-app", - "batchSize": 100, - "database": "foo", - "enableRecovery": True, - "tables": { - "test_table": { - "eventType": "test-event", - "idColumn": "id", - "payloadColumn": "payload", - "recoveryEventType": "test-event-dlq" + # create a table in one of the database of acid-minimal-cluster + create_stream_table = """ + CREATE TABLE test_table (id int, payload jsonb); + """ + self.query_database(leader.metadata.name, "foo", create_stream_table) + + # update the manifest with the streams section + patch_streaming_config = { + "spec": { + "patroni": { + "slots": { + "manual_slot": { + "type": "physical" } } }, - { - "applicationId": "test-app2", - "batchSize": 100, - "database": "foo", - "enableRecovery": True, - "tables": { - "test_non_exist_table": { - "eventType": "test-event", - "idColumn": "id", - "payloadColumn": "payload", - "recoveryEventType": "test-event-dlq" + "streams": [ + { + "applicationId": "test-app", + "batchSize": 100, + "database": "foo", + "enableRecovery": True, + "tables": { + "test_table": { + "eventType": "test-event", + "idColumn": "id", + "payloadColumn": "payload", + "recoveryEventType": "test-event-dlq" + } + } + }, + { + "applicationId": "test-app2", + "batchSize": 100, + "database": "foo", + "enableRecovery": True, + "tables": { + "test_non_exist_table": { + "eventType": "test-event", + "idColumn": "id", + "payloadColumn": "payload", + "recoveryEventType": "test-event-dlq" + } } } - } - ] + ] + } } - } - k8s.api.custom_objects_api.patch_namespaced_custom_object( - 'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config) - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - # check if publication, slot, and fes resource are created - get_publication_query = """ - SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app'; - """ - get_slot_query = """ - SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app'; - """ - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 1, - "Publication is not created", 10, 5) - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 1, - "Replication slot is not created", 10, 5) - self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object( + # check if publication, slot, and fes resource are created + get_publication_query = """ + SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app'; + """ + get_slot_query = """ + SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app'; + """ + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 1, + "Publication is not created", 10, 5) + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 1, + "Replication slot is not created", 10, 5) + self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object( "zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 1, "Could not find Fabric Event Stream resource", 10, 5) - # check if the non-existing table in the stream section does not create a publication and slot - get_publication_query_not_exist_table = """ - SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app2'; - """ - get_slot_query_not_exist_table = """ - SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app2'; - """ - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query_not_exist_table)), 0, - "Publication is created for non-existing tables", 10, 5) - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query_not_exist_table)), 0, - "Replication slot is created for non-existing tables", 10, 5) + # check if the non-existing table in the stream section does not create a publication and slot + get_publication_query_not_exist_table = """ + SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app2'; + """ + get_slot_query_not_exist_table = """ + SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app2'; + """ + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query_not_exist_table)), 0, + "Publication is created for non-existing tables", 10, 5) + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query_not_exist_table)), 0, + "Replication slot is created for non-existing tables", 10, 5) - # grant create and ownership of test_table to foo_user, reset search path to default - grant_permission_foo_user = """ - GRANT CREATE ON DATABASE foo TO foo_user; - ALTER TABLE test_table OWNER TO foo_user; - ALTER ROLE foo_user RESET search_path; - """ - self.query_database(leader.metadata.name, "foo", grant_permission_foo_user) - # non-postgres user creates a publication - create_nonstream_publication = """ - CREATE PUBLICATION mypublication FOR TABLE test_table; - """ - self.query_database_with_user(leader.metadata.name, "foo", create_nonstream_publication, "foo_user") + # grant create and ownership of test_table to foo_user, reset search path to default + grant_permission_foo_user = """ + GRANT CREATE ON DATABASE foo TO foo_user; + ALTER TABLE test_table OWNER TO foo_user; + ALTER ROLE foo_user RESET search_path; + """ + self.query_database(leader.metadata.name, "foo", grant_permission_foo_user) + # non-postgres user creates a publication + create_nonstream_publication = """ + CREATE PUBLICATION mypublication FOR TABLE test_table; + """ + self.query_database_with_user(leader.metadata.name, "foo", create_nonstream_publication, "foo_user") - # remove the streams section from the manifest - patch_streaming_config_removal = { - "spec": { - "streams": [] + # remove the streams section from the manifest + patch_streaming_config_removal = { + "spec": { + "streams": [] + } } - } - k8s.api.custom_objects_api.patch_namespaced_custom_object( - 'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config_removal) - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config_removal) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - # check if publication, slot, and fes resource are removed - self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object( + # check if publication, slot, and fes resource are removed + self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object( "zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 0, 'Could not delete Fabric Event Stream resource', 10, 5) - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 0, - "Publication is not deleted", 10, 5) - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 0, - "Replication slot is not deleted", 10, 5) + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 0, + "Publication is not deleted", 10, 5) + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 0, + "Replication slot is not deleted", 10, 5) - # check the manual_slot and mypublication should not get deleted - get_manual_slot_query = """ - SELECT * FROM pg_replication_slots WHERE slot_name = 'manual_slot'; - """ - get_nonstream_publication_query = """ - SELECT * FROM pg_publication WHERE pubname = 'mypublication'; - """ - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_manual_slot_query)), 1, - "Slot defined in patroni config is deleted", 10, 5) - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_nonstream_publication_query)), 1, - "Publication defined not in stream section is deleted", 10, 5) + # check the manual_slot and mypublication should not get deleted + get_manual_slot_query = """ + SELECT * FROM pg_replication_slots WHERE slot_name = 'manual_slot'; + """ + get_nonstream_publication_query = """ + SELECT * FROM pg_publication WHERE pubname = 'mypublication'; + """ + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_manual_slot_query)), 1, + "Slot defined in patroni config is deleted", 10, 5) + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_nonstream_publication_query)), 1, + "Publication defined not in stream section is deleted", 10, 5) + + except timeout_decorator.TimeoutError: + print('Operator log: {}'.format(k8s.get_operator_log())) + raise @timeout_decorator.timeout(TEST_TIMEOUT_SEC) def test_taint_based_eviction(self): diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index dcdd86a1c..9a31edc28 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -433,34 +433,55 @@ func hasSlotsInSync(appId string, databaseSlots map[string]map[string]zalandov1. } func (c *Cluster) syncStream(appId string) error { + var ( + streams *zalandov1.FabricEventStreamList + err error + ) + c.setProcessName("syncing stream with applicationId %s", appId) + c.logger.Debugf("syncing stream with applicationId %s", appId) + + listOptions := metav1.ListOptions{LabelSelector: c.labelsSet(true).String()} + streams, err = c.KubeClient.FabricEventStreams(c.Namespace).List(context.TODO(), listOptions) + if err != nil { + return fmt.Errorf("could not list of FabricEventStreams for applicationId %s: %v", appId, err) + } + streamExists := false - // update stream when it exists and EventStreams array differs - for _, stream := range c.Streams { - if appId == stream.Spec.ApplicationId { - streamExists = true - desiredStreams := c.generateFabricEventStream(appId) - if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) { - c.logger.Infof("owner references of event streams with applicationId %s do not match the current ones", appId) - stream.ObjectMeta.OwnerReferences = desiredStreams.ObjectMeta.OwnerReferences - c.setProcessName("updating event streams with applicationId %s", appId) - stream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), stream, metav1.UpdateOptions{}) - if err != nil { - return fmt.Errorf("could not update event streams with applicationId %s: %v", appId, err) - } - c.Streams[appId] = stream - } - if match, reason := c.compareStreams(stream, desiredStreams); !match { - c.logger.Debugf("updating event streams with applicationId %s: %s", appId, reason) - desiredStreams.ObjectMeta = stream.ObjectMeta - updatedStream, err := c.updateStreams(desiredStreams) - if err != nil { - return fmt.Errorf("failed updating event streams %s with applicationId %s: %v", stream.Name, appId, err) - } - c.Streams[appId] = updatedStream - c.logger.Infof("event streams %q with applicationId %s have been successfully updated", updatedStream.Name, appId) + for _, stream := range streams.Items { + if stream.Spec.ApplicationId != appId { + continue + } + if streamExists { + c.logger.Warningf("more than one event stream with applicationId %s found, delete it", appId) + if err = c.KubeClient.FabricEventStreams(stream.ObjectMeta.Namespace).Delete(context.TODO(), stream.ObjectMeta.Name, metav1.DeleteOptions{}); err != nil { + c.logger.Errorf("could not delete event stream %q with applicationId %s: %v", stream.ObjectMeta.Name, appId, err) + } else { + c.logger.Infof("redundant event stream %q with applicationId %s has been successfully deleted", stream.ObjectMeta.Name, appId) } continue } + streamExists = true + desiredStreams := c.generateFabricEventStream(appId) + if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) { + c.logger.Infof("owner references of event streams with applicationId %s do not match the current ones", appId) + stream.ObjectMeta.OwnerReferences = desiredStreams.ObjectMeta.OwnerReferences + c.setProcessName("updating event streams with applicationId %s", appId) + stream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), &stream, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("could not update event streams with applicationId %s: %v", appId, err) + } + c.Streams[appId] = stream + } + if match, reason := c.compareStreams(&stream, desiredStreams); !match { + c.logger.Debugf("updating event streams with applicationId %s: %s", appId, reason) + desiredStreams.ObjectMeta = stream.ObjectMeta + updatedStream, err := c.updateStreams(desiredStreams) + if err != nil { + return fmt.Errorf("failed updating event streams %s with applicationId %s: %v", stream.Name, appId, err) + } + c.Streams[appId] = updatedStream + c.logger.Infof("event streams %q with applicationId %s have been successfully updated", updatedStream.Name, appId) + } } if !streamExists { diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 0a0bd3555..6091210b5 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -2,6 +2,7 @@ package cluster import ( "fmt" + "reflect" "strings" "context" @@ -87,6 +88,11 @@ var ( ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-12345", clusterName), Namespace: namespace, + Labels: map[string]string{ + "application": "spilo", + "cluster-name": fmt.Sprintf("%s-2", clusterName), + "team": "acid", + }, OwnerReferences: []metav1.OwnerReference{ metav1.OwnerReference{ APIVersion: "apps/v1", @@ -432,12 +438,8 @@ func TestGenerateFabricEventStream(t *testing.T) { cluster.Name = clusterName cluster.Namespace = namespace - // create statefulset to have ownerReference for streams - _, err := cluster.createStatefulSet() - assert.NoError(t, err) - // create the streams - err = cluster.syncStream(appId) + err := cluster.syncStream(appId) assert.NoError(t, err) // compare generated stream with expected stream @@ -451,11 +453,7 @@ func TestGenerateFabricEventStream(t *testing.T) { } streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) assert.NoError(t, err) - - // check if there is only one stream - if len(streams.Items) > 1 { - t.Errorf("too many stream CRDs found: got %d, but expected only one", len(streams.Items)) - } + assert.Equalf(t, 1, len(streams.Items), "unexpected number of streams found: got %d, but expected only one", len(streams.Items)) // compare stream returned from API with expected stream if match, _ := cluster.compareStreams(&streams.Items[0], fes); !match { @@ -468,11 +466,7 @@ func TestGenerateFabricEventStream(t *testing.T) { streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) assert.NoError(t, err) - - // check if there is still only one stream - if len(streams.Items) > 1 { - t.Errorf("too many stream CRDs found after sync: got %d, but expected only one", len(streams.Items)) - } + assert.Equalf(t, 1, len(streams.Items), "unexpected number of streams found: got %d, but expected only one", len(streams.Items)) // compare stream resturned from API with generated stream if match, _ := cluster.compareStreams(&streams.Items[0], result); !match { @@ -493,6 +487,62 @@ func newFabricEventStream(streams []zalandov1.EventStream, annotations map[strin } } +func TestSyncStreams(t *testing.T) { + pg.Name = fmt.Sprintf("%s-2", pg.Name) + var cluster = New( + Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + Resources: config.Resources{ + ClusterLabels: map[string]string{"application": "spilo"}, + ClusterNameLabel: "cluster-name", + DefaultCPURequest: "300m", + DefaultCPULimit: "300m", + DefaultMemoryRequest: "300Mi", + DefaultMemoryLimit: "300Mi", + EnableOwnerReferences: util.True(), + PodRoleLabel: "spilo-role", + }, + }, + }, client, pg, logger, eventRecorder) + + _, err := cluster.KubeClient.Postgresqls(namespace).Create( + context.TODO(), &pg, metav1.CreateOptions{}) + assert.NoError(t, err) + + // create the stream + err = cluster.syncStream(appId) + assert.NoError(t, err) + + // create a second stream with same spec but with different name + createdStream, err := cluster.KubeClient.FabricEventStreams(namespace).Create( + context.TODO(), fes, metav1.CreateOptions{}) + assert.NoError(t, err) + assert.Equal(t, createdStream.Spec.ApplicationId, appId) + + // check that two streams exist + listOptions := metav1.ListOptions{ + LabelSelector: cluster.labelsSet(true).String(), + } + streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) + assert.NoError(t, err) + assert.Equalf(t, 2, len(streams.Items), "unexpected number of streams found: got %d, but expected only 2", len(streams.Items)) + + // sync the stream which should remove the redundant stream + err = cluster.syncStream(appId) + assert.NoError(t, err) + + // check that only one stream remains after sync + streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) + assert.NoError(t, err) + assert.Equalf(t, 1, len(streams.Items), "unexpected number of streams found: got %d, but expected only 1", len(streams.Items)) + + // check owner references + if !reflect.DeepEqual(streams.Items[0].OwnerReferences, cluster.ownerReferences()) { + t.Errorf("unexpected owner references, expected %#v, got %#v", cluster.ownerReferences(), streams.Items[0].OwnerReferences) + } +} + func TestSameStreams(t *testing.T) { testName := "TestSameStreams" annotationsA := map[string]string{"owned-by": "acid"} @@ -606,8 +656,8 @@ func TestSameStreams(t *testing.T) { } } -func TestUpdateFabricEventStream(t *testing.T) { - pg.Name = fmt.Sprintf("%s-2", pg.Name) +func TestUpdateStreams(t *testing.T) { + pg.Name = fmt.Sprintf("%s-3", pg.Name) var cluster = New( Config{ OpConfig: config.Config{ @@ -628,11 +678,7 @@ func TestUpdateFabricEventStream(t *testing.T) { context.TODO(), &pg, metav1.CreateOptions{}) assert.NoError(t, err) - // create statefulset to have ownerReference for streams - _, err = cluster.createStatefulSet() - assert.NoError(t, err) - - // now create the stream + // create the stream err = cluster.syncStream(appId) assert.NoError(t, err)