refactor restarting instances

Felix Kunde 2021-06-23 14:18:01 +02:00
parent 54e506c00b
commit fa98d6748f
1 changed file with 58 additions and 72 deletions


@@ -261,7 +261,10 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error {
 }
 
 func (c *Cluster) syncStatefulSet() error {
-	var instancesRestartRequired bool
+	var (
+		masterPod               *v1.Pod
+		instanceRestartRequired bool
+	)
 
 	podsToRecreate := make([]v1.Pod, 0)
 	switchoverCandidates := make([]spec.NamespacedName, 0)
@@ -381,21 +384,52 @@ func (c *Cluster) syncStatefulSet() error {
 	// Apply special PostgreSQL parameters that can only be set via the Patroni API.
 	// it is important to do it after the statefulset pods are there, but before the rolling update
 	// since those parameters require PostgreSQL restart.
-	instancesRestartRequired, err = c.checkAndSetGlobalPostgreSQLConfiguration()
-	if err != nil {
-		return fmt.Errorf("could not set cluster-wide PostgreSQL configuration options: %v", err)
-	}
-
-	if instancesRestartRequired {
-		c.logger.Debugln("restarting Postgres server within pods")
-		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "restarting Postgres server within pods")
-		if err := c.restartInstances(); err != nil {
-			c.logger.Warningf("could not restart Postgres server within pods: %v", err)
-		}
-		c.logger.Infof("Postgres server successfuly restarted on all pods")
-		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Postgres server restart done - all instances have been restarted")
-	}
+	pods, err = c.listPods()
+	if err != nil {
+		c.logger.Infof("could not list pods of the statefulset: %v", err)
+	}
+
+	for i, pod := range pods {
+		role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
+		if role == Master {
+			masterPod = &pods[i]
+			continue
+		}
+
+		podName := util.NameFromMeta(pods[i].ObjectMeta)
+		config, err := c.patroni.GetConfig(&pod)
+		if err != nil {
+			return fmt.Errorf("could not get config for pod %s: %v", podName, err)
+		}
+		instanceRestartRequired, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod)
+		if err != nil {
+			return fmt.Errorf("could not set cluster-wide PostgreSQL configuration options: %v", err)
+		}
+
+		if instanceRestartRequired {
+			c.logger.Debugln("restarting Postgres server within pods")
+			ttl, ok := config["ttl"].(int32)
+			if !ok {
+				ttl = 30
+			}
+			c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "restarting Postgres server within pod "+pod.Name)
+			if err := c.restartInstance(&pod, ttl); err != nil {
+				c.logger.Warningf("could not restart Postgres server within pod %s: %v", podName, err)
+			}
+			c.logger.Infof("Postgres server successfuly restarted in pod %s", podName)
+			c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Postgres server restart done for pod "+pod.Name)
+		}
+	}
+
+	if masterPod != nil {
+		masterPodName := util.NameFromMeta(masterPod.ObjectMeta)
+		if err := c.restartInstance(masterPod, 0); err != nil {
+			c.logger.Warningf("could not restart Postgres master within pod %s: %v", masterPodName, err)
+		}
+	}
 
 	// if we get here we also need to re-create the pods (either leftovers from the old
 	// statefulset or those that got their configuration from the outdated statefulset)
 	if len(podsToRecreate) > 0 {
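
A side note on the ttl lookup added above: the diff does not show the Patroni client, but if GetConfig decodes the JSON /config response into a map[string]interface{} via encoding/json (an assumption about the client, not something visible in this commit), every number arrives as float64, so the int32 type assertion can never hold and the hard-coded 30-second fallback always applies. A minimal standalone sketch of that behavior:

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// A Patroni /config response decoded the generic way.
	var config map[string]interface{}
	if err := json.Unmarshal([]byte(`{"ttl": 30, "loop_wait": 10}`), &config); err != nil {
		panic(err)
	}

	// encoding/json stores every JSON number as float64 inside an
	// interface{}, so an int32 assertion never succeeds and the
	// fallback value of 30 is always taken.
	ttl, ok := config["ttl"].(int32)
	fmt.Println(ttl, ok) // prints: 0 false

	// Asserting float64 first (a fix sketched here, not part of the
	// commit) would recover the configured value.
	if f, ok := config["ttl"].(float64); ok {
		ttl = int32(f)
	}
	fmt.Println(ttl) // prints: 30
}
```
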
@@ -409,53 +443,12 @@ func (c *Cluster) syncStatefulSet() error {
 	return nil
 }
 
-func (c *Cluster) restartInstances() error {
-	c.setProcessName("starting to restart Postgres servers")
-	ls := c.labelsSet(false)
-	namespace := c.Namespace
-
-	listOptions := metav1.ListOptions{
-		LabelSelector: ls.String(),
-	}
-
-	pods, err := c.KubeClient.Pods(namespace).List(context.TODO(), listOptions)
-	if err != nil {
-		return fmt.Errorf("could not get the list of pods: %v", err)
-	}
-	c.logger.Infof("there are %d pods in the cluster which resquire Postgres server restart", len(pods.Items))
-
-	var (
-		masterPod *v1.Pod
-	)
-	for i, pod := range pods.Items {
-		role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
-
-		if role == Master {
-			masterPod = &pods.Items[i]
-			continue
-		}
-
-		podName := util.NameFromMeta(pods.Items[i].ObjectMeta)
-		config, err := c.patroni.GetConfig(&pod)
-		if err != nil {
-			return fmt.Errorf("could not get config for pod %s: %v", podName, err)
-		}
-		ttl, ok := config["ttl"].(int32)
-		if !ok {
-			ttl = 30
-		}
-		if err = c.patroni.Restart(&pod); err != nil {
-			return fmt.Errorf("could not restart Postgres server on pod %s: %v", podName, err)
-		}
-		time.Sleep(time.Duration(ttl) * time.Second)
-	}
-
-	if masterPod != nil {
-		podName := util.NameFromMeta(masterPod.ObjectMeta)
-		if err = c.patroni.Restart(masterPod); err != nil {
-			return fmt.Errorf("could not restart postgres server on masterPod %s: %v", podName, err)
-		}
-	}
+func (c *Cluster) restartInstance(pod *v1.Pod, ttl int32) error {
+	if err := c.patroni.Restart(pod); err != nil {
+		return err
+	}
+	time.Sleep(time.Duration(ttl) * time.Second)
 
 	return nil
 }
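
After the refactor, restartInstance is a thin wrapper: one Patroni restart call per pod, followed by a TTL-long pause so replicas rejoin before the next member is touched (the master is restarted last with ttl 0, so no pause follows it). For context, Patroni exposes its restart operation over a REST API; the helper below is a hypothetical standalone sketch of the kind of request c.patroni.Restart presumably issues, with the scheme, port 8008, and empty JSON body assumed rather than taken from this commit:

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

// restartViaPatroni is a hypothetical stand-in for c.patroni.Restart:
// Patroni's REST API (served on port 8008 by default) restarts the
// local PostgreSQL instance on POST /restart.
func restartViaPatroni(podIP string) error {
	url := fmt.Sprintf("http://%s:8008/restart", podIP)
	resp, err := http.Post(url, "application/json", strings.NewReader("{}"))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("could not restart instance %s: %s", podIP, resp.Status)
	}
	return nil
}

func main() {
	if err := restartViaPatroni("10.2.1.5"); err != nil {
		fmt.Println(err)
	}
}
```
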
@@ -493,8 +486,8 @@ func (c *Cluster) AnnotationsToPropagate(annotations map[string]string) map[stri
 }
 
 // checkAndSetGlobalPostgreSQLConfiguration checks whether cluster-wide API parameters
-// (like max_connections) has changed and if necessary sets it via the Patroni API
-func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration() (bool, error) {
+// (like max_connections) have changed and if necessary sets it via the Patroni API
+func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod) (bool, error) {
 	var (
 		err  error
 		pods []v1.Pod
@@ -515,24 +508,17 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration() (bool, error) {
 		return restartRequired, nil
 	}
 
-	if pods, err = c.listPods(); err != nil {
-		return restartRequired, err
-	}
-	if len(pods) == 0 {
-		return restartRequired, fmt.Errorf("could not call Patroni API: cluster has no pods")
-	}
-
 	// try all pods until the first one that is successful, as it doesn't matter which pod
 	// carries the request to change configuration through
-	for _, pod := range pods {
-		podName := util.NameFromMeta(pod.ObjectMeta)
-		c.logger.Debugf("calling Patroni API on a pod %s to set the following Postgres options: %v",
-			podName, optionsToSet)
-		if err = c.patroni.SetPostgresParameters(&pod, optionsToSet); err == nil {
-			restartRequired = true
-			return restartRequired, nil
-		}
-		c.logger.Warningf("could not patch postgres parameters with a pod %s: %v", podName, err)
-	}
+	podName := util.NameFromMeta(pod.ObjectMeta)
+	c.logger.Debugf("calling Patroni API on a pod %s to set the following Postgres options: %v",
+		podName, optionsToSet)
+	if err = c.patroni.SetPostgresParameters(pod, optionsToSet); err == nil {
+		restartRequired = true
+		return restartRequired, nil
+	}
+	c.logger.Warningf("could not patch postgres parameters with a pod %s: %v", podName, err)
 
 	return restartRequired, fmt.Errorf("could not reach Patroni API to set Postgres options: failed on every pod (%d total)",
 		len(pods))
 }
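
With this hunk, checkAndSetGlobalPostgreSQLConfiguration no longer lists pods itself; the caller passes in the pod whose Patroni API should carry the configuration change. The underlying mechanism, per Patroni's documented REST API, is a PATCH of the dynamic configuration with the Postgres settings nested under postgresql.parameters. The helper below is a hypothetical sketch of what SetPostgresParameters presumably sends; the endpoint address, port, and error handling are assumptions, not operator code:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// setPostgresParameters is a hypothetical stand-in for
// c.patroni.SetPostgresParameters: Patroni merges a PATCH /config
// request into its dynamic configuration, applying the values under
// postgresql.parameters to the running cluster.
func setPostgresParameters(podIP string, params map[string]string) error {
	body, err := json.Marshal(map[string]interface{}{
		"postgresql": map[string]interface{}{"parameters": params},
	})
	if err != nil {
		return err
	}
	url := fmt.Sprintf("http://%s:8008/config", podIP)
	req, err := http.NewRequest(http.MethodPatch, url, bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("could not set parameters on %s: %s", podIP, resp.Status)
	}
	return nil
}

func main() {
	err := setPostgresParameters("10.2.1.5", map[string]string{"max_connections": "200"})
	if err != nil {
		fmt.Println(err)
	}
}
```
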