diff --git a/charts/postgres-operator/crds/postgresqls.yaml b/charts/postgres-operator/crds/postgresqls.yaml index 77546ca76..d94cc83e8 100644 --- a/charts/postgres-operator/crds/postgresqls.yaml +++ b/charts/postgres-operator/crds/postgresqls.yaml @@ -475,9 +475,12 @@ spec: items: type: object required: + - applicationId - database - tables properties: + applicationId: + type: string batchSize: type: integer database: diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md index 160ab2af4..8668cd72e 100644 --- a/docs/reference/cluster_manifest.md +++ b/docs/reference/cluster_manifest.md @@ -522,9 +522,17 @@ Those parameters are grouped under the `tls` top-level key. This sections enables change data capture (CDC) streams e.g. into Zalando’s distributed event broker [Nakadi](https://nakadi.io/). Parameters grouped -under the `streams` top-level key will be used by the operator to create a -CRD for Zalando's internal CDC operator named like the Postgres cluster. -Each stream object can have the following properties: +under the `streams` top-level key will be used by the operator to create +custom resources for Zalando's internal CDC operator. Each stream object can +have the following properties: + +* **applicationId** + The application name to which the database and CDC belongs to. For each + set of streams with a distinct `applicationId` a separate stream CR as well + as a separate logical replication slot will be created. This means there can + different streams in the same database and streams with the same + `applicationId` are bundled in one stream CR. The stream CR will be called + like the Postgres cluster plus "-" suffix. Required. * **database** Name of the database from where events will be published via Postgres' diff --git a/manifests/complete-postgres-manifest.yaml b/manifests/complete-postgres-manifest.yaml index 3924ddd28..28d0a970d 100644 --- a/manifests/complete-postgres-manifest.yaml +++ b/manifests/complete-postgres-manifest.yaml @@ -3,6 +3,7 @@ kind: postgresql metadata: name: acid-test-cluster # labels: +# application: test-app # environment: demo # annotations: # "acid.zalan.do/controller": "second-operator" @@ -198,15 +199,16 @@ spec: # Enables change data capture streams for defined database tables # streams: -# - database: foo +# - applicationId: test-app +# database: foo # tables: -# data.ta: +# data.tab_a: # eventType: event_type_a -# data.tb: +# data.tab_b: # eventType: event_type_b # idColumn: tb_id # payloadColumn: tb_payload # # Optional. Filter ignores events before a certain txnId and lsn. Can be used to skip bad events # filter: -# data.ta: "[?(@.source.txId > 500 && @.source.lsn > 123456)]" +# data.tab_a: "[?(@.source.txId > 500 && @.source.lsn > 123456)]" # batchSize: 1000 diff --git a/manifests/postgresql.crd.yaml b/manifests/postgresql.crd.yaml index 86408e564..29aa599df 100644 --- a/manifests/postgresql.crd.yaml +++ b/manifests/postgresql.crd.yaml @@ -473,9 +473,12 @@ spec: items: type: object required: + - applicationId - database - tables properties: + applicationId: + type: string batchSize: type: integer database: diff --git a/pkg/apis/acid.zalan.do/v1/crds.go b/pkg/apis/acid.zalan.do/v1/crds.go index 0f105fdd4..6277a4f73 100644 --- a/pkg/apis/acid.zalan.do/v1/crds.go +++ b/pkg/apis/acid.zalan.do/v1/crds.go @@ -664,8 +664,11 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ Items: &apiextv1.JSONSchemaPropsOrArray{ Schema: &apiextv1.JSONSchemaProps{ Type: "object", - Required: []string{"database", "tables"}, + Required: []string{"applicationId", "database", "tables"}, Properties: map[string]apiextv1.JSONSchemaProps{ + "applicationId": { + Type: "string", + }, "batchSize": { Type: "integer", }, diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index 8b8067fb3..561dfcd05 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -230,10 +230,11 @@ type ConnectionPooler struct { } type Stream struct { - Database string `json:"database"` - Tables map[string]StreamTable `json:"tables"` - Filter map[string]string `json:"filter,omitempty"` - BatchSize uint32 `json:"batchSize,omitempty"` + ApplicationId string `json:"applicationId"` + Database string `json:"database"` + Tables map[string]StreamTable `json:"tables"` + Filter map[string]string `json:"filter,omitempty"` + BatchSize uint32 `json:"batchSize,omitempty"` } type StreamTable struct { diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index e3374f6e1..86e3687db 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -14,16 +14,24 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func (c *Cluster) createStreams() error { +func (c *Cluster) createStreams(appId string) { c.setProcessName("creating streams") - fes := c.generateFabricEventStream() - _, err := c.KubeClient.FabricEventStreams(c.Namespace).Create(context.TODO(), fes, metav1.CreateOptions{}) - if err != nil { - return fmt.Errorf("could not create event stream custom resource: %v", err) - } + var ( + fes *zalandov1alpha1.FabricEventStream + err error + ) - return nil + msg := "could not create event stream custom resource with applicationId %s: %v" + + fes = c.generateFabricEventStream(appId) + if err != nil { + c.logger.Warningf(msg, appId, err) + } + _, err = c.KubeClient.FabricEventStreams(c.Namespace).Create(context.TODO(), fes, metav1.CreateOptions{}) + if err != nil { + c.logger.Warningf(msg, appId, err) + } } func (c *Cluster) updateStreams(newEventStreams *zalandov1alpha1.FabricEventStream) error { @@ -54,21 +62,34 @@ func (c *Cluster) deleteStreams() error { return nil } +func gatherApplicationIds(streams []acidv1.Stream) []string { + appIds := make([]string, 0) + for _, stream := range streams { + if !util.SliceContains(appIds, stream.ApplicationId) { + appIds = append(appIds, stream.ApplicationId) + } + } + + return appIds +} + func (c *Cluster) syncPostgresConfig() error { + slots := make(map[string]map[string]string) desiredPatroniConfig := c.Spec.Patroni - slots := desiredPatroniConfig.Slots + if len(desiredPatroniConfig.Slots) > 0 { + slots = desiredPatroniConfig.Slots + } for _, stream := range c.Spec.Streams { - slotName := c.getLogicalReplicationSlot(stream.Database) - - if slotName == "" { - slot := map[string]string{ - "database": stream.Database, - "plugin": "wal2json", - "type": "logical", - } - slots[constants.EventStreamSourceSlotPrefix+"_"+stream.Database] = slot + slot := map[string]string{ + "database": stream.Database, + "plugin": "wal2json", + "type": "logical", + } + slotName := constants.EventStreamSourceSlotPrefix + "_" + stream.Database + "_" + stream.ApplicationId + if _, exists := slots[slotName]; !exists { + slots[slotName] = slot } } @@ -107,16 +128,13 @@ func (c *Cluster) syncPostgresConfig() error { return nil } -func (c *Cluster) generateFabricEventStream() *zalandov1alpha1.FabricEventStream { - var applicationId string +func (c *Cluster) generateFabricEventStream(appId string) *zalandov1alpha1.FabricEventStream { eventStreams := make([]zalandov1alpha1.EventStream, 0) - // take application label from manifest - if spec, err := c.GetSpec(); err == nil { - applicationId = spec.ObjectMeta.Labels["application"] - } - for _, stream := range c.Spec.Streams { + if stream.ApplicationId != appId { + continue + } for tableName, table := range stream.Tables { streamSource := c.getEventStreamSource(stream, tableName, table.IdColumn) streamFlow := getEventStreamFlow(stream, table.PayloadColumn) @@ -135,14 +153,14 @@ func (c *Cluster) generateFabricEventStream() *zalandov1alpha1.FabricEventStream APIVersion: "zalando.org/v1alpha1", }, ObjectMeta: metav1.ObjectMeta{ - Name: c.Name, + Name: c.Name + "-" + appId, Namespace: c.Namespace, Annotations: c.AnnotationsToPropagate(c.annotationsSet(nil)), // make cluster StatefulSet the owner (like with connection pooler objects) OwnerReferences: c.ownerReferences(), }, Spec: zalandov1alpha1.FabricEventStreamSpec{ - ApplicationId: applicationId, + ApplicationId: appId, EventStreams: eventStreams, }, } @@ -156,7 +174,10 @@ func (c *Cluster) getEventStreamSource(stream acidv1.Stream, tableName, idColumn Schema: schema, EventStreamTable: getOutboxTable(table, idColumn), Filter: streamFilter, - Connection: c.getStreamConnection(stream.Database, constants.EventStreamSourceSlotPrefix+constants.UserRoleNameSuffix), + Connection: c.getStreamConnection( + stream.Database, + constants.EventStreamSourceSlotPrefix+constants.UserRoleNameSuffix, + stream.ApplicationId), } } @@ -193,10 +214,10 @@ func getOutboxTable(tableName, idColumn string) zalandov1alpha1.EventStreamTable } } -func (c *Cluster) getStreamConnection(database, user string) zalandov1alpha1.Connection { +func (c *Cluster) getStreamConnection(database, user, appId string) zalandov1alpha1.Connection { return zalandov1alpha1.Connection{ Url: fmt.Sprintf("jdbc:postgresql://%s.%s/%s?user=%s&ssl=true&sslmode=require", c.Name, c.Namespace, database, user), - SlotName: c.getLogicalReplicationSlot(database), + SlotName: constants.EventStreamSourceSlotPrefix + "_" + database + "_" + appId, DBAuth: zalandov1alpha1.DBAuth{ Type: constants.EventStreamSourceAuthType, Name: c.credentialSecretNameForCluster(user, c.Name), @@ -206,16 +227,6 @@ func (c *Cluster) getStreamConnection(database, user string) zalandov1alpha1.Con } } -func (c *Cluster) getLogicalReplicationSlot(database string) string { - for slotName, slot := range c.Spec.Patroni.Slots { - if slot["type"] == "logical" && slot["database"] == database && slot["plugin"] == "wal2json" { - return slotName - } - } - - return constants.EventStreamSourceSlotPrefix + "_" + database -} - func (c *Cluster) syncStreams() error { _, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamSourceCRDName, metav1.GetOptions{}) @@ -241,25 +252,26 @@ func (c *Cluster) createOrUpdateStreams() error { return fmt.Errorf("could not update Postgres config for event streaming: %v", err) } - effectiveStreams, err := c.KubeClient.FabricEventStreams(c.Namespace).Get(context.TODO(), c.Name, metav1.GetOptions{}) - if err != nil { - if !k8sutil.ResourceNotFound(err) { - return fmt.Errorf("error during reading of event streams: %v", err) - } - - c.logger.Infof("event streams do not exist, create it") - err := c.createStreams() + appIds := gatherApplicationIds(c.Spec.Streams) + for _, appId := range appIds { + fesName := c.Name + "-" + appId + effectiveStreams, err := c.KubeClient.FabricEventStreams(c.Namespace).Get(context.TODO(), fesName, metav1.GetOptions{}) if err != nil { - return fmt.Errorf("event streams creation failed: %v", err) - } - } else { - desiredStreams := c.generateFabricEventStream() - if !reflect.DeepEqual(effectiveStreams.Spec, desiredStreams.Spec) { - c.logger.Debug("updating event streams") - desiredStreams.ObjectMeta.ResourceVersion = effectiveStreams.ObjectMeta.ResourceVersion - err = c.updateStreams(desiredStreams) - if err != nil { - return fmt.Errorf("event streams update failed: %v", err) + if !k8sutil.ResourceNotFound(err) { + return fmt.Errorf("failed reading event stream %s: %v", fesName, err) + } + + c.logger.Infof("event streams do not exist, create it") + c.createStreams(appId) + } else { + desiredStreams := c.generateFabricEventStream(appId) + if !reflect.DeepEqual(effectiveStreams.Spec, desiredStreams.Spec) { + c.logger.Debug("updating event streams") + desiredStreams.ObjectMeta.ResourceVersion = effectiveStreams.ObjectMeta.ResourceVersion + err = c.updateStreams(desiredStreams) + if err != nil { + return fmt.Errorf("failed updating event stream %s: %v", fesName, err) + } } } } diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index a18a00d67..aa7ba8961 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -37,7 +37,10 @@ func newFakeK8sStreamClient() (k8sutil.KubernetesClient, *fake.Clientset) { var ( clusterName string = "acid-test-cluster" namespace string = "default" + appId string = "test-app" + dbName string = "foo" fesUser string = constants.EventStreamSourceSlotPrefix + constants.UserRoleNameSuffix + fesName string = clusterName + "-" + appId pg = acidv1.Postgresql{ TypeMeta: metav1.TypeMeta{ @@ -47,15 +50,15 @@ var ( ObjectMeta: metav1.ObjectMeta{ Name: clusterName, Namespace: namespace, - Labels: map[string]string{"application": "test"}, }, Spec: acidv1.PostgresSpec{ Databases: map[string]string{ - "foo": "foo_user", + dbName: dbName + constants.UserRoleNameSuffix, }, Streams: []acidv1.Stream{ { - Database: "foo", + ApplicationId: appId, + Database: "foo", Tables: map[string]acidv1.StreamTable{ "data.bar": acidv1.StreamTable{ EventType: "stream_type_a", @@ -69,9 +72,6 @@ var ( BatchSize: uint32(100), }, }, - Users: map[string]acidv1.UserFlags{ - "foo_user": []string{"replication"}, - }, Volume: acidv1.Volume{ Size: "1Gi", }, @@ -84,7 +84,7 @@ var ( APIVersion: "zalando.org/v1alpha1", }, ObjectMeta: metav1.ObjectMeta{ - Name: clusterName, + Name: fesName, Namespace: namespace, OwnerReferences: []metav1.OwnerReference{ metav1.OwnerReference{ @@ -96,7 +96,7 @@ var ( }, }, Spec: v1alpha1.FabricEventStreamSpec{ - ApplicationId: "test", + ApplicationId: appId, EventStreams: []v1alpha1.EventStream{ { EventStreamFlow: v1alpha1.EventStreamFlow{ @@ -118,7 +118,7 @@ var ( UserKey: "username", }, Url: fmt.Sprintf("jdbc:postgresql://%s.%s/foo?user=%s&ssl=true&sslmode=require", clusterName, namespace, fesUser), - SlotName: "fes_foo", + SlotName: fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbName, appId), }, Schema: "data", EventStreamTable: v1alpha1.EventStreamTable{ @@ -164,13 +164,13 @@ func TestGenerateFabricEventStream(t *testing.T) { err = cluster.createOrUpdateStreams() assert.NoError(t, err) - result := cluster.generateFabricEventStream() + result := cluster.generateFabricEventStream(appId) if !reflect.DeepEqual(result, fes) { t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", fes, result) } - streamCRD, err := cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), cluster.Name, metav1.GetOptions{}) + streamCRD, err := cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), fesName, metav1.GetOptions{}) assert.NoError(t, err) if !reflect.DeepEqual(streamCRD, fes) { @@ -206,7 +206,8 @@ func TestUpdateFabricEventStream(t *testing.T) { var pgSpec acidv1.PostgresSpec pgSpec.Streams = []acidv1.Stream{ { - Database: "foo", + ApplicationId: appId, + Database: dbName, Tables: map[string]acidv1.StreamTable{ "data.bar": acidv1.StreamTable{ EventType: "stream_type_b", @@ -230,10 +231,10 @@ func TestUpdateFabricEventStream(t *testing.T) { err = cluster.createOrUpdateStreams() assert.NoError(t, err) - streamCRD, err := cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), cluster.Name, metav1.GetOptions{}) + streamCRD, err := cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), fesName, metav1.GetOptions{}) assert.NoError(t, err) - result := cluster.generateFabricEventStream() + result := cluster.generateFabricEventStream(appId) if !reflect.DeepEqual(result, streamCRD) { t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", streamCRD, result) }