diff --git a/charts/postgres-operator/crds/postgresqls.yaml b/charts/postgres-operator/crds/postgresqls.yaml index 1c6064a90..f8da45c23 100644 --- a/charts/postgres-operator/crds/postgresqls.yaml +++ b/charts/postgres-operator/crds/postgresqls.yaml @@ -489,7 +489,16 @@ spec: tables: type: object additionalProperties: - type: string + type: object + required: + - eventType + properties: + eventType: + type: string + idColumn: + type: string + payloadColumn: + type: string teamId: type: string tls: diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md index 626f832fe..cd9d8c147 100644 --- a/docs/reference/cluster_manifest.md +++ b/docs/reference/cluster_manifest.md @@ -530,11 +530,15 @@ Each stream object can have the following properties: replication user). Required. * **tables** - Defines a map of (outbox) table names and event types. The CDC operator is following - the [outbox pattern](https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/). - This means that the application will put events into a column in the outbox table - in the structure of the target event type, and the CDC operator will capture them - shortly after the transaction is committed. Required. + Defines a map of table names and their properties (`eventType`, `idColumn` + and `payloadColumn`). The CDC operator is following the [outbox pattern](https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/). + The application is responsible for putting events into a (JSON/B or VARCHAR) + payload column of the outbox table in the structure of the specified target + event type. The CDC operator will consume them shortly after the + transaction is committed. The `idColumn` will be used in telemetry for the + CDC operator. The names for `idColumn` and `payloadColumn` can be configured. + Defaults are `id` and `payload`. The target `eventType` has to be defined. + Required. 
* **filter** Streamed events can be filtered by a jsonpath expression for each table. diff --git a/manifests/complete-postgres-manifest.yaml b/manifests/complete-postgres-manifest.yaml index 485b37475..3924ddd28 100644 --- a/manifests/complete-postgres-manifest.yaml +++ b/manifests/complete-postgres-manifest.yaml @@ -200,8 +200,12 @@ spec: # streams: # - database: foo # tables: -# data.ta: event_type_a -# data.tb: event_type_b +# data.ta: +# eventType: event_type_a +# data.tb: +# eventType: event_type_b +# idColumn: tb_id +# payloadColumn: tb_payload # # Optional. Filter ignores events before a certain txnId and lsn. Can be used to skip bad events # filter: # data.ta: "[?(@.source.txId > 500 && @.source.lsn > 123456)]" diff --git a/manifests/postgresql.crd.yaml b/manifests/postgresql.crd.yaml index ad12ad6be..4b49494a8 100644 --- a/manifests/postgresql.crd.yaml +++ b/manifests/postgresql.crd.yaml @@ -487,7 +487,16 @@ spec: tables: type: object additionalProperties: - type: string + type: object + required: + - eventType + properties: + eventType: + type: string + idColumn: + type: string + payloadColumn: + type: string teamId: type: string tls: diff --git a/pkg/apis/acid.zalan.do/v1/crds.go b/pkg/apis/acid.zalan.do/v1/crds.go index b0c88e264..82dde6b94 100644 --- a/pkg/apis/acid.zalan.do/v1/crds.go +++ b/pkg/apis/acid.zalan.do/v1/crds.go @@ -682,7 +682,19 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ Type: "object", AdditionalProperties: &apiextv1.JSONSchemaPropsOrBool{ Schema: &apiextv1.JSONSchemaProps{ - Type: "string", + Type: "object", + Required: []string{"eventType"}, + Properties: map[string]apiextv1.JSONSchemaProps{ + "eventType": { + Type: "string", + }, + "idColumn": { + Type: "string", + }, + "payloadColumn": { + Type: "string", + }, + }, }, }, }, diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index 0499763db..7a645e8fa 100--- 
a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -229,8 +229,14 @@ type ConnectionPooler struct { } type Stream struct { - Database string `json:"database"` - Tables map[string]string `json:"tables"` - Filter map[string]string `json:"filter,omitempty"` - BatchSize uint32 `json:"batchSize,omitempty"` + Database string `json:"database"` + Tables map[string]StreamTable `json:"tables"` + Filter map[string]string `json:"filter,omitempty"` + BatchSize uint32 `json:"batchSize,omitempty"` +} + +type StreamTable struct { + EventType string `json:"eventType"` + IdColumn string `json:"idColumn,omitempty" defaults:"id"` + PayloadColumn string `json:"payloadColumn,omitempty" defaults:"payload"` } diff --git a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go index 08cc7a3a9..b0f0dae25 100644 --- a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go +++ b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go @@ -1143,7 +1143,7 @@ func (in *Stream) DeepCopyInto(out *Stream) { *out = *in if in.Tables != nil { in, out := &in.Tables, &out.Tables - *out = make(map[string]string, len(*in)) + *out = make(map[string]StreamTable, len(*in)) for key, val := range *in { (*out)[key] = val } @@ -1168,6 +1168,22 @@ func (in *Stream) DeepCopy() *Stream { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *StreamTable) DeepCopyInto(out *StreamTable) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StreamTable. +func (in *StreamTable) DeepCopy() *StreamTable { + if in == nil { + return nil + } + out := new(StreamTable) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *TLSDescription) DeepCopyInto(out *TLSDescription) { *out = *in diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index b00b6d09c..cef86a05f 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -9,14 +9,11 @@ import ( acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" zalandov1alpha1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1alpha1" "github.com/zalando/postgres-operator/pkg/util" - "github.com/zalando/postgres-operator/pkg/util/config" "github.com/zalando/postgres-operator/pkg/util/constants" "github.com/zalando/postgres-operator/pkg/util/k8sutil" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -var outboxTableNameTemplate config.StringTemplate = "{table}_{eventtype}_outbox" - func (c *Cluster) createStreams() error { c.setProcessName("creating streams") @@ -114,10 +111,10 @@ func (c *Cluster) generateFabricEventStream() *zalandov1alpha1.FabricEventStream eventStreams := make([]zalandov1alpha1.EventStream, 0) for _, stream := range c.Spec.Streams { - for table, eventType := range stream.Tables { - streamSource := c.getEventStreamSource(stream, table, eventType) - streamFlow := getEventStreamFlow(stream) - streamSink := getEventStreamSink(stream, eventType) + for tableName, table := range stream.Tables { + streamSource := c.getEventStreamSource(stream, tableName, table.IdColumn) + streamFlow := getEventStreamFlow(stream, table.PayloadColumn) + streamSink := getEventStreamSink(stream, table.EventType) eventStreams = append(eventStreams, zalandov1alpha1.EventStream{ EventStreamFlow: streamFlow, @@ -145,21 +142,22 @@ func (c *Cluster) generateFabricEventStream() *zalandov1alpha1.FabricEventStream } } -func (c *Cluster) getEventStreamSource(stream acidv1.Stream, table, eventType string) zalandov1alpha1.EventStreamSource { - _, schema := getTableSchema(table) - streamFilter := stream.Filter[table] +func (c *Cluster) getEventStreamSource(stream acidv1.Stream, tableName, idColumn string) 
zalandov1alpha1.EventStreamSource { + _, schema := getTableSchema(tableName) + streamFilter := stream.Filter[tableName] return zalandov1alpha1.EventStreamSource{ Type: constants.EventStreamSourcePGType, Schema: schema, - EventStreamTable: getOutboxTable(table, eventType), + EventStreamTable: getOutboxTable(tableName, idColumn), Filter: streamFilter, Connection: c.getStreamConnection(stream.Database, constants.EventStreamSourceSlotPrefix+constants.UserRoleNameSuffix), } } -func getEventStreamFlow(stream acidv1.Stream) zalandov1alpha1.EventStreamFlow { +func getEventStreamFlow(stream acidv1.Stream, payloadColumn string) zalandov1alpha1.EventStreamFlow { return zalandov1alpha1.EventStreamFlow{ - Type: constants.EventStreamFlowPgGenericType, + Type: constants.EventStreamFlowPgGenericType, + PayloadColumn: payloadColumn, } } @@ -182,9 +180,10 @@ func getTableSchema(fullTableName string) (tableName, schemaName string) { return tableName, schemaName } -func getOutboxTable(tableName, eventType string) zalandov1alpha1.EventStreamTable { +func getOutboxTable(tableName, idColumn string) zalandov1alpha1.EventStreamTable { return zalandov1alpha1.EventStreamTable{ - Name: outboxTableNameTemplate.Format("table", tableName, "eventtype", eventType), + Name: tableName, + IDColumn: idColumn, } } diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 725c856b1..8a339b11a 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -48,8 +48,12 @@ var ( Streams: []acidv1.Stream{ { Database: "foo", - Tables: map[string]string{ - "bar": "stream_type_a", + Tables: map[string]acidv1.StreamTable{ + "bar": acidv1.StreamTable{ + EventType: "stream_type_a", + IdColumn: "b_id", + PayloadColumn: "b_payload", + }, }, BatchSize: uint32(100), }, @@ -124,8 +128,12 @@ func TestUpdateFabricEventStream(t *testing.T) { pgSpec.Streams = []acidv1.Stream{ { Database: "foo", - Tables: map[string]string{ - "bar": "stream_type_b", + Tables: map[string]acidv1.StreamTable{ 
+ "bar": acidv1.StreamTable{ + EventType: "stream_type_b", + IdColumn: "b_id", + PayloadColumn: "b_payload", + }, }, BatchSize: uint32(250), },