turn optional stream fields to pointers

This commit is contained in:
Felix Kunde 2022-03-17 19:07:17 +01:00
parent 5cf0b69662
commit 8f3699e435
6 changed files with 71 additions and 56 deletions

View File

@ -239,12 +239,12 @@ type Stream struct {
ApplicationId string `json:"applicationId"`
Database string `json:"database"`
Tables map[string]StreamTable `json:"tables"`
Filter map[string]string `json:"filter,omitempty"`
BatchSize uint32 `json:"batchSize,omitempty"`
Filter map[string]*string `json:"filter,omitempty"`
BatchSize *uint32 `json:"batchSize,omitempty"`
}
type StreamTable struct {
EventType string `json:"eventType"`
IdColumn string `json:"idColumn,omitempty" defaults:"id"`
PayloadColumn string `json:"payloadColumn,omitempty" defaults:"payload"`
EventType string `json:"eventType"`
IdColumn *string `json:"idColumn,omitempty"`
PayloadColumn *string `json:"payloadColumn,omitempty"`
}

View File

@ -1176,16 +1176,29 @@ func (in *Stream) DeepCopyInto(out *Stream) {
in, out := &in.Tables, &out.Tables
*out = make(map[string]StreamTable, len(*in))
for key, val := range *in {
(*out)[key] = val
(*out)[key] = *val.DeepCopy()
}
}
if in.Filter != nil {
in, out := &in.Filter, &out.Filter
*out = make(map[string]string, len(*in))
*out = make(map[string]*string, len(*in))
for key, val := range *in {
(*out)[key] = val
var outVal *string
if val == nil {
(*out)[key] = nil
} else {
in, out := &val, &outVal
*out = new(string)
**out = **in
}
(*out)[key] = outVal
}
}
if in.BatchSize != nil {
in, out := &in.BatchSize, &out.BatchSize
*out = new(uint32)
**out = **in
}
return
}
@ -1202,6 +1215,16 @@ func (in *Stream) DeepCopy() *Stream {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *StreamTable) DeepCopyInto(out *StreamTable) {
*out = *in
if in.IdColumn != nil {
in, out := &in.IdColumn, &out.IdColumn
*out = new(string)
**out = **in
}
if in.PayloadColumn != nil {
in, out := &in.PayloadColumn, &out.PayloadColumn
*out = new(string)
**out = **in
}
return
}

View File

@ -40,15 +40,15 @@ type EventStream struct {
// EventStreamFlow defines the flow characteristics of the event stream
type EventStreamFlow struct {
Type string `json:"type"`
PayloadColumn string `json:"payloadColumn,omitempty" defaults:"payload"`
Type string `json:"type"`
PayloadColumn *string `json:"payloadColumn,omitempty"`
}
// EventStreamSink defines the target of the event stream
type EventStreamSink struct {
Type string `json:"type"`
EventType string `json:"eventType,omitempty"`
MaxBatchSize uint32 `json:"maxBatchSize,omitempty"`
Type string `json:"type"`
EventType string `json:"eventType,omitempty"`
MaxBatchSize *uint32 `json:"maxBatchSize,omitempty"`
}
// EventStreamSource defines the source of the event stream and connection for FES operator
@ -56,23 +56,23 @@ type EventStreamSource struct {
Type string `json:"type"`
Schema string `json:"schema,omitempty" defaults:"public"`
EventStreamTable EventStreamTable `json:"table"`
Filter string `json:"filter,omitempty"`
Filter *string `json:"filter,omitempty"`
Connection Connection `json:"jdbcConnection"`
}
// EventStreamTable defines the name and ID column to be used for streaming
type EventStreamTable struct {
Name string `json:"name"`
IDColumn string `json:"idColumn,omitempty" defaults:"id"`
Name string `json:"name"`
IDColumn *string `json:"idColumn,omitempty"`
}
// Connection to be used for allowing the FES operator to connect to a database
type Connection struct {
Url string `json:"jdbcUrl"`
SlotName string `json:"slotName"`
PluginType string `json:"pluginType,omitempty" defaults:"pgoutput"`
PublicationName string `json:"publicationName,omitempty"`
DBAuth DBAuth `json:"databaseAuthentication"`
Url string `json:"jdbcUrl"`
SlotName string `json:"slotName"`
PluginType string `json:"pluginType,omitempty"`
PublicationName *string `json:"publicationName,omitempty"`
DBAuth DBAuth `json:"databaseAuthentication"`
}
// DBAuth specifies the credentials to be used for connecting with the database

View File

@ -189,7 +189,7 @@ func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEvent
}
}
func (c *Cluster) getEventStreamSource(stream acidv1.Stream, tableName, idColumn string) zalandov1.EventStreamSource {
func (c *Cluster) getEventStreamSource(stream acidv1.Stream, tableName string, idColumn *string) zalandov1.EventStreamSource {
table, schema := getTableSchema(tableName)
streamFilter := stream.Filter[tableName]
return zalandov1.EventStreamSource{
@ -204,12 +204,7 @@ func (c *Cluster) getEventStreamSource(stream acidv1.Stream, tableName, idColumn
}
}
func getEventStreamFlow(stream acidv1.Stream, payloadColumn string) zalandov1.EventStreamFlow {
if payloadColumn == "" {
return zalandov1.EventStreamFlow{
Type: constants.EventStreamFlowPgGenericType,
}
}
func getEventStreamFlow(stream acidv1.Stream, payloadColumn *string) zalandov1.EventStreamFlow {
return zalandov1.EventStreamFlow{
Type: constants.EventStreamFlowPgGenericType,
PayloadColumn: payloadColumn,
@ -217,12 +212,6 @@ func getEventStreamFlow(stream acidv1.Stream, payloadColumn string) zalandov1.Ev
}
func getEventStreamSink(stream acidv1.Stream, eventType string) zalandov1.EventStreamSink {
if stream.BatchSize == 0 {
return zalandov1.EventStreamSink{
Type: constants.EventStreamSinkNakadiType,
EventType: eventType,
}
}
return zalandov1.EventStreamSink{
Type: constants.EventStreamSinkNakadiType,
EventType: eventType,
@ -241,12 +230,7 @@ func getTableSchema(fullTableName string) (tableName, schemaName string) {
return tableName, schemaName
}
func getOutboxTable(tableName, idColumn string) zalandov1.EventStreamTable {
if idColumn == "" {
return zalandov1.EventStreamTable{
Name: tableName,
}
}
func getOutboxTable(tableName string, idColumn *string) zalandov1.EventStreamTable {
return zalandov1.EventStreamTable{
Name: tableName,
IDColumn: idColumn,

View File

@ -64,17 +64,17 @@ var (
Tables: map[string]acidv1.StreamTable{
"data.bar": acidv1.StreamTable{
EventType: "stream-type-a",
IdColumn: "b_id",
PayloadColumn: "b_payload",
IdColumn: k8sutil.StringPointer("b_id"),
PayloadColumn: k8sutil.StringPointer("b_payload"),
},
"data.foobar": acidv1.StreamTable{
EventType: "stream-type-b",
},
},
Filter: map[string]string{
"data.bar": "[?(@.source.txId > 500 && @.source.lsn > 123456)]",
Filter: map[string]*string{
"data.bar": k8sutil.StringPointer("[?(@.source.txId > 500 && @.source.lsn > 123456)]"),
},
BatchSize: uint32(100),
BatchSize: k8sutil.UInt32ToPointer(uint32(100)),
},
},
Volume: acidv1.Volume{
@ -105,16 +105,16 @@ var (
EventStreams: []zalandov1.EventStream{
zalandov1.EventStream{
EventStreamFlow: zalandov1.EventStreamFlow{
PayloadColumn: "b_payload",
PayloadColumn: k8sutil.StringPointer("b_payload"),
Type: constants.EventStreamFlowPgGenericType,
},
EventStreamSink: zalandov1.EventStreamSink{
EventType: "stream-type-a",
MaxBatchSize: uint32(100),
MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)),
Type: constants.EventStreamSinkNakadiType,
},
EventStreamSource: zalandov1.EventStreamSource{
Filter: "[?(@.source.txId > 500 && @.source.lsn > 123456)]",
Filter: k8sutil.StringPointer("[?(@.source.txId > 500 && @.source.lsn > 123456)]"),
Connection: zalandov1.Connection{
DBAuth: zalandov1.DBAuth{
Name: fmt.Sprintf("fes-user.%s.credentials.postgresql.acid.zalan.do", clusterName),
@ -128,7 +128,7 @@ var (
},
Schema: "data",
EventStreamTable: zalandov1.EventStreamTable{
IDColumn: "b_id",
IDColumn: k8sutil.StringPointer("b_id"),
Name: "bar",
},
Type: constants.EventStreamSourcePGType,
@ -140,7 +140,7 @@ var (
},
EventStreamSink: zalandov1.EventStreamSink{
EventType: "stream-type-b",
MaxBatchSize: uint32(100),
MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)),
Type: constants.EventStreamSinkNakadiType,
},
EventStreamSource: zalandov1.EventStreamSource{
@ -202,7 +202,7 @@ func TestGenerateFabricEventStream(t *testing.T) {
// compare generated stream with expected stream
result := cluster.generateFabricEventStream(appId)
if !reflect.DeepEqual(result, fes) {
if match, _ := sameStreams(result.Spec.EventStreams, fes.Spec.EventStreams); !match {
t.Errorf("malformed FabricEventStream, expected %#v, got %#v", fes, result)
}
@ -261,8 +261,8 @@ func TestSameStreams(t *testing.T) {
}{
{
subTest: "identical streams",
streamsA: fes.Spec.EventStreams,
streamsB: fes.Spec.EventStreams,
streamsA: []zalandov1.EventStream{stream1, stream2},
streamsB: []zalandov1.EventStream{stream1, stream2},
match: true,
reason: "",
},
@ -338,11 +338,11 @@ func TestUpdateFabricEventStream(t *testing.T) {
Tables: map[string]acidv1.StreamTable{
"data.bar": acidv1.StreamTable{
EventType: "stream-type-c",
IdColumn: "b_id",
PayloadColumn: "b_payload",
IdColumn: k8sutil.StringPointer("b_id"),
PayloadColumn: k8sutil.StringPointer("b_payload"),
},
},
BatchSize: uint32(250),
BatchSize: k8sutil.UInt32ToPointer(uint32(250)),
},
}
patch, err := json.Marshal(struct {

View File

@ -37,6 +37,14 @@ func Int32ToPointer(value int32) *int32 {
return &value
}
func UInt32ToPointer(value uint32) *uint32 {
return &value
}
func StringPointer(str string) *string {
return &str
}
// KubernetesClient describes getters for Kubernetes objects
type KubernetesClient struct {
corev1.SecretsGetter