diff --git a/charts/postgres-operator/crds/postgresqls.yaml b/charts/postgres-operator/crds/postgresqls.yaml index cf4cd4251..763b4e6c5 100644 --- a/charts/postgres-operator/crds/postgresqls.yaml +++ b/charts/postgres-operator/crds/postgresqls.yaml @@ -477,6 +477,9 @@ spec: nullable: true items: type: object + required: + - database + - tables properties: batchSize: type: integer @@ -490,11 +493,6 @@ spec: type: object additionalProperties: type: string - streamType: - type: string - enum: - - "nakadi" - - "wal" teamId: type: string tls: diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md index f879e7938..ee8555bea 100644 --- a/docs/reference/cluster_manifest.md +++ b/docs/reference/cluster_manifest.md @@ -522,27 +522,24 @@ under the `streams` top-level key will be used by the operator to create a CRD for Zalando's internal CDC operator named like the Postgres cluster. Each stream object can have the following properties: -* **streamType** - Defines the stream flow. Choose `nakadi` when you want to specify certain - nakadi event types of or `wal` if changes should be mapped to a generic - event type. Default is `wal`. - * **database** Name of the database from where events will be published via Postgres' logical decoding feature. The operator will take care of updating the database configuration (setting `wal_level: logical`, creating logical replication slots, using output plugin `wal2json` and creating a dedicated - replication user). + replication user). Required. * **tables** Defines a map of table names and event types. The CDC operator is following the [outbox pattern](https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/) meaning changes are only consumed from an extra table that already has the structure of the event in the target sink. The operator will assume that this - outbox table is called like `__outbox`. + outbox table is called like `
__outbox`. Required. * **filter** Streamed events can be filtered by a jsonpath expression for each table. + Optional. * **batchSize** - Defines the size of batches in which events are consumed. + Defines the size of batches in which events are consumed. Optional. + Defaults to 1. diff --git a/manifests/complete-postgres-manifest.yaml b/manifests/complete-postgres-manifest.yaml index 68977bbcd..a28e5999e 100644 --- a/manifests/complete-postgres-manifest.yaml +++ b/manifests/complete-postgres-manifest.yaml @@ -198,15 +198,11 @@ spec: # Enables change data capture streams for defined database tables # streams: -# - streamType: nakadi -# batchSize: 100 -# database: foo +# - database: foo # tables: -# ta: event_type_a -# tb: event_type_b -# - streamType: wal -# batchSize: 100 -# database: foo -# tables: -# public.tx: event_type_a -# public.ty: event_type_b +# data.ta: event_type_a +# data.tb: event_type_b +# # Optional. Filter ignores events before a certain txnId and lsn. Can be used to skip bad events +# filter: +# data.ta: "[?(@.source.txId > 500 && @.source.lsn > 123456)]" +# batchSize: 1000 diff --git a/manifests/postgresql.crd.yaml b/manifests/postgresql.crd.yaml index e2aa227a0..6e6b091e7 100644 --- a/manifests/postgresql.crd.yaml +++ b/manifests/postgresql.crd.yaml @@ -473,6 +473,9 @@ spec: nullable: true items: type: object + required: + - database + - tables properties: batchSize: type: integer @@ -486,11 +489,6 @@ spec: type: object additionalProperties: type: string - streamType: - type: string - enum: - - "nakadi" - - "wal" teamId: type: string tls: diff --git a/pkg/apis/acid.zalan.do/v1/crds.go b/pkg/apis/acid.zalan.do/v1/crds.go index c3ee7d81d..214c8b3b4 100644 --- a/pkg/apis/acid.zalan.do/v1/crds.go +++ b/pkg/apis/acid.zalan.do/v1/crds.go @@ -664,7 +664,8 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ Type: "array", Items: &apiextv1.JSONSchemaPropsOrArray{ Schema: &apiextv1.JSONSchemaProps{ - Type: "object", + Type: "object", + Required: []string{"database", "tables"}, Properties: map[string]apiextv1.JSONSchemaProps{ "batchSize": { Type: "integer", @@ -688,17 +689,6 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ }, }, }, - "streamType": { - Type: "string", - Enum: []apiextv1.JSON{ - { - Raw: []byte(`"nakadi"`), - }, - { - Raw: []byte(`"wal"`), - }, - }, - }, }, }, }, diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index af45d7c04..0499763db 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -229,9 +229,8 @@ type ConnectionPooler struct { } type Stream struct { - StreamType string `json:"streamType,omitempty"` - Database string `json:"database,omitempty"` - Tables map[string]string `json:"tables,omitempty"` - Filter map[string]string `json:"filter,omitempty"` - BatchSize uint32 `json:"batchSize,omitempty"` + Database string `json:"database"` + Tables map[string]string `json:"tables"` + Filter map[string]string `json:"filter,omitempty"` + BatchSize uint32 `json:"batchSize,omitempty"` } diff --git a/pkg/apis/zalando.org/v1alpha1/fabriceventstream.go b/pkg/apis/zalando.org/v1alpha1/fabriceventstream.go index 111179d07..2ef0b6405 100644 --- a/pkg/apis/zalando.org/v1alpha1/fabriceventstream.go +++ b/pkg/apis/zalando.org/v1alpha1/fabriceventstream.go @@ -40,12 +40,8 @@ type EventStream struct { // EventStreamFlow defines the flow characteristics of the event stream type EventStreamFlow struct { - Type string `json:"type"` - DataTypeColumn string `json:"dataTypeColumn,omitempty"` - DataOpColumn string `json:"dataOpColumn,omitempty"` - MetadataColumn string `json:"metadataColumn,omitempty"` - DataColumn string `json:"dataColumn,omitempty"` - PayloadColumn string `json:"payloadColumn,omitempty"` + Type string `json:"type"` + PayloadColumn string `json:"payloadColumn,omitempty" defaults:"payload"` } // EventStreamSink defines the target of the event stream @@ -67,7 +63,7 @@ type EventStreamSource struct { // EventStreamTable defines the name and ID column to be used for streaming type EventStreamTable struct { Name string `json:"name"` - IDColumn string `json:"idColumn,omitempty"` + IDColumn string `json:"idColumn,omitempty" defaults:"id"` } // Connection to be used for allowing the FES operator to connect to a database diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index e9e3ce66c..b6e1ef29a 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -150,23 +150,9 @@ func (c *Cluster) getEventStreamSource(stream acidv1.Stream, table, eventType st } func getEventStreamFlow(stream acidv1.Stream) zalandov1alpha1.EventStreamFlow { - switch stream.StreamType { - case "nakadi": - return zalandov1alpha1.EventStreamFlow{ - Type: constants.EventStreamFlowPgNakadiType, - DataTypeColumn: constants.EventStreamFlowDataTypeColumn, - DataOpColumn: constants.EventStreamFlowDataOpColumn, - MetadataColumn: constants.EventStreamFlowMetadataColumn, - DataColumn: constants.EventStreamFlowDataColumn, - } - case "wal": - return zalandov1alpha1.EventStreamFlow{ - Type: constants.EventStreamFlowPgGenericType, - PayloadColumn: constants.EventStreamFlowPayloadColumn, - } + return zalandov1alpha1.EventStreamFlow{ + Type: constants.EventStreamFlowPgGenericType, } - - return zalandov1alpha1.EventStreamFlow{} } func getEventStreamSink(stream acidv1.Stream, eventType string) zalandov1alpha1.EventStreamSink { @@ -190,8 +176,7 @@ func getTableSchema(fullTableName string) (tableName, schemaName string) { func getOutboxTable(tableName, eventType string) zalandov1alpha1.EventStreamTable { return zalandov1alpha1.EventStreamTable{ - Name: outboxTableNameTemplate.Format("table", tableName, "eventtype", eventType), - IDColumn: "id", + Name: outboxTableNameTemplate.Format("table", tableName, "eventtype", eventType), } } @@ -236,15 +221,16 @@ func (c *Cluster) syncStreams() error { c.logger.Infof("event streams do not exist, create it") err := c.createStreams() if err != nil { - return fmt.Errorf("event stream creation failed: %v", err) + return fmt.Errorf("event streams creation failed: %v", err) } } else { desiredStreams := c.generateFabricEventStream() if !reflect.DeepEqual(effectiveStreams.Spec, desiredStreams.Spec) { + c.logger.Debug("updating event streams") desiredStreams.ObjectMeta.ResourceVersion = effectiveStreams.ObjectMeta.ResourceVersion err = c.updateStreams(desiredStreams) if err != nil { - return fmt.Errorf("event stream update failed: %v", err) + return fmt.Errorf("event streams update failed: %v", err) } } } diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 5976d97b3..617efcc31 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -47,16 +47,7 @@ var ( }, Streams: []acidv1.Stream{ { - StreamType: "nakadi", - Database: "foo", - Tables: map[string]string{ - "bar": "stream_type_a", - }, - BatchSize: uint32(100), - }, - { - StreamType: "wal", - Database: "foo", + Database: "foo", Tables: map[string]string{ "bar": "stream_type_a", }, @@ -132,8 +123,7 @@ func TestUpdateFabricEventStream(t *testing.T) { var pgSpec acidv1.PostgresSpec pgSpec.Streams = []acidv1.Stream{ { - StreamType: "nakadi", - Database: "foo", + Database: "foo", Tables: map[string]string{ "bar": "stream_type_b", }, diff --git a/pkg/util/constants/streams.go b/pkg/util/constants/streams.go index fb721d309..25f72d85a 100644 --- a/pkg/util/constants/streams.go +++ b/pkg/util/constants/streams.go @@ -2,15 +2,9 @@ package constants // PostgreSQL specific constants const ( - EventStreamSourcePGType = "PostgresLogicalReplication" - EventStreamSourceSlotPrefix = "fes" - EventStreamSourceAuthType = "DatabaseAuthenticationSecret" - EventStreamFlowPgNakadiType = "PostgresWalToNakadiDataEvent" - EventStreamFlowPgGenericType = "PostgresWalToGenericNakadiEvent" - EventStreamFlowDataTypeColumn = "data_type" - EventStreamFlowDataOpColumn = "data_op" - EventStreamFlowMetadataColumn = "metadata" - EventStreamFlowDataColumn = "data" - EventStreamFlowPayloadColumn = "payload" - EventStreamSinkNakadiType = "Nakadi" + EventStreamSourcePGType = "PostgresLogicalReplication" + EventStreamSourceSlotPrefix = "fes" + EventStreamSourceAuthType = "DatabaseAuthenticationSecret" + EventStreamFlowPgGenericType = "PostgresWalToGenericNakadiEvent" + EventStreamSinkNakadiType = "Nakadi" )