intermediate commit

This commit is contained in:
Felix Kunde 2021-07-30 14:11:12 +02:00
parent 11f3715ed1
commit 70df4515bc
1 changed files with 15 additions and 11 deletions

View File

@ -19,8 +19,8 @@ var outboxTableNameTemplate config.StringTemplate = "{table}_{eventtype}_outbox"
func (c *Cluster) createStreams() error { func (c *Cluster) createStreams() error {
c.setProcessName("creating streams") c.setProcessName("creating streams")
logicalDecodingEnabled, err := c.logicalDecodingEnabled() err := c.syncLogicalDecoding()
if !logicalDecodingEnabled || err != nil { if err != nil {
return fmt.Errorf("logical decoding setup incomplete: %v", err) return fmt.Errorf("logical decoding setup incomplete: %v", err)
} }
@ -33,27 +33,31 @@ func (c *Cluster) createStreams() error {
return nil return nil
} }
func (c *Cluster) logicalDecodingEnabled() (bool, error) { func (c *Cluster) syncLogicalDecoding() error {
var errors []string
walLevel := c.Spec.PostgresqlParam.Parameters["wal_level"] walLevel := c.Spec.PostgresqlParam.Parameters["wal_level"]
if walLevel == "" || walLevel != "logical" { if walLevel == "" || walLevel != "logical" {
errors = append(errors, "setting 'wal_level: logical' missing under spec.postgresql.parameters") c.logger.Debugf("setting wal level to 'logical' in postgres configuration")
pods, err := c.listPods()
if err != nil || len(pods) == 0 {
return err
}
for _, pod := range pods {
if err := c.patroni.SetPostgresParameters(&pod, map[string]string{"wal_level": "logical"}); err == nil {
return fmt.Errorf("could not set wal_level to 'logical' calling Patroni REST API: %v", err)
}
}
} }
for _, stream := range c.Spec.Streams { for _, stream := range c.Spec.Streams {
slotName := c.getLogicalReplicationSlot(stream.Database) slotName := c.getLogicalReplicationSlot(stream.Database)
if slotName == "" { if slotName == "" {
errors = append(errors, fmt.Sprintf("no logical replication slot defined under spec.patroni.slots for database %q", stream.Database)) c.logger.Debugf("creating logical replication slot %d in database %d", constants.EventStreamSourceSlotPrefix+stream.Database, stream.Database)
} }
} }
if len(errors) > 0 { return nil
return false, fmt.Errorf("logical decoding setup incomplete: %v", errors)
}
return true, nil
} }
func (c *Cluster) syncStreamDbResources() error { func (c *Cluster) syncStreamDbResources() error {