Enable promote/demote standby via operator
This commit is contained in:
parent
540d58d5bd
commit
0d3eeedb39
|
|
@ -118,6 +118,7 @@ type Patroni struct {
|
||||||
//StandbyCluster
|
//StandbyCluster
|
||||||
type StandbyDescription struct {
|
type StandbyDescription struct {
|
||||||
S3WalPath string `json:"s3_wal_path,omitempty"`
|
S3WalPath string `json:"s3_wal_path,omitempty"`
|
||||||
|
StandbyOptions map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
// CloneDescription describes which cluster the new should clone and up to which point in time
|
// CloneDescription describes which cluster the new should clone and up to which point in time
|
||||||
|
|
|
||||||
|
|
@ -506,7 +506,7 @@ func (in *PostgresSpec) DeepCopyInto(out *PostgresSpec) {
|
||||||
if in.StandbyCluster != nil {
|
if in.StandbyCluster != nil {
|
||||||
in, out := &in.StandbyCluster, &out.StandbyCluster
|
in, out := &in.StandbyCluster, &out.StandbyCluster
|
||||||
*out = new(StandbyDescription)
|
*out = new(StandbyDescription)
|
||||||
**out = **in
|
(*in).DeepCopyInto(*out)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -719,6 +719,13 @@ func (in *Sidecar) DeepCopy() *Sidecar {
|
||||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||||
func (in *StandbyDescription) DeepCopyInto(out *StandbyDescription) {
|
func (in *StandbyDescription) DeepCopyInto(out *StandbyDescription) {
|
||||||
*out = *in
|
*out = *in
|
||||||
|
if in.StandbyOptions != nil {
|
||||||
|
in, out := &in.StandbyOptions, &out.StandbyOptions
|
||||||
|
*out = make(map[string]string, len(*in))
|
||||||
|
for key, val := range *in {
|
||||||
|
(*out)[key] = val
|
||||||
|
}
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -23,8 +23,16 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
c.setSpec(newSpec)
|
//Find out how NewSpec requires to edit the standby options of this cluster
|
||||||
|
if newSpec.Spec.StandbyCluster != nil && c.Spec.StandbyCluster == nil {
|
||||||
|
newSpec.Spec.StandbyCluster.StandbyOptions["restore_command"] = ""
|
||||||
|
c.logger.Info("Setting this cluster as standby cluster")
|
||||||
|
} else if newSpec.Spec.StandbyCluster == nil && c.Spec.StandbyCluster != nil {
|
||||||
|
newSpec.Spec.StandbyCluster.StandbyOptions["restore_command"] = "envdir \"/home/postgres/etc/wal-e.d/env-standby\" /scripts/restore_command.sh \"%f\" \"%p\""
|
||||||
|
c.logger.Info("Promoting this cluster into master from standby")
|
||||||
|
}
|
||||||
|
|
||||||
|
c.setSpec(newSpec)
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Warningf("error while syncing cluster state: %v", err)
|
c.logger.Warningf("error while syncing cluster state: %v", err)
|
||||||
|
|
@ -341,6 +349,8 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration() error {
|
||||||
optionsToSet := make(map[string]string)
|
optionsToSet := make(map[string]string)
|
||||||
pgOptions := c.Spec.Parameters
|
pgOptions := c.Spec.Parameters
|
||||||
|
|
||||||
|
standbyOptions := c.Spec.StandbyCluster.StandbyOptions
|
||||||
|
|
||||||
for k, v := range pgOptions {
|
for k, v := range pgOptions {
|
||||||
if isBootstrapOnlyParameter(k) {
|
if isBootstrapOnlyParameter(k) {
|
||||||
optionsToSet[k] = v
|
optionsToSet[k] = v
|
||||||
|
|
@ -361,6 +371,11 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration() error {
|
||||||
// carries the request to change configuration through
|
// carries the request to change configuration through
|
||||||
for _, pod := range pods {
|
for _, pod := range pods {
|
||||||
podName := util.NameFromMeta(pod.ObjectMeta)
|
podName := util.NameFromMeta(pod.ObjectMeta)
|
||||||
|
c.logger.Debugf("calling Patroni API on a pod %s to promote the Standby",
|
||||||
|
podName)
|
||||||
|
if err = c.patroni.EditStandby(&pod, standbyOptions); err != nil {
|
||||||
|
c.logger.Warningf("could not patch postgres for standby with a pod %s: %v", podName, err)
|
||||||
|
}
|
||||||
c.logger.Debugf("calling Patroni API on a pod %s to set the following Postgres options: %v",
|
c.logger.Debugf("calling Patroni API on a pod %s to set the following Postgres options: %v",
|
||||||
podName, optionsToSet)
|
podName, optionsToSet)
|
||||||
if err = c.patroni.SetPostgresParameters(&pod, optionsToSet); err == nil {
|
if err = c.patroni.SetPostgresParameters(&pod, optionsToSet); err == nil {
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|
@ -23,6 +23,7 @@ const (
|
||||||
type Interface interface {
|
type Interface interface {
|
||||||
Switchover(master *v1.Pod, candidate string) error
|
Switchover(master *v1.Pod, candidate string) error
|
||||||
SetPostgresParameters(server *v1.Pod, options map[string]string) error
|
SetPostgresParameters(server *v1.Pod, options map[string]string) error
|
||||||
|
EditStandby(server *v1.Pod, options map[string]string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Patroni API client
|
// Patroni API client
|
||||||
|
|
@ -102,3 +103,14 @@ func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]st
|
||||||
}
|
}
|
||||||
return p.httpPostOrPatch(http.MethodPatch, apiURL(server)+configPath, buf)
|
return p.httpPostOrPatch(http.MethodPatch, apiURL(server)+configPath, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//SetPostgresParameters sets Postgres options via Patroni patch API call.
|
||||||
|
func (p *Patroni) EditStandby(server *v1.Pod, options map[string]string) error {
|
||||||
|
buf := &bytes.Buffer{}
|
||||||
|
err := json.NewEncoder(buf).Encode(map[string]map[string]interface{}{"standby_cluster": {"restore_command": options}})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not encode json: %v", err)
|
||||||
|
}
|
||||||
|
return p.httpPostOrPatch(http.MethodPatch, apiURL(server)+configPath, buf)
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue