diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go
index bb03c2e36..683740af3 100644
--- a/pkg/cluster/streams.go
+++ b/pkg/cluster/streams.go
@@ -79,38 +79,6 @@ func gatherApplicationIds(streams []acidv1.Stream) []string {
 	return appIds
 }
 
-func (c *Cluster) syncPostgresConfig(requiredPatroniConfig acidv1.Patroni) (bool, error) {
-	errorMsg := "no pods found to update config"
-
-	// if streams are defined wal_level must be switched to logical
-	requiredPgParameters := map[string]string{"wal_level": "logical"}
-
-	// apply config changes in pods
-	pods, err := c.listPods()
-	if err != nil {
-		errorMsg = fmt.Sprintf("could not list pods of the statefulset: %v", err)
-	}
-	for i, pod := range pods {
-		podName := util.NameFromMeta(pods[i].ObjectMeta)
-		effectivePatroniConfig, effectivePgParameters, err := c.patroni.GetConfig(&pod)
-		if err != nil {
-			errorMsg = fmt.Sprintf("could not get Postgres config from pod %s: %v", podName, err)
-			continue
-		}
-
-		configPatched, _, err := c.checkAndSetGlobalPostgreSQLConfiguration(&pod, effectivePatroniConfig, requiredPatroniConfig, effectivePgParameters, requiredPgParameters)
-		if err != nil {
-			errorMsg = fmt.Sprintf("could not set PostgreSQL configuration options for pod %s: %v", podName, err)
-			continue
-		}
-
-		// Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used
-		return configPatched, nil
-	}
-
-	return false, fmt.Errorf(errorMsg)
-}
-
 func (c *Cluster) syncPublication(publication, dbName string, tables map[string]acidv1.StreamTable) error {
 	createPublications := make(map[string]string)
 	alterPublications := make(map[string]string)
@@ -273,7 +241,6 @@ func (c *Cluster) getStreamConnection(database, user, appId string) zalandov1.Co
 }
 
 func (c *Cluster) syncStreams() error {
-
 	c.setProcessName("syncing streams")
 
 	_, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamCRDName, metav1.GetOptions{})
@@ -282,20 +249,10 @@ func (c *Cluster) syncStreams() error {
 		return nil
 	}
 
-	// update config to set wal_level: logical
-	requiredPatroniConfig := c.Spec.Patroni
-	requiresRestart, err := c.syncPostgresConfig(requiredPatroniConfig)
-	if err != nil {
-		return fmt.Errorf("failed to snyc Postgres config for event streaming: %v", err)
-	}
-	if requiresRestart {
-		c.logger.Debugf("updated Postgres config. Server will be restarted and streams will get created during next sync")
-		return nil
-	}
-
 	slots := make(map[string]map[string]string)
 	slotsToSync := make(map[string]map[string]string)
 	publications := make(map[string]map[string]acidv1.StreamTable)
+	requiredPatroniConfig := c.Spec.Patroni
 
 	if len(requiredPatroniConfig.Slots) > 0 {
 		slots = requiredPatroniConfig.Slots
@@ -343,13 +300,19 @@ func (c *Cluster) syncStreams() error {
 		return nil
 	}
 
-	// add extra logical slots to Patroni config
-	_, err = c.syncPostgresConfig(requiredPatroniConfig)
+	c.logger.Debug("syncing logical replication slots")
+	pods, err := c.listPods()
 	if err != nil {
-		return fmt.Errorf("failed to snyc Postgres config for event streaming: %v", err)
+		return fmt.Errorf("could not get list of pods to sync logical replication slots via Patroni API: %v", err)
 	}
 
-	// after Postgres was restarted we can create stream CRDs
+	// sync logical replication slots in Patroni config
+	configPatched, _, _, err := c.syncPatroniConfig(pods, requiredPatroniConfig, nil)
+	if err != nil {
+		c.logger.Warningf("Patroni config updated? %v - errors during config sync: %v", configPatched, err)
+	}
+
+	// finally sync stream CRDs
 	err = c.createOrUpdateStreams()
 	if err != nil {
 		return err
diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go
index 797a07256..7fc97d9ea 100644
--- a/pkg/cluster/sync.go
+++ b/pkg/cluster/sync.go
@@ -395,69 +395,30 @@ func (c *Cluster) syncStatefulSet() error {
 		}
 	}
 
-	// Apply special PostgreSQL parameters that can only be set via the Patroni API.
+	// apply PostgreSQL parameters that can only be set via the Patroni API.
 	// it is important to do it after the statefulset pods are there, but before the rolling update
 	// since those parameters require PostgreSQL restart.
 	pods, err = c.listPods()
 	if err != nil {
-		c.logger.Warnf("could not get list of pods to apply special PostgreSQL parameters only to be set via Patroni API: %v", err)
+		c.logger.Warnf("could not get list of pods to apply PostgreSQL parameters only to be set via Patroni API: %v", err)
 	}
 
-	// get Postgres config, compare with manifest and update via Patroni PATCH endpoint if it differs
-	// Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used
-	for i, pod := range pods {
-		patroniConfig, pgParameters, err := c.getPatroniConfig(&pod)
-		if err != nil {
-			c.logger.Warningf("%v", err)
-			isSafeToRecreatePods = false
-			continue
-		}
-		restartWait = patroniConfig.LoopWait
-
-		// empty config probably means cluster is not fully initialized yet, e.g. restoring from backup
-		// do not attempt a restart
-		if !reflect.DeepEqual(patroniConfig, acidv1.Patroni{}) || len(pgParameters) > 0 {
-			// compare config returned from Patroni with what is specified in the manifest
-			configPatched, restartPrimaryFirst, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, c.Spec.Patroni, pgParameters, c.Spec.Parameters)
-			if err != nil {
-				c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", pods[i].Name, err)
-				continue
-			}
-
-			// it could take up to LoopWait to apply the config
-			if configPatched {
-				time.Sleep(time.Duration(restartWait)*time.Second + time.Second*2)
-				break
-			}
-		}
+	requiredPgParameters := c.Spec.Parameters
+	// if streams are defined wal_level must be switched to logical
+	if len(c.Spec.Streams) > 0 {
+		requiredPgParameters["wal_level"] = "logical"
 	}
 
-	// restart instances if it is still pending
-	remainingPods := make([]*v1.Pod, 0)
-	skipRole := Master
-	if restartPrimaryFirst {
-		skipRole = Replica
-	}
-	for i, pod := range pods {
-		role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
-		if role == skipRole {
-			remainingPods = append(remainingPods, &pods[i])
-			continue
-		}
-		if err = c.restartInstance(&pod, restartWait); err != nil {
-			c.logger.Errorf("%v", err)
-			isSafeToRecreatePods = false
-		}
+	// sync Patroni config
+	if configPatched, restartPrimaryFirst, restartWait, err = c.syncPatroniConfig(pods, c.Spec.Patroni, requiredPgParameters); err != nil {
+		c.logger.Warningf("Patroni config updated? %v - errors during config sync: %v", configPatched, err)
+		isSafeToRecreatePods = false
 	}
 
-	// in most cases only the master should be left to restart
-	if len(remainingPods) > 0 {
-		for _, remainingPod := range remainingPods {
-			if err = c.restartInstance(remainingPod, restartWait); err != nil {
-				c.logger.Errorf("%v", err)
-				isSafeToRecreatePods = false
-			}
-		}
+	// restart Postgres where it is still pending
+	if err = c.restartInstances(pods, restartWait, restartPrimaryFirst); err != nil {
+		c.logger.Errorf("errors while restarting Postgres in pods via Patroni API: %v", err)
+		isSafeToRecreatePods = false
 	}
 
 	// if we get here we also need to re-create the pods (either leftovers from the old
@@ -471,13 +432,98 @@ func (c *Cluster) syncStatefulSet() error {
 		}
 		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
 	} else {
-		c.logger.Warningf("postpone pod recreation until next sync")
+		c.logger.Warningf("postpone pod recreation until next sync because of errors during config sync")
 	}
 
 	return nil
 }
 
+func (c *Cluster) syncPatroniConfig(pods []v1.Pod, requiredPatroniConfig acidv1.Patroni, requiredPgParameters map[string]string) (bool, bool, uint32, error) {
+	var (
+		effectivePatroniConfig acidv1.Patroni
+		effectivePgParameters  map[string]string
+		loopWait               uint32
+		configPatched          bool
+		restartPrimaryFirst    bool
+		err                    error
+	)
+
+	errors := make([]string, 0)
+
+	// get Postgres config, compare with manifest and update via Patroni PATCH endpoint if it differs
+	for i, pod := range pods {
+		podName := util.NameFromMeta(pods[i].ObjectMeta)
+		effectivePatroniConfig, effectivePgParameters, err = c.patroni.GetConfig(&pod)
+		if err != nil {
+			errors = append(errors, fmt.Sprintf("could not get Postgres config from pod %s: %v", podName, err))
+			continue
+		}
+		loopWait = effectivePatroniConfig.LoopWait
+
+		// empty config probably means cluster is not fully initialized yet, e.g. restoring from backup
+		if reflect.DeepEqual(effectivePatroniConfig, acidv1.Patroni{}) || len(effectivePgParameters) == 0 {
+			errors = append(errors, fmt.Sprintf("empty Patroni config on pod %s - skipping config patch", podName))
+		} else {
+			configPatched, restartPrimaryFirst, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, effectivePatroniConfig, requiredPatroniConfig, effectivePgParameters, requiredPgParameters)
+			if err != nil {
+				errors = append(errors, fmt.Sprintf("could not set PostgreSQL configuration options for pod %s: %v", podName, err))
+				continue
+			}
+
+			// it could take up to LoopWait to apply the config
+			if configPatched {
+				time.Sleep(time.Duration(loopWait)*time.Second + time.Second*2)
+				// Patroni's config endpoint is just a "proxy" to DCS.
+				// It is enough to patch it only once and it doesn't matter which pod is used
+				break
+			}
+		}
+	}
+
+	if len(errors) > 0 {
+		err = fmt.Errorf("%v", strings.Join(errors, `', '`))
+	}
+
+	return configPatched, restartPrimaryFirst, loopWait, err
+}
+
+func (c *Cluster) restartInstances(pods []v1.Pod, restartWait uint32, restartPrimaryFirst bool) (err error) {
+	errors := make([]string, 0)
+	remainingPods := make([]*v1.Pod, 0)
+
+	skipRole := Master
+	if restartPrimaryFirst {
+		skipRole = Replica
+	}
+
+	for i, pod := range pods {
+		role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
+		if role == skipRole {
+			remainingPods = append(remainingPods, &pods[i])
+			continue
+		}
+		if err = c.restartInstance(&pod, restartWait); err != nil {
+			errors = append(errors, fmt.Sprintf("%v", err))
+		}
+	}
+
+	// in most cases only the master should be left to restart
+	if len(remainingPods) > 0 {
+		for _, remainingPod := range remainingPods {
+			if err = c.restartInstance(remainingPod, restartWait); err != nil {
+				errors = append(errors, fmt.Sprintf("%v", err))
+			}
+		}
+	}
+
+	if len(errors) > 0 {
+		return fmt.Errorf("%v", strings.Join(errors, `', '`))
+	}
+
+	return nil
+}
+
 func (c *Cluster) restartInstance(pod *v1.Pod, restartWait uint32) error {
 	// if the config update requires a restart, call Patroni restart
 	podName := util.NameFromMeta(pod.ObjectMeta)
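
The core refactoring in this patch replaces the removed syncPostgresConfig's first-error-wins handling (a single errorMsg and an early return) with per-pod error aggregation in the new syncPatroniConfig and restartInstances helpers. Below is a minimal, self-contained sketch of that pattern; the forEachPod helper and the demo pod names are illustrative only and not part of the operator:

package main

import (
	"fmt"
	"strings"
)

// forEachPod runs action against every pod and aggregates failures instead of
// stopping at the first one, mirroring how syncPatroniConfig and
// restartInstances collect per-pod error messages. (Sketch, not operator code.)
func forEachPod(pods []string, action func(pod string) error) error {
	errors := make([]string, 0)
	for _, pod := range pods {
		if err := action(pod); err != nil {
			// keep iterating so one unreachable pod does not hide the others
			errors = append(errors, fmt.Sprintf("pod %s: %v", pod, err))
		}
	}
	if len(errors) > 0 {
		return fmt.Errorf("%v", strings.Join(errors, `', '`))
	}
	return nil
}

func main() {
	err := forEachPod([]string{"demo-cluster-0", "demo-cluster-1"}, func(pod string) error {
		if strings.HasSuffix(pod, "-1") {
			return fmt.Errorf("connection refused")
		}
		return nil
	})
	fmt.Println(err) // pod demo-cluster-1: connection refused
}

Joining the collected messages into one error is what lets syncStatefulSet log every config-sync problem at once and still decide via isSafeToRecreatePods whether the rolling update may proceed.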