diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 4c75d07cf..51014b7b8 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -279,6 +279,7 @@ func (c *Cluster) syncStreams() error { c.streamApplications = appIds slots := make(map[string]map[string]string) + slotsToSync := make(map[string]map[string]string) publications := make(map[string]map[string]acidv1.StreamTable) requiredPatroniConfig := c.Spec.Patroni @@ -308,13 +309,6 @@ func (c *Cluster) syncStreams() error { } } - // no slots = no streams defined - if len(slots) > 0 { - requiredPatroniConfig.Slots = slots - } else { - return nil - } - // create publications to each created slot c.logger.Debug("syncing database publications") for publication, tables := range publications { @@ -323,7 +317,16 @@ func (c *Cluster) syncStreams() error { err = c.syncPublication(publication, dbName, tables) if err != nil { c.logger.Warningf("could not sync publication %q in database %q: %v", publication, dbName, err) + continue } + slotsToSync[publication] = slots[publication] + } + + // no slots to sync = no streams defined or publications created + if len(slotsToSync) > 0 { + requiredPatroniConfig.Slots = slotsToSync + } else { + return nil } // add extra logical slots to Patroni config