From 8f3699e43517d43c79f051d89b1e1ff8c05ed246 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Thu, 17 Mar 2022 19:07:17 +0100 Subject: [PATCH] turn optional stream fields to pointers --- pkg/apis/acid.zalan.do/v1/postgresql_type.go | 10 +++--- .../acid.zalan.do/v1/zz_generated.deepcopy.go | 29 +++++++++++++++-- pkg/apis/zalando.org/v1/fabriceventstream.go | 26 +++++++-------- pkg/cluster/streams.go | 22 ++----------- pkg/cluster/streams_test.go | 32 +++++++++---------- pkg/util/k8sutil/k8sutil.go | 8 +++++ 6 files changed, 71 insertions(+), 56 deletions(-) diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index 50be9e551..5bb24ed10 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -239,12 +239,12 @@ type Stream struct { ApplicationId string `json:"applicationId"` Database string `json:"database"` Tables map[string]StreamTable `json:"tables"` - Filter map[string]string `json:"filter,omitempty"` - BatchSize uint32 `json:"batchSize,omitempty"` + Filter map[string]*string `json:"filter,omitempty"` + BatchSize *uint32 `json:"batchSize,omitempty"` } type StreamTable struct { - EventType string `json:"eventType"` - IdColumn string `json:"idColumn,omitempty" defaults:"id"` - PayloadColumn string `json:"payloadColumn,omitempty" defaults:"payload"` + EventType string `json:"eventType"` + IdColumn *string `json:"idColumn,omitempty"` + PayloadColumn *string `json:"payloadColumn,omitempty"` } diff --git a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go index 91f6be563..3ed69d68f 100644 --- a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go +++ b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go @@ -1176,16 +1176,29 @@ func (in *Stream) DeepCopyInto(out *Stream) { in, out := &in.Tables, &out.Tables *out = make(map[string]StreamTable, len(*in)) for key, val := range *in { - (*out)[key] = val + (*out)[key] = *val.DeepCopy() } } if in.Filter != nil { in, out := &in.Filter, &out.Filter - *out = make(map[string]string, len(*in)) + *out = make(map[string]*string, len(*in)) for key, val := range *in { - (*out)[key] = val + var outVal *string + if val == nil { + (*out)[key] = nil + } else { + in, out := &val, &outVal + *out = new(string) + **out = **in + } + (*out)[key] = outVal } } + if in.BatchSize != nil { + in, out := &in.BatchSize, &out.BatchSize + *out = new(uint32) + **out = **in + } return } @@ -1202,6 +1215,16 @@ func (in *Stream) DeepCopy() *Stream { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *StreamTable) DeepCopyInto(out *StreamTable) { *out = *in + if in.IdColumn != nil { + in, out := &in.IdColumn, &out.IdColumn + *out = new(string) + **out = **in + } + if in.PayloadColumn != nil { + in, out := &in.PayloadColumn, &out.PayloadColumn + *out = new(string) + **out = **in + } return } diff --git a/pkg/apis/zalando.org/v1/fabriceventstream.go b/pkg/apis/zalando.org/v1/fabriceventstream.go index e4beea298..559c52d3e 100644 --- a/pkg/apis/zalando.org/v1/fabriceventstream.go +++ b/pkg/apis/zalando.org/v1/fabriceventstream.go @@ -40,15 +40,15 @@ type EventStream struct { // EventStreamFlow defines the flow characteristics of the event stream type EventStreamFlow struct { - Type string `json:"type"` - PayloadColumn string `json:"payloadColumn,omitempty" defaults:"payload"` + Type string `json:"type"` + PayloadColumn *string `json:"payloadColumn,omitempty"` } // EventStreamSink defines the target of the event stream type EventStreamSink struct { - Type string `json:"type"` - EventType string `json:"eventType,omitempty"` - MaxBatchSize uint32 `json:"maxBatchSize,omitempty"` + Type string `json:"type"` + EventType string `json:"eventType,omitempty"` + MaxBatchSize *uint32 `json:"maxBatchSize,omitempty"` } // EventStreamSource defines the source of the event stream and connection for FES operator @@ -56,23 +56,23 @@ type EventStreamSource struct { Type string `json:"type"` Schema string `json:"schema,omitempty" defaults:"public"` EventStreamTable EventStreamTable `json:"table"` - Filter string `json:"filter,omitempty"` + Filter *string `json:"filter,omitempty"` Connection Connection `json:"jdbcConnection"` } // EventStreamTable defines the name and ID column to be used for streaming type EventStreamTable struct { - Name string `json:"name"` - IDColumn string `json:"idColumn,omitempty" defaults:"id"` + Name string `json:"name"` + IDColumn *string `json:"idColumn,omitempty"` } // Connection to be used for allowing the FES operator to connect to a database type Connection struct { - Url string `json:"jdbcUrl"` - SlotName string `json:"slotName"` - PluginType string `json:"pluginType,omitempty" defaults:"pgoutput"` - PublicationName string `json:"publicationName,omitempty"` - DBAuth DBAuth `json:"databaseAuthentication"` + Url string `json:"jdbcUrl"` + SlotName string `json:"slotName"` + PluginType string `json:"pluginType,omitempty"` + PublicationName *string `json:"publicationName,omitempty"` + DBAuth DBAuth `json:"databaseAuthentication"` } // DBAuth specifies the credentials to be used for connecting with the database diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 00d94f629..7325fa857 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -189,7 +189,7 @@ func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEvent } } -func (c *Cluster) getEventStreamSource(stream acidv1.Stream, tableName, idColumn string) zalandov1.EventStreamSource { +func (c *Cluster) getEventStreamSource(stream acidv1.Stream, tableName string, idColumn *string) zalandov1.EventStreamSource { table, schema := getTableSchema(tableName) streamFilter := stream.Filter[tableName] return zalandov1.EventStreamSource{ @@ -204,12 +204,7 @@ func (c *Cluster) getEventStreamSource(stream acidv1.Stream, tableName, idColumn } } -func getEventStreamFlow(stream acidv1.Stream, payloadColumn string) zalandov1.EventStreamFlow { - if payloadColumn == "" { - return zalandov1.EventStreamFlow{ - Type: constants.EventStreamFlowPgGenericType, - } - } +func getEventStreamFlow(stream acidv1.Stream, payloadColumn *string) zalandov1.EventStreamFlow { return zalandov1.EventStreamFlow{ Type: constants.EventStreamFlowPgGenericType, PayloadColumn: payloadColumn, @@ -217,12 +212,6 @@ func getEventStreamFlow(stream acidv1.Stream, payloadColumn string) zalandov1.Ev } func getEventStreamSink(stream acidv1.Stream, eventType string) zalandov1.EventStreamSink { - if stream.BatchSize == 0 { - return zalandov1.EventStreamSink{ - Type: constants.EventStreamSinkNakadiType, - EventType: eventType, - } - } return zalandov1.EventStreamSink{ Type: constants.EventStreamSinkNakadiType, EventType: eventType, @@ -241,12 +230,7 @@ func getTableSchema(fullTableName string) (tableName, schemaName string) { return tableName, schemaName } -func getOutboxTable(tableName, idColumn string) zalandov1.EventStreamTable { - if idColumn == "" { - return zalandov1.EventStreamTable{ - Name: tableName, - } - } +func getOutboxTable(tableName string, idColumn *string) zalandov1.EventStreamTable { return zalandov1.EventStreamTable{ Name: tableName, IDColumn: idColumn, diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index ea7a8e6a0..12d32d0d0 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -64,17 +64,17 @@ var ( Tables: map[string]acidv1.StreamTable{ "data.bar": acidv1.StreamTable{ EventType: "stream-type-a", - IdColumn: "b_id", - PayloadColumn: "b_payload", + IdColumn: k8sutil.StringPointer("b_id"), + PayloadColumn: k8sutil.StringPointer("b_payload"), }, "data.foobar": acidv1.StreamTable{ EventType: "stream-type-b", }, }, - Filter: map[string]string{ - "data.bar": "[?(@.source.txId > 500 && @.source.lsn > 123456)]", + Filter: map[string]*string{ + "data.bar": k8sutil.StringPointer("[?(@.source.txId > 500 && @.source.lsn > 123456)]"), }, - BatchSize: uint32(100), + BatchSize: k8sutil.UInt32ToPointer(uint32(100)), }, }, Volume: acidv1.Volume{ @@ -105,16 +105,16 @@ var ( EventStreams: []zalandov1.EventStream{ zalandov1.EventStream{ EventStreamFlow: zalandov1.EventStreamFlow{ - PayloadColumn: "b_payload", + PayloadColumn: k8sutil.StringPointer("b_payload"), Type: constants.EventStreamFlowPgGenericType, }, EventStreamSink: zalandov1.EventStreamSink{ EventType: "stream-type-a", - MaxBatchSize: uint32(100), + MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)), Type: constants.EventStreamSinkNakadiType, }, EventStreamSource: zalandov1.EventStreamSource{ - Filter: "[?(@.source.txId > 500 && @.source.lsn > 123456)]", + Filter: k8sutil.StringPointer("[?(@.source.txId > 500 && @.source.lsn > 123456)]"), Connection: zalandov1.Connection{ DBAuth: zalandov1.DBAuth{ Name: fmt.Sprintf("fes-user.%s.credentials.postgresql.acid.zalan.do", clusterName), @@ -128,7 +128,7 @@ var ( }, Schema: "data", EventStreamTable: zalandov1.EventStreamTable{ - IDColumn: "b_id", + IDColumn: k8sutil.StringPointer("b_id"), Name: "bar", }, Type: constants.EventStreamSourcePGType, @@ -140,7 +140,7 @@ var ( }, EventStreamSink: zalandov1.EventStreamSink{ EventType: "stream-type-b", - MaxBatchSize: uint32(100), + MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)), Type: constants.EventStreamSinkNakadiType, }, EventStreamSource: zalandov1.EventStreamSource{ @@ -202,7 +202,7 @@ func TestGenerateFabricEventStream(t *testing.T) { // compare generated stream with expected stream result := cluster.generateFabricEventStream(appId) - if !reflect.DeepEqual(result, fes) { + if match, _ := sameStreams(result.Spec.EventStreams, fes.Spec.EventStreams); !match { t.Errorf("malformed FabricEventStream, expected %#v, got %#v", fes, result) } @@ -261,8 +261,8 @@ func TestSameStreams(t *testing.T) { }{ { subTest: "identical streams", - streamsA: fes.Spec.EventStreams, - streamsB: fes.Spec.EventStreams, + streamsA: []zalandov1.EventStream{stream1, stream2}, + streamsB: []zalandov1.EventStream{stream1, stream2}, match: true, reason: "", }, @@ -338,11 +338,11 @@ func TestUpdateFabricEventStream(t *testing.T) { Tables: map[string]acidv1.StreamTable{ "data.bar": acidv1.StreamTable{ EventType: "stream-type-c", - IdColumn: "b_id", - PayloadColumn: "b_payload", + IdColumn: k8sutil.StringPointer("b_id"), + PayloadColumn: k8sutil.StringPointer("b_payload"), }, }, - BatchSize: uint32(250), + BatchSize: k8sutil.UInt32ToPointer(uint32(250)), }, } patch, err := json.Marshal(struct { diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 0897777ee..7f05b03ca 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -37,6 +37,14 @@ func Int32ToPointer(value int32) *int32 { return &value } +func UInt32ToPointer(value uint32) *uint32 { + return &value +} + +func StringPointer(str string) *string { + return &str +} + // KubernetesClient describes getters for Kubernetes objects type KubernetesClient struct { corev1.SecretsGetter