diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 917073364..4c75d07cf 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -315,6 +315,17 @@ func (c *Cluster) syncStreams() error { return nil } + // create publications to each created slot + c.logger.Debug("syncing database publications") + for publication, tables := range publications { + // but first check for existing publications + dbName := slots[publication]["database"] + err = c.syncPublication(publication, dbName, tables) + if err != nil { + c.logger.Warningf("could not sync publication %q in database %q: %v", publication, dbName, err) + } + } + // add extra logical slots to Patroni config c.logger.Debug("syncing Postgres config for logical decoding") requiresRestart, err := c.syncPostgresConfig(requiredPatroniConfig) @@ -326,17 +337,7 @@ func (c *Cluster) syncStreams() error { return nil } - // next, create publications to each created slot - c.logger.Debug("syncing database publications") - for publication, tables := range publications { - // but first check for existing publications - dbName := slots[publication]["database"] - err = c.syncPublication(publication, dbName, tables) - if err != nil { - c.logger.Warningf("could not sync publication %q in database %q: %v", publication, dbName, err) - } - } - + // after Postgres was restarted we can create stream CRDs err = c.createOrUpdateStreams() if err != nil { return err