From 9007475ed83beaa30e11c6f383da0547a2986290 Mon Sep 17 00:00:00 2001
From: Felix Kunde
Date: Mon, 21 Feb 2022 17:01:58 +0100
Subject: [PATCH] reflect code review and additional refactoring

---
 docs/reference/cluster_manifest.md |  17 +-
 pkg/cluster/streams.go             | 252 +++++++++++++++--------------
 pkg/cluster/streams_test.go        |   4 +-
 pkg/util/constants/streams.go      |   5 +-
 4 files changed, 150 insertions(+), 128 deletions(-)

diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md
index 0e1cad1af..fe1f6dfd7 100644
--- a/docs/reference/cluster_manifest.md
+++ b/docs/reference/cluster_manifest.md
@@ -536,11 +536,18 @@ Those parameters are grouped under the `tls` top-level key.
 
 ## Change data capture streams
 
-This sections enables change data capture (CDC) streams e.g. into Zalando’s
-distributed event broker [Nakadi](https://nakadi.io/). Parameters grouped
-under the `streams` top-level key will be used by the operator to create
-custom resources for Zalando's internal CDC operator. Each stream object can
-have the following properties:
+This section enables change data capture (CDC) streams via Postgres'
+[logical decoding](https://www.postgresql.org/docs/14/logicaldecoding.html)
+feature and the `pgoutput` plugin. While the Postgres operator takes responsibility
+for providing the setup to publish change events, it relies on external tools
+to consume them. At Zalando, we are using a workflow based on the
+[Debezium Connector](https://debezium.io/documentation/reference/stable/connectors/postgresql.html),
+which can feed streams into Zalando’s distributed event broker [Nakadi](https://nakadi.io/),
+among others.
+
+The Postgres Operator creates custom resources for Zalando's internal CDC
+operator, which will be used to set up the consumer part. Each stream object
+can have the following properties:
 
 * **applicationId**
   The application name to which the database and CDC belongs to. For each
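The new wording above describes the contract between the manifest and the operator: all entries under `streams` are grouped by `applicationId`, and one FabricEventStream custom resource is created per distinct id. A minimal sketch of that grouping, using simplified stand-in types (the real `acidv1.Stream` has more fields; the table names and event types below are made up):

```go
package main

import "fmt"

// Simplified stand-in for the acidv1.Stream entries described above; the real
// type lives in the operator's API package and carries more fields. Table
// names and event types here are made up for illustration.
type Stream struct {
	ApplicationId string
	Database      string
	Tables        map[string]string // table -> event type (simplified)
}

// gatherApplicationIds mirrors the dedup step in streams.go: every distinct
// applicationId results in exactly one FabricEventStream custom resource.
func gatherApplicationIds(streams []Stream) []string {
	appIds := []string{}
	seen := map[string]bool{}
	for _, s := range streams {
		if !seen[s.ApplicationId] {
			appIds = append(appIds, s.ApplicationId)
			seen[s.ApplicationId] = true
		}
	}
	return appIds
}

func main() {
	streams := []Stream{
		{ApplicationId: "test-app", Database: "foo", Tables: map[string]string{"data.orders": "order-event"}},
		{ApplicationId: "test-app", Database: "foo", Tables: map[string]string{"data.payments": "payment-event"}},
	}
	// both entries share one applicationId, so only one stream CR is created
	fmt.Println(gatherApplicationIds(streams)) // [test-app]
}
```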
diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go
index 75675d490..a6d498d83 100644
--- a/pkg/cluster/streams.go
+++ b/pkg/cluster/streams.go
@@ -15,32 +15,22 @@ import (
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
-func (c *Cluster) createStreams(appId string) {
+func (c *Cluster) createStreams(appId string) error {
     c.setProcessName("creating streams")
 
-    var (
-        fes *zalandov1.FabricEventStream
-        err error
-    )
-
-    msg := "could not create event stream custom resource with applicationId %s: %v"
-
-    fes = c.generateFabricEventStream(appId)
-    if err != nil {
-        c.logger.Warningf(msg, appId, err)
-    }
-    _, err = c.KubeClient.FabricEventStreams(c.Namespace).Create(context.TODO(), fes, metav1.CreateOptions{})
-    if err != nil {
-        c.logger.Warningf(msg, appId, err)
+    fes := c.generateFabricEventStream(appId)
+    if _, err := c.KubeClient.FabricEventStreams(c.Namespace).Create(context.TODO(), fes, metav1.CreateOptions{}); err != nil {
+        return err
     }
+
+    return nil
 }
 
 func (c *Cluster) updateStreams(newEventStreams *zalandov1.FabricEventStream) error {
     c.setProcessName("updating event streams")
 
-    _, err := c.KubeClient.FabricEventStreams(newEventStreams.Namespace).Update(context.TODO(), newEventStreams, metav1.UpdateOptions{})
-    if err != nil {
-        return fmt.Errorf("could not update event stream custom resource: %v", err)
+    if _, err := c.KubeClient.FabricEventStreams(newEventStreams.Namespace).Update(context.TODO(), newEventStreams, metav1.UpdateOptions{}); err != nil {
+        return err
     }
 
     return nil
@@ -50,7 +40,7 @@ func (c *Cluster) deleteStreams() error {
     c.setProcessName("deleting event streams")
 
     // check if stream CRD is installed before trying a delete
-    _, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamSourceCRDName, metav1.GetOptions{})
+    _, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamCRDName, metav1.GetOptions{})
     if k8sutil.ResourceNotFound(err) {
         return nil
     }
@@ -74,9 +64,39 @@ func gatherApplicationIds(streams []acidv1.Stream) []string {
     return appIds
 }
 
-func (c *Cluster) syncPostgresConfig() error {
-    slots := make(map[string]map[string]string)
-    publications := make(map[string]map[string]acidv1.StreamTable)
+func (c *Cluster) syncPostgresConfig(requiredPatroniConfig acidv1.Patroni) error {
+    errorMsg := "no pods found to update config"
+
+    // if streams are defined wal_level must be switched to logical
+    requiredPgParameters := map[string]string{"wal_level": "logical"}
+
+    // apply config changes in pods
+    pods, err := c.listPods()
+    if err != nil {
+        errorMsg = fmt.Sprintf("could not list pods of the statefulset: %v", err)
+    }
+    for i, pod := range pods {
+        podName := util.NameFromMeta(pods[i].ObjectMeta)
+        effectivePatroniConfig, effectivePgParameters, err := c.patroni.GetConfig(&pod)
+        if err != nil {
+            errorMsg = fmt.Sprintf("could not get Postgres config from pod %s: %v", podName, err)
+            continue
+        }
+
+        _, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, effectivePatroniConfig, requiredPatroniConfig, effectivePgParameters, requiredPgParameters)
+        if err != nil {
+            errorMsg = fmt.Sprintf("could not set PostgreSQL configuration options for pod %s: %v", podName, err)
+            continue
+        }
+
+        // Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used
+        return nil
+    }
+
+    return fmt.Errorf(errorMsg)
+}
+
+func (c *Cluster) syncPublication(publication, dbName string, tables map[string]acidv1.StreamTable) error {
     createPublications := make(map[string]string)
     alterPublications := make(map[string]string)
 
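The refactored `syncPostgresConfig` above collects the last error message while walking the pod list and returns as soon as one pod accepts the configuration, because Patroni's config endpoint only proxies to the DCS and a single PATCH is therefore sufficient. A small sketch of that first-success-wins pattern, with placeholder pod names and a generic apply callback standing in for the operator's Patroni client:

```go
package main

import (
	"errors"
	"fmt"
)

// applyConfigOnAnyPod models the loop in the new syncPostgresConfig: failures
// only update the remembered error message, and one successful call ends the
// loop because Patroni propagates the change through DCS to all members.
// Pod names and the apply function are placeholders, not operator APIs.
func applyConfigOnAnyPod(pods []string, apply func(pod string) error) error {
	errorMsg := "no pods found to update config"
	for _, pod := range pods {
		if err := apply(pod); err != nil {
			// remember the failure and try the next pod
			errorMsg = fmt.Sprintf("could not set configuration via pod %s: %v", pod, err)
			continue
		}
		// one successful PATCH is enough
		return nil
	}
	return errors.New(errorMsg)
}

func main() {
	pods := []string{"acid-test-cluster-0", "acid-test-cluster-1"}
	err := applyConfigOnAnyPod(pods, func(pod string) error {
		if pod == "acid-test-cluster-0" {
			return errors.New("pod not reachable") // simulate a failure on the first pod
		}
		return nil
	})
	fmt.Println(err) // <nil>: the second pod succeeded
}
```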
@@ -86,106 +106,45 @@ func (c *Cluster) syncPostgresConfig() error {
         }
     }()
 
-    desiredPatroniConfig := c.Spec.Patroni
-    if len(desiredPatroniConfig.Slots) > 0 {
-        slots = desiredPatroniConfig.Slots
+    // check for existing publications
+    if err := c.initDbConnWithName(dbName); err != nil {
+        return fmt.Errorf("could not init database connection")
     }
 
-    // define extra logical slots for Patroni config
-    for _, stream := range c.Spec.Streams {
-        slot := map[string]string{
-            "database": stream.Database,
-            "plugin":   constants.EventStreamSourcePluginType,
-            "type":     "logical",
-        }
-        slotName := getSlotName(stream.Database, stream.ApplicationId)
-        if _, exists := slots[slotName]; !exists {
-            slots[slotName] = slot
-            publications[slotName] = stream.Tables
-        } else {
-            streamTables := publications[slotName]
-            for tableName, table := range stream.Tables {
-                if _, exists := streamTables[tableName]; !exists {
-                    streamTables[tableName] = table
-                }
-            }
-            publications[slotName] = streamTables
-        }
+    currentPublications, err := c.getPublications()
+    if err != nil {
+        return fmt.Errorf("could not get current publications: %v", err)
     }
 
-    if len(slots) > 0 {
-        desiredPatroniConfig.Slots = slots
-    } else {
+    tableNames := make([]string, len(tables))
+    i := 0
+    for t := range tables {
+        tableName, schemaName := getTableSchema(t)
+        tableNames[i] = fmt.Sprintf("%s.%s", schemaName, tableName)
+        i++
+    }
+    sort.Strings(tableNames)
+    tableList := strings.Join(tableNames, ", ")
+
+    currentTables, exists := currentPublications[publication]
+    if !exists {
+        createPublications[publication] = tableList
+    } else if currentTables != tableList {
+        alterPublications[publication] = tableList
+    }
+
+    if len(createPublications)+len(alterPublications) == 0 {
         return nil
     }
 
-    // if streams are defined wal_level must be switched to logical
-    desiredPgParameters := map[string]string{"wal_level": "logical"}
-
-    // apply config changes in pods
-    pods, err := c.listPods()
-    if err != nil || len(pods) == 0 {
-        c.logger.Warningf("could not list pods of the statefulset: %v", err)
-    }
-    for i, pod := range pods {
-        podName := util.NameFromMeta(pods[i].ObjectMeta)
-        effectivePatroniConfig, effectivePgParameters, err := c.patroni.GetConfig(&pod)
-        if err != nil {
-            c.logger.Warningf("could not get Postgres config from pod %s: %v", podName, err)
-            continue
-        }
-
-        _, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, effectivePatroniConfig, desiredPatroniConfig, effectivePgParameters, desiredPgParameters)
-        if err != nil {
-            c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err)
-            continue
+    for publicationName, tables := range createPublications {
+        if err = c.executeCreatePublication(publicationName, tables); err != nil {
+            return fmt.Errorf("creation of publication %q failed: %v", publicationName, err)
         }
     }
-
-    // next, create publications to each created slot
-    c.logger.Debug("syncing database publications")
-    for publication, tables := range publications {
-        // but first check for existing publications
-        dbName := slots[publication]["database"]
-        if err := c.initDbConnWithName(dbName); err != nil {
-            return fmt.Errorf("could not init database connection")
-        }
-
-        currentPublications, err := c.getPublications()
-        if err != nil {
-            return fmt.Errorf("could not get current publications: %v", err)
-        }
-
-        tableNames := make([]string, len(tables))
-        i := 0
-        for t := range tables {
-            tableName, schemaName := getTableSchema(t)
-            tableNames[i] = fmt.Sprintf("%s.%s", schemaName, tableName)
-            i++
-        }
-        sort.Strings(tableNames)
-        tableList := strings.Join(tableNames, ", ")
-
-        currentTables, exists := currentPublications[publication]
-        if !exists {
-            createPublications[publication] = tableList
-        } else if currentTables != tableList {
-            alterPublications[publication] = tableList
-        }
-
-        if len(createPublications)+len(alterPublications) == 0 {
-            return nil
-        }
-
-        for publicationName, tables := range createPublications {
-            if err = c.executeCreatePublication(publicationName, tables); err != nil {
-                c.logger.Warningf("%v", err)
-            }
-        }
-        for publicationName, tables := range alterPublications {
-            if err = c.executeAlterPublication(publicationName, tables); err != nil {
-                c.logger.Warningf("%v", err)
-            }
+    for publicationName, tables := range alterPublications {
+        if err = c.executeAlterPublication(publicationName, tables); err != nil {
+            return fmt.Errorf("update of publication %q failed: %v", publicationName, err)
         }
     }
 
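The new `syncPublication` above decides between CREATE and ALTER by rendering the desired tables as a sorted, comma-separated `schema.table` list and comparing that string with what `getPublications()` reports. A sketch of that comparison; the publication name and the SQL strings are illustrative, not the operator's exact statements:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// publicationStatement sketches the decision in syncPublication: the desired
// tables are sorted and joined so that they can be compared as a plain string
// against the publication's current table list.
func publicationStatement(publication, currentTables string, desiredTables []string) string {
	sorted := append([]string{}, desiredTables...)
	sort.Strings(sorted)
	tableList := strings.Join(sorted, ", ")

	switch {
	case currentTables == "":
		return fmt.Sprintf("CREATE PUBLICATION %s FOR TABLE %s", publication, tableList)
	case currentTables != tableList:
		return fmt.Sprintf("ALTER PUBLICATION %s SET TABLE %s", publication, tableList)
	default:
		return "" // publication is already in sync
	}
}

func main() {
	// publication exists but covers only one of the two desired tables
	fmt.Println(publicationStatement("fes_foo_test_app", "data.orders",
		[]string{"data.payments", "data.orders"}))
	// -> ALTER PUBLICATION fes_foo_test_app SET TABLE data.orders, data.payments
}
```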
@@ -213,8 +172,8 @@ func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEvent
     return &zalandov1.FabricEventStream{
         TypeMeta: metav1.TypeMeta{
-            Kind:       constants.EventStreamSourceCRDKind,
-            APIVersion: "zalando.org/v1",
+            APIVersion: constants.EventStreamCRDApiVersion,
+            Kind:       constants.EventStreamCRDKind,
         },
         ObjectMeta: metav1.ObjectMeta{
             Name: fmt.Sprintf("%s-%s", c.Name, appId),
@@ -300,15 +259,65 @@ func (c *Cluster) syncStreams() error {
     c.setProcessName("syncing streams")
 
-    _, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamSourceCRDName, metav1.GetOptions{})
+    _, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamCRDName, metav1.GetOptions{})
     if k8sutil.ResourceNotFound(err) {
         c.logger.Debugf("event stream CRD not installed, skipping")
         return nil
     }
 
-    err = c.syncPostgresConfig()
+    slots := make(map[string]map[string]string)
+    publications := make(map[string]map[string]acidv1.StreamTable)
+
+    requiredPatroniConfig := c.Spec.Patroni
+    if len(requiredPatroniConfig.Slots) > 0 {
+        slots = requiredPatroniConfig.Slots
+    }
+
+    // gather list of required slots and publications
+    for _, stream := range c.Spec.Streams {
+        slot := map[string]string{
+            "database": stream.Database,
+            "plugin":   constants.EventStreamSourcePluginType,
+            "type":     "logical",
+        }
+        slotName := getSlotName(stream.Database, stream.ApplicationId)
+        if _, exists := slots[slotName]; !exists {
+            slots[slotName] = slot
+            publications[slotName] = stream.Tables
+        } else {
+            streamTables := publications[slotName]
+            for tableName, table := range stream.Tables {
+                if _, exists := streamTables[tableName]; !exists {
+                    streamTables[tableName] = table
+                }
+            }
+            publications[slotName] = streamTables
+        }
+    }
+
+    // no slots = no streams defined
+    if len(slots) > 0 {
+        requiredPatroniConfig.Slots = slots
+    } else {
+        return nil
+    }
+
+    // add extra logical slots to Patroni config
+    c.logger.Debug("syncing Postgres config for logical decoding")
+    err = c.syncPostgresConfig(requiredPatroniConfig)
     if err != nil {
-        return fmt.Errorf("could not update Postgres config for event streaming: %v", err)
+        return fmt.Errorf("failed to sync Postgres config for event streaming: %v", err)
+    }
+
+    // next, create publications for each created slot
+    c.logger.Debug("syncing database publications")
+    for publication, tables := range publications {
+        // but first check for existing publications
+        dbName := slots[publication]["database"]
+        err = c.syncPublication(publication, dbName, tables)
+        if err != nil {
+            c.logger.Warningf("could not sync publication %q in database %q: %v", publication, dbName, err)
+        }
     }
 
     err = c.createOrUpdateStreams()
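In the reworked `syncStreams` above, every stream entry is mapped to a logical replication slot keyed by `getSlotName(database, applicationId)`, and entries that resolve to the same slot have their table maps merged before the Patroni config and publications are synced. A sketch of that gathering step; `getSlotName` below is a plausible stand-in built from the `fes` slot prefix constant, and the operator's actual naming may differ in detail:

```go
package main

import (
	"fmt"
	"strings"
)

// getSlotName is an assumed naming scheme derived from the "fes" prefix
// constant; it is only meant to make the merging logic below runnable.
func getSlotName(dbName, appId string) string {
	return fmt.Sprintf("fes_%s_%s", dbName, strings.ReplaceAll(appId, "-", "_"))
}

func main() {
	type stream struct {
		Database      string
		ApplicationId string
		Tables        map[string]string
	}
	// two manifest entries that share a database and applicationId
	streams := []stream{
		{Database: "foo", ApplicationId: "test-app", Tables: map[string]string{"data.orders": "order-event"}},
		{Database: "foo", ApplicationId: "test-app", Tables: map[string]string{"data.payments": "payment-event"}},
	}

	slots := map[string]map[string]string{}
	publications := map[string]map[string]string{}
	for _, s := range streams {
		name := getSlotName(s.Database, s.ApplicationId)
		if _, exists := slots[name]; !exists {
			slots[name] = map[string]string{"database": s.Database, "plugin": "pgoutput", "type": "logical"}
			publications[name] = map[string]string{}
		}
		for table, eventType := range s.Tables {
			publications[name][table] = eventType // merge tables of streams sharing a slot
		}
	}
	// one slot, one publication covering both tables
	fmt.Println(len(slots), len(publications[getSlotName("foo", "test-app")])) // 1 2
}
```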
@@ -331,7 +340,11 @@ func (c *Cluster) createOrUpdateStreams() error {
             }
 
             c.logger.Infof("event streams do not exist, create it")
-            c.createStreams(appId)
+            err = c.createStreams(appId)
+            if err != nil {
+                return fmt.Errorf("failed creating event stream %s: %v", fesName, err)
+            }
+            c.logger.Infof("event stream %q has been successfully created", fesName)
         } else {
             desiredStreams := c.generateFabricEventStream(appId)
             if !reflect.DeepEqual(effectiveStreams.Spec, desiredStreams.Spec) {
@@ -341,6 +354,7 @@ func (c *Cluster) createOrUpdateStreams() error {
                 if err != nil {
                     return fmt.Errorf("failed updating event stream %s: %v", fesName, err)
                 }
+                c.logger.Infof("event stream %q has been successfully updated", fesName)
             }
         }
     }
diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go
index 283b956e9..89dd294ca 100644
--- a/pkg/cluster/streams_test.go
+++ b/pkg/cluster/streams_test.go
@@ -82,8 +82,8 @@ var (
     fes = &v1.FabricEventStream{
         TypeMeta: metav1.TypeMeta{
-            Kind:       "FabricEventStream",
-            APIVersion: "zalando.org/v1",
+            APIVersion: constants.EventStreamCRDApiVersion,
+            Kind:       constants.EventStreamCRDKind,
         },
         ObjectMeta: metav1.ObjectMeta{
             Name: fesName,
diff --git a/pkg/util/constants/streams.go b/pkg/util/constants/streams.go
index a636abf1c..bd70b719b 100644
--- a/pkg/util/constants/streams.go
+++ b/pkg/util/constants/streams.go
@@ -2,8 +2,9 @@ package constants
 
 // PostgreSQL specific constants
 const (
-    EventStreamSourceCRDKind    = "FabricEventStream"
-    EventStreamSourceCRDName    = "fabriceventstreams.zalando.org"
+    EventStreamCRDApiVersion    = "zalando.org/v1"
+    EventStreamCRDKind          = "FabricEventStream"
+    EventStreamCRDName          = "fabriceventstreams.zalando.org"
     EventStreamSourcePGType     = "PostgresLogicalReplication"
     EventStreamSourceSlotPrefix = "fes"
     EventStreamSourcePluginType = "pgoutput"
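`createOrUpdateStreams` now propagates create errors and logs a success message, while updates are still gated on a `reflect.DeepEqual` comparison of the effective and desired spec. A simplified model of that create-or-update decision, using stand-in types and callbacks instead of the FabricEventStream client:

```go
package main

import (
	"fmt"
	"reflect"
)

// eventStreamSpec is a stand-in for the FabricEventStream spec; the real
// resource is defined in the zalando.org/v1 API group.
type eventStreamSpec struct {
	ApplicationId string
	Tables        []string
}

// createOrUpdate mirrors the control flow: create when nothing exists,
// update only when the specs differ, otherwise do nothing.
func createOrUpdate(effective *eventStreamSpec, desired eventStreamSpec,
	create func(eventStreamSpec) error, update func(eventStreamSpec) error) error {
	if effective == nil {
		if err := create(desired); err != nil {
			return fmt.Errorf("failed creating event stream: %v", err)
		}
		return nil
	}
	if !reflect.DeepEqual(*effective, desired) {
		if err := update(desired); err != nil {
			return fmt.Errorf("failed updating event stream: %v", err)
		}
	}
	return nil // nothing to do, specs match
}

func main() {
	desired := eventStreamSpec{ApplicationId: "test-app", Tables: []string{"data.orders"}}
	effective := eventStreamSpec{ApplicationId: "test-app", Tables: []string{"data.orders"}}
	err := createOrUpdate(&effective, desired,
		func(eventStreamSpec) error { fmt.Println("create called"); return nil },
		func(eventStreamSpec) error { fmt.Println("update called"); return nil })
	fmt.Println(err) // <nil>, and neither create nor update was called
}
```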