From 11f3715ed1ba0e88dc2621bcc790103d0b1410fe Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Thu, 29 Jul 2021 14:29:42 +0200 Subject: [PATCH] check manifest settings for logical decoding before creating streams --- pkg/cluster/streams.go | 42 +++++++++++++++++++++++++++++++++-- pkg/cluster/streams_test.go | 17 +++++++++++++- pkg/util/constants/streams.go | 2 +- 3 files changed, 57 insertions(+), 4 deletions(-) diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 6f1b7efec..b84f4424c 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -19,8 +19,13 @@ var outboxTableNameTemplate config.StringTemplate = "{table}_{eventtype}_outbox" func (c *Cluster) createStreams() error { c.setProcessName("creating streams") + logicalDecodingEnabled, err := c.logicalDecodingEnabled() + if !logicalDecodingEnabled || err != nil { + return fmt.Errorf("logical decoding setup incomplete: %v", err) + } + fes := c.generateFabricEventStream() - _, err := c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Create(context.TODO(), fes, metav1.CreateOptions{}) + _, err = c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Create(context.TODO(), fes, metav1.CreateOptions{}) if err != nil { return fmt.Errorf("could not create event stream custom resource: %v", err) } @@ -28,6 +33,29 @@ func (c *Cluster) createStreams() error { return nil } +func (c *Cluster) logicalDecodingEnabled() (bool, error) { + var errors []string + + walLevel := c.Spec.PostgresqlParam.Parameters["wal_level"] + if walLevel == "" || walLevel != "logical" { + errors = append(errors, "setting 'wal_level: logical' missing under spec.postgresql.parameters") + } + + for _, stream := range c.Spec.Streams { + slotName := c.getLogicalReplicationSlot(stream.Database) + + if slotName == "" { + errors = append(errors, fmt.Sprintf("no logical replication slot defined under spec.patroni.slots for database %q", stream.Database)) + } + } + + if len(errors) > 0 { + return false, fmt.Errorf("logical decoding setup incomplete: %v", errors) + } + + return true, nil +} + func (c *Cluster) syncStreamDbResources() error { for _, stream := range c.Spec.Streams { @@ -148,7 +176,7 @@ func getOutboxTable(tableName, eventType string) zalandov1alpha1.EventStreamTabl func (c *Cluster) getStreamConnection(database, user string) zalandov1alpha1.Connection { return zalandov1alpha1.Connection{ Url: fmt.Sprintf("jdbc:postgresql://%s.%s/%s?user=%s&ssl=true&sslmode=require", c.Name, c.Namespace, database, user), - SlotName: constants.EventStreamSourceSlotName, + SlotName: c.getLogicalReplicationSlot(database), DBAuth: zalandov1alpha1.DBAuth{ Type: constants.EventStreamSourceAuthType, Name: c.credentialSecretNameForCluster(user, c.ClusterName), @@ -158,6 +186,16 @@ func (c *Cluster) getStreamConnection(database, user string) zalandov1alpha1.Con } } +func (c *Cluster) getLogicalReplicationSlot(database string) string { + for slotName, slot := range c.Spec.Patroni.Slots { + if strings.HasPrefix(slotName, constants.EventStreamSourceSlotPrefix) && slot["type"] == "logical" && slot["database"] == database { + return slotName + } + } + + return "" +} + func (c *Cluster) syncStreams() error { c.setProcessName("syncing streams") diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index aebd29ff6..06f46e8fc 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -40,6 +40,20 @@ func TestGenerateFabricEventStream(t *testing.T) { Databases: map[string]string{ "foo": "foo_user", }, + Patroni: acidv1.Patroni{ + Slots: map[string]map[string]string{ + "fes": { + "type": "logical", + "database": "foo", + "plugin": "wal2json", + }, + }, + }, + PostgresqlParam: acidv1.PostgresqlParam{ + Parameters: map[string]string{ + "wal_level": "logical", + }, + }, Streams: []acidv1.Stream{ { Type: "nakadi", @@ -93,7 +107,8 @@ func TestGenerateFabricEventStream(t *testing.T) { }, }, client, pg, logger, eventRecorder) - cluster.syncStreams() + err := cluster.syncStreams() + assert.NoError(t, err) streamCRD, err := cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), cluster.Name+constants.FESsuffix, metav1.GetOptions{}) assert.NoError(t, err) diff --git a/pkg/util/constants/streams.go b/pkg/util/constants/streams.go index c8360c11b..36eb376cf 100644 --- a/pkg/util/constants/streams.go +++ b/pkg/util/constants/streams.go @@ -4,7 +4,7 @@ package constants const ( FESsuffix = "-event-streams" EventStreamSourcePGType = "PostgresLogicalReplication" - EventStreamSourceSlotName = "fes" + EventStreamSourceSlotPrefix = "fes" EventStreamSourceAuthType = "DatabaseAuthenticationSecret" EventStreamFlowPgNakadiType = "PostgresWalToNakadiDataEvent" EventStreamFlowPgApiType = "PostgresWalToApiCallHomeEvent"