diff --git a/pkg/apis/zalando.org/v1/fabriceventstream.go b/pkg/apis/zalando.org/v1/fabriceventstream.go index 65d082c1e..e4beea298 100644 --- a/pkg/apis/zalando.org/v1/fabriceventstream.go +++ b/pkg/apis/zalando.org/v1/fabriceventstream.go @@ -70,7 +70,7 @@ type EventStreamTable struct { type Connection struct { Url string `json:"jdbcUrl"` SlotName string `json:"slotName"` - PluginType string `json:"pluginType,omitempty" defaults:"wal2json"` + PluginType string `json:"pluginType,omitempty" defaults:"pgoutput"` PublicationName string `json:"publicationName,omitempty"` DBAuth DBAuth `json:"databaseAuthentication"` } diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index a1a1b4d4a..00d94f629 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -205,6 +205,11 @@ func (c *Cluster) getEventStreamSource(stream acidv1.Stream, tableName, idColumn } func getEventStreamFlow(stream acidv1.Stream, payloadColumn string) zalandov1.EventStreamFlow { + if payloadColumn == "" { + return zalandov1.EventStreamFlow{ + Type: constants.EventStreamFlowPgGenericType, + } + } return zalandov1.EventStreamFlow{ Type: constants.EventStreamFlowPgGenericType, PayloadColumn: payloadColumn, @@ -212,6 +217,12 @@ func getEventStreamFlow(stream acidv1.Stream, payloadColumn string) zalandov1.Ev } func getEventStreamSink(stream acidv1.Stream, eventType string) zalandov1.EventStreamSink { + if stream.BatchSize == 0 { + return zalandov1.EventStreamSink{ + Type: constants.EventStreamSinkNakadiType, + EventType: eventType, + } + } return zalandov1.EventStreamSink{ Type: constants.EventStreamSinkNakadiType, EventType: eventType, @@ -231,6 +242,11 @@ func getTableSchema(fullTableName string) (tableName, schemaName string) { } func getOutboxTable(tableName, idColumn string) zalandov1.EventStreamTable { + if idColumn == "" { + return zalandov1.EventStreamTable{ + Name: tableName, + } + } return zalandov1.EventStreamTable{ Name: tableName, IDColumn: idColumn, @@ -347,8 +363,8 @@ func (c *Cluster) createOrUpdateStreams() error { c.logger.Infof("event stream %q has been successfully created", fesName) } else { desiredStreams := c.generateFabricEventStream(appId) - if !reflect.DeepEqual(effectiveStreams.Spec, desiredStreams.Spec) { - c.logger.Debug("updating event streams") + if match, reason := sameStreams(effectiveStreams.Spec.EventStreams, desiredStreams.Spec.EventStreams); !match { + c.logger.Debugf("updating event streams: %s", reason) desiredStreams.ObjectMeta.ResourceVersion = effectiveStreams.ObjectMeta.ResourceVersion err = c.updateStreams(desiredStreams) if err != nil { @@ -361,3 +377,27 @@ func (c *Cluster) createOrUpdateStreams() error { return nil } + +func sameStreams(curEventStreams, newEventStreams []zalandov1.EventStream) (match bool, reason string) { + if len(newEventStreams) != len(curEventStreams) { + return false, "number of defined streams is different" + } + + for _, newStream := range newEventStreams { + match = false + reason = "event stream specs differ" + for _, curStream := range curEventStreams { + if reflect.DeepEqual(newStream.EventStreamSource, curStream.EventStreamSource) && + reflect.DeepEqual(newStream.EventStreamFlow, curStream.EventStreamFlow) && + reflect.DeepEqual(newStream.EventStreamSink, curStream.EventStreamSink) { + match = true + break + } + } + if !match { + return false, reason + } + } + + return true, "" +} diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 89dd294ca..ea7a8e6a0 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/assert" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" - v1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1" + zalandov1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1" fakezalandov1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/fake" "github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util/config" @@ -63,10 +63,13 @@ var ( Database: "foo", Tables: map[string]acidv1.StreamTable{ "data.bar": acidv1.StreamTable{ - EventType: "stream_type_a", + EventType: "stream-type-a", IdColumn: "b_id", PayloadColumn: "b_payload", }, + "data.foobar": acidv1.StreamTable{ + EventType: "stream-type-b", + }, }, Filter: map[string]string{ "data.bar": "[?(@.source.txId > 500 && @.source.lsn > 123456)]", @@ -80,7 +83,7 @@ var ( }, } - fes = &v1.FabricEventStream{ + fes = &zalandov1.FabricEventStream{ TypeMeta: metav1.TypeMeta{ APIVersion: constants.EventStreamCRDApiVersion, Kind: constants.EventStreamCRDKind, @@ -97,23 +100,23 @@ var ( }, }, }, - Spec: v1.FabricEventStreamSpec{ + Spec: zalandov1.FabricEventStreamSpec{ ApplicationId: appId, - EventStreams: []v1.EventStream{ - { - EventStreamFlow: v1.EventStreamFlow{ + EventStreams: []zalandov1.EventStream{ + zalandov1.EventStream{ + EventStreamFlow: zalandov1.EventStreamFlow{ PayloadColumn: "b_payload", Type: constants.EventStreamFlowPgGenericType, }, - EventStreamSink: v1.EventStreamSink{ - EventType: "stream_type_a", + EventStreamSink: zalandov1.EventStreamSink{ + EventType: "stream-type-a", MaxBatchSize: uint32(100), Type: constants.EventStreamSinkNakadiType, }, - EventStreamSource: v1.EventStreamSource{ + EventStreamSource: zalandov1.EventStreamSource{ Filter: "[?(@.source.txId > 500 && @.source.lsn > 123456)]", - Connection: v1.Connection{ - DBAuth: v1.DBAuth{ + Connection: zalandov1.Connection{ + DBAuth: zalandov1.DBAuth{ Name: fmt.Sprintf("fes-user.%s.credentials.postgresql.acid.zalan.do", clusterName), PasswordKey: "password", Type: constants.EventStreamSourceAuthType, @@ -124,13 +127,41 @@ var ( PluginType: constants.EventStreamSourcePluginType, }, Schema: "data", - EventStreamTable: v1.EventStreamTable{ + EventStreamTable: zalandov1.EventStreamTable{ IDColumn: "b_id", Name: "bar", }, Type: constants.EventStreamSourcePGType, }, }, + zalandov1.EventStream{ + EventStreamFlow: zalandov1.EventStreamFlow{ + Type: constants.EventStreamFlowPgGenericType, + }, + EventStreamSink: zalandov1.EventStreamSink{ + EventType: "stream-type-b", + MaxBatchSize: uint32(100), + Type: constants.EventStreamSinkNakadiType, + }, + EventStreamSource: zalandov1.EventStreamSource{ + Connection: zalandov1.Connection{ + DBAuth: zalandov1.DBAuth{ + Name: fmt.Sprintf("fes-user.%s.credentials.postgresql.acid.zalan.do", clusterName), + PasswordKey: "password", + Type: constants.EventStreamSourceAuthType, + UserKey: "username", + }, + Url: fmt.Sprintf("jdbc:postgresql://%s.%s/foo?user=%s&ssl=true&sslmode=require", clusterName, namespace, fesUser), + SlotName: slotName, + PluginType: constants.EventStreamSourcePluginType, + }, + Schema: "data", + EventStreamTable: zalandov1.EventStreamTable{ + Name: "foobar", + }, + Type: constants.EventStreamSourcePGType, + }, + }, }, }, } @@ -161,23 +192,116 @@ func TestGenerateFabricEventStream(t *testing.T) { cluster.Name = clusterName cluster.Namespace = namespace + // create statefulset to have ownerReference for streams _, err := cluster.createStatefulSet() assert.NoError(t, err) + // create the streams err = cluster.createOrUpdateStreams() assert.NoError(t, err) + // compare generated stream with expected stream result := cluster.generateFabricEventStream(appId) - if !reflect.DeepEqual(result, fes) { - t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", fes, result) + t.Errorf("malformed FabricEventStream, expected %#v, got %#v", fes, result) } + // compare stream resturned from API with expected stream streamCRD, err := cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), fesName, metav1.GetOptions{}) assert.NoError(t, err) + if match, _ := sameStreams(streamCRD.Spec.EventStreams, fes.Spec.EventStreams); !match { + t.Errorf("malformed FabricEventStream returned from API, expected %#v, got %#v", fes, streamCRD) + } - if !reflect.DeepEqual(streamCRD, fes) { - t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", fes, streamCRD) + // sync streams once again + err = cluster.createOrUpdateStreams() + assert.NoError(t, err) + + // compare stream resturned from API with generated stream + streamCRD, err = cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), fesName, metav1.GetOptions{}) + assert.NoError(t, err) + if match, _ := sameStreams(streamCRD.Spec.EventStreams, result.Spec.EventStreams); !match { + t.Errorf("returned FabricEventStream differs from generated one, expected %#v, got %#v", result, streamCRD) + } +} + +func TestSameStreams(t *testing.T) { + testName := "TestSameStreams" + + stream1 := zalandov1.EventStream{ + EventStreamFlow: zalandov1.EventStreamFlow{}, + EventStreamSink: zalandov1.EventStreamSink{ + EventType: "stream-type-a", + }, + EventStreamSource: zalandov1.EventStreamSource{ + EventStreamTable: zalandov1.EventStreamTable{ + Name: "foo", + }, + }, + } + + stream2 := zalandov1.EventStream{ + EventStreamFlow: zalandov1.EventStreamFlow{}, + EventStreamSink: zalandov1.EventStreamSink{ + EventType: "stream-type-b", + }, + EventStreamSource: zalandov1.EventStreamSource{ + EventStreamTable: zalandov1.EventStreamTable{ + Name: "bar", + }, + }, + } + + tests := []struct { + subTest string + streamsA []zalandov1.EventStream + streamsB []zalandov1.EventStream + match bool + reason string + }{ + { + subTest: "identical streams", + streamsA: fes.Spec.EventStreams, + streamsB: fes.Spec.EventStreams, + match: true, + reason: "", + }, + { + subTest: "same streams different order", + streamsA: []zalandov1.EventStream{stream1, stream2}, + streamsB: []zalandov1.EventStream{stream2, stream1}, + match: true, + reason: "", + }, + { + subTest: "same streams different order", + streamsA: []zalandov1.EventStream{stream1}, + streamsB: []zalandov1.EventStream{stream1, stream2}, + match: false, + reason: "number of defined streams is different", + }, + { + subTest: "different number of streams", + streamsA: []zalandov1.EventStream{stream1}, + streamsB: []zalandov1.EventStream{stream1, stream2}, + match: false, + reason: "number of defined streams is different", + }, + { + subTest: "event stream specs differ", + streamsA: []zalandov1.EventStream{stream1, stream2}, + streamsB: fes.Spec.EventStreams, + match: false, + reason: "number of defined streams is different", + }, + } + + for _, tt := range tests { + streamsMatch, matchReason := sameStreams(tt.streamsA, tt.streamsB) + if streamsMatch != tt.match { + t.Errorf("%s %s: unexpected match result when comparing streams: got %s, epxected %s", + testName, tt.subTest, matchReason, tt.reason) + } } } @@ -213,7 +337,7 @@ func TestUpdateFabricEventStream(t *testing.T) { Database: dbName, Tables: map[string]acidv1.StreamTable{ "data.bar": acidv1.StreamTable{ - EventType: "stream_type_b", + EventType: "stream-type-c", IdColumn: "b_id", PayloadColumn: "b_payload", },