diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md
index 0df86260c..62e572e7b 100644
--- a/docs/reference/cluster_manifest.md
+++ b/docs/reference/cluster_manifest.md
@@ -199,6 +199,9 @@ explanation of `ttl` and `loop_wait` parameters.
   automatically created by Patroni for cluster members and permanent
   replication slots. Optional.
 
+* **standby**
+  initializes the cluster as a standby that streams from the specified remote location, creating a cascading replication setup. Optional.
+
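+  A minimal sketch of the section (the bucket path is illustrative; see
+  [the user documentation](../user.md#setting-up-a-standby-cluster) for details):
+
+  ```yaml
+  standby:
+    s3_wal_path: "s3://bucket/path/to/wal/of/source/cluster"
+  ```
+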
 ## Postgres container resources
 
 Those parameters define [CPU and memory requests and
diff --git a/docs/user.md b/docs/user.md
index 4cce153aa..4f8162121 100644
--- a/docs/user.md
+++ b/docs/user.md
@@ -281,6 +281,23 @@ spec:
     s3_force_path_style: true
 ```
 
+## Setting up a standby cluster
+
+Standby clusters are like normal clusters, except that they stream changes
+from a remote cluster instead of accepting writes of their own. In this first
+version of the feature, the only scenario the operator covers is streaming
+from the WAL archive of the master. Since Amazon S3 buckets are the most
+popular choice of infrastructure for the archive, its location is referred to
+as `s3_wal_path` here. To make a cluster a standby, add a `standby` section to
+the YAML file as follows:
+
+```yaml
+spec:
+  standby:
+    s3_wal_path: "s3 bucket path to the master"
+```
+
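+A complete example manifest is included in this repository as
+`manifests/standby-manifest.yaml`; assuming the operator is already running,
+it can be applied with:
+
+```bash
+kubectl create -f manifests/standby-manifest.yaml
+```
+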
"standby": {"s3_wal_path": "s3://custom/path/to/bucket/"}}}`), + out: Postgresql{ + TypeMeta: metav1.TypeMeta{ + Kind: "Postgresql", + APIVersion: "acid.zalan.do/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-testcluster1", + }, + Spec: PostgresSpec{ + TeamID: "acid", + StandbyCluster: &StandbyDescription{ + S3WalPath: "s3://custom/path/to/bucket/", + }, + ClusterName: "testcluster1", + }, + Error: "", + }, + marshal: []byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"acid-testcluster1","creationTimestamp":null},"spec":{"postgresql":{"version":"","parameters":null},"volume":{"size":"","storageClass":""},"patroni":{"initdb":null,"pg_hba":null,"ttl":0,"loop_wait":0,"retry_timeout":0,"maximum_lag_on_failover":0,"slots":null},"resources":{"requests":{"cpu":"","memory":""},"limits":{"cpu":"","memory":""}},"teamId":"acid","allowedSourceRanges":null,"numberOfInstances":0,"users":null,"standby":{"s3_wal_path":"s3://custom/path/to/bucket/"}},"status":{"PostgresClusterStatus":""}}`), + err: nil}, // erroneous examples { in: []byte(`{"kind": "Postgresql","apiVersion": "acid.zalan.do/v1"`), diff --git a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go index 6a554a2d8..7b0710eba 100644 --- a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go +++ b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go @@ -503,6 +503,11 @@ func (in *PostgresSpec) DeepCopyInto(out *PostgresSpec) { *out = new(bool) **out = **in } + if in.StandbyCluster != nil { + in, out := &in.StandbyCluster, &out.StandbyCluster + *out = new(StandbyDescription) + **out = **in + } return } @@ -711,6 +716,22 @@ func (in *Sidecar) DeepCopy() *Sidecar { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *StandbyDescription) DeepCopyInto(out *StandbyDescription) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StandbyDescription. +func (in *StandbyDescription) DeepCopy() *StandbyDescription { + if in == nil { + return nil + } + out := new(StandbyDescription) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
 ## Sidecar Support
 
 Each cluster can specify arbitrary sidecars to run. These containers could be used for
diff --git a/manifests/complete-postgres-manifest.yaml b/manifests/complete-postgres-manifest.yaml
index de66be6f7..85cae5e3c 100644
--- a/manifests/complete-postgres-manifest.yaml
+++ b/manifests/complete-postgres-manifest.yaml
@@ -66,7 +66,7 @@ spec:
 #   cluster: "acid-batman"
 #   timestamp: "2017-12-19T12:40:33+01:00" # timezone required (offset relative to UTC, see RFC 3339 section 5.6)
 #   s3_wal_path: "s3://custom/path/to/bucket"
-  
+
   # run periodic backups with k8s cron jobs
 # enableLogicalBackup: true
 # logicalBackupSchedule: "30 00 * * *"
@@ -86,4 +86,3 @@ spec:
 #    env:
 #    - name: "USEFUL_VAR"
 #      value: "perhaps-true"
-
diff --git a/manifests/standby-manifest.yaml b/manifests/standby-manifest.yaml
new file mode 100644
index 000000000..a92045dba
--- /dev/null
+++ b/manifests/standby-manifest.yaml
@@ -0,0 +1,20 @@
+apiVersion: "acid.zalan.do/v1"
+kind: postgresql
+metadata:
+  name: acid-standby-cluster
+  namespace: default
+spec:
+  teamId: "ACID"
+  volume:
+    size: 1Gi
+  numberOfInstances: 1
+  postgresql:
+    version: "10"
+  # Make this a standby cluster and provide the s3 bucket path of the source
+  # cluster for continuous streaming.
+  standby:
+    s3_wal_path: "s3://path/to/bucket/containing/wal/of/source/cluster/"
+
+  maintenanceWindows:
+  - 01:00-06:00 #UTC
+  - Sat:00:00-04:00
diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go
index 165139be3..e971716bc 100644
--- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go
+++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go
@@ -58,6 +58,7 @@ type PostgresSpec struct {
 	ShmVolume             *bool                `json:"enableShmVolume,omitempty"`
 	EnableLogicalBackup   bool                 `json:"enableLogicalBackup,omitempty"`
 	LogicalBackupSchedule string               `json:"logicalBackupSchedule,omitempty"`
+	StandbyCluster        *StandbyDescription  `json:"standby,omitempty"`
 }
 
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
@@ -114,6 +115,11 @@ type Patroni struct {
 	Slots map[string]map[string]string `json:"slots"`
 }
 
+// StandbyDescription contains the s3 wal path of the remote cluster to stream from
+type StandbyDescription struct {
+	S3WalPath string `json:"s3_wal_path,omitempty"`
+}
+
 // CloneDescription describes which cluster the new should clone and up to which point in time
 type CloneDescription struct {
 	ClusterName string `json:"cluster,omitempty"`
diff --git a/pkg/apis/acid.zalan.do/v1/util_test.go b/pkg/apis/acid.zalan.do/v1/util_test.go
index 1f8825090..3c087101b 100644
--- a/pkg/apis/acid.zalan.do/v1/util_test.go
+++ b/pkg/apis/acid.zalan.do/v1/util_test.go
@@ -354,6 +354,28 @@ var unmarshalCluster = []struct {
 	},
 	marshal: []byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"acid-testcluster1","creationTimestamp":null},"spec":{"postgresql":{"version":"","parameters":null},"volume":{"size":"","storageClass":""},"patroni":{"initdb":null,"pg_hba":null,"ttl":0,"loop_wait":0,"retry_timeout":0,"maximum_lag_on_failover":0,"slots":null},"resources":{"requests":{"cpu":"","memory":""},"limits":{"cpu":"","memory":""}},"teamId":"acid","allowedSourceRanges":null,"numberOfInstances":0,"users":null,"clone":{"cluster":"team-batman"}},"status":{"PostgresClusterStatus":""}}`),
 	err:     nil},
+	// standby example
+	{
+		in: []byte(`{"kind": "Postgresql","apiVersion": "acid.zalan.do/v1","metadata": {"name": "acid-testcluster1"}, "spec": {"teamId": "acid", "standby": {"s3_wal_path": "s3://custom/path/to/bucket/"}}}`),
+		out: Postgresql{
+			TypeMeta: metav1.TypeMeta{
+				Kind:       "Postgresql",
+				APIVersion: "acid.zalan.do/v1",
+			},
+			ObjectMeta: metav1.ObjectMeta{
+				Name: "acid-testcluster1",
+			},
+			Spec: PostgresSpec{
+				TeamID: "acid",
+				StandbyCluster: &StandbyDescription{
+					S3WalPath: "s3://custom/path/to/bucket/",
+				},
+				ClusterName: "testcluster1",
+			},
+			Error: "",
+		},
+		marshal: []byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"acid-testcluster1","creationTimestamp":null},"spec":{"postgresql":{"version":"","parameters":null},"volume":{"size":"","storageClass":""},"patroni":{"initdb":null,"pg_hba":null,"ttl":0,"loop_wait":0,"retry_timeout":0,"maximum_lag_on_failover":0,"slots":null},"resources":{"requests":{"cpu":"","memory":""},"limits":{"cpu":"","memory":""}},"teamId":"acid","allowedSourceRanges":null,"numberOfInstances":0,"users":null,"clone":{},"standby":{"s3_wal_path":"s3://custom/path/to/bucket/"}},"status":{"PostgresClusterStatus":""}}`),
+		err: nil},
 	// erroneous examples
 	{
 		in: []byte(`{"kind": "Postgresql","apiVersion": "acid.zalan.do/v1"`),
diff --git a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go
index 6a554a2d8..7b0710eba 100644
--- a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go
+++ b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go
@@ -503,6 +503,11 @@ func (in *PostgresSpec) DeepCopyInto(out *PostgresSpec) {
 		*out = new(bool)
 		**out = **in
 	}
+	if in.StandbyCluster != nil {
+		in, out := &in.StandbyCluster, &out.StandbyCluster
+		*out = new(StandbyDescription)
+		**out = **in
+	}
 	return
 }
 
@@ -711,6 +716,22 @@ func (in *Sidecar) DeepCopy() *Sidecar {
 	return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *StandbyDescription) DeepCopyInto(out *StandbyDescription) {
+	*out = *in
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StandbyDescription.
+func (in *StandbyDescription) DeepCopy() *StandbyDescription {
+	if in == nil {
+		return nil
+	}
+	out := new(StandbyDescription)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *TeamsAPIConfiguration) DeepCopyInto(out *TeamsAPIConfiguration) {
 	*out = *in
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index a6644b868..e8eed15cc 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -287,7 +287,7 @@ func (c *Cluster) Create() error {
 	c.logger.Infof("pods are ready")
 
 	// create database objects unless we are running without pods or disabled that feature explicitly
-	if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0) {
+	if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0 || c.Spec.StandbyCluster != nil) {
 		if err = c.createRoles(); err != nil {
 			return fmt.Errorf("could not create users: %v", err)
 		}
@@ -626,7 +626,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 	}()
 
 	// Roles and Databases
-	if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0) {
+	if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0 || c.Spec.StandbyCluster != nil) {
 		c.logger.Debugf("syncing roles")
 		if err := c.syncRoles(); err != nil {
 			c.logger.Errorf("could not sync roles: %v", err)
diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go
index 8cb0ea0a3..3e103d677 100644
--- a/pkg/cluster/k8sres.go
+++ b/pkg/cluster/k8sres.go
@@ -500,7 +500,7 @@ func generatePodTemplate(
 }
 
 // generatePodEnvVars generates environment variables for the Spilo Pod
-func (c *Cluster) generateSpiloPodEnvVars(uid types.UID, spiloConfiguration string, cloneDescription *acidv1.CloneDescription, customPodEnvVarsList []v1.EnvVar) []v1.EnvVar {
+func (c *Cluster) generateSpiloPodEnvVars(uid types.UID, spiloConfiguration string, cloneDescription *acidv1.CloneDescription, standbyDescription *acidv1.StandbyDescription, customPodEnvVarsList []v1.EnvVar) []v1.EnvVar {
 	envVars := []v1.EnvVar{
 		{
 			Name: "SCOPE",
@@ -604,6 +604,10 @@ func (c *Cluster) generateSpiloPodEnvVars(uid types.UID, spiloConfiguration stri
 		envVars = append(envVars, c.generateCloneEnvironment(cloneDescription)...)
 	}
 
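+	// a standby cluster gets its own set of environment variables,
+	// pointing Spilo at the remote WAL archive to stream from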
+	if c.Spec.StandbyCluster != nil {
+		envVars = append(envVars, c.generateStandbyEnvironment(standbyDescription)...)
+	}
+
 	if len(customPodEnvVarsList) > 0 {
 		envVars = append(envVars, customPodEnvVarsList...)
 	}
@@ -793,6 +797,9 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*v1beta1.State
 		sort.Slice(customPodEnvVarsList,
 			func(i, j int) bool { return customPodEnvVarsList[i].Name < customPodEnvVarsList[j].Name })
 	}
+	if spec.StandbyCluster != nil && spec.StandbyCluster.S3WalPath == "" {
+		return nil, fmt.Errorf("s3_wal_path is empty for standby cluster")
+	}
 
 	spiloConfiguration, err := generateSpiloJSONConfiguration(&spec.PostgresqlParam, &spec.Patroni, c.OpConfig.PamRoleName, c.logger)
 	if err != nil {
@@ -802,7 +809,7 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*v1beta1.State
 	// generate environment variables for the spilo container
 	spiloEnvVars := deduplicateEnvVars(
 		c.generateSpiloPodEnvVars(c.Postgresql.GetUID(), spiloConfiguration, &spec.Clone,
-			customPodEnvVarsList), c.containerName(), c.logger)
+			spec.StandbyCluster, customPodEnvVarsList), c.containerName(), c.logger)
 
 	// pickup the docker image for the spilo container
 	effectiveDockerImage := util.Coalesce(spec.DockerImage, c.OpConfig.DockerImage)
@@ -982,6 +989,11 @@ func (c *Cluster) getNumberOfInstances(spec *acidv1.PostgresSpec) int32 {
 	cur := spec.NumberOfInstances
 	newcur := cur
 
+	// limit the number of pods to one, if this is a standby cluster
+	if spec.StandbyCluster != nil {
+		c.logger.Info("standby cluster can have at most 1 pod")
+		max = 1
+	}
 	if max >= 0 && newcur > max {
 		newcur = max
 	}
@@ -1328,6 +1340,27 @@ func (c *Cluster) generateCloneEnvironment(description *acidv1.CloneDescription)
 	return result
 }
 
+func (c *Cluster) generateStandbyEnvironment(description *acidv1.StandbyDescription) []v1.EnvVar {
+	result := make([]v1.EnvVar, 0)
+
+	if description.S3WalPath == "" {
+		return nil
+	}
+
+	// standby with S3: pass the WAL archive location from the manifest to Spilo
+	c.logger.Infof("standby cluster will stream from the S3 WAL path %q given in the manifest", description.S3WalPath)
+
+	result = append(result, v1.EnvVar{
+		Name:  "STANDBY_WALE_S3_PREFIX",
+		Value: description.S3WalPath,
+	})
+
+	// STANDBY_WITH_WALE makes Spilo bootstrap the standby leader and keep it in sync via WAL-E
+	result = append(result, v1.EnvVar{Name: "STANDBY_METHOD", Value: "STANDBY_WITH_WALE"})
+	// the prefix above already points at the exact WAL location, so no extra scope prefix is needed
+	result = append(result, v1.EnvVar{Name: "STANDBY_WAL_BUCKET_SCOPE_PREFIX", Value: ""})
+
+	return result
+}
+
 func (c *Cluster) generatePodDisruptionBudget() *policybeta1.PodDisruptionBudget {
 	minAvailable := intstr.FromInt(1)
 	pdbEnabled := c.OpConfig.EnablePodDisruptionBudget
diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go
index f69bdd2d9..dd55cd04c 100644
--- a/pkg/cluster/sync.go
+++ b/pkg/cluster/sync.go
@@ -90,7 +90,7 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
 	}
 
 	// create database objects unless we are running without pods or disabled that feature explicitly
-	if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&newSpec.Spec) <= 0) {
+	if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&newSpec.Spec) <= 0 || c.Spec.StandbyCluster != nil) {
 		c.logger.Debugf("syncing roles")
 		if err = c.syncRoles(); err != nil {
 			err = fmt.Errorf("could not sync roles: %v", err)