From 94d36327ba403949353be6456e7df41e2a72d24d Mon Sep 17 00:00:00 2001 From: Ida Novindasari Date: Fri, 2 Aug 2024 15:09:37 +0200 Subject: [PATCH] stream: slot and FES should not be created if the publication creation fails (#2704) * slot should not be created if the publication creation fails * not create FES resource when slot doesn't exist --- e2e/tests/test_e2e.py | 26 ++++++++++++++++++++++++++ pkg/cluster/streams.go | 29 ++++++++++++++++++++--------- pkg/cluster/streams_test.go | 16 ++++++++++------ 3 files changed, 56 insertions(+), 15 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 5182851b4..d29fd3d5c 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2041,6 +2041,20 @@ class EndToEndTestCase(unittest.TestCase): "recoveryEventType": "test-event-dlq" } } + }, + { + "applicationId": "test-app2", + "batchSize": 100, + "database": "foo", + "enableRecovery": True, + "tables": { + "test_non_exist_table": { + "eventType": "test-event", + "idColumn": "id", + "payloadColumn": "payload", + "recoveryEventType": "test-event-dlq" + } + } } ] } @@ -2064,6 +2078,18 @@ class EndToEndTestCase(unittest.TestCase): "zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 1, "Could not find Fabric Event Stream resource", 10, 5) + # check if the non-existing table in the stream section does not create a publication and slot + get_publication_query_not_exist_table = """ + SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app2'; + """ + get_slot_query_not_exist_table = """ + SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app2'; + """ + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query_not_exist_table)), 0, + "Publication is created for non-existing tables", 10, 5) + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query_not_exist_table)), 0, + "Replication slot is created for non-existing tables", 10, 5) + # grant create and ownership of test_table to foo_user, reset search path to default grant_permission_foo_user = """ GRANT CREATE ON DATABASE foo TO foo_user; diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index c76523f4a..9f58c7184 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -134,7 +134,6 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za } else if currentTables != tableList { alterPublications[slotName] = tableList } - (*slotsToSync)[slotName] = slotAndPublication.Slot } // check if there is any deletion @@ -148,24 +147,30 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za return nil } + var errorMessage error = nil for publicationName, tables := range createPublications { if err = c.executeCreatePublication(publicationName, tables); err != nil { - return fmt.Errorf("creation of publication %q failed: %v", publicationName, err) + errorMessage = fmt.Errorf("creation of publication %q failed: %v", publicationName, err) + continue } + (*slotsToSync)[publicationName] = databaseSlotsList[publicationName].Slot } for publicationName, tables := range alterPublications { if err = c.executeAlterPublication(publicationName, tables); err != nil { - return fmt.Errorf("update of publication %q failed: %v", publicationName, err) + errorMessage = fmt.Errorf("update of publication %q failed: %v", publicationName, err) + continue } + (*slotsToSync)[publicationName] = databaseSlotsList[publicationName].Slot } for _, publicationName := range deletePublications { - (*slotsToSync)[publicationName] = nil if err = c.executeDropPublication(publicationName); err != nil { - return fmt.Errorf("deletion of publication %q failed: %v", publicationName, err) + errorMessage = fmt.Errorf("deletion of publication %q failed: %v", publicationName, err) + continue } + (*slotsToSync)[publicationName] = nil } - return nil + return errorMessage } func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEventStream { @@ -390,7 +395,7 @@ func (c *Cluster) syncStreams() error { } // finally sync stream CRDs - err = c.createOrUpdateStreams() + err = c.createOrUpdateStreams(slotsToSync) if err != nil { return err } @@ -398,7 +403,7 @@ func (c *Cluster) syncStreams() error { return nil } -func (c *Cluster) createOrUpdateStreams() error { +func (c *Cluster) createOrUpdateStreams(createdSlots map[string]map[string]string) error { // fetch different application IDs from streams section // there will be a separate event stream resource for each ID @@ -413,7 +418,7 @@ func (c *Cluster) createOrUpdateStreams() error { return fmt.Errorf("could not list of FabricEventStreams: %v", err) } - for _, appId := range appIds { + for idx, appId := range appIds { streamExists := false // update stream when it exists and EventStreams array differs @@ -435,6 +440,12 @@ func (c *Cluster) createOrUpdateStreams() error { } if !streamExists { + // check if there is any slot with the applicationId + slotName := getSlotName(c.Spec.Streams[idx].Database, appId) + if _, exists := createdSlots[slotName]; !exists { + c.logger.Warningf("no slot %s with applicationId %s exists, skipping event stream creation", slotName, appId) + continue + } c.logger.Infof("event streams with applicationId %s do not exist, create it", appId) streamCRD, err := c.createStreams(appId) if err != nil { diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 5045a66fe..58d337f25 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -41,6 +41,10 @@ var ( fesUser string = fmt.Sprintf("%s%s", constants.EventStreamSourceSlotPrefix, constants.UserRoleNameSuffix) slotName string = fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbName, strings.Replace(appId, "-", "_", -1)) + fakeCreatedSlots map[string]map[string]string = map[string]map[string]string{ + slotName: {}, + } + pg = acidv1.Postgresql{ TypeMeta: metav1.TypeMeta{ Kind: "Postgresql", @@ -222,7 +226,7 @@ func TestGenerateFabricEventStream(t *testing.T) { assert.NoError(t, err) // create the streams - err = cluster.createOrUpdateStreams() + err = cluster.createOrUpdateStreams(fakeCreatedSlots) assert.NoError(t, err) // compare generated stream with expected stream @@ -248,7 +252,7 @@ func TestGenerateFabricEventStream(t *testing.T) { } // sync streams once again - err = cluster.createOrUpdateStreams() + err = cluster.createOrUpdateStreams(fakeCreatedSlots) assert.NoError(t, err) streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) @@ -397,7 +401,7 @@ func TestUpdateFabricEventStream(t *testing.T) { assert.NoError(t, err) // now create the stream - err = cluster.createOrUpdateStreams() + err = cluster.createOrUpdateStreams(fakeCreatedSlots) assert.NoError(t, err) // change specs of streams and patch CRD @@ -419,7 +423,7 @@ func TestUpdateFabricEventStream(t *testing.T) { assert.NoError(t, err) cluster.Postgresql.Spec = pgPatched.Spec - err = cluster.createOrUpdateStreams() + err = cluster.createOrUpdateStreams(fakeCreatedSlots) assert.NoError(t, err) // compare stream returned from API with expected stream @@ -448,7 +452,7 @@ func TestUpdateFabricEventStream(t *testing.T) { assert.NoError(t, err) cluster.Postgresql.Spec = pgPatched.Spec - err = cluster.createOrUpdateStreams() + err = cluster.createOrUpdateStreams(fakeCreatedSlots) assert.NoError(t, err) result = cluster.generateFabricEventStream(appId) @@ -466,7 +470,7 @@ func TestUpdateFabricEventStream(t *testing.T) { assert.NoError(t, err) cluster.Postgresql.Spec = pgUpdated.Spec - cluster.createOrUpdateStreams() + cluster.createOrUpdateStreams(fakeCreatedSlots) streamList, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) if len(streamList.Items) > 0 || err != nil {