merge with master
This commit is contained in:
commit
8cef3b2bee
|
|
@ -7,6 +7,8 @@
|
|||
_obj
|
||||
_test
|
||||
_manifests
|
||||
_tmp
|
||||
github.com
|
||||
|
||||
# Architecture specific extensions/prefixes
|
||||
*.[568vq]
|
||||
|
|
@ -26,6 +28,7 @@ _testmain.go
|
|||
/vendor/
|
||||
/build/
|
||||
/docker/build/
|
||||
/github.com/
|
||||
.idea
|
||||
|
||||
scm-source.json
|
||||
|
|
|
|||
|
|
@ -74,6 +74,28 @@ spec:
|
|||
- teamId
|
||||
- postgresql
|
||||
properties:
|
||||
additionalVolumes:
|
||||
type: array
|
||||
items:
|
||||
type: object
|
||||
required:
|
||||
- name
|
||||
- mountPath
|
||||
- volumeSource
|
||||
properties:
|
||||
name:
|
||||
type: string
|
||||
mountPath:
|
||||
type: string
|
||||
targetContainers:
|
||||
type: array
|
||||
nullable: true
|
||||
items:
|
||||
type: string
|
||||
volumeSource:
|
||||
type: object
|
||||
subPath:
|
||||
type: string
|
||||
allowedSourceRanges:
|
||||
type: array
|
||||
nullable: true
|
||||
|
|
@ -218,6 +240,10 @@ spec:
|
|||
type: integer
|
||||
retry_timeout:
|
||||
type: integer
|
||||
synchronous_mode:
|
||||
type: boolean
|
||||
synchronous_mode_strict:
|
||||
type: boolean
|
||||
maximum_lag_on_failover:
|
||||
type: integer
|
||||
podAnnotations:
|
||||
|
|
@ -358,6 +384,21 @@ spec:
|
|||
type: string
|
||||
teamId:
|
||||
type: string
|
||||
tls:
|
||||
type: object
|
||||
required:
|
||||
- secretName
|
||||
properties:
|
||||
secretName:
|
||||
type: string
|
||||
certificateFile:
|
||||
type: string
|
||||
privateKeyFile:
|
||||
type: string
|
||||
caFile:
|
||||
type: string
|
||||
caSecretName:
|
||||
type: string
|
||||
tolerations:
|
||||
type: array
|
||||
items:
|
||||
|
|
|
|||
|
|
@ -277,11 +277,11 @@ configConnectionPooler:
|
|||
# docker image
|
||||
connection_pooler_image: "registry.opensource.zalan.do/acid/pgbouncer"
|
||||
# max db connections the pooler should hold
|
||||
connection_pooler_max_db_connections: 60
|
||||
connection_pooler_max_db_connections: "60"
|
||||
# default pooling mode
|
||||
connection_pooler_mode: "transaction"
|
||||
# number of pooler instances
|
||||
connection_pooler_number_of_instances: 2
|
||||
connection_pooler_number_of_instances: "2"
|
||||
# default resources
|
||||
connection_pooler_default_cpu_request: 500m
|
||||
connection_pooler_default_memory_request: 100Mi
|
||||
|
|
@ -294,6 +294,7 @@ rbac:
|
|||
|
||||
crd:
|
||||
# Specifies whether custom resource definitions should be created
|
||||
# When using helm3, this is ignored; instead use "--skip-crds" to skip.
|
||||
create: true
|
||||
|
||||
serviceAccount:
|
||||
|
|
|
|||
|
|
@ -254,11 +254,11 @@ configConnectionPooler:
|
|||
# docker image
|
||||
connection_pooler_image: "registry.opensource.zalan.do/acid/pgbouncer"
|
||||
# max db connections the pooler should hold
|
||||
connection_pooler_max_db_connections: 60
|
||||
connection_pooler_max_db_connections: "60"
|
||||
# default pooling mode
|
||||
connection_pooler_mode: "transaction"
|
||||
# number of pooler instances
|
||||
connection_pooler_number_of_instances: 2
|
||||
connection_pooler_number_of_instances: "2"
|
||||
# default resources
|
||||
connection_pooler_default_cpu_request: 500m
|
||||
connection_pooler_default_memory_request: 100Mi
|
||||
|
|
@ -271,6 +271,7 @@ rbac:
|
|||
|
||||
crd:
|
||||
# Specifies whether custom resource definitions should be created
|
||||
# When using helm3, this is ignored; instead use "--skip-crds" to skip.
|
||||
create: true
|
||||
|
||||
serviceAccount:
|
||||
|
|
|
|||
|
|
@ -154,6 +154,18 @@ These parameters are grouped directly under the `spec` key in the manifest.
|
|||
[the reference schedule format](https://kubernetes.io/docs/tasks/job/automated-tasks-with-cron-jobs/#schedule)
|
||||
into account. Optional. Default is: "30 00 \* \* \*"
|
||||
|
||||
* **additionalVolumes**
|
||||
List of additional volumes to mount in each container of the statefulset pod.
|
||||
Each item must contain a `name`, `mountPath`, and `volumeSource` which is a
|
||||
[kubernetes volumeSource](https://godoc.org/k8s.io/api/core/v1#VolumeSource).
|
||||
It allows you to mount existing PersistentVolumeClaims, ConfigMaps and Secrets inside the StatefulSet.
|
||||
Also an `emptyDir` volume can be shared between initContainer and statefulSet.
|
||||
Additionaly, you can provide a `SubPath` for volume mount (a file in a configMap source volume, for example).
|
||||
You can also specify in which container the additional Volumes will be mounted with the `targetContainers` array option.
|
||||
If `targetContainers` is empty, additional volumes will be mounted only in the `postgres` container.
|
||||
If you set the `all` special item, it will be mounted in all containers (postgres + sidecars).
|
||||
Else you can set the list of target containers in which the additional volumes will be mounted (eg : postgres, telegraf)
|
||||
|
||||
## Postgres parameters
|
||||
|
||||
Those parameters are grouped under the `postgresql` top-level key, which is
|
||||
|
|
@ -217,6 +229,12 @@ explanation of `ttl` and `loop_wait` parameters.
|
|||
automatically created by Patroni for cluster members and permanent replication
|
||||
slots. Optional.
|
||||
|
||||
* **synchronous_mode**
|
||||
Patroni `synchronous_mode` parameter value. The default is set to `false`. Optional.
|
||||
|
||||
* **synchronous_mode_strict**
|
||||
Patroni `synchronous_mode_strict` parameter value. Can be used in addition to `synchronous_mode`. The default is set to `false`. Optional.
|
||||
|
||||
## Postgres container resources
|
||||
|
||||
Those parameters define [CPU and memory requests and limits](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/)
|
||||
|
|
@ -376,10 +394,11 @@ present.
|
|||
How many instances of connection pooler to create.
|
||||
|
||||
* **schema**
|
||||
Schema to create for credentials lookup function.
|
||||
Database schema to create for credentials lookup function.
|
||||
|
||||
* **user**
|
||||
User to create for connection pooler to be able to connect to a database.
|
||||
You can also choose a role from the `users` section or a system user role.
|
||||
|
||||
* **dockerImage**
|
||||
Which docker image to use for connection pooler deployment.
|
||||
|
|
@ -412,5 +431,16 @@ Those parameters are grouped under the `tls` top-level key.
|
|||
Filename of the private key. Defaults to "tls.key".
|
||||
|
||||
* **caFile**
|
||||
Optional filename to the CA certificate. Useful when the client connects
|
||||
with `sslmode=verify-ca` or `sslmode=verify-full`. Default is empty.
|
||||
Optional filename to the CA certificate (e.g. "ca.crt"). Useful when the
|
||||
client connects with `sslmode=verify-ca` or `sslmode=verify-full`.
|
||||
Default is empty.
|
||||
|
||||
* **caSecretName**
|
||||
By setting the `caSecretName` value, the ca certificate file defined by the
|
||||
`caFile` will be fetched from this secret instead of `secretName` above.
|
||||
This secret has to hold a file with that name in its root.
|
||||
|
||||
Optionally one can provide full path for any of them. By default it is
|
||||
relative to the "/tls/", which is mount path of the tls secret.
|
||||
If `caSecretName` is defined, the ca.crt path is relative to "/tlsca/",
|
||||
otherwise to the same "/tls/".
|
||||
|
|
|
|||
|
|
@ -615,11 +615,14 @@ operator being able to provide some reasonable defaults.
|
|||
the required minimum.
|
||||
|
||||
* **connection_pooler_schema**
|
||||
Schema to create for credentials lookup function. Default is `pooler`.
|
||||
Database schema to create for credentials lookup function to be used by the
|
||||
connection pooler. Is is created in every database of the Postgres cluster.
|
||||
You can also choose an existing schema. Default schema is `pooler`.
|
||||
|
||||
* **connection_pooler_user**
|
||||
User to create for connection pooler to be able to connect to a database.
|
||||
Default is `pooler`.
|
||||
You can also choose an existing role, but make sure it has the `LOGIN`
|
||||
privilege. Default role is `pooler`.
|
||||
|
||||
* **connection_pooler_image**
|
||||
Docker image to use for connection pooler deployment.
|
||||
|
|
|
|||
43
docs/user.md
43
docs/user.md
|
|
@ -527,7 +527,7 @@ spec:
|
|||
This will tell the operator to create a connection pooler with default
|
||||
configuration, through which one can access the master via a separate service
|
||||
`{cluster-name}-pooler`. In most of the cases the
|
||||
[default configuration](reference/operator_parameters.md#connection-pool-configuration)
|
||||
[default configuration](reference/operator_parameters.md#connection-pooler-configuration)
|
||||
should be good enough. To configure a new connection pooler individually for
|
||||
each Postgres cluster, specify:
|
||||
|
||||
|
|
@ -540,7 +540,8 @@ spec:
|
|||
# in which mode to run, session or transaction
|
||||
mode: "transaction"
|
||||
|
||||
# schema, which operator will create to install credentials lookup function
|
||||
# schema, which operator will create in each database
|
||||
# to install credentials lookup function for connection pooler
|
||||
schema: "pooler"
|
||||
|
||||
# user, which operator will create for connection pooler
|
||||
|
|
@ -560,11 +561,11 @@ The `enableConnectionPooler` flag is not required when the `connectionPooler`
|
|||
section is present in the manifest. But, it can be used to disable/remove the
|
||||
pooler while keeping its configuration.
|
||||
|
||||
By default, `pgbouncer` is used as connection pooler. To find out about pooler
|
||||
modes read the `pgbouncer` [docs](https://www.pgbouncer.org/config.html#pooler_mode)
|
||||
By default, [`PgBouncer`](https://www.pgbouncer.org/) is used as connection pooler.
|
||||
To find out about pool modes read the `PgBouncer` [docs](https://www.pgbouncer.org/config.html#pooler_mode)
|
||||
(but it should be the general approach between different implementation).
|
||||
|
||||
Note, that using `pgbouncer` a meaningful resource CPU limit should be 1 core
|
||||
Note, that using `PgBouncer` a meaningful resource CPU limit should be 1 core
|
||||
or less (there is a way to utilize more than one, but in K8s it's easier just to
|
||||
spin up more instances).
|
||||
|
||||
|
|
@ -583,7 +584,8 @@ don't know the value, use `103` which is the GID from the default spilo image
|
|||
OpenShift allocates the users and groups dynamically (based on scc), and their
|
||||
range is different in every namespace. Due to this dynamic behaviour, it's not
|
||||
trivial to know at deploy time the uid/gid of the user in the cluster.
|
||||
This way, in OpenShift, you may want to skip the spilo_fsgroup setting.
|
||||
Therefore, instead of using a global `spilo_fsgroup` setting, use the `spiloFSGroup` field
|
||||
per Postgres cluster.
|
||||
|
||||
Upload the cert as a kubernetes secret:
|
||||
```sh
|
||||
|
|
@ -592,7 +594,7 @@ kubectl create secret tls pg-tls \
|
|||
--cert pg-tls.crt
|
||||
```
|
||||
|
||||
Or with a CA:
|
||||
When doing client auth, CA can come optionally from the same secret:
|
||||
```sh
|
||||
kubectl create secret generic pg-tls \
|
||||
--from-file=tls.crt=server.crt \
|
||||
|
|
@ -600,9 +602,6 @@ kubectl create secret generic pg-tls \
|
|||
--from-file=ca.crt=ca.crt
|
||||
```
|
||||
|
||||
Alternatively it is also possible to use
|
||||
[cert-manager](https://cert-manager.io/docs/) to generate these secrets.
|
||||
|
||||
Then configure the postgres resource with the TLS secret:
|
||||
|
||||
```yaml
|
||||
|
|
@ -617,5 +616,29 @@ spec:
|
|||
caFile: "ca.crt" # add this if the secret is configured with a CA
|
||||
```
|
||||
|
||||
Optionally, the CA can be provided by a different secret:
|
||||
```sh
|
||||
kubectl create secret generic pg-tls-ca \
|
||||
--from-file=ca.crt=ca.crt
|
||||
```
|
||||
|
||||
Then configure the postgres resource with the TLS secret:
|
||||
|
||||
```yaml
|
||||
apiVersion: "acid.zalan.do/v1"
|
||||
kind: postgresql
|
||||
|
||||
metadata:
|
||||
name: acid-test-cluster
|
||||
spec:
|
||||
tls:
|
||||
secretName: "pg-tls" # this should hold tls.key and tls.crt
|
||||
caSecretName: "pg-tls-ca" # this should hold ca.crt
|
||||
caFile: "ca.crt" # add this if the secret is configured with a CA
|
||||
```
|
||||
|
||||
Alternatively, it is also possible to use
|
||||
[cert-manager](https://cert-manager.io/docs/) to generate these secrets.
|
||||
|
||||
Certificate rotation is handled in the spilo image which checks every 5
|
||||
minutes if the certificates have changed and reloads postgres accordingly.
|
||||
|
|
|
|||
|
|
@ -344,7 +344,6 @@ class EndToEndTestCase(unittest.TestCase):
|
|||
'''
|
||||
k8s = self.k8s
|
||||
cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
|
||||
labels = 'spilo-role=master,' + cluster_label
|
||||
readiness_label = 'lifecycle-status'
|
||||
readiness_value = 'ready'
|
||||
|
||||
|
|
@ -709,14 +708,16 @@ class K8s:
|
|||
def wait_for_logical_backup_job_creation(self):
|
||||
self.wait_for_logical_backup_job(expected_num_of_jobs=1)
|
||||
|
||||
def update_config(self, config_map_patch):
|
||||
self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch)
|
||||
|
||||
def delete_operator_pod(self):
|
||||
operator_pod = self.api.core_v1.list_namespaced_pod(
|
||||
'default', label_selector="name=postgres-operator").items[0].metadata.name
|
||||
self.api.core_v1.delete_namespaced_pod(operator_pod, "default") # restart reloads the conf
|
||||
self.wait_for_operator_pod_start()
|
||||
|
||||
def update_config(self, config_map_patch):
|
||||
self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch)
|
||||
self.delete_operator_pod()
|
||||
|
||||
def create_with_kubectl(self, path):
|
||||
return subprocess.run(
|
||||
["kubectl", "create", "-f", path],
|
||||
|
|
|
|||
|
|
@ -9,9 +9,6 @@ metadata:
|
|||
spec:
|
||||
dockerImage: registry.opensource.zalan.do/acid/spilo-12:1.6-p2
|
||||
teamId: "acid"
|
||||
volume:
|
||||
size: 1Gi
|
||||
# storageClass: my-sc
|
||||
numberOfInstances: 2
|
||||
users: # Application/Robot users
|
||||
zalando:
|
||||
|
|
@ -41,6 +38,32 @@ spec:
|
|||
shared_buffers: "32MB"
|
||||
max_connections: "10"
|
||||
log_statement: "all"
|
||||
volume:
|
||||
size: 1Gi
|
||||
# storageClass: my-sc
|
||||
additionalVolumes:
|
||||
- name: data
|
||||
mountPath: /home/postgres/pgdata/partitions
|
||||
targetContainers:
|
||||
- postgres
|
||||
volumeSource:
|
||||
PersistentVolumeClaim:
|
||||
claimName: pvc-postgresql-data-partitions
|
||||
readyOnly: false
|
||||
- name: conf
|
||||
mountPath: /etc/telegraf
|
||||
subPath: telegraf.conf
|
||||
targetContainers:
|
||||
- telegraf-sidecar
|
||||
volumeSource:
|
||||
configMap:
|
||||
name: my-config-map
|
||||
- name: empty
|
||||
mountPath: /opt/empty
|
||||
targetContainers:
|
||||
- all
|
||||
volumeSource:
|
||||
emptyDir: {}
|
||||
|
||||
enableShmVolume: true
|
||||
# spiloFSGroup: 103
|
||||
|
|
@ -78,6 +101,8 @@ spec:
|
|||
ttl: 30
|
||||
loop_wait: &loop_wait 10
|
||||
retry_timeout: 10
|
||||
synchronous_mode: false
|
||||
synchronous_mode_strict: false
|
||||
maximum_lag_on_failover: 33554432
|
||||
|
||||
# restore a Postgres DB with point-in-time-recovery
|
||||
|
|
@ -134,5 +159,10 @@ spec:
|
|||
certificateFile: "tls.crt"
|
||||
privateKeyFile: "tls.key"
|
||||
caFile: "" # optionally configure Postgres with a CA certificate
|
||||
caSecretName: "" # optionally the ca.crt can come from this secret instead.
|
||||
# file names can be also defined with absolute path, and will no longer be relative
|
||||
# to the "/tls/" path where the secret is being mounted by default, and "/tlsca/"
|
||||
# where the caSecret is mounted by default.
|
||||
# When TLS is enabled, also set spiloFSGroup parameter above to the relevant value.
|
||||
# if unknown, set it to 103 which is the usual value in the default spilo images.
|
||||
# In Openshift, there is no need to set spiloFSGroup/spilo_fsgroup.
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ data:
|
|||
# connection_pooler_default_cpu_request: "500m"
|
||||
# connection_pooler_default_memory_limit: 100Mi
|
||||
# connection_pooler_default_memory_request: 100Mi
|
||||
connection_pooler_image: "registry.opensource.zalan.do/acid/pgbouncer:master-5"
|
||||
connection_pooler_image: "registry.opensource.zalan.do/acid/pgbouncer:master-6"
|
||||
# connection_pooler_max_db_connections: 60
|
||||
# connection_pooler_mode: "transaction"
|
||||
# connection_pooler_number_of_instances: 2
|
||||
|
|
|
|||
|
|
@ -127,7 +127,7 @@ configuration:
|
|||
connection_pooler_default_cpu_request: "500m"
|
||||
connection_pooler_default_memory_limit: 100Mi
|
||||
connection_pooler_default_memory_request: 100Mi
|
||||
connection_pooler_image: "registry.opensource.zalan.do/acid/pgbouncer:master-5"
|
||||
connection_pooler_image: "registry.opensource.zalan.do/acid/pgbouncer:master-6"
|
||||
# connection_pooler_max_db_connections: 60
|
||||
connection_pooler_mode: "transaction"
|
||||
connection_pooler_number_of_instances: 2
|
||||
|
|
|
|||
|
|
@ -38,6 +38,28 @@ spec:
|
|||
- teamId
|
||||
- postgresql
|
||||
properties:
|
||||
additionalVolumes:
|
||||
type: array
|
||||
items:
|
||||
type: object
|
||||
required:
|
||||
- name
|
||||
- mountPath
|
||||
- volumeSource
|
||||
properties:
|
||||
name:
|
||||
type: string
|
||||
mountPath:
|
||||
type: string
|
||||
targetContainers:
|
||||
type: array
|
||||
nullable: true
|
||||
items:
|
||||
type: string
|
||||
volumeSource:
|
||||
type: object
|
||||
subPath:
|
||||
type: string
|
||||
allowedSourceRanges:
|
||||
type: array
|
||||
nullable: true
|
||||
|
|
@ -184,6 +206,10 @@ spec:
|
|||
type: integer
|
||||
maximum_lag_on_failover:
|
||||
type: integer
|
||||
synchronous_mode:
|
||||
type: boolean
|
||||
synchronous_mode_strict:
|
||||
type: boolean
|
||||
podAnnotations:
|
||||
type: object
|
||||
additionalProperties:
|
||||
|
|
@ -335,6 +361,8 @@ spec:
|
|||
type: string
|
||||
caFile:
|
||||
type: string
|
||||
caSecretName:
|
||||
type: string
|
||||
tolerations:
|
||||
type: array
|
||||
items:
|
||||
|
|
|
|||
|
|
@ -358,6 +358,12 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{
|
|||
"maximum_lag_on_failover": {
|
||||
Type: "integer",
|
||||
},
|
||||
"synchronous_mode": {
|
||||
Type: "boolean",
|
||||
},
|
||||
"synchronous_mode_strict": {
|
||||
Type: "boolean",
|
||||
},
|
||||
},
|
||||
},
|
||||
"podAnnotations": {
|
||||
|
|
@ -544,6 +550,9 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{
|
|||
"caFile": {
|
||||
Type: "string",
|
||||
},
|
||||
"caSecretName": {
|
||||
Type: "string",
|
||||
},
|
||||
},
|
||||
},
|
||||
"tolerations": {
|
||||
|
|
@ -713,6 +722,37 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{
|
|||
},
|
||||
},
|
||||
},
|
||||
"additionalVolumes": {
|
||||
Type: "array",
|
||||
Items: &apiextv1beta1.JSONSchemaPropsOrArray{
|
||||
Schema: &apiextv1beta1.JSONSchemaProps{
|
||||
Type: "object",
|
||||
Required: []string{"name", "mountPath", "volumeSource"},
|
||||
Properties: map[string]apiextv1beta1.JSONSchemaProps{
|
||||
"name": {
|
||||
Type: "string",
|
||||
},
|
||||
"mountPath": {
|
||||
Type: "string",
|
||||
},
|
||||
"targetContainers": {
|
||||
Type: "array",
|
||||
Items: &apiextv1beta1.JSONSchemaPropsOrArray{
|
||||
Schema: &apiextv1beta1.JSONSchemaProps{
|
||||
Type: "string",
|
||||
},
|
||||
},
|
||||
},
|
||||
"volumeSource": {
|
||||
Type: "object",
|
||||
},
|
||||
"subPath": {
|
||||
Type: "string",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"status": {
|
||||
|
|
|
|||
|
|
@ -68,6 +68,7 @@ type PostgresSpec struct {
|
|||
PodAnnotations map[string]string `json:"podAnnotations"`
|
||||
ServiceAnnotations map[string]string `json:"serviceAnnotations"`
|
||||
TLS *TLSDescription `json:"tls"`
|
||||
AdditionalVolumes []AdditionalVolume `json:"additionalVolumes,omitempty"`
|
||||
|
||||
// deprecated json tags
|
||||
InitContainersOld []v1.Container `json:"init_containers,omitempty"`
|
||||
|
|
@ -112,6 +113,14 @@ type Volume struct {
|
|||
SubPath string `json:"subPath,omitempty"`
|
||||
}
|
||||
|
||||
type AdditionalVolume struct {
|
||||
Name string `json:"name"`
|
||||
MountPath string `json:"mountPath"`
|
||||
SubPath string `json:"subPath"`
|
||||
TargetContainers []string `json:"targetContainers"`
|
||||
VolumeSource v1.VolumeSource `json:"volume"`
|
||||
}
|
||||
|
||||
// PostgresqlParam describes PostgreSQL version and pairs of configuration parameter name - values.
|
||||
type PostgresqlParam struct {
|
||||
PgVersion string `json:"version"`
|
||||
|
|
@ -139,6 +148,8 @@ type Patroni struct {
|
|||
RetryTimeout uint32 `json:"retry_timeout"`
|
||||
MaximumLagOnFailover float32 `json:"maximum_lag_on_failover"` // float32 because https://github.com/kubernetes/kubernetes/issues/30213
|
||||
Slots map[string]map[string]string `json:"slots"`
|
||||
SynchronousMode bool `json:"synchronous_mode"`
|
||||
SynchronousModeStrict bool `json:"synchronous_mode_strict"`
|
||||
}
|
||||
|
||||
//StandbyCluster
|
||||
|
|
@ -151,6 +162,7 @@ type TLSDescription struct {
|
|||
CertificateFile string `json:"certificateFile,omitempty"`
|
||||
PrivateKeyFile string `json:"privateKeyFile,omitempty"`
|
||||
CAFile string `json:"caFile,omitempty"`
|
||||
CASecretName string `json:"caSecretName,omitempty"`
|
||||
}
|
||||
|
||||
// CloneDescription describes which cluster the new should clone and up to which point in time
|
||||
|
|
|
|||
|
|
@ -47,6 +47,28 @@ func (in *AWSGCPConfiguration) DeepCopy() *AWSGCPConfiguration {
|
|||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *AdditionalVolume) DeepCopyInto(out *AdditionalVolume) {
|
||||
*out = *in
|
||||
if in.TargetContainers != nil {
|
||||
in, out := &in.TargetContainers, &out.TargetContainers
|
||||
*out = make([]string, len(*in))
|
||||
copy(*out, *in)
|
||||
}
|
||||
in.VolumeSource.DeepCopyInto(&out.VolumeSource)
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AdditionalVolume.
|
||||
func (in *AdditionalVolume) DeepCopy() *AdditionalVolume {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(AdditionalVolume)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *CloneDescription) DeepCopyInto(out *CloneDescription) {
|
||||
*out = *in
|
||||
|
|
@ -598,6 +620,13 @@ func (in *PostgresSpec) DeepCopyInto(out *PostgresSpec) {
|
|||
*out = new(TLSDescription)
|
||||
**out = **in
|
||||
}
|
||||
if in.AdditionalVolumes != nil {
|
||||
in, out := &in.AdditionalVolumes, &out.AdditionalVolumes
|
||||
*out = make([]AdditionalVolume, len(*in))
|
||||
for i := range *in {
|
||||
(*in)[i].DeepCopyInto(&(*out)[i])
|
||||
}
|
||||
}
|
||||
if in.InitContainersOld != nil {
|
||||
in, out := &in.InitContainersOld, &out.InitContainersOld
|
||||
*out = make([]corev1.Container, len(*in))
|
||||
|
|
|
|||
|
|
@ -757,7 +757,8 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
|
|||
}
|
||||
|
||||
// sync connection pooler
|
||||
if err := c.syncConnectionPooler(oldSpec, newSpec, c.installLookupFunction); err != nil {
|
||||
if _, err := c.syncConnectionPooler(oldSpec, newSpec,
|
||||
c.installLookupFunction); err != nil {
|
||||
return fmt.Errorf("could not sync connection pooler: %v", err)
|
||||
}
|
||||
|
||||
|
|
@ -799,9 +800,11 @@ func (c *Cluster) Delete() {
|
|||
|
||||
for _, role := range []PostgresRole{Master, Replica} {
|
||||
|
||||
if !c.patroniKubernetesUseConfigMaps() {
|
||||
if err := c.deleteEndpoint(role); err != nil {
|
||||
c.logger.Warningf("could not delete %s endpoint: %v", role, err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := c.deleteService(role); err != nil {
|
||||
c.logger.Warningf("could not delete %s service: %v", role, err)
|
||||
|
|
@ -893,9 +896,20 @@ func (c *Cluster) initSystemUsers() {
|
|||
c.Spec.ConnectionPooler = &acidv1.ConnectionPooler{}
|
||||
}
|
||||
|
||||
username := util.Coalesce(
|
||||
// Using superuser as pooler user is not a good idea. First of all it's
|
||||
// not going to be synced correctly with the current implementation,
|
||||
// and second it's a bad practice.
|
||||
username := c.OpConfig.ConnectionPooler.User
|
||||
|
||||
isSuperUser := c.Spec.ConnectionPooler.User == c.OpConfig.SuperUsername
|
||||
isProtectedUser := c.shouldAvoidProtectedOrSystemRole(
|
||||
c.Spec.ConnectionPooler.User, "connection pool role")
|
||||
|
||||
if !isSuperUser && !isProtectedUser {
|
||||
username = util.Coalesce(
|
||||
c.Spec.ConnectionPooler.User,
|
||||
c.OpConfig.ConnectionPooler.User)
|
||||
}
|
||||
|
||||
// connection pooler application should be able to login with this role
|
||||
connectionPoolerUser := spec.PgUser{
|
||||
|
|
@ -1261,11 +1275,19 @@ type clusterObjectDelete func(name string) error
|
|||
|
||||
func (c *Cluster) deletePatroniClusterObjects() error {
|
||||
// TODO: figure out how to remove leftover patroni objects in other cases
|
||||
var actionsList []simpleActionWithResult
|
||||
|
||||
if !c.patroniUsesKubernetes() {
|
||||
c.logger.Infof("not cleaning up Etcd Patroni objects on cluster delete")
|
||||
}
|
||||
c.logger.Debugf("removing leftover Patroni objects (endpoints, services and configmaps)")
|
||||
for _, deleter := range []simpleActionWithResult{c.deletePatroniClusterEndpoints, c.deletePatroniClusterServices, c.deletePatroniClusterConfigMaps} {
|
||||
|
||||
if !c.patroniKubernetesUseConfigMaps() {
|
||||
actionsList = append(actionsList, c.deletePatroniClusterEndpoints)
|
||||
}
|
||||
actionsList = append(actionsList, c.deletePatroniClusterServices, c.deletePatroniClusterConfigMaps)
|
||||
|
||||
c.logger.Debugf("removing leftover Patroni objects (endpoints / services and configmaps)")
|
||||
for _, deleter := range actionsList {
|
||||
if err := deleter(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -721,4 +721,38 @@ func TestInitSystemUsers(t *testing.T) {
|
|||
if _, exist := cl.systemUsers[constants.ConnectionPoolerUserKeyName]; !exist {
|
||||
t.Errorf("%s, connection pooler user is not present", testName)
|
||||
}
|
||||
|
||||
// superuser is not allowed as connection pool user
|
||||
cl.Spec.ConnectionPooler = &acidv1.ConnectionPooler{
|
||||
User: "postgres",
|
||||
}
|
||||
cl.OpConfig.SuperUsername = "postgres"
|
||||
cl.OpConfig.ConnectionPooler.User = "pooler"
|
||||
|
||||
cl.initSystemUsers()
|
||||
if _, exist := cl.pgUsers["pooler"]; !exist {
|
||||
t.Errorf("%s, Superuser is not allowed to be a connection pool user", testName)
|
||||
}
|
||||
|
||||
// neither protected users are
|
||||
delete(cl.pgUsers, "pooler")
|
||||
cl.Spec.ConnectionPooler = &acidv1.ConnectionPooler{
|
||||
User: "admin",
|
||||
}
|
||||
cl.OpConfig.ProtectedRoles = []string{"admin"}
|
||||
|
||||
cl.initSystemUsers()
|
||||
if _, exist := cl.pgUsers["pooler"]; !exist {
|
||||
t.Errorf("%s, Protected user are not allowed to be a connection pool user", testName)
|
||||
}
|
||||
|
||||
delete(cl.pgUsers, "pooler")
|
||||
cl.Spec.ConnectionPooler = &acidv1.ConnectionPooler{
|
||||
User: "standby",
|
||||
}
|
||||
|
||||
cl.initSystemUsers()
|
||||
if _, exist := cl.pgUsers["pooler"]; !exist {
|
||||
t.Errorf("%s, System users are not allowed to be a connection pool user", testName)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -49,6 +49,8 @@ type patroniDCS struct {
|
|||
LoopWait uint32 `json:"loop_wait,omitempty"`
|
||||
RetryTimeout uint32 `json:"retry_timeout,omitempty"`
|
||||
MaximumLagOnFailover float32 `json:"maximum_lag_on_failover,omitempty"`
|
||||
SynchronousMode bool `json:"synchronous_mode,omitempty"`
|
||||
SynchronousModeStrict bool `json:"synchronous_mode_strict,omitempty"`
|
||||
PGBootstrapConfiguration map[string]interface{} `json:"postgresql,omitempty"`
|
||||
Slots map[string]map[string]string `json:"slots,omitempty"`
|
||||
}
|
||||
|
|
@ -283,6 +285,12 @@ PatroniInitDBParams:
|
|||
if patroni.Slots != nil {
|
||||
config.Bootstrap.DCS.Slots = patroni.Slots
|
||||
}
|
||||
if patroni.SynchronousMode {
|
||||
config.Bootstrap.DCS.SynchronousMode = patroni.SynchronousMode
|
||||
}
|
||||
if patroni.SynchronousModeStrict != false {
|
||||
config.Bootstrap.DCS.SynchronousModeStrict = patroni.SynchronousModeStrict
|
||||
}
|
||||
|
||||
config.PgLocalConfiguration = make(map[string]interface{})
|
||||
config.PgLocalConfiguration[patroniPGBinariesParameterName] = fmt.Sprintf(pgBinariesLocationTemplate, pg.PgVersion)
|
||||
|
|
@ -492,7 +500,7 @@ func mountShmVolumeNeeded(opConfig config.Config, spec *acidv1.PostgresSpec) *bo
|
|||
return opConfig.ShmVolume
|
||||
}
|
||||
|
||||
func generatePodTemplate(
|
||||
func (c *Cluster) generatePodTemplate(
|
||||
namespace string,
|
||||
labels labels.Set,
|
||||
annotations map[string]string,
|
||||
|
|
@ -511,7 +519,7 @@ func generatePodTemplate(
|
|||
podAntiAffinityTopologyKey string,
|
||||
additionalSecretMount string,
|
||||
additionalSecretMountPath string,
|
||||
volumes []v1.Volume,
|
||||
additionalVolumes []acidv1.AdditionalVolume,
|
||||
) (*v1.PodTemplateSpec, error) {
|
||||
|
||||
terminateGracePeriodSeconds := terminateGracePeriod
|
||||
|
|
@ -530,7 +538,6 @@ func generatePodTemplate(
|
|||
InitContainers: initContainers,
|
||||
Tolerations: *tolerationsSpec,
|
||||
SecurityContext: &securityContext,
|
||||
Volumes: volumes,
|
||||
}
|
||||
|
||||
if shmVolume != nil && *shmVolume {
|
||||
|
|
@ -551,6 +558,10 @@ func generatePodTemplate(
|
|||
addSecretVolume(&podSpec, additionalSecretMount, additionalSecretMountPath)
|
||||
}
|
||||
|
||||
if additionalVolumes != nil {
|
||||
c.addAdditionalVolumes(&podSpec, additionalVolumes)
|
||||
}
|
||||
|
||||
template := v1.PodTemplateSpec{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Labels: labels,
|
||||
|
|
@ -841,7 +852,7 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
|
|||
sidecarContainers []v1.Container
|
||||
podTemplate *v1.PodTemplateSpec
|
||||
volumeClaimTemplate *v1.PersistentVolumeClaim
|
||||
volumes []v1.Volume
|
||||
additionalVolumes = spec.AdditionalVolumes
|
||||
)
|
||||
|
||||
// Improve me. Please.
|
||||
|
|
@ -994,8 +1005,10 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
|
|||
// this is combined with the FSGroup in the section above
|
||||
// to give read access to the postgres user
|
||||
defaultMode := int32(0640)
|
||||
volumes = append(volumes, v1.Volume{
|
||||
Name: "tls-secret",
|
||||
mountPath := "/tls"
|
||||
additionalVolumes = append(additionalVolumes, acidv1.AdditionalVolume{
|
||||
Name: spec.TLS.SecretName,
|
||||
MountPath: mountPath,
|
||||
VolumeSource: v1.VolumeSource{
|
||||
Secret: &v1.SecretVolumeSource{
|
||||
SecretName: spec.TLS.SecretName,
|
||||
|
|
@ -1004,13 +1017,6 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
|
|||
},
|
||||
})
|
||||
|
||||
mountPath := "/tls"
|
||||
volumeMounts = append(volumeMounts, v1.VolumeMount{
|
||||
MountPath: mountPath,
|
||||
Name: "tls-secret",
|
||||
ReadOnly: true,
|
||||
})
|
||||
|
||||
// use the same filenames as Secret resources by default
|
||||
certFile := ensurePath(spec.TLS.CertificateFile, mountPath, "tls.crt")
|
||||
privateKeyFile := ensurePath(spec.TLS.PrivateKeyFile, mountPath, "tls.key")
|
||||
|
|
@ -1021,11 +1027,31 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
|
|||
)
|
||||
|
||||
if spec.TLS.CAFile != "" {
|
||||
caFile := ensurePath(spec.TLS.CAFile, mountPath, "")
|
||||
// support scenario when the ca.crt resides in a different secret, diff path
|
||||
mountPathCA := mountPath
|
||||
if spec.TLS.CASecretName != "" {
|
||||
mountPathCA = mountPath + "ca"
|
||||
}
|
||||
|
||||
caFile := ensurePath(spec.TLS.CAFile, mountPathCA, "")
|
||||
spiloEnvVars = append(
|
||||
spiloEnvVars,
|
||||
v1.EnvVar{Name: "SSL_CA_FILE", Value: caFile},
|
||||
)
|
||||
|
||||
// the ca file from CASecretName secret takes priority
|
||||
if spec.TLS.CASecretName != "" {
|
||||
additionalVolumes = append(additionalVolumes, acidv1.AdditionalVolume{
|
||||
Name: spec.TLS.CASecretName,
|
||||
MountPath: mountPathCA,
|
||||
VolumeSource: v1.VolumeSource{
|
||||
Secret: &v1.SecretVolumeSource{
|
||||
SecretName: spec.TLS.CASecretName,
|
||||
DefaultMode: &defaultMode,
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1076,7 +1102,7 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
|
|||
annotations := c.generatePodAnnotations(spec)
|
||||
|
||||
// generate pod template for the statefulset, based on the spilo container and sidecars
|
||||
podTemplate, err = generatePodTemplate(
|
||||
podTemplate, err = c.generatePodTemplate(
|
||||
c.Namespace,
|
||||
c.labelsSet(true),
|
||||
annotations,
|
||||
|
|
@ -1095,8 +1121,8 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
|
|||
c.OpConfig.PodAntiAffinityTopologyKey,
|
||||
c.OpConfig.AdditionalSecretMount,
|
||||
c.OpConfig.AdditionalSecretMountPath,
|
||||
volumes,
|
||||
)
|
||||
additionalVolumes)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not generate pod template: %v", err)
|
||||
}
|
||||
|
|
@ -1290,6 +1316,69 @@ func addSecretVolume(podSpec *v1.PodSpec, additionalSecretMount string, addition
|
|||
podSpec.Volumes = volumes
|
||||
}
|
||||
|
||||
func (c *Cluster) addAdditionalVolumes(podSpec *v1.PodSpec,
|
||||
additionalVolumes []acidv1.AdditionalVolume) {
|
||||
|
||||
volumes := podSpec.Volumes
|
||||
mountPaths := map[string]acidv1.AdditionalVolume{}
|
||||
for i, v := range additionalVolumes {
|
||||
if previousVolume, exist := mountPaths[v.MountPath]; exist {
|
||||
msg := "Volume %+v cannot be mounted to the same path as %+v"
|
||||
c.logger.Warningf(msg, v, previousVolume)
|
||||
continue
|
||||
}
|
||||
|
||||
if v.MountPath == constants.PostgresDataMount {
|
||||
msg := "Cannot mount volume on postgresql data directory, %+v"
|
||||
c.logger.Warningf(msg, v)
|
||||
continue
|
||||
}
|
||||
|
||||
if v.TargetContainers == nil {
|
||||
spiloContainer := podSpec.Containers[0]
|
||||
additionalVolumes[i].TargetContainers = []string{spiloContainer.Name}
|
||||
}
|
||||
|
||||
for _, target := range v.TargetContainers {
|
||||
if target == "all" && len(v.TargetContainers) != 1 {
|
||||
msg := `Target containers could be either "all" or a list
|
||||
of containers, mixing those is not allowed, %+v`
|
||||
c.logger.Warningf(msg, v)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
volumes = append(volumes,
|
||||
v1.Volume{
|
||||
Name: v.Name,
|
||||
VolumeSource: v.VolumeSource,
|
||||
},
|
||||
)
|
||||
|
||||
mountPaths[v.MountPath] = v
|
||||
}
|
||||
|
||||
c.logger.Infof("Mount additional volumes: %+v", additionalVolumes)
|
||||
|
||||
for i := range podSpec.Containers {
|
||||
mounts := podSpec.Containers[i].VolumeMounts
|
||||
for _, v := range additionalVolumes {
|
||||
for _, target := range v.TargetContainers {
|
||||
if podSpec.Containers[i].Name == target || target == "all" {
|
||||
mounts = append(mounts, v1.VolumeMount{
|
||||
Name: v.Name,
|
||||
MountPath: v.MountPath,
|
||||
SubPath: v.SubPath,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
podSpec.Containers[i].VolumeMounts = mounts
|
||||
}
|
||||
|
||||
podSpec.Volumes = volumes
|
||||
}
|
||||
|
||||
func generatePersistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) (*v1.PersistentVolumeClaim, error) {
|
||||
|
||||
var storageClassName *string
|
||||
|
|
@ -1544,11 +1633,11 @@ func (c *Cluster) generateCloneEnvironment(description *acidv1.CloneDescription)
|
|||
c.logger.Info(msg, description.S3WalPath)
|
||||
|
||||
envs := []v1.EnvVar{
|
||||
v1.EnvVar{
|
||||
{
|
||||
Name: "CLONE_WAL_S3_BUCKET",
|
||||
Value: c.OpConfig.WALES3Bucket,
|
||||
},
|
||||
v1.EnvVar{
|
||||
{
|
||||
Name: "CLONE_WAL_BUCKET_SCOPE_SUFFIX",
|
||||
Value: getBucketScopeSuffix(description.UID),
|
||||
},
|
||||
|
|
@ -1701,7 +1790,7 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1beta1.CronJob, error) {
|
|||
annotations := c.generatePodAnnotations(&c.Spec)
|
||||
|
||||
// re-use the method that generates DB pod templates
|
||||
if podTemplate, err = generatePodTemplate(
|
||||
if podTemplate, err = c.generatePodTemplate(
|
||||
c.Namespace,
|
||||
labels,
|
||||
annotations,
|
||||
|
|
@ -1720,7 +1809,7 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1beta1.CronJob, error) {
|
|||
"",
|
||||
c.OpConfig.AdditionalSecretMount,
|
||||
c.OpConfig.AdditionalSecretMountPath,
|
||||
nil); err != nil {
|
||||
[]acidv1.AdditionalVolume{}); err != nil {
|
||||
return nil, fmt.Errorf("could not generate pod template for logical backup pod: %v", err)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -70,11 +70,13 @@ func TestGenerateSpiloJSONConfiguration(t *testing.T) {
|
|||
LoopWait: 10,
|
||||
RetryTimeout: 10,
|
||||
MaximumLagOnFailover: 33554432,
|
||||
SynchronousMode: true,
|
||||
SynchronousModeStrict: true,
|
||||
Slots: map[string]map[string]string{"permanent_logical_1": {"type": "logical", "database": "foo", "plugin": "pgoutput"}},
|
||||
},
|
||||
role: "zalandos",
|
||||
opConfig: config.Config{},
|
||||
result: `{"postgresql":{"bin_dir":"/usr/lib/postgresql/11/bin","pg_hba":["hostssl all all 0.0.0.0/0 md5","host all all 0.0.0.0/0 md5"]},"bootstrap":{"initdb":[{"auth-host":"md5"},{"auth-local":"trust"},"data-checksums",{"encoding":"UTF8"},{"locale":"en_US.UTF-8"}],"users":{"zalandos":{"password":"","options":["CREATEDB","NOLOGIN"]}},"dcs":{"ttl":30,"loop_wait":10,"retry_timeout":10,"maximum_lag_on_failover":33554432,"slots":{"permanent_logical_1":{"database":"foo","plugin":"pgoutput","type":"logical"}}}}}`,
|
||||
result: `{"postgresql":{"bin_dir":"/usr/lib/postgresql/11/bin","pg_hba":["hostssl all all 0.0.0.0/0 md5","host all all 0.0.0.0/0 md5"]},"bootstrap":{"initdb":[{"auth-host":"md5"},{"auth-local":"trust"},"data-checksums",{"encoding":"UTF8"},{"locale":"en_US.UTF-8"}],"users":{"zalandos":{"password":"","options":["CREATEDB","NOLOGIN"]}},"dcs":{"ttl":30,"loop_wait":10,"retry_timeout":10,"maximum_lag_on_failover":33554432,"synchronous_mode":true,"synchronous_mode_strict":true,"slots":{"permanent_logical_1":{"database":"foo","plugin":"pgoutput","type":"logical"}}}}}`,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
|
|
@ -959,6 +961,7 @@ func TestTLS(t *testing.T) {
|
|||
var spec acidv1.PostgresSpec
|
||||
var cluster *Cluster
|
||||
var spiloFSGroup = int64(103)
|
||||
var additionalVolumes = spec.AdditionalVolumes
|
||||
|
||||
makeSpec := func(tls acidv1.TLSDescription) acidv1.PostgresSpec {
|
||||
return acidv1.PostgresSpec{
|
||||
|
|
@ -998,8 +1001,20 @@ func TestTLS(t *testing.T) {
|
|||
assert.Equal(t, &fsGroup, s.Spec.Template.Spec.SecurityContext.FSGroup, "has a default FSGroup assigned")
|
||||
|
||||
defaultMode := int32(0640)
|
||||
mountPath := "/tls"
|
||||
additionalVolumes = append(additionalVolumes, acidv1.AdditionalVolume{
|
||||
Name: spec.TLS.SecretName,
|
||||
MountPath: mountPath,
|
||||
VolumeSource: v1.VolumeSource{
|
||||
Secret: &v1.SecretVolumeSource{
|
||||
SecretName: spec.TLS.SecretName,
|
||||
DefaultMode: &defaultMode,
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
volume := v1.Volume{
|
||||
Name: "tls-secret",
|
||||
Name: "my-secret",
|
||||
VolumeSource: v1.VolumeSource{
|
||||
Secret: &v1.SecretVolumeSource{
|
||||
SecretName: "my-secret",
|
||||
|
|
@ -1011,11 +1026,179 @@ func TestTLS(t *testing.T) {
|
|||
|
||||
assert.Contains(t, s.Spec.Template.Spec.Containers[0].VolumeMounts, v1.VolumeMount{
|
||||
MountPath: "/tls",
|
||||
Name: "tls-secret",
|
||||
ReadOnly: true,
|
||||
Name: "my-secret",
|
||||
}, "the volume gets mounted in /tls")
|
||||
|
||||
assert.Contains(t, s.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "SSL_CERTIFICATE_FILE", Value: "/tls/tls.crt"})
|
||||
assert.Contains(t, s.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "SSL_PRIVATE_KEY_FILE", Value: "/tls/tls.key"})
|
||||
assert.Contains(t, s.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "SSL_CA_FILE", Value: "/tls/ca.crt"})
|
||||
}
|
||||
|
||||
func TestAdditionalVolume(t *testing.T) {
|
||||
testName := "TestAdditionalVolume"
|
||||
tests := []struct {
|
||||
subTest string
|
||||
podSpec *v1.PodSpec
|
||||
volumePos int
|
||||
}{
|
||||
{
|
||||
subTest: "empty PodSpec",
|
||||
podSpec: &v1.PodSpec{
|
||||
Volumes: []v1.Volume{},
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
VolumeMounts: []v1.VolumeMount{},
|
||||
},
|
||||
},
|
||||
},
|
||||
volumePos: 0,
|
||||
},
|
||||
{
|
||||
subTest: "non empty PodSpec",
|
||||
podSpec: &v1.PodSpec{
|
||||
Volumes: []v1.Volume{{}},
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: "postgres",
|
||||
VolumeMounts: []v1.VolumeMount{
|
||||
{
|
||||
Name: "data",
|
||||
ReadOnly: false,
|
||||
MountPath: "/data",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
volumePos: 1,
|
||||
},
|
||||
{
|
||||
subTest: "non empty PodSpec with sidecar",
|
||||
podSpec: &v1.PodSpec{
|
||||
Volumes: []v1.Volume{{}},
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: "postgres",
|
||||
VolumeMounts: []v1.VolumeMount{
|
||||
{
|
||||
Name: "data",
|
||||
ReadOnly: false,
|
||||
MountPath: "/data",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "sidecar",
|
||||
VolumeMounts: []v1.VolumeMount{
|
||||
{
|
||||
Name: "data",
|
||||
ReadOnly: false,
|
||||
MountPath: "/data",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
volumePos: 1,
|
||||
},
|
||||
}
|
||||
|
||||
var cluster = New(
|
||||
Config{
|
||||
OpConfig: config.Config{
|
||||
ProtectedRoles: []string{"admin"},
|
||||
Auth: config.Auth{
|
||||
SuperUsername: superUserName,
|
||||
ReplicationUsername: replicationUserName,
|
||||
},
|
||||
},
|
||||
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
|
||||
|
||||
for _, tt := range tests {
|
||||
// Test with additional volume mounted in all containers
|
||||
additionalVolumeMount := []acidv1.AdditionalVolume{
|
||||
{
|
||||
Name: "test",
|
||||
MountPath: "/test",
|
||||
TargetContainers: []string{"all"},
|
||||
VolumeSource: v1.VolumeSource{
|
||||
EmptyDir: &v1.EmptyDirVolumeSource{},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
numMounts := len(tt.podSpec.Containers[0].VolumeMounts)
|
||||
|
||||
cluster.addAdditionalVolumes(tt.podSpec, additionalVolumeMount)
|
||||
volumeName := tt.podSpec.Volumes[tt.volumePos].Name
|
||||
|
||||
if volumeName != additionalVolumeMount[0].Name {
|
||||
t.Errorf("%s %s: Expected volume %v was not created, have %s instead",
|
||||
testName, tt.subTest, additionalVolumeMount, volumeName)
|
||||
}
|
||||
|
||||
for i := range tt.podSpec.Containers {
|
||||
volumeMountName := tt.podSpec.Containers[i].VolumeMounts[tt.volumePos].Name
|
||||
|
||||
if volumeMountName != additionalVolumeMount[0].Name {
|
||||
t.Errorf("%s %s: Expected mount %v was not created, have %s instead",
|
||||
testName, tt.subTest, additionalVolumeMount, volumeMountName)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
numMountsCheck := len(tt.podSpec.Containers[0].VolumeMounts)
|
||||
|
||||
if numMountsCheck != numMounts+1 {
|
||||
t.Errorf("Unexpected number of VolumeMounts: got %v instead of %v",
|
||||
numMountsCheck, numMounts+1)
|
||||
}
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
// Test with additional volume mounted only in first container
|
||||
additionalVolumeMount := []acidv1.AdditionalVolume{
|
||||
{
|
||||
Name: "test",
|
||||
MountPath: "/test",
|
||||
TargetContainers: []string{"postgres"},
|
||||
VolumeSource: v1.VolumeSource{
|
||||
EmptyDir: &v1.EmptyDirVolumeSource{},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
numMounts := len(tt.podSpec.Containers[0].VolumeMounts)
|
||||
|
||||
cluster.addAdditionalVolumes(tt.podSpec, additionalVolumeMount)
|
||||
volumeName := tt.podSpec.Volumes[tt.volumePos].Name
|
||||
|
||||
if volumeName != additionalVolumeMount[0].Name {
|
||||
t.Errorf("%s %s: Expected volume %v was not created, have %s instead",
|
||||
testName, tt.subTest, additionalVolumeMount, volumeName)
|
||||
}
|
||||
|
||||
for _, container := range tt.podSpec.Containers {
|
||||
if container.Name == "postgres" {
|
||||
volumeMountName := container.VolumeMounts[tt.volumePos].Name
|
||||
|
||||
if volumeMountName != additionalVolumeMount[0].Name {
|
||||
t.Errorf("%s %s: Expected mount %v was not created, have %s instead",
|
||||
testName, tt.subTest, additionalVolumeMount, volumeMountName)
|
||||
}
|
||||
|
||||
numMountsCheck := len(container.VolumeMounts)
|
||||
if numMountsCheck != numMounts+1 {
|
||||
t.Errorf("Unexpected number of VolumeMounts: got %v instead of %v",
|
||||
numMountsCheck, numMounts+1)
|
||||
}
|
||||
} else {
|
||||
numMountsCheck := len(container.VolumeMounts)
|
||||
if numMountsCheck == numMounts+1 {
|
||||
t.Errorf("Unexpected number of VolumeMounts: got %v instead of %v",
|
||||
numMountsCheck, numMounts)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -294,6 +294,27 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
|
|||
return pod, nil
|
||||
}
|
||||
|
||||
func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool {
|
||||
|
||||
/*
|
||||
Operator should not re-create pods if there is at least one replica being bootstrapped
|
||||
because Patroni might use other replicas to take basebackup from (see Patroni's "clonefrom" tag).
|
||||
|
||||
XXX operator cannot forbid replica re-init, so we might still fail if re-init is started
|
||||
after this check succeeds but before a pod is re-created
|
||||
*/
|
||||
|
||||
for _, pod := range pods.Items {
|
||||
state, err := c.patroni.GetPatroniMemberState(&pod)
|
||||
if err != nil || state == "creating replica" {
|
||||
c.logger.Warningf("cannot re-create replica %s: it is currently being initialized", pod.Name)
|
||||
return false
|
||||
}
|
||||
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *Cluster) recreatePods() error {
|
||||
c.setProcessName("starting to recreate pods")
|
||||
ls := c.labelsSet(false)
|
||||
|
|
@ -309,6 +330,10 @@ func (c *Cluster) recreatePods() error {
|
|||
}
|
||||
c.logger.Infof("there are %d pods in the cluster to recreate", len(pods.Items))
|
||||
|
||||
if !c.isSafeToRecreatePods(pods) {
|
||||
return fmt.Errorf("postpone pod recreation until next Sync: recreation is unsafe because pods are being initilalized")
|
||||
}
|
||||
|
||||
var (
|
||||
masterPod, newMasterPod, newPod *v1.Pod
|
||||
)
|
||||
|
|
|
|||
|
|
@ -105,7 +105,12 @@ func (c *Cluster) createConnectionPooler(lookup InstallFunction) (*ConnectionPoo
|
|||
var msg string
|
||||
c.setProcessName("creating connection pooler")
|
||||
|
||||
if c.ConnectionPooler == nil {
|
||||
c.ConnectionPooler = &ConnectionPoolerObjects{}
|
||||
}
|
||||
|
||||
schema := c.Spec.ConnectionPooler.Schema
|
||||
|
||||
if schema == "" {
|
||||
schema = c.OpConfig.ConnectionPooler.Schema
|
||||
}
|
||||
|
|
|
|||
|
|
@ -117,7 +117,7 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
|
|||
}
|
||||
|
||||
// sync connection pooler
|
||||
if err = c.syncConnectionPooler(&oldSpec, newSpec, c.installLookupFunction); err != nil {
|
||||
if _, err = c.syncConnectionPooler(&oldSpec, newSpec, c.installLookupFunction); err != nil {
|
||||
return fmt.Errorf("could not sync connection pooler: %v", err)
|
||||
}
|
||||
|
||||
|
|
@ -128,10 +128,11 @@ func (c *Cluster) syncServices() error {
|
|||
for _, role := range []PostgresRole{Master, Replica} {
|
||||
c.logger.Debugf("syncing %s service", role)
|
||||
|
||||
if !c.patroniKubernetesUseConfigMaps() {
|
||||
if err := c.syncEndpoint(role); err != nil {
|
||||
return fmt.Errorf("could not sync %s endpoint: %v", role, err)
|
||||
}
|
||||
|
||||
}
|
||||
if err := c.syncService(role); err != nil {
|
||||
return fmt.Errorf("could not sync %s service: %v", role, err)
|
||||
}
|
||||
|
|
@ -738,7 +739,13 @@ func (c *Cluster) syncLogicalBackupJob() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, lookup InstallFunction) error {
|
||||
func (c *Cluster) syncConnectionPooler(oldSpec,
|
||||
newSpec *acidv1.Postgresql,
|
||||
lookup InstallFunction) (SyncReason, error) {
|
||||
|
||||
var reason SyncReason
|
||||
var err error
|
||||
|
||||
if c.ConnectionPooler == nil {
|
||||
c.ConnectionPooler = &ConnectionPoolerObjects{}
|
||||
}
|
||||
|
|
@ -775,20 +782,20 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, look
|
|||
specUser,
|
||||
c.OpConfig.ConnectionPooler.User)
|
||||
|
||||
if err := lookup(schema, user); err != nil {
|
||||
return err
|
||||
if err = lookup(schema, user); err != nil {
|
||||
return NoSync, err
|
||||
}
|
||||
}
|
||||
|
||||
if err := c.syncConnectionPoolerWorker(oldSpec, newSpec); err != nil {
|
||||
if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec); err != nil {
|
||||
c.logger.Errorf("could not sync connection pooler: %v", err)
|
||||
return err
|
||||
return reason, err
|
||||
}
|
||||
}
|
||||
|
||||
if oldNeedConnectionPooler && !newNeedConnectionPooler {
|
||||
// delete and cleanup resources
|
||||
if err := c.deleteConnectionPooler(); err != nil {
|
||||
if err = c.deleteConnectionPooler(); err != nil {
|
||||
c.logger.Warningf("could not remove connection pooler: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
@ -799,20 +806,22 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, look
|
|||
(c.ConnectionPooler.Deployment != nil ||
|
||||
c.ConnectionPooler.Service != nil) {
|
||||
|
||||
if err := c.deleteConnectionPooler(); err != nil {
|
||||
if err = c.deleteConnectionPooler(); err != nil {
|
||||
c.logger.Warningf("could not remove connection pooler: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return reason, nil
|
||||
}
|
||||
|
||||
// Synchronize connection pooler resources. Effectively we're interested only in
|
||||
// synchronizing the corresponding deployment, but in case of deployment or
|
||||
// service is missing, create it. After checking, also remember an object for
|
||||
// the future references.
|
||||
func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql) error {
|
||||
func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql) (
|
||||
SyncReason, error) {
|
||||
|
||||
deployment, err := c.KubeClient.
|
||||
Deployments(c.Namespace).
|
||||
Get(context.TODO(), c.connectionPoolerName(), metav1.GetOptions{})
|
||||
|
|
@ -824,7 +833,7 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
|
|||
deploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec)
|
||||
if err != nil {
|
||||
msg = "could not generate deployment for connection pooler: %v"
|
||||
return fmt.Errorf(msg, err)
|
||||
return NoSync, fmt.Errorf(msg, err)
|
||||
}
|
||||
|
||||
deployment, err := c.KubeClient.
|
||||
|
|
@ -832,18 +841,35 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
|
|||
Create(context.TODO(), deploymentSpec, metav1.CreateOptions{})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
return NoSync, err
|
||||
}
|
||||
|
||||
c.ConnectionPooler.Deployment = deployment
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("could not get connection pooler deployment to sync: %v", err)
|
||||
msg := "could not get connection pooler deployment to sync: %v"
|
||||
return NoSync, fmt.Errorf(msg, err)
|
||||
} else {
|
||||
c.ConnectionPooler.Deployment = deployment
|
||||
|
||||
// actual synchronization
|
||||
oldConnectionPooler := oldSpec.Spec.ConnectionPooler
|
||||
newConnectionPooler := newSpec.Spec.ConnectionPooler
|
||||
|
||||
// sync implementation below assumes that both old and new specs are
|
||||
// not nil, but it can happen. To avoid any confusion like updating a
|
||||
// deployment because the specification changed from nil to an empty
|
||||
// struct (that was initialized somewhere before) replace any nil with
|
||||
// an empty spec.
|
||||
if oldConnectionPooler == nil {
|
||||
oldConnectionPooler = &acidv1.ConnectionPooler{}
|
||||
}
|
||||
|
||||
if newConnectionPooler == nil {
|
||||
newConnectionPooler = &acidv1.ConnectionPooler{}
|
||||
}
|
||||
|
||||
c.logger.Infof("Old: %+v, New %+v", oldConnectionPooler, newConnectionPooler)
|
||||
|
||||
specSync, specReason := c.needSyncConnectionPoolerSpecs(oldConnectionPooler, newConnectionPooler)
|
||||
defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(newConnectionPooler, deployment)
|
||||
reason := append(specReason, defaultsReason...)
|
||||
|
|
@ -854,7 +880,7 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
|
|||
newDeploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec)
|
||||
if err != nil {
|
||||
msg := "could not generate deployment for connection pooler: %v"
|
||||
return fmt.Errorf(msg, err)
|
||||
return reason, fmt.Errorf(msg, err)
|
||||
}
|
||||
|
||||
oldDeploymentSpec := c.ConnectionPooler.Deployment
|
||||
|
|
@ -864,11 +890,11 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
|
|||
newDeploymentSpec)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
return reason, err
|
||||
}
|
||||
|
||||
c.ConnectionPooler.Deployment = deployment
|
||||
return nil
|
||||
return reason, nil
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -886,16 +912,17 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
|
|||
Create(context.TODO(), serviceSpec, metav1.CreateOptions{})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
return NoSync, err
|
||||
}
|
||||
|
||||
c.ConnectionPooler.Service = service
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("could not get connection pooler service to sync: %v", err)
|
||||
msg := "could not get connection pooler service to sync: %v"
|
||||
return NoSync, fmt.Errorf(msg, err)
|
||||
} else {
|
||||
// Service updates are not supported and probably not that useful anyway
|
||||
c.ConnectionPooler.Service = service
|
||||
}
|
||||
|
||||
return nil
|
||||
return NoSync, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package cluster
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
|
||||
|
|
@ -17,7 +18,7 @@ func int32ToPointer(value int32) *int32 {
|
|||
return &value
|
||||
}
|
||||
|
||||
func deploymentUpdated(cluster *Cluster, err error) error {
|
||||
func deploymentUpdated(cluster *Cluster, err error, reason SyncReason) error {
|
||||
if cluster.ConnectionPooler.Deployment.Spec.Replicas == nil ||
|
||||
*cluster.ConnectionPooler.Deployment.Spec.Replicas != 2 {
|
||||
return fmt.Errorf("Wrong nubmer of instances")
|
||||
|
|
@ -26,7 +27,7 @@ func deploymentUpdated(cluster *Cluster, err error) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func objectsAreSaved(cluster *Cluster, err error) error {
|
||||
func objectsAreSaved(cluster *Cluster, err error, reason SyncReason) error {
|
||||
if cluster.ConnectionPooler == nil {
|
||||
return fmt.Errorf("Connection pooler resources are empty")
|
||||
}
|
||||
|
|
@ -42,7 +43,7 @@ func objectsAreSaved(cluster *Cluster, err error) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func objectsAreDeleted(cluster *Cluster, err error) error {
|
||||
func objectsAreDeleted(cluster *Cluster, err error, reason SyncReason) error {
|
||||
if cluster.ConnectionPooler != nil {
|
||||
return fmt.Errorf("Connection pooler was not deleted")
|
||||
}
|
||||
|
|
@ -50,6 +51,16 @@ func objectsAreDeleted(cluster *Cluster, err error) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func noEmptySync(cluster *Cluster, err error, reason SyncReason) error {
|
||||
for _, msg := range reason {
|
||||
if strings.HasPrefix(msg, "update [] from '<nil>' to '") {
|
||||
return fmt.Errorf("There is an empty reason, %s", msg)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestConnectionPoolerSynchronization(t *testing.T) {
|
||||
testName := "Test connection pooler synchronization"
|
||||
var cluster = New(
|
||||
|
|
@ -91,15 +102,15 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
|
|||
|
||||
clusterNewDefaultsMock := *cluster
|
||||
clusterNewDefaultsMock.KubeClient = k8sutil.NewMockKubernetesClient()
|
||||
cluster.OpConfig.ConnectionPooler.Image = "pooler:2.0"
|
||||
cluster.OpConfig.ConnectionPooler.NumberOfInstances = int32ToPointer(2)
|
||||
|
||||
tests := []struct {
|
||||
subTest string
|
||||
oldSpec *acidv1.Postgresql
|
||||
newSpec *acidv1.Postgresql
|
||||
cluster *Cluster
|
||||
check func(cluster *Cluster, err error) error
|
||||
defaultImage string
|
||||
defaultInstances int32
|
||||
check func(cluster *Cluster, err error, reason SyncReason) error
|
||||
}{
|
||||
{
|
||||
subTest: "create if doesn't exist",
|
||||
|
|
@ -114,6 +125,8 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
|
|||
},
|
||||
},
|
||||
cluster: &clusterMissingObjects,
|
||||
defaultImage: "pooler:1.0",
|
||||
defaultInstances: 1,
|
||||
check: objectsAreSaved,
|
||||
},
|
||||
{
|
||||
|
|
@ -127,6 +140,8 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
|
|||
},
|
||||
},
|
||||
cluster: &clusterMissingObjects,
|
||||
defaultImage: "pooler:1.0",
|
||||
defaultInstances: 1,
|
||||
check: objectsAreSaved,
|
||||
},
|
||||
{
|
||||
|
|
@ -140,6 +155,8 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
|
|||
},
|
||||
},
|
||||
cluster: &clusterMissingObjects,
|
||||
defaultImage: "pooler:1.0",
|
||||
defaultInstances: 1,
|
||||
check: objectsAreSaved,
|
||||
},
|
||||
{
|
||||
|
|
@ -153,6 +170,8 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
|
|||
Spec: acidv1.PostgresSpec{},
|
||||
},
|
||||
cluster: &clusterMock,
|
||||
defaultImage: "pooler:1.0",
|
||||
defaultInstances: 1,
|
||||
check: objectsAreDeleted,
|
||||
},
|
||||
{
|
||||
|
|
@ -164,6 +183,8 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
|
|||
Spec: acidv1.PostgresSpec{},
|
||||
},
|
||||
cluster: &clusterDirtyMock,
|
||||
defaultImage: "pooler:1.0",
|
||||
defaultInstances: 1,
|
||||
check: objectsAreDeleted,
|
||||
},
|
||||
{
|
||||
|
|
@ -183,6 +204,8 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
|
|||
},
|
||||
},
|
||||
cluster: &clusterMock,
|
||||
defaultImage: "pooler:1.0",
|
||||
defaultInstances: 1,
|
||||
check: deploymentUpdated,
|
||||
},
|
||||
{
|
||||
|
|
@ -198,13 +221,39 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
|
|||
},
|
||||
},
|
||||
cluster: &clusterNewDefaultsMock,
|
||||
defaultImage: "pooler:2.0",
|
||||
defaultInstances: 2,
|
||||
check: deploymentUpdated,
|
||||
},
|
||||
{
|
||||
subTest: "there is no sync from nil to an empty spec",
|
||||
oldSpec: &acidv1.Postgresql{
|
||||
Spec: acidv1.PostgresSpec{
|
||||
EnableConnectionPooler: boolToPointer(true),
|
||||
ConnectionPooler: nil,
|
||||
},
|
||||
},
|
||||
newSpec: &acidv1.Postgresql{
|
||||
Spec: acidv1.PostgresSpec{
|
||||
EnableConnectionPooler: boolToPointer(true),
|
||||
ConnectionPooler: &acidv1.ConnectionPooler{},
|
||||
},
|
||||
},
|
||||
cluster: &clusterMock,
|
||||
defaultImage: "pooler:1.0",
|
||||
defaultInstances: 1,
|
||||
check: noEmptySync,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
err := tt.cluster.syncConnectionPooler(tt.oldSpec, tt.newSpec, mockInstallLookupFunction)
|
||||
tt.cluster.OpConfig.ConnectionPooler.Image = tt.defaultImage
|
||||
tt.cluster.OpConfig.ConnectionPooler.NumberOfInstances =
|
||||
int32ToPointer(tt.defaultInstances)
|
||||
|
||||
if err := tt.check(tt.cluster, err); err != nil {
|
||||
reason, err := tt.cluster.syncConnectionPooler(tt.oldSpec,
|
||||
tt.newSpec, mockInstallLookupFunction)
|
||||
|
||||
if err := tt.check(tt.cluster, err, reason); err != nil {
|
||||
t.Errorf("%s [%s]: Could not synchronize, %+v",
|
||||
testName, tt.subTest, err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -73,3 +73,8 @@ type ClusterStatus struct {
|
|||
type TemplateParams map[string]interface{}
|
||||
|
||||
type InstallFunction func(schema string, user string) error
|
||||
|
||||
type SyncReason []string
|
||||
|
||||
// no sync happened, empty value
|
||||
var NoSync SyncReason = []string{}
|
||||
|
|
|
|||
|
|
@ -169,6 +169,11 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur
|
|||
fromCRD.ConnectionPooler.User,
|
||||
constants.ConnectionPoolerUserName)
|
||||
|
||||
if result.ConnectionPooler.User == result.SuperUsername {
|
||||
msg := "Connection pool user is not allowed to be the same as super user, username: %s"
|
||||
panic(fmt.Errorf(msg, result.ConnectionPooler.User))
|
||||
}
|
||||
|
||||
result.ConnectionPooler.Image = util.Coalesce(
|
||||
fromCRD.ConnectionPooler.Image,
|
||||
"registry.opensource.zalan.do/acid/pgbouncer")
|
||||
|
|
|
|||
|
|
@ -218,5 +218,11 @@ func validate(cfg *Config) (err error) {
|
|||
msg := "number of connection pooler instances should be higher than %d"
|
||||
err = fmt.Errorf(msg, constants.ConnectionPoolerMinInstances)
|
||||
}
|
||||
|
||||
if cfg.ConnectionPooler.User == cfg.SuperUsername {
|
||||
msg := "Connection pool user is not allowed to be the same as super user, username: %s"
|
||||
err = fmt.Errorf(msg, cfg.ConnectionPooler.User)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ package patroni
|
|||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
|
|
@ -11,7 +12,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
@ -25,6 +26,7 @@ const (
|
|||
type Interface interface {
|
||||
Switchover(master *v1.Pod, candidate string) error
|
||||
SetPostgresParameters(server *v1.Pod, options map[string]string) error
|
||||
GetPatroniMemberState(pod *v1.Pod) (string, error)
|
||||
}
|
||||
|
||||
// Patroni API client
|
||||
|
|
@ -123,3 +125,36 @@ func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]st
|
|||
}
|
||||
return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf)
|
||||
}
|
||||
|
||||
//GetPatroniMemberState returns a state of member of a Patroni cluster
|
||||
func (p *Patroni) GetPatroniMemberState(server *v1.Pod) (string, error) {
|
||||
|
||||
apiURLString, err := apiURL(server)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
response, err := p.httpClient.Get(apiURLString)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("could not perform Get request: %v", err)
|
||||
}
|
||||
defer response.Body.Close()
|
||||
|
||||
body, err := ioutil.ReadAll(response.Body)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("could not read response: %v", err)
|
||||
}
|
||||
|
||||
data := make(map[string]interface{})
|
||||
err = json.Unmarshal(body, &data)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
state, ok := data["state"].(string)
|
||||
if !ok {
|
||||
return "", errors.New("Patroni Get call response contains wrong type for 'state' field")
|
||||
}
|
||||
|
||||
return state, nil
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue