From d2addd3b17aa5a3b0b3be65c0190314bf24b9850 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Fri, 27 Aug 2021 13:00:38 +0200 Subject: [PATCH] re-add 3rd type, remove callHome flow --- .../postgres-operator/crds/postgresqls.yaml | 3 ++ manifests/complete-postgres-manifest.yaml | 7 ++++ manifests/postgresql.crd.yaml | 3 ++ pkg/apis/acid.zalan.do/v1/crds.go | 6 +++ pkg/apis/acid.zalan.do/v1/postgresql_type.go | 1 + .../zalando.org/v1alpha1/fabriceventstream.go | 2 + pkg/cluster/streams.go | 38 ++++++++++++------- pkg/cluster/streams_test.go | 8 ++++ pkg/util/constants/streams.go | 25 ++++++------ 9 files changed, 68 insertions(+), 25 deletions(-) diff --git a/charts/postgres-operator/crds/postgresqls.yaml b/charts/postgres-operator/crds/postgresqls.yaml index 743f75470..40474dd56 100644 --- a/charts/postgres-operator/crds/postgresqls.yaml +++ b/charts/postgres-operator/crds/postgresqls.yaml @@ -490,6 +490,8 @@ spec: type: string sqsArn: type: string + sqsFifo: + type: boolean tables: type: object additionalProperties: @@ -499,6 +501,7 @@ spec: enum: - "nakadi" - "sqs" + - "wal" teamId: type: string tls: diff --git a/manifests/complete-postgres-manifest.yaml b/manifests/complete-postgres-manifest.yaml index 30212618b..74c68ae36 100644 --- a/manifests/complete-postgres-manifest.yaml +++ b/manifests/complete-postgres-manifest.yaml @@ -198,10 +198,17 @@ spec: # tables: # ta: event_type_a # tb: event_type_b +# - type: wal +# batchSize: 100 +# database: foo +# tables: +# public.tx: event_type_a +# public.ty: event_type_b # - type: sqs # database: foo # tables: # ta: "" # tb: "" # sqsArn: arn:aws:sqs:eu-central-1:111122223333 +# sqsFifo: true # queueName: foo-queue diff --git a/manifests/postgresql.crd.yaml b/manifests/postgresql.crd.yaml index b6fc64c80..4086ab1d7 100644 --- a/manifests/postgresql.crd.yaml +++ b/manifests/postgresql.crd.yaml @@ -486,6 +486,8 @@ spec: type: string sqsArn: type: string + sqsFifo: + type: boolean tables: type: object additionalProperties: @@ -495,6 +497,7 @@ spec: enum: - "nakadi" - "sqs" + - "wal" teamId: type: string tls: diff --git a/pkg/apis/acid.zalan.do/v1/crds.go b/pkg/apis/acid.zalan.do/v1/crds.go index 617a219f9..787d3a236 100644 --- a/pkg/apis/acid.zalan.do/v1/crds.go +++ b/pkg/apis/acid.zalan.do/v1/crds.go @@ -684,6 +684,9 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ "sqsArn": { Type: "string", }, + "sqsFifo": { + Type: "boolean", + }, "tables": { Type: "object", AdditionalProperties: &apiextv1.JSONSchemaPropsOrBool{ @@ -701,6 +704,9 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ { Raw: []byte(`"sqs"`), }, + { + Raw: []byte(`"wal"`), + }, }, }, }, diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index a4a8f4477..04d36276b 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -233,5 +233,6 @@ type Stream struct { Filter map[string]string `json:"filter,omitempty"` BatchSize uint32 `json:"batchSize,omitempty"` SqsArn string `json:"sqsArn,omitempty"` + SqsFifo bool `json:"sqsFifo,omitempty"` QueueName string `json:"queueName,omitempty"` } diff --git a/pkg/apis/zalando.org/v1alpha1/fabriceventstream.go b/pkg/apis/zalando.org/v1alpha1/fabriceventstream.go index fbfad76f1..36f68ca2f 100644 --- a/pkg/apis/zalando.org/v1alpha1/fabriceventstream.go +++ b/pkg/apis/zalando.org/v1alpha1/fabriceventstream.go @@ -45,6 +45,7 @@ type EventStreamFlow struct { DataOpColumn string `json:"dataOpColumn,omitempty"` MetadataColumn string `json:"metadataColumn,omitempty"` DataColumn string `json:"dataColumn,omitempty"` + PayloadColumn string `json:"payloadColumn,omitempty"` CallHomeIdColumn string `json:"callHomeIdColumn,omitempty"` CallHomeUrl string `json:"callHomeUrl,omitempty"` } @@ -55,6 +56,7 @@ type EventStreamSink struct { EventType string `json:"eventType,omitempty"` MaxBatchSize uint32 `json:"maxBatchSize,omitempty"` QueueName string `json:"queueName,omitempty"` + QueueUrl string `json:"queueUrl,omitempty"` } // EventStreamSource defines the source of the event stream and connection for FES operator diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 72e03fb20..4134e1723 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -142,10 +142,11 @@ func (c *Cluster) getEventStreamSource(stream acidv1.Stream, table, eventType st Filter: streamFilter, Connection: c.getStreamConnection(stream.Database, constants.EventStreamSourceSlotPrefix+constants.UserRoleNameSuffix), } - case "sqs": + case "default": return zalandov1alpha1.EventStreamSource{ Type: constants.EventStreamSourcePGType, - EventStreamTable: getSqsTable(table), + Schema: schema, + EventStreamTable: getSourceTable(table), Connection: c.getStreamConnection(stream.Database, constants.EventStreamSourceSlotPrefix+constants.UserRoleNameSuffix), } } @@ -161,12 +162,13 @@ func getEventStreamFlow(stream acidv1.Stream) zalandov1alpha1.EventStreamFlow { DataTypeColumn: constants.EventStreamFlowDataTypeColumn, DataOpColumn: constants.EventStreamFlowDataOpColumn, MetadataColumn: constants.EventStreamFlowMetadataColumn, - DataColumn: constants.EventStreamFlowDataColumn} - case "sqs": + DataColumn: constants.EventStreamFlowDataColumn, + } + case "default": return zalandov1alpha1.EventStreamFlow{ - Type: constants.EventStreamFlowPgApiType, - CallHomeIdColumn: "id", - CallHomeUrl: stream.SqsArn} + Type: constants.EventStreamFlowPgGenericType, + PayloadColumn: constants.EventStreamFlowPayloadColumn, + } } return zalandov1alpha1.EventStreamFlow{} @@ -174,15 +176,23 @@ func getEventStreamFlow(stream acidv1.Stream) zalandov1alpha1.EventStreamFlow { func getEventStreamSink(stream acidv1.Stream, eventType string) zalandov1alpha1.EventStreamSink { switch stream.StreamType { - case "nakadi": + case "sqs": + sqsSinkType := constants.EventStreamSinkSqsStandardType + if stream.SqsFifo { + sqsSinkType = constants.EventStreamSinkSqsFifoType + } + return zalandov1alpha1.EventStreamSink{ + Type: sqsSinkType, + QueueName: stream.QueueName, + QueueUrl: stream.SqsArn, + MaxBatchSize: stream.BatchSize, + } + case "default": return zalandov1alpha1.EventStreamSink{ Type: constants.EventStreamSinkNakadiType, EventType: eventType, - MaxBatchSize: stream.BatchSize} - case "sqs": - return zalandov1alpha1.EventStreamSink{ - Type: constants.EventStreamSinkSqsType, - QueueName: stream.QueueName} + MaxBatchSize: stream.BatchSize, + } } return zalandov1alpha1.EventStreamSink{} @@ -206,7 +216,7 @@ func getOutboxTable(tableName, eventType string) zalandov1alpha1.EventStreamTabl } } -func getSqsTable(tableName string) zalandov1alpha1.EventStreamTable { +func getSourceTable(tableName string) zalandov1alpha1.EventStreamTable { return zalandov1alpha1.EventStreamTable{ Name: outboxTableNameTemplate.Format("table", tableName), } diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 71d263025..78c1a4ec7 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -54,6 +54,14 @@ var ( }, BatchSize: uint32(100), }, + { + StreamType: "wal", + Database: "foo", + Tables: map[string]string{ + "bar": "stream_type_a", + }, + BatchSize: uint32(100), + }, { StreamType: "sqs", Database: "foo", diff --git a/pkg/util/constants/streams.go b/pkg/util/constants/streams.go index a7ea2b0b8..3eb28b5f3 100644 --- a/pkg/util/constants/streams.go +++ b/pkg/util/constants/streams.go @@ -2,15 +2,18 @@ package constants // PostgreSQL specific constants const ( - EventStreamSourcePGType = "PostgresLogicalReplication" - EventStreamSourceSlotPrefix = "fes_" - EventStreamSourceAuthType = "DatabaseAuthenticationSecret" - EventStreamFlowPgNakadiType = "PostgresWalToNakadiDataEvent" - EventStreamFlowPgApiType = "PostgresWalToApiCallHomeEvent" - EventStreamFlowDataTypeColumn = "data_type" - EventStreamFlowDataOpColumn = "data_op" - EventStreamFlowMetadataColumn = "metadata" - EventStreamFlowDataColumn = "data" - EventStreamSinkNakadiType = "Nakadi" - EventStreamSinkSqsType = "Sqs" + EventStreamSourcePGType = "PostgresLogicalReplication" + EventStreamSourceSlotPrefix = "fes_" + EventStreamSourceAuthType = "DatabaseAuthenticationSecret" + EventStreamFlowPgNakadiType = "PostgresWalToNakadiDataEvent" + EventStreamFlowPgGenericType = "PostgresWalToGenericNakadiEvent" + EventStreamFlowPgApiType = "PostgresWalToApiCallHomeEvent" + EventStreamFlowDataTypeColumn = "data_type" + EventStreamFlowDataOpColumn = "data_op" + EventStreamFlowMetadataColumn = "metadata" + EventStreamFlowDataColumn = "data" + EventStreamFlowPayloadColumn = "payload" + EventStreamSinkNakadiType = "Nakadi" + EventStreamSinkSqsStandardType = "SqsStandard" + EventStreamSinkSqsFifoType = "SqsFifo" )