remove wal type and distinguish source between nakadi and sqs

This commit is contained in:
Felix Kunde 2021-08-25 18:15:19 +02:00
parent 2726492fd3
commit 7a84c0852c
7 changed files with 44 additions and 21 deletions

View File

@ -499,7 +499,6 @@ spec:
enum:
- "nakadi"
- "sqs"
- "wal"
teamId:
type: string
tls:

View File

@ -189,3 +189,19 @@ spec:
# operator: In
# values:
# - enabled
# Enables change data capture streams for defined database tables
# streams:
# - type: nakadi
# batchSize: 100
# database: foo
# tables:
# ta: event_type_a
# tb: event_type_b
# - type: sqs
# database: foo
# tables:
# ta: ""
# tb: ""
# sqsArn: arn:aws:sqs:eu-central-1:111122223333
# queueName: foo-queue

View File

@ -495,7 +495,6 @@ spec:
enum:
- "nakadi"
- "sqs"
- "wal"
teamId:
type: string
tls:

View File

@ -701,9 +701,6 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
{
Raw: []byte(`"sqs"`),
},
{
Raw: []byte(`"wal"`),
},
},
},
},

View File

@ -69,7 +69,7 @@ type EventStreamSource struct {
// EventStreamTable defines the name and ID column to be used for streaming
type EventStreamTable struct {
Name string `json:"name"`
IDColumn string `json:"idColumn,omitempty" defaults:"id"`
IDColumn string `json:"idColumn,omitempty"`
}
// Connection to be used for allowing the FES operator to connect to a database

View File

@ -131,15 +131,26 @@ func (c *Cluster) generateFabricEventStream() *zalandov1alpha1.FabricEventStream
}
func (c *Cluster) getEventStreamSource(stream acidv1.Stream, table, eventType string) zalandov1alpha1.EventStreamSource {
streamFilter := stream.Filter[table]
_, schema := getTableSchema(table)
return zalandov1alpha1.EventStreamSource{
Type: constants.EventStreamSourcePGType,
Schema: schema,
EventStreamTable: getOutboxTable(table, eventType),
Filter: streamFilter,
Connection: c.getStreamConnection(stream.Database, constants.EventStreamSourceSlotPrefix+constants.UserRoleNameSuffix),
switch stream.StreamType {
case "nakadi":
streamFilter := stream.Filter[table]
return zalandov1alpha1.EventStreamSource{
Type: constants.EventStreamSourcePGType,
Schema: schema,
EventStreamTable: getOutboxTable(table, eventType),
Filter: streamFilter,
Connection: c.getStreamConnection(stream.Database, constants.EventStreamSourceSlotPrefix+constants.UserRoleNameSuffix),
}
case "sqs":
return zalandov1alpha1.EventStreamSource{
Type: constants.EventStreamSourcePGType,
EventStreamTable: getSqsTable(table),
Connection: c.getStreamConnection(stream.Database, constants.EventStreamSourceSlotPrefix+constants.UserRoleNameSuffix),
}
}
return zalandov1alpha1.EventStreamSource{}
}
func getEventStreamFlow(stream acidv1.Stream) zalandov1alpha1.EventStreamFlow {
@ -195,6 +206,12 @@ func getOutboxTable(tableName, eventType string) zalandov1alpha1.EventStreamTabl
}
}
func getSqsTable(tableName string) zalandov1alpha1.EventStreamTable {
return zalandov1alpha1.EventStreamTable{
Name: outboxTableNameTemplate.Format("table", tableName),
}
}
func (c *Cluster) getStreamConnection(database, user string) zalandov1alpha1.Connection {
return zalandov1alpha1.Connection{
Url: fmt.Sprintf("jdbc:postgresql://%s.%s/%s?user=%s&ssl=true&sslmode=require", c.Name, c.Namespace, database, user),

View File

@ -55,18 +55,13 @@ var (
BatchSize: uint32(100),
},
{
StreamType: "wal",
StreamType: "sqs",
Database: "foo",
Tables: map[string]string{
"bar": "stream_type_a",
},
BatchSize: uint32(100),
},
{
StreamType: "sqs",
Database: "foo",
SqsArn: "arn:aws:sqs:eu-central-1:111122223333",
QueueName: "foo-queue",
SqsArn: "arn:aws:sqs:eu-central-1:111122223333",
QueueName: "foo-queue",
},
},
Users: map[string]acidv1.UserFlags{