diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index 67007b522..3504d615d 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -247,16 +247,18 @@ type ConnectionPooler struct { // Stream defines properties for creating FabricEventStream resources type Stream struct { - ApplicationId string `json:"applicationId"` - Database string `json:"database"` - Tables map[string]StreamTable `json:"tables"` - Filter map[string]*string `json:"filter,omitempty"` - BatchSize *uint32 `json:"batchSize,omitempty"` + ApplicationId string `json:"applicationId"` + Database string `json:"database"` + Tables map[string]StreamTable `json:"tables"` + Filter map[string]*string `json:"filter,omitempty"` + BatchSize *uint32 `json:"batchSize,omitempty"` + EnableRecovery *bool `json:"enableRecovery,omitempty"` } // StreamTable defines properties of outbox tables for FabricEventStreams type StreamTable struct { - EventType string `json:"eventType"` - IdColumn *string `json:"idColumn,omitempty"` - PayloadColumn *string `json:"payloadColumn,omitempty"` + EventType string `json:"eventType"` + RecoveryEventType string `json:"recoveryEventType"` + IdColumn *string `json:"idColumn,omitempty"` + PayloadColumn *string `json:"payloadColumn,omitempty"` } diff --git a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go index 52e67989a..536feec73 100644 --- a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go +++ b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go @@ -1281,6 +1281,11 @@ func (in *Stream) DeepCopyInto(out *Stream) { *out = new(uint32) **out = **in } + if in.EnableRecovery != nil { + in, out := &in.EnableRecovery, &out.EnableRecovery + *out = new(bool) + **out = **in + } return } diff --git a/pkg/apis/zalando.org/v1/fabriceventstream.go b/pkg/apis/zalando.org/v1/fabriceventstream.go index 559c52d3e..609f3c9bc 100644 --- a/pkg/apis/zalando.org/v1/fabriceventstream.go +++ b/pkg/apis/zalando.org/v1/fabriceventstream.go @@ -33,9 +33,10 @@ type FabricEventStreamList struct { // EventStream defines the source, flow and sink of the event stream type EventStream struct { - EventStreamFlow EventStreamFlow `json:"flow"` - EventStreamSink EventStreamSink `json:"sink"` - EventStreamSource EventStreamSource `json:"source"` + EventStreamFlow EventStreamFlow `json:"flow"` + EventStreamSink EventStreamSink `json:"sink"` + EventStreamSource EventStreamSource `json:"source"` + EventStreamRecovery EventStreamRecovery `json:"recovery"` } // EventStreamFlow defines the flow characteristics of the event stream @@ -51,6 +52,12 @@ type EventStreamSink struct { MaxBatchSize *uint32 `json:"maxBatchSize,omitempty"` } +// EventStreamRecovery defines the target of dead letter queue +type EventStreamRecovery struct { + Type string `json:"type"` + Sink *EventStreamSink `json:"sink"` +} + // EventStreamSource defines the source of the event stream and connection for FES operator type EventStreamSource struct { Type string `json:"type"` diff --git a/pkg/apis/zalando.org/v1/zz_generated.deepcopy.go b/pkg/apis/zalando.org/v1/zz_generated.deepcopy.go index a44439a94..8a46b9a25 100644 --- a/pkg/apis/zalando.org/v1/zz_generated.deepcopy.go +++ b/pkg/apis/zalando.org/v1/zz_generated.deepcopy.go @@ -33,6 +33,12 @@ import ( // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Connection) DeepCopyInto(out *Connection) { *out = *in + if in.PublicationName != nil { + in, out := &in.PublicationName, &out.PublicationName + *out = new(string) + **out = **in + } + in.DBAuth.DeepCopyInto(&out.DBAuth) return } @@ -65,6 +71,10 @@ func (in *DBAuth) DeepCopy() *DBAuth { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *EventStream) DeepCopyInto(out *EventStream) { *out = *in + in.EventStreamFlow.DeepCopyInto(&out.EventStreamFlow) + in.EventStreamRecovery.DeepCopyInto(&out.EventStreamRecovery) + in.EventStreamSink.DeepCopyInto(&out.EventStreamSink) + in.EventStreamSource.DeepCopyInto(&out.EventStreamSource) return } @@ -81,6 +91,11 @@ func (in *EventStream) DeepCopy() *EventStream { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *EventStreamFlow) DeepCopyInto(out *EventStreamFlow) { *out = *in + if in.PayloadColumn != nil { + in, out := &in.PayloadColumn, &out.PayloadColumn + *out = new(string) + **out = **in + } return } @@ -94,9 +109,35 @@ func (in *EventStreamFlow) DeepCopy() *EventStreamFlow { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventStreamRecovery) DeepCopyInto(out *EventStreamRecovery) { + *out = *in + if in.Sink != nil { + in, out := &in.Sink, &out.Sink + *out = new(EventStreamSink) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventStreamRecovery) DeepCopy() *EventStreamRecovery { + if in == nil { + return nil + } + out := new(EventStreamRecovery) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *EventStreamSink) DeepCopyInto(out *EventStreamSink) { *out = *in + if in.MaxBatchSize != nil { + in, out := &in.MaxBatchSize, &out.MaxBatchSize + *out = new(uint32) + **out = **in + } return } @@ -113,6 +154,13 @@ func (in *EventStreamSink) DeepCopy() *EventStreamSink { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *EventStreamSource) DeepCopyInto(out *EventStreamSource) { *out = *in + in.Connection.DeepCopyInto(&out.Connection) + if in.Filter != nil { + in, out := &in.Filter, &out.Filter + *out = new(string) + **out = **in + } + in.EventStreamTable.DeepCopyInto(&out.EventStreamTable) return } @@ -129,6 +177,11 @@ func (in *EventStreamSource) DeepCopy() *EventStreamSource { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *EventStreamTable) DeepCopyInto(out *EventStreamTable) { *out = *in + if in.IDColumn != nil { + in, out := &in.IDColumn, &out.IDColumn + *out = new(string) + **out = **in + } return } diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 683740af3..10df7974c 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -145,11 +145,13 @@ func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEvent streamSource := c.getEventStreamSource(stream, tableName, table.IdColumn) streamFlow := getEventStreamFlow(stream, table.PayloadColumn) streamSink := getEventStreamSink(stream, table.EventType) + streamRecovery := getEventStreamRecovery(stream, table.RecoveryEventType, table.EventType) eventStreams = append(eventStreams, zalandov1.EventStream{ - EventStreamFlow: streamFlow, - EventStreamSink: streamSink, - EventStreamSource: streamSource}) + EventStreamFlow: streamFlow, + EventStreamRecovery: streamRecovery, + EventStreamSink: streamSink, + EventStreamSource: streamSource}) } } @@ -204,6 +206,28 @@ func getEventStreamSink(stream acidv1.Stream, eventType string) zalandov1.EventS } } +func getEventStreamRecovery(stream acidv1.Stream, recoveryEventType, eventType string) zalandov1.EventStreamRecovery { + if (stream.EnableRecovery != nil && !*stream.EnableRecovery) || + (stream.EnableRecovery == nil && recoveryEventType == "") { + return zalandov1.EventStreamRecovery{ + Type: constants.EventStreamRecoveryNoneType, + } + } + + if stream.EnableRecovery != nil && *stream.EnableRecovery && recoveryEventType == "" { + recoveryEventType = fmt.Sprintf("%s-%s", eventType, constants.EventStreamRecoverySuffix) + } + + return zalandov1.EventStreamRecovery{ + Type: constants.EventStreamRecoveryDLQType, + Sink: &zalandov1.EventStreamSink{ + Type: constants.EventStreamSinkNakadiType, + EventType: recoveryEventType, + MaxBatchSize: stream.BatchSize, + }, + } +} + func getTableSchema(fullTableName string) (tableName, schemaName string) { schemaName = "public" tableName = fullTableName @@ -381,7 +405,8 @@ func sameStreams(curEventStreams, newEventStreams []zalandov1.EventStream) (matc for _, curStream := range curEventStreams { if reflect.DeepEqual(newStream.EventStreamSource, curStream.EventStreamSource) && reflect.DeepEqual(newStream.EventStreamFlow, curStream.EventStreamFlow) && - reflect.DeepEqual(newStream.EventStreamSink, curStream.EventStreamSink) { + reflect.DeepEqual(newStream.EventStreamSink, curStream.EventStreamSink) && + reflect.DeepEqual(newStream.EventStreamRecovery, curStream.EventStreamRecovery) { match = true break } diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 00f18b7a2..7030c914e 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -65,9 +65,11 @@ var ( PayloadColumn: k8sutil.StringToPointer("b_payload"), }, "data.foobar": acidv1.StreamTable{ - EventType: "stream-type-b", + EventType: "stream-type-b", + RecoveryEventType: "stream-type-b-dlq", }, }, + EnableRecovery: util.True(), Filter: map[string]*string{ "data.bar": k8sutil.StringToPointer("[?(@.source.txId > 500 && @.source.lsn > 123456)]"), }, @@ -106,6 +108,14 @@ var ( PayloadColumn: k8sutil.StringToPointer("b_payload"), Type: constants.EventStreamFlowPgGenericType, }, + EventStreamRecovery: zalandov1.EventStreamRecovery{ + Type: constants.EventStreamRecoveryDLQType, + Sink: &zalandov1.EventStreamSink{ + EventType: fmt.Sprintf("%s-%s", "stream-type-a", constants.EventStreamRecoverySuffix), + MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)), + Type: constants.EventStreamSinkNakadiType, + }, + }, EventStreamSink: zalandov1.EventStreamSink{ EventType: "stream-type-a", MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)), @@ -136,6 +146,14 @@ var ( EventStreamFlow: zalandov1.EventStreamFlow{ Type: constants.EventStreamFlowPgGenericType, }, + EventStreamRecovery: zalandov1.EventStreamRecovery{ + Type: constants.EventStreamRecoveryDLQType, + Sink: &zalandov1.EventStreamSink{ + EventType: "stream-type-b-dlq", + MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)), + Type: constants.EventStreamSinkNakadiType, + }, + }, EventStreamSink: zalandov1.EventStreamSink{ EventType: "stream-type-b", MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)), @@ -251,7 +269,8 @@ func TestSameStreams(t *testing.T) { testName := "TestSameStreams" stream1 := zalandov1.EventStream{ - EventStreamFlow: zalandov1.EventStreamFlow{}, + EventStreamFlow: zalandov1.EventStreamFlow{}, + EventStreamRecovery: zalandov1.EventStreamRecovery{}, EventStreamSink: zalandov1.EventStreamSink{ EventType: "stream-type-a", }, @@ -263,7 +282,8 @@ func TestSameStreams(t *testing.T) { } stream2 := zalandov1.EventStream{ - EventStreamFlow: zalandov1.EventStreamFlow{}, + EventStreamFlow: zalandov1.EventStreamFlow{}, + EventStreamRecovery: zalandov1.EventStreamRecovery{}, EventStreamSink: zalandov1.EventStreamSink{ EventType: "stream-type-b", }, diff --git a/pkg/util/constants/streams.go b/pkg/util/constants/streams.go index bd70b719b..8916701f3 100644 --- a/pkg/util/constants/streams.go +++ b/pkg/util/constants/streams.go @@ -11,4 +11,7 @@ const ( EventStreamSourceAuthType = "DatabaseAuthenticationSecret" EventStreamFlowPgGenericType = "PostgresWalToGenericNakadiEvent" EventStreamSinkNakadiType = "Nakadi" + EventStreamRecoveryNoneType = "None" + EventStreamRecoveryDLQType = "DeadLetter" + EventStreamRecoverySuffix = "dead-letter-queue" )