standby cluster that streams from a remote primary (#1830)

* add the possibility to create a standby cluster that streams from a remote primary
* extend unit tests
* add more docs and an e2e test

Co-authored-by: machine424 <ayoubmrini424@gmail.com>
Felix Kunde 2022-04-04 15:41:11 +02:00 committed by GitHub
parent 2dfb11ad4c
commit 0dc370f15d
12 changed files with 303 additions and 84 deletions


@@ -460,6 +460,17 @@ spec:
type: string
gs_wal_path:
type: string
standby_host:
type: string
standby_port:
type: string
oneOf:
- required:
- s3_wal_path
- required:
- gs_wal_path
- required:
- standby_host
streams:
type: array
nullable: true


@@ -1087,12 +1087,16 @@ data:
### Standby clusters
The setup for [standby clusters](user.md#setting-up-a-standby-cluster) is very
similar to cloning. At the moment, the operator only allows for streaming from
the S3 WAL archive of the master specified in the manifest. Like with cloning,
if you are using [additional environment variables](#custom-pod-environment-variables)
to access your backup location you have to copy those variables and prepend the
`STANDBY_` prefix for Spilo to find the backups and WAL files to stream.
The setup for [standby clusters](user.md#setting-up-a-standby-cluster) is
similar to cloning when they stream changes from a WAL archive (S3 or GCS).
If you are using [additional environment variables](#custom-pod-environment-variables)
to access your backup location, you have to copy those variables and prepend
the `STANDBY_` prefix for Spilo to find the backups and WAL files to stream
(see the sketch below).
Alternatively, standby clusters can also stream from a remote primary cluster.
In this case, you have to specify the host address; the port is optional and
defaults to 5432. Note that only one of the options (`s3_wal_path`,
`gs_wal_path`, `standby_host`) can be present under the `standby` top-level key.
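For illustration, here is a minimal sketch of such a pod environment ConfigMap. It assumes the backup location is accessed with AWS-style variables passed via `pod_environment_configmap`; the ConfigMap name and variable names are only examples and depend on your backup setup:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  # hypothetical name - must match the operator's pod_environment_configmap setting
  name: postgres-pod-config
  namespace: default
data:
  # variables the cluster uses for its own backup location (examples)
  AWS_REGION: eu-central-1
  AWS_ENDPOINT: https://s3.eu-central-1.amazonaws.com
  # copies with the STANDBY_ prefix so Spilo can reach the source cluster's WAL archive
  STANDBY_AWS_REGION: eu-central-1
  STANDBY_AWS_ENDPOINT: https://s3.eu-central-1.amazonaws.com
```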
## Logical backups


@@ -395,16 +395,22 @@ under the `clone` top-level key and do not affect the already running cluster.
## Standby cluster
On startup, an existing `standby` top-level key creates a standby Postgres
cluster streaming from a remote location. So far streaming from S3 and GCS WAL
archives is supported.
cluster streaming from a remote location - either from an S3 or GCS WAL
archive or a remote primary. Exactly one of these options is allowed and is
required if the `standby` key is present (see the example below).
* **s3_wal_path**
the url to S3 bucket containing the WAL archive of the remote primary.
Optional, but `s3_wal_path` or `gs_wal_path` is required.
* **gs_wal_path**
the url to GS bucket containing the WAL archive of the remote primary.
Optional, but `s3_wal_path` or `gs_wal_path` is required.
* **standby_host**
hostname or IP address of the primary to stream from.
* **standby_port**
TCP port on which the primary is listening for connections. Patroni will
use `"5432"` if not set.
## Volume properties


@@ -838,15 +838,15 @@ point you should restore.
## Setting up a standby cluster
Standby cluster is a [Patroni feature](https://github.com/zalando/patroni/blob/master/docs/replica_bootstrap.rst#standby-cluster)
that first clones a database, and keeps replicating changes afterwards. As the
replication is happening by the means of archived WAL files (stored on S3 or
the equivalent of other cloud providers), the standby cluster can exist in a
different location than its source database. Unlike cloning, the PostgreSQL
version between source and target cluster has to be the same.
that first clones a database, and keeps replicating changes afterwards. It can
exist in a different location than its source database, but unlike cloning,
the PostgreSQL version between source and target cluster has to be the same.
To start a cluster as standby, add the following `standby` section in the YAML
file. Specify the S3/GS bucket path. Omitting both settings will result in an error
and no statefulset will be created.
file. You can stream changes from archived WAL files (AWS S3 or Google Cloud
Storage) or from a remote primary, for which you specify the host address and
port. If you leave out the port, Patroni will use `"5432"`. Only one option
can be specified in the manifest:
```yaml
spec:
@@ -860,32 +860,42 @@ spec:
gs_wal_path: "gs://<bucketname>/spilo/<source_db_cluster>/<UID>/wal/<PGVERSION>"
```
At the moment, the operator only allows to stream from the WAL archive of the
master. Thus, it is recommended to deploy standby clusters with only [one pod](https://github.com/zalando/postgres-operator/blob/master/manifests/standby-manifest.yaml#L10).
You can raise the instance count when detaching. Note, that the same pod role
labels like for normal clusters are used: The standby leader is labeled as
`master`.
```yaml
spec:
standby:
standby_host: "acid-minimal-cluster.default"
standby_port: "5433"
```
Note that the pods and services use the same role labels as normal clusters:
the standby leader is labeled as `master`. When using the `standby_host` option
you have to copy the credentials from the source cluster's secrets to successfully
bootstrap a standby cluster (see next chapter).
### Providing credentials of source cluster
A standby cluster is replicating the data (including users and passwords) from
the source database and is read-only. The system and application users (like
standby, postgres etc.) all have a password that does not match the credentials
stored in secrets which are created by the operator. One solution is to create
secrets beforehand and paste in the credentials of the source cluster.
stored in secrets which are created by the operator. You have two options:
a. Create the secrets manually beforehand and paste in the credentials of the
source cluster.
b. Let the operator create the secrets when it bootstraps the standby cluster.
Patch the secrets with the credentials of the source cluster and replace the
Spilo pods.
Otherwise, you will see errors in the Postgres logs saying users cannot log in
and the operator logs will complain about not being able to sync resources.
If you stream changes from a remote primary, you have to align the secrets or
the standby cluster will not start up; a sketch of such a secret is shown below.
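As a sketch of option a., assuming a standby cluster named `acid-standby-cluster` in the `default` namespace, a manually created secret for the replication user could look like this; the password must be copied from the corresponding secret of the source cluster, and the same has to be done for the `postgres` superuser secret:

```yaml
apiVersion: v1
kind: Secret
metadata:
  # secret names follow the pattern <username>.<clustername>.credentials.postgresql.acid.zalan.do
  name: standby.acid-standby-cluster.credentials.postgresql.acid.zalan.do
  namespace: default
  labels:
    cluster-name: acid-standby-cluster
type: Opaque
stringData:
  username: standby
  password: "<password of the standby user, copied from the source cluster's secret>"
```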
When you only run a standby leader, you can safely ignore this, as it will be
sorted out once the cluster is detached from the source. It is also harmless if
you dont plan it. But, when you created a standby replica, too, fix the
credentials right away. WAL files will pile up on the standby leader if no
connection can be established between standby replica(s). You can also edit the
secrets after their creation. Find them by:
```bash
kubectl get secrets --all-namespaces | grep <standby-cluster-name>
```
If you stream changes from WAL files and only run a standby leader, you can
safely ignore the secret mismatch, as it will be sorted out once the cluster
is detached from the source. It is also harmless if you do not plan to detach
it. But if you also create a standby replica, fix the credentials right away:
WAL files will pile up on the standby leader if no connection can be
established to the standby replica(s).
### Promote the standby


@@ -321,9 +321,15 @@ class K8s:
def get_cluster_replica_pod(self, labels='application=spilo,cluster-name=acid-minimal-cluster', namespace='default'):
return self.get_cluster_pod('replica', labels, namespace)
def get_secret_data(self, username, clustername='acid-minimal-cluster', namespace='default'):
return self.api.core_v1.read_namespaced_secret(
"{}.{}.credentials.postgresql.acid.zalan.do".format(username.replace("_","-"), clustername), namespace).data
def get_secret(self, username, clustername='acid-minimal-cluster', namespace='default'):
secret = self.api.core_v1.read_namespaced_secret(
"{}.{}.credentials.postgresql.acid.zalan.do".format(username.replace("_","-"), clustername), namespace)
secret.metadata.resource_version = None
secret.metadata.uid = None
return secret
def create_secret(self, secret, namespace='default'):
return self.api.core_v1.create_namespaced_secret(namespace, secret)
class K8sBase:
'''


@@ -1319,8 +1319,8 @@ class EndToEndTestCase(unittest.TestCase):
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
# check if next rotation date was set in secret
secret_data = k8s.get_secret_data("zalando")
next_rotation_timestamp = datetime.strptime(str(base64.b64decode(secret_data["nextRotation"]), 'utf-8'), "%Y-%m-%dT%H:%M:%SZ")
zalando_secret = k8s.get_secret("zalando")
next_rotation_timestamp = datetime.strptime(str(base64.b64decode(zalando_secret.data["nextRotation"]), 'utf-8'), "%Y-%m-%dT%H:%M:%SZ")
today90days = today+timedelta(days=90)
self.assertEqual(today90days, next_rotation_timestamp.date(),
"Unexpected rotation date in secret of zalando user: expected {}, got {}".format(today90days, next_rotation_timestamp.date()))
@@ -1361,9 +1361,9 @@ class EndToEndTestCase(unittest.TestCase):
"Operator does not get in sync")
# check if next rotation date and username have been replaced
secret_data = k8s.get_secret_data("foo_user")
secret_username = str(base64.b64decode(secret_data["username"]), 'utf-8')
next_rotation_timestamp = datetime.strptime(str(base64.b64decode(secret_data["nextRotation"]), 'utf-8'), "%Y-%m-%dT%H:%M:%SZ")
foo_user_secret = k8s.get_secret("foo_user")
secret_username = str(base64.b64decode(foo_user_secret.data["username"]), 'utf-8')
next_rotation_timestamp = datetime.strptime(str(base64.b64decode(foo_user_secret.data["nextRotation"]), 'utf-8'), "%Y-%m-%dT%H:%M:%SZ")
rotation_user = "foo_user"+today.strftime("%y%m%d")
today30days = today+timedelta(days=30)
@@ -1396,9 +1396,9 @@ class EndToEndTestCase(unittest.TestCase):
"Operator does not get in sync")
# check if username in foo_user secret is reset
secret_data = k8s.get_secret_data("foo_user")
secret_username = str(base64.b64decode(secret_data["username"]), 'utf-8')
next_rotation_timestamp = str(base64.b64decode(secret_data["nextRotation"]), 'utf-8')
foo_user_secret = k8s.get_secret("foo_user")
secret_username = str(base64.b64decode(foo_user_secret.data["username"]), 'utf-8')
next_rotation_timestamp = str(base64.b64decode(foo_user_secret.data["nextRotation"]), 'utf-8')
self.assertEqual("foo_user", secret_username,
"Unexpected username in secret of foo_user: expected {}, got {}".format("foo_user", secret_username))
self.assertEqual('', next_rotation_timestamp,
@@ -1644,6 +1644,42 @@ class EndToEndTestCase(unittest.TestCase):
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
self.eventuallyTrue(lambda: k8s.check_statefulset_annotations(cluster_label, annotations), "Annotations missing")
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_standby_cluster(self):
'''
Create standby cluster streaming from remote primary
'''
k8s = self.k8s
standby_cluster_name = 'acid-standby-cluster'
cluster_name_label = 'cluster-name'
cluster_label = 'application=spilo,{}={}'.format(cluster_name_label, standby_cluster_name)
superuser_name = 'postgres'
replication_user = 'standby'
secret_suffix = 'credentials.postgresql.acid.zalan.do'
# copy secrets from remote cluster before operator creates them when bootstrapping the standby cluster
postgres_secret = k8s.get_secret(superuser_name)
postgres_secret.metadata.name = '{}.{}.{}'.format(superuser_name, standby_cluster_name, secret_suffix)
postgres_secret.metadata.labels[cluster_name_label] = standby_cluster_name
k8s.create_secret(postgres_secret)
standby_secret = k8s.get_secret(replication_user)
standby_secret.metadata.name = '{}.{}.{}'.format(replication_user, standby_cluster_name, secret_suffix)
standby_secret.metadata.labels[cluster_name_label] = standby_cluster_name
k8s.create_secret(standby_secret)
try:
k8s.create_with_kubectl("manifests/standby-manifest.yaml")
k8s.wait_for_pod_start("spilo-role=master," + cluster_label)
except timeout_decorator.TimeoutError:
print('Operator log: {}'.format(k8s.get_operator_log()))
raise
finally:
# delete the standby cluster so that the k8s_api.get_operator_state works correctly in subsequent tests
k8s.api.custom_objects_api.delete_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-standby-cluster")
time.sleep(5)
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_taint_based_eviction(self):
'''


@@ -458,6 +458,17 @@ spec:
type: string
gs_wal_path:
type: string
standby_host:
type: string
standby_port:
type: string
oneOf:
- required:
- s3_wal_path
- required:
- gs_wal_path
- required:
- standby_host
streams:
type: array
nullable: true


@@ -10,6 +10,8 @@ spec:
numberOfInstances: 1
postgresql:
version: "14"
# Make this a standby cluster and provide the s3 bucket path of source cluster for continuous streaming.
# Make this a standby cluster and provide either the S3 bucket path of the source cluster or the remote primary host for continuous streaming.
standby:
s3_wal_path: "s3://path/to/bucket/containing/wal/of/source/cluster/"
# s3_wal_path: "s3://mybucket/spilo/acid-minimal-cluster/abcd1234-2a4b-4b2a-8c9c-c1234defg567/wal/14/"
standby_host: "acid-minimal-cluster.default"
# standby_port: "5432"


@@ -714,6 +714,17 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
"gs_wal_path": {
Type: "string",
},
"standby_host": {
Type: "string",
},
"standby_port": {
Type: "string",
},
},
OneOf: []apiextv1.JSONSchemaProps{
apiextv1.JSONSchemaProps{Required: []string{"s3_wal_path"}},
apiextv1.JSONSchemaProps{Required: []string{"gs_wal_path"}},
apiextv1.JSONSchemaProps{Required: []string{"standby_host"}},
},
},
"streams": {


@@ -170,10 +170,12 @@ type Patroni struct {
SynchronousNodeCount uint32 `json:"synchronous_node_count,omitempty" defaults:"1"`
}
// StandbyDescription contains s3 wal path
// StandbyDescription contains remote primary config or s3/gs wal path
type StandbyDescription struct {
S3WalPath string `json:"s3_wal_path,omitempty"`
GSWalPath string `json:"gs_wal_path,omitempty"`
S3WalPath string `json:"s3_wal_path,omitempty"`
GSWalPath string `json:"gs_wal_path,omitempty"`
StandbyHost string `json:"standby_host,omitempty"`
StandbyPort string `json:"standby_port,omitempty"`
}
// TLSDescription specs TLS properties


@@ -763,7 +763,12 @@ func (c *Cluster) generatePodTemplate(
}
// generatePodEnvVars generates environment variables for the Spilo Pod
func (c *Cluster) generateSpiloPodEnvVars(uid types.UID, spiloConfiguration string, cloneDescription *acidv1.CloneDescription, standbyDescription *acidv1.StandbyDescription, customPodEnvVarsList []v1.EnvVar) []v1.EnvVar {
func (c *Cluster) generateSpiloPodEnvVars(
uid types.UID,
spiloConfiguration string,
cloneDescription *acidv1.CloneDescription,
standbyDescription *acidv1.StandbyDescription,
customPodEnvVarsList []v1.EnvVar) []v1.EnvVar {
envVars := []v1.EnvVar{
{
Name: "SCOPE",
@@ -1128,11 +1133,6 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
sort.Slice(customPodEnvVarsList,
func(i, j int) bool { return customPodEnvVarsList[i].Name < customPodEnvVarsList[j].Name })
if spec.StandbyCluster != nil && spec.StandbyCluster.S3WalPath == "" &&
spec.StandbyCluster.GSWalPath == "" {
return nil, fmt.Errorf("one of s3_wal_path or gs_wal_path must be set for standby cluster")
}
// backward compatible check for InitContainers
if spec.InitContainersOld != nil {
msg := "manifest parameter init_containers is deprecated."
@@ -1944,40 +1944,49 @@ func (c *Cluster) generateCloneEnvironment(description *acidv1.CloneDescription)
func (c *Cluster) generateStandbyEnvironment(description *acidv1.StandbyDescription) []v1.EnvVar {
result := make([]v1.EnvVar, 0)
if description.S3WalPath == "" && description.GSWalPath == "" {
return nil
}
if description.S3WalPath != "" {
// standby with S3, find out the bucket to setup standby
msg := "standby from S3 bucket using custom parsed S3WalPath from the manifest %s "
c.logger.Infof(msg, description.S3WalPath)
if description.StandbyHost != "" {
// standby from remote primary
result = append(result, v1.EnvVar{
Name: "STANDBY_WALE_S3_PREFIX",
Value: description.S3WalPath,
Name: "STANDBY_HOST",
Value: description.StandbyHost,
})
} else if description.GSWalPath != "" {
msg := "standby from GS bucket using custom parsed GSWalPath from the manifest %s "
c.logger.Infof(msg, description.GSWalPath)
envs := []v1.EnvVar{
{
Name: "STANDBY_WALE_GS_PREFIX",
Value: description.GSWalPath,
},
{
Name: "STANDBY_GOOGLE_APPLICATION_CREDENTIALS",
Value: c.OpConfig.GCPCredentials,
},
if description.StandbyPort != "" {
result = append(result, v1.EnvVar{
Name: "STANDBY_PORT",
Value: description.StandbyPort,
})
}
result = append(result, envs...)
} else {
if description.S3WalPath != "" {
// standby with S3, find out the bucket to setup standby
msg := "Standby from S3 bucket using custom parsed S3WalPath from the manifest %s "
c.logger.Infof(msg, description.S3WalPath)
result = append(result, v1.EnvVar{
Name: "STANDBY_WALE_S3_PREFIX",
Value: description.S3WalPath,
})
} else if description.GSWalPath != "" {
msg := "Standby from GS bucket using custom parsed GSWalPath from the manifest %s "
c.logger.Infof(msg, description.GSWalPath)
envs := []v1.EnvVar{
{
Name: "STANDBY_WALE_GS_PREFIX",
Value: description.GSWalPath,
},
{
Name: "STANDBY_GOOGLE_APPLICATION_CREDENTIALS",
Value: c.OpConfig.GCPCredentials,
},
}
result = append(result, envs...)
}
result = append(result, v1.EnvVar{Name: "STANDBY_METHOD", Value: "STANDBY_WITH_WALE"})
result = append(result, v1.EnvVar{Name: "STANDBY_WAL_BUCKET_SCOPE_PREFIX", Value: ""})
}
result = append(result, v1.EnvVar{Name: "STANDBY_METHOD", Value: "STANDBY_WITH_WALE"})
result = append(result, v1.EnvVar{Name: "STANDBY_WAL_BUCKET_SCOPE_PREFIX", Value: ""})
return result
}


@@ -531,6 +531,117 @@ func TestCloneEnv(t *testing.T) {
}
}
func TestStandbyEnv(t *testing.T) {
testName := "TestStandbyEnv"
tests := []struct {
subTest string
standbyOpts *acidv1.StandbyDescription
env v1.EnvVar
envPos int
envLen int
}{
{
subTest: "from custom s3 path",
standbyOpts: &acidv1.StandbyDescription{
S3WalPath: "s3://some/path/",
},
env: v1.EnvVar{
Name: "STANDBY_WALE_S3_PREFIX",
Value: "s3://some/path/",
},
envPos: 0,
envLen: 3,
},
{
subTest: "from custom gs path",
standbyOpts: &acidv1.StandbyDescription{
GSWalPath: "gs://some/path/",
},
env: v1.EnvVar{
Name: "STANDBY_GOOGLE_APPLICATION_CREDENTIALS",
Value: "",
},
envPos: 1,
envLen: 4,
},
{
subTest: "ignore gs path if s3 is set",
standbyOpts: &acidv1.StandbyDescription{
S3WalPath: "s3://some/path/",
GSWalPath: "gs://some/path/",
},
env: v1.EnvVar{
Name: "STANDBY_METHOD",
Value: "STANDBY_WITH_WALE",
},
envPos: 1,
envLen: 3,
},
{
subTest: "from remote primary",
standbyOpts: &acidv1.StandbyDescription{
StandbyHost: "remote-primary",
},
env: v1.EnvVar{
Name: "STANDBY_HOST",
Value: "remote-primary",
},
envPos: 0,
envLen: 1,
},
{
subTest: "from remote primary with port",
standbyOpts: &acidv1.StandbyDescription{
StandbyHost: "remote-primary",
StandbyPort: "9876",
},
env: v1.EnvVar{
Name: "STANDBY_PORT",
Value: "9876",
},
envPos: 1,
envLen: 2,
},
{
subTest: "from remote primary - ignore WAL path",
standbyOpts: &acidv1.StandbyDescription{
GSWalPath: "gs://some/path/",
StandbyHost: "remote-primary",
},
env: v1.EnvVar{
Name: "STANDBY_HOST",
Value: "remote-primary",
},
envPos: 0,
envLen: 1,
},
}
var cluster = New(
Config{}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
for _, tt := range tests {
envs := cluster.generateStandbyEnvironment(tt.standbyOpts)
env := envs[tt.envPos]
if env.Name != tt.env.Name {
t.Errorf("%s %s: Expected env name %s, have %s instead",
testName, tt.subTest, tt.env.Name, env.Name)
}
if env.Value != tt.env.Value {
t.Errorf("%s %s: Expected env value %s, have %s instead",
testName, tt.subTest, tt.env.Value, env.Value)
}
if len(envs) != tt.envLen {
t.Errorf("%s %s: Expected number of env variables %d, have %d instead",
testName, tt.subTest, tt.envLen, len(envs))
}
}
}
func TestExtractPgVersionFromBinPath(t *testing.T) {
testName := "TestExtractPgVersionFromBinPath"
tests := []struct {