From 70df4515bcb0f6de73bfff4e0cd2aa089c47372f Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Fri, 30 Jul 2021 14:11:12 +0200 Subject: [PATCH] intermediate commit --- pkg/cluster/streams.go | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index b84f4424c..a2adb1b15 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -19,8 +19,8 @@ var outboxTableNameTemplate config.StringTemplate = "{table}_{eventtype}_outbox" func (c *Cluster) createStreams() error { c.setProcessName("creating streams") - logicalDecodingEnabled, err := c.logicalDecodingEnabled() - if !logicalDecodingEnabled || err != nil { + err := c.syncLogicalDecoding() + if err != nil { return fmt.Errorf("logical decoding setup incomplete: %v", err) } @@ -33,27 +33,31 @@ func (c *Cluster) createStreams() error { return nil } -func (c *Cluster) logicalDecodingEnabled() (bool, error) { - var errors []string +func (c *Cluster) syncLogicalDecoding() error { walLevel := c.Spec.PostgresqlParam.Parameters["wal_level"] if walLevel == "" || walLevel != "logical" { - errors = append(errors, "setting 'wal_level: logical' missing under spec.postgresql.parameters") + c.logger.Debugf("setting wal level to 'logical' in postgres configuration") + pods, err := c.listPods() + if err != nil || len(pods) == 0 { + return err + } + for _, pod := range pods { + if err := c.patroni.SetPostgresParameters(&pod, map[string]string{"wal_level": "logical"}); err == nil { + return fmt.Errorf("could not set wal_level to 'logical' calling Patroni REST API: %v", err) + } + } } for _, stream := range c.Spec.Streams { slotName := c.getLogicalReplicationSlot(stream.Database) if slotName == "" { - errors = append(errors, fmt.Sprintf("no logical replication slot defined under spec.patroni.slots for database %q", stream.Database)) + c.logger.Debugf("creating logical replication slot %d in database %d", constants.EventStreamSourceSlotPrefix+stream.Database, stream.Database) } } - if len(errors) > 0 { - return false, fmt.Errorf("logical decoding setup incomplete: %v", errors) - } - - return true, nil + return nil } func (c *Cluster) syncStreamDbResources() error {