first sync wal_level then publications (#2109)
This commit is contained in:
parent
2a54e49e9f
commit
528bb81a78
|
|
@ -48,7 +48,7 @@ const (
|
||||||
|
|
||||||
getPublicationsSQL = `SELECT p.pubname, string_agg(pt.schemaname || '.' || pt.tablename, ', ' ORDER BY pt.schemaname, pt.tablename)
|
getPublicationsSQL = `SELECT p.pubname, string_agg(pt.schemaname || '.' || pt.tablename, ', ' ORDER BY pt.schemaname, pt.tablename)
|
||||||
FROM pg_publication p
|
FROM pg_publication p
|
||||||
JOIN pg_publication_tables pt ON pt.pubname = p.pubname
|
LEFT JOIN pg_publication_tables pt ON pt.pubname = p.pubname
|
||||||
GROUP BY p.pubname;`
|
GROUP BY p.pubname;`
|
||||||
createPublicationSQL = `CREATE PUBLICATION "%s" FOR TABLE %s WITH (publish = 'insert, update');`
|
createPublicationSQL = `CREATE PUBLICATION "%s" FOR TABLE %s WITH (publish = 'insert, update');`
|
||||||
alterPublicationSQL = `ALTER PUBLICATION "%s" SET TABLE %s;`
|
alterPublicationSQL = `ALTER PUBLICATION "%s" SET TABLE %s;`
|
||||||
|
|
|
||||||
|
|
@ -273,6 +273,17 @@ func (c *Cluster) syncStreams() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// update config to set wal_level: logical
|
||||||
|
requiredPatroniConfig := c.Spec.Patroni
|
||||||
|
requiresRestart, err := c.syncPostgresConfig(requiredPatroniConfig)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to snyc Postgres config for event streaming: %v", err)
|
||||||
|
}
|
||||||
|
if requiresRestart {
|
||||||
|
c.logger.Debugf("updated Postgres config. Server will be restarted and streams will get created during next sync")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// fetch different application IDs from streams section
|
// fetch different application IDs from streams section
|
||||||
// there will be a separate event stream resource for each ID
|
// there will be a separate event stream resource for each ID
|
||||||
appIds := gatherApplicationIds(c.Spec.Streams)
|
appIds := gatherApplicationIds(c.Spec.Streams)
|
||||||
|
|
@ -282,7 +293,6 @@ func (c *Cluster) syncStreams() error {
|
||||||
slotsToSync := make(map[string]map[string]string)
|
slotsToSync := make(map[string]map[string]string)
|
||||||
publications := make(map[string]map[string]acidv1.StreamTable)
|
publications := make(map[string]map[string]acidv1.StreamTable)
|
||||||
|
|
||||||
requiredPatroniConfig := c.Spec.Patroni
|
|
||||||
if len(requiredPatroniConfig.Slots) > 0 {
|
if len(requiredPatroniConfig.Slots) > 0 {
|
||||||
slots = requiredPatroniConfig.Slots
|
slots = requiredPatroniConfig.Slots
|
||||||
}
|
}
|
||||||
|
|
@ -330,15 +340,10 @@ func (c *Cluster) syncStreams() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// add extra logical slots to Patroni config
|
// add extra logical slots to Patroni config
|
||||||
c.logger.Debug("syncing Postgres config for logical decoding")
|
_, err = c.syncPostgresConfig(requiredPatroniConfig)
|
||||||
requiresRestart, err := c.syncPostgresConfig(requiredPatroniConfig)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to snyc Postgres config for event streaming: %v", err)
|
return fmt.Errorf("failed to snyc Postgres config for event streaming: %v", err)
|
||||||
}
|
}
|
||||||
if requiresRestart {
|
|
||||||
c.logger.Debugf("updated Postgres config. Server will be restarted and streams will get created during next sync")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// after Postgres was restarted we can create stream CRDs
|
// after Postgres was restarted we can create stream CRDs
|
||||||
err = c.createOrUpdateStreams()
|
err = c.createOrUpdateStreams()
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue