fix comparison of event stream array

This commit is contained in:
Felix Kunde 2022-03-17 16:52:30 +01:00
parent 69254abeba
commit 5cf0b69662
3 changed files with 185 additions and 21 deletions

View File

@ -70,7 +70,7 @@ type EventStreamTable struct {
type Connection struct { type Connection struct {
Url string `json:"jdbcUrl"` Url string `json:"jdbcUrl"`
SlotName string `json:"slotName"` SlotName string `json:"slotName"`
PluginType string `json:"pluginType,omitempty" defaults:"wal2json"` PluginType string `json:"pluginType,omitempty" defaults:"pgoutput"`
PublicationName string `json:"publicationName,omitempty"` PublicationName string `json:"publicationName,omitempty"`
DBAuth DBAuth `json:"databaseAuthentication"` DBAuth DBAuth `json:"databaseAuthentication"`
} }

View File

@ -205,6 +205,11 @@ func (c *Cluster) getEventStreamSource(stream acidv1.Stream, tableName, idColumn
} }
func getEventStreamFlow(stream acidv1.Stream, payloadColumn string) zalandov1.EventStreamFlow { func getEventStreamFlow(stream acidv1.Stream, payloadColumn string) zalandov1.EventStreamFlow {
if payloadColumn == "" {
return zalandov1.EventStreamFlow{
Type: constants.EventStreamFlowPgGenericType,
}
}
return zalandov1.EventStreamFlow{ return zalandov1.EventStreamFlow{
Type: constants.EventStreamFlowPgGenericType, Type: constants.EventStreamFlowPgGenericType,
PayloadColumn: payloadColumn, PayloadColumn: payloadColumn,
@ -212,6 +217,12 @@ func getEventStreamFlow(stream acidv1.Stream, payloadColumn string) zalandov1.Ev
} }
func getEventStreamSink(stream acidv1.Stream, eventType string) zalandov1.EventStreamSink { func getEventStreamSink(stream acidv1.Stream, eventType string) zalandov1.EventStreamSink {
if stream.BatchSize == 0 {
return zalandov1.EventStreamSink{
Type: constants.EventStreamSinkNakadiType,
EventType: eventType,
}
}
return zalandov1.EventStreamSink{ return zalandov1.EventStreamSink{
Type: constants.EventStreamSinkNakadiType, Type: constants.EventStreamSinkNakadiType,
EventType: eventType, EventType: eventType,
@ -231,6 +242,11 @@ func getTableSchema(fullTableName string) (tableName, schemaName string) {
} }
func getOutboxTable(tableName, idColumn string) zalandov1.EventStreamTable { func getOutboxTable(tableName, idColumn string) zalandov1.EventStreamTable {
if idColumn == "" {
return zalandov1.EventStreamTable{
Name: tableName,
}
}
return zalandov1.EventStreamTable{ return zalandov1.EventStreamTable{
Name: tableName, Name: tableName,
IDColumn: idColumn, IDColumn: idColumn,
@ -347,8 +363,8 @@ func (c *Cluster) createOrUpdateStreams() error {
c.logger.Infof("event stream %q has been successfully created", fesName) c.logger.Infof("event stream %q has been successfully created", fesName)
} else { } else {
desiredStreams := c.generateFabricEventStream(appId) desiredStreams := c.generateFabricEventStream(appId)
if !reflect.DeepEqual(effectiveStreams.Spec, desiredStreams.Spec) { if match, reason := sameStreams(effectiveStreams.Spec.EventStreams, desiredStreams.Spec.EventStreams); !match {
c.logger.Debug("updating event streams") c.logger.Debugf("updating event streams: %s", reason)
desiredStreams.ObjectMeta.ResourceVersion = effectiveStreams.ObjectMeta.ResourceVersion desiredStreams.ObjectMeta.ResourceVersion = effectiveStreams.ObjectMeta.ResourceVersion
err = c.updateStreams(desiredStreams) err = c.updateStreams(desiredStreams)
if err != nil { if err != nil {
@ -361,3 +377,27 @@ func (c *Cluster) createOrUpdateStreams() error {
return nil return nil
} }
func sameStreams(curEventStreams, newEventStreams []zalandov1.EventStream) (match bool, reason string) {
if len(newEventStreams) != len(curEventStreams) {
return false, "number of defined streams is different"
}
for _, newStream := range newEventStreams {
match = false
reason = "event stream specs differ"
for _, curStream := range curEventStreams {
if reflect.DeepEqual(newStream.EventStreamSource, curStream.EventStreamSource) &&
reflect.DeepEqual(newStream.EventStreamFlow, curStream.EventStreamFlow) &&
reflect.DeepEqual(newStream.EventStreamSink, curStream.EventStreamSink) {
match = true
break
}
}
if !match {
return false, reason
}
}
return true, ""
}

View File

@ -11,7 +11,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
v1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1" zalandov1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1"
fakezalandov1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/fake" fakezalandov1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/fake"
"github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util"
"github.com/zalando/postgres-operator/pkg/util/config" "github.com/zalando/postgres-operator/pkg/util/config"
@ -63,10 +63,13 @@ var (
Database: "foo", Database: "foo",
Tables: map[string]acidv1.StreamTable{ Tables: map[string]acidv1.StreamTable{
"data.bar": acidv1.StreamTable{ "data.bar": acidv1.StreamTable{
EventType: "stream_type_a", EventType: "stream-type-a",
IdColumn: "b_id", IdColumn: "b_id",
PayloadColumn: "b_payload", PayloadColumn: "b_payload",
}, },
"data.foobar": acidv1.StreamTable{
EventType: "stream-type-b",
},
}, },
Filter: map[string]string{ Filter: map[string]string{
"data.bar": "[?(@.source.txId > 500 && @.source.lsn > 123456)]", "data.bar": "[?(@.source.txId > 500 && @.source.lsn > 123456)]",
@ -80,7 +83,7 @@ var (
}, },
} }
fes = &v1.FabricEventStream{ fes = &zalandov1.FabricEventStream{
TypeMeta: metav1.TypeMeta{ TypeMeta: metav1.TypeMeta{
APIVersion: constants.EventStreamCRDApiVersion, APIVersion: constants.EventStreamCRDApiVersion,
Kind: constants.EventStreamCRDKind, Kind: constants.EventStreamCRDKind,
@ -97,23 +100,23 @@ var (
}, },
}, },
}, },
Spec: v1.FabricEventStreamSpec{ Spec: zalandov1.FabricEventStreamSpec{
ApplicationId: appId, ApplicationId: appId,
EventStreams: []v1.EventStream{ EventStreams: []zalandov1.EventStream{
{ zalandov1.EventStream{
EventStreamFlow: v1.EventStreamFlow{ EventStreamFlow: zalandov1.EventStreamFlow{
PayloadColumn: "b_payload", PayloadColumn: "b_payload",
Type: constants.EventStreamFlowPgGenericType, Type: constants.EventStreamFlowPgGenericType,
}, },
EventStreamSink: v1.EventStreamSink{ EventStreamSink: zalandov1.EventStreamSink{
EventType: "stream_type_a", EventType: "stream-type-a",
MaxBatchSize: uint32(100), MaxBatchSize: uint32(100),
Type: constants.EventStreamSinkNakadiType, Type: constants.EventStreamSinkNakadiType,
}, },
EventStreamSource: v1.EventStreamSource{ EventStreamSource: zalandov1.EventStreamSource{
Filter: "[?(@.source.txId > 500 && @.source.lsn > 123456)]", Filter: "[?(@.source.txId > 500 && @.source.lsn > 123456)]",
Connection: v1.Connection{ Connection: zalandov1.Connection{
DBAuth: v1.DBAuth{ DBAuth: zalandov1.DBAuth{
Name: fmt.Sprintf("fes-user.%s.credentials.postgresql.acid.zalan.do", clusterName), Name: fmt.Sprintf("fes-user.%s.credentials.postgresql.acid.zalan.do", clusterName),
PasswordKey: "password", PasswordKey: "password",
Type: constants.EventStreamSourceAuthType, Type: constants.EventStreamSourceAuthType,
@ -124,13 +127,41 @@ var (
PluginType: constants.EventStreamSourcePluginType, PluginType: constants.EventStreamSourcePluginType,
}, },
Schema: "data", Schema: "data",
EventStreamTable: v1.EventStreamTable{ EventStreamTable: zalandov1.EventStreamTable{
IDColumn: "b_id", IDColumn: "b_id",
Name: "bar", Name: "bar",
}, },
Type: constants.EventStreamSourcePGType, Type: constants.EventStreamSourcePGType,
}, },
}, },
zalandov1.EventStream{
EventStreamFlow: zalandov1.EventStreamFlow{
Type: constants.EventStreamFlowPgGenericType,
},
EventStreamSink: zalandov1.EventStreamSink{
EventType: "stream-type-b",
MaxBatchSize: uint32(100),
Type: constants.EventStreamSinkNakadiType,
},
EventStreamSource: zalandov1.EventStreamSource{
Connection: zalandov1.Connection{
DBAuth: zalandov1.DBAuth{
Name: fmt.Sprintf("fes-user.%s.credentials.postgresql.acid.zalan.do", clusterName),
PasswordKey: "password",
Type: constants.EventStreamSourceAuthType,
UserKey: "username",
},
Url: fmt.Sprintf("jdbc:postgresql://%s.%s/foo?user=%s&ssl=true&sslmode=require", clusterName, namespace, fesUser),
SlotName: slotName,
PluginType: constants.EventStreamSourcePluginType,
},
Schema: "data",
EventStreamTable: zalandov1.EventStreamTable{
Name: "foobar",
},
Type: constants.EventStreamSourcePGType,
},
},
}, },
}, },
} }
@ -161,23 +192,116 @@ func TestGenerateFabricEventStream(t *testing.T) {
cluster.Name = clusterName cluster.Name = clusterName
cluster.Namespace = namespace cluster.Namespace = namespace
// create statefulset to have ownerReference for streams
_, err := cluster.createStatefulSet() _, err := cluster.createStatefulSet()
assert.NoError(t, err) assert.NoError(t, err)
// create the streams
err = cluster.createOrUpdateStreams() err = cluster.createOrUpdateStreams()
assert.NoError(t, err) assert.NoError(t, err)
// compare generated stream with expected stream
result := cluster.generateFabricEventStream(appId) result := cluster.generateFabricEventStream(appId)
if !reflect.DeepEqual(result, fes) { if !reflect.DeepEqual(result, fes) {
t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", fes, result) t.Errorf("malformed FabricEventStream, expected %#v, got %#v", fes, result)
} }
// compare stream resturned from API with expected stream
streamCRD, err := cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), fesName, metav1.GetOptions{}) streamCRD, err := cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), fesName, metav1.GetOptions{})
assert.NoError(t, err) assert.NoError(t, err)
if match, _ := sameStreams(streamCRD.Spec.EventStreams, fes.Spec.EventStreams); !match {
t.Errorf("malformed FabricEventStream returned from API, expected %#v, got %#v", fes, streamCRD)
}
if !reflect.DeepEqual(streamCRD, fes) { // sync streams once again
t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", fes, streamCRD) err = cluster.createOrUpdateStreams()
assert.NoError(t, err)
// compare stream resturned from API with generated stream
streamCRD, err = cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), fesName, metav1.GetOptions{})
assert.NoError(t, err)
if match, _ := sameStreams(streamCRD.Spec.EventStreams, result.Spec.EventStreams); !match {
t.Errorf("returned FabricEventStream differs from generated one, expected %#v, got %#v", result, streamCRD)
}
}
func TestSameStreams(t *testing.T) {
testName := "TestSameStreams"
stream1 := zalandov1.EventStream{
EventStreamFlow: zalandov1.EventStreamFlow{},
EventStreamSink: zalandov1.EventStreamSink{
EventType: "stream-type-a",
},
EventStreamSource: zalandov1.EventStreamSource{
EventStreamTable: zalandov1.EventStreamTable{
Name: "foo",
},
},
}
stream2 := zalandov1.EventStream{
EventStreamFlow: zalandov1.EventStreamFlow{},
EventStreamSink: zalandov1.EventStreamSink{
EventType: "stream-type-b",
},
EventStreamSource: zalandov1.EventStreamSource{
EventStreamTable: zalandov1.EventStreamTable{
Name: "bar",
},
},
}
tests := []struct {
subTest string
streamsA []zalandov1.EventStream
streamsB []zalandov1.EventStream
match bool
reason string
}{
{
subTest: "identical streams",
streamsA: fes.Spec.EventStreams,
streamsB: fes.Spec.EventStreams,
match: true,
reason: "",
},
{
subTest: "same streams different order",
streamsA: []zalandov1.EventStream{stream1, stream2},
streamsB: []zalandov1.EventStream{stream2, stream1},
match: true,
reason: "",
},
{
subTest: "same streams different order",
streamsA: []zalandov1.EventStream{stream1},
streamsB: []zalandov1.EventStream{stream1, stream2},
match: false,
reason: "number of defined streams is different",
},
{
subTest: "different number of streams",
streamsA: []zalandov1.EventStream{stream1},
streamsB: []zalandov1.EventStream{stream1, stream2},
match: false,
reason: "number of defined streams is different",
},
{
subTest: "event stream specs differ",
streamsA: []zalandov1.EventStream{stream1, stream2},
streamsB: fes.Spec.EventStreams,
match: false,
reason: "number of defined streams is different",
},
}
for _, tt := range tests {
streamsMatch, matchReason := sameStreams(tt.streamsA, tt.streamsB)
if streamsMatch != tt.match {
t.Errorf("%s %s: unexpected match result when comparing streams: got %s, epxected %s",
testName, tt.subTest, matchReason, tt.reason)
}
} }
} }
@ -213,7 +337,7 @@ func TestUpdateFabricEventStream(t *testing.T) {
Database: dbName, Database: dbName,
Tables: map[string]acidv1.StreamTable{ Tables: map[string]acidv1.StreamTable{
"data.bar": acidv1.StreamTable{ "data.bar": acidv1.StreamTable{
EventType: "stream_type_b", EventType: "stream-type-c",
IdColumn: "b_id", IdColumn: "b_id",
PayloadColumn: "b_payload", PayloadColumn: "b_payload",
}, },