diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go index 978b570b0..f1ea736ce 100644 --- a/pkg/cluster/database.go +++ b/pkg/cluster/database.go @@ -48,7 +48,7 @@ const ( getPublicationsSQL = `SELECT p.pubname, string_agg(pt.schemaname || '.' || pt.tablename, ', ' ORDER BY pt.schemaname, pt.tablename) FROM pg_publication p - JOIN pg_publication_tables pt ON pt.pubname = p.pubname + LEFT JOIN pg_publication_tables pt ON pt.pubname = p.pubname GROUP BY p.pubname;` createPublicationSQL = `CREATE PUBLICATION "%s" FOR TABLE %s WITH (publish = 'insert, update');` alterPublicationSQL = `ALTER PUBLICATION "%s" SET TABLE %s;` diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 51014b7b8..d911e6a83 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -273,6 +273,17 @@ func (c *Cluster) syncStreams() error { return nil } + // update config to set wal_level: logical + requiredPatroniConfig := c.Spec.Patroni + requiresRestart, err := c.syncPostgresConfig(requiredPatroniConfig) + if err != nil { + return fmt.Errorf("failed to snyc Postgres config for event streaming: %v", err) + } + if requiresRestart { + c.logger.Debugf("updated Postgres config. Server will be restarted and streams will get created during next sync") + return nil + } + // fetch different application IDs from streams section // there will be a separate event stream resource for each ID appIds := gatherApplicationIds(c.Spec.Streams) @@ -282,7 +293,6 @@ func (c *Cluster) syncStreams() error { slotsToSync := make(map[string]map[string]string) publications := make(map[string]map[string]acidv1.StreamTable) - requiredPatroniConfig := c.Spec.Patroni if len(requiredPatroniConfig.Slots) > 0 { slots = requiredPatroniConfig.Slots } @@ -330,15 +340,10 @@ func (c *Cluster) syncStreams() error { } // add extra logical slots to Patroni config - c.logger.Debug("syncing Postgres config for logical decoding") - requiresRestart, err := c.syncPostgresConfig(requiredPatroniConfig) + _, err = c.syncPostgresConfig(requiredPatroniConfig) if err != nil { return fmt.Errorf("failed to snyc Postgres config for event streaming: %v", err) } - if requiresRestart { - c.logger.Debugf("updated Postgres config. Server will be restarted and streams will get created during next sync") - return nil - } // after Postgres was restarted we can create stream CRDs err = c.createOrUpdateStreams()