From 07c5da35e3572b8302f33e3dd8fbf5e8ca846229 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Wed, 18 Mar 2020 15:02:13 +0100 Subject: [PATCH 1/4] fix minor issues in docs and manifests (#866) * fix minor issues in docs and manifests * double retry_timeout_sec --- docs/reference/cluster_manifest.md | 20 ++++++++++---------- docs/reference/operator_parameters.md | 10 +++++----- docs/user.md | 8 ++++---- e2e/tests/test_e2e.py | 2 +- manifests/complete-postgres-manifest.yaml | 2 +- manifests/minimal-postgres-manifest.yaml | 2 +- manifests/standby-manifest.yaml | 2 +- ui/manifests/deployment.yaml | 2 +- 8 files changed, 24 insertions(+), 24 deletions(-) diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md index 92e457d7e..955622843 100644 --- a/docs/reference/cluster_manifest.md +++ b/docs/reference/cluster_manifest.md @@ -111,12 +111,12 @@ These parameters are grouped directly under the `spec` key in the manifest. value overrides the `pod_toleration` setting from the operator. Optional. * **podPriorityClassName** - a name of the [priority - class](https://kubernetes.io/docs/concepts/configuration/pod-priority-preemption/#priorityclass) - that should be assigned to the cluster pods. When not specified, the value - is taken from the `pod_priority_class_name` operator parameter, if not set - then the default priority class is taken. The priority class itself must be - defined in advance. Optional. + a name of the [priority + class](https://kubernetes.io/docs/concepts/configuration/pod-priority-preemption/#priorityclass) + that should be assigned to the cluster pods. When not specified, the value + is taken from the `pod_priority_class_name` operator parameter, if not set + then the default priority class is taken. The priority class itself must be + defined in advance. Optional. * **podAnnotations** A map of key value pairs that gets attached as [annotations](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/) @@ -184,9 +184,9 @@ explanation of `ttl` and `loop_wait` parameters. ``` hostssl all +pamrole all pam ``` - , where pamrole is the name of the role for the pam authentication; any - custom `pg_hba` should include the pam line to avoid breaking pam - authentication. Optional. + where pamrole is the name of the role for the pam authentication; any + custom `pg_hba` should include the pam line to avoid breaking pam + authentication. Optional. * **ttl** Patroni `ttl` parameter value, optional. The default is set by the Spilo @@ -379,4 +379,4 @@ Those parameters are grouped under the `tls` top-level key. * **caFile** Optional filename to the CA certificate. Useful when the client connects - with `sslmode=verify-ca` or `sslmode=verify-full`. + with `sslmode=verify-ca` or `sslmode=verify-full`. Default is empty. diff --git a/docs/reference/operator_parameters.md b/docs/reference/operator_parameters.md index ba8e73cf8..86eedd33c 100644 --- a/docs/reference/operator_parameters.md +++ b/docs/reference/operator_parameters.md @@ -285,11 +285,11 @@ configuration they are grouped under the `kubernetes` key. capability. The default is `false`. * **master_pod_move_timeout** - The period of time to wait for the success of migration of master pods from - an unschedulable node. The migration includes Patroni switchovers to - respective replicas on healthy nodes. The situation where master pods still - exist on the old node after this timeout expires has to be fixed manually. - The default is 20 minutes. 
+ The period of time to wait for the success of migration of master pods from + an unschedulable node. The migration includes Patroni switchovers to + respective replicas on healthy nodes. The situation where master pods still + exist on the old node after this timeout expires has to be fixed manually. + The default is 20 minutes. * **enable_pod_antiaffinity** toggles [pod anti affinity](https://kubernetes.io/docs/concepts/configuration/assign-pod-node/) diff --git a/docs/user.md b/docs/user.md index 6e71d0404..9a6752185 100644 --- a/docs/user.md +++ b/docs/user.md @@ -30,7 +30,7 @@ spec: databases: foo: zalando postgresql: - version: "11" + version: "12" ``` Once you cloned the Postgres Operator [repository](https://github.com/zalando/postgres-operator) @@ -515,9 +515,9 @@ executed. ## Custom TLS certificates By default, the spilo image generates its own TLS certificate during startup. -This certificate is not secure since it cannot be verified and thus doesn't -protect from active MITM attacks. In this section we show how a Kubernete -Secret resources can be loaded with a custom TLS certificate. +However, this certificate cannot be verified and thus doesn't protect from +active MITM attacks. In this section we show how to specify a custom TLS +certificate which is mounted in the database pods via a K8s Secret. Before applying these changes, the operator must also be configured with the `spilo_fsgroup` set to the GID matching the postgres user group. If the value diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index f6be8a600..f0d8a0b23 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -473,7 +473,7 @@ class K8s: Wraps around K8 api client and helper methods. ''' - RETRY_TIMEOUT_SEC = 5 + RETRY_TIMEOUT_SEC = 10 def __init__(self): self.api = K8sApi() diff --git a/manifests/complete-postgres-manifest.yaml b/manifests/complete-postgres-manifest.yaml index ceb27a5c3..c82f1eac5 100644 --- a/manifests/complete-postgres-manifest.yaml +++ b/manifests/complete-postgres-manifest.yaml @@ -24,7 +24,7 @@ spec: databases: foo: zalando postgresql: - version: "11" + version: "12" parameters: # Expert section shared_buffers: "32MB" max_connections: "10" diff --git a/manifests/minimal-postgres-manifest.yaml b/manifests/minimal-postgres-manifest.yaml index 75dfdf07f..af0add8e6 100644 --- a/manifests/minimal-postgres-manifest.yaml +++ b/manifests/minimal-postgres-manifest.yaml @@ -16,4 +16,4 @@ spec: databases: foo: zalando # dbname: owner postgresql: - version: "11" + version: "12" diff --git a/manifests/standby-manifest.yaml b/manifests/standby-manifest.yaml index 2b621bd10..4c8d09650 100644 --- a/manifests/standby-manifest.yaml +++ b/manifests/standby-manifest.yaml @@ -9,7 +9,7 @@ spec: size: 1Gi numberOfInstances: 1 postgresql: - version: "11" + version: "12" # Make this a standby cluster and provide the s3 bucket path of source cluster for continuous streaming. 
standby: s3_wal_path: "s3://path/to/bucket/containing/wal/of/source/cluster/" diff --git a/ui/manifests/deployment.yaml b/ui/manifests/deployment.yaml index 477e4d655..6138ca1a8 100644 --- a/ui/manifests/deployment.yaml +++ b/ui/manifests/deployment.yaml @@ -20,7 +20,7 @@ spec: serviceAccountName: postgres-operator-ui containers: - name: "service" - image: registry.opensource.zalan.do/acid/postgres-operator-ui:v1.3.0 + image: registry.opensource.zalan.do/acid/postgres-operator-ui:v1.4.0 ports: - containerPort: 8081 protocol: "TCP" From cc1ffdc7b6ed83ef2d1be575db8e4aa76f8c6e57 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Wed, 25 Mar 2020 09:31:30 +0100 Subject: [PATCH 2/4] enable controllerID for chart and allow configurable pod cluster role (#876) --- charts/postgres-operator/templates/_helpers.tpl | 14 ++++++++++++++ .../templates/clusterrole-postgres-pod.yaml | 2 +- .../postgres-operator/templates/configmap.yaml | 1 + .../postgres-operator/templates/deployment.yaml | 4 ++++ .../templates/operatorconfiguration.yaml | 1 + charts/postgres-operator/values-crd.yaml | 16 ++++++++++++++-- charts/postgres-operator/values.yaml | 16 ++++++++++++++-- 7 files changed, 49 insertions(+), 5 deletions(-) diff --git a/charts/postgres-operator/templates/_helpers.tpl b/charts/postgres-operator/templates/_helpers.tpl index 306613ac3..e49670763 100644 --- a/charts/postgres-operator/templates/_helpers.tpl +++ b/charts/postgres-operator/templates/_helpers.tpl @@ -31,6 +31,20 @@ Create a service account name. {{ default (include "postgres-operator.fullname" .) .Values.serviceAccount.name }} {{- end -}} +{{/* +Create a pod service account name. +*/}} +{{- define "postgres-pod.serviceAccountName" -}} +{{ default (printf "%s-%v" (include "postgres-operator.fullname" .) "pod") .Values.podServiceAccount.name }} +{{- end -}} + +{{/* +Create a controller ID. +*/}} +{{- define "postgres-operator.controllerID" -}} +{{ default (include "postgres-operator.fullname" .) .Values.controllerID.name }} +{{- end -}} + {{/* Create chart name and version as used by the chart label. */}} diff --git a/charts/postgres-operator/templates/clusterrole-postgres-pod.yaml b/charts/postgres-operator/templates/clusterrole-postgres-pod.yaml index c327d9101..ef607ae3c 100644 --- a/charts/postgres-operator/templates/clusterrole-postgres-pod.yaml +++ b/charts/postgres-operator/templates/clusterrole-postgres-pod.yaml @@ -2,7 +2,7 @@ apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: - name: postgres-pod + name: {{ include "postgres-pod.serviceAccountName" . }} labels: app.kubernetes.io/name: {{ template "postgres-operator.name" . }} helm.sh/chart: {{ template "postgres-operator.chart" . }} diff --git a/charts/postgres-operator/templates/configmap.yaml b/charts/postgres-operator/templates/configmap.yaml index 0b976294e..00ebc6676 100644 --- a/charts/postgres-operator/templates/configmap.yaml +++ b/charts/postgres-operator/templates/configmap.yaml @@ -9,6 +9,7 @@ metadata: app.kubernetes.io/managed-by: {{ .Release.Service }} app.kubernetes.io/instance: {{ .Release.Name }} data: + pod_service_account_name: {{ include "postgres-pod.serviceAccountName" . 
}} {{ toYaml .Values.configGeneral | indent 2 }} {{ toYaml .Values.configUsers | indent 2 }} {{ toYaml .Values.configKubernetes | indent 2 }} diff --git a/charts/postgres-operator/templates/deployment.yaml b/charts/postgres-operator/templates/deployment.yaml index 1f7e39bbc..2d8eebcb3 100644 --- a/charts/postgres-operator/templates/deployment.yaml +++ b/charts/postgres-operator/templates/deployment.yaml @@ -43,6 +43,10 @@ spec: {{- else }} - name: POSTGRES_OPERATOR_CONFIGURATION_OBJECT value: {{ template "postgres-operator.fullname" . }} + {{- end }} + {{- if .Values.controllerID.create }} + - name: CONTROLLER_ID + value: {{ template "postgres-operator.controllerID" . }} {{- end }} resources: {{ toYaml .Values.resources | indent 10 }} diff --git a/charts/postgres-operator/templates/operatorconfiguration.yaml b/charts/postgres-operator/templates/operatorconfiguration.yaml index 06e9c7605..ccbbf59c6 100644 --- a/charts/postgres-operator/templates/operatorconfiguration.yaml +++ b/charts/postgres-operator/templates/operatorconfiguration.yaml @@ -13,6 +13,7 @@ configuration: users: {{ toYaml .Values.configUsers | indent 4 }} kubernetes: + pod_service_account_name: {{ include "postgres-pod.serviceAccountName" . }} oauth_token_secret_name: {{ template "postgres-operator.fullname" . }} {{ toYaml .Values.configKubernetes | indent 4 }} postgres_pod_resources: diff --git a/charts/postgres-operator/values-crd.yaml b/charts/postgres-operator/values-crd.yaml index d170e0b77..f9c20d8d6 100644 --- a/charts/postgres-operator/values-crd.yaml +++ b/charts/postgres-operator/values-crd.yaml @@ -103,8 +103,6 @@ configKubernetes: # service account definition as JSON/YAML string to be used by postgres cluster pods # pod_service_account_definition: "" - # name of service account to be used by postgres cluster pods - pod_service_account_name: "postgres-pod" # role binding definition as JSON/YAML string to be used by pod service account # pod_service_account_role_binding_definition: "" @@ -284,6 +282,11 @@ serviceAccount: # If not set and create is true, a name is generated using the fullname template name: +podServiceAccount: + # The name of the ServiceAccount to be used by postgres cluster pods + # If not set a name is generated using the fullname template and "-pod" suffix + name: "postgres-pod" + priorityClassName: "" resources: @@ -305,3 +308,12 @@ tolerations: [] # Node labels for pod assignment # Ref: https://kubernetes.io/docs/user-guide/node-selection/ nodeSelector: {} + +controllerID: + # Specifies whether a controller ID should be defined for the operator + # Note, all postgres manifest must then contain the following annotation to be found by this operator + # "acid.zalan.do/controller": + create: false + # The name of the controller ID to use. 
+ # If not set and create is true, a name is generated using the fullname template + name: diff --git a/charts/postgres-operator/values.yaml b/charts/postgres-operator/values.yaml index b6f18f305..ea1028f23 100644 --- a/charts/postgres-operator/values.yaml +++ b/charts/postgres-operator/values.yaml @@ -96,8 +96,6 @@ configKubernetes: # service account definition as JSON/YAML string to be used by postgres cluster pods # pod_service_account_definition: "" - # name of service account to be used by postgres cluster pods - pod_service_account_name: "postgres-pod" # role binding definition as JSON/YAML string to be used by pod service account # pod_service_account_role_binding_definition: "" @@ -260,6 +258,11 @@ serviceAccount: # If not set and create is true, a name is generated using the fullname template name: +podServiceAccount: + # The name of the ServiceAccount to be used by postgres cluster pods + # If not set a name is generated using the fullname template and "-pod" suffix + name: "postgres-pod" + priorityClassName: "" resources: @@ -281,3 +284,12 @@ tolerations: [] # Node labels for pod assignment # Ref: https://kubernetes.io/docs/user-guide/node-selection/ nodeSelector: {} + +controllerID: + # Specifies whether a controller ID should be defined for the operator + # Note, all postgres manifest must then contain the following annotation to be found by this operator + # "acid.zalan.do/controller": + create: false + # The name of the controller ID to use. + # If not set and create is true, a name is generated using the fullname template + name: From 579f78864bbe67ad4068e02967c272d6c68c19a9 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Wed, 25 Mar 2020 09:59:54 +0100 Subject: [PATCH 3/4] pass cluster labels as JSON to Spilo (#877) --- pkg/cluster/k8sres.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index aaa27384a..39b5c16a0 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -549,10 +549,6 @@ func (c *Cluster) generateSpiloPodEnvVars(uid types.UID, spiloConfiguration stri Name: "KUBERNETES_ROLE_LABEL", Value: c.OpConfig.PodRoleLabel, }, - { - Name: "KUBERNETES_LABELS", - Value: labels.Set(c.OpConfig.ClusterLabels).String(), - }, { Name: "PGPASSWORD_SUPERUSER", ValueFrom: &v1.EnvVarSource{ @@ -588,6 +584,12 @@ func (c *Cluster) generateSpiloPodEnvVars(uid types.UID, spiloConfiguration stri Value: c.OpConfig.PamRoleName, }, } + // Spilo expects cluster labels as JSON + if clusterLabels, err := json.Marshal(labels.Set(c.OpConfig.ClusterLabels)); err != nil { + envVars = append(envVars, v1.EnvVar{Name: "KUBERNETES_LABELS", Value: labels.Set(c.OpConfig.ClusterLabels).String()}) + } else { + envVars = append(envVars, v1.EnvVar{Name: "KUBERNETES_LABELS", Value: string(clusterLabels)}) + } if spiloConfiguration != "" { envVars = append(envVars, v1.EnvVar{Name: "SPILO_CONFIGURATION", Value: spiloConfiguration}) } From 9dfa433363cbe997baf69a99c7046166fa966fa3 Mon Sep 17 00:00:00 2001 From: Dmitry Dolgov <9erthalion6@gmail.com> Date: Wed, 25 Mar 2020 12:57:26 +0100 Subject: [PATCH 4/4] Connection pooler (#799) Connection pooler support Add support for a connection pooler. The idea is to make it generic enough to be able to switch between different implementations (e.g. pgbouncer or odyssey). Operator needs to create a deployment with pooler and a service for it to access. 
For connection pool to work properly, a database needs to be prepared by operator, namely a separate user have to be created with an access to an installed lookup function (to fetch credential for other users). This setups is supposed to be used only by robot/application users. Usually a connection pool implementation is more CPU bounded, so it makes sense to create several pods for connection pool with more emphasize on cpu resources. At the moment there are no special affinity or tolerations assigned to bring those pods closer to the database. For availability purposes minimal number of connection pool pods is 2, ideally they have to be distributed between different nodes/AZ, but it's not enforced in the operator itself. Available configuration supposed to be ergonomic and in the normal case require minimum changes to a manifest to enable connection pool. To have more control over the configuration and functionality on the pool side one can customize the corresponding docker image. Co-authored-by: Felix Kunde --- .../crds/operatorconfigurations.yaml | 41 ++ .../postgres-operator/crds/postgresqls.yaml | 51 +++ .../templates/clusterrole.yaml | 1 + .../templates/configmap.yaml | 1 + .../templates/operatorconfiguration.yaml | 2 + charts/postgres-operator/values-crd.yaml | 19 + charts/postgres-operator/values.yaml | 20 + docker/DebugDockerfile | 13 +- docs/reference/cluster_manifest.md | 34 ++ docs/reference/operator_parameters.md | 37 ++ docs/user.md | 53 +++ e2e/tests/test_e2e.py | 165 +++++++- manifests/configmap.yaml | 10 + manifests/operator-service-account-rbac.yaml | 1 + manifests/operatorconfiguration.crd.yaml | 41 ++ ...gresql-operator-default-configuration.yaml | 11 + manifests/postgresql.crd.yaml | 51 +++ pkg/apis/acid.zalan.do/v1/crds.go | 124 +++++- .../v1/operator_configuration_type.go | 29 +- pkg/apis/acid.zalan.do/v1/postgresql_type.go | 26 ++ .../acid.zalan.do/v1/zz_generated.deepcopy.go | 64 +++ pkg/cluster/cluster.go | 205 +++++++++- pkg/cluster/cluster_test.go | 18 + pkg/cluster/database.go | 125 +++++- pkg/cluster/k8sres.go | 380 +++++++++++++++++- pkg/cluster/k8sres_test.go | 372 +++++++++++++++++ pkg/cluster/resources.go | 157 ++++++++ pkg/cluster/resources_test.go | 127 ++++++ pkg/cluster/sync.go | 182 ++++++++- pkg/cluster/sync_test.go | 212 ++++++++++ pkg/cluster/types.go | 4 + pkg/cluster/util.go | 39 +- pkg/controller/operator_config.go | 51 +++ pkg/spec/types.go | 6 +- pkg/util/config/config.go | 21 + pkg/util/constants/pooler.go | 18 + pkg/util/constants/roles.go | 23 +- pkg/util/k8sutil/k8sutil.go | 170 +++++++- pkg/util/util.go | 34 ++ 39 files changed, 2885 insertions(+), 53 deletions(-) create mode 100644 pkg/cluster/resources_test.go create mode 100644 pkg/cluster/sync_test.go create mode 100644 pkg/util/constants/pooler.go diff --git a/charts/postgres-operator/crds/operatorconfigurations.yaml b/charts/postgres-operator/crds/operatorconfigurations.yaml index 9725c2708..7e3b607c0 100644 --- a/charts/postgres-operator/crds/operatorconfigurations.yaml +++ b/charts/postgres-operator/crds/operatorconfigurations.yaml @@ -318,6 +318,47 @@ spec: pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' scalyr_server_url: type: string + connection_pool: + type: object + properties: + connection_pool_schema: + type: string + #default: "pooler" + connection_pool_user: + type: string + #default: "pooler" + connection_pool_image: + type: string + #default: "registry.opensource.zalan.do/acid/pgbouncer" + connection_pool_max_db_connections: + type: integer + #default: 60 + 
connection_pool_mode: + type: string + enum: + - "session" + - "transaction" + #default: "transaction" + connection_pool_number_of_instances: + type: integer + minimum: 2 + #default: 2 + connection_pool_default_cpu_limit: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + #default: "1" + connection_pool_default_cpu_request: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + #default: "500m" + connection_pool_default_memory_limit: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + #default: "100Mi" + connection_pool_default_memory_request: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + #default: "100Mi" status: type: object additionalProperties: diff --git a/charts/postgres-operator/crds/postgresqls.yaml b/charts/postgres-operator/crds/postgresqls.yaml index af535e2c8..a4c0e4f3a 100644 --- a/charts/postgres-operator/crds/postgresqls.yaml +++ b/charts/postgres-operator/crds/postgresqls.yaml @@ -106,6 +106,55 @@ spec: uid: format: uuid type: string + connectionPool: + type: object + properties: + dockerImage: + type: string + maxDBConnections: + type: integer + mode: + type: string + enum: + - "session" + - "transaction" + numberOfInstances: + type: integer + minimum: 2 + resources: + type: object + required: + - requests + - limits + properties: + limits: + type: object + required: + - cpu + - memory + properties: + cpu: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + memory: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + requests: + type: object + required: + - cpu + - memory + properties: + cpu: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + memory: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + schema: + type: string + user: + type: string databases: type: object additionalProperties: @@ -113,6 +162,8 @@ spec: # Note: usernames specified here as database owners must be declared in the users key of the spec key. 
dockerImage: type: string + enableConnectionPool: + type: boolean enableLogicalBackup: type: boolean enableMasterLoadBalancer: diff --git a/charts/postgres-operator/templates/clusterrole.yaml b/charts/postgres-operator/templates/clusterrole.yaml index 7b3dd462d..38ce85e7a 100644 --- a/charts/postgres-operator/templates/clusterrole.yaml +++ b/charts/postgres-operator/templates/clusterrole.yaml @@ -128,6 +128,7 @@ rules: - apps resources: - statefulsets + - deployments verbs: - create - delete diff --git a/charts/postgres-operator/templates/configmap.yaml b/charts/postgres-operator/templates/configmap.yaml index 00ebc6676..e8a805db7 100644 --- a/charts/postgres-operator/templates/configmap.yaml +++ b/charts/postgres-operator/templates/configmap.yaml @@ -20,4 +20,5 @@ data: {{ toYaml .Values.configDebug | indent 2 }} {{ toYaml .Values.configLoggingRestApi | indent 2 }} {{ toYaml .Values.configTeamsApi | indent 2 }} +{{ toYaml .Values.configConnectionPool | indent 2 }} {{- end }} diff --git a/charts/postgres-operator/templates/operatorconfiguration.yaml b/charts/postgres-operator/templates/operatorconfiguration.yaml index ccbbf59c6..b52b3d664 100644 --- a/charts/postgres-operator/templates/operatorconfiguration.yaml +++ b/charts/postgres-operator/templates/operatorconfiguration.yaml @@ -34,4 +34,6 @@ configuration: {{ toYaml .Values.configLoggingRestApi | indent 4 }} scalyr: {{ toYaml .Values.configScalyr | indent 4 }} + connection_pool: +{{ toYaml .Values.configConnectionPool | indent 4 }} {{- end }} diff --git a/charts/postgres-operator/values-crd.yaml b/charts/postgres-operator/values-crd.yaml index f9c20d8d6..cf9fbab15 100644 --- a/charts/postgres-operator/values-crd.yaml +++ b/charts/postgres-operator/values-crd.yaml @@ -267,6 +267,25 @@ configScalyr: # Memory request value for the Scalyr sidecar scalyr_memory_request: 50Mi +configConnectionPool: + # db schema to install lookup function into + connection_pool_schema: "pooler" + # db user for pooler to use + connection_pool_user: "pooler" + # docker image + connection_pool_image: "registry.opensource.zalan.do/acid/pgbouncer" + # max db connections the pooler should hold + connection_pool_max_db_connections: 60 + # default pooling mode + connection_pool_mode: "transaction" + # number of pooler instances + connection_pool_number_of_instances: 2 + # default resources + connection_pool_default_cpu_request: 500m + connection_pool_default_memory_request: 100Mi + connection_pool_default_cpu_limit: "1" + connection_pool_default_memory_limit: 100Mi + rbac: # Specifies whether RBAC resources should be created create: true diff --git a/charts/postgres-operator/values.yaml b/charts/postgres-operator/values.yaml index ea1028f23..503bf4562 100644 --- a/charts/postgres-operator/values.yaml +++ b/charts/postgres-operator/values.yaml @@ -243,6 +243,26 @@ configTeamsApi: # URL of the Teams API service # teams_api_url: http://fake-teams-api.default.svc.cluster.local +# configure connection pooler deployment created by the operator +configConnectionPool: + # db schema to install lookup function into + connection_pool_schema: "pooler" + # db user for pooler to use + connection_pool_user: "pooler" + # docker image + connection_pool_image: "registry.opensource.zalan.do/acid/pgbouncer" + # max db connections the pooler should hold + connection_pool_max_db_connections: 60 + # default pooling mode + connection_pool_mode: "transaction" + # number of pooler instances + connection_pool_number_of_instances: 2 + # default resources + connection_pool_default_cpu_request: 
500m + connection_pool_default_memory_request: 100Mi + connection_pool_default_cpu_limit: "1" + connection_pool_default_memory_limit: 100Mi + rbac: # Specifies whether RBAC resources should be created create: true diff --git a/docker/DebugDockerfile b/docker/DebugDockerfile index 76dadf6df..0c11fe3b4 100644 --- a/docker/DebugDockerfile +++ b/docker/DebugDockerfile @@ -3,8 +3,17 @@ MAINTAINER Team ACID @ Zalando # We need root certificates to deal with teams api over https RUN apk --no-cache add ca-certificates go git musl-dev -RUN go get github.com/derekparker/delve/cmd/dlv COPY build/* / -CMD ["/root/go/bin/dlv", "--listen=:7777", "--headless=true", "--api-version=2", "exec", "/postgres-operator"] +RUN addgroup -g 1000 pgo +RUN adduser -D -u 1000 -G pgo -g 'Postgres Operator' pgo + +RUN go get github.com/derekparker/delve/cmd/dlv +RUN cp /root/go/bin/dlv /dlv +RUN chown -R pgo:pgo /dlv + +USER pgo:pgo +RUN ls -l / + +CMD ["/dlv", "--listen=:7777", "--headless=true", "--api-version=2", "exec", "/postgres-operator"] diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md index 955622843..4400cb666 100644 --- a/docs/reference/cluster_manifest.md +++ b/docs/reference/cluster_manifest.md @@ -140,6 +140,11 @@ These parameters are grouped directly under the `spec` key in the manifest. is `false`, then no volume will be mounted no matter how operator was configured (so you can override the operator configuration). Optional. +* **enableConnectionPool** + Tells the operator to create a connection pool with a database. If this + field is true, a connection pool deployment will be created even if + `connectionPool` section is empty. Optional, not set by default. + * **enableLogicalBackup** Determines if the logical backup of this cluster should be taken and uploaded to S3. Default: false. Optional. @@ -360,6 +365,35 @@ CPU and memory limits for the sidecar container. memory limits for the sidecar container. Optional, overrides the `default_memory_limits` operator configuration parameter. Optional. +## Connection pool + +Parameters are grouped under the `connectionPool` top-level key and specify +configuration for connection pool. If this section is not empty, a connection +pool will be created for a database even if `enableConnectionPool` is not +present. + +* **numberOfInstances** + How many instances of connection pool to create. + +* **schema** + Schema to create for credentials lookup function. + +* **user** + User to create for connection pool to be able to connect to a database. + +* **dockerImage** + Which docker image to use for connection pool deployment. + +* **maxDBConnections** + How many connections the pooler can max hold. This value is divided among the + pooler pods. + +* **mode** + In which mode to run connection pool, transaction or session. + +* **resources** + Resource configuration for connection pool deployment. + ## Custom TLS certificates Those parameters are grouped under the `tls` top-level key. diff --git a/docs/reference/operator_parameters.md b/docs/reference/operator_parameters.md index 86eedd33c..848fa1cf2 100644 --- a/docs/reference/operator_parameters.md +++ b/docs/reference/operator_parameters.md @@ -595,3 +595,40 @@ scalyr sidecar. In the CRD-based configuration they are grouped under the * **scalyr_memory_limit** Memory limit value for the Scalyr sidecar. The default is `500Mi`. 
+ +## Connection pool configuration + +Parameters are grouped under the `connection_pool` top-level key and specify +default configuration for connection pool, if a postgres manifest requests it +but do not specify some of the parameters. All of them are optional with the +operator being able to provide some reasonable defaults. + +* **connection_pool_number_of_instances** + How many instances of connection pool to create. Default is 2 which is also + the required minimum. + +* **connection_pool_schema** + Schema to create for credentials lookup function. Default is `pooler`. + +* **connection_pool_user** + User to create for connection pool to be able to connect to a database. + Default is `pooler`. + +* **connection_pool_image** + Docker image to use for connection pool deployment. + Default: "registry.opensource.zalan.do/acid/pgbouncer" + +* **connection_pool_max_db_connections** + How many connections the pooler can max hold. This value is divided among the + pooler pods. Default is 60 which will make up 30 connections per pod for the + default setup with two instances. + +* **connection_pool_mode** + Default pool mode, `session` or `transaction`. Default is `transaction`. + +* **connection_pool_default_cpu_request** + **connection_pool_default_memory_reques** + **connection_pool_default_cpu_limit** + **connection_pool_default_memory_limit** + Default resource configuration for connection pool deployment. The internal + default for memory request and limit is `100Mi`, for CPU it is `500m` and `1`. diff --git a/docs/user.md b/docs/user.md index 9a6752185..8c79bb485 100644 --- a/docs/user.md +++ b/docs/user.md @@ -512,6 +512,59 @@ monitoring is outside the scope of operator responsibilities. See [administrator documentation](administrator.md) for details on how backups are executed. +## Connection pool + +The operator can create a database side connection pool for those applications, +where an application side pool is not feasible, but a number of connections is +high. To create a connection pool together with a database, modify the +manifest: + +```yaml +spec: + enableConnectionPool: true +``` + +This will tell the operator to create a connection pool with default +configuration, through which one can access the master via a separate service +`{cluster-name}-pooler`. In most of the cases provided default configuration +should be good enough. + +To configure a new connection pool, specify: + +``` +spec: + connectionPool: + # how many instances of connection pool to create + number_of_instances: 2 + + # in which mode to run, session or transaction + mode: "transaction" + + # schema, which operator will create to install credentials lookup + # function + schema: "pooler" + + # user, which operator will create for connection pool + user: "pooler" + + # resources for each instance + resources: + requests: + cpu: 500m + memory: 100Mi + limits: + cpu: "1" + memory: 100Mi +``` + +By default `pgbouncer` is used to create a connection pool. To find out about +pool modes see [docs](https://www.pgbouncer.org/config.html#pool_mode) (but it +should be general approach between different implementation). + +Note, that using `pgbouncer` means meaningful resource CPU limit should be less +than 1 core (there is a way to utilize more than one, but in K8S it's easier +just to spin up more instances). + ## Custom TLS certificates By default, the spilo image generates its own TLS certificate during startup. 
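The new user-guide section above points applications at the `{cluster-name}-pooler` service instead of the regular master service. Below is a minimal client sketch (not part of the patch) for the `acid-minimal-cluster` example used throughout this series; the in-cluster DNS name, port 5432 and credentials are assumptions for illustration and would normally come from the operator-generated service and secret.

```go
// Hypothetical client: connect through the operator-created pooler service
// instead of the direct "acid-minimal-cluster" master service.
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // PostgreSQL driver
)

func main() {
	// Host follows the `{cluster-name}-pooler` naming from the docs; the
	// namespace, port and credentials are assumptions for this sketch.
	dsn := "host=acid-minimal-cluster-pooler.default.svc.cluster.local " +
		"port=5432 user=zalando password=<from-credentials-secret> " +
		"dbname=foo sslmode=require"

	db, err := sql.Open("postgres", dsn)
	if err != nil {
		log.Fatalf("could not open connection: %v", err)
	}
	defer db.Close()

	var one int
	if err := db.QueryRow("SELECT 1").Scan(&one); err != nil {
		log.Fatalf("query through the pooler failed: %v", err)
	}
	fmt.Println("connected via the connection pooler:", one)
}
```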
diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index f0d8a0b23..cc90aa5e2 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -9,6 +9,10 @@ import yaml from kubernetes import client, config +def to_selector(labels): + return ",".join(["=".join(l) for l in labels.items()]) + + class EndToEndTestCase(unittest.TestCase): ''' Test interaction of the operator with multiple K8s components. @@ -47,7 +51,8 @@ class EndToEndTestCase(unittest.TestCase): for filename in ["operator-service-account-rbac.yaml", "configmap.yaml", "postgres-operator.yaml"]: - k8s.create_with_kubectl("manifests/" + filename) + result = k8s.create_with_kubectl("manifests/" + filename) + print("stdout: {}, stderr: {}".format(result.stdout, result.stderr)) k8s.wait_for_operator_pod_start() @@ -55,9 +60,14 @@ class EndToEndTestCase(unittest.TestCase): 'default', label_selector='name=postgres-operator').items[0].spec.containers[0].image print("Tested operator image: {}".format(actual_operator_image)) # shows up after tests finish - k8s.create_with_kubectl("manifests/minimal-postgres-manifest.yaml") - k8s.wait_for_pod_start('spilo-role=master') - k8s.wait_for_pod_start('spilo-role=replica') + result = k8s.create_with_kubectl("manifests/minimal-postgres-manifest.yaml") + print('stdout: {}, stderr: {}'.format(result.stdout, result.stderr)) + try: + k8s.wait_for_pod_start('spilo-role=master') + k8s.wait_for_pod_start('spilo-role=replica') + except timeout_decorator.TimeoutError: + print('Operator log: {}'.format(k8s.get_operator_log())) + raise @timeout_decorator.timeout(TEST_TIMEOUT_SEC) def test_enable_load_balancer(self): @@ -66,7 +76,7 @@ class EndToEndTestCase(unittest.TestCase): ''' k8s = self.k8s - cluster_label = 'cluster-name=acid-minimal-cluster' + cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' # enable load balancer services pg_patch_enable_lbs = { @@ -178,7 +188,7 @@ class EndToEndTestCase(unittest.TestCase): Lower resource limits below configured minimum and let operator fix it ''' k8s = self.k8s - cluster_label = 'cluster-name=acid-minimal-cluster' + cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' labels = 'spilo-role=master,' + cluster_label _, failover_targets = k8s.get_pg_nodes(cluster_label) @@ -247,7 +257,7 @@ class EndToEndTestCase(unittest.TestCase): Remove node readiness label from master node. This must cause a failover. ''' k8s = self.k8s - cluster_label = 'cluster-name=acid-minimal-cluster' + cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' labels = 'spilo-role=master,' + cluster_label readiness_label = 'lifecycle-status' readiness_value = 'ready' @@ -289,7 +299,7 @@ class EndToEndTestCase(unittest.TestCase): Scale up from 2 to 3 and back to 2 pods by updating the Postgres manifest at runtime. ''' k8s = self.k8s - labels = "cluster-name=acid-minimal-cluster" + labels = "application=spilo,cluster-name=acid-minimal-cluster" k8s.wait_for_pg_to_scale(3) self.assertEqual(3, k8s.count_pods_with_label(labels)) @@ -339,13 +349,99 @@ class EndToEndTestCase(unittest.TestCase): } k8s.update_config(unpatch_custom_service_annotations) + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) + def test_enable_disable_connection_pool(self): + ''' + For a database without connection pool, then turns it on, scale up, + turn off and on again. Test with different ways of doing this (via + enableConnectionPool or connectionPool configuration section). At the + end turn the connection pool off to not interfere with other tests. 
+ ''' + k8s = self.k8s + service_labels = { + 'cluster-name': 'acid-minimal-cluster', + } + pod_labels = dict({ + 'connection-pool': 'acid-minimal-cluster-pooler', + }) + + pod_selector = to_selector(pod_labels) + service_selector = to_selector(service_labels) + + try: + # enable connection pool + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', + 'postgresqls', 'acid-minimal-cluster', + { + 'spec': { + 'enableConnectionPool': True, + } + }) + k8s.wait_for_pod_start(pod_selector) + + pods = k8s.api.core_v1.list_namespaced_pod( + 'default', label_selector=pod_selector + ).items + + self.assertTrue(pods, 'No connection pool pods') + + k8s.wait_for_service(service_selector) + services = k8s.api.core_v1.list_namespaced_service( + 'default', label_selector=service_selector + ).items + services = [ + s for s in services + if s.metadata.name.endswith('pooler') + ] + + self.assertTrue(services, 'No connection pool service') + + # scale up connection pool deployment + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', + 'postgresqls', 'acid-minimal-cluster', + { + 'spec': { + 'connectionPool': { + 'numberOfInstances': 2, + }, + } + }) + + k8s.wait_for_running_pods(pod_selector, 2) + + # turn it off, keeping configuration section + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', + 'postgresqls', 'acid-minimal-cluster', + { + 'spec': { + 'enableConnectionPool': False, + } + }) + k8s.wait_for_pods_to_stop(pod_selector) + + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', + 'postgresqls', 'acid-minimal-cluster', + { + 'spec': { + 'enableConnectionPool': True, + } + }) + k8s.wait_for_pod_start(pod_selector) + except timeout_decorator.TimeoutError: + print('Operator log: {}'.format(k8s.get_operator_log())) + raise + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) def test_taint_based_eviction(self): ''' Add taint "postgres=:NoExecute" to node with master. This must cause a failover. 
''' k8s = self.k8s - cluster_label = 'cluster-name=acid-minimal-cluster' + cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' # get nodes of master and replica(s) (expected target of new master) current_master_node, current_replica_nodes = k8s.get_pg_nodes(cluster_label) @@ -496,12 +592,39 @@ class K8s: # for local execution ~ 10 seconds suffices time.sleep(60) + def get_operator_pod(self): + pods = self.api.core_v1.list_namespaced_pod( + 'default', label_selector='name=postgres-operator' + ).items + + if pods: + return pods[0] + + return None + + def get_operator_log(self): + operator_pod = self.get_operator_pod() + pod_name = operator_pod.metadata.name + return self.api.core_v1.read_namespaced_pod_log( + name=pod_name, + namespace='default' + ) + def wait_for_pod_start(self, pod_labels, namespace='default'): pod_phase = 'No pod running' while pod_phase != 'Running': pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=pod_labels).items if pods: pod_phase = pods[0].status.phase + + if pods and pod_phase != 'Running': + pod_name = pods[0].metadata.name + response = self.api.core_v1.read_namespaced_pod( + name=pod_name, + namespace=namespace + ) + print("Pod description {}".format(response)) + time.sleep(self.RETRY_TIMEOUT_SEC) def get_service_type(self, svc_labels, namespace='default'): @@ -531,10 +654,27 @@ class K8s: _ = self.api.custom_objects_api.patch_namespaced_custom_object( "acid.zalan.do", "v1", namespace, "postgresqls", "acid-minimal-cluster", body) - labels = 'cluster-name=acid-minimal-cluster' + labels = 'application=spilo,cluster-name=acid-minimal-cluster' while self.count_pods_with_label(labels) != number_of_instances: time.sleep(self.RETRY_TIMEOUT_SEC) + def wait_for_running_pods(self, labels, number, namespace=''): + while self.count_pods_with_label(labels) != number: + time.sleep(self.RETRY_TIMEOUT_SEC) + + def wait_for_pods_to_stop(self, labels, namespace=''): + while self.count_pods_with_label(labels) != 0: + time.sleep(self.RETRY_TIMEOUT_SEC) + + def wait_for_service(self, labels, namespace='default'): + def get_services(): + return self.api.core_v1.list_namespaced_service( + namespace, label_selector=labels + ).items + + while not get_services(): + time.sleep(self.RETRY_TIMEOUT_SEC) + def count_pods_with_label(self, labels, namespace='default'): return len(self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items) @@ -571,7 +711,10 @@ class K8s: self.wait_for_operator_pod_start() def create_with_kubectl(self, path): - subprocess.run(["kubectl", "create", "-f", path]) + return subprocess.run( + ["kubectl", "create", "-f", path], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) if __name__ == '__main__': diff --git a/manifests/configmap.yaml b/manifests/configmap.yaml index 0300b5495..fdc2d5d56 100644 --- a/manifests/configmap.yaml +++ b/manifests/configmap.yaml @@ -11,6 +11,16 @@ data: cluster_history_entries: "1000" cluster_labels: application:spilo cluster_name_label: cluster-name + # connection_pool_default_cpu_limit: "1" + # connection_pool_default_cpu_request: "500m" + # connection_pool_default_memory_limit: 100Mi + # connection_pool_default_memory_request: 100Mi + connection_pool_image: "registry.opensource.zalan.do/acid/pgbouncer:master-5" + # connection_pool_max_db_connections: 60 + # connection_pool_mode: "transaction" + # connection_pool_number_of_instances: 2 + # connection_pool_schema: "pooler" + # connection_pool_user: "pooler" # custom_service_annotations: "keyx:valuez,keya:valuea" # 
custom_pod_annotations: "keya:valuea,keyb:valueb" db_hosted_zone: db.example.com diff --git a/manifests/operator-service-account-rbac.yaml b/manifests/operator-service-account-rbac.yaml index e5bc49f83..83cd721e7 100644 --- a/manifests/operator-service-account-rbac.yaml +++ b/manifests/operator-service-account-rbac.yaml @@ -129,6 +129,7 @@ rules: - apps resources: - statefulsets + - deployments verbs: - create - delete diff --git a/manifests/operatorconfiguration.crd.yaml b/manifests/operatorconfiguration.crd.yaml index 7bd5c529c..4e6858af8 100644 --- a/manifests/operatorconfiguration.crd.yaml +++ b/manifests/operatorconfiguration.crd.yaml @@ -294,6 +294,47 @@ spec: pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' scalyr_server_url: type: string + connection_pool: + type: object + properties: + connection_pool_schema: + type: string + #default: "pooler" + connection_pool_user: + type: string + #default: "pooler" + connection_pool_image: + type: string + #default: "registry.opensource.zalan.do/acid/pgbouncer" + connection_pool_max_db_connections: + type: integer + #default: 60 + connection_pool_mode: + type: string + enum: + - "session" + - "transaction" + #default: "transaction" + connection_pool_number_of_instances: + type: integer + minimum: 2 + #default: 2 + connection_pool_default_cpu_limit: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + #default: "1" + connection_pool_default_cpu_request: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + #default: "500m" + connection_pool_default_memory_limit: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + #default: "100Mi" + connection_pool_default_memory_request: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + #default: "100Mi" status: type: object additionalProperties: diff --git a/manifests/postgresql-operator-default-configuration.yaml b/manifests/postgresql-operator-default-configuration.yaml index 33838b2a9..d4c9b518f 100644 --- a/manifests/postgresql-operator-default-configuration.yaml +++ b/manifests/postgresql-operator-default-configuration.yaml @@ -121,3 +121,14 @@ configuration: scalyr_memory_limit: 500Mi scalyr_memory_request: 50Mi # scalyr_server_url: "" + connection_pool: + connection_pool_default_cpu_limit: "1" + connection_pool_default_cpu_request: "500m" + connection_pool_default_memory_limit: 100Mi + connection_pool_default_memory_request: 100Mi + connection_pool_image: "registry.opensource.zalan.do/acid/pgbouncer:master-5" + # connection_pool_max_db_connections: 60 + connection_pool_mode: "transaction" + connection_pool_number_of_instances: 2 + # connection_pool_schema: "pooler" + # connection_pool_user: "pooler" diff --git a/manifests/postgresql.crd.yaml b/manifests/postgresql.crd.yaml index 04c789fb9..06434da14 100644 --- a/manifests/postgresql.crd.yaml +++ b/manifests/postgresql.crd.yaml @@ -70,6 +70,55 @@ spec: uid: format: uuid type: string + connectionPool: + type: object + properties: + dockerImage: + type: string + maxDBConnections: + type: integer + mode: + type: string + enum: + - "session" + - "transaction" + numberOfInstances: + type: integer + minimum: 2 + resources: + type: object + required: + - requests + - limits + properties: + limits: + type: object + required: + - cpu + - memory + properties: + cpu: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + memory: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + requests: + type: object + required: + - cpu + - memory + properties: + cpu: + type: string + pattern: 
'^(\d+m|\d+(\.\d{1,3})?)$' + memory: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + schema: + type: string + user: + type: string databases: type: object additionalProperties: @@ -77,6 +126,8 @@ spec: # Note: usernames specified here as database owners must be declared in the users key of the spec key. dockerImage: type: string + enableConnectionPool: + type: boolean enableLogicalBackup: type: boolean enableMasterLoadBalancer: diff --git a/pkg/apis/acid.zalan.do/v1/crds.go b/pkg/apis/acid.zalan.do/v1/crds.go index eff47c255..dc552d3f4 100644 --- a/pkg/apis/acid.zalan.do/v1/crds.go +++ b/pkg/apis/acid.zalan.do/v1/crds.go @@ -105,6 +105,7 @@ var OperatorConfigCRDResourceColumns = []apiextv1beta1.CustomResourceColumnDefin var min0 = 0.0 var min1 = 1.0 +var min2 = 2.0 var minDisable = -1.0 // PostgresCRDResourceValidation to check applied manifest parameters @@ -176,6 +177,76 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{ }, }, }, + "connectionPool": { + Type: "object", + Properties: map[string]apiextv1beta1.JSONSchemaProps{ + "dockerImage": { + Type: "string", + }, + "maxDBConnections": { + Type: "integer", + }, + "mode": { + Type: "string", + Enum: []apiextv1beta1.JSON{ + { + Raw: []byte(`"session"`), + }, + { + Raw: []byte(`"transaction"`), + }, + }, + }, + "numberOfInstances": { + Type: "integer", + Minimum: &min2, + }, + "resources": { + Type: "object", + Required: []string{"requests", "limits"}, + Properties: map[string]apiextv1beta1.JSONSchemaProps{ + "limits": { + Type: "object", + Required: []string{"cpu", "memory"}, + Properties: map[string]apiextv1beta1.JSONSchemaProps{ + "cpu": { + Type: "string", + Description: "Decimal natural followed by m, or decimal natural followed by dot followed by up to three decimal digits (precision used by Kubernetes). Must be greater than 0", + Pattern: "^(\\d+m|\\d+(\\.\\d{1,3})?)$", + }, + "memory": { + Type: "string", + Description: "Plain integer or fixed-point integer using one of these suffixes: E, P, T, G, M, k (with or without a tailing i). Must be greater than 0", + Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$", + }, + }, + }, + "requests": { + Type: "object", + Required: []string{"cpu", "memory"}, + Properties: map[string]apiextv1beta1.JSONSchemaProps{ + "cpu": { + Type: "string", + Description: "Decimal natural followed by m, or decimal natural followed by dot followed by up to three decimal digits (precision used by Kubernetes). Must be greater than 0", + Pattern: "^(\\d+m|\\d+(\\.\\d{1,3})?)$", + }, + "memory": { + Type: "string", + Description: "Plain integer or fixed-point integer using one of these suffixes: E, P, T, G, M, k (with or without a tailing i). 
Must be greater than 0", + Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$", + }, + }, + }, + }, + }, + "schema": { + Type: "string", + }, + "user": { + Type: "string", + }, + }, + }, "databases": { Type: "object", AdditionalProperties: &apiextv1beta1.JSONSchemaPropsOrBool{ @@ -188,6 +259,9 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{ "dockerImage": { Type: "string", }, + "enableConnectionPool": { + Type: "boolean", + }, "enableLogicalBackup": { Type: "boolean", }, @@ -418,7 +492,7 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{ Type: "string", }, "tls": { - Type: "object", + Type: "object", Required: []string{"secretName"}, Properties: map[string]apiextv1beta1.JSONSchemaProps{ "secretName": { @@ -1055,6 +1129,54 @@ var OperatorConfigCRDResourceValidation = apiextv1beta1.CustomResourceValidation }, }, }, + "connection_pool": { + Type: "object", + Properties: map[string]apiextv1beta1.JSONSchemaProps{ + "connection_pool_default_cpu_limit": { + Type: "string", + Pattern: "^(\\d+m|\\d+(\\.\\d{1,3})?)$", + }, + "connection_pool_default_cpu_request": { + Type: "string", + Pattern: "^(\\d+m|\\d+(\\.\\d{1,3})?)$", + }, + "connection_pool_default_memory_limit": { + Type: "string", + Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$", + }, + "connection_pool_default_memory_request": { + Type: "string", + Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$", + }, + "connection_pool_image": { + Type: "string", + }, + "connection_pool_max_db_connections": { + Type: "integer", + }, + "connection_pool_mode": { + Type: "string", + Enum: []apiextv1beta1.JSON{ + { + Raw: []byte(`"session"`), + }, + { + Raw: []byte(`"transaction"`), + }, + }, + }, + "connection_pool_number_of_instances": { + Type: "integer", + Minimum: &min2, + }, + "connection_pool_schema": { + Type: "string", + }, + "connection_pool_user": { + Type: "string", + }, + }, + }, }, }, "status": { diff --git a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go index 8463be6bd..7c1d2b7e8 100644 --- a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go +++ b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go @@ -1,5 +1,7 @@ package v1 +// Operator configuration CRD definition, please use snake_case for field names. + import ( "github.com/zalando/postgres-operator/pkg/util/config" @@ -65,12 +67,12 @@ type KubernetesMetaConfiguration struct { // TODO: use a proper toleration structure? 
PodToleration map[string]string `json:"toleration,omitempty"` // TODO: use namespacedname - PodEnvironmentConfigMap string `json:"pod_environment_configmap,omitempty"` - PodPriorityClassName string `json:"pod_priority_class_name,omitempty"` - MasterPodMoveTimeout Duration `json:"master_pod_move_timeout,omitempty"` - EnablePodAntiAffinity bool `json:"enable_pod_antiaffinity,omitempty"` - PodAntiAffinityTopologyKey string `json:"pod_antiaffinity_topology_key,omitempty"` - PodManagementPolicy string `json:"pod_management_policy,omitempty"` + PodEnvironmentConfigMap string `json:"pod_environment_configmap,omitempty"` + PodPriorityClassName string `json:"pod_priority_class_name,omitempty"` + MasterPodMoveTimeout Duration `json:"master_pod_move_timeout,omitempty"` + EnablePodAntiAffinity bool `json:"enable_pod_antiaffinity,omitempty"` + PodAntiAffinityTopologyKey string `json:"pod_antiaffinity_topology_key,omitempty"` + PodManagementPolicy string `json:"pod_management_policy,omitempty"` } // PostgresPodResourcesDefaults defines the spec of default resources @@ -152,6 +154,20 @@ type ScalyrConfiguration struct { ScalyrMemoryLimit string `json:"scalyr_memory_limit,omitempty"` } +// Defines default configuration for connection pool +type ConnectionPoolConfiguration struct { + NumberOfInstances *int32 `json:"connection_pool_number_of_instances,omitempty"` + Schema string `json:"connection_pool_schema,omitempty"` + User string `json:"connection_pool_user,omitempty"` + Image string `json:"connection_pool_image,omitempty"` + Mode string `json:"connection_pool_mode,omitempty"` + MaxDBConnections *int32 `json:"connection_pool_max_db_connections,omitempty"` + DefaultCPURequest string `json:"connection_pool_default_cpu_request,omitempty"` + DefaultMemoryRequest string `json:"connection_pool_default_memory_request,omitempty"` + DefaultCPULimit string `json:"connection_pool_default_cpu_limit,omitempty"` + DefaultMemoryLimit string `json:"connection_pool_default_memory_limit,omitempty"` +} + // OperatorLogicalBackupConfiguration defines configuration for logical backup type OperatorLogicalBackupConfiguration struct { Schedule string `json:"logical_backup_schedule,omitempty"` @@ -188,6 +204,7 @@ type OperatorConfigurationData struct { LoggingRESTAPI LoggingRESTAPIConfiguration `json:"logging_rest_api"` Scalyr ScalyrConfiguration `json:"scalyr"` LogicalBackup OperatorLogicalBackupConfiguration `json:"logical_backup"` + ConnectionPool ConnectionPoolConfiguration `json:"connection_pool"` } //Duration shortens this frequently used name diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index 862db6a4e..1784f8235 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -1,5 +1,7 @@ package v1 +// Postgres CRD definition, please use CamelCase for field names. + import ( "time" @@ -27,6 +29,9 @@ type PostgresSpec struct { Patroni `json:"patroni,omitempty"` Resources `json:"resources,omitempty"` + EnableConnectionPool *bool `json:"enableConnectionPool,omitempty"` + ConnectionPool *ConnectionPool `json:"connectionPool,omitempty"` + TeamID string `json:"teamId"` DockerImage string `json:"dockerImage,omitempty"` @@ -162,3 +167,24 @@ type UserFlags []string type PostgresStatus struct { PostgresClusterStatus string `json:"PostgresClusterStatus"` } + +// Options for connection pooler +// +// TODO: prepared snippets of configuration, one can choose via type, e.g. 
+// pgbouncer-large (with higher resources) or odyssey-small (with smaller +// resources) +// Type string `json:"type,omitempty"` +// +// TODO: figure out what other important parameters of the connection pool it +// makes sense to expose. E.g. pool size (min/max boundaries), max client +// connections etc. +type ConnectionPool struct { + NumberOfInstances *int32 `json:"numberOfInstances,omitempty"` + Schema string `json:"schema,omitempty"` + User string `json:"user,omitempty"` + Mode string `json:"mode,omitempty"` + DockerImage string `json:"dockerImage,omitempty"` + MaxDBConnections *int32 `json:"maxDBConnections,omitempty"` + + Resources `json:"resources,omitempty"` +} diff --git a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go index 753b0490c..fcab394ca 100644 --- a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go +++ b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go @@ -68,6 +68,59 @@ func (in *CloneDescription) DeepCopy() *CloneDescription { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ConnectionPool) DeepCopyInto(out *ConnectionPool) { + *out = *in + if in.NumberOfInstances != nil { + in, out := &in.NumberOfInstances, &out.NumberOfInstances + *out = new(int32) + **out = **in + } + if in.MaxDBConnections != nil { + in, out := &in.MaxDBConnections, &out.MaxDBConnections + *out = new(int32) + **out = **in + } + out.Resources = in.Resources + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionPool. +func (in *ConnectionPool) DeepCopy() *ConnectionPool { + if in == nil { + return nil + } + out := new(ConnectionPool) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ConnectionPoolConfiguration) DeepCopyInto(out *ConnectionPoolConfiguration) { + *out = *in + if in.NumberOfInstances != nil { + in, out := &in.NumberOfInstances, &out.NumberOfInstances + *out = new(int32) + **out = **in + } + if in.MaxDBConnections != nil { + in, out := &in.MaxDBConnections, &out.MaxDBConnections + *out = new(int32) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionPoolConfiguration. +func (in *ConnectionPoolConfiguration) DeepCopy() *ConnectionPoolConfiguration { + if in == nil { + return nil + } + out := new(ConnectionPoolConfiguration) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *KubernetesMetaConfiguration) DeepCopyInto(out *KubernetesMetaConfiguration) { *out = *in @@ -254,6 +307,7 @@ func (in *OperatorConfigurationData) DeepCopyInto(out *OperatorConfigurationData out.LoggingRESTAPI = in.LoggingRESTAPI out.Scalyr = in.Scalyr out.LogicalBackup = in.LogicalBackup + in.ConnectionPool.DeepCopyInto(&out.ConnectionPool) return } @@ -416,6 +470,16 @@ func (in *PostgresSpec) DeepCopyInto(out *PostgresSpec) { out.Volume = in.Volume in.Patroni.DeepCopyInto(&out.Patroni) out.Resources = in.Resources + if in.EnableConnectionPool != nil { + in, out := &in.EnableConnectionPool, &out.EnableConnectionPool + *out = new(bool) + **out = **in + } + if in.ConnectionPool != nil { + in, out := &in.ConnectionPool, &out.ConnectionPool + *out = new(ConnectionPool) + (*in).DeepCopyInto(*out) + } if in.SpiloFSGroup != nil { in, out := &in.SpiloFSGroup, &out.SpiloFSGroup *out = new(int64) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index d740260d2..dba67c142 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/r3labs/diff" "github.com/sirupsen/logrus" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" @@ -48,11 +49,26 @@ type Config struct { PodServiceAccountRoleBinding *rbacv1.RoleBinding } +// K8S objects that are belongs to a connection pool +type ConnectionPoolObjects struct { + Deployment *appsv1.Deployment + Service *v1.Service + + // It could happen that a connection pool was enabled, but the operator was + // not able to properly process a corresponding event or was restarted. In + // this case we will miss missing/require situation and a lookup function + // will not be installed. To avoid synchronizing it all the time to prevent + // this, we can remember the result in memory at least until the next + // restart. + LookupFunction bool +} + type kubeResources struct { Services map[PostgresRole]*v1.Service Endpoints map[PostgresRole]*v1.Endpoints Secrets map[types.UID]*v1.Secret Statefulset *appsv1.StatefulSet + ConnectionPool *ConnectionPoolObjects PodDisruptionBudget *policybeta1.PodDisruptionBudget //Pods are treated separately //PVCs are treated separately @@ -184,7 +200,8 @@ func (c *Cluster) isNewCluster() bool { func (c *Cluster) initUsers() error { c.setProcessName("initializing users") - // clear our the previous state of the cluster users (in case we are running a sync). + // clear our the previous state of the cluster users (in case we are + // running a sync). c.systemUsers = map[string]spec.PgUser{} c.pgUsers = map[string]spec.PgUser{} @@ -292,8 +309,10 @@ func (c *Cluster) Create() error { } c.logger.Infof("pods are ready") - // create database objects unless we are running without pods or disabled that feature explicitly + // create database objects unless we are running without pods or disabled + // that feature explicitly if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0 || c.Spec.StandbyCluster != nil) { + c.logger.Infof("Create roles") if err = c.createRoles(); err != nil { return fmt.Errorf("could not create users: %v", err) } @@ -316,6 +335,26 @@ func (c *Cluster) Create() error { c.logger.Errorf("could not list resources: %v", err) } + // Create connection pool deployment and services if necessary. Since we + // need to peform some operations with the database itself (e.g. install + // lookup function), do it as the last step, when everything is available. 
+ // + // Do not consider connection pool as a strict requirement, and if + // something fails, report warning + if c.needConnectionPool() { + if c.ConnectionPool != nil { + c.logger.Warning("Connection pool already exists in the cluster") + return nil + } + connPool, err := c.createConnectionPool(c.installLookupFunction) + if err != nil { + c.logger.Warningf("could not create connection pool: %v", err) + return nil + } + c.logger.Infof("connection pool %q has been successfully created", + util.NameFromMeta(connPool.Deployment.ObjectMeta)) + } + return nil } @@ -571,7 +610,11 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { } } - if !reflect.DeepEqual(oldSpec.Spec.Users, newSpec.Spec.Users) { + // connection pool needs one system user created, which is done in + // initUsers. Check if it needs to be called. + sameUsers := reflect.DeepEqual(oldSpec.Spec.Users, newSpec.Spec.Users) + needConnPool := c.needConnectionPoolWorker(&newSpec.Spec) + if !sameUsers || needConnPool { c.logger.Debugf("syncing secrets") if err := c.initUsers(); err != nil { c.logger.Errorf("could not init users: %v", err) @@ -695,6 +738,11 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { } } + // sync connection pool + if err := c.syncConnectionPool(oldSpec, newSpec, c.installLookupFunction); err != nil { + return fmt.Errorf("could not sync connection pool: %v", err) + } + return nil } @@ -746,6 +794,12 @@ func (c *Cluster) Delete() { c.logger.Warningf("could not remove leftover patroni objects; %v", err) } + // Delete connection pool objects anyway, even if it's not mentioned in the + // manifest, just to not keep orphaned components in case if something went + // wrong + if err := c.deleteConnectionPool(); err != nil { + c.logger.Warningf("could not remove connection pool: %v", err) + } } //NeedsRepair returns true if the cluster should be included in the repair scan (based on its in-memory status). @@ -812,6 +866,35 @@ func (c *Cluster) initSystemUsers() { Name: c.OpConfig.ReplicationUsername, Password: util.RandomPassword(constants.PasswordLength), } + + // Connection pool user is an exception, if requested it's going to be + // created by operator as a normal pgUser + if c.needConnectionPool() { + // initialize empty connection pool if not done yet + if c.Spec.ConnectionPool == nil { + c.Spec.ConnectionPool = &acidv1.ConnectionPool{} + } + + username := util.Coalesce( + c.Spec.ConnectionPool.User, + c.OpConfig.ConnectionPool.User) + + // connection pooler application should be able to login with this role + connPoolUser := spec.PgUser{ + Origin: spec.RoleConnectionPool, + Name: username, + Flags: []string{constants.RoleFlagLogin}, + Password: util.RandomPassword(constants.PasswordLength), + } + + if _, exists := c.pgUsers[username]; !exists { + c.pgUsers[username] = connPoolUser + } + + if _, exists := c.systemUsers[constants.ConnectionPoolUserKeyName]; !exists { + c.systemUsers[constants.ConnectionPoolUserKeyName] = connPoolUser + } + } } func (c *Cluster) initRobotUsers() error { @@ -1138,3 +1221,119 @@ func (c *Cluster) deletePatroniClusterConfigMaps() error { return c.deleteClusterObject(get, deleteConfigMapFn, "configmap") } + +// Test if two connection pool configuration needs to be synced. For simplicity +// compare not the actual K8S objects, but the configuration itself and request +// sync if there is any difference. 
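The comparison described above is implemented with the r3labs/diff changelog rather than by comparing generated K8s objects. A minimal, self-contained sketch of what such a changelog contains (the Mode values are invented purely for illustration):

```go
package main

import (
	"fmt"

	"github.com/r3labs/diff"
	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
)

func main() {
	// Two hypothetical manifest revisions of the connectionPool section.
	oldSpec := &acidv1.ConnectionPool{Mode: "session"}
	newSpec := &acidv1.ConnectionPool{Mode: "transaction"}

	// diff.Diff returns one entry per changed field; a non-empty changelog
	// is treated as "sync needed" by needSyncConnPoolSpecs.
	changelog, err := diff.Diff(oldSpec, newSpec)
	if err != nil {
		fmt.Println("cannot diff specs:", err)
		return
	}

	for _, change := range changelog {
		// prints e.g.: update [Mode] from 'session' to 'transaction'
		fmt.Printf("%s %+v from '%+v' to '%+v'\n",
			change.Type, change.Path, change.From, change.To)
	}
}
```

Each changelog entry becomes one human-readable reason string, which is what the operator logs when it decides the pooler deployment needs a resync.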
+func (c *Cluster) needSyncConnPoolSpecs(oldSpec, newSpec *acidv1.ConnectionPool) (sync bool, reasons []string) { + reasons = []string{} + sync = false + + changelog, err := diff.Diff(oldSpec, newSpec) + if err != nil { + c.logger.Infof("Cannot get diff, do not do anything, %+v", err) + return false, reasons + } + + if len(changelog) > 0 { + sync = true + } + + for _, change := range changelog { + msg := fmt.Sprintf("%s %+v from '%+v' to '%+v'", + change.Type, change.Path, change.From, change.To) + reasons = append(reasons, msg) + } + + return sync, reasons +} + +func syncResources(a, b *v1.ResourceRequirements) bool { + for _, res := range []v1.ResourceName{ + v1.ResourceCPU, + v1.ResourceMemory, + } { + if !a.Limits[res].Equal(b.Limits[res]) || + !a.Requests[res].Equal(b.Requests[res]) { + return true + } + } + + return false +} + +// Check if we need to synchronize connection pool deployment due to new +// defaults, that are different from what we see in the DeploymentSpec +func (c *Cluster) needSyncConnPoolDefaults( + spec *acidv1.ConnectionPool, + deployment *appsv1.Deployment) (sync bool, reasons []string) { + + reasons = []string{} + sync = false + + config := c.OpConfig.ConnectionPool + podTemplate := deployment.Spec.Template + poolContainer := podTemplate.Spec.Containers[constants.ConnPoolContainer] + + if spec == nil { + spec = &acidv1.ConnectionPool{} + } + + if spec.NumberOfInstances == nil && + *deployment.Spec.Replicas != *config.NumberOfInstances { + + sync = true + msg := fmt.Sprintf("NumberOfInstances is different (having %d, required %d)", + *deployment.Spec.Replicas, *config.NumberOfInstances) + reasons = append(reasons, msg) + } + + if spec.DockerImage == "" && + poolContainer.Image != config.Image { + + sync = true + msg := fmt.Sprintf("DockerImage is different (having %s, required %s)", + poolContainer.Image, config.Image) + reasons = append(reasons, msg) + } + + expectedResources, err := generateResourceRequirements(spec.Resources, + c.makeDefaultConnPoolResources()) + + // An error to generate expected resources means something is not quite + // right, but for the purpose of robustness do not panic here, just report + // and ignore resources comparison (in the worst case there will be no + // updates for new resource values). 
+ if err == nil && syncResources(&poolContainer.Resources, expectedResources) { + sync = true + msg := fmt.Sprintf("Resources are different (having %+v, required %+v)", + poolContainer.Resources, expectedResources) + reasons = append(reasons, msg) + } + + if err != nil { + c.logger.Warningf("Cannot generate expected resources, %v", err) + } + + for _, env := range poolContainer.Env { + if spec.User == "" && env.Name == "PGUSER" { + ref := env.ValueFrom.SecretKeyRef.LocalObjectReference + + if ref.Name != c.credentialSecretName(config.User) { + sync = true + msg := fmt.Sprintf("Pool user is different (having %s, required %s)", + ref.Name, config.User) + reasons = append(reasons, msg) + } + } + + if spec.Schema == "" && env.Name == "PGSCHEMA" && env.Value != config.Schema { + sync = true + msg := fmt.Sprintf("Pool schema is different (having %s, required %s)", + env.Value, config.Schema) + reasons = append(reasons, msg) + } + } + + return sync, reasons +} diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index 9efbc51c6..a1b361642 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -9,6 +9,7 @@ import ( acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" "github.com/zalando/postgres-operator/pkg/spec" "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/constants" "github.com/zalando/postgres-operator/pkg/util/k8sutil" "github.com/zalando/postgres-operator/pkg/util/teams" v1 "k8s.io/api/core/v1" @@ -704,3 +705,20 @@ func TestServiceAnnotations(t *testing.T) { }) } } + +func TestInitSystemUsers(t *testing.T) { + testName := "Test system users initialization" + + // default cluster without connection pool + cl.initSystemUsers() + if _, exist := cl.systemUsers[constants.ConnectionPoolUserKeyName]; exist { + t.Errorf("%s, connection pool user is present", testName) + } + + // cluster with connection pool + cl.Spec.EnableConnectionPool = boolToPointer(true) + cl.initSystemUsers() + if _, exist := cl.systemUsers[constants.ConnectionPoolUserKeyName]; !exist { + t.Errorf("%s, connection pool user is not present", testName) + } +} diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go index 07ea011a6..bca68c188 100644 --- a/pkg/cluster/database.go +++ b/pkg/cluster/database.go @@ -1,10 +1,12 @@ package cluster import ( + "bytes" "database/sql" "fmt" "net" "strings" + "text/template" "time" "github.com/lib/pq" @@ -28,13 +30,37 @@ const ( getDatabasesSQL = `SELECT datname, pg_get_userbyid(datdba) AS owner FROM pg_database;` createDatabaseSQL = `CREATE DATABASE "%s" OWNER "%s";` alterDatabaseOwnerSQL = `ALTER DATABASE "%s" OWNER TO "%s";` + connectionPoolLookup = ` + CREATE SCHEMA IF NOT EXISTS {{.pool_schema}}; + + CREATE OR REPLACE FUNCTION {{.pool_schema}}.user_lookup( + in i_username text, out uname text, out phash text) + RETURNS record AS $$ + BEGIN + SELECT usename, passwd FROM pg_catalog.pg_shadow + WHERE usename = i_username INTO uname, phash; + RETURN; + END; + $$ LANGUAGE plpgsql SECURITY DEFINER; + + REVOKE ALL ON FUNCTION {{.pool_schema}}.user_lookup(text) + FROM public, {{.pool_user}}; + GRANT EXECUTE ON FUNCTION {{.pool_schema}}.user_lookup(text) + TO {{.pool_user}}; + GRANT USAGE ON SCHEMA {{.pool_schema}} TO {{.pool_user}}; + ` ) -func (c *Cluster) pgConnectionString() string { +func (c *Cluster) pgConnectionString(dbname string) string { password := c.systemUsers[constants.SuperuserKeyName].Password - return fmt.Sprintf("host='%s' dbname=postgres sslmode=require 
user='%s' password='%s' connect_timeout='%d'", + if dbname == "" { + dbname = "postgres" + } + + return fmt.Sprintf("host='%s' dbname='%s' sslmode=require user='%s' password='%s' connect_timeout='%d'", fmt.Sprintf("%s.%s.svc.%s", c.Name, c.Namespace, c.OpConfig.ClusterDomain), + dbname, c.systemUsers[constants.SuperuserKeyName].Name, strings.Replace(password, "$", "\\$", -1), constants.PostgresConnectTimeout/time.Second) @@ -49,13 +75,17 @@ func (c *Cluster) databaseAccessDisabled() bool { } func (c *Cluster) initDbConn() error { + return c.initDbConnWithName("") +} + +func (c *Cluster) initDbConnWithName(dbname string) error { c.setProcessName("initializing db connection") if c.pgDb != nil { return nil } var conn *sql.DB - connstring := c.pgConnectionString() + connstring := c.pgConnectionString(dbname) finalerr := retryutil.Retry(constants.PostgresConnectTimeout, constants.PostgresConnectRetryTimeout, func() (bool, error) { @@ -94,6 +124,10 @@ func (c *Cluster) initDbConn() error { return nil } +func (c *Cluster) connectionIsClosed() bool { + return c.pgDb == nil +} + func (c *Cluster) closeDbConn() (err error) { c.setProcessName("closing db connection") if c.pgDb != nil { @@ -243,3 +277,88 @@ func makeUserFlags(rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin return result } + +// Creates a connection pool credentials lookup function in every database to +// perform remote authentification. +func (c *Cluster) installLookupFunction(poolSchema, poolUser string) error { + var stmtBytes bytes.Buffer + c.logger.Info("Installing lookup function") + + if err := c.initDbConn(); err != nil { + return fmt.Errorf("could not init database connection") + } + defer func() { + if c.connectionIsClosed() { + return + } + + if err := c.closeDbConn(); err != nil { + c.logger.Errorf("could not close database connection: %v", err) + } + }() + + currentDatabases, err := c.getDatabases() + if err != nil { + msg := "could not get databases to install pool lookup function: %v" + return fmt.Errorf(msg, err) + } + + templater := template.Must(template.New("sql").Parse(connectionPoolLookup)) + + for dbname, _ := range currentDatabases { + if dbname == "template0" || dbname == "template1" { + continue + } + + if err := c.initDbConnWithName(dbname); err != nil { + return fmt.Errorf("could not init database connection to %s", dbname) + } + + c.logger.Infof("Install pool lookup function into %s", dbname) + + params := TemplateParams{ + "pool_schema": poolSchema, + "pool_user": poolUser, + } + + if err := templater.Execute(&stmtBytes, params); err != nil { + c.logger.Errorf("could not prepare sql statement %+v: %v", + params, err) + // process other databases + continue + } + + // golang sql will do retries couple of times if pq driver reports + // connections issues (driver.ErrBadConn), but since our query is + // idempotent, we can retry in a view of other errors (e.g. due to + // failover a db is temporary in a read-only mode or so) to make sure + // it was applied. 
+ execErr := retryutil.Retry( + constants.PostgresConnectTimeout, + constants.PostgresConnectRetryTimeout, + func() (bool, error) { + if _, err := c.pgDb.Exec(stmtBytes.String()); err != nil { + msg := fmt.Errorf("could not execute sql statement %s: %v", + stmtBytes.String(), err) + return false, msg + } + + return true, nil + }) + + if execErr != nil { + c.logger.Errorf("could not execute after retries %s: %v", + stmtBytes.String(), err) + // process other databases + continue + } + + c.logger.Infof("Pool lookup function installed into %s", dbname) + if err := c.closeDbConn(); err != nil { + c.logger.Errorf("could not close database connection: %v", err) + } + } + + c.ConnectionPool.LookupFunction = true + return nil +} diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 39b5c16a0..34b409d4e 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -21,6 +21,7 @@ import ( "github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util/config" "github.com/zalando/postgres-operator/pkg/util/constants" + "github.com/zalando/postgres-operator/pkg/util/k8sutil" batchv1 "k8s.io/api/batch/v1" batchv1beta1 "k8s.io/api/batch/v1beta1" "k8s.io/apimachinery/pkg/labels" @@ -31,10 +32,12 @@ const ( patroniPGBinariesParameterName = "bin_dir" patroniPGParametersParameterName = "parameters" patroniPGHBAConfParameterName = "pg_hba" + localHost = "127.0.0.1/32" + connectionPoolContainer = "connection-pool" + pgPort = 5432 // the gid of the postgres user in the default spilo image spiloPostgresGID = 103 - localHost = "127.0.0.1/32" ) type pgUser struct { @@ -70,6 +73,10 @@ func (c *Cluster) statefulSetName() string { return c.Name } +func (c *Cluster) connPoolName() string { + return c.Name + "-pooler" +} + func (c *Cluster) endpointName(role PostgresRole) string { name := c.Name if role == Replica { @@ -88,6 +95,28 @@ func (c *Cluster) serviceName(role PostgresRole) string { return name } +func (c *Cluster) serviceAddress(role PostgresRole) string { + service, exist := c.Services[role] + + if exist { + return service.ObjectMeta.Name + } + + c.logger.Warningf("No service for role %s", role) + return "" +} + +func (c *Cluster) servicePort(role PostgresRole) string { + service, exist := c.Services[role] + + if exist { + return fmt.Sprint(service.Spec.Ports[0].Port) + } + + c.logger.Warningf("No service for role %s", role) + return "" +} + func (c *Cluster) podDisruptionBudgetName() string { return c.OpConfig.PDBNameFormat.Format("cluster", c.Name) } @@ -96,10 +125,39 @@ func (c *Cluster) makeDefaultResources() acidv1.Resources { config := c.OpConfig - defaultRequests := acidv1.ResourceDescription{CPU: config.DefaultCPURequest, Memory: config.DefaultMemoryRequest} - defaultLimits := acidv1.ResourceDescription{CPU: config.DefaultCPULimit, Memory: config.DefaultMemoryLimit} + defaultRequests := acidv1.ResourceDescription{ + CPU: config.Resources.DefaultCPURequest, + Memory: config.Resources.DefaultMemoryRequest, + } + defaultLimits := acidv1.ResourceDescription{ + CPU: config.Resources.DefaultCPULimit, + Memory: config.Resources.DefaultMemoryLimit, + } - return acidv1.Resources{ResourceRequests: defaultRequests, ResourceLimits: defaultLimits} + return acidv1.Resources{ + ResourceRequests: defaultRequests, + ResourceLimits: defaultLimits, + } +} + +// Generate default resource section for connection pool deployment, to be used +// if nothing custom is specified in the manifest +func (c *Cluster) makeDefaultConnPoolResources() acidv1.Resources { + config := 
c.OpConfig + + defaultRequests := acidv1.ResourceDescription{ + CPU: config.ConnectionPool.ConnPoolDefaultCPURequest, + Memory: config.ConnectionPool.ConnPoolDefaultMemoryRequest, + } + defaultLimits := acidv1.ResourceDescription{ + CPU: config.ConnectionPool.ConnPoolDefaultCPULimit, + Memory: config.ConnectionPool.ConnPoolDefaultMemoryLimit, + } + + return acidv1.Resources{ + ResourceRequests: defaultRequests, + ResourceLimits: defaultLimits, + } } func generateResourceRequirements(resources acidv1.Resources, defaultResources acidv1.Resources) (*v1.ResourceRequirements, error) { @@ -319,7 +377,11 @@ func tolerations(tolerationsSpec *[]v1.Toleration, podToleration map[string]stri return *tolerationsSpec } - if len(podToleration["key"]) > 0 || len(podToleration["operator"]) > 0 || len(podToleration["value"]) > 0 || len(podToleration["effect"]) > 0 { + if len(podToleration["key"]) > 0 || + len(podToleration["operator"]) > 0 || + len(podToleration["value"]) > 0 || + len(podToleration["effect"]) > 0 { + return []v1.Toleration{ { Key: podToleration["key"], @@ -786,12 +848,12 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef request := spec.Resources.ResourceRequests.Memory if request == "" { - request = c.OpConfig.DefaultMemoryRequest + request = c.OpConfig.Resources.DefaultMemoryRequest } limit := spec.Resources.ResourceLimits.Memory if limit == "" { - limit = c.OpConfig.DefaultMemoryLimit + limit = c.OpConfig.Resources.DefaultMemoryLimit } isSmaller, err := util.IsSmallerQuantity(request, limit) @@ -813,12 +875,12 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef // TODO #413 sidecarRequest := sidecar.Resources.ResourceRequests.Memory if request == "" { - request = c.OpConfig.DefaultMemoryRequest + request = c.OpConfig.Resources.DefaultMemoryRequest } sidecarLimit := sidecar.Resources.ResourceLimits.Memory if limit == "" { - limit = c.OpConfig.DefaultMemoryLimit + limit = c.OpConfig.Resources.DefaultMemoryLimit } isSmaller, err := util.IsSmallerQuantity(sidecarRequest, sidecarLimit) @@ -1774,6 +1836,306 @@ func (c *Cluster) getLogicalBackupJobName() (jobName string) { return "logical-backup-" + c.clusterName().Name } +// Generate pool size related environment variables. +// +// MAX_DB_CONN would specify the global maximum for connections to a target +// database. +// +// MAX_CLIENT_CONN is not configurable at the moment, just set it high enough. +// +// DEFAULT_SIZE is a pool size per db/user (having in mind the use case when +// most of the queries coming through a connection pooler are from the same +// user to the same db). In case if we want to spin up more connection pool +// instances, take this into account and maintain the same number of +// connections. +// +// MIN_SIZE is a pool minimal size, to prevent situation when sudden workload +// have to wait for spinning up a new connections. +// +// RESERVE_SIZE is how many additional connections to allow for a pool. 
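To make the sizing rules above concrete, here is a small worked example of the arithmetic performed by getConnPoolEnvVars below; the inputs are assumptions (a global budget of 60 database connections and 2 pooler instances, matching values used in the tests of this patch):

```go
package main

import "fmt"

func main() {
	// Assumed inputs, not read from a real manifest:
	// maxDBConnections is the cluster-wide budget, instances the pooler pod count.
	maxDBConnections := int32(60)
	instances := int32(2)

	// Each pooler instance gets its share of the global budget ...
	maxDBConn := maxDBConnections / instances // 30 -> CONNECTION_POOL_MAX_DB_CONN

	// ... and the per-db/user pool sizes are derived from that share.
	defaultSize := maxDBConn / 2 // 15 -> CONNECTION_POOL_DEFAULT_SIZE
	minSize := defaultSize / 2   //  7 -> CONNECTION_POOL_MIN_SIZE
	reserveSize := minSize       //  7 -> CONNECTION_POOL_RESERVE_SIZE

	fmt.Println(maxDBConn, defaultSize, minSize, reserveSize)
}
```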
+func (c *Cluster) getConnPoolEnvVars(spec *acidv1.PostgresSpec) []v1.EnvVar { + effectiveMode := util.Coalesce( + spec.ConnectionPool.Mode, + c.OpConfig.ConnectionPool.Mode) + + numberOfInstances := spec.ConnectionPool.NumberOfInstances + if numberOfInstances == nil { + numberOfInstances = util.CoalesceInt32( + c.OpConfig.ConnectionPool.NumberOfInstances, + k8sutil.Int32ToPointer(1)) + } + + effectiveMaxDBConn := util.CoalesceInt32( + spec.ConnectionPool.MaxDBConnections, + c.OpConfig.ConnectionPool.MaxDBConnections) + + if effectiveMaxDBConn == nil { + effectiveMaxDBConn = k8sutil.Int32ToPointer( + constants.ConnPoolMaxDBConnections) + } + + maxDBConn := *effectiveMaxDBConn / *numberOfInstances + + defaultSize := maxDBConn / 2 + minSize := defaultSize / 2 + reserveSize := minSize + + return []v1.EnvVar{ + { + Name: "CONNECTION_POOL_PORT", + Value: fmt.Sprint(pgPort), + }, + { + Name: "CONNECTION_POOL_MODE", + Value: effectiveMode, + }, + { + Name: "CONNECTION_POOL_DEFAULT_SIZE", + Value: fmt.Sprint(defaultSize), + }, + { + Name: "CONNECTION_POOL_MIN_SIZE", + Value: fmt.Sprint(minSize), + }, + { + Name: "CONNECTION_POOL_RESERVE_SIZE", + Value: fmt.Sprint(reserveSize), + }, + { + Name: "CONNECTION_POOL_MAX_CLIENT_CONN", + Value: fmt.Sprint(constants.ConnPoolMaxClientConnections), + }, + { + Name: "CONNECTION_POOL_MAX_DB_CONN", + Value: fmt.Sprint(maxDBConn), + }, + } +} + +func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) ( + *v1.PodTemplateSpec, error) { + + gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds()) + resources, err := generateResourceRequirements( + spec.ConnectionPool.Resources, + c.makeDefaultConnPoolResources()) + + effectiveDockerImage := util.Coalesce( + spec.ConnectionPool.DockerImage, + c.OpConfig.ConnectionPool.Image) + + effectiveSchema := util.Coalesce( + spec.ConnectionPool.Schema, + c.OpConfig.ConnectionPool.Schema) + + if err != nil { + return nil, fmt.Errorf("could not generate resource requirements: %v", err) + } + + secretSelector := func(key string) *v1.SecretKeySelector { + effectiveUser := util.Coalesce( + spec.ConnectionPool.User, + c.OpConfig.ConnectionPool.User) + + return &v1.SecretKeySelector{ + LocalObjectReference: v1.LocalObjectReference{ + Name: c.credentialSecretName(effectiveUser), + }, + Key: key, + } + } + + envVars := []v1.EnvVar{ + { + Name: "PGHOST", + Value: c.serviceAddress(Master), + }, + { + Name: "PGPORT", + Value: c.servicePort(Master), + }, + { + Name: "PGUSER", + ValueFrom: &v1.EnvVarSource{ + SecretKeyRef: secretSelector("username"), + }, + }, + // the convention is to use the same schema name as + // connection pool username + { + Name: "PGSCHEMA", + Value: effectiveSchema, + }, + { + Name: "PGPASSWORD", + ValueFrom: &v1.EnvVarSource{ + SecretKeyRef: secretSelector("password"), + }, + }, + } + + envVars = append(envVars, c.getConnPoolEnvVars(spec)...) 
+
+	poolerContainer := v1.Container{
+		Name:            connectionPoolContainer,
+		Image:           effectiveDockerImage,
+		ImagePullPolicy: v1.PullIfNotPresent,
+		Resources:       *resources,
+		Ports: []v1.ContainerPort{
+			{
+				ContainerPort: pgPort,
+				Protocol:      v1.ProtocolTCP,
+			},
+		},
+		Env: envVars,
+	}
+
+	podTemplate := &v1.PodTemplateSpec{
+		ObjectMeta: metav1.ObjectMeta{
+			Labels:      c.connPoolLabelsSelector().MatchLabels,
+			Namespace:   c.Namespace,
+			Annotations: c.generatePodAnnotations(spec),
+		},
+		Spec: v1.PodSpec{
+			ServiceAccountName:            c.OpConfig.PodServiceAccountName,
+			TerminationGracePeriodSeconds: &gracePeriod,
+			Containers:                    []v1.Container{poolerContainer},
+			// TODO: add tolerations to schedule the pooler on the same node
+			// as the database
+			//Tolerations: *tolerationsSpec,
+		},
+	}
+
+	return podTemplate, nil
+}
+
+// Return an array of ownerReferences to make an arbitrary object dependent on
+// the StatefulSet. The dependency is made on the StatefulSet instead of the
+// PostgreSQL CRD because the former represents the actual state, and only its
+// deletion means we delete the cluster (e.g. if the CRD was deleted but the
+// StatefulSet somehow survived, we can't delete an object because that would
+// affect the functioning cluster).
+func (c *Cluster) ownerReferences() []metav1.OwnerReference {
+	controller := true
+
+	if c.Statefulset == nil {
+		c.logger.Warning("Cannot get owner reference, no statefulset")
+		return []metav1.OwnerReference{}
+	}
+
+	return []metav1.OwnerReference{
+		{
+			UID:        c.Statefulset.ObjectMeta.UID,
+			APIVersion: "apps/v1",
+			Kind:       "StatefulSet",
+			Name:       c.Statefulset.ObjectMeta.Name,
+			Controller: &controller,
+		},
+	}
+}
+
+func (c *Cluster) generateConnPoolDeployment(spec *acidv1.PostgresSpec) (
+	*appsv1.Deployment, error) {
+
+	// there are two ways to enable the connection pooler: either specify a
+	// connectionPool section or set enableConnectionPool. In the second case
+	// spec.connectionPool will be nil, so to make it easier to calculate
+	// default values, initialize it to an empty structure. It could be done
+	// anywhere, but here is the earliest common entry point between sync and
+	// create code, so init here.
+	if spec.ConnectionPool == nil {
+		spec.ConnectionPool = &acidv1.ConnectionPool{}
+	}
+
+	podTemplate, err := c.generateConnPoolPodTemplate(spec)
+	numberOfInstances := spec.ConnectionPool.NumberOfInstances
+	if numberOfInstances == nil {
+		numberOfInstances = util.CoalesceInt32(
+			c.OpConfig.ConnectionPool.NumberOfInstances,
+			k8sutil.Int32ToPointer(1))
+	}
+
+	if *numberOfInstances < constants.ConnPoolMinInstances {
+		msg := "Adjusted number of connection pool instances from %d to %d"
+		c.logger.Warningf(msg, *numberOfInstances, constants.ConnPoolMinInstances)
+
+		*numberOfInstances = constants.ConnPoolMinInstances
+	}
+
+	if err != nil {
+		return nil, err
+	}
+
+	deployment := &appsv1.Deployment{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:        c.connPoolName(),
+			Namespace:   c.Namespace,
+			Labels:      c.connPoolLabelsSelector().MatchLabels,
+			Annotations: map[string]string{},
+			// make the StatefulSet object its owner to represent the dependency.
+			// By itself the StatefulSet is deleted with the "Orphaned"
+			// propagation policy, which means that its deletion will not
+			// clean up this deployment, but there is hope that this object
+			// will be garbage collected if something went wrong and the
+			// operator didn't delete it.
+ OwnerReferences: c.ownerReferences(), + }, + Spec: appsv1.DeploymentSpec{ + Replicas: numberOfInstances, + Selector: c.connPoolLabelsSelector(), + Template: *podTemplate, + }, + } + + return deployment, nil +} + +func (c *Cluster) generateConnPoolService(spec *acidv1.PostgresSpec) *v1.Service { + + // there are two ways to enable connection pooler, either to specify a + // connectionPool section or enableConnectionPool. In the second case + // spec.connectionPool will be nil, so to make it easier to calculate + // default values, initialize it to an empty structure. It could be done + // anywhere, but here is the earliest common entry point between sync and + // create code, so init here. + if spec.ConnectionPool == nil { + spec.ConnectionPool = &acidv1.ConnectionPool{} + } + + serviceSpec := v1.ServiceSpec{ + Ports: []v1.ServicePort{ + { + Name: c.connPoolName(), + Port: pgPort, + TargetPort: intstr.IntOrString{StrVal: c.servicePort(Master)}, + }, + }, + Type: v1.ServiceTypeClusterIP, + Selector: map[string]string{ + "connection-pool": c.connPoolName(), + }, + } + + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: c.connPoolName(), + Namespace: c.Namespace, + Labels: c.connPoolLabelsSelector().MatchLabels, + Annotations: map[string]string{}, + // make StatefulSet object its owner to represent the dependency. + // By itself StatefulSet is being deleted with "Orphaned" + // propagation policy, which means that it's deletion will not + // clean up this service, but there is a hope that this object will + // be garbage collected if something went wrong and operator didn't + // deleted it. + OwnerReferences: c.ownerReferences(), + }, + Spec: serviceSpec, + } + + return service +} + func ensurePath(file string, defaultDir string, defaultFile string) string { if file == "" { return path.Join(defaultDir, defaultFile) diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index 25e0f7af4..e04b281ba 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -1,6 +1,8 @@ package cluster import ( + "errors" + "fmt" "reflect" "testing" @@ -13,6 +15,7 @@ import ( "github.com/zalando/postgres-operator/pkg/util/constants" "github.com/zalando/postgres-operator/pkg/util/k8sutil" + appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" policyv1beta1 "k8s.io/api/policy/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -582,6 +585,375 @@ func TestSecretVolume(t *testing.T) { } } +func testResources(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { + cpuReq := podSpec.Spec.Containers[0].Resources.Requests["cpu"] + if cpuReq.String() != cluster.OpConfig.ConnectionPool.ConnPoolDefaultCPURequest { + return fmt.Errorf("CPU request doesn't match, got %s, expected %s", + cpuReq.String(), cluster.OpConfig.ConnectionPool.ConnPoolDefaultCPURequest) + } + + memReq := podSpec.Spec.Containers[0].Resources.Requests["memory"] + if memReq.String() != cluster.OpConfig.ConnectionPool.ConnPoolDefaultMemoryRequest { + return fmt.Errorf("Memory request doesn't match, got %s, expected %s", + memReq.String(), cluster.OpConfig.ConnectionPool.ConnPoolDefaultMemoryRequest) + } + + cpuLim := podSpec.Spec.Containers[0].Resources.Limits["cpu"] + if cpuLim.String() != cluster.OpConfig.ConnectionPool.ConnPoolDefaultCPULimit { + return fmt.Errorf("CPU limit doesn't match, got %s, expected %s", + cpuLim.String(), cluster.OpConfig.ConnectionPool.ConnPoolDefaultCPULimit) + } + + memLim := podSpec.Spec.Containers[0].Resources.Limits["memory"] + if memLim.String() != 
cluster.OpConfig.ConnectionPool.ConnPoolDefaultMemoryLimit { + return fmt.Errorf("Memory limit doesn't match, got %s, expected %s", + memLim.String(), cluster.OpConfig.ConnectionPool.ConnPoolDefaultMemoryLimit) + } + + return nil +} + +func testLabels(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { + poolLabels := podSpec.ObjectMeta.Labels["connection-pool"] + + if poolLabels != cluster.connPoolLabelsSelector().MatchLabels["connection-pool"] { + return fmt.Errorf("Pod labels do not match, got %+v, expected %+v", + podSpec.ObjectMeta.Labels, cluster.connPoolLabelsSelector().MatchLabels) + } + + return nil +} + +func testEnvs(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { + required := map[string]bool{ + "PGHOST": false, + "PGPORT": false, + "PGUSER": false, + "PGSCHEMA": false, + "PGPASSWORD": false, + "CONNECTION_POOL_MODE": false, + "CONNECTION_POOL_PORT": false, + } + + envs := podSpec.Spec.Containers[0].Env + for _, env := range envs { + required[env.Name] = true + } + + for env, value := range required { + if !value { + return fmt.Errorf("Environment variable %s is not present", env) + } + } + + return nil +} + +func testCustomPodTemplate(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { + if podSpec.ObjectMeta.Name != "test-pod-template" { + return fmt.Errorf("Custom pod template is not used, current spec %+v", + podSpec) + } + + return nil +} + +func TestConnPoolPodSpec(t *testing.T) { + testName := "Test connection pool pod template generation" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{ + MaxDBConnections: int32ToPointer(60), + ConnPoolDefaultCPURequest: "100m", + ConnPoolDefaultCPULimit: "100m", + ConnPoolDefaultMemoryRequest: "100Mi", + ConnPoolDefaultMemoryLimit: "100Mi", + }, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + + var clusterNoDefaultRes = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{}, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + + noCheck := func(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { return nil } + + tests := []struct { + subTest string + spec *acidv1.PostgresSpec + expected error + cluster *Cluster + check func(cluster *Cluster, podSpec *v1.PodTemplateSpec) error + }{ + { + subTest: "default configuration", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: noCheck, + }, + { + subTest: "no default resources", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: errors.New(`could not generate resource requirements: could not fill resource requests: could not parse default CPU quantity: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'`), + cluster: clusterNoDefaultRes, + check: noCheck, + }, + { + subTest: "default resources are set", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: testResources, + }, + { + subTest: "labels for service", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: testLabels, + }, + { + subTest: 
"required envs", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: testEnvs, + }, + } + for _, tt := range tests { + podSpec, err := tt.cluster.generateConnPoolPodTemplate(tt.spec) + + if err != tt.expected && err.Error() != tt.expected.Error() { + t.Errorf("%s [%s]: Could not generate pod template,\n %+v, expected\n %+v", + testName, tt.subTest, err, tt.expected) + } + + err = tt.check(cluster, podSpec) + if err != nil { + t.Errorf("%s [%s]: Pod spec is incorrect, %+v", + testName, tt.subTest, err) + } + } +} + +func testDeploymentOwnwerReference(cluster *Cluster, deployment *appsv1.Deployment) error { + owner := deployment.ObjectMeta.OwnerReferences[0] + + if owner.Name != cluster.Statefulset.ObjectMeta.Name { + return fmt.Errorf("Ownere reference is incorrect, got %s, expected %s", + owner.Name, cluster.Statefulset.ObjectMeta.Name) + } + + return nil +} + +func testSelector(cluster *Cluster, deployment *appsv1.Deployment) error { + labels := deployment.Spec.Selector.MatchLabels + expected := cluster.connPoolLabelsSelector().MatchLabels + + if labels["connection-pool"] != expected["connection-pool"] { + return fmt.Errorf("Labels are incorrect, got %+v, expected %+v", + labels, expected) + } + + return nil +} + +func TestConnPoolDeploymentSpec(t *testing.T) { + testName := "Test connection pool deployment spec generation" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{ + ConnPoolDefaultCPURequest: "100m", + ConnPoolDefaultCPULimit: "100m", + ConnPoolDefaultMemoryRequest: "100Mi", + ConnPoolDefaultMemoryLimit: "100Mi", + }, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + cluster.Statefulset = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sts", + }, + } + + noCheck := func(cluster *Cluster, deployment *appsv1.Deployment) error { + return nil + } + + tests := []struct { + subTest string + spec *acidv1.PostgresSpec + expected error + cluster *Cluster + check func(cluster *Cluster, deployment *appsv1.Deployment) error + }{ + { + subTest: "default configuration", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: noCheck, + }, + { + subTest: "owner reference", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: testDeploymentOwnwerReference, + }, + { + subTest: "selector", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: testSelector, + }, + } + for _, tt := range tests { + deployment, err := tt.cluster.generateConnPoolDeployment(tt.spec) + + if err != tt.expected && err.Error() != tt.expected.Error() { + t.Errorf("%s [%s]: Could not generate deployment spec,\n %+v, expected\n %+v", + testName, tt.subTest, err, tt.expected) + } + + err = tt.check(cluster, deployment) + if err != nil { + t.Errorf("%s [%s]: Deployment spec is incorrect, %+v", + testName, tt.subTest, err) + } + } +} + +func testServiceOwnwerReference(cluster *Cluster, service *v1.Service) error { + owner := service.ObjectMeta.OwnerReferences[0] + + if owner.Name != cluster.Statefulset.ObjectMeta.Name { + return fmt.Errorf("Ownere reference is incorrect, got %s, expected %s", + owner.Name, 
cluster.Statefulset.ObjectMeta.Name) + } + + return nil +} + +func testServiceSelector(cluster *Cluster, service *v1.Service) error { + selector := service.Spec.Selector + + if selector["connection-pool"] != cluster.connPoolName() { + return fmt.Errorf("Selector is incorrect, got %s, expected %s", + selector["connection-pool"], cluster.connPoolName()) + } + + return nil +} + +func TestConnPoolServiceSpec(t *testing.T) { + testName := "Test connection pool service spec generation" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{ + ConnPoolDefaultCPURequest: "100m", + ConnPoolDefaultCPULimit: "100m", + ConnPoolDefaultMemoryRequest: "100Mi", + ConnPoolDefaultMemoryLimit: "100Mi", + }, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + cluster.Statefulset = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sts", + }, + } + + noCheck := func(cluster *Cluster, deployment *v1.Service) error { + return nil + } + + tests := []struct { + subTest string + spec *acidv1.PostgresSpec + cluster *Cluster + check func(cluster *Cluster, deployment *v1.Service) error + }{ + { + subTest: "default configuration", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + cluster: cluster, + check: noCheck, + }, + { + subTest: "owner reference", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + cluster: cluster, + check: testServiceOwnwerReference, + }, + { + subTest: "selector", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + cluster: cluster, + check: testServiceSelector, + }, + } + for _, tt := range tests { + service := tt.cluster.generateConnPoolService(tt.spec) + + if err := tt.check(cluster, service); err != nil { + t.Errorf("%s [%s]: Service spec is incorrect, %+v", + testName, tt.subTest, err) + } + } +} + func TestTLS(t *testing.T) { var err error var spec acidv1.PostgresSpec diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index d6c2149bf..2e02a9a83 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -90,6 +90,132 @@ func (c *Cluster) createStatefulSet() (*appsv1.StatefulSet, error) { return statefulSet, nil } +// Prepare the database for connection pool to be used, i.e. install lookup +// function (do it first, because it should be fast and if it didn't succeed, +// it doesn't makes sense to create more K8S objects. At this moment we assume +// that necessary connection pool user exists. +// +// After that create all the objects for connection pool, namely a deployment +// with a chosen pooler and a service to expose it. 
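Because createConnectionPool receives the lookup installer as a parameter (the InstallFunction type added in pkg/cluster/types.go), the database step is easy to stub out in tests. A rough sketch of the call pattern, assuming a fully initialized *Cluster inside package cluster; the helper name is invented for the example:

```go
package cluster

// Illustrative sketch only: how the create path wires the database step into
// createConnectionPool. Production code passes c.installLookupFunction; unit
// tests pass a no-op with the same InstallFunction signature.
func (c *Cluster) createPoolerSketch() {
	var lookup InstallFunction = func(schema, user string) error {
		// would create the schema and the user_lookup() function in every
		// database of the cluster
		return nil
	}

	objects, err := c.createConnectionPool(lookup)
	if err != nil {
		// the pooler is treated as non-critical, so a failure is only a warning
		c.logger.Warningf("could not create connection pool: %v", err)
		return
	}

	c.logger.Infof("connection pool deployment %q and service %q created",
		objects.Deployment.Name, objects.Service.Name)
}
```

Create() and Sync() follow this pattern with c.installLookupFunction as the callback, treating pooler failures as warnings rather than hard errors.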
+func (c *Cluster) createConnectionPool(lookup InstallFunction) (*ConnectionPoolObjects, error) { + var msg string + c.setProcessName("creating connection pool") + + schema := c.Spec.ConnectionPool.Schema + if schema == "" { + schema = c.OpConfig.ConnectionPool.Schema + } + + user := c.Spec.ConnectionPool.User + if user == "" { + user = c.OpConfig.ConnectionPool.User + } + + err := lookup(schema, user) + + if err != nil { + msg = "could not prepare database for connection pool: %v" + return nil, fmt.Errorf(msg, err) + } + + deploymentSpec, err := c.generateConnPoolDeployment(&c.Spec) + if err != nil { + msg = "could not generate deployment for connection pool: %v" + return nil, fmt.Errorf(msg, err) + } + + // client-go does retry 10 times (with NoBackoff by default) when the API + // believe a request can be retried and returns Retry-After header. This + // should be good enough to not think about it here. + deployment, err := c.KubeClient. + Deployments(deploymentSpec.Namespace). + Create(deploymentSpec) + + if err != nil { + return nil, err + } + + serviceSpec := c.generateConnPoolService(&c.Spec) + service, err := c.KubeClient. + Services(serviceSpec.Namespace). + Create(serviceSpec) + + if err != nil { + return nil, err + } + + c.ConnectionPool = &ConnectionPoolObjects{ + Deployment: deployment, + Service: service, + } + c.logger.Debugf("created new connection pool %q, uid: %q", + util.NameFromMeta(deployment.ObjectMeta), deployment.UID) + + return c.ConnectionPool, nil +} + +func (c *Cluster) deleteConnectionPool() (err error) { + c.setProcessName("deleting connection pool") + c.logger.Debugln("deleting connection pool") + + // Lack of connection pooler objects is not a fatal error, just log it if + // it was present before in the manifest + if c.ConnectionPool == nil { + c.logger.Infof("No connection pool to delete") + return nil + } + + // Clean up the deployment object. If deployment resource we've remembered + // is somehow empty, try to delete based on what would we generate + deploymentName := c.connPoolName() + deployment := c.ConnectionPool.Deployment + + if deployment != nil { + deploymentName = deployment.Name + } + + // set delete propagation policy to foreground, so that replica set will be + // also deleted. + policy := metav1.DeletePropagationForeground + options := metav1.DeleteOptions{PropagationPolicy: &policy} + err = c.KubeClient. + Deployments(c.Namespace). + Delete(deploymentName, &options) + + if !k8sutil.ResourceNotFound(err) { + c.logger.Debugf("Connection pool deployment was already deleted") + } else if err != nil { + return fmt.Errorf("could not delete deployment: %v", err) + } + + c.logger.Infof("Connection pool deployment %q has been deleted", deploymentName) + + // Repeat the same for the service object + service := c.ConnectionPool.Service + serviceName := c.connPoolName() + + if service != nil { + serviceName = service.Name + } + + // set delete propagation policy to foreground, so that all the dependant + // will be deleted. + err = c.KubeClient. + Services(c.Namespace). 
+ Delete(serviceName, &options) + + if !k8sutil.ResourceNotFound(err) { + c.logger.Debugf("Connection pool service was already deleted") + } else if err != nil { + return fmt.Errorf("could not delete service: %v", err) + } + + c.logger.Infof("Connection pool service %q has been deleted", serviceName) + + c.ConnectionPool = nil + return nil +} + func getPodIndex(podName string) (int32, error) { parts := strings.Split(podName, "-") if len(parts) == 0 { @@ -674,3 +800,34 @@ func (c *Cluster) GetStatefulSet() *appsv1.StatefulSet { func (c *Cluster) GetPodDisruptionBudget() *policybeta1.PodDisruptionBudget { return c.PodDisruptionBudget } + +// Perform actual patching of a connection pool deployment, assuming that all +// the check were already done before. +func (c *Cluster) updateConnPoolDeployment(oldDeploymentSpec, newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) { + c.setProcessName("updating connection pool") + if c.ConnectionPool == nil || c.ConnectionPool.Deployment == nil { + return nil, fmt.Errorf("there is no connection pool in the cluster") + } + + patchData, err := specPatch(newDeployment.Spec) + if err != nil { + return nil, fmt.Errorf("could not form patch for the deployment: %v", err) + } + + // An update probably requires RetryOnConflict, but since only one operator + // worker at one time will try to update it chances of conflicts are + // minimal. + deployment, err := c.KubeClient. + Deployments(c.ConnectionPool.Deployment.Namespace). + Patch( + c.ConnectionPool.Deployment.Name, + types.MergePatchType, + patchData, "") + if err != nil { + return nil, fmt.Errorf("could not patch deployment: %v", err) + } + + c.ConnectionPool.Deployment = deployment + + return deployment, nil +} diff --git a/pkg/cluster/resources_test.go b/pkg/cluster/resources_test.go new file mode 100644 index 000000000..f06e96e65 --- /dev/null +++ b/pkg/cluster/resources_test.go @@ -0,0 +1,127 @@ +package cluster + +import ( + "testing" + + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/k8sutil" + + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func mockInstallLookupFunction(schema string, user string) error { + return nil +} + +func boolToPointer(value bool) *bool { + return &value +} + +func TestConnPoolCreationAndDeletion(t *testing.T) { + testName := "Test connection pool creation" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{ + ConnPoolDefaultCPURequest: "100m", + ConnPoolDefaultCPULimit: "100m", + ConnPoolDefaultMemoryRequest: "100Mi", + ConnPoolDefaultMemoryLimit: "100Mi", + }, + }, + }, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger) + + cluster.Statefulset = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sts", + }, + } + + cluster.Spec = acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + } + poolResources, err := cluster.createConnectionPool(mockInstallLookupFunction) + + if err != nil { + t.Errorf("%s: Cannot create connection pool, %s, %+v", + testName, err, poolResources) + } + + if poolResources.Deployment == nil { + t.Errorf("%s: Connection pool deployment is empty", testName) + } + + if poolResources.Service == nil { + t.Errorf("%s: Connection pool service is empty", testName) 
+ } + + err = cluster.deleteConnectionPool() + if err != nil { + t.Errorf("%s: Cannot delete connection pool, %s", testName, err) + } +} + +func TestNeedConnPool(t *testing.T) { + testName := "Test how connection pool can be enabled" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{ + ConnPoolDefaultCPURequest: "100m", + ConnPoolDefaultCPULimit: "100m", + ConnPoolDefaultMemoryRequest: "100Mi", + ConnPoolDefaultMemoryLimit: "100Mi", + }, + }, + }, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger) + + cluster.Spec = acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + } + + if !cluster.needConnectionPool() { + t.Errorf("%s: Connection pool is not enabled with full definition", + testName) + } + + cluster.Spec = acidv1.PostgresSpec{ + EnableConnectionPool: boolToPointer(true), + } + + if !cluster.needConnectionPool() { + t.Errorf("%s: Connection pool is not enabled with flag", + testName) + } + + cluster.Spec = acidv1.PostgresSpec{ + EnableConnectionPool: boolToPointer(false), + ConnectionPool: &acidv1.ConnectionPool{}, + } + + if cluster.needConnectionPool() { + t.Errorf("%s: Connection pool is still enabled with flag being false", + testName) + } + + cluster.Spec = acidv1.PostgresSpec{ + EnableConnectionPool: boolToPointer(true), + ConnectionPool: &acidv1.ConnectionPool{}, + } + + if !cluster.needConnectionPool() { + t.Errorf("%s: Connection pool is not enabled with flag and full", + testName) + } +} diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index b04ff863b..a7c933ae7 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -23,6 +23,7 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { c.mu.Lock() defer c.mu.Unlock() + oldSpec := c.Postgresql c.setSpec(newSpec) defer func() { @@ -108,6 +109,11 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { } } + // sync connection pool + if err = c.syncConnectionPool(&oldSpec, newSpec, c.installLookupFunction); err != nil { + return fmt.Errorf("could not sync connection pool: %v", err) + } + return err } @@ -424,7 +430,9 @@ func (c *Cluster) syncSecrets() error { } pwdUser := userMap[secretUsername] // if this secret belongs to the infrastructure role and the password has changed - replace it in the secret - if pwdUser.Password != string(secret.Data["password"]) && pwdUser.Origin == spec.RoleOriginInfrastructure { + if pwdUser.Password != string(secret.Data["password"]) && + pwdUser.Origin == spec.RoleOriginInfrastructure { + c.logger.Debugf("updating the secret %q from the infrastructure roles", secretSpec.Name) if _, err = c.KubeClient.Secrets(secretSpec.Namespace).Update(secretSpec); err != nil { return fmt.Errorf("could not update infrastructure role secret for role %q: %v", secretUsername, err) @@ -468,6 +476,16 @@ func (c *Cluster) syncRoles() (err error) { for _, u := range c.pgUsers { userNames = append(userNames, u.Name) } + + if c.needConnectionPool() { + connPoolUser := c.systemUsers[constants.ConnectionPoolUserKeyName] + userNames = append(userNames, connPoolUser.Name) + + if _, exists := c.pgUsers[connPoolUser.Name]; !exists { + c.pgUsers[connPoolUser.Name] = connPoolUser + } + } + dbUsers, err = c.readPgUsersFromDatabase(userNames) if err != nil { return fmt.Errorf("error getting users from the database: %v", err) @@ -600,3 +618,165 @@ func (c *Cluster) syncLogicalBackupJob() error { 
return nil } + +func (c *Cluster) syncConnectionPool(oldSpec, newSpec *acidv1.Postgresql, lookup InstallFunction) error { + if c.ConnectionPool == nil { + c.ConnectionPool = &ConnectionPoolObjects{} + } + + newNeedConnPool := c.needConnectionPoolWorker(&newSpec.Spec) + oldNeedConnPool := c.needConnectionPoolWorker(&oldSpec.Spec) + + if newNeedConnPool { + // Try to sync in any case. If we didn't needed connection pool before, + // it means we want to create it. If it was already present, still sync + // since it could happen that there is no difference in specs, and all + // the resources are remembered, but the deployment was manualy deleted + // in between + c.logger.Debug("syncing connection pool") + + // in this case also do not forget to install lookup function as for + // creating cluster + if !oldNeedConnPool || !c.ConnectionPool.LookupFunction { + newConnPool := newSpec.Spec.ConnectionPool + + specSchema := "" + specUser := "" + + if newConnPool != nil { + specSchema = newConnPool.Schema + specUser = newConnPool.User + } + + schema := util.Coalesce( + specSchema, + c.OpConfig.ConnectionPool.Schema) + + user := util.Coalesce( + specUser, + c.OpConfig.ConnectionPool.User) + + if err := lookup(schema, user); err != nil { + return err + } + } + + if err := c.syncConnectionPoolWorker(oldSpec, newSpec); err != nil { + c.logger.Errorf("could not sync connection pool: %v", err) + return err + } + } + + if oldNeedConnPool && !newNeedConnPool { + // delete and cleanup resources + if err := c.deleteConnectionPool(); err != nil { + c.logger.Warningf("could not remove connection pool: %v", err) + } + } + + if !oldNeedConnPool && !newNeedConnPool { + // delete and cleanup resources if not empty + if c.ConnectionPool != nil && + (c.ConnectionPool.Deployment != nil || + c.ConnectionPool.Service != nil) { + + if err := c.deleteConnectionPool(); err != nil { + c.logger.Warningf("could not remove connection pool: %v", err) + } + } + } + + return nil +} + +// Synchronize connection pool resources. Effectively we're interested only in +// synchronizing the corresponding deployment, but in case of deployment or +// service is missing, create it. After checking, also remember an object for +// the future references. +func (c *Cluster) syncConnectionPoolWorker(oldSpec, newSpec *acidv1.Postgresql) error { + deployment, err := c.KubeClient. + Deployments(c.Namespace). + Get(c.connPoolName(), metav1.GetOptions{}) + + if err != nil && k8sutil.ResourceNotFound(err) { + msg := "Deployment %s for connection pool synchronization is not found, create it" + c.logger.Warningf(msg, c.connPoolName()) + + deploymentSpec, err := c.generateConnPoolDeployment(&newSpec.Spec) + if err != nil { + msg = "could not generate deployment for connection pool: %v" + return fmt.Errorf(msg, err) + } + + deployment, err := c.KubeClient. + Deployments(deploymentSpec.Namespace). + Create(deploymentSpec) + + if err != nil { + return err + } + + c.ConnectionPool.Deployment = deployment + } else if err != nil { + return fmt.Errorf("could not get connection pool deployment to sync: %v", err) + } else { + c.ConnectionPool.Deployment = deployment + + // actual synchronization + oldConnPool := oldSpec.Spec.ConnectionPool + newConnPool := newSpec.Spec.ConnectionPool + specSync, specReason := c.needSyncConnPoolSpecs(oldConnPool, newConnPool) + defaultsSync, defaultsReason := c.needSyncConnPoolDefaults(newConnPool, deployment) + reason := append(specReason, defaultsReason...) 
+ if specSync || defaultsSync { + c.logger.Infof("Update connection pool deployment %s, reason: %+v", + c.connPoolName(), reason) + + newDeploymentSpec, err := c.generateConnPoolDeployment(&newSpec.Spec) + if err != nil { + msg := "could not generate deployment for connection pool: %v" + return fmt.Errorf(msg, err) + } + + oldDeploymentSpec := c.ConnectionPool.Deployment + + deployment, err := c.updateConnPoolDeployment( + oldDeploymentSpec, + newDeploymentSpec) + + if err != nil { + return err + } + + c.ConnectionPool.Deployment = deployment + return nil + } + } + + service, err := c.KubeClient. + Services(c.Namespace). + Get(c.connPoolName(), metav1.GetOptions{}) + + if err != nil && k8sutil.ResourceNotFound(err) { + msg := "Service %s for connection pool synchronization is not found, create it" + c.logger.Warningf(msg, c.connPoolName()) + + serviceSpec := c.generateConnPoolService(&newSpec.Spec) + service, err := c.KubeClient. + Services(serviceSpec.Namespace). + Create(serviceSpec) + + if err != nil { + return err + } + + c.ConnectionPool.Service = service + } else if err != nil { + return fmt.Errorf("could not get connection pool service to sync: %v", err) + } else { + // Service updates are not supported and probably not that useful anyway + c.ConnectionPool.Service = service + } + + return nil +} diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go new file mode 100644 index 000000000..483d4ba58 --- /dev/null +++ b/pkg/cluster/sync_test.go @@ -0,0 +1,212 @@ +package cluster + +import ( + "fmt" + "testing" + + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/k8sutil" + + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func int32ToPointer(value int32) *int32 { + return &value +} + +func deploymentUpdated(cluster *Cluster, err error) error { + if cluster.ConnectionPool.Deployment.Spec.Replicas == nil || + *cluster.ConnectionPool.Deployment.Spec.Replicas != 2 { + return fmt.Errorf("Wrong nubmer of instances") + } + + return nil +} + +func objectsAreSaved(cluster *Cluster, err error) error { + if cluster.ConnectionPool == nil { + return fmt.Errorf("Connection pool resources are empty") + } + + if cluster.ConnectionPool.Deployment == nil { + return fmt.Errorf("Deployment was not saved") + } + + if cluster.ConnectionPool.Service == nil { + return fmt.Errorf("Service was not saved") + } + + return nil +} + +func objectsAreDeleted(cluster *Cluster, err error) error { + if cluster.ConnectionPool != nil { + return fmt.Errorf("Connection pool was not deleted") + } + + return nil +} + +func TestConnPoolSynchronization(t *testing.T) { + testName := "Test connection pool synchronization" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{ + ConnPoolDefaultCPURequest: "100m", + ConnPoolDefaultCPULimit: "100m", + ConnPoolDefaultMemoryRequest: "100Mi", + ConnPoolDefaultMemoryLimit: "100Mi", + NumberOfInstances: int32ToPointer(1), + }, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + + cluster.Statefulset = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sts", + }, + } + + clusterMissingObjects := *cluster + clusterMissingObjects.KubeClient = k8sutil.ClientMissingObjects() + + 
clusterMock := *cluster + clusterMock.KubeClient = k8sutil.NewMockKubernetesClient() + + clusterDirtyMock := *cluster + clusterDirtyMock.KubeClient = k8sutil.NewMockKubernetesClient() + clusterDirtyMock.ConnectionPool = &ConnectionPoolObjects{ + Deployment: &appsv1.Deployment{}, + Service: &v1.Service{}, + } + + clusterNewDefaultsMock := *cluster + clusterNewDefaultsMock.KubeClient = k8sutil.NewMockKubernetesClient() + cluster.OpConfig.ConnectionPool.Image = "pooler:2.0" + cluster.OpConfig.ConnectionPool.NumberOfInstances = int32ToPointer(2) + + tests := []struct { + subTest string + oldSpec *acidv1.Postgresql + newSpec *acidv1.Postgresql + cluster *Cluster + check func(cluster *Cluster, err error) error + }{ + { + subTest: "create if doesn't exist", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + }, + cluster: &clusterMissingObjects, + check: objectsAreSaved, + }, + { + subTest: "create if doesn't exist with a flag", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + EnableConnectionPool: boolToPointer(true), + }, + }, + cluster: &clusterMissingObjects, + check: objectsAreSaved, + }, + { + subTest: "create from scratch", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + }, + cluster: &clusterMissingObjects, + check: objectsAreSaved, + }, + { + subTest: "delete if not needed", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + cluster: &clusterMock, + check: objectsAreDeleted, + }, + { + subTest: "cleanup if still there", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + cluster: &clusterDirtyMock, + check: objectsAreDeleted, + }, + { + subTest: "update deployment", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{ + NumberOfInstances: int32ToPointer(1), + }, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{ + NumberOfInstances: int32ToPointer(2), + }, + }, + }, + cluster: &clusterMock, + check: deploymentUpdated, + }, + { + subTest: "update image from changed defaults", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + }, + cluster: &clusterNewDefaultsMock, + check: deploymentUpdated, + }, + } + for _, tt := range tests { + err := tt.cluster.syncConnectionPool(tt.oldSpec, tt.newSpec, mockInstallLookupFunction) + + if err := tt.check(tt.cluster, err); err != nil { + t.Errorf("%s [%s]: Could not synchronize, %+v", + testName, tt.subTest, err) + } + } +} diff --git a/pkg/cluster/types.go b/pkg/cluster/types.go index 138b7015c..04d00cb58 100644 --- a/pkg/cluster/types.go +++ b/pkg/cluster/types.go @@ -69,3 +69,7 @@ type ClusterStatus struct { Spec acidv1.PostgresSpec Error error } + +type TemplateParams map[string]interface{} + +type InstallFunction func(schema string, user string) error diff --git a/pkg/cluster/util.go 
b/pkg/cluster/util.go index 8c02fed2e..dc1e93954 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -408,7 +408,32 @@ func (c *Cluster) labelsSet(shouldAddExtraLabels bool) labels.Set { } func (c *Cluster) labelsSelector() *metav1.LabelSelector { - return &metav1.LabelSelector{MatchLabels: c.labelsSet(false), MatchExpressions: nil} + return &metav1.LabelSelector{ + MatchLabels: c.labelsSet(false), + MatchExpressions: nil, + } +} + +// Return connection pool labels selector, which should from one point of view +// inherit most of the labels from the cluster itself, but at the same time +// have e.g. different `application` label, so that recreatePod operation will +// not interfere with it (it lists all the pods via labels, and if there would +// be no difference, it will recreate also pooler pods). +func (c *Cluster) connPoolLabelsSelector() *metav1.LabelSelector { + connPoolLabels := labels.Set(map[string]string{}) + + extraLabels := labels.Set(map[string]string{ + "connection-pool": c.connPoolName(), + "application": "db-connection-pool", + }) + + connPoolLabels = labels.Merge(connPoolLabels, c.labelsSet(false)) + connPoolLabels = labels.Merge(connPoolLabels, extraLabels) + + return &metav1.LabelSelector{ + MatchLabels: connPoolLabels, + MatchExpressions: nil, + } } func (c *Cluster) roleLabelsSet(shouldAddExtraLabels bool, role PostgresRole) labels.Set { @@ -483,3 +508,15 @@ func (c *Cluster) GetSpec() (*acidv1.Postgresql, error) { func (c *Cluster) patroniUsesKubernetes() bool { return c.OpConfig.EtcdHost == "" } + +func (c *Cluster) needConnectionPoolWorker(spec *acidv1.PostgresSpec) bool { + if spec.EnableConnectionPool == nil { + return spec.ConnectionPool != nil + } else { + return *spec.EnableConnectionPool + } +} + +func (c *Cluster) needConnectionPool() bool { + return c.needConnectionPoolWorker(&c.Spec) +} diff --git a/pkg/controller/operator_config.go b/pkg/controller/operator_config.go index 03602c3bd..970eef701 100644 --- a/pkg/controller/operator_config.go +++ b/pkg/controller/operator_config.go @@ -8,6 +8,7 @@ import ( acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" "github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/constants" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -21,6 +22,10 @@ func (c *Controller) readOperatorConfigurationFromCRD(configObjectNamespace, con return config, nil } +func int32ToPointer(value int32) *int32 { + return &value +} + // importConfigurationFromCRD is a transitional function that converts CRD configuration to the one based on the configmap func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigurationData) *config.Config { result := &config.Config{} @@ -143,5 +148,51 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur result.ScalyrCPULimit = fromCRD.Scalyr.ScalyrCPULimit result.ScalyrMemoryLimit = fromCRD.Scalyr.ScalyrMemoryLimit + // Connection pool. Looks like we can't use defaulting in CRD before 1.17, + // so ensure default values here. 
+ result.ConnectionPool.NumberOfInstances = util.CoalesceInt32( + fromCRD.ConnectionPool.NumberOfInstances, + int32ToPointer(2)) + + result.ConnectionPool.NumberOfInstances = util.MaxInt32( + result.ConnectionPool.NumberOfInstances, + int32ToPointer(2)) + + result.ConnectionPool.Schema = util.Coalesce( + fromCRD.ConnectionPool.Schema, + constants.ConnectionPoolSchemaName) + + result.ConnectionPool.User = util.Coalesce( + fromCRD.ConnectionPool.User, + constants.ConnectionPoolUserName) + + result.ConnectionPool.Image = util.Coalesce( + fromCRD.ConnectionPool.Image, + "registry.opensource.zalan.do/acid/pgbouncer") + + result.ConnectionPool.Mode = util.Coalesce( + fromCRD.ConnectionPool.Mode, + constants.ConnectionPoolDefaultMode) + + result.ConnectionPool.ConnPoolDefaultCPURequest = util.Coalesce( + fromCRD.ConnectionPool.DefaultCPURequest, + constants.ConnectionPoolDefaultCpuRequest) + + result.ConnectionPool.ConnPoolDefaultMemoryRequest = util.Coalesce( + fromCRD.ConnectionPool.DefaultMemoryRequest, + constants.ConnectionPoolDefaultMemoryRequest) + + result.ConnectionPool.ConnPoolDefaultCPULimit = util.Coalesce( + fromCRD.ConnectionPool.DefaultCPULimit, + constants.ConnectionPoolDefaultCpuLimit) + + result.ConnectionPool.ConnPoolDefaultMemoryLimit = util.Coalesce( + fromCRD.ConnectionPool.DefaultMemoryLimit, + constants.ConnectionPoolDefaultMemoryLimit) + + result.ConnectionPool.MaxDBConnections = util.CoalesceInt32( + fromCRD.ConnectionPool.MaxDBConnections, + int32ToPointer(constants.ConnPoolMaxDBConnections)) + return result } diff --git a/pkg/spec/types.go b/pkg/spec/types.go index 3e6bec8db..36783204d 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -23,13 +23,15 @@ const fileWithNamespace = "/var/run/secrets/kubernetes.io/serviceaccount/namespa // RoleOrigin contains the code of the origin of a role type RoleOrigin int -// The rolesOrigin constant values must be sorted by the role priority for resolveNameConflict(...) to work. +// The rolesOrigin constant values must be sorted by the role priority for +// resolveNameConflict(...) to work. 
const ( RoleOriginUnknown RoleOrigin = iota RoleOriginManifest RoleOriginInfrastructure RoleOriginTeamsAPI RoleOriginSystem + RoleConnectionPool ) type syncUserOperation int @@ -178,6 +180,8 @@ func (r RoleOrigin) String() string { return "teams API role" case RoleOriginSystem: return "system role" + case RoleConnectionPool: + return "connection pool role" default: panic(fmt.Sprintf("bogus role origin value %d", r)) } diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index fee65be81..e0e095617 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -8,6 +8,7 @@ import ( "fmt" "github.com/zalando/postgres-operator/pkg/spec" + "github.com/zalando/postgres-operator/pkg/util/constants" ) // CRD describes CustomResourceDefinition specific configuration parameters @@ -83,6 +84,20 @@ type LogicalBackup struct { LogicalBackupS3SSE string `name:"logical_backup_s3_sse" default:"AES256"` } +// Operator options for connection pooler +type ConnectionPool struct { + NumberOfInstances *int32 `name:"connection_pool_number_of_instances" default:"2"` + Schema string `name:"connection_pool_schema" default:"pooler"` + User string `name:"connection_pool_user" default:"pooler"` + Image string `name:"connection_pool_image" default:"registry.opensource.zalan.do/acid/pgbouncer"` + Mode string `name:"connection_pool_mode" default:"transaction"` + MaxDBConnections *int32 `name:"connection_pool_max_db_connections" default:"60"` + ConnPoolDefaultCPURequest string `name:"connection_pool_default_cpu_request" default:"500m"` + ConnPoolDefaultMemoryRequest string `name:"connection_pool_default_memory_request" default:"100Mi"` + ConnPoolDefaultCPULimit string `name:"connection_pool_default_cpu_limit" default:"1"` + ConnPoolDefaultMemoryLimit string `name:"connection_pool_default_memory_limit" default:"100Mi"` +} + // Config describes operator config type Config struct { CRD @@ -90,6 +105,7 @@ type Config struct { Auth Scalyr LogicalBackup + ConnectionPool WatchedNamespace string `name:"watched_namespace"` // special values: "*" means 'watch all namespaces', the empty string "" means 'watch a namespace where operator is deployed to' EtcdHost string `name:"etcd_host" default:""` // special values: the empty string "" means Patroni will use K8s as a DCS @@ -196,5 +212,10 @@ func validate(cfg *Config) (err error) { if cfg.Workers == 0 { err = fmt.Errorf("number of workers should be higher than 0") } + + if *cfg.ConnectionPool.NumberOfInstances < constants.ConnPoolMinInstances { + msg := "number of connection pool instances should be higher than %d" + err = fmt.Errorf(msg, constants.ConnPoolMinInstances) + } return } diff --git a/pkg/util/constants/pooler.go b/pkg/util/constants/pooler.go new file mode 100644 index 000000000..540d64e2c --- /dev/null +++ b/pkg/util/constants/pooler.go @@ -0,0 +1,18 @@ +package constants + +// Connection pool specific constants +const ( + ConnectionPoolUserName = "pooler" + ConnectionPoolSchemaName = "pooler" + ConnectionPoolDefaultType = "pgbouncer" + ConnectionPoolDefaultMode = "transaction" + ConnectionPoolDefaultCpuRequest = "500m" + ConnectionPoolDefaultCpuLimit = "1" + ConnectionPoolDefaultMemoryRequest = "100Mi" + ConnectionPoolDefaultMemoryLimit = "100Mi" + + ConnPoolContainer = 0 + ConnPoolMaxDBConnections = 60 + ConnPoolMaxClientConnections = 10000 + ConnPoolMinInstances = 2 +) diff --git a/pkg/util/constants/roles.go b/pkg/util/constants/roles.go index 2c20d69db..3d201142c 100644 --- a/pkg/util/constants/roles.go +++ b/pkg/util/constants/roles.go @@ 
-2,15 +2,16 @@ package constants // Roles specific constants const ( - PasswordLength = 64 - SuperuserKeyName = "superuser" - ReplicationUserKeyName = "replication" - RoleFlagSuperuser = "SUPERUSER" - RoleFlagInherit = "INHERIT" - RoleFlagLogin = "LOGIN" - RoleFlagNoLogin = "NOLOGIN" - RoleFlagCreateRole = "CREATEROLE" - RoleFlagCreateDB = "CREATEDB" - RoleFlagReplication = "REPLICATION" - RoleFlagByPassRLS = "BYPASSRLS" + PasswordLength = 64 + SuperuserKeyName = "superuser" + ConnectionPoolUserKeyName = "pooler" + ReplicationUserKeyName = "replication" + RoleFlagSuperuser = "SUPERUSER" + RoleFlagInherit = "INHERIT" + RoleFlagLogin = "LOGIN" + RoleFlagNoLogin = "NOLOGIN" + RoleFlagCreateRole = "CREATEROLE" + RoleFlagCreateDB = "CREATEDB" + RoleFlagReplication = "REPLICATION" + RoleFlagByPassRLS = "BYPASSRLS" ) diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 509b12c19..75b99ec7c 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -9,11 +9,13 @@ import ( batchv1beta1 "k8s.io/api/batch/v1beta1" clientbatchv1beta1 "k8s.io/client-go/kubernetes/typed/batch/v1beta1" + apiappsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" policybeta1 "k8s.io/api/policy/v1beta1" apiextclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" apiextbeta1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" appsv1 "k8s.io/client-go/kubernetes/typed/apps/v1" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -26,6 +28,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +func Int32ToPointer(value int32) *int32 { + return &value +} + // KubernetesClient describes getters for Kubernetes objects type KubernetesClient struct { corev1.SecretsGetter @@ -39,6 +45,7 @@ type KubernetesClient struct { corev1.NamespacesGetter corev1.ServiceAccountsGetter appsv1.StatefulSetsGetter + appsv1.DeploymentsGetter rbacv1.RoleBindingsGetter policyv1beta1.PodDisruptionBudgetsGetter apiextbeta1.CustomResourceDefinitionsGetter @@ -55,6 +62,34 @@ type mockSecret struct { type MockSecretGetter struct { } +type mockDeployment struct { + appsv1.DeploymentInterface +} + +type mockDeploymentNotExist struct { + appsv1.DeploymentInterface +} + +type MockDeploymentGetter struct { +} + +type MockDeploymentNotExistGetter struct { +} + +type mockService struct { + corev1.ServiceInterface +} + +type mockServiceNotExist struct { + corev1.ServiceInterface +} + +type MockServiceGetter struct { +} + +type MockServiceNotExistGetter struct { +} + type mockConfigMap struct { corev1.ConfigMapInterface } @@ -101,6 +136,7 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) { kubeClient.NodesGetter = client.CoreV1() kubeClient.NamespacesGetter = client.CoreV1() kubeClient.StatefulSetsGetter = client.AppsV1() + kubeClient.DeploymentsGetter = client.AppsV1() kubeClient.PodDisruptionBudgetsGetter = client.PolicyV1beta1() kubeClient.RESTClient = client.CoreV1().RESTClient() kubeClient.RoleBindingsGetter = client.RbacV1() @@ -230,19 +266,145 @@ func (c *mockConfigMap) Get(name string, options metav1.GetOptions) (*v1.ConfigM } // Secrets to be mocked -func (c *MockSecretGetter) Secrets(namespace string) corev1.SecretInterface { +func (mock *MockSecretGetter) Secrets(namespace string) corev1.SecretInterface { return &mockSecret{} } // ConfigMaps to be mocked -func (c *MockConfigMapsGetter) ConfigMaps(namespace string) 
corev1.ConfigMapInterface { +func (mock *MockConfigMapsGetter) ConfigMaps(namespace string) corev1.ConfigMapInterface { return &mockConfigMap{} } +func (mock *MockDeploymentGetter) Deployments(namespace string) appsv1.DeploymentInterface { + return &mockDeployment{} +} + +func (mock *MockDeploymentNotExistGetter) Deployments(namespace string) appsv1.DeploymentInterface { + return &mockDeploymentNotExist{} +} + +func (mock *mockDeployment) Create(*apiappsv1.Deployment) (*apiappsv1.Deployment, error) { + return &apiappsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + }, + Spec: apiappsv1.DeploymentSpec{ + Replicas: Int32ToPointer(1), + }, + }, nil +} + +func (mock *mockDeployment) Delete(name string, opts *metav1.DeleteOptions) error { + return nil +} + +func (mock *mockDeployment) Get(name string, opts metav1.GetOptions) (*apiappsv1.Deployment, error) { + return &apiappsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + }, + Spec: apiappsv1.DeploymentSpec{ + Replicas: Int32ToPointer(1), + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + v1.Container{ + Image: "pooler:1.0", + }, + }, + }, + }, + }, + }, nil +} + +func (mock *mockDeployment) Patch(name string, t types.PatchType, data []byte, subres ...string) (*apiappsv1.Deployment, error) { + return &apiappsv1.Deployment{ + Spec: apiappsv1.DeploymentSpec{ + Replicas: Int32ToPointer(2), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + }, + }, nil +} + +func (mock *mockDeploymentNotExist) Get(name string, opts metav1.GetOptions) (*apiappsv1.Deployment, error) { + return nil, &apierrors.StatusError{ + ErrStatus: metav1.Status{ + Reason: metav1.StatusReasonNotFound, + }, + } +} + +func (mock *mockDeploymentNotExist) Create(*apiappsv1.Deployment) (*apiappsv1.Deployment, error) { + return &apiappsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + }, + Spec: apiappsv1.DeploymentSpec{ + Replicas: Int32ToPointer(1), + }, + }, nil +} + +func (mock *MockServiceGetter) Services(namespace string) corev1.ServiceInterface { + return &mockService{} +} + +func (mock *MockServiceNotExistGetter) Services(namespace string) corev1.ServiceInterface { + return &mockServiceNotExist{} +} + +func (mock *mockService) Create(*v1.Service) (*v1.Service, error) { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + }, + }, nil +} + +func (mock *mockService) Delete(name string, opts *metav1.DeleteOptions) error { + return nil +} + +func (mock *mockService) Get(name string, opts metav1.GetOptions) (*v1.Service, error) { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + }, + }, nil +} + +func (mock *mockServiceNotExist) Create(*v1.Service) (*v1.Service, error) { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + }, + }, nil +} + +func (mock *mockServiceNotExist) Get(name string, opts metav1.GetOptions) (*v1.Service, error) { + return nil, &apierrors.StatusError{ + ErrStatus: metav1.Status{ + Reason: metav1.StatusReasonNotFound, + }, + } +} + // NewMockKubernetesClient for other tests func NewMockKubernetesClient() KubernetesClient { return KubernetesClient{ - SecretsGetter: &MockSecretGetter{}, - ConfigMapsGetter: &MockConfigMapsGetter{}, + SecretsGetter: &MockSecretGetter{}, + ConfigMapsGetter: &MockConfigMapsGetter{}, + DeploymentsGetter: &MockDeploymentGetter{}, + ServicesGetter: &MockServiceGetter{}, + } +} + +func ClientMissingObjects() KubernetesClient 
{
+	return KubernetesClient{
+		DeploymentsGetter: &MockDeploymentNotExistGetter{},
+		ServicesGetter:    &MockServiceNotExistGetter{},
 	}
 }
diff --git a/pkg/util/util.go b/pkg/util/util.go
index d9803ab48..46df5d345 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -147,6 +147,40 @@ func Coalesce(val, defaultVal string) string {
 	return val
 }
 
+// CoalesceInt32 works like Coalesce but for *int32 values.
+func CoalesceInt32(val, defaultVal *int32) *int32 {
+	if val == nil {
+		return defaultVal
+	}
+	return val
+}
+
+// testNil returns true if any of the given pointers is nil.
+func testNil(values ...*int32) bool {
+	for _, v := range values {
+		if v == nil {
+			return true
+		}
+	}
+
+	return false
+}
+
+// MaxInt32 returns the larger of two int32 values provided via pointers. If
+// either value is nil, the result is nil as well and the caller needs to
+// check for that.
+func MaxInt32(a, b *int32) *int32 {
+	if testNil(a, b) {
+		return nil
+	}
+
+	if *a > *b {
+		return a
+	}
+
+	return b
+}
+
 // IsSmallerQuantity : checks if first resource is of a smaller quantity than the second
 func IsSmallerQuantity(requestStr, limitStr string) (bool, error) {
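
A small standalone sketch of how the two new helpers are meant to be combined in `importConfigurationFromCRD` to default and floor `connection_pool_number_of_instances`. The local copies of `util.CoalesceInt32` and `util.MaxInt32` are only there to keep the example self-contained:

```go
package main

import "fmt"

// Local copies of the helpers from this patch, kept here only to make the
// sketch runnable on its own.
func int32ToPointer(v int32) *int32 { return &v }

func coalesceInt32(val, defaultVal *int32) *int32 {
	if val == nil {
		return defaultVal
	}
	return val
}

func maxInt32(a, b *int32) *int32 {
	if a == nil || b == nil {
		return nil
	}
	if *a > *b {
		return a
	}
	return b
}

func main() {
	// connection_pool_number_of_instances as it might arrive from the CRD:
	// unset, set below the enforced minimum, and set above it.
	for _, fromCRD := range []*int32{nil, int32ToPointer(1), int32ToPointer(5)} {
		instances := coalesceInt32(fromCRD, int32ToPointer(2)) // default to 2 if unset
		instances = maxInt32(instances, int32ToPointer(2))     // never drop below 2
		fmt.Println(*instances)                                // prints 2, 2, 5
	}
}
```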
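The sync test cases above exercise the decision implemented by `needConnectionPoolWorker`: an explicit `enableConnectionPool` flag always wins, otherwise the mere presence of the `connectionPool` section enables the pooler. A minimal sketch of that logic, with simplified stand-ins for the manifest types from `pkg/apis/acid.zalan.do/v1`:

```go
package main

import "fmt"

// Simplified stand-ins for the manifest types.
type connectionPool struct{}

type postgresSpec struct {
	EnableConnectionPool *bool
	ConnectionPool       *connectionPool
}

// needConnectionPool mirrors Cluster.needConnectionPoolWorker: an explicit
// flag always wins; without it, the presence of the connectionPool section
// in the manifest decides.
func needConnectionPool(spec *postgresSpec) bool {
	if spec.EnableConnectionPool == nil {
		return spec.ConnectionPool != nil
	}
	return *spec.EnableConnectionPool
}

func boolToPointer(b bool) *bool { return &b }

func main() {
	fmt.Println(needConnectionPool(&postgresSpec{}))                                  // false
	fmt.Println(needConnectionPool(&postgresSpec{ConnectionPool: &connectionPool{}})) // true
	fmt.Println(needConnectionPool(&postgresSpec{
		EnableConnectionPool: boolToPointer(true)})) // true, even without a section
	fmt.Println(needConnectionPool(&postgresSpec{
		EnableConnectionPool: boolToPointer(false),
		ConnectionPool:       &connectionPool{}})) // false, the flag wins
}
```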
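`connPoolLabelsSelector` relies on `labels.Merge` letting the second argument win, so the pooler pods end up with a different `application` label and `recreatePods` does not pick them up. A small illustration using `k8s.io/apimachinery`; the concrete label values are made up for the example:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// Labels the cluster pods would normally carry (values illustrative).
	clusterLabels := labels.Set{
		"application":  "spilo",
		"cluster-name": "acid-minimal-cluster",
	}

	// Extra labels as in connPoolLabelsSelector; the different "application"
	// value is what keeps pooler pods out of recreatePods.
	extra := labels.Set{
		"connection-pool": "acid-minimal-cluster-pooler",
		"application":     "db-connection-pool",
	}

	// Merge keeps clusterLabels but lets extra override on conflict.
	merged := labels.Merge(clusterLabels, extra)
	fmt.Println(merged)
	// application=db-connection-pool,cluster-name=acid-minimal-cluster,connection-pool=acid-minimal-cluster-pooler
}
```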
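The new mocks in `k8sutil` use the usual embed-the-interface trick: `mockDeployment` embeds `appsv1.DeploymentInterface`, so only the methods the tests actually call (`Get`, `Create`, `Delete`, `Patch`) need to be implemented, while everything else panics if reached. A generic, client-go-free sketch of the pattern:

```go
package main

import "fmt"

// Store plays the role of a large client-go interface.
type Store interface {
	Get(name string) (string, error)
	Delete(name string) error
	List() ([]string, error)
}

// mockStore embeds the interface; methods that are not overridden stay backed
// by the nil embedded value and panic if a test ever calls them.
type mockStore struct {
	Store
}

func (m *mockStore) Get(name string) (string, error) { return "value-for-" + name, nil }
func (m *mockStore) Delete(name string) error        { return nil }

func main() {
	var s Store = &mockStore{}
	v, _ := s.Get("test-deployment")
	fmt.Println(v)
	// s.List() would panic: it was never overridden and the embedded Store is nil.
}
```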