add resource annotation and ignore recovery type (#2817)

* add resource annotation and ignore recovery type
* Update docs/reference/cluster_manifest.md

---------

Co-authored-by: Ida Novindasari <idanovinda@gmail.com>
Authored by Felix Kunde on 2024-12-16 18:17:19 +01:00, committed by GitHub
parent 301462c415
commit 80ef38f7f0
8 changed files with 150 additions and 24 deletions


@@ -514,6 +514,9 @@ spec:
       type: string
     batchSize:
       type: integer
+    cpu:
+      type: string
+      pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
     database:
       type: string
     enableRecovery:
@@ -522,6 +525,9 @@ spec:
       type: object
       additionalProperties:
         type: string
+    memory:
+      type: string
+      pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
     tables:
       type: object
       additionalProperties:
@@ -533,6 +539,8 @@ spec:
           type: string
         idColumn:
           type: string
+        ignoreRecovery:
+          type: boolean
         payloadColumn:
           type: string
         recoveryEventType:
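
The two `pattern` constraints follow Kubernetes resource-quantity syntax. As a quick illustration (the values below are not part of the commit), quantities they accept and reject:

```yaml
# Illustrative values checked against the patterns above:
cpu: "250m"      # accepted: "250m", "2", "0.5"; rejected: "250M", "0.5m"
memory: "500Mi"  # accepted: "500Mi", "1Gi", "2G", "1e9"; rejected: "0.5 GB", "1giga"
```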


@@ -652,11 +652,11 @@ can have the following properties:
 * **applicationId**
   The application name to which the database and CDC belongs to. For each
-  set of streams with a distinct `applicationId` a separate stream CR as well
-  as a separate logical replication slot will be created. This means there can
-  be different streams in the same database and streams with the same
-  `applicationId` are bundled in one stream CR. The stream CR will be called
-  like the Postgres cluster plus "-<applicationId>" suffix. Required.
+  set of streams with a distinct `applicationId` a separate stream resource as
+  well as a separate logical replication slot will be created. This means there
+  can be different streams in the same database and streams with the same
+  `applicationId` are bundled in one stream resource. The stream resource will
+  be named like the Postgres cluster plus a "-<applicationId>" suffix. Required.
 
 * **database**
   Name of the database from where events will be published via Postgres'
@@ -667,7 +667,8 @@ can have the following properties:
 * **tables**
   Defines a map of table names and their properties (`eventType`, `idColumn`
-  and `payloadColumn`). The CDC operator is following the [outbox pattern](https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/).
+  and `payloadColumn`). Required.
+  The CDC operator is following the [outbox pattern](https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/).
   The application is responsible for putting events into a (JSON/B or VARCHAR)
   payload column of the outbox table in the structure of the specified target
   event type. The operator will create a [PUBLICATION](https://www.postgresql.org/docs/16/logical-replication-publication.html)
@@ -676,12 +677,27 @@ can have the following properties:
   committed to the outbox table. The `idColumn` will be used in telemetry for
   the CDC operator. The names for `idColumn` and `payloadColumn` can be
   configured. Defaults are `id` and `payload`. The target `eventType` has to
-  be defined. Required.
+  be defined. One can also specify a `recoveryEventType` that will be used
+  for a dead letter queue. By enabling `ignoreRecovery`, you can choose to
+  ignore failing events.
 
 * **filter**
   Streamed events can be filtered by a jsonpath expression for each table.
   Optional.
 
+* **enableRecovery**
+  Flag to enable dead letter queue recovery for all stream tables.
+  Alternatively, recovery can also be enabled for single outbox tables by
+  specifying only a `recoveryEventType` and no `enableRecovery` flag. When set
+  to false or missing, events will be retried until consuming succeeds. You
+  can use a `filter` expression to get rid of poison pills. Optional.
+
 * **batchSize**
   Defines the size of batches in which events are consumed. Optional.
   Defaults to 1.
+
+* **cpu**
+  CPU requests to be set as an annotation on the stream resource. Optional.
+
+* **memory**
+  Memory requests to be set as an annotation on the stream resource. Optional.
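
Taken together, a `streams` section that exercises the new fields might look like the following sketch; the application ID, database, and table names here are invented for illustration:

```yaml
spec:
  streams:
    - applicationId: order-service   # hypothetical
      database: foo
      batchSize: 100
      cpu: 250m       # surfaces as the fes.zalando.org/FES_CPU annotation
      memory: 500Mi   # surfaces as the fes.zalando.org/FES_MEMORY annotation
      enableRecovery: true
      tables:
        data.orders:
          eventType: order-created
          recoveryEventType: order-created-dlq  # explicit dead letter queue event type
        data.audit_log:
          eventType: audit-event
          ignoreRecovery: true                  # failing events are skipped, no DLQ
```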


@@ -512,6 +512,9 @@ spec:
       type: string
     batchSize:
       type: integer
+    cpu:
+      type: string
+      pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
     database:
       type: string
     enableRecovery:
@@ -520,6 +523,9 @@ spec:
       type: object
       additionalProperties:
         type: string
+    memory:
+      type: string
+      pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
     tables:
       type: object
       additionalProperties:
@@ -531,6 +537,8 @@ spec:
           type: string
         idColumn:
           type: string
+        ignoreRecovery:
+          type: boolean
         payloadColumn:
           type: string
         recoveryEventType:


@@ -258,6 +258,8 @@ type Stream struct {
 	Tables         map[string]StreamTable `json:"tables"`
 	Filter         map[string]*string     `json:"filter,omitempty"`
 	BatchSize      *uint32                `json:"batchSize,omitempty"`
+	CPU            *string                `json:"cpu,omitempty"`
+	Memory         *string                `json:"memory,omitempty"`
 	EnableRecovery *bool                  `json:"enableRecovery,omitempty"`
 }
 
@@ -265,6 +267,7 @@ type Stream struct {
 type StreamTable struct {
 	EventType         string  `json:"eventType"`
 	RecoveryEventType string  `json:"recoveryEventType,omitempty"`
+	IgnoreRecovery    *bool   `json:"ignoreRecovery,omitempty"`
 	IdColumn          *string `json:"idColumn,omitempty"`
 	PayloadColumn     *string `json:"payloadColumn,omitempty"`
 }


@@ -1336,6 +1336,16 @@ func (in *Stream) DeepCopyInto(out *Stream) {
 		*out = new(uint32)
 		**out = **in
 	}
+	if in.CPU != nil {
+		in, out := &in.CPU, &out.CPU
+		*out = new(string)
+		**out = **in
+	}
+	if in.Memory != nil {
+		in, out := &in.Memory, &out.Memory
+		*out = new(string)
+		**out = **in
+	}
 	if in.EnableRecovery != nil {
 		in, out := &in.EnableRecovery, &out.EnableRecovery
 		*out = new(bool)
@@ -1357,6 +1367,11 @@ func (in *Stream) DeepCopy() *Stream {
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *StreamTable) DeepCopyInto(out *StreamTable) {
 	*out = *in
+	if in.IgnoreRecovery != nil {
+		in, out := &in.IgnoreRecovery, &out.IgnoreRecovery
+		*out = new(bool)
+		**out = **in
+	}
 	if in.IdColumn != nil {
 		in, out := &in.IdColumn, &out.IdColumn
 		*out = new(string)


@@ -178,16 +178,35 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za
 func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEventStream {
 	eventStreams := make([]zalandov1.EventStream, 0)
+	resourceAnnotations := map[string]string{}
 
 	for _, stream := range c.Spec.Streams {
 		if stream.ApplicationId != appId {
 			continue
 		}
 
+		if stream.CPU != nil {
+			cpu, exists := resourceAnnotations[constants.EventStreamCpuAnnotationKey]
+			if exists {
+				isSmaller, _ := util.IsSmallerQuantity(cpu, *stream.CPU)
+				if isSmaller {
+					resourceAnnotations[constants.EventStreamCpuAnnotationKey] = *stream.CPU
+				}
+			}
+		}
+
+		if stream.Memory != nil {
+			memory, exists := resourceAnnotations[constants.EventStreamMemoryAnnotationKey]
+			if exists {
+				isSmaller, _ := util.IsSmallerQuantity(memory, *stream.Memory)
+				if isSmaller {
+					resourceAnnotations[constants.EventStreamMemoryAnnotationKey] = *stream.Memory
+				}
+			}
+		}
+
 		for tableName, table := range stream.Tables {
 			streamSource := c.getEventStreamSource(stream, tableName, table.IdColumn)
 			streamFlow := getEventStreamFlow(table.PayloadColumn)
 			streamSink := getEventStreamSink(stream, table.EventType)
-			streamRecovery := getEventStreamRecovery(stream, table.RecoveryEventType, table.EventType)
+			streamRecovery := getEventStreamRecovery(stream, table.RecoveryEventType, table.EventType, table.IgnoreRecovery)
 
 			eventStreams = append(eventStreams, zalandov1.EventStream{
 				EventStreamFlow: streamFlow,
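
For several streams under the same `applicationId`, the loop above compares each stream's request against the value already recorded: `util.IsSmallerQuantity` lets a larger request replace a smaller recorded one, so the single generated FabricEventStream resource is annotated with the per-key maximum. A sketch of the intended outcome, with invented values:

```yaml
# Given two streams with the same applicationId:
#   stream A: cpu: 250m, memory: 1Gi
#   stream B: cpu: 500m, memory: 500Mi
# the generated FabricEventStream carries the per-key maximum:
metadata:
  annotations:
    fes.zalando.org/FES_CPU: 500m      # max(250m, 500m)
    fes.zalando.org/FES_MEMORY: 1Gi    # max(1Gi, 500Mi)
```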
@@ -207,7 +226,7 @@ func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEvent
 			Name:            fmt.Sprintf("%s-%s", c.Name, strings.ToLower(util.RandomPassword(5))),
 			Namespace:       c.Namespace,
 			Labels:          c.labelsSet(true),
-			Annotations:     c.AnnotationsToPropagate(c.annotationsSet(nil)),
+			Annotations:     c.AnnotationsToPropagate(c.annotationsSet(resourceAnnotations)),
 			OwnerReferences: c.ownerReferences(),
 		},
 		Spec: zalandov1.FabricEventStreamSpec{
@@ -247,7 +266,7 @@ func getEventStreamSink(stream acidv1.Stream, eventType string) zalandov1.EventS
 	}
 }
 
-func getEventStreamRecovery(stream acidv1.Stream, recoveryEventType, eventType string) zalandov1.EventStreamRecovery {
+func getEventStreamRecovery(stream acidv1.Stream, recoveryEventType, eventType string, ignoreRecovery *bool) zalandov1.EventStreamRecovery {
 	if (stream.EnableRecovery != nil && !*stream.EnableRecovery) ||
 		(stream.EnableRecovery == nil && recoveryEventType == "") {
 		return zalandov1.EventStreamRecovery{
@@ -255,6 +274,12 @@ func getEventStreamRecovery(stream acidv1.Stream, recoveryEventType, eventType s
 		}
 	}
 
+	if ignoreRecovery != nil && *ignoreRecovery {
+		return zalandov1.EventStreamRecovery{
+			Type: constants.EventStreamRecoveryIgnoreType,
+		}
+	}
+
 	if stream.EnableRecovery != nil && *stream.EnableRecovery && recoveryEventType == "" {
 		recoveryEventType = fmt.Sprintf("%s-%s", eventType, constants.EventStreamRecoverySuffix)
 	}
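
`getEventStreamRecovery` now resolves the recovery type in this order: `None` when recovery is disabled or simply not configured for the table, `Ignore` when the table opts out via `ignoreRecovery`, and otherwise `DeadLetter` with either the explicit `recoveryEventType` or a name derived from the event type plus the `dead-letter-queue` suffix. A sketch of the mapping, with invented table entries:

```yaml
# With enableRecovery unset:
#   data.a: {eventType: a}                           -> recovery type None
#   data.b: {eventType: b, recoveryEventType: b-dlq} -> DeadLetter, sink b-dlq
# With enableRecovery: true:
#   data.c: {eventType: c, ignoreRecovery: true}     -> Ignore (failing events are skipped)
#   data.d: {eventType: d}                           -> DeadLetter, sink d-dead-letter-queue
```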


@@ -65,12 +65,18 @@ var (
 						EventType:         "stream-type-b",
 						RecoveryEventType: "stream-type-b-dlq",
 					},
+					"data.foofoobar": {
+						EventType:      "stream-type-c",
+						IgnoreRecovery: util.True(),
+					},
 				},
 				EnableRecovery: util.True(),
 				Filter: map[string]*string{
 					"data.bar": k8sutil.StringToPointer("[?(@.source.txId > 500 && @.source.lsn > 123456)]"),
 				},
 				BatchSize: k8sutil.UInt32ToPointer(uint32(100)),
+				CPU:       k8sutil.StringToPointer("250m"),
+				Memory:    k8sutil.StringToPointer("500Mi"),
 			},
 		},
 		TeamID: "acid",
@@ -88,6 +94,10 @@ var (
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      fmt.Sprintf("%s-12345", clusterName),
 			Namespace: namespace,
+			Annotations: map[string]string{
+				constants.EventStreamCpuAnnotationKey:    "250m",
+				constants.EventStreamMemoryAnnotationKey: "500Mi",
+			},
 			Labels: map[string]string{
 				"application":  "spilo",
 				"cluster-name": clusterName,
@@ -180,6 +190,37 @@ var (
 					Type: constants.EventStreamSourcePGType,
 				},
 			},
+			{
+				EventStreamFlow: zalandov1.EventStreamFlow{
+					Type: constants.EventStreamFlowPgGenericType,
+				},
+				EventStreamRecovery: zalandov1.EventStreamRecovery{
+					Type: constants.EventStreamRecoveryIgnoreType,
+				},
+				EventStreamSink: zalandov1.EventStreamSink{
+					EventType:    "stream-type-c",
+					MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)),
+					Type:         constants.EventStreamSinkNakadiType,
+				},
+				EventStreamSource: zalandov1.EventStreamSource{
+					Connection: zalandov1.Connection{
+						DBAuth: zalandov1.DBAuth{
+							Name:        fmt.Sprintf("fes-user.%s.credentials.postgresql.acid.zalan.do", clusterName),
+							PasswordKey: "password",
+							Type:        constants.EventStreamSourceAuthType,
+							UserKey:     "username",
+						},
+						Url:        fmt.Sprintf("jdbc:postgresql://%s.%s/foo?user=%s&ssl=true&sslmode=require", clusterName, namespace, fesUser),
+						SlotName:   slotName,
+						PluginType: constants.EventStreamSourcePluginType,
+					},
+					Schema: "data",
+					EventStreamTable: zalandov1.EventStreamTable{
+						Name: "foofoobar",
+					},
+					Type: constants.EventStreamSourcePGType,
+				},
+			},
 		},
 	},
 }
@@ -528,8 +569,8 @@ func TestSyncStreams(t *testing.T) {
 func TestSameStreams(t *testing.T) {
 	testName := "TestSameStreams"
-	annotationsA := map[string]string{"owned-by": "acid"}
-	annotationsB := map[string]string{"owned-by": "foo"}
+	annotationsA := map[string]string{constants.EventStreamMemoryAnnotationKey: "500Mi"}
+	annotationsB := map[string]string{constants.EventStreamMemoryAnnotationKey: "1Gi"}
 
 	stream1 := zalandov1.EventStream{
 		EventStreamFlow: zalandov1.EventStreamFlow{},
@@ -621,6 +662,13 @@ func TestSameStreams(t *testing.T) {
 			match:  false,
 			reason: "event stream specs differ",
 		},
+		{
+			subTest:  "event stream annotations differ",
+			streamsA: newFabricEventStream([]zalandov1.EventStream{stream2}, nil),
+			streamsB: newFabricEventStream([]zalandov1.EventStream{stream3}, annotationsA),
+			match:    false,
+			reason:   "event stream specs differ",
+		},
 		{
 			subTest:  "event stream annotations differ",
 			streamsA: newFabricEventStream([]zalandov1.EventStream{stream2}, annotationsA),


@@ -2,16 +2,19 @@ package constants
 
 // PostgreSQL specific constants
 const (
 	EventStreamCRDApiVersion       = "zalando.org/v1"
 	EventStreamCRDKind             = "FabricEventStream"
 	EventStreamCRDName             = "fabriceventstreams.zalando.org"
 	EventStreamSourcePGType        = "PostgresLogicalReplication"
 	EventStreamSourceSlotPrefix    = "fes"
 	EventStreamSourcePluginType    = "pgoutput"
 	EventStreamSourceAuthType      = "DatabaseAuthenticationSecret"
 	EventStreamFlowPgGenericType   = "PostgresWalToGenericNakadiEvent"
 	EventStreamSinkNakadiType      = "Nakadi"
-	EventStreamRecoveryNoneType    = "None"
 	EventStreamRecoveryDLQType     = "DeadLetter"
+	EventStreamRecoveryIgnoreType  = "Ignore"
+	EventStreamRecoveryNoneType    = "None"
 	EventStreamRecoverySuffix      = "dead-letter-queue"
+	EventStreamCpuAnnotationKey    = "fes.zalando.org/FES_CPU"
+	EventStreamMemoryAnnotationKey = "fes.zalando.org/FES_MEMORY"
 )