From 48a574431462b4543dcf67dfde3a43f5a372270f Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Tue, 29 May 2018 12:35:25 +0200 Subject: [PATCH] Use Patroni API to set bootstrap-only options. (#299) Call Patroni API /config in order to set special options that are ignored when set in the configuration file, such as max_connections. Per https://github.com/zalando-incubator/postgres-operator/issues/297 * Some minor refacoring: Rename Cluster ManualFailover to Swithover Rename Patroni Failover to Switchover Add more details to error messages and comments introduced in this PR. Review by @zerg-junior --- pkg/cluster/cluster.go | 6 ++--- pkg/cluster/pod.go | 4 ++-- pkg/cluster/resources.go | 2 +- pkg/cluster/sync.go | 46 +++++++++++++++++++++++++++++++++++++ pkg/spec/types.go | 5 ++++ pkg/util/patroni/patroni.go | 39 ++++++++++++++++++++++--------- 6 files changed, 85 insertions(+), 17 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 93966ce2c..2662cd521 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -866,8 +866,8 @@ func (c *Cluster) GetStatus() *spec.ClusterStatus { } } -// ManualFailover does manual failover to a candidate pod -func (c *Cluster) ManualFailover(curMaster *v1.Pod, candidate spec.NamespacedName) error { +// Switchover does a switchover (via Patroni) to a candidate pod +func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) error { c.logger.Debugf("failing over from %q to %q", curMaster.Name, candidate) podLabelErr := make(chan error) @@ -889,7 +889,7 @@ func (c *Cluster) ManualFailover(curMaster *v1.Pod, candidate spec.NamespacedNam } }() - if err := c.patroni.Failover(curMaster, candidate.Name); err != nil { + if err := c.patroni.Switchover(curMaster, candidate.Name); err != nil { close(stopCh) return fmt.Errorf("could not failover: %v", err) } diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index ea3e02dcd..66c2950bc 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -232,7 +232,7 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error { } masterCandidateName := util.NameFromMeta(pod.ObjectMeta) - if err := c.ManualFailover(oldMaster, masterCandidateName); err != nil { + if err := c.Switchover(oldMaster, masterCandidateName); err != nil { return fmt.Errorf("could not failover to pod %q: %v", masterCandidateName, err) } } else { @@ -330,7 +330,7 @@ func (c *Cluster) recreatePods() error { if masterPod != nil { // failover if we have not observed a master pod when re-creating former replicas. if newMasterPod == nil && len(replicas) > 0 { - if err := c.ManualFailover(masterPod, masterCandidate(replicas)); err != nil { + if err := c.Switchover(masterPod, masterCandidate(replicas)); err != nil { c.logger.Warningf("could not perform failover: %v", err) } } else if newMasterPod == nil && len(replicas) == 0 { diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 1a1f3c69a..cc3c4e6f9 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -125,7 +125,7 @@ func (c *Cluster) preScaleDown(newStatefulSet *v1beta1.StatefulSet) error { return fmt.Errorf("pod %q does not belong to cluster", podName) } - if err := c.patroni.Failover(&masterPod[0], masterCandidatePod.Name); err != nil { + if err := c.patroni.Switchover(&masterPod[0], masterCandidatePod.Name); err != nil { return fmt.Errorf("could not failover: %v", err) } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 7cff5d935..cf38a6d4f 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -229,6 +229,7 @@ func (c *Cluster) syncStatefulSet() error { var ( podsRollingUpdateRequired bool ) + // NB: Be careful to consider the codepath that acts on podsRollingUpdateRequired before returning early. sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(c.statefulSetName(), metav1.GetOptions{}) if err != nil { if !k8sutil.ResourceNotFound(err) { @@ -288,6 +289,14 @@ func (c *Cluster) syncStatefulSet() error { } } } + + // Apply special PostgreSQL parameters that can only be set via the Patroni API. + // it is important to do it after the statefulset pods are there, but before the rolling update + // since those parameters require PostgreSQL restart. + if err := c.checkAndSetGlobalPostgreSQLConfiguration(); err != nil { + return fmt.Errorf("could not set cluster-wide PostgreSQL configuration options: %v", err) + } + // if we get here we also need to re-create the pods (either leftovers from the old // statefulset or those that got their configuration from the outdated statefulset) if podsRollingUpdateRequired { @@ -303,6 +312,43 @@ func (c *Cluster) syncStatefulSet() error { return nil } +// checkAndSetGlobalPostgreSQLConfiguration checks whether cluster-wide API parameters +// (like max_connections) has changed and if necessary sets it via the Patroni API +func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration() error { + // we need to extract those options from the cluster manifest. + optionsToSet := make(map[string]string) + pgOptions := c.Spec.Parameters + + for k, v := range pgOptions { + if isBootstrapOnlyParameter(k) { + optionsToSet[k] = v + } + } + + if len(optionsToSet) > 0 { + pods, err := c.listPods() + if err != nil { + return err + } + if len(pods) == 0 { + return fmt.Errorf("could not call Patroni API: cluster has no pods") + } + for _, pod := range pods { + podName := util.NameFromMeta(pod.ObjectMeta) + c.logger.Debugf("calling Patroni API on a pod %s to set the following Postgres options: %v", + podName, optionsToSet) + if err := c.patroni.SetPostgresParameters(&pod, optionsToSet); err == nil { + return nil + } else { + c.logger.Warningf("could not patch postgres parameters with a pod %s: %v", podName, err) + } + } + return fmt.Errorf("could not reach Patroni API to set Postgres options: failed on every pod (%d total)", + len(pods)) + } + return nil +} + func (c *Cluster) syncSecrets() error { c.setProcessName("syncing secrets") secrets := c.generateUserSecrets() diff --git a/pkg/spec/types.go b/pkg/spec/types.go index c559f782a..66d6a73fa 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -5,6 +5,7 @@ import ( "fmt" "io/ioutil" "log" + "os" "strings" "time" @@ -14,6 +15,7 @@ import ( "k8s.io/client-go/pkg/apis/apps/v1beta1" policyv1beta1 "k8s.io/client-go/pkg/apis/policy/v1beta1" "k8s.io/client-go/rest" + ) // EventType contains type of the events for the TPRs and Pods received from Kubernetes @@ -223,6 +225,9 @@ func (r RoleOrigin) String() string { // Placing this func here instead of pgk/util avoids circular import func GetOperatorNamespace() string { if operatorNamespace == "" { + if namespaceFromEnvironment := os.Getenv("OPERATOR_NAMESPACE"); namespaceFromEnvironment != "" { + return namespaceFromEnvironment + } operatorNamespaceBytes, err := ioutil.ReadFile(fileWithNamespace) if err != nil { log.Fatalf("Unable to detect operator namespace from within its pod due to: %v", err) diff --git a/pkg/util/patroni/patroni.go b/pkg/util/patroni/patroni.go index f5cbbf678..5e5dc4aeb 100644 --- a/pkg/util/patroni/patroni.go +++ b/pkg/util/patroni/patroni.go @@ -14,13 +14,15 @@ import ( const ( failoverPath = "/failover" + configPath = "/config" apiPort = 8008 timeout = 30 * time.Second ) // Interface describe patroni methods type Interface interface { - Failover(master *v1.Pod, candidate string) error + Switchover(master *v1.Pod, candidate string) error + SetPostgresParameters(server *v1.Pod, options map[string]string) error } // Patroni API client @@ -45,20 +47,13 @@ func apiURL(masterPod *v1.Pod) string { return fmt.Sprintf("http://%s:%d", masterPod.Status.PodIP, apiPort) } -// Failover does manual failover via patroni api -func (p *Patroni) Failover(master *v1.Pod, candidate string) error { - buf := &bytes.Buffer{} - - err := json.NewEncoder(buf).Encode(map[string]string{"leader": master.Name, "member": candidate}) - if err != nil { - return fmt.Errorf("could not encode json: %v", err) - } - request, err := http.NewRequest(http.MethodPost, apiURL(master)+failoverPath, buf) +func (p *Patroni) httpPostOrPatch(method string, url string, body *bytes.Buffer) error { + request, err := http.NewRequest(method, url, body) if err != nil { return fmt.Errorf("could not create request: %v", err) } - p.logger.Debugf("making http request: %s", request.URL.String()) + p.logger.Debugf("making %s http request: %s", method, request.URL.String()) resp, err := p.httpClient.Do(request) if err != nil { @@ -74,6 +69,28 @@ func (p *Patroni) Failover(master *v1.Pod, candidate string) error { return fmt.Errorf("patroni returned '%s'", string(bodyBytes)) } + return nil +} + +// Switchover by calling Patroni REST API +func (p *Patroni) Switchover(master *v1.Pod, candidate string) error { + buf := &bytes.Buffer{} + err := json.NewEncoder(buf).Encode(map[string]string{"leader": master.Name, "member": candidate}) + if err != nil { + return fmt.Errorf("could not encode json: %v", err) + } + return p.httpPostOrPatch(http.MethodPost, apiURL(master)+failoverPath, buf) return nil } + +//TODO: add an option call /patroni to check if it is necessary to restart the server +// SetPostgresParameters sets Postgres options via Patroni patch API call. +func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]string) error { + buf := &bytes.Buffer{} + err := json.NewEncoder(buf).Encode(map[string]map[string]interface{}{"postgresql": {"parameters": parameters}}) + if err != nil { + return fmt.Errorf("could not encode json: %v", err) + } + return p.httpPostOrPatch(http.MethodPatch, apiURL(server)+configPath, buf) +}