Use Patroni API to set bootstrap-only options. (#299)
Call Patroni API /config in order to set special options that are ignored when set in the configuration file, such as max_connections. Per https://github.com/zalando-incubator/postgres-operator/issues/297 * Some minor refacoring: Rename Cluster ManualFailover to Swithover Rename Patroni Failover to Switchover Add more details to error messages and comments introduced in this PR. Review by @zerg-junior
This commit is contained in:
parent
24df918dda
commit
48a5744314
|
|
@ -866,8 +866,8 @@ func (c *Cluster) GetStatus() *spec.ClusterStatus {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ManualFailover does manual failover to a candidate pod
|
// Switchover does a switchover (via Patroni) to a candidate pod
|
||||||
func (c *Cluster) ManualFailover(curMaster *v1.Pod, candidate spec.NamespacedName) error {
|
func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) error {
|
||||||
c.logger.Debugf("failing over from %q to %q", curMaster.Name, candidate)
|
c.logger.Debugf("failing over from %q to %q", curMaster.Name, candidate)
|
||||||
|
|
||||||
podLabelErr := make(chan error)
|
podLabelErr := make(chan error)
|
||||||
|
|
@ -889,7 +889,7 @@ func (c *Cluster) ManualFailover(curMaster *v1.Pod, candidate spec.NamespacedNam
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if err := c.patroni.Failover(curMaster, candidate.Name); err != nil {
|
if err := c.patroni.Switchover(curMaster, candidate.Name); err != nil {
|
||||||
close(stopCh)
|
close(stopCh)
|
||||||
return fmt.Errorf("could not failover: %v", err)
|
return fmt.Errorf("could not failover: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -232,7 +232,7 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
masterCandidateName := util.NameFromMeta(pod.ObjectMeta)
|
masterCandidateName := util.NameFromMeta(pod.ObjectMeta)
|
||||||
if err := c.ManualFailover(oldMaster, masterCandidateName); err != nil {
|
if err := c.Switchover(oldMaster, masterCandidateName); err != nil {
|
||||||
return fmt.Errorf("could not failover to pod %q: %v", masterCandidateName, err)
|
return fmt.Errorf("could not failover to pod %q: %v", masterCandidateName, err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -330,7 +330,7 @@ func (c *Cluster) recreatePods() error {
|
||||||
if masterPod != nil {
|
if masterPod != nil {
|
||||||
// failover if we have not observed a master pod when re-creating former replicas.
|
// failover if we have not observed a master pod when re-creating former replicas.
|
||||||
if newMasterPod == nil && len(replicas) > 0 {
|
if newMasterPod == nil && len(replicas) > 0 {
|
||||||
if err := c.ManualFailover(masterPod, masterCandidate(replicas)); err != nil {
|
if err := c.Switchover(masterPod, masterCandidate(replicas)); err != nil {
|
||||||
c.logger.Warningf("could not perform failover: %v", err)
|
c.logger.Warningf("could not perform failover: %v", err)
|
||||||
}
|
}
|
||||||
} else if newMasterPod == nil && len(replicas) == 0 {
|
} else if newMasterPod == nil && len(replicas) == 0 {
|
||||||
|
|
|
||||||
|
|
@ -125,7 +125,7 @@ func (c *Cluster) preScaleDown(newStatefulSet *v1beta1.StatefulSet) error {
|
||||||
return fmt.Errorf("pod %q does not belong to cluster", podName)
|
return fmt.Errorf("pod %q does not belong to cluster", podName)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.patroni.Failover(&masterPod[0], masterCandidatePod.Name); err != nil {
|
if err := c.patroni.Switchover(&masterPod[0], masterCandidatePod.Name); err != nil {
|
||||||
return fmt.Errorf("could not failover: %v", err)
|
return fmt.Errorf("could not failover: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -229,6 +229,7 @@ func (c *Cluster) syncStatefulSet() error {
|
||||||
var (
|
var (
|
||||||
podsRollingUpdateRequired bool
|
podsRollingUpdateRequired bool
|
||||||
)
|
)
|
||||||
|
// NB: Be careful to consider the codepath that acts on podsRollingUpdateRequired before returning early.
|
||||||
sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(c.statefulSetName(), metav1.GetOptions{})
|
sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(c.statefulSetName(), metav1.GetOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !k8sutil.ResourceNotFound(err) {
|
if !k8sutil.ResourceNotFound(err) {
|
||||||
|
|
@ -288,6 +289,14 @@ func (c *Cluster) syncStatefulSet() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Apply special PostgreSQL parameters that can only be set via the Patroni API.
|
||||||
|
// it is important to do it after the statefulset pods are there, but before the rolling update
|
||||||
|
// since those parameters require PostgreSQL restart.
|
||||||
|
if err := c.checkAndSetGlobalPostgreSQLConfiguration(); err != nil {
|
||||||
|
return fmt.Errorf("could not set cluster-wide PostgreSQL configuration options: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// if we get here we also need to re-create the pods (either leftovers from the old
|
// if we get here we also need to re-create the pods (either leftovers from the old
|
||||||
// statefulset or those that got their configuration from the outdated statefulset)
|
// statefulset or those that got their configuration from the outdated statefulset)
|
||||||
if podsRollingUpdateRequired {
|
if podsRollingUpdateRequired {
|
||||||
|
|
@ -303,6 +312,43 @@ func (c *Cluster) syncStatefulSet() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// checkAndSetGlobalPostgreSQLConfiguration checks whether cluster-wide API parameters
|
||||||
|
// (like max_connections) has changed and if necessary sets it via the Patroni API
|
||||||
|
func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration() error {
|
||||||
|
// we need to extract those options from the cluster manifest.
|
||||||
|
optionsToSet := make(map[string]string)
|
||||||
|
pgOptions := c.Spec.Parameters
|
||||||
|
|
||||||
|
for k, v := range pgOptions {
|
||||||
|
if isBootstrapOnlyParameter(k) {
|
||||||
|
optionsToSet[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(optionsToSet) > 0 {
|
||||||
|
pods, err := c.listPods()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(pods) == 0 {
|
||||||
|
return fmt.Errorf("could not call Patroni API: cluster has no pods")
|
||||||
|
}
|
||||||
|
for _, pod := range pods {
|
||||||
|
podName := util.NameFromMeta(pod.ObjectMeta)
|
||||||
|
c.logger.Debugf("calling Patroni API on a pod %s to set the following Postgres options: %v",
|
||||||
|
podName, optionsToSet)
|
||||||
|
if err := c.patroni.SetPostgresParameters(&pod, optionsToSet); err == nil {
|
||||||
|
return nil
|
||||||
|
} else {
|
||||||
|
c.logger.Warningf("could not patch postgres parameters with a pod %s: %v", podName, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fmt.Errorf("could not reach Patroni API to set Postgres options: failed on every pod (%d total)",
|
||||||
|
len(pods))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Cluster) syncSecrets() error {
|
func (c *Cluster) syncSecrets() error {
|
||||||
c.setProcessName("syncing secrets")
|
c.setProcessName("syncing secrets")
|
||||||
secrets := c.generateUserSecrets()
|
secrets := c.generateUserSecrets()
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -14,6 +15,7 @@ import (
|
||||||
"k8s.io/client-go/pkg/apis/apps/v1beta1"
|
"k8s.io/client-go/pkg/apis/apps/v1beta1"
|
||||||
policyv1beta1 "k8s.io/client-go/pkg/apis/policy/v1beta1"
|
policyv1beta1 "k8s.io/client-go/pkg/apis/policy/v1beta1"
|
||||||
"k8s.io/client-go/rest"
|
"k8s.io/client-go/rest"
|
||||||
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// EventType contains type of the events for the TPRs and Pods received from Kubernetes
|
// EventType contains type of the events for the TPRs and Pods received from Kubernetes
|
||||||
|
|
@ -223,6 +225,9 @@ func (r RoleOrigin) String() string {
|
||||||
// Placing this func here instead of pgk/util avoids circular import
|
// Placing this func here instead of pgk/util avoids circular import
|
||||||
func GetOperatorNamespace() string {
|
func GetOperatorNamespace() string {
|
||||||
if operatorNamespace == "" {
|
if operatorNamespace == "" {
|
||||||
|
if namespaceFromEnvironment := os.Getenv("OPERATOR_NAMESPACE"); namespaceFromEnvironment != "" {
|
||||||
|
return namespaceFromEnvironment
|
||||||
|
}
|
||||||
operatorNamespaceBytes, err := ioutil.ReadFile(fileWithNamespace)
|
operatorNamespaceBytes, err := ioutil.ReadFile(fileWithNamespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Unable to detect operator namespace from within its pod due to: %v", err)
|
log.Fatalf("Unable to detect operator namespace from within its pod due to: %v", err)
|
||||||
|
|
|
||||||
|
|
@ -14,13 +14,15 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
failoverPath = "/failover"
|
failoverPath = "/failover"
|
||||||
|
configPath = "/config"
|
||||||
apiPort = 8008
|
apiPort = 8008
|
||||||
timeout = 30 * time.Second
|
timeout = 30 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// Interface describe patroni methods
|
// Interface describe patroni methods
|
||||||
type Interface interface {
|
type Interface interface {
|
||||||
Failover(master *v1.Pod, candidate string) error
|
Switchover(master *v1.Pod, candidate string) error
|
||||||
|
SetPostgresParameters(server *v1.Pod, options map[string]string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Patroni API client
|
// Patroni API client
|
||||||
|
|
@ -45,20 +47,13 @@ func apiURL(masterPod *v1.Pod) string {
|
||||||
return fmt.Sprintf("http://%s:%d", masterPod.Status.PodIP, apiPort)
|
return fmt.Sprintf("http://%s:%d", masterPod.Status.PodIP, apiPort)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Failover does manual failover via patroni api
|
func (p *Patroni) httpPostOrPatch(method string, url string, body *bytes.Buffer) error {
|
||||||
func (p *Patroni) Failover(master *v1.Pod, candidate string) error {
|
request, err := http.NewRequest(method, url, body)
|
||||||
buf := &bytes.Buffer{}
|
|
||||||
|
|
||||||
err := json.NewEncoder(buf).Encode(map[string]string{"leader": master.Name, "member": candidate})
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not encode json: %v", err)
|
|
||||||
}
|
|
||||||
request, err := http.NewRequest(http.MethodPost, apiURL(master)+failoverPath, buf)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not create request: %v", err)
|
return fmt.Errorf("could not create request: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
p.logger.Debugf("making http request: %s", request.URL.String())
|
p.logger.Debugf("making %s http request: %s", method, request.URL.String())
|
||||||
|
|
||||||
resp, err := p.httpClient.Do(request)
|
resp, err := p.httpClient.Do(request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -74,6 +69,28 @@ func (p *Patroni) Failover(master *v1.Pod, candidate string) error {
|
||||||
|
|
||||||
return fmt.Errorf("patroni returned '%s'", string(bodyBytes))
|
return fmt.Errorf("patroni returned '%s'", string(bodyBytes))
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Switchover by calling Patroni REST API
|
||||||
|
func (p *Patroni) Switchover(master *v1.Pod, candidate string) error {
|
||||||
|
buf := &bytes.Buffer{}
|
||||||
|
err := json.NewEncoder(buf).Encode(map[string]string{"leader": master.Name, "member": candidate})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not encode json: %v", err)
|
||||||
|
}
|
||||||
|
return p.httpPostOrPatch(http.MethodPost, apiURL(master)+failoverPath, buf)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//TODO: add an option call /patroni to check if it is necessary to restart the server
|
||||||
|
// SetPostgresParameters sets Postgres options via Patroni patch API call.
|
||||||
|
func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]string) error {
|
||||||
|
buf := &bytes.Buffer{}
|
||||||
|
err := json.NewEncoder(buf).Encode(map[string]map[string]interface{}{"postgresql": {"parameters": parameters}})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not encode json: %v", err)
|
||||||
|
}
|
||||||
|
return p.httpPostOrPatch(http.MethodPatch, apiURL(server)+configPath, buf)
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue