Standby cluster promotion by changing manifest (#2472)
* Standby cluster promotion by changing manifest
* Updated the documentation

Co-authored-by: Senthilnathan M <snathanm@vmware.com>
parent bbba15f9bf
commit dad5b132ec

docs/user.md | 38
docs/user.md

```diff
@@ -940,33 +940,25 @@ established between standby replica(s).
 
 One big advantage of standby clusters is that they can be promoted to a proper
 database cluster. This means it will stop replicating changes from the source,
 and start accepting writes itself. This mechanism makes it possible to move
-databases from one place to another with minimal downtime. Currently, the
-operator does not support promoting a standby cluster. It has to be done
-manually using `patronictl edit-config` inside the postgres container of the
-standby leader pod. Remove the following lines from the YAML structure and the
-leader promotion happens immediately. Before doing so, make sure that the
-standby is not behind the source database.
+databases from one place to another with minimal downtime.
 
-```yaml
-standby_cluster:
-  create_replica_methods:
-    - bootstrap_standby_with_wale
-    - basebackup_fast_xlog
-  restore_command: envdir "/home/postgres/etc/wal-e.d/env-standby" /scripts/restore_command.sh
-    "%f" "%p"
-```
+Before promoting a standby cluster, make sure that the standby is not behind
+the source database. You should ideally stop writes to your source cluster and
+then create a dummy database object that you check for being replicated in the
+target to verify all data has been copied.
 
-Finally, remove the `standby` section from the postgres cluster manifest.
+To promote, remove the `standby` section from the postgres cluster manifest.
+A rolling update will be triggered removing the `STANDBY_*` environment
+variables from the pods, followed by a Patroni config update that promotes the
+cluster.
 
-### Turn a normal cluster into a standby
+### Adding standby section after promotion
 
-There is no way to transform a non-standby cluster to a standby cluster through
-the operator. Adding the `standby` section to the manifest of a running
-Postgres cluster will have no effect. But, as explained in the previous
-paragraph, it can be done manually through `patronictl edit-config`. This time,
-by adding the `standby_cluster` section to the Patroni configuration. However,
-the transformed standby cluster will not be doing any streaming. It will be in
-standby mode and allow read-only transactions only.
+Turning a running cluster into a standby is not easily possible and should be
+avoided. The best way is to remove the cluster and resubmit the manifest
+after a short wait of a few minutes. Adding the `standby` section would turn
+the database cluster into read-only mode on the next operator SYNC cycle, but
+it does not sync automatically with the source cluster again.
 
 ## Sidecar Support
```
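
For context, a minimal standby manifest containing the section that gets removed on promotion might look roughly like this. The cluster name, team, volume size, and S3 WAL path below are illustrative, not taken from the commit; only the `standby`/`s3_wal_path` structure matters here:

```yaml
apiVersion: "acid.zalan.do/v1"
kind: postgresql
metadata:
  name: acid-standby-cluster
spec:
  teamId: "acid"
  numberOfInstances: 1
  volume:
    size: 1Gi
  # deleting this section and re-applying the manifest triggers the
  # rolling update and Patroni config change that promote the cluster
  standby:
    s3_wal_path: "s3://custom/path/to/bucket/"
```

Re-applying the edited manifest (e.g. `kubectl apply -f standby.yaml`) is all the promotion requires; the operator handles the rest on its next SYNC.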
pkg/cluster/cluster.go

```diff
@@ -880,6 +880,13 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 		}
 	}()
 
+	// add or remove standby_cluster section from Patroni config depending on changes in standby section
+	if !reflect.DeepEqual(oldSpec.Spec.StandbyCluster, newSpec.Spec.StandbyCluster) {
+		if err := c.syncStandbyClusterConfiguration(); err != nil {
+			return fmt.Errorf("could not set StandbyCluster configuration options: %v", err)
+		}
+	}
+
 	// pod disruption budget
 	if oldSpec.Spec.NumberOfInstances != newSpec.Spec.NumberOfInstances {
 		c.logger.Debug("syncing pod disruption budgets")
```
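
The guard above only calls the Patroni API when the `standby` section actually differs between the old and new spec. A standalone sketch of how `reflect.DeepEqual` behaves on the pointer values involved, using a simplified stand-in for `acidv1.StandbyDescription` (reduced to the one field the commit's test exercises):

```go
package main

import (
	"fmt"
	"reflect"
)

// StandbyDescription is a simplified stand-in for acidv1.StandbyDescription.
type StandbyDescription struct {
	S3WalPath string
}

func main() {
	standby := &StandbyDescription{S3WalPath: "s3://custom/path/to/bucket/"}

	// Section removed (or added): one side is a nil pointer, DeepEqual reports
	// false, and the operator would call syncStandbyClusterConfiguration.
	var removed *StandbyDescription
	fmt.Println(reflect.DeepEqual(standby, removed)) // false

	// Section unchanged: two distinct pointers to equal values are deep-equal,
	// so no Patroni API call is made.
	same := &StandbyDescription{S3WalPath: "s3://custom/path/to/bucket/"}
	fmt.Println(reflect.DeepEqual(standby, same)) // true
}
```

Because `DeepEqual` follows pointers, adding, removing, or editing the section all register as a change, while repeated syncs of an identical spec do not.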
pkg/cluster/sync.go

```diff
@@ -84,6 +84,13 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
 		}
 	}
 
+	// add or remove standby_cluster section from Patroni config depending on changes in standby section
+	if !reflect.DeepEqual(oldSpec.Spec.StandbyCluster, newSpec.Spec.StandbyCluster) {
+		if err := c.syncStandbyClusterConfiguration(); err != nil {
+			return fmt.Errorf("could not sync StandbyCluster configuration: %v", err)
+		}
+	}
+
 	c.logger.Debug("syncing pod disruption budgets")
 	if err = c.syncPodDisruptionBudget(false); err != nil {
 		err = fmt.Errorf("could not sync pod disruption budget: %v", err)
```
```diff
@@ -710,6 +717,46 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv
 	return configPatched, requiresMasterRestart, nil
 }
 
+// syncStandbyClusterConfiguration checks whether standby cluster
+// parameters have changed and if necessary sets them via the Patroni API
+func (c *Cluster) syncStandbyClusterConfiguration() error {
+	var (
+		err  error
+		pods []v1.Pod
+	)
+
+	standbyOptionsToSet := make(map[string]interface{})
+	if c.Spec.StandbyCluster != nil {
+		c.logger.Infof("turning %q into a standby cluster", c.Name)
+		standbyOptionsToSet["create_replica_methods"] = []string{"bootstrap_standby_with_wale", "basebackup_fast_xlog"}
+		standbyOptionsToSet["restore_command"] = "envdir \"/run/etc/wal-e.d/env-standby\" /scripts/restore_command.sh \"%f\" \"%p\""
+
+	} else {
+		c.logger.Infof("promoting standby cluster and detaching it from the source")
+		standbyOptionsToSet = nil
+	}
+
+	if pods, err = c.listPods(); err != nil {
+		return err
+	}
+	if len(pods) == 0 {
+		return fmt.Errorf("could not call Patroni API: cluster has no pods")
+	}
+	// try all pods until the first one that succeeds, as it doesn't matter
+	// which pod carries the configuration change request through
+	for _, pod := range pods {
+		podName := util.NameFromMeta(pod.ObjectMeta)
+		c.logger.Debugf("patching Postgres config via Patroni API on pod %s with following options: %s",
+			podName, standbyOptionsToSet)
+		if err = c.patroni.SetStandbyClusterParameters(&pod, standbyOptionsToSet); err == nil {
+			return nil
+		}
+		c.logger.Warningf("could not patch postgres parameters within pod %s: %v", podName, err)
+	}
+	return fmt.Errorf("could not reach Patroni API to set Postgres options: failed on every pod (%d total)",
+		len(pods))
+}
+
 func (c *Cluster) syncSecrets() error {
 	c.logger.Info("syncing secrets")
 	c.setProcessName("syncing secrets")
```
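
For illustration, here is a minimal standalone sketch (standard library only) of the JSON bodies those two branches produce once `SetStandbyClusterParameters` wraps them under a `standby_cluster` key, as shown in the patroni.go hunk further down; the option values mirror the ones hard-coded above:

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Standby branch: the options syncStandbyClusterConfiguration hard-codes.
	standbyOptions := map[string]interface{}{
		"create_replica_methods": []string{"bootstrap_standby_with_wale", "basebackup_fast_xlog"},
		"restore_command":        `envdir "/run/etc/wal-e.d/env-standby" /scripts/restore_command.sh "%f" "%p"`,
	}
	body, err := json.Marshal(map[string]interface{}{"standby_cluster": standbyOptions})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))

	// Promotion branch: standbyOptionsToSet is nil, which encodes as null.
	// Patroni treats "standby_cluster": null as a request to drop the
	// section, at which point the leader stops following the source.
	body, err = json.Marshal(map[string]interface{}{"standby_cluster": nil})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body)) // {"standby_cluster":null}
}
```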
pkg/cluster/sync_test.go

```diff
@@ -2,6 +2,7 @@ package cluster
 
 import (
+	"bytes"
 	"fmt"
 	"io/ioutil"
 	"net/http"
 	"testing"
```
```diff
@@ -480,6 +481,140 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {
 	}
 }
 
+func TestSyncStandbyClusterConfiguration(t *testing.T) {
+	client, _ := newFakeK8sSyncClient()
+	clusterName := "acid-standby-cluster"
+	applicationLabel := "spilo"
+	namespace := "default"
+
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	pg := acidv1.Postgresql{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      clusterName,
+			Namespace: namespace,
+		},
+		Spec: acidv1.PostgresSpec{
+			NumberOfInstances: int32(1),
+			Volume: acidv1.Volume{
+				Size: "1Gi",
+			},
+		},
+	}
+
+	var cluster = New(
+		Config{
+			OpConfig: config.Config{
+				PatroniAPICheckInterval: time.Duration(1),
+				PatroniAPICheckTimeout:  time.Duration(5),
+				PodManagementPolicy:     "ordered_ready",
+				Resources: config.Resources{
+					ClusterLabels:         map[string]string{"application": applicationLabel},
+					ClusterNameLabel:      "cluster-name",
+					DefaultCPURequest:     "300m",
+					DefaultCPULimit:       "300m",
+					DefaultMemoryRequest:  "300Mi",
+					DefaultMemoryLimit:    "300Mi",
+					MinInstances:          int32(-1),
+					MaxInstances:          int32(-1),
+					PodRoleLabel:          "spilo-role",
+					ResourceCheckInterval: time.Duration(3),
+					ResourceCheckTimeout:  time.Duration(10),
+				},
+			},
+		}, client, pg, logger, eventRecorder)
+
+	cluster.Name = clusterName
+	cluster.Namespace = namespace
+
+	// mocking a config after getConfig is called
+	mockClient := mocks.NewMockHTTPClient(ctrl)
+	configJson := `{"ttl": 20}`
+	r := ioutil.NopCloser(bytes.NewReader([]byte(configJson)))
+	response := http.Response{
+		StatusCode: 200,
+		Body:       r,
+	}
+	mockClient.EXPECT().Get(gomock.Any()).Return(&response, nil).AnyTimes()
+
+	// mocking a config after setConfig is called
+	standbyJson := `{"standby_cluster":{"create_replica_methods":["bootstrap_standby_with_wale","basebackup_fast_xlog"],"restore_command":"envdir \"/run/etc/wal-e.d/env-standby\" /scripts/restore_command.sh \"%f\" \"%p\""}}`
+	r = ioutil.NopCloser(bytes.NewReader([]byte(standbyJson)))
+	response = http.Response{
+		StatusCode: 200,
+		Body:       r,
+	}
+	mockClient.EXPECT().Do(gomock.Any()).Return(&response, nil).AnyTimes()
+	p := patroni.New(patroniLogger, mockClient)
+	cluster.patroni = p
+
+	mockPod := newMockPod("192.168.100.1")
+	mockPod.Name = fmt.Sprintf("%s-0", clusterName)
+	mockPod.Namespace = namespace
+	podLabels := map[string]string{
+		"cluster-name": clusterName,
+		"application":  applicationLabel,
+		"spilo-role":   "master",
+	}
+	mockPod.Labels = podLabels
+	client.PodsGetter.Pods(namespace).Create(context.TODO(), mockPod, metav1.CreateOptions{})
+
+	// create a statefulset
+	sts, err := cluster.createStatefulSet()
+	assert.NoError(t, err)
+
+	// check that pods do not have a STANDBY_* environment variable
+	assert.NotContains(t, sts.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "STANDBY_METHOD", Value: "STANDBY_WITH_WALE"})
+
+	// add standby section
+	cluster.Spec.StandbyCluster = &acidv1.StandbyDescription{
+		S3WalPath: "s3://custom/path/to/bucket/",
+	}
+	cluster.syncStatefulSet()
+	updatedSts := cluster.Statefulset
+
+	// check that pods now have a STANDBY_* environment variable
+	assert.Contains(t, updatedSts.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "STANDBY_METHOD", Value: "STANDBY_WITH_WALE"})
+
+	// this should update the Patroni config
+	err = cluster.syncStandbyClusterConfiguration()
+	assert.NoError(t, err)
+
+	configJson = `{"standby_cluster":{"create_replica_methods":["bootstrap_standby_with_wale","basebackup_fast_xlog"],"restore_command":"envdir \"/run/etc/wal-e.d/env-standby\" /scripts/restore_command.sh \"%f\" \"%p\""}, "ttl": 20}`
+	r = ioutil.NopCloser(bytes.NewReader([]byte(configJson)))
+	response = http.Response{
+		StatusCode: 200,
+		Body:       r,
+	}
+	mockClient.EXPECT().Get(gomock.Any()).Return(&response, nil).AnyTimes()
+
+	pods, err := cluster.listPods()
+	assert.NoError(t, err)
+
+	_, _, err = cluster.patroni.GetConfig(&pods[0])
+	assert.NoError(t, err)
+	// TODO: extend GetConfig to return the standby_cluster setting to compare
+	/*
+		defaultStandbyParameters := map[string]interface{}{
+			"create_replica_methods": []string{"bootstrap_standby_with_wale", "basebackup_fast_xlog"},
+			"restore_command":        "envdir \"/run/etc/wal-e.d/env-standby\" /scripts/restore_command.sh \"%f\" \"%p\"",
+		}
+		assert.True(t, reflect.DeepEqual(defaultStandbyParameters, standbyCluster))
+	*/
+	// remove standby section
+	cluster.Spec.StandbyCluster = &acidv1.StandbyDescription{}
+	cluster.syncStatefulSet()
+	updatedSts2 := cluster.Statefulset
+
+	// check that pods no longer have a STANDBY_* environment variable
+	assert.NotContains(t, updatedSts2.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "STANDBY_METHOD", Value: "STANDBY_WITH_WALE"})
+
+	// this should update the Patroni config again
+	err = cluster.syncStandbyClusterConfiguration()
+	assert.NoError(t, err)
+}
+
 func TestUpdateSecret(t *testing.T) {
 	testName := "test syncing secrets"
 	client, _ := newFakeK8sSyncSecretsClient()
```
pkg/patroni/patroni.go

```diff
@@ -34,6 +34,7 @@ type Interface interface {
 	GetClusterMembers(master *v1.Pod) ([]ClusterMember, error)
 	Switchover(master *v1.Pod, candidate string) error
 	SetPostgresParameters(server *v1.Pod, options map[string]string) error
+	SetStandbyClusterParameters(server *v1.Pod, options map[string]interface{}) error
 	GetMemberData(server *v1.Pod) (MemberData, error)
 	Restart(server *v1.Pod) error
 	GetConfig(server *v1.Pod) (acidv1.Patroni, map[string]string, error)
```
```diff
@@ -150,7 +151,7 @@ func (p *Patroni) Switchover(master *v1.Pod, candidate string) error {
 
 //TODO: add an option call /patroni to check if it is necessary to restart the server
 
-//SetPostgresParameters sets Postgres options via Patroni patch API call.
+// SetPostgresParameters sets Postgres options via Patroni patch API call.
 func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]string) error {
 	buf := &bytes.Buffer{}
 	err := json.NewEncoder(buf).Encode(map[string]map[string]interface{}{"postgresql": {"parameters": parameters}})
```
```diff
@@ -164,7 +165,12 @@ func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]st
 	return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf)
 }
 
-//SetConfig sets Patroni options via Patroni patch API call.
+// SetStandbyClusterParameters sets StandbyCluster options via Patroni patch API call.
+func (p *Patroni) SetStandbyClusterParameters(server *v1.Pod, parameters map[string]interface{}) error {
+	return p.SetConfig(server, map[string]interface{}{"standby_cluster": parameters})
+}
+
+// SetConfig sets Patroni options via Patroni patch API call.
 func (p *Patroni) SetConfig(server *v1.Pod, config map[string]interface{}) error {
 	buf := &bytes.Buffer{}
 	err := json.NewEncoder(buf).Encode(config)
```