switch to pgoutput plugin and let operator create publications

This commit is contained in:
Felix Kunde 2022-01-25 17:18:49 +01:00
parent e499567152
commit 179fcb1126
5 changed files with 79 additions and 21 deletions

View File

@ -546,11 +546,13 @@ have the following properties:
and `payloadColumn`). The CDC operator is following the [outbox pattern](https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/). and `payloadColumn`). The CDC operator is following the [outbox pattern](https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/).
The application is responsible for putting events into a (JSON/B or VARCHAR) The application is responsible for putting events into a (JSON/B or VARCHAR)
payload column of the outbox table in the structure of the specified target payload column of the outbox table in the structure of the specified target
event type. The the CDC operator will consume them shortly after the event type. The operator will create a [PUBLICATION](https://www.postgresql.org/docs/14/logical-replication-publication.html)
transaction is committed. The `idColumn` will be used in telemetry for the in Postgres for all tables specified for one `database` and `applicationId`.
CDC operator. The names for `idColumn` and `payloadColumn` can be configured. The CDC operator will consume from it shortly after transactions are
Defaults are `id` and `payload`. The target `eventType` has to be defined. committed to the outbox table. The `idColumn` will be used in telemetry for
Required. the CDC operator. The names for `idColumn` and `payloadColumn` can be
configured. Defaults are `id` and `payload`. The target `eventType` has to
be defined. Required.
* **filter** * **filter**
Streamed events can be filtered by a jsonpath expression for each table. Streamed events can be filtered by a jsonpath expression for each table.

View File

@ -70,6 +70,7 @@ type EventStreamTable struct {
type Connection struct { type Connection struct {
Url string `json:"jdbcUrl"` Url string `json:"jdbcUrl"`
SlotName string `json:"slotName"` SlotName string `json:"slotName"`
PublicationName string `json:"publicationName"`
DBAuth DBAuth `json:"databaseAuthentication"` DBAuth DBAuth `json:"databaseAuthentication"`
} }

View File

@ -39,6 +39,8 @@ const (
createExtensionSQL = `CREATE EXTENSION IF NOT EXISTS "%s" SCHEMA "%s"` createExtensionSQL = `CREATE EXTENSION IF NOT EXISTS "%s" SCHEMA "%s"`
alterExtensionSQL = `ALTER EXTENSION "%s" SET SCHEMA "%s"` alterExtensionSQL = `ALTER EXTENSION "%s" SET SCHEMA "%s"`
createPublicationSQL = `CREATE PUBLICATION "%s" FOR TABLE %s WITH (publish = 'insert, update');`
globalDefaultPrivilegesSQL = `SET ROLE TO "%s"; globalDefaultPrivilegesSQL = `SET ROLE TO "%s";
ALTER DEFAULT PRIVILEGES GRANT USAGE ON SCHEMAS TO "%s","%s"; ALTER DEFAULT PRIVILEGES GRANT USAGE ON SCHEMAS TO "%s","%s";
ALTER DEFAULT PRIVILEGES GRANT SELECT ON TABLES TO "%s"; ALTER DEFAULT PRIVILEGES GRANT SELECT ON TABLES TO "%s";
@ -610,3 +612,23 @@ func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error {
return nil return nil
} }
// getExtension returns the list of current database extensions
// The caller is responsible for opening and closing the database connection
func (c *Cluster) createPublication(dbName, publication, tables string) (err error) {
if err := c.initDbConnWithName(dbName); err != nil {
return fmt.Errorf("could not init connection to database %q", dbName)
}
defer func() {
if err = c.closeDbConn(); err != nil {
err = fmt.Errorf("could not close connection to database %q: %v", dbName, err)
}
}()
if _, err := c.pgDb.Exec(fmt.Sprintf(createPublicationSQL, publication, tables)); err != nil {
return fmt.Errorf("could not create publication %s: %v", publication, err)
}
return err
}

View File

@ -76,6 +76,7 @@ func gatherApplicationIds(streams []acidv1.Stream) []string {
func (c *Cluster) syncPostgresConfig() error { func (c *Cluster) syncPostgresConfig() error {
slots := make(map[string]map[string]string) slots := make(map[string]map[string]string)
publications := make(map[string]map[string]acidv1.StreamTable)
desiredPatroniConfig := c.Spec.Patroni desiredPatroniConfig := c.Spec.Patroni
if len(desiredPatroniConfig.Slots) > 0 { if len(desiredPatroniConfig.Slots) > 0 {
slots = desiredPatroniConfig.Slots slots = desiredPatroniConfig.Slots
@ -87,9 +88,18 @@ func (c *Cluster) syncPostgresConfig() error {
"plugin": "pgoutput", "plugin": "pgoutput",
"type": "logical", "type": "logical",
} }
slotName := constants.EventStreamSourceSlotPrefix + "_" + stream.Database + "_" + stream.ApplicationId slotName := getSlotName(stream.Database, stream.ApplicationId)
if _, exists := slots[slotName]; !exists { if _, exists := slots[slotName]; !exists {
slots[slotName] = slot slots[slotName] = slot
publications[slotName] = stream.Tables
} else {
streamTables := publications[slotName]
for tableName, table := range stream.Tables {
if _, exists := streamTables[tableName]; !exists {
streamTables[tableName] = table
}
}
publications[slotName] = streamTables
} }
} }
@ -108,7 +118,7 @@ func (c *Cluster) syncPostgresConfig() error {
pods, err := c.listPods() pods, err := c.listPods()
if err != nil || len(pods) == 0 { if err != nil || len(pods) == 0 {
c.logger.Warnf("could not list pods of the statefulset: %v", err) c.logger.Warningf("could not list pods of the statefulset: %v", err)
} }
for i, pod := range pods { for i, pod := range pods {
podName := util.NameFromMeta(pods[i].ObjectMeta) podName := util.NameFromMeta(pods[i].ObjectMeta)
@ -125,6 +135,22 @@ func (c *Cluster) syncPostgresConfig() error {
} }
} }
// next create publications to each created slot
for publication, tables := range publications {
dbName := slots[publication]["database"]
tableNames := make([]string, len(tables))
i := 0
for t := range tables {
tableNames[i] = fmt.Sprintf("%q", t)
i++
}
tableList := strings.Join(tableNames, ", ")
c.logger.Debugf("creating publication %q in database %q for tables %s", publication, dbName, tableList)
if err := c.createPublication(dbName, publication, tableList); err != nil {
c.logger.Warningf("%v", err)
}
}
return nil return nil
} }
@ -214,10 +240,15 @@ func getOutboxTable(tableName, idColumn string) zalandov1.EventStreamTable {
} }
} }
func getSlotName(dbName, appId string) string {
return constants.EventStreamSourceSlotPrefix + "_" + dbName + "_" + strings.Replace(appId, "-", "_", -1)
}
func (c *Cluster) getStreamConnection(database, user, appId string) zalandov1.Connection { func (c *Cluster) getStreamConnection(database, user, appId string) zalandov1.Connection {
return zalandov1.Connection{ return zalandov1.Connection{
Url: fmt.Sprintf("jdbc:postgresql://%s.%s/%s?user=%s&ssl=true&sslmode=require", c.Name, c.Namespace, database, user), Url: fmt.Sprintf("jdbc:postgresql://%s.%s/%s?user=%s&ssl=true&sslmode=require", c.Name, c.Namespace, database, user),
SlotName: constants.EventStreamSourceSlotPrefix + "_" + database + "_" + strings.Replace(appId, "-", "_", -1), SlotName: getSlotName(database, appId),
PublicationName: getSlotName(database, appId),
DBAuth: zalandov1.DBAuth{ DBAuth: zalandov1.DBAuth{
Type: constants.EventStreamSourceAuthType, Type: constants.EventStreamSourceAuthType,
Name: c.credentialSecretNameForCluster(user, c.Name), Name: c.credentialSecretNameForCluster(user, c.Name),
@ -229,12 +260,19 @@ func (c *Cluster) getStreamConnection(database, user, appId string) zalandov1.Co
func (c *Cluster) syncStreams() error { func (c *Cluster) syncStreams() error {
c.setProcessName("syncing streams")
_, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamSourceCRDName, metav1.GetOptions{}) _, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamSourceCRDName, metav1.GetOptions{})
if k8sutil.ResourceNotFound(err) { if k8sutil.ResourceNotFound(err) {
c.logger.Debugf("event stream CRD not installed, skipping") c.logger.Debugf("event stream CRD not installed, skipping")
return nil return nil
} }
err = c.syncPostgresConfig()
if err != nil {
return fmt.Errorf("could not update Postgres config for event streaming: %v", err)
}
err = c.createOrUpdateStreams() err = c.createOrUpdateStreams()
if err != nil { if err != nil {
return err return err
@ -245,13 +283,6 @@ func (c *Cluster) syncStreams() error {
func (c *Cluster) createOrUpdateStreams() error { func (c *Cluster) createOrUpdateStreams() error {
c.setProcessName("syncing streams")
err := c.syncPostgresConfig()
if err != nil {
return fmt.Errorf("could not update Postgres config for event streaming: %v", err)
}
appIds := gatherApplicationIds(c.Spec.Streams) appIds := gatherApplicationIds(c.Spec.Streams)
for _, appId := range appIds { for _, appId := range appIds {
fesName := c.Name + "-" + appId fesName := c.Name + "-" + appId

View File

@ -42,6 +42,7 @@ var (
dbName string = "foo" dbName string = "foo"
fesUser string = constants.EventStreamSourceSlotPrefix + constants.UserRoleNameSuffix fesUser string = constants.EventStreamSourceSlotPrefix + constants.UserRoleNameSuffix
fesName string = clusterName + "-" + appId fesName string = clusterName + "-" + appId
slotName string = fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbName, strings.Replace(appId, "-", "_", -1))
pg = acidv1.Postgresql{ pg = acidv1.Postgresql{
TypeMeta: metav1.TypeMeta{ TypeMeta: metav1.TypeMeta{
@ -119,7 +120,8 @@ var (
UserKey: "username", UserKey: "username",
}, },
Url: fmt.Sprintf("jdbc:postgresql://%s.%s/foo?user=%s&ssl=true&sslmode=require", clusterName, namespace, fesUser), Url: fmt.Sprintf("jdbc:postgresql://%s.%s/foo?user=%s&ssl=true&sslmode=require", clusterName, namespace, fesUser),
SlotName: fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbName, strings.Replace(appId, "-", "_", -1)), SlotName: slotName,
PublicationName: slotName,
}, },
Schema: "data", Schema: "data",
EventStreamTable: v1.EventStreamTable{ EventStreamTable: v1.EventStreamTable{