stream: slot and FES should not be created if the publication creation fails (#2704)

* slot should not be created if the publication creation fails
* not create FES resource when slot doesn't exist
This commit is contained in:
Ida Novindasari 2024-08-02 15:09:37 +02:00 committed by GitHub
parent 31f474a95c
commit 94d36327ba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 56 additions and 15 deletions

View File

@ -2041,6 +2041,20 @@ class EndToEndTestCase(unittest.TestCase):
"recoveryEventType": "test-event-dlq" "recoveryEventType": "test-event-dlq"
} }
} }
},
{
"applicationId": "test-app2",
"batchSize": 100,
"database": "foo",
"enableRecovery": True,
"tables": {
"test_non_exist_table": {
"eventType": "test-event",
"idColumn": "id",
"payloadColumn": "payload",
"recoveryEventType": "test-event-dlq"
}
}
} }
] ]
} }
@ -2064,6 +2078,18 @@ class EndToEndTestCase(unittest.TestCase):
"zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 1, "zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 1,
"Could not find Fabric Event Stream resource", 10, 5) "Could not find Fabric Event Stream resource", 10, 5)
# check if the non-existing table in the stream section does not create a publication and slot
get_publication_query_not_exist_table = """
SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app2';
"""
get_slot_query_not_exist_table = """
SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app2';
"""
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query_not_exist_table)), 0,
"Publication is created for non-existing tables", 10, 5)
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query_not_exist_table)), 0,
"Replication slot is created for non-existing tables", 10, 5)
# grant create and ownership of test_table to foo_user, reset search path to default # grant create and ownership of test_table to foo_user, reset search path to default
grant_permission_foo_user = """ grant_permission_foo_user = """
GRANT CREATE ON DATABASE foo TO foo_user; GRANT CREATE ON DATABASE foo TO foo_user;

View File

@ -134,7 +134,6 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za
} else if currentTables != tableList { } else if currentTables != tableList {
alterPublications[slotName] = tableList alterPublications[slotName] = tableList
} }
(*slotsToSync)[slotName] = slotAndPublication.Slot
} }
// check if there is any deletion // check if there is any deletion
@ -148,24 +147,30 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za
return nil return nil
} }
var errorMessage error = nil
for publicationName, tables := range createPublications { for publicationName, tables := range createPublications {
if err = c.executeCreatePublication(publicationName, tables); err != nil { if err = c.executeCreatePublication(publicationName, tables); err != nil {
return fmt.Errorf("creation of publication %q failed: %v", publicationName, err) errorMessage = fmt.Errorf("creation of publication %q failed: %v", publicationName, err)
continue
} }
(*slotsToSync)[publicationName] = databaseSlotsList[publicationName].Slot
} }
for publicationName, tables := range alterPublications { for publicationName, tables := range alterPublications {
if err = c.executeAlterPublication(publicationName, tables); err != nil { if err = c.executeAlterPublication(publicationName, tables); err != nil {
return fmt.Errorf("update of publication %q failed: %v", publicationName, err) errorMessage = fmt.Errorf("update of publication %q failed: %v", publicationName, err)
continue
} }
(*slotsToSync)[publicationName] = databaseSlotsList[publicationName].Slot
} }
for _, publicationName := range deletePublications { for _, publicationName := range deletePublications {
(*slotsToSync)[publicationName] = nil
if err = c.executeDropPublication(publicationName); err != nil { if err = c.executeDropPublication(publicationName); err != nil {
return fmt.Errorf("deletion of publication %q failed: %v", publicationName, err) errorMessage = fmt.Errorf("deletion of publication %q failed: %v", publicationName, err)
continue
} }
(*slotsToSync)[publicationName] = nil
} }
return nil return errorMessage
} }
func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEventStream { func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEventStream {
@ -390,7 +395,7 @@ func (c *Cluster) syncStreams() error {
} }
// finally sync stream CRDs // finally sync stream CRDs
err = c.createOrUpdateStreams() err = c.createOrUpdateStreams(slotsToSync)
if err != nil { if err != nil {
return err return err
} }
@ -398,7 +403,7 @@ func (c *Cluster) syncStreams() error {
return nil return nil
} }
func (c *Cluster) createOrUpdateStreams() error { func (c *Cluster) createOrUpdateStreams(createdSlots map[string]map[string]string) error {
// fetch different application IDs from streams section // fetch different application IDs from streams section
// there will be a separate event stream resource for each ID // there will be a separate event stream resource for each ID
@ -413,7 +418,7 @@ func (c *Cluster) createOrUpdateStreams() error {
return fmt.Errorf("could not list of FabricEventStreams: %v", err) return fmt.Errorf("could not list of FabricEventStreams: %v", err)
} }
for _, appId := range appIds { for idx, appId := range appIds {
streamExists := false streamExists := false
// update stream when it exists and EventStreams array differs // update stream when it exists and EventStreams array differs
@ -435,6 +440,12 @@ func (c *Cluster) createOrUpdateStreams() error {
} }
if !streamExists { if !streamExists {
// check if there is any slot with the applicationId
slotName := getSlotName(c.Spec.Streams[idx].Database, appId)
if _, exists := createdSlots[slotName]; !exists {
c.logger.Warningf("no slot %s with applicationId %s exists, skipping event stream creation", slotName, appId)
continue
}
c.logger.Infof("event streams with applicationId %s do not exist, create it", appId) c.logger.Infof("event streams with applicationId %s do not exist, create it", appId)
streamCRD, err := c.createStreams(appId) streamCRD, err := c.createStreams(appId)
if err != nil { if err != nil {

View File

@ -41,6 +41,10 @@ var (
fesUser string = fmt.Sprintf("%s%s", constants.EventStreamSourceSlotPrefix, constants.UserRoleNameSuffix) fesUser string = fmt.Sprintf("%s%s", constants.EventStreamSourceSlotPrefix, constants.UserRoleNameSuffix)
slotName string = fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbName, strings.Replace(appId, "-", "_", -1)) slotName string = fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbName, strings.Replace(appId, "-", "_", -1))
fakeCreatedSlots map[string]map[string]string = map[string]map[string]string{
slotName: {},
}
pg = acidv1.Postgresql{ pg = acidv1.Postgresql{
TypeMeta: metav1.TypeMeta{ TypeMeta: metav1.TypeMeta{
Kind: "Postgresql", Kind: "Postgresql",
@ -222,7 +226,7 @@ func TestGenerateFabricEventStream(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
// create the streams // create the streams
err = cluster.createOrUpdateStreams() err = cluster.createOrUpdateStreams(fakeCreatedSlots)
assert.NoError(t, err) assert.NoError(t, err)
// compare generated stream with expected stream // compare generated stream with expected stream
@ -248,7 +252,7 @@ func TestGenerateFabricEventStream(t *testing.T) {
} }
// sync streams once again // sync streams once again
err = cluster.createOrUpdateStreams() err = cluster.createOrUpdateStreams(fakeCreatedSlots)
assert.NoError(t, err) assert.NoError(t, err)
streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
@ -397,7 +401,7 @@ func TestUpdateFabricEventStream(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
// now create the stream // now create the stream
err = cluster.createOrUpdateStreams() err = cluster.createOrUpdateStreams(fakeCreatedSlots)
assert.NoError(t, err) assert.NoError(t, err)
// change specs of streams and patch CRD // change specs of streams and patch CRD
@ -419,7 +423,7 @@ func TestUpdateFabricEventStream(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
cluster.Postgresql.Spec = pgPatched.Spec cluster.Postgresql.Spec = pgPatched.Spec
err = cluster.createOrUpdateStreams() err = cluster.createOrUpdateStreams(fakeCreatedSlots)
assert.NoError(t, err) assert.NoError(t, err)
// compare stream returned from API with expected stream // compare stream returned from API with expected stream
@ -448,7 +452,7 @@ func TestUpdateFabricEventStream(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
cluster.Postgresql.Spec = pgPatched.Spec cluster.Postgresql.Spec = pgPatched.Spec
err = cluster.createOrUpdateStreams() err = cluster.createOrUpdateStreams(fakeCreatedSlots)
assert.NoError(t, err) assert.NoError(t, err)
result = cluster.generateFabricEventStream(appId) result = cluster.generateFabricEventStream(appId)
@ -466,7 +470,7 @@ func TestUpdateFabricEventStream(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
cluster.Postgresql.Spec = pgUpdated.Spec cluster.Postgresql.Spec = pgUpdated.Spec
cluster.createOrUpdateStreams() cluster.createOrUpdateStreams(fakeCreatedSlots)
streamList, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) streamList, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
if len(streamList.Items) > 0 || err != nil { if len(streamList.Items) > 0 || err != nil {