make id and payload columns configurable

This commit is contained in:
Felix Kunde 2021-12-02 17:31:31 +01:00
parent 837969ed76
commit bde522ba2a
9 changed files with 101 additions and 34 deletions

View File

@ -489,7 +489,16 @@ spec:
tables: tables:
type: object type: object
additionalProperties: additionalProperties:
type: string type: object
required:
- eventType
properties:
eventType:
type: string
idColumn:
type: string
payloadColumn:
type: string
teamId: teamId:
type: string type: string
tls: tls:

View File

@ -530,11 +530,15 @@ Each stream object can have the following properties:
replication user). Required. replication user). Required.
* **tables** * **tables**
Defines a map of (outbox) table names and event types. The CDC operator is following Defines a map of table names and their properties (`eventType`, `idColumn`
the [outbox pattern](https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/). and `payloadColumn`). The CDC operator is following the [outbox pattern](https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/).
This means that the application will put events into a column in the outbox table The application is responsible for putting events into a (JSON/B or VARCHAR)
in the structure of the target event type, and the CDC operator will capture them payload column of the outbox table in the structure of the specified target
shortly after the transaction is committed. Required. event type. The CDC operator will consume them shortly after the
transaction is committed. The `idColumn` will be used in telemetry for the
CDC operator. The names for `idColumn` and `payloadColumn` can be configured.
Defaults are `id` and `payload`. The target `eventType` has to be defined.
Required.
* **filter** * **filter**
Streamed events can be filtered by a jsonpath expression for each table. Streamed events can be filtered by a jsonpath expression for each table.

View File

@ -200,8 +200,12 @@ spec:
# streams: # streams:
# - database: foo # - database: foo
# tables: # tables:
# data.ta: event_type_a # data.ta:
# data.tb: event_type_b # eventType: event_type_a
# data.tb:
# eventType: event_type_b
# idColumn: tb_id
# payloadColumn: tb_payload
# # Optional. Filter ignores events before a certain txnId and lsn. Can be used to skip bad events # # Optional. Filter ignores events before a certain txnId and lsn. Can be used to skip bad events
# filter: # filter:
# data.ta: "[?(@.source.txId > 500 && @.source.lsn > 123456)]" # data.ta: "[?(@.source.txId > 500 && @.source.lsn > 123456)]"

View File

@ -487,7 +487,16 @@ spec:
tables: tables:
type: object type: object
additionalProperties: additionalProperties:
type: string type: object
required:
- eventType
properties:
eventType:
type: string
idColumn:
type: string
payloadColumn:
type: string
teamId: teamId:
type: string type: string
tls: tls:

View File

@ -682,7 +682,19 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
Type: "object", Type: "object",
AdditionalProperties: &apiextv1.JSONSchemaPropsOrBool{ AdditionalProperties: &apiextv1.JSONSchemaPropsOrBool{
Schema: &apiextv1.JSONSchemaProps{ Schema: &apiextv1.JSONSchemaProps{
Type: "string", Type: "object",
Required: []string{"eventType"},
Properties: map[string]apiextv1.JSONSchemaProps{
"eventType": {
Type: "string",
},
"idColumn": {
Type: "string",
},
"payloadColumn": {
Type: "string",
},
},
}, },
}, },
}, },

View File

@ -229,8 +229,14 @@ type ConnectionPooler struct {
} }
type Stream struct { type Stream struct {
Database string `json:"database"` Database string `json:"database"`
Tables map[string]string `json:"tables"` Tables map[string]StreamTable `json:"tables"`
Filter map[string]string `json:"filter,omitempty"` Filter map[string]string `json:"filter,omitempty"`
BatchSize uint32 `json:"batchSize,omitempty"` BatchSize uint32 `json:"batchSize,omitempty"`
}
type StreamTable struct {
EventType string `json:"eventType"`
IdColumn string `json:"idColumn,omitempty" defaults:"id"`
PayloadColumn string `json:"payloadColumn,omitempty" defaults:"payload"`
} }

View File

@ -1143,7 +1143,7 @@ func (in *Stream) DeepCopyInto(out *Stream) {
*out = *in *out = *in
if in.Tables != nil { if in.Tables != nil {
in, out := &in.Tables, &out.Tables in, out := &in.Tables, &out.Tables
*out = make(map[string]string, len(*in)) *out = make(map[string]StreamTable, len(*in))
for key, val := range *in { for key, val := range *in {
(*out)[key] = val (*out)[key] = val
} }
@ -1168,6 +1168,22 @@ func (in *Stream) DeepCopy() *Stream {
return out return out
} }
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *StreamTable) DeepCopyInto(out *StreamTable) {
*out = *in
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StreamTable.
func (in *StreamTable) DeepCopy() *StreamTable {
if in == nil {
return nil
}
out := new(StreamTable)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *TLSDescription) DeepCopyInto(out *TLSDescription) { func (in *TLSDescription) DeepCopyInto(out *TLSDescription) {
*out = *in *out = *in

View File

@ -9,14 +9,11 @@ import (
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
zalandov1alpha1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1alpha1" zalandov1alpha1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1alpha1"
"github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util"
"github.com/zalando/postgres-operator/pkg/util/config"
"github.com/zalando/postgres-operator/pkg/util/constants" "github.com/zalando/postgres-operator/pkg/util/constants"
"github.com/zalando/postgres-operator/pkg/util/k8sutil" "github.com/zalando/postgres-operator/pkg/util/k8sutil"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
) )
var outboxTableNameTemplate config.StringTemplate = "{table}_{eventtype}_outbox"
func (c *Cluster) createStreams() error { func (c *Cluster) createStreams() error {
c.setProcessName("creating streams") c.setProcessName("creating streams")
@ -114,10 +111,10 @@ func (c *Cluster) generateFabricEventStream() *zalandov1alpha1.FabricEventStream
eventStreams := make([]zalandov1alpha1.EventStream, 0) eventStreams := make([]zalandov1alpha1.EventStream, 0)
for _, stream := range c.Spec.Streams { for _, stream := range c.Spec.Streams {
for table, eventType := range stream.Tables { for tableName, table := range stream.Tables {
streamSource := c.getEventStreamSource(stream, table, eventType) streamSource := c.getEventStreamSource(stream, tableName, table.IdColumn)
streamFlow := getEventStreamFlow(stream) streamFlow := getEventStreamFlow(stream, table.PayloadColumn)
streamSink := getEventStreamSink(stream, eventType) streamSink := getEventStreamSink(stream, table.EventType)
eventStreams = append(eventStreams, zalandov1alpha1.EventStream{ eventStreams = append(eventStreams, zalandov1alpha1.EventStream{
EventStreamFlow: streamFlow, EventStreamFlow: streamFlow,
@ -145,21 +142,22 @@ func (c *Cluster) generateFabricEventStream() *zalandov1alpha1.FabricEventStream
} }
} }
func (c *Cluster) getEventStreamSource(stream acidv1.Stream, table, eventType string) zalandov1alpha1.EventStreamSource { func (c *Cluster) getEventStreamSource(stream acidv1.Stream, tableName, idColumn string) zalandov1alpha1.EventStreamSource {
_, schema := getTableSchema(table) _, schema := getTableSchema(tableName)
streamFilter := stream.Filter[table] streamFilter := stream.Filter[tableName]
return zalandov1alpha1.EventStreamSource{ return zalandov1alpha1.EventStreamSource{
Type: constants.EventStreamSourcePGType, Type: constants.EventStreamSourcePGType,
Schema: schema, Schema: schema,
EventStreamTable: getOutboxTable(table, eventType), EventStreamTable: getOutboxTable(tableName, idColumn),
Filter: streamFilter, Filter: streamFilter,
Connection: c.getStreamConnection(stream.Database, constants.EventStreamSourceSlotPrefix+constants.UserRoleNameSuffix), Connection: c.getStreamConnection(stream.Database, constants.EventStreamSourceSlotPrefix+constants.UserRoleNameSuffix),
} }
} }
func getEventStreamFlow(stream acidv1.Stream) zalandov1alpha1.EventStreamFlow { func getEventStreamFlow(stream acidv1.Stream, payloadColumn string) zalandov1alpha1.EventStreamFlow {
return zalandov1alpha1.EventStreamFlow{ return zalandov1alpha1.EventStreamFlow{
Type: constants.EventStreamFlowPgGenericType, Type: constants.EventStreamFlowPgGenericType,
PayloadColumn: payloadColumn,
} }
} }
@ -182,9 +180,10 @@ func getTableSchema(fullTableName string) (tableName, schemaName string) {
return tableName, schemaName return tableName, schemaName
} }
func getOutboxTable(tableName, eventType string) zalandov1alpha1.EventStreamTable { func getOutboxTable(tableName, idColumn string) zalandov1alpha1.EventStreamTable {
return zalandov1alpha1.EventStreamTable{ return zalandov1alpha1.EventStreamTable{
Name: outboxTableNameTemplate.Format("table", tableName, "eventtype", eventType), Name: tableName,
IDColumn: idColumn,
} }
} }

View File

@ -48,8 +48,12 @@ var (
Streams: []acidv1.Stream{ Streams: []acidv1.Stream{
{ {
Database: "foo", Database: "foo",
Tables: map[string]string{ Tables: map[string]acidv1.StreamTable{
"bar": "stream_type_a", "bar": acidv1.StreamTable{
EventType: "stream_type_a",
IdColumn: "b_id",
PayloadColumn: "b_payload",
},
}, },
BatchSize: uint32(100), BatchSize: uint32(100),
}, },
@ -124,8 +128,12 @@ func TestUpdateFabricEventStream(t *testing.T) {
pgSpec.Streams = []acidv1.Stream{ pgSpec.Streams = []acidv1.Stream{
{ {
Database: "foo", Database: "foo",
Tables: map[string]string{ Tables: map[string]acidv1.StreamTable{
"bar": "stream_type_b", "bar": acidv1.StreamTable{
EventType: "stream_type_b",
IdColumn: "b_id",
PayloadColumn: "b_payload",
},
}, },
BatchSize: uint32(250), BatchSize: uint32(250),
}, },