add support for recovery section in event streams (#2421)

This commit is contained in:
Felix Kunde 2023-09-19 17:15:50 +02:00 committed by GitHub
parent 102a22e486
commit e03fdaaa51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 133 additions and 18 deletions

View File

@ -247,16 +247,18 @@ type ConnectionPooler struct {
// Stream defines properties for creating FabricEventStream resources // Stream defines properties for creating FabricEventStream resources
type Stream struct { type Stream struct {
ApplicationId string `json:"applicationId"` ApplicationId string `json:"applicationId"`
Database string `json:"database"` Database string `json:"database"`
Tables map[string]StreamTable `json:"tables"` Tables map[string]StreamTable `json:"tables"`
Filter map[string]*string `json:"filter,omitempty"` Filter map[string]*string `json:"filter,omitempty"`
BatchSize *uint32 `json:"batchSize,omitempty"` BatchSize *uint32 `json:"batchSize,omitempty"`
EnableRecovery *bool `json:"enableRecovery,omitempty"`
} }
// StreamTable defines properties of outbox tables for FabricEventStreams // StreamTable defines properties of outbox tables for FabricEventStreams
type StreamTable struct { type StreamTable struct {
EventType string `json:"eventType"` EventType string `json:"eventType"`
IdColumn *string `json:"idColumn,omitempty"` RecoveryEventType string `json:"recoveryEventType"`
PayloadColumn *string `json:"payloadColumn,omitempty"` IdColumn *string `json:"idColumn,omitempty"`
PayloadColumn *string `json:"payloadColumn,omitempty"`
} }

View File

@ -1281,6 +1281,11 @@ func (in *Stream) DeepCopyInto(out *Stream) {
*out = new(uint32) *out = new(uint32)
**out = **in **out = **in
} }
if in.EnableRecovery != nil {
in, out := &in.EnableRecovery, &out.EnableRecovery
*out = new(bool)
**out = **in
}
return return
} }

View File

@ -33,9 +33,10 @@ type FabricEventStreamList struct {
// EventStream defines the source, flow and sink of the event stream // EventStream defines the source, flow and sink of the event stream
type EventStream struct { type EventStream struct {
EventStreamFlow EventStreamFlow `json:"flow"` EventStreamFlow EventStreamFlow `json:"flow"`
EventStreamSink EventStreamSink `json:"sink"` EventStreamSink EventStreamSink `json:"sink"`
EventStreamSource EventStreamSource `json:"source"` EventStreamSource EventStreamSource `json:"source"`
EventStreamRecovery EventStreamRecovery `json:"recovery"`
} }
// EventStreamFlow defines the flow characteristics of the event stream // EventStreamFlow defines the flow characteristics of the event stream
@ -51,6 +52,12 @@ type EventStreamSink struct {
MaxBatchSize *uint32 `json:"maxBatchSize,omitempty"` MaxBatchSize *uint32 `json:"maxBatchSize,omitempty"`
} }
// EventStreamRecovery defines the target of dead letter queue
type EventStreamRecovery struct {
Type string `json:"type"`
Sink *EventStreamSink `json:"sink"`
}
// EventStreamSource defines the source of the event stream and connection for FES operator // EventStreamSource defines the source of the event stream and connection for FES operator
type EventStreamSource struct { type EventStreamSource struct {
Type string `json:"type"` Type string `json:"type"`

View File

@ -33,6 +33,12 @@ import (
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Connection) DeepCopyInto(out *Connection) { func (in *Connection) DeepCopyInto(out *Connection) {
*out = *in *out = *in
if in.PublicationName != nil {
in, out := &in.PublicationName, &out.PublicationName
*out = new(string)
**out = **in
}
in.DBAuth.DeepCopyInto(&out.DBAuth)
return return
} }
@ -65,6 +71,10 @@ func (in *DBAuth) DeepCopy() *DBAuth {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *EventStream) DeepCopyInto(out *EventStream) { func (in *EventStream) DeepCopyInto(out *EventStream) {
*out = *in *out = *in
in.EventStreamFlow.DeepCopyInto(&out.EventStreamFlow)
in.EventStreamRecovery.DeepCopyInto(&out.EventStreamRecovery)
in.EventStreamSink.DeepCopyInto(&out.EventStreamSink)
in.EventStreamSource.DeepCopyInto(&out.EventStreamSource)
return return
} }
@ -81,6 +91,11 @@ func (in *EventStream) DeepCopy() *EventStream {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *EventStreamFlow) DeepCopyInto(out *EventStreamFlow) { func (in *EventStreamFlow) DeepCopyInto(out *EventStreamFlow) {
*out = *in *out = *in
if in.PayloadColumn != nil {
in, out := &in.PayloadColumn, &out.PayloadColumn
*out = new(string)
**out = **in
}
return return
} }
@ -94,9 +109,35 @@ func (in *EventStreamFlow) DeepCopy() *EventStreamFlow {
return out return out
} }
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *EventStreamRecovery) DeepCopyInto(out *EventStreamRecovery) {
*out = *in
if in.Sink != nil {
in, out := &in.Sink, &out.Sink
*out = new(EventStreamSink)
(*in).DeepCopyInto(*out)
}
return
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *EventStreamRecovery) DeepCopy() *EventStreamRecovery {
if in == nil {
return nil
}
out := new(EventStreamRecovery)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *EventStreamSink) DeepCopyInto(out *EventStreamSink) { func (in *EventStreamSink) DeepCopyInto(out *EventStreamSink) {
*out = *in *out = *in
if in.MaxBatchSize != nil {
in, out := &in.MaxBatchSize, &out.MaxBatchSize
*out = new(uint32)
**out = **in
}
return return
} }
@ -113,6 +154,13 @@ func (in *EventStreamSink) DeepCopy() *EventStreamSink {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *EventStreamSource) DeepCopyInto(out *EventStreamSource) { func (in *EventStreamSource) DeepCopyInto(out *EventStreamSource) {
*out = *in *out = *in
in.Connection.DeepCopyInto(&out.Connection)
if in.Filter != nil {
in, out := &in.Filter, &out.Filter
*out = new(string)
**out = **in
}
in.EventStreamTable.DeepCopyInto(&out.EventStreamTable)
return return
} }
@ -129,6 +177,11 @@ func (in *EventStreamSource) DeepCopy() *EventStreamSource {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *EventStreamTable) DeepCopyInto(out *EventStreamTable) { func (in *EventStreamTable) DeepCopyInto(out *EventStreamTable) {
*out = *in *out = *in
if in.IDColumn != nil {
in, out := &in.IDColumn, &out.IDColumn
*out = new(string)
**out = **in
}
return return
} }

View File

@ -145,11 +145,13 @@ func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEvent
streamSource := c.getEventStreamSource(stream, tableName, table.IdColumn) streamSource := c.getEventStreamSource(stream, tableName, table.IdColumn)
streamFlow := getEventStreamFlow(stream, table.PayloadColumn) streamFlow := getEventStreamFlow(stream, table.PayloadColumn)
streamSink := getEventStreamSink(stream, table.EventType) streamSink := getEventStreamSink(stream, table.EventType)
streamRecovery := getEventStreamRecovery(stream, table.RecoveryEventType, table.EventType)
eventStreams = append(eventStreams, zalandov1.EventStream{ eventStreams = append(eventStreams, zalandov1.EventStream{
EventStreamFlow: streamFlow, EventStreamFlow: streamFlow,
EventStreamSink: streamSink, EventStreamRecovery: streamRecovery,
EventStreamSource: streamSource}) EventStreamSink: streamSink,
EventStreamSource: streamSource})
} }
} }
@ -204,6 +206,28 @@ func getEventStreamSink(stream acidv1.Stream, eventType string) zalandov1.EventS
} }
} }
func getEventStreamRecovery(stream acidv1.Stream, recoveryEventType, eventType string) zalandov1.EventStreamRecovery {
if (stream.EnableRecovery != nil && !*stream.EnableRecovery) ||
(stream.EnableRecovery == nil && recoveryEventType == "") {
return zalandov1.EventStreamRecovery{
Type: constants.EventStreamRecoveryNoneType,
}
}
if stream.EnableRecovery != nil && *stream.EnableRecovery && recoveryEventType == "" {
recoveryEventType = fmt.Sprintf("%s-%s", eventType, constants.EventStreamRecoverySuffix)
}
return zalandov1.EventStreamRecovery{
Type: constants.EventStreamRecoveryDLQType,
Sink: &zalandov1.EventStreamSink{
Type: constants.EventStreamSinkNakadiType,
EventType: recoveryEventType,
MaxBatchSize: stream.BatchSize,
},
}
}
func getTableSchema(fullTableName string) (tableName, schemaName string) { func getTableSchema(fullTableName string) (tableName, schemaName string) {
schemaName = "public" schemaName = "public"
tableName = fullTableName tableName = fullTableName
@ -381,7 +405,8 @@ func sameStreams(curEventStreams, newEventStreams []zalandov1.EventStream) (matc
for _, curStream := range curEventStreams { for _, curStream := range curEventStreams {
if reflect.DeepEqual(newStream.EventStreamSource, curStream.EventStreamSource) && if reflect.DeepEqual(newStream.EventStreamSource, curStream.EventStreamSource) &&
reflect.DeepEqual(newStream.EventStreamFlow, curStream.EventStreamFlow) && reflect.DeepEqual(newStream.EventStreamFlow, curStream.EventStreamFlow) &&
reflect.DeepEqual(newStream.EventStreamSink, curStream.EventStreamSink) { reflect.DeepEqual(newStream.EventStreamSink, curStream.EventStreamSink) &&
reflect.DeepEqual(newStream.EventStreamRecovery, curStream.EventStreamRecovery) {
match = true match = true
break break
} }

View File

@ -65,9 +65,11 @@ var (
PayloadColumn: k8sutil.StringToPointer("b_payload"), PayloadColumn: k8sutil.StringToPointer("b_payload"),
}, },
"data.foobar": acidv1.StreamTable{ "data.foobar": acidv1.StreamTable{
EventType: "stream-type-b", EventType: "stream-type-b",
RecoveryEventType: "stream-type-b-dlq",
}, },
}, },
EnableRecovery: util.True(),
Filter: map[string]*string{ Filter: map[string]*string{
"data.bar": k8sutil.StringToPointer("[?(@.source.txId > 500 && @.source.lsn > 123456)]"), "data.bar": k8sutil.StringToPointer("[?(@.source.txId > 500 && @.source.lsn > 123456)]"),
}, },
@ -106,6 +108,14 @@ var (
PayloadColumn: k8sutil.StringToPointer("b_payload"), PayloadColumn: k8sutil.StringToPointer("b_payload"),
Type: constants.EventStreamFlowPgGenericType, Type: constants.EventStreamFlowPgGenericType,
}, },
EventStreamRecovery: zalandov1.EventStreamRecovery{
Type: constants.EventStreamRecoveryDLQType,
Sink: &zalandov1.EventStreamSink{
EventType: fmt.Sprintf("%s-%s", "stream-type-a", constants.EventStreamRecoverySuffix),
MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)),
Type: constants.EventStreamSinkNakadiType,
},
},
EventStreamSink: zalandov1.EventStreamSink{ EventStreamSink: zalandov1.EventStreamSink{
EventType: "stream-type-a", EventType: "stream-type-a",
MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)), MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)),
@ -136,6 +146,14 @@ var (
EventStreamFlow: zalandov1.EventStreamFlow{ EventStreamFlow: zalandov1.EventStreamFlow{
Type: constants.EventStreamFlowPgGenericType, Type: constants.EventStreamFlowPgGenericType,
}, },
EventStreamRecovery: zalandov1.EventStreamRecovery{
Type: constants.EventStreamRecoveryDLQType,
Sink: &zalandov1.EventStreamSink{
EventType: "stream-type-b-dlq",
MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)),
Type: constants.EventStreamSinkNakadiType,
},
},
EventStreamSink: zalandov1.EventStreamSink{ EventStreamSink: zalandov1.EventStreamSink{
EventType: "stream-type-b", EventType: "stream-type-b",
MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)), MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)),
@ -251,7 +269,8 @@ func TestSameStreams(t *testing.T) {
testName := "TestSameStreams" testName := "TestSameStreams"
stream1 := zalandov1.EventStream{ stream1 := zalandov1.EventStream{
EventStreamFlow: zalandov1.EventStreamFlow{}, EventStreamFlow: zalandov1.EventStreamFlow{},
EventStreamRecovery: zalandov1.EventStreamRecovery{},
EventStreamSink: zalandov1.EventStreamSink{ EventStreamSink: zalandov1.EventStreamSink{
EventType: "stream-type-a", EventType: "stream-type-a",
}, },
@ -263,7 +282,8 @@ func TestSameStreams(t *testing.T) {
} }
stream2 := zalandov1.EventStream{ stream2 := zalandov1.EventStream{
EventStreamFlow: zalandov1.EventStreamFlow{}, EventStreamFlow: zalandov1.EventStreamFlow{},
EventStreamRecovery: zalandov1.EventStreamRecovery{},
EventStreamSink: zalandov1.EventStreamSink{ EventStreamSink: zalandov1.EventStreamSink{
EventType: "stream-type-b", EventType: "stream-type-b",
}, },

View File

@ -11,4 +11,7 @@ const (
EventStreamSourceAuthType = "DatabaseAuthenticationSecret" EventStreamSourceAuthType = "DatabaseAuthenticationSecret"
EventStreamFlowPgGenericType = "PostgresWalToGenericNakadiEvent" EventStreamFlowPgGenericType = "PostgresWalToGenericNakadiEvent"
EventStreamSinkNakadiType = "Nakadi" EventStreamSinkNakadiType = "Nakadi"
EventStreamRecoveryNoneType = "None"
EventStreamRecoveryDLQType = "DeadLetter"
EventStreamRecoverySuffix = "dead-letter-queue"
) )