fix comparison of event stream array (#1817)
* fix comparison of event stream array * turn optional stream fields to pointers
This commit is contained in:
parent
2719d411c3
commit
1d88009ec4
|
|
@ -239,12 +239,12 @@ type Stream struct {
|
||||||
ApplicationId string `json:"applicationId"`
|
ApplicationId string `json:"applicationId"`
|
||||||
Database string `json:"database"`
|
Database string `json:"database"`
|
||||||
Tables map[string]StreamTable `json:"tables"`
|
Tables map[string]StreamTable `json:"tables"`
|
||||||
Filter map[string]string `json:"filter,omitempty"`
|
Filter map[string]*string `json:"filter,omitempty"`
|
||||||
BatchSize uint32 `json:"batchSize,omitempty"`
|
BatchSize *uint32 `json:"batchSize,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type StreamTable struct {
|
type StreamTable struct {
|
||||||
EventType string `json:"eventType"`
|
EventType string `json:"eventType"`
|
||||||
IdColumn string `json:"idColumn,omitempty" defaults:"id"`
|
IdColumn *string `json:"idColumn,omitempty"`
|
||||||
PayloadColumn string `json:"payloadColumn,omitempty" defaults:"payload"`
|
PayloadColumn *string `json:"payloadColumn,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1181,16 +1181,29 @@ func (in *Stream) DeepCopyInto(out *Stream) {
|
||||||
in, out := &in.Tables, &out.Tables
|
in, out := &in.Tables, &out.Tables
|
||||||
*out = make(map[string]StreamTable, len(*in))
|
*out = make(map[string]StreamTable, len(*in))
|
||||||
for key, val := range *in {
|
for key, val := range *in {
|
||||||
(*out)[key] = val
|
(*out)[key] = *val.DeepCopy()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if in.Filter != nil {
|
if in.Filter != nil {
|
||||||
in, out := &in.Filter, &out.Filter
|
in, out := &in.Filter, &out.Filter
|
||||||
*out = make(map[string]string, len(*in))
|
*out = make(map[string]*string, len(*in))
|
||||||
for key, val := range *in {
|
for key, val := range *in {
|
||||||
(*out)[key] = val
|
var outVal *string
|
||||||
|
if val == nil {
|
||||||
|
(*out)[key] = nil
|
||||||
|
} else {
|
||||||
|
in, out := &val, &outVal
|
||||||
|
*out = new(string)
|
||||||
|
**out = **in
|
||||||
|
}
|
||||||
|
(*out)[key] = outVal
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if in.BatchSize != nil {
|
||||||
|
in, out := &in.BatchSize, &out.BatchSize
|
||||||
|
*out = new(uint32)
|
||||||
|
**out = **in
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1207,6 +1220,16 @@ func (in *Stream) DeepCopy() *Stream {
|
||||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||||
func (in *StreamTable) DeepCopyInto(out *StreamTable) {
|
func (in *StreamTable) DeepCopyInto(out *StreamTable) {
|
||||||
*out = *in
|
*out = *in
|
||||||
|
if in.IdColumn != nil {
|
||||||
|
in, out := &in.IdColumn, &out.IdColumn
|
||||||
|
*out = new(string)
|
||||||
|
**out = **in
|
||||||
|
}
|
||||||
|
if in.PayloadColumn != nil {
|
||||||
|
in, out := &in.PayloadColumn, &out.PayloadColumn
|
||||||
|
*out = new(string)
|
||||||
|
**out = **in
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -40,15 +40,15 @@ type EventStream struct {
|
||||||
|
|
||||||
// EventStreamFlow defines the flow characteristics of the event stream
|
// EventStreamFlow defines the flow characteristics of the event stream
|
||||||
type EventStreamFlow struct {
|
type EventStreamFlow struct {
|
||||||
Type string `json:"type"`
|
Type string `json:"type"`
|
||||||
PayloadColumn string `json:"payloadColumn,omitempty" defaults:"payload"`
|
PayloadColumn *string `json:"payloadColumn,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// EventStreamSink defines the target of the event stream
|
// EventStreamSink defines the target of the event stream
|
||||||
type EventStreamSink struct {
|
type EventStreamSink struct {
|
||||||
Type string `json:"type"`
|
Type string `json:"type"`
|
||||||
EventType string `json:"eventType,omitempty"`
|
EventType string `json:"eventType,omitempty"`
|
||||||
MaxBatchSize uint32 `json:"maxBatchSize,omitempty"`
|
MaxBatchSize *uint32 `json:"maxBatchSize,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// EventStreamSource defines the source of the event stream and connection for FES operator
|
// EventStreamSource defines the source of the event stream and connection for FES operator
|
||||||
|
|
@ -56,23 +56,23 @@ type EventStreamSource struct {
|
||||||
Type string `json:"type"`
|
Type string `json:"type"`
|
||||||
Schema string `json:"schema,omitempty" defaults:"public"`
|
Schema string `json:"schema,omitempty" defaults:"public"`
|
||||||
EventStreamTable EventStreamTable `json:"table"`
|
EventStreamTable EventStreamTable `json:"table"`
|
||||||
Filter string `json:"filter,omitempty"`
|
Filter *string `json:"filter,omitempty"`
|
||||||
Connection Connection `json:"jdbcConnection"`
|
Connection Connection `json:"jdbcConnection"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// EventStreamTable defines the name and ID column to be used for streaming
|
// EventStreamTable defines the name and ID column to be used for streaming
|
||||||
type EventStreamTable struct {
|
type EventStreamTable struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
IDColumn string `json:"idColumn,omitempty" defaults:"id"`
|
IDColumn *string `json:"idColumn,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Connection to be used for allowing the FES operator to connect to a database
|
// Connection to be used for allowing the FES operator to connect to a database
|
||||||
type Connection struct {
|
type Connection struct {
|
||||||
Url string `json:"jdbcUrl"`
|
Url string `json:"jdbcUrl"`
|
||||||
SlotName string `json:"slotName"`
|
SlotName string `json:"slotName"`
|
||||||
PluginType string `json:"pluginType,omitempty" defaults:"wal2json"`
|
PluginType string `json:"pluginType,omitempty"`
|
||||||
PublicationName string `json:"publicationName,omitempty"`
|
PublicationName *string `json:"publicationName,omitempty"`
|
||||||
DBAuth DBAuth `json:"databaseAuthentication"`
|
DBAuth DBAuth `json:"databaseAuthentication"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// DBAuth specifies the credentials to be used for connecting with the database
|
// DBAuth specifies the credentials to be used for connecting with the database
|
||||||
|
|
|
||||||
|
|
@ -189,7 +189,7 @@ func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEvent
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) getEventStreamSource(stream acidv1.Stream, tableName, idColumn string) zalandov1.EventStreamSource {
|
func (c *Cluster) getEventStreamSource(stream acidv1.Stream, tableName string, idColumn *string) zalandov1.EventStreamSource {
|
||||||
table, schema := getTableSchema(tableName)
|
table, schema := getTableSchema(tableName)
|
||||||
streamFilter := stream.Filter[tableName]
|
streamFilter := stream.Filter[tableName]
|
||||||
return zalandov1.EventStreamSource{
|
return zalandov1.EventStreamSource{
|
||||||
|
|
@ -204,7 +204,7 @@ func (c *Cluster) getEventStreamSource(stream acidv1.Stream, tableName, idColumn
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func getEventStreamFlow(stream acidv1.Stream, payloadColumn string) zalandov1.EventStreamFlow {
|
func getEventStreamFlow(stream acidv1.Stream, payloadColumn *string) zalandov1.EventStreamFlow {
|
||||||
return zalandov1.EventStreamFlow{
|
return zalandov1.EventStreamFlow{
|
||||||
Type: constants.EventStreamFlowPgGenericType,
|
Type: constants.EventStreamFlowPgGenericType,
|
||||||
PayloadColumn: payloadColumn,
|
PayloadColumn: payloadColumn,
|
||||||
|
|
@ -230,7 +230,7 @@ func getTableSchema(fullTableName string) (tableName, schemaName string) {
|
||||||
return tableName, schemaName
|
return tableName, schemaName
|
||||||
}
|
}
|
||||||
|
|
||||||
func getOutboxTable(tableName, idColumn string) zalandov1.EventStreamTable {
|
func getOutboxTable(tableName string, idColumn *string) zalandov1.EventStreamTable {
|
||||||
return zalandov1.EventStreamTable{
|
return zalandov1.EventStreamTable{
|
||||||
Name: tableName,
|
Name: tableName,
|
||||||
IDColumn: idColumn,
|
IDColumn: idColumn,
|
||||||
|
|
@ -347,8 +347,8 @@ func (c *Cluster) createOrUpdateStreams() error {
|
||||||
c.logger.Infof("event stream %q has been successfully created", fesName)
|
c.logger.Infof("event stream %q has been successfully created", fesName)
|
||||||
} else {
|
} else {
|
||||||
desiredStreams := c.generateFabricEventStream(appId)
|
desiredStreams := c.generateFabricEventStream(appId)
|
||||||
if !reflect.DeepEqual(effectiveStreams.Spec, desiredStreams.Spec) {
|
if match, reason := sameStreams(effectiveStreams.Spec.EventStreams, desiredStreams.Spec.EventStreams); !match {
|
||||||
c.logger.Debug("updating event streams")
|
c.logger.Debugf("updating event streams: %s", reason)
|
||||||
desiredStreams.ObjectMeta.ResourceVersion = effectiveStreams.ObjectMeta.ResourceVersion
|
desiredStreams.ObjectMeta.ResourceVersion = effectiveStreams.ObjectMeta.ResourceVersion
|
||||||
err = c.updateStreams(desiredStreams)
|
err = c.updateStreams(desiredStreams)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -361,3 +361,27 @@ func (c *Cluster) createOrUpdateStreams() error {
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func sameStreams(curEventStreams, newEventStreams []zalandov1.EventStream) (match bool, reason string) {
|
||||||
|
if len(newEventStreams) != len(curEventStreams) {
|
||||||
|
return false, "number of defined streams is different"
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, newStream := range newEventStreams {
|
||||||
|
match = false
|
||||||
|
reason = "event stream specs differ"
|
||||||
|
for _, curStream := range curEventStreams {
|
||||||
|
if reflect.DeepEqual(newStream.EventStreamSource, curStream.EventStreamSource) &&
|
||||||
|
reflect.DeepEqual(newStream.EventStreamFlow, curStream.EventStreamFlow) &&
|
||||||
|
reflect.DeepEqual(newStream.EventStreamSink, curStream.EventStreamSink) {
|
||||||
|
match = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !match {
|
||||||
|
return false, reason
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, ""
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,7 @@ import (
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
|
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
|
||||||
v1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1"
|
zalandov1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1"
|
||||||
fakezalandov1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/fake"
|
fakezalandov1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/fake"
|
||||||
"github.com/zalando/postgres-operator/pkg/util"
|
"github.com/zalando/postgres-operator/pkg/util"
|
||||||
"github.com/zalando/postgres-operator/pkg/util/config"
|
"github.com/zalando/postgres-operator/pkg/util/config"
|
||||||
|
|
@ -63,15 +63,18 @@ var (
|
||||||
Database: "foo",
|
Database: "foo",
|
||||||
Tables: map[string]acidv1.StreamTable{
|
Tables: map[string]acidv1.StreamTable{
|
||||||
"data.bar": acidv1.StreamTable{
|
"data.bar": acidv1.StreamTable{
|
||||||
EventType: "stream_type_a",
|
EventType: "stream-type-a",
|
||||||
IdColumn: "b_id",
|
IdColumn: k8sutil.StringToPointer("b_id"),
|
||||||
PayloadColumn: "b_payload",
|
PayloadColumn: k8sutil.StringToPointer("b_payload"),
|
||||||
|
},
|
||||||
|
"data.foobar": acidv1.StreamTable{
|
||||||
|
EventType: "stream-type-b",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Filter: map[string]string{
|
Filter: map[string]*string{
|
||||||
"data.bar": "[?(@.source.txId > 500 && @.source.lsn > 123456)]",
|
"data.bar": k8sutil.StringToPointer("[?(@.source.txId > 500 && @.source.lsn > 123456)]"),
|
||||||
},
|
},
|
||||||
BatchSize: uint32(100),
|
BatchSize: k8sutil.UInt32ToPointer(uint32(100)),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Volume: acidv1.Volume{
|
Volume: acidv1.Volume{
|
||||||
|
|
@ -80,7 +83,7 @@ var (
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
fes = &v1.FabricEventStream{
|
fes = &zalandov1.FabricEventStream{
|
||||||
TypeMeta: metav1.TypeMeta{
|
TypeMeta: metav1.TypeMeta{
|
||||||
APIVersion: constants.EventStreamCRDApiVersion,
|
APIVersion: constants.EventStreamCRDApiVersion,
|
||||||
Kind: constants.EventStreamCRDKind,
|
Kind: constants.EventStreamCRDKind,
|
||||||
|
|
@ -97,23 +100,23 @@ var (
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Spec: v1.FabricEventStreamSpec{
|
Spec: zalandov1.FabricEventStreamSpec{
|
||||||
ApplicationId: appId,
|
ApplicationId: appId,
|
||||||
EventStreams: []v1.EventStream{
|
EventStreams: []zalandov1.EventStream{
|
||||||
{
|
zalandov1.EventStream{
|
||||||
EventStreamFlow: v1.EventStreamFlow{
|
EventStreamFlow: zalandov1.EventStreamFlow{
|
||||||
PayloadColumn: "b_payload",
|
PayloadColumn: k8sutil.StringToPointer("b_payload"),
|
||||||
Type: constants.EventStreamFlowPgGenericType,
|
Type: constants.EventStreamFlowPgGenericType,
|
||||||
},
|
},
|
||||||
EventStreamSink: v1.EventStreamSink{
|
EventStreamSink: zalandov1.EventStreamSink{
|
||||||
EventType: "stream_type_a",
|
EventType: "stream-type-a",
|
||||||
MaxBatchSize: uint32(100),
|
MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)),
|
||||||
Type: constants.EventStreamSinkNakadiType,
|
Type: constants.EventStreamSinkNakadiType,
|
||||||
},
|
},
|
||||||
EventStreamSource: v1.EventStreamSource{
|
EventStreamSource: zalandov1.EventStreamSource{
|
||||||
Filter: "[?(@.source.txId > 500 && @.source.lsn > 123456)]",
|
Filter: k8sutil.StringToPointer("[?(@.source.txId > 500 && @.source.lsn > 123456)]"),
|
||||||
Connection: v1.Connection{
|
Connection: zalandov1.Connection{
|
||||||
DBAuth: v1.DBAuth{
|
DBAuth: zalandov1.DBAuth{
|
||||||
Name: fmt.Sprintf("fes-user.%s.credentials.postgresql.acid.zalan.do", clusterName),
|
Name: fmt.Sprintf("fes-user.%s.credentials.postgresql.acid.zalan.do", clusterName),
|
||||||
PasswordKey: "password",
|
PasswordKey: "password",
|
||||||
Type: constants.EventStreamSourceAuthType,
|
Type: constants.EventStreamSourceAuthType,
|
||||||
|
|
@ -124,13 +127,41 @@ var (
|
||||||
PluginType: constants.EventStreamSourcePluginType,
|
PluginType: constants.EventStreamSourcePluginType,
|
||||||
},
|
},
|
||||||
Schema: "data",
|
Schema: "data",
|
||||||
EventStreamTable: v1.EventStreamTable{
|
EventStreamTable: zalandov1.EventStreamTable{
|
||||||
IDColumn: "b_id",
|
IDColumn: k8sutil.StringToPointer("b_id"),
|
||||||
Name: "bar",
|
Name: "bar",
|
||||||
},
|
},
|
||||||
Type: constants.EventStreamSourcePGType,
|
Type: constants.EventStreamSourcePGType,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
zalandov1.EventStream{
|
||||||
|
EventStreamFlow: zalandov1.EventStreamFlow{
|
||||||
|
Type: constants.EventStreamFlowPgGenericType,
|
||||||
|
},
|
||||||
|
EventStreamSink: zalandov1.EventStreamSink{
|
||||||
|
EventType: "stream-type-b",
|
||||||
|
MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)),
|
||||||
|
Type: constants.EventStreamSinkNakadiType,
|
||||||
|
},
|
||||||
|
EventStreamSource: zalandov1.EventStreamSource{
|
||||||
|
Connection: zalandov1.Connection{
|
||||||
|
DBAuth: zalandov1.DBAuth{
|
||||||
|
Name: fmt.Sprintf("fes-user.%s.credentials.postgresql.acid.zalan.do", clusterName),
|
||||||
|
PasswordKey: "password",
|
||||||
|
Type: constants.EventStreamSourceAuthType,
|
||||||
|
UserKey: "username",
|
||||||
|
},
|
||||||
|
Url: fmt.Sprintf("jdbc:postgresql://%s.%s/foo?user=%s&ssl=true&sslmode=require", clusterName, namespace, fesUser),
|
||||||
|
SlotName: slotName,
|
||||||
|
PluginType: constants.EventStreamSourcePluginType,
|
||||||
|
},
|
||||||
|
Schema: "data",
|
||||||
|
EventStreamTable: zalandov1.EventStreamTable{
|
||||||
|
Name: "foobar",
|
||||||
|
},
|
||||||
|
Type: constants.EventStreamSourcePGType,
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
@ -161,23 +192,116 @@ func TestGenerateFabricEventStream(t *testing.T) {
|
||||||
cluster.Name = clusterName
|
cluster.Name = clusterName
|
||||||
cluster.Namespace = namespace
|
cluster.Namespace = namespace
|
||||||
|
|
||||||
|
// create statefulset to have ownerReference for streams
|
||||||
_, err := cluster.createStatefulSet()
|
_, err := cluster.createStatefulSet()
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// create the streams
|
||||||
err = cluster.createOrUpdateStreams()
|
err = cluster.createOrUpdateStreams()
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// compare generated stream with expected stream
|
||||||
result := cluster.generateFabricEventStream(appId)
|
result := cluster.generateFabricEventStream(appId)
|
||||||
|
if match, _ := sameStreams(result.Spec.EventStreams, fes.Spec.EventStreams); !match {
|
||||||
if !reflect.DeepEqual(result, fes) {
|
t.Errorf("malformed FabricEventStream, expected %#v, got %#v", fes, result)
|
||||||
t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", fes, result)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// compare stream resturned from API with expected stream
|
||||||
streamCRD, err := cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), fesName, metav1.GetOptions{})
|
streamCRD, err := cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), fesName, metav1.GetOptions{})
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
if match, _ := sameStreams(streamCRD.Spec.EventStreams, fes.Spec.EventStreams); !match {
|
||||||
|
t.Errorf("malformed FabricEventStream returned from API, expected %#v, got %#v", fes, streamCRD)
|
||||||
|
}
|
||||||
|
|
||||||
if !reflect.DeepEqual(streamCRD, fes) {
|
// sync streams once again
|
||||||
t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", fes, streamCRD)
|
err = cluster.createOrUpdateStreams()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// compare stream resturned from API with generated stream
|
||||||
|
streamCRD, err = cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), fesName, metav1.GetOptions{})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
if match, _ := sameStreams(streamCRD.Spec.EventStreams, result.Spec.EventStreams); !match {
|
||||||
|
t.Errorf("returned FabricEventStream differs from generated one, expected %#v, got %#v", result, streamCRD)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSameStreams(t *testing.T) {
|
||||||
|
testName := "TestSameStreams"
|
||||||
|
|
||||||
|
stream1 := zalandov1.EventStream{
|
||||||
|
EventStreamFlow: zalandov1.EventStreamFlow{},
|
||||||
|
EventStreamSink: zalandov1.EventStreamSink{
|
||||||
|
EventType: "stream-type-a",
|
||||||
|
},
|
||||||
|
EventStreamSource: zalandov1.EventStreamSource{
|
||||||
|
EventStreamTable: zalandov1.EventStreamTable{
|
||||||
|
Name: "foo",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
stream2 := zalandov1.EventStream{
|
||||||
|
EventStreamFlow: zalandov1.EventStreamFlow{},
|
||||||
|
EventStreamSink: zalandov1.EventStreamSink{
|
||||||
|
EventType: "stream-type-b",
|
||||||
|
},
|
||||||
|
EventStreamSource: zalandov1.EventStreamSource{
|
||||||
|
EventStreamTable: zalandov1.EventStreamTable{
|
||||||
|
Name: "bar",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
subTest string
|
||||||
|
streamsA []zalandov1.EventStream
|
||||||
|
streamsB []zalandov1.EventStream
|
||||||
|
match bool
|
||||||
|
reason string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
subTest: "identical streams",
|
||||||
|
streamsA: []zalandov1.EventStream{stream1, stream2},
|
||||||
|
streamsB: []zalandov1.EventStream{stream1, stream2},
|
||||||
|
match: true,
|
||||||
|
reason: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
subTest: "same streams different order",
|
||||||
|
streamsA: []zalandov1.EventStream{stream1, stream2},
|
||||||
|
streamsB: []zalandov1.EventStream{stream2, stream1},
|
||||||
|
match: true,
|
||||||
|
reason: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
subTest: "same streams different order",
|
||||||
|
streamsA: []zalandov1.EventStream{stream1},
|
||||||
|
streamsB: []zalandov1.EventStream{stream1, stream2},
|
||||||
|
match: false,
|
||||||
|
reason: "number of defined streams is different",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
subTest: "different number of streams",
|
||||||
|
streamsA: []zalandov1.EventStream{stream1},
|
||||||
|
streamsB: []zalandov1.EventStream{stream1, stream2},
|
||||||
|
match: false,
|
||||||
|
reason: "number of defined streams is different",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
subTest: "event stream specs differ",
|
||||||
|
streamsA: []zalandov1.EventStream{stream1, stream2},
|
||||||
|
streamsB: fes.Spec.EventStreams,
|
||||||
|
match: false,
|
||||||
|
reason: "number of defined streams is different",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
streamsMatch, matchReason := sameStreams(tt.streamsA, tt.streamsB)
|
||||||
|
if streamsMatch != tt.match {
|
||||||
|
t.Errorf("%s %s: unexpected match result when comparing streams: got %s, epxected %s",
|
||||||
|
testName, tt.subTest, matchReason, tt.reason)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -213,12 +337,12 @@ func TestUpdateFabricEventStream(t *testing.T) {
|
||||||
Database: dbName,
|
Database: dbName,
|
||||||
Tables: map[string]acidv1.StreamTable{
|
Tables: map[string]acidv1.StreamTable{
|
||||||
"data.bar": acidv1.StreamTable{
|
"data.bar": acidv1.StreamTable{
|
||||||
EventType: "stream_type_b",
|
EventType: "stream-type-c",
|
||||||
IdColumn: "b_id",
|
IdColumn: k8sutil.StringToPointer("b_id"),
|
||||||
PayloadColumn: "b_payload",
|
PayloadColumn: k8sutil.StringToPointer("b_payload"),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
BatchSize: uint32(250),
|
BatchSize: k8sutil.UInt32ToPointer(uint32(250)),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
patch, err := json.Marshal(struct {
|
patch, err := json.Marshal(struct {
|
||||||
|
|
|
||||||
|
|
@ -37,6 +37,14 @@ func Int32ToPointer(value int32) *int32 {
|
||||||
return &value
|
return &value
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func UInt32ToPointer(value uint32) *uint32 {
|
||||||
|
return &value
|
||||||
|
}
|
||||||
|
|
||||||
|
func StringToPointer(str string) *string {
|
||||||
|
return &str
|
||||||
|
}
|
||||||
|
|
||||||
// KubernetesClient describes getters for Kubernetes objects
|
// KubernetesClient describes getters for Kubernetes objects
|
||||||
type KubernetesClient struct {
|
type KubernetesClient struct {
|
||||||
corev1.SecretsGetter
|
corev1.SecretsGetter
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue