From 1d88009ec4cc01bad192a1e882e00571179e91ba Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Fri, 18 Mar 2022 15:06:17 +0100 Subject: [PATCH] fix comparison of event stream array (#1817) * fix comparison of event stream array * turn optional stream fields to pointers --- pkg/apis/acid.zalan.do/v1/postgresql_type.go | 10 +- .../acid.zalan.do/v1/zz_generated.deepcopy.go | 29 ++- pkg/apis/zalando.org/v1/fabriceventstream.go | 26 +-- pkg/cluster/streams.go | 34 +++- pkg/cluster/streams_test.go | 186 +++++++++++++++--- pkg/util/k8sutil/k8sutil.go | 8 + 6 files changed, 236 insertions(+), 57 deletions(-) diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index 50be9e551..5bb24ed10 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -239,12 +239,12 @@ type Stream struct { ApplicationId string `json:"applicationId"` Database string `json:"database"` Tables map[string]StreamTable `json:"tables"` - Filter map[string]string `json:"filter,omitempty"` - BatchSize uint32 `json:"batchSize,omitempty"` + Filter map[string]*string `json:"filter,omitempty"` + BatchSize *uint32 `json:"batchSize,omitempty"` } type StreamTable struct { - EventType string `json:"eventType"` - IdColumn string `json:"idColumn,omitempty" defaults:"id"` - PayloadColumn string `json:"payloadColumn,omitempty" defaults:"payload"` + EventType string `json:"eventType"` + IdColumn *string `json:"idColumn,omitempty"` + PayloadColumn *string `json:"payloadColumn,omitempty"` } diff --git a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go index d3f09f402..d7c3b1a86 100644 --- a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go +++ b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go @@ -1181,16 +1181,29 @@ func (in *Stream) DeepCopyInto(out *Stream) { in, out := &in.Tables, &out.Tables *out = make(map[string]StreamTable, len(*in)) for key, val := range *in { - (*out)[key] = val + (*out)[key] = *val.DeepCopy() } } if in.Filter != nil { in, out := &in.Filter, &out.Filter - *out = make(map[string]string, len(*in)) + *out = make(map[string]*string, len(*in)) for key, val := range *in { - (*out)[key] = val + var outVal *string + if val == nil { + (*out)[key] = nil + } else { + in, out := &val, &outVal + *out = new(string) + **out = **in + } + (*out)[key] = outVal } } + if in.BatchSize != nil { + in, out := &in.BatchSize, &out.BatchSize + *out = new(uint32) + **out = **in + } return } @@ -1207,6 +1220,16 @@ func (in *Stream) DeepCopy() *Stream { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *StreamTable) DeepCopyInto(out *StreamTable) { *out = *in + if in.IdColumn != nil { + in, out := &in.IdColumn, &out.IdColumn + *out = new(string) + **out = **in + } + if in.PayloadColumn != nil { + in, out := &in.PayloadColumn, &out.PayloadColumn + *out = new(string) + **out = **in + } return } diff --git a/pkg/apis/zalando.org/v1/fabriceventstream.go b/pkg/apis/zalando.org/v1/fabriceventstream.go index 65d082c1e..559c52d3e 100644 --- a/pkg/apis/zalando.org/v1/fabriceventstream.go +++ b/pkg/apis/zalando.org/v1/fabriceventstream.go @@ -40,15 +40,15 @@ type EventStream struct { // EventStreamFlow defines the flow characteristics of the event stream type EventStreamFlow struct { - Type string `json:"type"` - PayloadColumn string `json:"payloadColumn,omitempty" defaults:"payload"` + Type string `json:"type"` + PayloadColumn *string `json:"payloadColumn,omitempty"` } // EventStreamSink defines the target of the event stream type EventStreamSink struct { - Type string `json:"type"` - EventType string `json:"eventType,omitempty"` - MaxBatchSize uint32 `json:"maxBatchSize,omitempty"` + Type string `json:"type"` + EventType string `json:"eventType,omitempty"` + MaxBatchSize *uint32 `json:"maxBatchSize,omitempty"` } // EventStreamSource defines the source of the event stream and connection for FES operator @@ -56,23 +56,23 @@ type EventStreamSource struct { Type string `json:"type"` Schema string `json:"schema,omitempty" defaults:"public"` EventStreamTable EventStreamTable `json:"table"` - Filter string `json:"filter,omitempty"` + Filter *string `json:"filter,omitempty"` Connection Connection `json:"jdbcConnection"` } // EventStreamTable defines the name and ID column to be used for streaming type EventStreamTable struct { - Name string `json:"name"` - IDColumn string `json:"idColumn,omitempty" defaults:"id"` + Name string `json:"name"` + IDColumn *string `json:"idColumn,omitempty"` } // Connection to be used for allowing the FES operator to connect to a database type Connection struct { - Url string `json:"jdbcUrl"` - SlotName string `json:"slotName"` - PluginType string `json:"pluginType,omitempty" defaults:"wal2json"` - PublicationName string `json:"publicationName,omitempty"` - DBAuth DBAuth `json:"databaseAuthentication"` + Url string `json:"jdbcUrl"` + SlotName string `json:"slotName"` + PluginType string `json:"pluginType,omitempty"` + PublicationName *string `json:"publicationName,omitempty"` + DBAuth DBAuth `json:"databaseAuthentication"` } // DBAuth specifies the credentials to be used for connecting with the database diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index a1a1b4d4a..7325fa857 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -189,7 +189,7 @@ func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEvent } } -func (c *Cluster) getEventStreamSource(stream acidv1.Stream, tableName, idColumn string) zalandov1.EventStreamSource { +func (c *Cluster) getEventStreamSource(stream acidv1.Stream, tableName string, idColumn *string) zalandov1.EventStreamSource { table, schema := getTableSchema(tableName) streamFilter := stream.Filter[tableName] return zalandov1.EventStreamSource{ @@ -204,7 +204,7 @@ func (c *Cluster) getEventStreamSource(stream acidv1.Stream, tableName, idColumn } } -func getEventStreamFlow(stream acidv1.Stream, payloadColumn string) zalandov1.EventStreamFlow { +func getEventStreamFlow(stream acidv1.Stream, payloadColumn *string) zalandov1.EventStreamFlow { return zalandov1.EventStreamFlow{ Type: constants.EventStreamFlowPgGenericType, PayloadColumn: payloadColumn, @@ -230,7 +230,7 @@ func getTableSchema(fullTableName string) (tableName, schemaName string) { return tableName, schemaName } -func getOutboxTable(tableName, idColumn string) zalandov1.EventStreamTable { +func getOutboxTable(tableName string, idColumn *string) zalandov1.EventStreamTable { return zalandov1.EventStreamTable{ Name: tableName, IDColumn: idColumn, @@ -347,8 +347,8 @@ func (c *Cluster) createOrUpdateStreams() error { c.logger.Infof("event stream %q has been successfully created", fesName) } else { desiredStreams := c.generateFabricEventStream(appId) - if !reflect.DeepEqual(effectiveStreams.Spec, desiredStreams.Spec) { - c.logger.Debug("updating event streams") + if match, reason := sameStreams(effectiveStreams.Spec.EventStreams, desiredStreams.Spec.EventStreams); !match { + c.logger.Debugf("updating event streams: %s", reason) desiredStreams.ObjectMeta.ResourceVersion = effectiveStreams.ObjectMeta.ResourceVersion err = c.updateStreams(desiredStreams) if err != nil { @@ -361,3 +361,27 @@ func (c *Cluster) createOrUpdateStreams() error { return nil } + +func sameStreams(curEventStreams, newEventStreams []zalandov1.EventStream) (match bool, reason string) { + if len(newEventStreams) != len(curEventStreams) { + return false, "number of defined streams is different" + } + + for _, newStream := range newEventStreams { + match = false + reason = "event stream specs differ" + for _, curStream := range curEventStreams { + if reflect.DeepEqual(newStream.EventStreamSource, curStream.EventStreamSource) && + reflect.DeepEqual(newStream.EventStreamFlow, curStream.EventStreamFlow) && + reflect.DeepEqual(newStream.EventStreamSink, curStream.EventStreamSink) { + match = true + break + } + } + if !match { + return false, reason + } + } + + return true, "" +} diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 89dd294ca..7ebb1c89a 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/assert" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" - v1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1" + zalandov1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1" fakezalandov1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/fake" "github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util/config" @@ -63,15 +63,18 @@ var ( Database: "foo", Tables: map[string]acidv1.StreamTable{ "data.bar": acidv1.StreamTable{ - EventType: "stream_type_a", - IdColumn: "b_id", - PayloadColumn: "b_payload", + EventType: "stream-type-a", + IdColumn: k8sutil.StringToPointer("b_id"), + PayloadColumn: k8sutil.StringToPointer("b_payload"), + }, + "data.foobar": acidv1.StreamTable{ + EventType: "stream-type-b", }, }, - Filter: map[string]string{ - "data.bar": "[?(@.source.txId > 500 && @.source.lsn > 123456)]", + Filter: map[string]*string{ + "data.bar": k8sutil.StringToPointer("[?(@.source.txId > 500 && @.source.lsn > 123456)]"), }, - BatchSize: uint32(100), + BatchSize: k8sutil.UInt32ToPointer(uint32(100)), }, }, Volume: acidv1.Volume{ @@ -80,7 +83,7 @@ var ( }, } - fes = &v1.FabricEventStream{ + fes = &zalandov1.FabricEventStream{ TypeMeta: metav1.TypeMeta{ APIVersion: constants.EventStreamCRDApiVersion, Kind: constants.EventStreamCRDKind, @@ -97,23 +100,23 @@ var ( }, }, }, - Spec: v1.FabricEventStreamSpec{ + Spec: zalandov1.FabricEventStreamSpec{ ApplicationId: appId, - EventStreams: []v1.EventStream{ - { - EventStreamFlow: v1.EventStreamFlow{ - PayloadColumn: "b_payload", + EventStreams: []zalandov1.EventStream{ + zalandov1.EventStream{ + EventStreamFlow: zalandov1.EventStreamFlow{ + PayloadColumn: k8sutil.StringToPointer("b_payload"), Type: constants.EventStreamFlowPgGenericType, }, - EventStreamSink: v1.EventStreamSink{ - EventType: "stream_type_a", - MaxBatchSize: uint32(100), + EventStreamSink: zalandov1.EventStreamSink{ + EventType: "stream-type-a", + MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)), Type: constants.EventStreamSinkNakadiType, }, - EventStreamSource: v1.EventStreamSource{ - Filter: "[?(@.source.txId > 500 && @.source.lsn > 123456)]", - Connection: v1.Connection{ - DBAuth: v1.DBAuth{ + EventStreamSource: zalandov1.EventStreamSource{ + Filter: k8sutil.StringToPointer("[?(@.source.txId > 500 && @.source.lsn > 123456)]"), + Connection: zalandov1.Connection{ + DBAuth: zalandov1.DBAuth{ Name: fmt.Sprintf("fes-user.%s.credentials.postgresql.acid.zalan.do", clusterName), PasswordKey: "password", Type: constants.EventStreamSourceAuthType, @@ -124,13 +127,41 @@ var ( PluginType: constants.EventStreamSourcePluginType, }, Schema: "data", - EventStreamTable: v1.EventStreamTable{ - IDColumn: "b_id", + EventStreamTable: zalandov1.EventStreamTable{ + IDColumn: k8sutil.StringToPointer("b_id"), Name: "bar", }, Type: constants.EventStreamSourcePGType, }, }, + zalandov1.EventStream{ + EventStreamFlow: zalandov1.EventStreamFlow{ + Type: constants.EventStreamFlowPgGenericType, + }, + EventStreamSink: zalandov1.EventStreamSink{ + EventType: "stream-type-b", + MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)), + Type: constants.EventStreamSinkNakadiType, + }, + EventStreamSource: zalandov1.EventStreamSource{ + Connection: zalandov1.Connection{ + DBAuth: zalandov1.DBAuth{ + Name: fmt.Sprintf("fes-user.%s.credentials.postgresql.acid.zalan.do", clusterName), + PasswordKey: "password", + Type: constants.EventStreamSourceAuthType, + UserKey: "username", + }, + Url: fmt.Sprintf("jdbc:postgresql://%s.%s/foo?user=%s&ssl=true&sslmode=require", clusterName, namespace, fesUser), + SlotName: slotName, + PluginType: constants.EventStreamSourcePluginType, + }, + Schema: "data", + EventStreamTable: zalandov1.EventStreamTable{ + Name: "foobar", + }, + Type: constants.EventStreamSourcePGType, + }, + }, }, }, } @@ -161,23 +192,116 @@ func TestGenerateFabricEventStream(t *testing.T) { cluster.Name = clusterName cluster.Namespace = namespace + // create statefulset to have ownerReference for streams _, err := cluster.createStatefulSet() assert.NoError(t, err) + // create the streams err = cluster.createOrUpdateStreams() assert.NoError(t, err) + // compare generated stream with expected stream result := cluster.generateFabricEventStream(appId) - - if !reflect.DeepEqual(result, fes) { - t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", fes, result) + if match, _ := sameStreams(result.Spec.EventStreams, fes.Spec.EventStreams); !match { + t.Errorf("malformed FabricEventStream, expected %#v, got %#v", fes, result) } + // compare stream resturned from API with expected stream streamCRD, err := cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), fesName, metav1.GetOptions{}) assert.NoError(t, err) + if match, _ := sameStreams(streamCRD.Spec.EventStreams, fes.Spec.EventStreams); !match { + t.Errorf("malformed FabricEventStream returned from API, expected %#v, got %#v", fes, streamCRD) + } - if !reflect.DeepEqual(streamCRD, fes) { - t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", fes, streamCRD) + // sync streams once again + err = cluster.createOrUpdateStreams() + assert.NoError(t, err) + + // compare stream resturned from API with generated stream + streamCRD, err = cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), fesName, metav1.GetOptions{}) + assert.NoError(t, err) + if match, _ := sameStreams(streamCRD.Spec.EventStreams, result.Spec.EventStreams); !match { + t.Errorf("returned FabricEventStream differs from generated one, expected %#v, got %#v", result, streamCRD) + } +} + +func TestSameStreams(t *testing.T) { + testName := "TestSameStreams" + + stream1 := zalandov1.EventStream{ + EventStreamFlow: zalandov1.EventStreamFlow{}, + EventStreamSink: zalandov1.EventStreamSink{ + EventType: "stream-type-a", + }, + EventStreamSource: zalandov1.EventStreamSource{ + EventStreamTable: zalandov1.EventStreamTable{ + Name: "foo", + }, + }, + } + + stream2 := zalandov1.EventStream{ + EventStreamFlow: zalandov1.EventStreamFlow{}, + EventStreamSink: zalandov1.EventStreamSink{ + EventType: "stream-type-b", + }, + EventStreamSource: zalandov1.EventStreamSource{ + EventStreamTable: zalandov1.EventStreamTable{ + Name: "bar", + }, + }, + } + + tests := []struct { + subTest string + streamsA []zalandov1.EventStream + streamsB []zalandov1.EventStream + match bool + reason string + }{ + { + subTest: "identical streams", + streamsA: []zalandov1.EventStream{stream1, stream2}, + streamsB: []zalandov1.EventStream{stream1, stream2}, + match: true, + reason: "", + }, + { + subTest: "same streams different order", + streamsA: []zalandov1.EventStream{stream1, stream2}, + streamsB: []zalandov1.EventStream{stream2, stream1}, + match: true, + reason: "", + }, + { + subTest: "same streams different order", + streamsA: []zalandov1.EventStream{stream1}, + streamsB: []zalandov1.EventStream{stream1, stream2}, + match: false, + reason: "number of defined streams is different", + }, + { + subTest: "different number of streams", + streamsA: []zalandov1.EventStream{stream1}, + streamsB: []zalandov1.EventStream{stream1, stream2}, + match: false, + reason: "number of defined streams is different", + }, + { + subTest: "event stream specs differ", + streamsA: []zalandov1.EventStream{stream1, stream2}, + streamsB: fes.Spec.EventStreams, + match: false, + reason: "number of defined streams is different", + }, + } + + for _, tt := range tests { + streamsMatch, matchReason := sameStreams(tt.streamsA, tt.streamsB) + if streamsMatch != tt.match { + t.Errorf("%s %s: unexpected match result when comparing streams: got %s, epxected %s", + testName, tt.subTest, matchReason, tt.reason) + } } } @@ -213,12 +337,12 @@ func TestUpdateFabricEventStream(t *testing.T) { Database: dbName, Tables: map[string]acidv1.StreamTable{ "data.bar": acidv1.StreamTable{ - EventType: "stream_type_b", - IdColumn: "b_id", - PayloadColumn: "b_payload", + EventType: "stream-type-c", + IdColumn: k8sutil.StringToPointer("b_id"), + PayloadColumn: k8sutil.StringToPointer("b_payload"), }, }, - BatchSize: uint32(250), + BatchSize: k8sutil.UInt32ToPointer(uint32(250)), }, } patch, err := json.Marshal(struct { diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 0897777ee..fd5de8195 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -37,6 +37,14 @@ func Int32ToPointer(value int32) *int32 { return &value } +func UInt32ToPointer(value uint32) *uint32 { + return &value +} + +func StringToPointer(str string) *string { + return &str +} + // KubernetesClient describes getters for Kubernetes objects type KubernetesClient struct { corev1.SecretsGetter