diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md index 07b03ec9f..398480f0a 100644 --- a/docs/reference/cluster_manifest.md +++ b/docs/reference/cluster_manifest.md @@ -546,11 +546,13 @@ have the following properties: and `payloadColumn`). The CDC operator is following the [outbox pattern](https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/). The application is responsible for putting events into a (JSON/B or VARCHAR) payload column of the outbox table in the structure of the specified target - event type. The the CDC operator will consume them shortly after the - transaction is committed. The `idColumn` will be used in telemetry for the - CDC operator. The names for `idColumn` and `payloadColumn` can be configured. - Defaults are `id` and `payload`. The target `eventType` has to be defined. - Required. + event type. The operator will create a [PUBLICATION](https://www.postgresql.org/docs/14/logical-replication-publication.html) + in Postgres for all tables specified for one `database` and `applicationId`. + The CDC operator will consume from it shortly after transactions are + committed to the outbox table. The `idColumn` will be used in telemetry for + the CDC operator. The names for `idColumn` and `payloadColumn` can be + configured. Defaults are `id` and `payload`. The target `eventType` has to + be defined. Required. * **filter** Streamed events can be filtered by a jsonpath expression for each table. diff --git a/pkg/apis/zalando.org/v1/fabriceventstream.go b/pkg/apis/zalando.org/v1/fabriceventstream.go index 7990d7700..81706802d 100644 --- a/pkg/apis/zalando.org/v1/fabriceventstream.go +++ b/pkg/apis/zalando.org/v1/fabriceventstream.go @@ -68,9 +68,10 @@ type EventStreamTable struct { // Connection to be used for allowing the FES operator to connect to a database type Connection struct { - Url string `json:"jdbcUrl"` - SlotName string `json:"slotName"` - DBAuth DBAuth `json:"databaseAuthentication"` + Url string `json:"jdbcUrl"` + SlotName string `json:"slotName"` + PublicationName string `json:"publicationName"` + DBAuth DBAuth `json:"databaseAuthentication"` } // DBAuth specifies the credentials to be used for connecting with the database diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go index aa3a5e3be..ff08c6c24 100644 --- a/pkg/cluster/database.go +++ b/pkg/cluster/database.go @@ -39,6 +39,8 @@ const ( createExtensionSQL = `CREATE EXTENSION IF NOT EXISTS "%s" SCHEMA "%s"` alterExtensionSQL = `ALTER EXTENSION "%s" SET SCHEMA "%s"` + createPublicationSQL = `CREATE PUBLICATION "%s" FOR TABLE %s WITH (publish = 'insert, update');` + globalDefaultPrivilegesSQL = `SET ROLE TO "%s"; ALTER DEFAULT PRIVILEGES GRANT USAGE ON SCHEMAS TO "%s","%s"; ALTER DEFAULT PRIVILEGES GRANT SELECT ON TABLES TO "%s"; @@ -610,3 +612,23 @@ func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error { return nil } + +// getExtension returns the list of current database extensions +// The caller is responsible for opening and closing the database connection +func (c *Cluster) createPublication(dbName, publication, tables string) (err error) { + + if err := c.initDbConnWithName(dbName); err != nil { + return fmt.Errorf("could not init connection to database %q", dbName) + } + defer func() { + if err = c.closeDbConn(); err != nil { + err = fmt.Errorf("could not close connection to database %q: %v", dbName, err) + } + }() + + if _, err := c.pgDb.Exec(fmt.Sprintf(createPublicationSQL, publication, tables)); err != nil { + return fmt.Errorf("could not create publication %s: %v", publication, err) + } + + return err +} diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 7053aeb20..dd126df0f 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -76,6 +76,7 @@ func gatherApplicationIds(streams []acidv1.Stream) []string { func (c *Cluster) syncPostgresConfig() error { slots := make(map[string]map[string]string) + publications := make(map[string]map[string]acidv1.StreamTable) desiredPatroniConfig := c.Spec.Patroni if len(desiredPatroniConfig.Slots) > 0 { slots = desiredPatroniConfig.Slots @@ -87,9 +88,18 @@ func (c *Cluster) syncPostgresConfig() error { "plugin": "pgoutput", "type": "logical", } - slotName := constants.EventStreamSourceSlotPrefix + "_" + stream.Database + "_" + stream.ApplicationId + slotName := getSlotName(stream.Database, stream.ApplicationId) if _, exists := slots[slotName]; !exists { slots[slotName] = slot + publications[slotName] = stream.Tables + } else { + streamTables := publications[slotName] + for tableName, table := range stream.Tables { + if _, exists := streamTables[tableName]; !exists { + streamTables[tableName] = table + } + } + publications[slotName] = streamTables } } @@ -108,7 +118,7 @@ func (c *Cluster) syncPostgresConfig() error { pods, err := c.listPods() if err != nil || len(pods) == 0 { - c.logger.Warnf("could not list pods of the statefulset: %v", err) + c.logger.Warningf("could not list pods of the statefulset: %v", err) } for i, pod := range pods { podName := util.NameFromMeta(pods[i].ObjectMeta) @@ -125,6 +135,22 @@ func (c *Cluster) syncPostgresConfig() error { } } + // next create publications to each created slot + for publication, tables := range publications { + dbName := slots[publication]["database"] + tableNames := make([]string, len(tables)) + i := 0 + for t := range tables { + tableNames[i] = fmt.Sprintf("%q", t) + i++ + } + tableList := strings.Join(tableNames, ", ") + c.logger.Debugf("creating publication %q in database %q for tables %s", publication, dbName, tableList) + if err := c.createPublication(dbName, publication, tableList); err != nil { + c.logger.Warningf("%v", err) + } + } + return nil } @@ -214,10 +240,15 @@ func getOutboxTable(tableName, idColumn string) zalandov1.EventStreamTable { } } +func getSlotName(dbName, appId string) string { + return constants.EventStreamSourceSlotPrefix + "_" + dbName + "_" + strings.Replace(appId, "-", "_", -1) +} + func (c *Cluster) getStreamConnection(database, user, appId string) zalandov1.Connection { return zalandov1.Connection{ - Url: fmt.Sprintf("jdbc:postgresql://%s.%s/%s?user=%s&ssl=true&sslmode=require", c.Name, c.Namespace, database, user), - SlotName: constants.EventStreamSourceSlotPrefix + "_" + database + "_" + strings.Replace(appId, "-", "_", -1), + Url: fmt.Sprintf("jdbc:postgresql://%s.%s/%s?user=%s&ssl=true&sslmode=require", c.Name, c.Namespace, database, user), + SlotName: getSlotName(database, appId), + PublicationName: getSlotName(database, appId), DBAuth: zalandov1.DBAuth{ Type: constants.EventStreamSourceAuthType, Name: c.credentialSecretNameForCluster(user, c.Name), @@ -229,12 +260,19 @@ func (c *Cluster) getStreamConnection(database, user, appId string) zalandov1.Co func (c *Cluster) syncStreams() error { + c.setProcessName("syncing streams") + _, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamSourceCRDName, metav1.GetOptions{}) if k8sutil.ResourceNotFound(err) { c.logger.Debugf("event stream CRD not installed, skipping") return nil } + err = c.syncPostgresConfig() + if err != nil { + return fmt.Errorf("could not update Postgres config for event streaming: %v", err) + } + err = c.createOrUpdateStreams() if err != nil { return err @@ -245,13 +283,6 @@ func (c *Cluster) syncStreams() error { func (c *Cluster) createOrUpdateStreams() error { - c.setProcessName("syncing streams") - - err := c.syncPostgresConfig() - if err != nil { - return fmt.Errorf("could not update Postgres config for event streaming: %v", err) - } - appIds := gatherApplicationIds(c.Spec.Streams) for _, appId := range appIds { fesName := c.Name + "-" + appId diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 27d3e91f3..f1c615e04 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -42,6 +42,7 @@ var ( dbName string = "foo" fesUser string = constants.EventStreamSourceSlotPrefix + constants.UserRoleNameSuffix fesName string = clusterName + "-" + appId + slotName string = fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbName, strings.Replace(appId, "-", "_", -1)) pg = acidv1.Postgresql{ TypeMeta: metav1.TypeMeta{ @@ -118,8 +119,9 @@ var ( Type: constants.EventStreamSourceAuthType, UserKey: "username", }, - Url: fmt.Sprintf("jdbc:postgresql://%s.%s/foo?user=%s&ssl=true&sslmode=require", clusterName, namespace, fesUser), - SlotName: fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbName, strings.Replace(appId, "-", "_", -1)), + Url: fmt.Sprintf("jdbc:postgresql://%s.%s/foo?user=%s&ssl=true&sslmode=require", clusterName, namespace, fesUser), + SlotName: slotName, + PublicationName: slotName, }, Schema: "data", EventStreamTable: v1.EventStreamTable{