add the possibility to create a standby cluster that streams from a remote primary
This commit is contained in:
parent
ca0c27a51b
commit
eb990fd456
|
|
@ -474,6 +474,10 @@ spec:
|
||||||
type: string
|
type: string
|
||||||
gs_wal_path:
|
gs_wal_path:
|
||||||
type: string
|
type: string
|
||||||
|
standby_host:
|
||||||
|
type: string
|
||||||
|
standby_port:
|
||||||
|
type: string
|
||||||
streams:
|
streams:
|
||||||
type: array
|
type: array
|
||||||
nullable: true
|
nullable: true
|
||||||
|
|
|
||||||
|
|
@ -385,17 +385,25 @@ under the `clone` top-level key and do not affect the already running cluster.
|
||||||
## Standby cluster
|
## Standby cluster
|
||||||
|
|
||||||
On startup, an existing `standby` top-level key creates a standby Postgres
|
On startup, an existing `standby` top-level key creates a standby Postgres
|
||||||
cluster streaming from a remote location. So far streaming from S3 and GCS WAL
|
cluster streaming from a remote location. Either from a S3 or GCS WAL
|
||||||
archives is supported.
|
archive or a remote primary. When both of them are set, `standby_host`
|
||||||
|
takes precedence.
|
||||||
|
|
||||||
* **s3_wal_path**
|
* **s3_wal_path**
|
||||||
the url to S3 bucket containing the WAL archive of the remote primary.
|
the url to S3 bucket containing the WAL archive of the remote primary.
|
||||||
Optional, but `s3_wal_path` or `gs_wal_path` is required.
|
Required when the `standby` section is present even when `standby_host` is set.
|
||||||
|
|
||||||
* **gs_wal_path**
|
* **gs_wal_path**
|
||||||
the url to GS bucket containing the WAL archive of the remote primary.
|
the url to GS bucket containing the WAL archive of the remote primary.
|
||||||
Optional, but `s3_wal_path` or `gs_wal_path` is required.
|
Optional, but `s3_wal_path` or `gs_wal_path` is required.
|
||||||
|
|
||||||
|
* **standby_host**
|
||||||
|
hostname or IP address of the primary to stream from.
|
||||||
|
When set, `s3_wal_path` is ignored.
|
||||||
|
|
||||||
|
* **standby_port**
|
||||||
|
TCP port on which the primary is listening for connections.
|
||||||
|
|
||||||
## Volume properties
|
## Volume properties
|
||||||
|
|
||||||
Those parameters are grouped under the `volume` top-level key and define the
|
Those parameters are grouped under the `volume` top-level key and define the
|
||||||
|
|
|
||||||
|
|
@ -472,6 +472,10 @@ spec:
|
||||||
type: string
|
type: string
|
||||||
gs_wal_path:
|
gs_wal_path:
|
||||||
type: string
|
type: string
|
||||||
|
standby_host:
|
||||||
|
type: string
|
||||||
|
standby_port:
|
||||||
|
type: string
|
||||||
streams:
|
streams:
|
||||||
type: array
|
type: array
|
||||||
nullable: true
|
nullable: true
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,8 @@ spec:
|
||||||
numberOfInstances: 1
|
numberOfInstances: 1
|
||||||
postgresql:
|
postgresql:
|
||||||
version: "14"
|
version: "14"
|
||||||
# Make this a standby cluster and provide the s3 bucket path of source cluster for continuous streaming.
|
# Make this a standby cluster and provide either the s3 bucket path of source cluster or the remote primary host for continuous streaming.
|
||||||
standby:
|
standby:
|
||||||
s3_wal_path: "s3://path/to/bucket/containing/wal/of/source/cluster/"
|
s3_wal_path: "s3://path/to/bucket/containing/wal/of/source/cluster/"
|
||||||
|
# standby_host: ""
|
||||||
|
# standby_port: ""
|
||||||
|
|
@ -714,6 +714,12 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
|
||||||
"gs_wal_path": {
|
"gs_wal_path": {
|
||||||
Type: "string",
|
Type: "string",
|
||||||
},
|
},
|
||||||
|
"standby_host": {
|
||||||
|
Type: "string",
|
||||||
|
},
|
||||||
|
"standby_port": {
|
||||||
|
Type: "string",
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"streams": {
|
"streams": {
|
||||||
|
|
|
||||||
|
|
@ -168,10 +168,12 @@ type Patroni struct {
|
||||||
SynchronousNodeCount uint32 `json:"synchronous_node_count,omitempty" defaults:1`
|
SynchronousNodeCount uint32 `json:"synchronous_node_count,omitempty" defaults:1`
|
||||||
}
|
}
|
||||||
|
|
||||||
// StandbyDescription contains s3 wal path
|
// StandbyDescription contains remote primary config or s3 wal path
|
||||||
type StandbyDescription struct {
|
type StandbyDescription struct {
|
||||||
S3WalPath string `json:"s3_wal_path,omitempty"`
|
S3WalPath string `json:"s3_wal_path,omitempty"`
|
||||||
GSWalPath string `json:"gs_wal_path,omitempty"`
|
GSWalPath string `json:"gs_wal_path,omitempty"`
|
||||||
|
StandbyHost string `json:"standby_host,omitempty"`
|
||||||
|
StandbyPort string `json:"standby_port,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// TLSDescription specs TLS properties
|
// TLSDescription specs TLS properties
|
||||||
|
|
|
||||||
|
|
@ -1093,9 +1093,10 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
|
||||||
sort.Slice(customPodEnvVarsList,
|
sort.Slice(customPodEnvVarsList,
|
||||||
func(i, j int) bool { return customPodEnvVarsList[i].Name < customPodEnvVarsList[j].Name })
|
func(i, j int) bool { return customPodEnvVarsList[i].Name < customPodEnvVarsList[j].Name })
|
||||||
|
|
||||||
if spec.StandbyCluster != nil && spec.StandbyCluster.S3WalPath == "" &&
|
if spec.StandbyCluster != nil {
|
||||||
spec.StandbyCluster.GSWalPath == "" {
|
if spec.StandbyCluster.S3WalPath == "" && spec.StandbyCluster.GSWalPath == "" && spec.StandbyCluster.StandbyHost == "" {
|
||||||
return nil, fmt.Errorf("one of s3_wal_path or gs_wal_path must be set for standby cluster")
|
return nil, fmt.Errorf("s3_wal_path, gs_wal_path and standby_host are empty for standby cluster")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// backward compatible check for InitContainers
|
// backward compatible check for InitContainers
|
||||||
|
|
@ -1905,10 +1906,19 @@ func (c *Cluster) generateCloneEnvironment(description *acidv1.CloneDescription)
|
||||||
func (c *Cluster) generateStandbyEnvironment(description *acidv1.StandbyDescription) []v1.EnvVar {
|
func (c *Cluster) generateStandbyEnvironment(description *acidv1.StandbyDescription) []v1.EnvVar {
|
||||||
result := make([]v1.EnvVar, 0)
|
result := make([]v1.EnvVar, 0)
|
||||||
|
|
||||||
if description.S3WalPath == "" && description.GSWalPath == "" {
|
if description.StandbyHost != "" {
|
||||||
return nil
|
// standby from remote primary
|
||||||
|
result = append(result, v1.EnvVar{
|
||||||
|
Name: "STANDBY_HOST",
|
||||||
|
Value: description.StandbyHost,
|
||||||
|
})
|
||||||
|
if description.StandbyPort != "" {
|
||||||
|
result = append(result, v1.EnvVar{
|
||||||
|
Name: "STANDBY_PORT",
|
||||||
|
Value: description.StandbyPort,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
if description.S3WalPath != "" {
|
if description.S3WalPath != "" {
|
||||||
// standby with S3, find out the bucket to setup standby
|
// standby with S3, find out the bucket to setup standby
|
||||||
msg := "Standby from S3 bucket using custom parsed S3WalPath from the manifest %s "
|
msg := "Standby from S3 bucket using custom parsed S3WalPath from the manifest %s "
|
||||||
|
|
@ -1933,12 +1943,13 @@ func (c *Cluster) generateStandbyEnvironment(description *acidv1.StandbyDescript
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
result = append(result, envs...)
|
result = append(result, envs...)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
result = append(result, v1.EnvVar{Name: "STANDBY_METHOD", Value: "STANDBY_WITH_WALE"})
|
result = append(result, v1.EnvVar{Name: "STANDBY_METHOD", Value: "STANDBY_WITH_WALE"})
|
||||||
result = append(result, v1.EnvVar{Name: "STANDBY_WAL_BUCKET_SCOPE_PREFIX", Value: ""})
|
result = append(result, v1.EnvVar{Name: "STANDBY_WAL_BUCKET_SCOPE_PREFIX", Value: ""})
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -529,6 +529,83 @@ func TestCloneEnv(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStandbyEnv(t *testing.T) {
|
||||||
|
testName := "TestStandbyEnv"
|
||||||
|
tests := []struct {
|
||||||
|
subTest string
|
||||||
|
standbyOpts *acidv1.StandbyDescription
|
||||||
|
env v1.EnvVar
|
||||||
|
envPos int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
subTest: "from custom s3 path",
|
||||||
|
standbyOpts: &acidv1.StandbyDescription{
|
||||||
|
S3WalPath: "s3://some/path/",
|
||||||
|
},
|
||||||
|
env: v1.EnvVar{
|
||||||
|
Name: "STANDBY_WALE_S3_PREFIX",
|
||||||
|
Value: "s3://some/path/",
|
||||||
|
},
|
||||||
|
envPos: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
subTest: "from custom gs path",
|
||||||
|
standbyOpts: &acidv1.StandbyDescription{
|
||||||
|
GSWalPath: "gs://some/path/",
|
||||||
|
},
|
||||||
|
env: v1.EnvVar{
|
||||||
|
Name: "STANDBY_WALE_GS_PREFIX",
|
||||||
|
Value: "gs://some/path/",
|
||||||
|
},
|
||||||
|
envPos: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
subTest: "from remote primary",
|
||||||
|
standbyOpts: &acidv1.StandbyDescription{
|
||||||
|
S3WalPath: "s3://some/path/",
|
||||||
|
StandbyHost: "remote-primary",
|
||||||
|
},
|
||||||
|
env: v1.EnvVar{
|
||||||
|
Name: "STANDBY_HOST",
|
||||||
|
Value: "remote-primary",
|
||||||
|
},
|
||||||
|
envPos: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
subTest: "from remote primary with port",
|
||||||
|
standbyOpts: &acidv1.StandbyDescription{
|
||||||
|
S3WalPath: "s3://some/path/",
|
||||||
|
StandbyHost: "remote-primary",
|
||||||
|
StandbyPort: "9876",
|
||||||
|
},
|
||||||
|
env: v1.EnvVar{
|
||||||
|
Name: "STANDBY_PORT",
|
||||||
|
Value: "9876",
|
||||||
|
},
|
||||||
|
envPos: 1,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var cluster = New(
|
||||||
|
Config{}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
envs := cluster.generateStandbyEnvironment(tt.standbyOpts)
|
||||||
|
|
||||||
|
env := envs[tt.envPos]
|
||||||
|
|
||||||
|
if env.Name != tt.env.Name {
|
||||||
|
t.Errorf("%s %s: Expected env name %s, have %s instead",
|
||||||
|
testName, tt.subTest, tt.env.Name, env.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
if env.Value != tt.env.Value {
|
||||||
|
t.Errorf("%s %s: Expected env value %s, have %s instead",
|
||||||
|
testName, tt.subTest, tt.env.Value, env.Value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestExtractPgVersionFromBinPath(t *testing.T) {
|
func TestExtractPgVersionFromBinPath(t *testing.T) {
|
||||||
testName := "TestExtractPgVersionFromBinPath"
|
testName := "TestExtractPgVersionFromBinPath"
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue