Merge branch 'master' into safe-pod-delete

Sergey Dudoladov 2020-04-20 14:02:54 +02:00
commit 3b382f6f34
24 changed files with 785 additions and 120 deletions

.gitignore vendored

@ -7,6 +7,8 @@
_obj
_test
_manifests
_tmp
github.com
# Architecture specific extensions/prefixes
*.[568vq]


@ -74,6 +74,28 @@ spec:
- teamId
- postgresql
properties:
additionalVolumes:
type: array
items:
type: object
required:
- name
- mountPath
- volumeSource
properties:
name:
type: string
mountPath:
type: string
targetContainers:
type: array
nullable: true
items:
type: string
volumeSource:
type: object
subPath:
type: string
allowedSourceRanges:
type: array
nullable: true
@ -218,6 +240,10 @@ spec:
type: integer
retry_timeout:
type: integer
synchronous_mode:
type: boolean
synchronous_mode_strict:
type: boolean
maximum_lag_on_failover:
type: integer
podAnnotations:
@ -338,6 +364,21 @@ spec:
type: string
teamId:
type: string
tls:
type: object
required:
- secretName
properties:
secretName:
type: string
certificateFile:
type: string
privateKeyFile:
type: string
caFile:
type: string
caSecretName:
type: string
tolerations:
type: array
items:


@ -277,11 +277,11 @@ configConnectionPooler:
# docker image
connection_pooler_image: "registry.opensource.zalan.do/acid/pgbouncer"
# max db connections the pooler should hold
connection_pooler_max_db_connections: 60
connection_pooler_max_db_connections: "60"
# default pooling mode
connection_pooler_mode: "transaction"
# number of pooler instances
connection_pooler_number_of_instances: 2
connection_pooler_number_of_instances: "2"
# default resources
connection_pooler_default_cpu_request: 500m
connection_pooler_default_memory_request: 100Mi
@ -294,6 +294,7 @@ rbac:
crd:
# Specifies whether custom resource definitions should be created
# When using helm3, this is ignored; instead use "--skip-crds" to skip.
create: true
serviceAccount:


@ -254,11 +254,11 @@ configConnectionPooler:
# docker image
connection_pooler_image: "registry.opensource.zalan.do/acid/pgbouncer"
# max db connections the pooler should hold
connection_pooler_max_db_connections: 60
connection_pooler_max_db_connections: "60"
# default pooling mode
connection_pooler_mode: "transaction"
# number of pooler instances
connection_pooler_number_of_instances: 2
connection_pooler_number_of_instances: "2"
# default resources
connection_pooler_default_cpu_request: 500m
connection_pooler_default_memory_request: 100Mi
@ -271,6 +271,7 @@ rbac:
crd:
# Specifies whether custom resource definitions should be created
# When using helm3, this is ignored; instead use "--skip-crds" to skip.
create: true
serviceAccount:


@ -154,6 +154,18 @@ These parameters are grouped directly under the `spec` key in the manifest.
[the reference schedule format](https://kubernetes.io/docs/tasks/job/automated-tasks-with-cron-jobs/#schedule)
into account. Optional. Default is: "30 00 \* \* \*"
* **additionalVolumes**
List of additional volumes to mount in each container of the statefulset pod.
Each item must contain a `name`, `mountPath`, and `volumeSource` which is a
[kubernetes volumeSource](https://godoc.org/k8s.io/api/core/v1#VolumeSource).
It allows you to mount existing PersistentVolumeClaims, ConfigMaps and Secrets inside the StatefulSet.
An `emptyDir` volume can also be shared between the initContainer and the StatefulSet.
Additionally, you can provide a `subPath` for the volume mount (a file in a ConfigMap source volume, for example).
The `targetContainers` array option specifies in which containers the additional volumes are mounted.
If `targetContainers` is empty, additional volumes are mounted only in the `postgres` container.
The special item `all` mounts them in all containers (postgres + sidecars).
Otherwise, list the target containers explicitly (e.g. `postgres`, `telegraf`), as in the sketch below.
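A minimal sketch of what this could look like in a cluster manifest (the volume names, ConfigMap name and container names are placeholders for illustration; a fuller example appears in the example manifest changed later in this commit):

```yaml
spec:
  additionalVolumes:
    # mounted only in the telegraf-sidecar container
    - name: metrics-config
      mountPath: /etc/telegraf
      subPath: telegraf.conf
      targetContainers:
        - telegraf-sidecar
      volumeSource:
        configMap:
          name: my-config-map
    # shared scratch space, mounted in postgres and all sidecars
    - name: scratch
      mountPath: /opt/scratch
      targetContainers:
        - all
      volumeSource:
        emptyDir: {}
```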
## Postgres parameters
Those parameters are grouped under the `postgresql` top-level key, which is
@ -217,6 +229,12 @@ explanation of `ttl` and `loop_wait` parameters.
automatically created by Patroni for cluster members and permanent replication
slots. Optional.
* **synchronous_mode**
Patroni `synchronous_mode` parameter value. The default is set to `false`. Optional.
* **synchronous_mode_strict**
Patroni `synchronous_mode_strict` parameter value. Can be used in addition to `synchronous_mode`. The default is set to `false`. Optional. See the sketch below.
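For illustration, enabling both Patroni flags in a cluster manifest would look roughly like this (a sketch; the complete example manifest changed later in this commit sets both to `false`):

```yaml
spec:
  patroni:
    synchronous_mode: true
    synchronous_mode_strict: true
```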
## Postgres container resources
Those parameters define [CPU and memory requests and limits](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/)
@ -376,10 +394,11 @@ present.
How many instances of connection pooler to create.
* **schema**
Schema to create for credentials lookup function.
Database schema to create for credentials lookup function.
* **user**
User to create for connection pooler to be able to connect to a database.
You can also choose a role from the `users` section or a system user role.
* **dockerImage**
Which docker image to use for connection pooler deployment.
@ -412,5 +431,16 @@ Those parameters are grouped under the `tls` top-level key.
Filename of the private key. Defaults to "tls.key".
* **caFile**
Optional filename to the CA certificate. Useful when the client connects
with `sslmode=verify-ca` or `sslmode=verify-full`. Default is empty.
Optional filename to the CA certificate (e.g. "ca.crt"). Useful when the
client connects with `sslmode=verify-ca` or `sslmode=verify-full`.
Default is empty.
* **caSecretName**
By setting the `caSecretName` value, the CA certificate file defined by
`caFile` will be fetched from this secret instead of from `secretName` above.
This secret has to hold a file with that name in its root.
Optionally, a full path can be provided for any of the files. By default the
paths are relative to "/tls/", which is the mount path of the TLS secret.
If `caSecretName` is defined, the ca.crt path is relative to "/tlsca/",
otherwise to the same "/tls/" (see the sketch below).
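A sketch of a `tls` section that pulls the CA from a separate secret (secret names are placeholders; the same pattern is shown in the user guide changed in this commit):

```yaml
spec:
  tls:
    secretName: "pg-tls"       # holds tls.crt and tls.key, mounted under /tls/
    caSecretName: "pg-tls-ca"  # holds ca.crt, mounted under /tlsca/
    caFile: "ca.crt"
```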


@ -83,7 +83,7 @@ Those are top-level keys, containing both leaf keys and groups.
* **kubernetes_use_configmaps**
Select if setup uses endpoints (default), or configmaps to manage leader when
DCS is kubernetes (not etcd or similar). In OpenShift it is not possible to
use endpoints option, and configmaps is required. By default,
`kubernetes_use_configmaps: false`, meaning endpoints will be used.
* **docker_image**
@ -615,11 +615,14 @@ operator being able to provide some reasonable defaults.
the required minimum.
* **connection_pooler_schema**
Schema to create for credentials lookup function. Default is `pooler`.
Database schema to create for credentials lookup function to be used by the
connection pooler. It is created in every database of the Postgres cluster.
You can also choose an existing schema. Default schema is `pooler`.
* **connection_pooler_user**
User to create for connection pooler to be able to connect to a database.
Default is `pooler`.
You can also choose an existing role, but make sure it has the `LOGIN`
privilege. Default role is `pooler`.
* **connection_pooler_image**
Docker image to use for connection pooler deployment.


@ -527,7 +527,7 @@ spec:
This will tell the operator to create a connection pooler with default
configuration, through which one can access the master via a separate service
`{cluster-name}-pooler`. In most of the cases the
[default configuration](reference/operator_parameters.md#connection-pool-configuration)
[default configuration](reference/operator_parameters.md#connection-pooler-configuration)
should be good enough. To configure a new connection pooler individually for
each Postgres cluster, specify:
@ -540,7 +540,8 @@ spec:
# in which mode to run, session or transaction
mode: "transaction"
# schema, which operator will create to install credentials lookup function
# schema, which operator will create in each database
# to install credentials lookup function for connection pooler
schema: "pooler"
# user, which operator will create for connection pooler
@ -560,11 +561,11 @@ The `enableConnectionPooler` flag is not required when the `connectionPooler`
section is present in the manifest. But, it can be used to disable/remove the
pooler while keeping its configuration.
By default, `pgbouncer` is used as connection pooler. To find out about pooler
modes read the `pgbouncer` [docs](https://www.pgbouncer.org/config.html#pooler_mode)
By default, [`PgBouncer`](https://www.pgbouncer.org/) is used as connection pooler.
To find out about pool modes read the `PgBouncer` [docs](https://www.pgbouncer.org/config.html#pooler_mode)
(but the general approach should be the same across different implementations).
Note, that using `pgbouncer` a meaningful resource CPU limit should be 1 core
Note that, when using `PgBouncer`, a meaningful CPU resource limit should be 1 core
or less (there is a way to utilize more than one, but in K8s it's easier just to
spin up more instances).
@ -583,7 +584,8 @@ don't know the value, use `103` which is the GID from the default spilo image
OpenShift allocates the users and groups dynamically (based on scc), and their
range is different in every namespace. Due to this dynamic behaviour, it's not
trivial to know at deploy time the uid/gid of the user in the cluster.
This way, in OpenShift, you may want to skip the spilo_fsgroup setting.
Therefore, instead of using a global `spilo_fsgroup` setting, use the `spiloFSGroup` field
per Postgres cluster.
Upload the cert as a kubernetes secret:
```sh
@ -592,7 +594,7 @@ kubectl create secret tls pg-tls \
--cert pg-tls.crt
```
Or with a CA:
When doing client auth, the CA can optionally come from the same secret:
```sh
kubectl create secret generic pg-tls \
--from-file=tls.crt=server.crt \
@ -600,9 +602,6 @@ kubectl create secret generic pg-tls \
--from-file=ca.crt=ca.crt
```
Alternatively it is also possible to use
[cert-manager](https://cert-manager.io/docs/) to generate these secrets.
Then configure the postgres resource with the TLS secret:
```yaml
@ -617,5 +616,29 @@ spec:
caFile: "ca.crt" # add this if the secret is configured with a CA
```
Optionally, the CA can be provided by a different secret:
```sh
kubectl create secret generic pg-tls-ca \
--from-file=ca.crt=ca.crt
```
Then configure the postgres resource with the TLS secret:
```yaml
apiVersion: "acid.zalan.do/v1"
kind: postgresql
metadata:
name: acid-test-cluster
spec:
tls:
secretName: "pg-tls" # this should hold tls.key and tls.crt
caSecretName: "pg-tls-ca" # this should hold ca.crt
caFile: "ca.crt" # add this if the secret is configured with a CA
```
Alternatively, it is also possible to use
[cert-manager](https://cert-manager.io/docs/) to generate these secrets.
Certificate rotation is handled in the spilo image which checks every 5
minutes if the certificates have changed and reloads postgres accordingly.


@ -9,9 +9,6 @@ metadata:
spec:
dockerImage: registry.opensource.zalan.do/acid/spilo-12:1.6-p2
teamId: "acid"
volume:
size: 1Gi
# storageClass: my-sc
numberOfInstances: 2
users: # Application/Robot users
zalando:
@ -30,6 +27,32 @@ spec:
shared_buffers: "32MB"
max_connections: "10"
log_statement: "all"
volume:
size: 1Gi
# storageClass: my-sc
additionalVolumes:
- name: data
mountPath: /home/postgres/pgdata/partitions
targetContainers:
- postgres
volumeSource:
persistentVolumeClaim:
claimName: pvc-postgresql-data-partitions
readOnly: false
- name: conf
mountPath: /etc/telegraf
subPath: telegraf.conf
targetContainers:
- telegraf-sidecar
volumeSource:
configMap:
name: my-config-map
- name: empty
mountPath: /opt/empty
targetContainers:
- all
volumeSource:
emptyDir: {}
enableShmVolume: true
# spiloFSGroup: 103
@ -67,6 +90,8 @@ spec:
ttl: 30
loop_wait: &loop_wait 10
retry_timeout: 10
synchronous_mode: false
synchronous_mode_strict: false
maximum_lag_on_failover: 33554432
# restore a Postgres DB with point-in-time-recovery
@ -123,5 +148,10 @@ spec:
certificateFile: "tls.crt"
privateKeyFile: "tls.key"
caFile: "" # optionally configure Postgres with a CA certificate
caSecretName: "" # optionally the ca.crt can come from this secret instead.
# file names can also be defined with an absolute path, and will then no longer be relative
# to the "/tls/" path where the secret is mounted by default, or to "/tlsca/"
# where the caSecret is mounted by default.
# When TLS is enabled, also set the spiloFSGroup parameter above to the relevant value.
# If unknown, set it to 103, which is the usual value in the default spilo images.
# In OpenShift, there is no need to set spiloFSGroup/spilo_fsgroup.


@ -15,7 +15,7 @@ data:
# connection_pooler_default_cpu_request: "500m"
# connection_pooler_default_memory_limit: 100Mi
# connection_pooler_default_memory_request: 100Mi
connection_pooler_image: "registry.opensource.zalan.do/acid/pgbouncer:master-5"
connection_pooler_image: "registry.opensource.zalan.do/acid/pgbouncer:master-6"
# connection_pooler_max_db_connections: 60
# connection_pooler_mode: "transaction"
# connection_pooler_number_of_instances: 2


@ -127,7 +127,7 @@ configuration:
connection_pooler_default_cpu_request: "500m"
connection_pooler_default_memory_limit: 100Mi
connection_pooler_default_memory_request: 100Mi
connection_pooler_image: "registry.opensource.zalan.do/acid/pgbouncer:master-5"
connection_pooler_image: "registry.opensource.zalan.do/acid/pgbouncer:master-6"
# connection_pooler_max_db_connections: 60
connection_pooler_mode: "transaction"
connection_pooler_number_of_instances: 2


@ -38,6 +38,28 @@ spec:
- teamId
- postgresql
properties:
additionalVolumes:
type: array
items:
type: object
required:
- name
- mountPath
- volumeSource
properties:
name:
type: string
mountPath:
type: string
targetContainers:
type: array
nullable: true
items:
type: string
volumeSource:
type: object
subPath:
type: string
allowedSourceRanges:
type: array
nullable: true
@ -184,6 +206,10 @@ spec:
type: integer
maximum_lag_on_failover:
type: integer
synchronous_mode:
type: boolean
synchronous_mode_strict:
type: boolean
podAnnotations:
type: object
additionalProperties:
@ -315,6 +341,8 @@ spec:
type: string
caFile:
type: string
caSecretName:
type: string
tolerations:
type: array
items:


@ -358,6 +358,12 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{
"maximum_lag_on_failover": {
Type: "integer",
},
"synchronous_mode": {
Type: "boolean",
},
"synchronous_mode_strict": {
Type: "boolean",
},
},
},
"podAnnotations": {
@ -507,6 +513,9 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{
"caFile": {
Type: "string",
},
"caSecretName": {
Type: "string",
},
},
},
"tolerations": {
@ -676,6 +685,37 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{
},
},
},
"additionalVolumes": {
Type: "array",
Items: &apiextv1beta1.JSONSchemaPropsOrArray{
Schema: &apiextv1beta1.JSONSchemaProps{
Type: "object",
Required: []string{"name", "mountPath", "volumeSource"},
Properties: map[string]apiextv1beta1.JSONSchemaProps{
"name": {
Type: "string",
},
"mountPath": {
Type: "string",
},
"targetContainers": {
Type: "array",
Items: &apiextv1beta1.JSONSchemaPropsOrArray{
Schema: &apiextv1beta1.JSONSchemaProps{
Type: "string",
},
},
},
"volumeSource": {
Type: "object",
},
"subPath": {
Type: "string",
},
},
},
},
},
},
},
"status": {


@ -67,6 +67,7 @@ type PostgresSpec struct {
PodAnnotations map[string]string `json:"podAnnotations"`
ServiceAnnotations map[string]string `json:"serviceAnnotations"`
TLS *TLSDescription `json:"tls"`
AdditionalVolumes []AdditionalVolume `json:"additionalVolumes,omitempty"`
// deprecated json tags
InitContainersOld []v1.Container `json:"init_containers,omitempty"`
@ -98,6 +99,14 @@ type Volume struct {
SubPath string `json:"subPath,omitempty"`
}
type AdditionalVolume struct {
Name string `json:"name"`
MountPath string `json:"mountPath"`
SubPath string `json:"subPath"`
TargetContainers []string `json:"targetContainers"`
VolumeSource v1.VolumeSource `json:"volumeSource"`
}
// PostgresqlParam describes PostgreSQL version and pairs of configuration parameter name - values.
type PostgresqlParam struct {
PgVersion string `json:"version"`
@ -118,13 +127,15 @@ type Resources struct {
// Patroni contains Patroni-specific configuration
type Patroni struct {
InitDB map[string]string `json:"initdb"`
PgHba []string `json:"pg_hba"`
TTL uint32 `json:"ttl"`
LoopWait uint32 `json:"loop_wait"`
RetryTimeout uint32 `json:"retry_timeout"`
MaximumLagOnFailover float32 `json:"maximum_lag_on_failover"` // float32 because https://github.com/kubernetes/kubernetes/issues/30213
Slots map[string]map[string]string `json:"slots"`
InitDB map[string]string `json:"initdb"`
PgHba []string `json:"pg_hba"`
TTL uint32 `json:"ttl"`
LoopWait uint32 `json:"loop_wait"`
RetryTimeout uint32 `json:"retry_timeout"`
MaximumLagOnFailover float32 `json:"maximum_lag_on_failover"` // float32 because https://github.com/kubernetes/kubernetes/issues/30213
Slots map[string]map[string]string `json:"slots"`
SynchronousMode bool `json:"synchronous_mode"`
SynchronousModeStrict bool `json:"synchronous_mode_strict"`
}
//StandbyCluster
@ -137,6 +148,7 @@ type TLSDescription struct {
CertificateFile string `json:"certificateFile,omitempty"`
PrivateKeyFile string `json:"privateKeyFile,omitempty"`
CAFile string `json:"caFile,omitempty"`
CASecretName string `json:"caSecretName,omitempty"`
}
// CloneDescription describes which cluster the new should clone and up to which point in time


@ -47,6 +47,28 @@ func (in *AWSGCPConfiguration) DeepCopy() *AWSGCPConfiguration {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *AdditionalVolume) DeepCopyInto(out *AdditionalVolume) {
*out = *in
if in.TargetContainers != nil {
in, out := &in.TargetContainers, &out.TargetContainers
*out = make([]string, len(*in))
copy(*out, *in)
}
in.VolumeSource.DeepCopyInto(&out.VolumeSource)
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AdditionalVolume.
func (in *AdditionalVolume) DeepCopy() *AdditionalVolume {
if in == nil {
return nil
}
out := new(AdditionalVolume)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *CloneDescription) DeepCopyInto(out *CloneDescription) {
*out = *in
@ -591,6 +613,13 @@ func (in *PostgresSpec) DeepCopyInto(out *PostgresSpec) {
*out = new(TLSDescription)
**out = **in
}
if in.AdditionalVolumes != nil {
in, out := &in.AdditionalVolumes, &out.AdditionalVolumes
*out = make([]AdditionalVolume, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.InitContainersOld != nil {
in, out := &in.InitContainersOld, &out.InitContainersOld
*out = make([]corev1.Container, len(*in))


@ -741,7 +741,8 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
}
// sync connection pooler
if err := c.syncConnectionPooler(oldSpec, newSpec, c.installLookupFunction); err != nil {
if _, err := c.syncConnectionPooler(oldSpec, newSpec,
c.installLookupFunction); err != nil {
return fmt.Errorf("could not sync connection pooler: %v", err)
}
@ -783,8 +784,10 @@ func (c *Cluster) Delete() {
for _, role := range []PostgresRole{Master, Replica} {
if err := c.deleteEndpoint(role); err != nil {
c.logger.Warningf("could not delete %s endpoint: %v", role, err)
if !c.patroniKubernetesUseConfigMaps() {
if err := c.deleteEndpoint(role); err != nil {
c.logger.Warningf("could not delete %s endpoint: %v", role, err)
}
}
if err := c.deleteService(role); err != nil {
@ -877,9 +880,20 @@ func (c *Cluster) initSystemUsers() {
c.Spec.ConnectionPooler = &acidv1.ConnectionPooler{}
}
username := util.Coalesce(
c.Spec.ConnectionPooler.User,
c.OpConfig.ConnectionPooler.User)
// Using superuser as pooler user is not a good idea. First of all it's
// not going to be synced correctly with the current implementation,
// and second it's a bad practice.
username := c.OpConfig.ConnectionPooler.User
isSuperUser := c.Spec.ConnectionPooler.User == c.OpConfig.SuperUsername
isProtectedUser := c.shouldAvoidProtectedOrSystemRole(
c.Spec.ConnectionPooler.User, "connection pool role")
if !isSuperUser && !isProtectedUser {
username = util.Coalesce(
c.Spec.ConnectionPooler.User,
c.OpConfig.ConnectionPooler.User)
}
// connection pooler application should be able to login with this role
connectionPoolerUser := spec.PgUser{
@ -1149,11 +1163,19 @@ type clusterObjectDelete func(name string) error
func (c *Cluster) deletePatroniClusterObjects() error {
// TODO: figure out how to remove leftover patroni objects in other cases
var actionsList []simpleActionWithResult
if !c.patroniUsesKubernetes() {
c.logger.Infof("not cleaning up Etcd Patroni objects on cluster delete")
}
c.logger.Debugf("removing leftover Patroni objects (endpoints, services and configmaps)")
for _, deleter := range []simpleActionWithResult{c.deletePatroniClusterEndpoints, c.deletePatroniClusterServices, c.deletePatroniClusterConfigMaps} {
if !c.patroniKubernetesUseConfigMaps() {
actionsList = append(actionsList, c.deletePatroniClusterEndpoints)
}
actionsList = append(actionsList, c.deletePatroniClusterServices, c.deletePatroniClusterConfigMaps)
c.logger.Debugf("removing leftover Patroni objects (endpoints / services and configmaps)")
for _, deleter := range actionsList {
if err := deleter(); err != nil {
return err
}


@ -721,4 +721,38 @@ func TestInitSystemUsers(t *testing.T) {
if _, exist := cl.systemUsers[constants.ConnectionPoolerUserKeyName]; !exist {
t.Errorf("%s, connection pooler user is not present", testName)
}
// superuser is not allowed as connection pool user
cl.Spec.ConnectionPooler = &acidv1.ConnectionPooler{
User: "postgres",
}
cl.OpConfig.SuperUsername = "postgres"
cl.OpConfig.ConnectionPooler.User = "pooler"
cl.initSystemUsers()
if _, exist := cl.pgUsers["pooler"]; !exist {
t.Errorf("%s, Superuser is not allowed to be a connection pool user", testName)
}
// neither protected users are
delete(cl.pgUsers, "pooler")
cl.Spec.ConnectionPooler = &acidv1.ConnectionPooler{
User: "admin",
}
cl.OpConfig.ProtectedRoles = []string{"admin"}
cl.initSystemUsers()
if _, exist := cl.pgUsers["pooler"]; !exist {
t.Errorf("%s, Protected users are not allowed to be connection pool users", testName)
}
delete(cl.pgUsers, "pooler")
cl.Spec.ConnectionPooler = &acidv1.ConnectionPooler{
User: "standby",
}
cl.initSystemUsers()
if _, exist := cl.pgUsers["pooler"]; !exist {
t.Errorf("%s, System users are not allowed to be a connection pool user", testName)
}
}


@ -49,6 +49,8 @@ type patroniDCS struct {
LoopWait uint32 `json:"loop_wait,omitempty"`
RetryTimeout uint32 `json:"retry_timeout,omitempty"`
MaximumLagOnFailover float32 `json:"maximum_lag_on_failover,omitempty"`
SynchronousMode bool `json:"synchronous_mode,omitempty"`
SynchronousModeStrict bool `json:"synchronous_mode_strict,omitempty"`
PGBootstrapConfiguration map[string]interface{} `json:"postgresql,omitempty"`
Slots map[string]map[string]string `json:"slots,omitempty"`
}
@ -283,6 +285,12 @@ PatroniInitDBParams:
if patroni.Slots != nil {
config.Bootstrap.DCS.Slots = patroni.Slots
}
if patroni.SynchronousMode {
config.Bootstrap.DCS.SynchronousMode = patroni.SynchronousMode
}
if patroni.SynchronousModeStrict {
config.Bootstrap.DCS.SynchronousModeStrict = patroni.SynchronousModeStrict
}
config.PgLocalConfiguration = make(map[string]interface{})
config.PgLocalConfiguration[patroniPGBinariesParameterName] = fmt.Sprintf(pgBinariesLocationTemplate, pg.PgVersion)
@ -492,7 +500,7 @@ func mountShmVolumeNeeded(opConfig config.Config, spec *acidv1.PostgresSpec) *bo
return opConfig.ShmVolume
}
func generatePodTemplate(
func (c *Cluster) generatePodTemplate(
namespace string,
labels labels.Set,
annotations map[string]string,
@ -511,7 +519,7 @@ func generatePodTemplate(
podAntiAffinityTopologyKey string,
additionalSecretMount string,
additionalSecretMountPath string,
volumes []v1.Volume,
additionalVolumes []acidv1.AdditionalVolume,
) (*v1.PodTemplateSpec, error) {
terminateGracePeriodSeconds := terminateGracePeriod
@ -530,7 +538,6 @@ func generatePodTemplate(
InitContainers: initContainers,
Tolerations: *tolerationsSpec,
SecurityContext: &securityContext,
Volumes: volumes,
}
if shmVolume != nil && *shmVolume {
@ -551,6 +558,10 @@ func generatePodTemplate(
addSecretVolume(&podSpec, additionalSecretMount, additionalSecretMountPath)
}
if additionalVolumes != nil {
c.addAdditionalVolumes(&podSpec, additionalVolumes)
}
template := v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
@ -841,7 +852,7 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
sidecarContainers []v1.Container
podTemplate *v1.PodTemplateSpec
volumeClaimTemplate *v1.PersistentVolumeClaim
volumes []v1.Volume
additionalVolumes = spec.AdditionalVolumes
)
// Improve me. Please.
@ -994,8 +1005,10 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
// this is combined with the FSGroup in the section above
// to give read access to the postgres user
defaultMode := int32(0640)
volumes = append(volumes, v1.Volume{
Name: "tls-secret",
mountPath := "/tls"
additionalVolumes = append(additionalVolumes, acidv1.AdditionalVolume{
Name: spec.TLS.SecretName,
MountPath: mountPath,
VolumeSource: v1.VolumeSource{
Secret: &v1.SecretVolumeSource{
SecretName: spec.TLS.SecretName,
@ -1004,13 +1017,6 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
},
})
mountPath := "/tls"
volumeMounts = append(volumeMounts, v1.VolumeMount{
MountPath: mountPath,
Name: "tls-secret",
ReadOnly: true,
})
// use the same filenames as Secret resources by default
certFile := ensurePath(spec.TLS.CertificateFile, mountPath, "tls.crt")
privateKeyFile := ensurePath(spec.TLS.PrivateKeyFile, mountPath, "tls.key")
@ -1021,11 +1027,31 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
)
if spec.TLS.CAFile != "" {
caFile := ensurePath(spec.TLS.CAFile, mountPath, "")
// support scenario when the ca.crt resides in a different secret, diff path
mountPathCA := mountPath
if spec.TLS.CASecretName != "" {
mountPathCA = mountPath + "ca"
}
caFile := ensurePath(spec.TLS.CAFile, mountPathCA, "")
spiloEnvVars = append(
spiloEnvVars,
v1.EnvVar{Name: "SSL_CA_FILE", Value: caFile},
)
// the ca file from CASecretName secret takes priority
if spec.TLS.CASecretName != "" {
additionalVolumes = append(additionalVolumes, acidv1.AdditionalVolume{
Name: spec.TLS.CASecretName,
MountPath: mountPathCA,
VolumeSource: v1.VolumeSource{
Secret: &v1.SecretVolumeSource{
SecretName: spec.TLS.CASecretName,
DefaultMode: &defaultMode,
},
},
})
}
}
}
@ -1076,7 +1102,7 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
annotations := c.generatePodAnnotations(spec)
// generate pod template for the statefulset, based on the spilo container and sidecars
podTemplate, err = generatePodTemplate(
podTemplate, err = c.generatePodTemplate(
c.Namespace,
c.labelsSet(true),
annotations,
@ -1095,8 +1121,8 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
c.OpConfig.PodAntiAffinityTopologyKey,
c.OpConfig.AdditionalSecretMount,
c.OpConfig.AdditionalSecretMountPath,
volumes,
)
additionalVolumes)
if err != nil {
return nil, fmt.Errorf("could not generate pod template: %v", err)
}
@ -1290,6 +1316,69 @@ func addSecretVolume(podSpec *v1.PodSpec, additionalSecretMount string, addition
podSpec.Volumes = volumes
}
func (c *Cluster) addAdditionalVolumes(podSpec *v1.PodSpec,
additionalVolumes []acidv1.AdditionalVolume) {
volumes := podSpec.Volumes
mountPaths := map[string]acidv1.AdditionalVolume{}
for i, v := range additionalVolumes {
if previousVolume, exist := mountPaths[v.MountPath]; exist {
msg := "Volume %+v cannot be mounted to the same path as %+v"
c.logger.Warningf(msg, v, previousVolume)
continue
}
if v.MountPath == constants.PostgresDataMount {
msg := "Cannot mount volume on postgresql data directory, %+v"
c.logger.Warningf(msg, v)
continue
}
if v.TargetContainers == nil {
spiloContainer := podSpec.Containers[0]
additionalVolumes[i].TargetContainers = []string{spiloContainer.Name}
}
for _, target := range v.TargetContainers {
if target == "all" && len(v.TargetContainers) != 1 {
msg := `Target containers could be either "all" or a list
of containers, mixing those is not allowed, %+v`
c.logger.Warningf(msg, v)
continue
}
}
volumes = append(volumes,
v1.Volume{
Name: v.Name,
VolumeSource: v.VolumeSource,
},
)
mountPaths[v.MountPath] = v
}
c.logger.Infof("Mount additional volumes: %+v", additionalVolumes)
for i := range podSpec.Containers {
mounts := podSpec.Containers[i].VolumeMounts
for _, v := range additionalVolumes {
for _, target := range v.TargetContainers {
if podSpec.Containers[i].Name == target || target == "all" {
mounts = append(mounts, v1.VolumeMount{
Name: v.Name,
MountPath: v.MountPath,
SubPath: v.SubPath,
})
}
}
}
podSpec.Containers[i].VolumeMounts = mounts
}
podSpec.Volumes = volumes
}
func generatePersistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) (*v1.PersistentVolumeClaim, error) {
var storageClassName *string
@ -1537,11 +1626,11 @@ func (c *Cluster) generateCloneEnvironment(description *acidv1.CloneDescription)
c.logger.Info(msg, description.S3WalPath)
envs := []v1.EnvVar{
v1.EnvVar{
{
Name: "CLONE_WAL_S3_BUCKET",
Value: c.OpConfig.WALES3Bucket,
},
v1.EnvVar{
{
Name: "CLONE_WAL_BUCKET_SCOPE_SUFFIX",
Value: getBucketScopeSuffix(description.UID),
},
@ -1694,7 +1783,7 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1beta1.CronJob, error) {
annotations := c.generatePodAnnotations(&c.Spec)
// re-use the method that generates DB pod templates
if podTemplate, err = generatePodTemplate(
if podTemplate, err = c.generatePodTemplate(
c.Namespace,
labels,
annotations,
@ -1713,8 +1802,8 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1beta1.CronJob, error) {
"",
c.OpConfig.AdditionalSecretMount,
c.OpConfig.AdditionalSecretMountPath,
nil); err != nil {
return nil, fmt.Errorf("could not generate pod template for logical backup pod: %v", err)
[]acidv1.AdditionalVolume{}); err != nil {
return nil, fmt.Errorf("could not generate pod template for logical backup pod: %v", err)
}
// overwrite specific params of logical backups pods


@ -65,16 +65,18 @@ func TestGenerateSpiloJSONConfiguration(t *testing.T) {
"locale": "en_US.UTF-8",
"data-checksums": "true",
},
PgHba: []string{"hostssl all all 0.0.0.0/0 md5", "host all all 0.0.0.0/0 md5"},
TTL: 30,
LoopWait: 10,
RetryTimeout: 10,
MaximumLagOnFailover: 33554432,
Slots: map[string]map[string]string{"permanent_logical_1": {"type": "logical", "database": "foo", "plugin": "pgoutput"}},
PgHba: []string{"hostssl all all 0.0.0.0/0 md5", "host all all 0.0.0.0/0 md5"},
TTL: 30,
LoopWait: 10,
RetryTimeout: 10,
MaximumLagOnFailover: 33554432,
SynchronousMode: true,
SynchronousModeStrict: true,
Slots: map[string]map[string]string{"permanent_logical_1": {"type": "logical", "database": "foo", "plugin": "pgoutput"}},
},
role: "zalandos",
opConfig: config.Config{},
result: `{"postgresql":{"bin_dir":"/usr/lib/postgresql/11/bin","pg_hba":["hostssl all all 0.0.0.0/0 md5","host all all 0.0.0.0/0 md5"]},"bootstrap":{"initdb":[{"auth-host":"md5"},{"auth-local":"trust"},"data-checksums",{"encoding":"UTF8"},{"locale":"en_US.UTF-8"}],"users":{"zalandos":{"password":"","options":["CREATEDB","NOLOGIN"]}},"dcs":{"ttl":30,"loop_wait":10,"retry_timeout":10,"maximum_lag_on_failover":33554432,"slots":{"permanent_logical_1":{"database":"foo","plugin":"pgoutput","type":"logical"}}}}}`,
result: `{"postgresql":{"bin_dir":"/usr/lib/postgresql/11/bin","pg_hba":["hostssl all all 0.0.0.0/0 md5","host all all 0.0.0.0/0 md5"]},"bootstrap":{"initdb":[{"auth-host":"md5"},{"auth-local":"trust"},"data-checksums",{"encoding":"UTF8"},{"locale":"en_US.UTF-8"}],"users":{"zalandos":{"password":"","options":["CREATEDB","NOLOGIN"]}},"dcs":{"ttl":30,"loop_wait":10,"retry_timeout":10,"maximum_lag_on_failover":33554432,"synchronous_mode":true,"synchronous_mode_strict":true,"slots":{"permanent_logical_1":{"database":"foo","plugin":"pgoutput","type":"logical"}}}}}`,
},
}
for _, tt := range tests {
@ -959,6 +961,7 @@ func TestTLS(t *testing.T) {
var spec acidv1.PostgresSpec
var cluster *Cluster
var spiloFSGroup = int64(103)
var additionalVolumes = spec.AdditionalVolumes
makeSpec := func(tls acidv1.TLSDescription) acidv1.PostgresSpec {
return acidv1.PostgresSpec{
@ -998,8 +1001,20 @@ func TestTLS(t *testing.T) {
assert.Equal(t, &fsGroup, s.Spec.Template.Spec.SecurityContext.FSGroup, "has a default FSGroup assigned")
defaultMode := int32(0640)
mountPath := "/tls"
additionalVolumes = append(additionalVolumes, acidv1.AdditionalVolume{
Name: spec.TLS.SecretName,
MountPath: mountPath,
VolumeSource: v1.VolumeSource{
Secret: &v1.SecretVolumeSource{
SecretName: spec.TLS.SecretName,
DefaultMode: &defaultMode,
},
},
})
volume := v1.Volume{
Name: "tls-secret",
Name: "my-secret",
VolumeSource: v1.VolumeSource{
Secret: &v1.SecretVolumeSource{
SecretName: "my-secret",
@ -1011,11 +1026,179 @@ func TestTLS(t *testing.T) {
assert.Contains(t, s.Spec.Template.Spec.Containers[0].VolumeMounts, v1.VolumeMount{
MountPath: "/tls",
Name: "tls-secret",
ReadOnly: true,
Name: "my-secret",
}, "the volume gets mounted in /tls")
assert.Contains(t, s.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "SSL_CERTIFICATE_FILE", Value: "/tls/tls.crt"})
assert.Contains(t, s.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "SSL_PRIVATE_KEY_FILE", Value: "/tls/tls.key"})
assert.Contains(t, s.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "SSL_CA_FILE", Value: "/tls/ca.crt"})
}
func TestAdditionalVolume(t *testing.T) {
testName := "TestAdditionalVolume"
tests := []struct {
subTest string
podSpec *v1.PodSpec
volumePos int
}{
{
subTest: "empty PodSpec",
podSpec: &v1.PodSpec{
Volumes: []v1.Volume{},
Containers: []v1.Container{
{
VolumeMounts: []v1.VolumeMount{},
},
},
},
volumePos: 0,
},
{
subTest: "non empty PodSpec",
podSpec: &v1.PodSpec{
Volumes: []v1.Volume{{}},
Containers: []v1.Container{
{
Name: "postgres",
VolumeMounts: []v1.VolumeMount{
{
Name: "data",
ReadOnly: false,
MountPath: "/data",
},
},
},
},
},
volumePos: 1,
},
{
subTest: "non empty PodSpec with sidecar",
podSpec: &v1.PodSpec{
Volumes: []v1.Volume{{}},
Containers: []v1.Container{
{
Name: "postgres",
VolumeMounts: []v1.VolumeMount{
{
Name: "data",
ReadOnly: false,
MountPath: "/data",
},
},
},
{
Name: "sidecar",
VolumeMounts: []v1.VolumeMount{
{
Name: "data",
ReadOnly: false,
MountPath: "/data",
},
},
},
},
},
volumePos: 1,
},
}
var cluster = New(
Config{
OpConfig: config.Config{
ProtectedRoles: []string{"admin"},
Auth: config.Auth{
SuperUsername: superUserName,
ReplicationUsername: replicationUserName,
},
},
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
for _, tt := range tests {
// Test with additional volume mounted in all containers
additionalVolumeMount := []acidv1.AdditionalVolume{
{
Name: "test",
MountPath: "/test",
TargetContainers: []string{"all"},
VolumeSource: v1.VolumeSource{
EmptyDir: &v1.EmptyDirVolumeSource{},
},
},
}
numMounts := len(tt.podSpec.Containers[0].VolumeMounts)
cluster.addAdditionalVolumes(tt.podSpec, additionalVolumeMount)
volumeName := tt.podSpec.Volumes[tt.volumePos].Name
if volumeName != additionalVolumeMount[0].Name {
t.Errorf("%s %s: Expected volume %v was not created, have %s instead",
testName, tt.subTest, additionalVolumeMount, volumeName)
}
for i := range tt.podSpec.Containers {
volumeMountName := tt.podSpec.Containers[i].VolumeMounts[tt.volumePos].Name
if volumeMountName != additionalVolumeMount[0].Name {
t.Errorf("%s %s: Expected mount %v was not created, have %s instead",
testName, tt.subTest, additionalVolumeMount, volumeMountName)
}
}
numMountsCheck := len(tt.podSpec.Containers[0].VolumeMounts)
if numMountsCheck != numMounts+1 {
t.Errorf("Unexpected number of VolumeMounts: got %v instead of %v",
numMountsCheck, numMounts+1)
}
}
for _, tt := range tests {
// Test with additional volume mounted only in first container
additionalVolumeMount := []acidv1.AdditionalVolume{
{
Name: "test",
MountPath: "/test",
TargetContainers: []string{"postgres"},
VolumeSource: v1.VolumeSource{
EmptyDir: &v1.EmptyDirVolumeSource{},
},
},
}
numMounts := len(tt.podSpec.Containers[0].VolumeMounts)
cluster.addAdditionalVolumes(tt.podSpec, additionalVolumeMount)
volumeName := tt.podSpec.Volumes[tt.volumePos].Name
if volumeName != additionalVolumeMount[0].Name {
t.Errorf("%s %s: Expected volume %v was not created, have %s instead",
testName, tt.subTest, additionalVolumeMount, volumeName)
}
for _, container := range tt.podSpec.Containers {
if container.Name == "postgres" {
volumeMountName := container.VolumeMounts[tt.volumePos].Name
if volumeMountName != additionalVolumeMount[0].Name {
t.Errorf("%s %s: Expected mount %v was not created, have %s instead",
testName, tt.subTest, additionalVolumeMount, volumeMountName)
}
numMountsCheck := len(container.VolumeMounts)
if numMountsCheck != numMounts+1 {
t.Errorf("Unexpected number of VolumeMounts: got %v instead of %v",
numMountsCheck, numMounts+1)
}
} else {
numMountsCheck := len(container.VolumeMounts)
if numMountsCheck == numMounts+1 {
t.Errorf("Unexpected number of VolumeMounts: got %v instead of %v",
numMountsCheck, numMounts)
}
}
}
}
}


@ -105,7 +105,12 @@ func (c *Cluster) createConnectionPooler(lookup InstallFunction) (*ConnectionPoo
var msg string
c.setProcessName("creating connection pooler")
if c.ConnectionPooler == nil {
c.ConnectionPooler = &ConnectionPoolerObjects{}
}
schema := c.Spec.ConnectionPooler.Schema
if schema == "" {
schema = c.OpConfig.ConnectionPooler.Schema
}


@ -111,7 +111,7 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
}
// sync connection pooler
if err = c.syncConnectionPooler(&oldSpec, newSpec, c.installLookupFunction); err != nil {
if _, err = c.syncConnectionPooler(&oldSpec, newSpec, c.installLookupFunction); err != nil {
return fmt.Errorf("could not sync connection pooler: %v", err)
}
@ -122,10 +122,11 @@ func (c *Cluster) syncServices() error {
for _, role := range []PostgresRole{Master, Replica} {
c.logger.Debugf("syncing %s service", role)
if err := c.syncEndpoint(role); err != nil {
return fmt.Errorf("could not sync %s endpoint: %v", role, err)
if !c.patroniKubernetesUseConfigMaps() {
if err := c.syncEndpoint(role); err != nil {
return fmt.Errorf("could not sync %s endpoint: %v", role, err)
}
}
if err := c.syncService(role); err != nil {
return fmt.Errorf("could not sync %s service: %v", role, err)
}
@ -620,7 +621,13 @@ func (c *Cluster) syncLogicalBackupJob() error {
return nil
}
func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, lookup InstallFunction) error {
func (c *Cluster) syncConnectionPooler(oldSpec,
newSpec *acidv1.Postgresql,
lookup InstallFunction) (SyncReason, error) {
var reason SyncReason
var err error
if c.ConnectionPooler == nil {
c.ConnectionPooler = &ConnectionPoolerObjects{}
}
@ -657,20 +664,20 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, look
specUser,
c.OpConfig.ConnectionPooler.User)
if err := lookup(schema, user); err != nil {
return err
if err = lookup(schema, user); err != nil {
return NoSync, err
}
}
if err := c.syncConnectionPoolerWorker(oldSpec, newSpec); err != nil {
if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec); err != nil {
c.logger.Errorf("could not sync connection pooler: %v", err)
return err
return reason, err
}
}
if oldNeedConnectionPooler && !newNeedConnectionPooler {
// delete and cleanup resources
if err := c.deleteConnectionPooler(); err != nil {
if err = c.deleteConnectionPooler(); err != nil {
c.logger.Warningf("could not remove connection pooler: %v", err)
}
}
@ -681,20 +688,22 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, look
(c.ConnectionPooler.Deployment != nil ||
c.ConnectionPooler.Service != nil) {
if err := c.deleteConnectionPooler(); err != nil {
if err = c.deleteConnectionPooler(); err != nil {
c.logger.Warningf("could not remove connection pooler: %v", err)
}
}
}
return nil
return reason, nil
}
// Synchronize connection pooler resources. Effectively we're interested only in
// synchronizing the corresponding deployment, but in case of deployment or
// service is missing, create it. After checking, also remember an object for
// the future references.
func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql) error {
func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql) (
SyncReason, error) {
deployment, err := c.KubeClient.
Deployments(c.Namespace).
Get(context.TODO(), c.connectionPoolerName(), metav1.GetOptions{})
@ -706,7 +715,7 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
deploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec)
if err != nil {
msg = "could not generate deployment for connection pooler: %v"
return fmt.Errorf(msg, err)
return NoSync, fmt.Errorf(msg, err)
}
deployment, err := c.KubeClient.
@ -714,18 +723,35 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
Create(context.TODO(), deploymentSpec, metav1.CreateOptions{})
if err != nil {
return err
return NoSync, err
}
c.ConnectionPooler.Deployment = deployment
} else if err != nil {
return fmt.Errorf("could not get connection pooler deployment to sync: %v", err)
msg := "could not get connection pooler deployment to sync: %v"
return NoSync, fmt.Errorf(msg, err)
} else {
c.ConnectionPooler.Deployment = deployment
// actual synchronization
oldConnectionPooler := oldSpec.Spec.ConnectionPooler
newConnectionPooler := newSpec.Spec.ConnectionPooler
// sync implementation below assumes that both old and new specs are
// not nil, but it can happen. To avoid any confusion like updating a
// deployment because the specification changed from nil to an empty
// struct (that was initialized somewhere before) replace any nil with
// an empty spec.
if oldConnectionPooler == nil {
oldConnectionPooler = &acidv1.ConnectionPooler{}
}
if newConnectionPooler == nil {
newConnectionPooler = &acidv1.ConnectionPooler{}
}
c.logger.Infof("Old: %+v, New %+v", oldConnectionPooler, newConnectionPooler)
specSync, specReason := c.needSyncConnectionPoolerSpecs(oldConnectionPooler, newConnectionPooler)
defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(newConnectionPooler, deployment)
reason := append(specReason, defaultsReason...)
@ -736,7 +762,7 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
newDeploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec)
if err != nil {
msg := "could not generate deployment for connection pooler: %v"
return fmt.Errorf(msg, err)
return reason, fmt.Errorf(msg, err)
}
oldDeploymentSpec := c.ConnectionPooler.Deployment
@ -746,11 +772,11 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
newDeploymentSpec)
if err != nil {
return err
return reason, err
}
c.ConnectionPooler.Deployment = deployment
return nil
return reason, nil
}
}
@ -768,16 +794,17 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
Create(context.TODO(), serviceSpec, metav1.CreateOptions{})
if err != nil {
return err
return NoSync, err
}
c.ConnectionPooler.Service = service
} else if err != nil {
return fmt.Errorf("could not get connection pooler service to sync: %v", err)
msg := "could not get connection pooler service to sync: %v"
return NoSync, fmt.Errorf(msg, err)
} else {
// Service updates are not supported and probably not that useful anyway
c.ConnectionPooler.Service = service
}
return nil
return NoSync, nil
}


@ -2,6 +2,7 @@ package cluster
import (
"fmt"
"strings"
"testing"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
@ -17,7 +18,7 @@ func int32ToPointer(value int32) *int32 {
return &value
}
func deploymentUpdated(cluster *Cluster, err error) error {
func deploymentUpdated(cluster *Cluster, err error, reason SyncReason) error {
if cluster.ConnectionPooler.Deployment.Spec.Replicas == nil ||
*cluster.ConnectionPooler.Deployment.Spec.Replicas != 2 {
return fmt.Errorf("Wrong number of instances")
@ -26,7 +27,7 @@ func deploymentUpdated(cluster *Cluster, err error) error {
return nil
}
func objectsAreSaved(cluster *Cluster, err error) error {
func objectsAreSaved(cluster *Cluster, err error, reason SyncReason) error {
if cluster.ConnectionPooler == nil {
return fmt.Errorf("Connection pooler resources are empty")
}
@ -42,7 +43,7 @@ func objectsAreSaved(cluster *Cluster, err error) error {
return nil
}
func objectsAreDeleted(cluster *Cluster, err error) error {
func objectsAreDeleted(cluster *Cluster, err error, reason SyncReason) error {
if cluster.ConnectionPooler != nil {
return fmt.Errorf("Connection pooler was not deleted")
}
@ -50,6 +51,16 @@ func objectsAreDeleted(cluster *Cluster, err error) error {
return nil
}
func noEmptySync(cluster *Cluster, err error, reason SyncReason) error {
for _, msg := range reason {
if strings.HasPrefix(msg, "update [] from '<nil>' to '") {
return fmt.Errorf("There is an empty reason, %s", msg)
}
}
return nil
}
func TestConnectionPoolerSynchronization(t *testing.T) {
testName := "Test connection pooler synchronization"
var cluster = New(
@ -91,15 +102,15 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
clusterNewDefaultsMock := *cluster
clusterNewDefaultsMock.KubeClient = k8sutil.NewMockKubernetesClient()
cluster.OpConfig.ConnectionPooler.Image = "pooler:2.0"
cluster.OpConfig.ConnectionPooler.NumberOfInstances = int32ToPointer(2)
tests := []struct {
subTest string
oldSpec *acidv1.Postgresql
newSpec *acidv1.Postgresql
cluster *Cluster
check func(cluster *Cluster, err error) error
subTest string
oldSpec *acidv1.Postgresql
newSpec *acidv1.Postgresql
cluster *Cluster
defaultImage string
defaultInstances int32
check func(cluster *Cluster, err error, reason SyncReason) error
}{
{
subTest: "create if doesn't exist",
@ -113,8 +124,10 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
ConnectionPooler: &acidv1.ConnectionPooler{},
},
},
cluster: &clusterMissingObjects,
check: objectsAreSaved,
cluster: &clusterMissingObjects,
defaultImage: "pooler:1.0",
defaultInstances: 1,
check: objectsAreSaved,
},
{
subTest: "create if doesn't exist with a flag",
@ -126,8 +139,10 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
EnableConnectionPooler: boolToPointer(true),
},
},
cluster: &clusterMissingObjects,
check: objectsAreSaved,
cluster: &clusterMissingObjects,
defaultImage: "pooler:1.0",
defaultInstances: 1,
check: objectsAreSaved,
},
{
subTest: "create from scratch",
@ -139,8 +154,10 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
ConnectionPooler: &acidv1.ConnectionPooler{},
},
},
cluster: &clusterMissingObjects,
check: objectsAreSaved,
cluster: &clusterMissingObjects,
defaultImage: "pooler:1.0",
defaultInstances: 1,
check: objectsAreSaved,
},
{
subTest: "delete if not needed",
@ -152,8 +169,10 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
newSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{},
},
cluster: &clusterMock,
check: objectsAreDeleted,
cluster: &clusterMock,
defaultImage: "pooler:1.0",
defaultInstances: 1,
check: objectsAreDeleted,
},
{
subTest: "cleanup if still there",
@ -163,8 +182,10 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
newSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{},
},
cluster: &clusterDirtyMock,
check: objectsAreDeleted,
cluster: &clusterDirtyMock,
defaultImage: "pooler:1.0",
defaultInstances: 1,
check: objectsAreDeleted,
},
{
subTest: "update deployment",
@ -182,8 +203,10 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
},
},
},
cluster: &clusterMock,
check: deploymentUpdated,
cluster: &clusterMock,
defaultImage: "pooler:1.0",
defaultInstances: 1,
check: deploymentUpdated,
},
{
subTest: "update image from changed defaults",
@ -197,14 +220,40 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
ConnectionPooler: &acidv1.ConnectionPooler{},
},
},
cluster: &clusterNewDefaultsMock,
check: deploymentUpdated,
cluster: &clusterNewDefaultsMock,
defaultImage: "pooler:2.0",
defaultInstances: 2,
check: deploymentUpdated,
},
{
subTest: "there is no sync from nil to an empty spec",
oldSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{
EnableConnectionPooler: boolToPointer(true),
ConnectionPooler: nil,
},
},
newSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{
EnableConnectionPooler: boolToPointer(true),
ConnectionPooler: &acidv1.ConnectionPooler{},
},
},
cluster: &clusterMock,
defaultImage: "pooler:1.0",
defaultInstances: 1,
check: noEmptySync,
},
}
for _, tt := range tests {
err := tt.cluster.syncConnectionPooler(tt.oldSpec, tt.newSpec, mockInstallLookupFunction)
tt.cluster.OpConfig.ConnectionPooler.Image = tt.defaultImage
tt.cluster.OpConfig.ConnectionPooler.NumberOfInstances =
int32ToPointer(tt.defaultInstances)
if err := tt.check(tt.cluster, err); err != nil {
reason, err := tt.cluster.syncConnectionPooler(tt.oldSpec,
tt.newSpec, mockInstallLookupFunction)
if err := tt.check(tt.cluster, err, reason); err != nil {
t.Errorf("%s [%s]: Could not synchronize, %+v",
testName, tt.subTest, err)
}


@ -73,3 +73,8 @@ type ClusterStatus struct {
type TemplateParams map[string]interface{}
type InstallFunction func(schema string, user string) error
type SyncReason []string
// no sync happened, empty value
var NoSync SyncReason = []string{}


@ -169,6 +169,11 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur
fromCRD.ConnectionPooler.User,
constants.ConnectionPoolerUserName)
if result.ConnectionPooler.User == result.SuperUsername {
msg := "Connection pool user is not allowed to be the same as super user, username: %s"
panic(fmt.Errorf(msg, result.ConnectionPooler.User))
}
result.ConnectionPooler.Image = util.Coalesce(
fromCRD.ConnectionPooler.Image,
"registry.opensource.zalan.do/acid/pgbouncer")


@ -218,5 +218,11 @@ func validate(cfg *Config) (err error) {
msg := "number of connection pooler instances should be higher than %d"
err = fmt.Errorf(msg, constants.ConnectionPoolerMinInstances)
}
if cfg.ConnectionPooler.User == cfg.SuperUsername {
msg := "Connection pool user is not allowed to be the same as super user, username: %s"
err = fmt.Errorf(msg, cfg.ConnectionPooler.User)
}
return
}