set wal_level for streams in statefulSet sync (#2187)
* set wal_level for streams in statefulSet sync
This commit is contained in:
		
							parent
							
								
									4741b3f734
								
							
						
					
					
						commit
						b9165190e1
					
				| 
						 | 
					@ -79,38 +79,6 @@ func gatherApplicationIds(streams []acidv1.Stream) []string {
 | 
				
			||||||
	return appIds
 | 
						return appIds
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *Cluster) syncPostgresConfig(requiredPatroniConfig acidv1.Patroni) (bool, error) {
 | 
					 | 
				
			||||||
	errorMsg := "no pods found to update config"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// if streams are defined wal_level must be switched to logical
 | 
					 | 
				
			||||||
	requiredPgParameters := map[string]string{"wal_level": "logical"}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// apply config changes in pods
 | 
					 | 
				
			||||||
	pods, err := c.listPods()
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		errorMsg = fmt.Sprintf("could not list pods of the statefulset: %v", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	for i, pod := range pods {
 | 
					 | 
				
			||||||
		podName := util.NameFromMeta(pods[i].ObjectMeta)
 | 
					 | 
				
			||||||
		effectivePatroniConfig, effectivePgParameters, err := c.patroni.GetConfig(&pod)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			errorMsg = fmt.Sprintf("could not get Postgres config from pod %s: %v", podName, err)
 | 
					 | 
				
			||||||
			continue
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		configPatched, _, err := c.checkAndSetGlobalPostgreSQLConfiguration(&pod, effectivePatroniConfig, requiredPatroniConfig, effectivePgParameters, requiredPgParameters)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			errorMsg = fmt.Sprintf("could not set PostgreSQL configuration options for pod %s: %v", podName, err)
 | 
					 | 
				
			||||||
			continue
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		// Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used
 | 
					 | 
				
			||||||
		return configPatched, nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return false, fmt.Errorf(errorMsg)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (c *Cluster) syncPublication(publication, dbName string, tables map[string]acidv1.StreamTable) error {
 | 
					func (c *Cluster) syncPublication(publication, dbName string, tables map[string]acidv1.StreamTable) error {
 | 
				
			||||||
	createPublications := make(map[string]string)
 | 
						createPublications := make(map[string]string)
 | 
				
			||||||
	alterPublications := make(map[string]string)
 | 
						alterPublications := make(map[string]string)
 | 
				
			||||||
| 
						 | 
					@ -273,7 +241,6 @@ func (c *Cluster) getStreamConnection(database, user, appId string) zalandov1.Co
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *Cluster) syncStreams() error {
 | 
					func (c *Cluster) syncStreams() error {
 | 
				
			||||||
 | 
					 | 
				
			||||||
	c.setProcessName("syncing streams")
 | 
						c.setProcessName("syncing streams")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	_, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamCRDName, metav1.GetOptions{})
 | 
						_, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamCRDName, metav1.GetOptions{})
 | 
				
			||||||
| 
						 | 
					@ -282,20 +249,10 @@ func (c *Cluster) syncStreams() error {
 | 
				
			||||||
		return nil
 | 
							return nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// update config to set wal_level: logical
 | 
					 | 
				
			||||||
	requiredPatroniConfig := c.Spec.Patroni
 | 
					 | 
				
			||||||
	requiresRestart, err := c.syncPostgresConfig(requiredPatroniConfig)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return fmt.Errorf("failed to snyc Postgres config for event streaming: %v", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if requiresRestart {
 | 
					 | 
				
			||||||
		c.logger.Debugf("updated Postgres config. Server will be restarted and streams will get created during next sync")
 | 
					 | 
				
			||||||
		return nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	slots := make(map[string]map[string]string)
 | 
						slots := make(map[string]map[string]string)
 | 
				
			||||||
	slotsToSync := make(map[string]map[string]string)
 | 
						slotsToSync := make(map[string]map[string]string)
 | 
				
			||||||
	publications := make(map[string]map[string]acidv1.StreamTable)
 | 
						publications := make(map[string]map[string]acidv1.StreamTable)
 | 
				
			||||||
 | 
						requiredPatroniConfig := c.Spec.Patroni
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if len(requiredPatroniConfig.Slots) > 0 {
 | 
						if len(requiredPatroniConfig.Slots) > 0 {
 | 
				
			||||||
		slots = requiredPatroniConfig.Slots
 | 
							slots = requiredPatroniConfig.Slots
 | 
				
			||||||
| 
						 | 
					@ -343,13 +300,19 @@ func (c *Cluster) syncStreams() error {
 | 
				
			||||||
		return nil
 | 
							return nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// add extra logical slots to Patroni config
 | 
						c.logger.Debug("syncing logical replication slots")
 | 
				
			||||||
	_, err = c.syncPostgresConfig(requiredPatroniConfig)
 | 
						pods, err := c.listPods()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return fmt.Errorf("failed to snyc Postgres config for event streaming: %v", err)
 | 
							return fmt.Errorf("could not get list of pods to sync logical replication slots via Patroni API: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// after Postgres was restarted we can create stream CRDs
 | 
						// sync logical replication slots in Patroni config
 | 
				
			||||||
 | 
						configPatched, _, _, err := c.syncPatroniConfig(pods, requiredPatroniConfig, nil)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							c.logger.Warningf("Patroni config updated? %v - errors during config sync: %v", configPatched, err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// finally sync stream CRDs
 | 
				
			||||||
	err = c.createOrUpdateStreams()
 | 
						err = c.createOrUpdateStreams()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -395,69 +395,30 @@ func (c *Cluster) syncStatefulSet() error {
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Apply special PostgreSQL parameters that can only be set via the Patroni API.
 | 
						// apply PostgreSQL parameters that can only be set via the Patroni API.
 | 
				
			||||||
	// it is important to do it after the statefulset pods are there, but before the rolling update
 | 
						// it is important to do it after the statefulset pods are there, but before the rolling update
 | 
				
			||||||
	// since those parameters require PostgreSQL restart.
 | 
						// since those parameters require PostgreSQL restart.
 | 
				
			||||||
	pods, err = c.listPods()
 | 
						pods, err = c.listPods()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		c.logger.Warnf("could not get list of pods to apply special PostgreSQL parameters only to be set via Patroni API: %v", err)
 | 
							c.logger.Warnf("could not get list of pods to apply PostgreSQL parameters only to be set via Patroni API: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// get Postgres config, compare with manifest and update via Patroni PATCH endpoint if it differs
 | 
						requiredPgParameters := c.Spec.Parameters
 | 
				
			||||||
	// Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used
 | 
						// if streams are defined wal_level must be switched to logical
 | 
				
			||||||
	for i, pod := range pods {
 | 
						if len(c.Spec.Streams) > 0 {
 | 
				
			||||||
		patroniConfig, pgParameters, err := c.getPatroniConfig(&pod)
 | 
							requiredPgParameters["wal_level"] = "logical"
 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			c.logger.Warningf("%v", err)
 | 
					 | 
				
			||||||
			isSafeToRecreatePods = false
 | 
					 | 
				
			||||||
			continue
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		restartWait = patroniConfig.LoopWait
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		// empty config probably means cluster is not fully initialized yet, e.g. restoring from backup
 | 
					 | 
				
			||||||
		// do not attempt a restart
 | 
					 | 
				
			||||||
		if !reflect.DeepEqual(patroniConfig, acidv1.Patroni{}) || len(pgParameters) > 0 {
 | 
					 | 
				
			||||||
			// compare config returned from Patroni with what is specified in the manifest
 | 
					 | 
				
			||||||
			configPatched, restartPrimaryFirst, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, c.Spec.Patroni, pgParameters, c.Spec.Parameters)
 | 
					 | 
				
			||||||
			if err != nil {
 | 
					 | 
				
			||||||
				c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", pods[i].Name, err)
 | 
					 | 
				
			||||||
				continue
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			// it could take up to LoopWait to apply the config
 | 
					 | 
				
			||||||
			if configPatched {
 | 
					 | 
				
			||||||
				time.Sleep(time.Duration(restartWait)*time.Second + time.Second*2)
 | 
					 | 
				
			||||||
				break
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// restart instances if it is still pending
 | 
						// sync Patroni config
 | 
				
			||||||
	remainingPods := make([]*v1.Pod, 0)
 | 
						if configPatched, restartPrimaryFirst, restartWait, err = c.syncPatroniConfig(pods, c.Spec.Patroni, requiredPgParameters); err != nil {
 | 
				
			||||||
	skipRole := Master
 | 
							c.logger.Warningf("Patroni config updated? %v - errors during config sync: %v", configPatched, err)
 | 
				
			||||||
	if restartPrimaryFirst {
 | 
							isSafeToRecreatePods = false
 | 
				
			||||||
		skipRole = Replica
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	for i, pod := range pods {
 | 
					 | 
				
			||||||
		role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
 | 
					 | 
				
			||||||
		if role == skipRole {
 | 
					 | 
				
			||||||
			remainingPods = append(remainingPods, &pods[i])
 | 
					 | 
				
			||||||
			continue
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		if err = c.restartInstance(&pod, restartWait); err != nil {
 | 
					 | 
				
			||||||
			c.logger.Errorf("%v", err)
 | 
					 | 
				
			||||||
			isSafeToRecreatePods = false
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// in most cases only the master should be left to restart
 | 
						// restart Postgres where it is still pending
 | 
				
			||||||
	if len(remainingPods) > 0 {
 | 
						if err = c.restartInstances(pods, restartWait, restartPrimaryFirst); err != nil {
 | 
				
			||||||
		for _, remainingPod := range remainingPods {
 | 
							c.logger.Errorf("errors while restarting Postgres in pods via Patroni API: %v", err)
 | 
				
			||||||
			if err = c.restartInstance(remainingPod, restartWait); err != nil {
 | 
							isSafeToRecreatePods = false
 | 
				
			||||||
				c.logger.Errorf("%v", err)
 | 
					 | 
				
			||||||
				isSafeToRecreatePods = false
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// if we get here we also need to re-create the pods (either leftovers from the old
 | 
						// if we get here we also need to re-create the pods (either leftovers from the old
 | 
				
			||||||
| 
						 | 
					@ -471,13 +432,98 @@ func (c *Cluster) syncStatefulSet() error {
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
 | 
								c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
 | 
				
			||||||
		} else {
 | 
							} else {
 | 
				
			||||||
			c.logger.Warningf("postpone pod recreation until next sync")
 | 
								c.logger.Warningf("postpone pod recreation until next sync because of errors during config sync")
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (c *Cluster) syncPatroniConfig(pods []v1.Pod, requiredPatroniConfig acidv1.Patroni, requiredPgParameters map[string]string) (bool, bool, uint32, error) {
 | 
				
			||||||
 | 
						var (
 | 
				
			||||||
 | 
							effectivePatroniConfig acidv1.Patroni
 | 
				
			||||||
 | 
							effectivePgParameters  map[string]string
 | 
				
			||||||
 | 
							loopWait               uint32
 | 
				
			||||||
 | 
							configPatched          bool
 | 
				
			||||||
 | 
							restartPrimaryFirst    bool
 | 
				
			||||||
 | 
							err                    error
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						errors := make([]string, 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// get Postgres config, compare with manifest and update via Patroni PATCH endpoint if it differs
 | 
				
			||||||
 | 
						for i, pod := range pods {
 | 
				
			||||||
 | 
							podName := util.NameFromMeta(pods[i].ObjectMeta)
 | 
				
			||||||
 | 
							effectivePatroniConfig, effectivePgParameters, err = c.patroni.GetConfig(&pod)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								errors = append(errors, fmt.Sprintf("could not get Postgres config from pod %s: %v", podName, err))
 | 
				
			||||||
 | 
								continue
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							loopWait = effectivePatroniConfig.LoopWait
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// empty config probably means cluster is not fully initialized yet, e.g. restoring from backup
 | 
				
			||||||
 | 
							if reflect.DeepEqual(effectivePatroniConfig, acidv1.Patroni{}) || len(effectivePgParameters) == 0 {
 | 
				
			||||||
 | 
								errors = append(errors, fmt.Sprintf("empty Patroni config on pod %s - skipping config patch", podName))
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								configPatched, restartPrimaryFirst, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, effectivePatroniConfig, requiredPatroniConfig, effectivePgParameters, requiredPgParameters)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									errors = append(errors, fmt.Sprintf("could not set PostgreSQL configuration options for pod %s: %v", podName, err))
 | 
				
			||||||
 | 
									continue
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								// it could take up to LoopWait to apply the config
 | 
				
			||||||
 | 
								if configPatched {
 | 
				
			||||||
 | 
									time.Sleep(time.Duration(loopWait)*time.Second + time.Second*2)
 | 
				
			||||||
 | 
									// Patroni's config endpoint is just a "proxy" to DCS.
 | 
				
			||||||
 | 
									// It is enough to patch it only once and it doesn't matter which pod is used
 | 
				
			||||||
 | 
									break
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if len(errors) > 0 {
 | 
				
			||||||
 | 
							err = fmt.Errorf("%v", strings.Join(errors, `', '`))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return configPatched, restartPrimaryFirst, loopWait, err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (c *Cluster) restartInstances(pods []v1.Pod, restartWait uint32, restartPrimaryFirst bool) (err error) {
 | 
				
			||||||
 | 
						errors := make([]string, 0)
 | 
				
			||||||
 | 
						remainingPods := make([]*v1.Pod, 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						skipRole := Master
 | 
				
			||||||
 | 
						if restartPrimaryFirst {
 | 
				
			||||||
 | 
							skipRole = Replica
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for i, pod := range pods {
 | 
				
			||||||
 | 
							role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
 | 
				
			||||||
 | 
							if role == skipRole {
 | 
				
			||||||
 | 
								remainingPods = append(remainingPods, &pods[i])
 | 
				
			||||||
 | 
								continue
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if err = c.restartInstance(&pod, restartWait); err != nil {
 | 
				
			||||||
 | 
								errors = append(errors, fmt.Sprintf("%v", err))
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// in most cases only the master should be left to restart
 | 
				
			||||||
 | 
						if len(remainingPods) > 0 {
 | 
				
			||||||
 | 
							for _, remainingPod := range remainingPods {
 | 
				
			||||||
 | 
								if err = c.restartInstance(remainingPod, restartWait); err != nil {
 | 
				
			||||||
 | 
									errors = append(errors, fmt.Sprintf("%v", err))
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if len(errors) > 0 {
 | 
				
			||||||
 | 
							return fmt.Errorf("%v", strings.Join(errors, `', '`))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *Cluster) restartInstance(pod *v1.Pod, restartWait uint32) error {
 | 
					func (c *Cluster) restartInstance(pod *v1.Pod, restartWait uint32) error {
 | 
				
			||||||
	// if the config update requires a restart, call Patroni restart
 | 
						// if the config update requires a restart, call Patroni restart
 | 
				
			||||||
	podName := util.NameFromMeta(pod.ObjectMeta)
 | 
						podName := util.NameFromMeta(pod.ObjectMeta)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue