create streams only after postgres instances were restarted (#2034)

* create streams only after postgres instances were restarted
* checkAndSetGlobalPostgreSQLConfiguration returns if config has been patched
* restart can be pending even without a config patch
This commit is contained in:
Felix Kunde 2022-09-19 15:25:55 +02:00 committed by GitHub
parent d209612b18
commit e0c4603057
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 47 additions and 33 deletions

View File

@ -72,7 +72,7 @@ func gatherApplicationIds(streams []acidv1.Stream) []string {
return appIds
}
func (c *Cluster) syncPostgresConfig(requiredPatroniConfig acidv1.Patroni) error {
func (c *Cluster) syncPostgresConfig(requiredPatroniConfig acidv1.Patroni) (bool, error) {
errorMsg := "no pods found to update config"
// if streams are defined wal_level must be switched to logical
@ -91,17 +91,17 @@ func (c *Cluster) syncPostgresConfig(requiredPatroniConfig acidv1.Patroni) error
continue
}
_, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, effectivePatroniConfig, requiredPatroniConfig, effectivePgParameters, requiredPgParameters)
configPatched, _, err := c.checkAndSetGlobalPostgreSQLConfiguration(&pod, effectivePatroniConfig, requiredPatroniConfig, effectivePgParameters, requiredPgParameters)
if err != nil {
errorMsg = fmt.Sprintf("could not set PostgreSQL configuration options for pod %s: %v", podName, err)
continue
}
// Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used
return nil
return configPatched, nil
}
return fmt.Errorf(errorMsg)
return false, fmt.Errorf(errorMsg)
}
func (c *Cluster) syncPublication(publication, dbName string, tables map[string]acidv1.StreamTable) error {
@ -317,10 +317,14 @@ func (c *Cluster) syncStreams() error {
// add extra logical slots to Patroni config
c.logger.Debug("syncing Postgres config for logical decoding")
err = c.syncPostgresConfig(requiredPatroniConfig)
requiresRestart, err := c.syncPostgresConfig(requiredPatroniConfig)
if err != nil {
return fmt.Errorf("failed to sync Postgres config for event streaming: %v", err)
}
if requiresRestart {
c.logger.Debugf("updated Postgres config. Server will be restarted and streams will get created during next sync")
return nil
}
// next, create publications to each created slot
c.logger.Debug("syncing database publications")

View File

@ -21,7 +21,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
var requireMasterRestartWhenDecreased = []string{
var requirePrimaryRestartWhenDecreased = []string{
"max_connections",
"max_prepared_transactions",
"max_locks_per_transaction",
@ -278,8 +278,9 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error {
func (c *Cluster) syncStatefulSet() error {
var (
restartWait uint32
restartMasterFirst bool
restartWait uint32
configPatched bool
restartPrimaryFirst bool
)
podsToRecreate := make([]v1.Pod, 0)
isSafeToRecreatePods := true
@ -420,22 +421,24 @@ func (c *Cluster) syncStatefulSet() error {
// do not attempt a restart
if !reflect.DeepEqual(patroniConfig, acidv1.Patroni{}) || len(pgParameters) > 0 {
// compare config returned from Patroni with what is specified in the manifest
restartMasterFirst, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, c.Spec.Patroni, pgParameters, c.Spec.Parameters)
configPatched, restartPrimaryFirst, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, c.Spec.Patroni, pgParameters, c.Spec.Parameters)
if err != nil {
c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", pods[i].Name, err)
continue
}
// it could take up to LoopWait to apply the config
time.Sleep(time.Duration(restartWait)*time.Second + time.Second*2)
break
if configPatched {
time.Sleep(time.Duration(restartWait)*time.Second + time.Second*2)
break
}
}
}
// restart instances if required
// restart instances if it is still pending
remainingPods := make([]*v1.Pod, 0)
skipRole := Master
if restartMasterFirst {
if restartPrimaryFirst {
skipRole = Replica
}
for i, pod := range pods {
@ -474,6 +477,7 @@ func (c *Cluster) syncStatefulSet() error {
c.logger.Warningf("postpone pod recreation until next sync")
}
}
return nil
}
@ -533,10 +537,11 @@ func (c *Cluster) AnnotationsToPropagate(annotations map[string]string) map[stri
// checkAndSetGlobalPostgreSQLConfiguration checks whether cluster-wide API parameters
// (like max_connections) have changed and if necessary sets it via the Patroni API
func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectivePatroniConfig, desiredPatroniConfig acidv1.Patroni, effectivePgParameters, desiredPgParameters map[string]string) (bool, error) {
func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectivePatroniConfig, desiredPatroniConfig acidv1.Patroni, effectivePgParameters, desiredPgParameters map[string]string) (bool, bool, error) {
configToSet := make(map[string]interface{})
parametersToSet := make(map[string]string)
restartMaster := make([]bool, 0)
restartPrimary := make([]bool, 0)
configPatched := false
requiresMasterRestart := false
// compare effective and desired Patroni config options
@ -581,22 +586,23 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv
effectiveValue := effectivePgParameters[desiredOption]
if isBootstrapOnlyParameter(desiredOption) && (effectiveValue != desiredValue) {
parametersToSet[desiredOption] = desiredValue
if util.SliceContains(requireMasterRestartWhenDecreased, desiredOption) {
if util.SliceContains(requirePrimaryRestartWhenDecreased, desiredOption) {
effectiveValueNum, errConv := strconv.Atoi(effectiveValue)
desiredValueNum, errConv2 := strconv.Atoi(desiredValue)
if errConv != nil || errConv2 != nil {
continue
}
if effectiveValueNum > desiredValueNum {
restartMaster = append(restartMaster, true)
restartPrimary = append(restartPrimary, true)
continue
}
}
restartMaster = append(restartMaster, false)
restartPrimary = append(restartPrimary, false)
}
}
if !util.SliceContains(restartMaster, false) && len(configToSet) == 0 {
// check if there exist only config updates that require a restart of the primary
if !util.SliceContains(restartPrimary, false) && len(configToSet) == 0 {
requiresMasterRestart = true
}
@ -605,7 +611,7 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv
}
if len(configToSet) == 0 {
return false, nil
return configPatched, requiresMasterRestart, nil
}
configToSetJson, err := json.Marshal(configToSet)
@ -619,10 +625,11 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv
c.logger.Debugf("patching Postgres config via Patroni API on pod %s with following options: %s",
podName, configToSetJson)
if err = c.patroni.SetConfig(pod, configToSet); err != nil {
return requiresMasterRestart, fmt.Errorf("could not patch postgres parameters within pod %s: %v", podName, err)
return configPatched, requiresMasterRestart, fmt.Errorf("could not patch postgres parameters within pod %s: %v", podName, err)
}
configPatched = true
return requiresMasterRestart, nil
return configPatched, requiresMasterRestart, nil
}
func (c *Cluster) syncSecrets() error {

View File

@ -204,10 +204,10 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {
// simulate existing config that differs from cluster.Spec
tests := []struct {
subtest string
patroni acidv1.Patroni
pgParams map[string]string
restartMaster bool
subtest string
patroni acidv1.Patroni
pgParams map[string]string
restartPrimary bool
}{
{
subtest: "Patroni and Postgresql.Parameters differ - restart replica first",
@ -218,7 +218,7 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {
"log_min_duration_statement": "500", // desired 200
"max_connections": "100", // desired 50
},
restartMaster: false,
restartPrimary: false,
},
{
subtest: "multiple Postgresql.Parameters differ - restart replica first",
@ -229,7 +229,7 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {
"log_min_duration_statement": "500", // desired 200
"max_connections": "100", // desired 50
},
restartMaster: false,
restartPrimary: false,
},
{
subtest: "desired max_connections bigger - restart replica first",
@ -240,7 +240,7 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {
"log_min_duration_statement": "200",
"max_connections": "30", // desired 50
},
restartMaster: false,
restartPrimary: false,
},
{
subtest: "desired max_connections smaller - restart master first",
@ -251,15 +251,18 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {
"log_min_duration_statement": "200",
"max_connections": "100", // desired 50
},
restartMaster: true,
restartPrimary: true,
},
}
for _, tt := range tests {
requireMasterRestart, err := cluster.checkAndSetGlobalPostgreSQLConfiguration(mockPod, tt.patroni, cluster.Spec.Patroni, tt.pgParams, cluster.Spec.Parameters)
configPatched, requirePrimaryRestart, err := cluster.checkAndSetGlobalPostgreSQLConfiguration(mockPod, tt.patroni, cluster.Spec.Patroni, tt.pgParams, cluster.Spec.Parameters)
assert.NoError(t, err)
if requireMasterRestart != tt.restartMaster {
t.Errorf("%s - %s: unexpect master restart strategy, got %v, expected %v", testName, tt.subtest, requireMasterRestart, tt.restartMaster)
if configPatched != true {
t.Errorf("%s - %s: expected config update did not happen", testName, tt.subtest)
}
if requirePrimaryRestart != tt.restartPrimary {
t.Errorf("%s - %s: wrong master restart strategy, got restart %v, expected restart %v", testName, tt.subtest, requirePrimaryRestart, tt.restartPrimary)
}
}
}