diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 0236925ca..917073364 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -72,7 +72,7 @@ func gatherApplicationIds(streams []acidv1.Stream) []string { return appIds } -func (c *Cluster) syncPostgresConfig(requiredPatroniConfig acidv1.Patroni) error { +func (c *Cluster) syncPostgresConfig(requiredPatroniConfig acidv1.Patroni) (bool, error) { errorMsg := "no pods found to update config" // if streams are defined wal_level must be switched to logical @@ -91,17 +91,17 @@ func (c *Cluster) syncPostgresConfig(requiredPatroniConfig acidv1.Patroni) error continue } - _, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, effectivePatroniConfig, requiredPatroniConfig, effectivePgParameters, requiredPgParameters) + configPatched, _, err := c.checkAndSetGlobalPostgreSQLConfiguration(&pod, effectivePatroniConfig, requiredPatroniConfig, effectivePgParameters, requiredPgParameters) if err != nil { errorMsg = fmt.Sprintf("could not set PostgreSQL configuration options for pod %s: %v", podName, err) continue } // Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used - return nil + return configPatched, nil } - return fmt.Errorf(errorMsg) + return false, fmt.Errorf(errorMsg) } func (c *Cluster) syncPublication(publication, dbName string, tables map[string]acidv1.StreamTable) error { @@ -317,10 +317,14 @@ func (c *Cluster) syncStreams() error { // add extra logical slots to Patroni config c.logger.Debug("syncing Postgres config for logical decoding") - err = c.syncPostgresConfig(requiredPatroniConfig) + requiresRestart, err := c.syncPostgresConfig(requiredPatroniConfig) if err != nil { return fmt.Errorf("failed to snyc Postgres config for event streaming: %v", err) } + if requiresRestart { + c.logger.Debugf("updated Postgres config. Server will be restarted and streams will get created during next sync") + return nil + } // next, create publications to each created slot c.logger.Debug("syncing database publications") diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index b68a36486..cdb6fa77a 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -21,7 +21,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -var requireMasterRestartWhenDecreased = []string{ +var requirePrimaryRestartWhenDecreased = []string{ "max_connections", "max_prepared_transactions", "max_locks_per_transaction", @@ -278,8 +278,9 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { func (c *Cluster) syncStatefulSet() error { var ( - restartWait uint32 - restartMasterFirst bool + restartWait uint32 + configPatched bool + restartPrimaryFirst bool ) podsToRecreate := make([]v1.Pod, 0) isSafeToRecreatePods := true @@ -420,22 +421,24 @@ func (c *Cluster) syncStatefulSet() error { // do not attempt a restart if !reflect.DeepEqual(patroniConfig, acidv1.Patroni{}) || len(pgParameters) > 0 { // compare config returned from Patroni with what is specified in the manifest - restartMasterFirst, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, c.Spec.Patroni, pgParameters, c.Spec.Parameters) + configPatched, restartPrimaryFirst, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, c.Spec.Patroni, pgParameters, c.Spec.Parameters) if err != nil { c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", pods[i].Name, err) continue } // it could take up to LoopWait to apply the config - time.Sleep(time.Duration(restartWait)*time.Second + time.Second*2) - break + if configPatched { + time.Sleep(time.Duration(restartWait)*time.Second + time.Second*2) + break + } } } - // restart instances if required + // restart instances if it is still pending remainingPods := make([]*v1.Pod, 0) skipRole := Master - if restartMasterFirst { + if restartPrimaryFirst { skipRole = Replica } for i, pod := range pods { @@ -474,6 +477,7 @@ func (c *Cluster) syncStatefulSet() error { c.logger.Warningf("postpone pod recreation until next sync") } } + return nil } @@ -533,10 +537,11 @@ func (c *Cluster) AnnotationsToPropagate(annotations map[string]string) map[stri // checkAndSetGlobalPostgreSQLConfiguration checks whether cluster-wide API parameters // (like max_connections) have changed and if necessary sets it via the Patroni API -func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectivePatroniConfig, desiredPatroniConfig acidv1.Patroni, effectivePgParameters, desiredPgParameters map[string]string) (bool, error) { +func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectivePatroniConfig, desiredPatroniConfig acidv1.Patroni, effectivePgParameters, desiredPgParameters map[string]string) (bool, bool, error) { configToSet := make(map[string]interface{}) parametersToSet := make(map[string]string) - restartMaster := make([]bool, 0) + restartPrimary := make([]bool, 0) + configPatched := false requiresMasterRestart := false // compare effective and desired Patroni config options @@ -581,22 +586,23 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv effectiveValue := effectivePgParameters[desiredOption] if isBootstrapOnlyParameter(desiredOption) && (effectiveValue != desiredValue) { parametersToSet[desiredOption] = desiredValue - if util.SliceContains(requireMasterRestartWhenDecreased, desiredOption) { + if util.SliceContains(requirePrimaryRestartWhenDecreased, desiredOption) { effectiveValueNum, errConv := strconv.Atoi(effectiveValue) desiredValueNum, errConv2 := strconv.Atoi(desiredValue) if errConv != nil || errConv2 != nil { continue } if effectiveValueNum > desiredValueNum { - restartMaster = append(restartMaster, true) + restartPrimary = append(restartPrimary, true) continue } } - restartMaster = append(restartMaster, false) + restartPrimary = append(restartPrimary, false) } } - if !util.SliceContains(restartMaster, false) && len(configToSet) == 0 { + // check if there exist only config updates that require a restart of the primary + if !util.SliceContains(restartPrimary, false) && len(configToSet) == 0 { requiresMasterRestart = true } @@ -605,7 +611,7 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv } if len(configToSet) == 0 { - return false, nil + return configPatched, requiresMasterRestart, nil } configToSetJson, err := json.Marshal(configToSet) @@ -619,10 +625,11 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv c.logger.Debugf("patching Postgres config via Patroni API on pod %s with following options: %s", podName, configToSetJson) if err = c.patroni.SetConfig(pod, configToSet); err != nil { - return requiresMasterRestart, fmt.Errorf("could not patch postgres parameters within pod %s: %v", podName, err) + return configPatched, requiresMasterRestart, fmt.Errorf("could not patch postgres parameters within pod %s: %v", podName, err) } + configPatched = true - return requiresMasterRestart, nil + return configPatched, requiresMasterRestart, nil } func (c *Cluster) syncSecrets() error { diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go index ff7a03103..cc7554b0e 100644 --- a/pkg/cluster/sync_test.go +++ b/pkg/cluster/sync_test.go @@ -204,10 +204,10 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { // simulate existing config that differs from cluster.Spec tests := []struct { - subtest string - patroni acidv1.Patroni - pgParams map[string]string - restartMaster bool + subtest string + patroni acidv1.Patroni + pgParams map[string]string + restartPrimary bool }{ { subtest: "Patroni and Postgresql.Parameters differ - restart replica first", @@ -218,7 +218,7 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { "log_min_duration_statement": "500", // desired 200 "max_connections": "100", // desired 50 }, - restartMaster: false, + restartPrimary: false, }, { subtest: "multiple Postgresql.Parameters differ - restart replica first", @@ -229,7 +229,7 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { "log_min_duration_statement": "500", // desired 200 "max_connections": "100", // desired 50 }, - restartMaster: false, + restartPrimary: false, }, { subtest: "desired max_connections bigger - restart replica first", @@ -240,7 +240,7 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { "log_min_duration_statement": "200", "max_connections": "30", // desired 50 }, - restartMaster: false, + restartPrimary: false, }, { subtest: "desired max_connections smaller - restart master first", @@ -251,15 +251,18 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { "log_min_duration_statement": "200", "max_connections": "100", // desired 50 }, - restartMaster: true, + restartPrimary: true, }, } for _, tt := range tests { - requireMasterRestart, err := cluster.checkAndSetGlobalPostgreSQLConfiguration(mockPod, tt.patroni, cluster.Spec.Patroni, tt.pgParams, cluster.Spec.Parameters) + configPatched, requirePrimaryRestart, err := cluster.checkAndSetGlobalPostgreSQLConfiguration(mockPod, tt.patroni, cluster.Spec.Patroni, tt.pgParams, cluster.Spec.Parameters) assert.NoError(t, err) - if requireMasterRestart != tt.restartMaster { - t.Errorf("%s - %s: unexpect master restart strategy, got %v, expected %v", testName, tt.subtest, requireMasterRestart, tt.restartMaster) + if configPatched != true { + t.Errorf("%s - %s: expected config update did not happen", testName, tt.subtest) + } + if requirePrimaryRestart != tt.restartPrimary { + t.Errorf("%s - %s: wrong master restart strategy, got restart %v, expected restart %v", testName, tt.subtest, requirePrimaryRestart, tt.restartPrimary) } } }