diff --git a/charts/postgres-operator/crds/operatorconfigurations.yaml b/charts/postgres-operator/crds/operatorconfigurations.yaml index 9725c2708..7e3b607c0 100644 --- a/charts/postgres-operator/crds/operatorconfigurations.yaml +++ b/charts/postgres-operator/crds/operatorconfigurations.yaml @@ -318,6 +318,47 @@ spec: pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' scalyr_server_url: type: string + connection_pool: + type: object + properties: + connection_pool_schema: + type: string + #default: "pooler" + connection_pool_user: + type: string + #default: "pooler" + connection_pool_image: + type: string + #default: "registry.opensource.zalan.do/acid/pgbouncer" + connection_pool_max_db_connections: + type: integer + #default: 60 + connection_pool_mode: + type: string + enum: + - "session" + - "transaction" + #default: "transaction" + connection_pool_number_of_instances: + type: integer + minimum: 2 + #default: 2 + connection_pool_default_cpu_limit: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + #default: "1" + connection_pool_default_cpu_request: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + #default: "500m" + connection_pool_default_memory_limit: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + #default: "100Mi" + connection_pool_default_memory_request: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + #default: "100Mi" status: type: object additionalProperties: diff --git a/charts/postgres-operator/crds/postgresqls.yaml b/charts/postgres-operator/crds/postgresqls.yaml index af535e2c8..a4c0e4f3a 100644 --- a/charts/postgres-operator/crds/postgresqls.yaml +++ b/charts/postgres-operator/crds/postgresqls.yaml @@ -106,6 +106,55 @@ spec: uid: format: uuid type: string + connectionPool: + type: object + properties: + dockerImage: + type: string + maxDBConnections: + type: integer + mode: + type: string + enum: + - "session" + - "transaction" + numberOfInstances: + type: integer + minimum: 2 + resources: + type: object + required: + - requests + - limits + properties: + limits: + type: object + required: + - cpu + - memory + properties: + cpu: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + memory: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + requests: + type: object + required: + - cpu + - memory + properties: + cpu: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + memory: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + schema: + type: string + user: + type: string databases: type: object additionalProperties: @@ -113,6 +162,8 @@ spec: # Note: usernames specified here as database owners must be declared in the users key of the spec key. dockerImage: type: string + enableConnectionPool: + type: boolean enableLogicalBackup: type: boolean enableMasterLoadBalancer: diff --git a/charts/postgres-operator/templates/_helpers.tpl b/charts/postgres-operator/templates/_helpers.tpl index 306613ac3..e49670763 100644 --- a/charts/postgres-operator/templates/_helpers.tpl +++ b/charts/postgres-operator/templates/_helpers.tpl @@ -31,6 +31,20 @@ Create a service account name. {{ default (include "postgres-operator.fullname" .) .Values.serviceAccount.name }} {{- end -}} +{{/* +Create a pod service account name. +*/}} +{{- define "postgres-pod.serviceAccountName" -}} +{{ default (printf "%s-%v" (include "postgres-operator.fullname" .) "pod") .Values.podServiceAccount.name }} +{{- end -}} + +{{/* +Create a controller ID. 
+*/}} +{{- define "postgres-operator.controllerID" -}} +{{ default (include "postgres-operator.fullname" .) .Values.controllerID.name }} +{{- end -}} + {{/* Create chart name and version as used by the chart label. */}} diff --git a/charts/postgres-operator/templates/clusterrole-postgres-pod.yaml b/charts/postgres-operator/templates/clusterrole-postgres-pod.yaml index c327d9101..ef607ae3c 100644 --- a/charts/postgres-operator/templates/clusterrole-postgres-pod.yaml +++ b/charts/postgres-operator/templates/clusterrole-postgres-pod.yaml @@ -2,7 +2,7 @@ apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: - name: postgres-pod + name: {{ include "postgres-pod.serviceAccountName" . }} labels: app.kubernetes.io/name: {{ template "postgres-operator.name" . }} helm.sh/chart: {{ template "postgres-operator.chart" . }} diff --git a/charts/postgres-operator/templates/clusterrole.yaml b/charts/postgres-operator/templates/clusterrole.yaml index 7b3dd462d..38ce85e7a 100644 --- a/charts/postgres-operator/templates/clusterrole.yaml +++ b/charts/postgres-operator/templates/clusterrole.yaml @@ -128,6 +128,7 @@ rules: - apps resources: - statefulsets + - deployments verbs: - create - delete diff --git a/charts/postgres-operator/templates/configmap.yaml b/charts/postgres-operator/templates/configmap.yaml index 0b976294e..e8a805db7 100644 --- a/charts/postgres-operator/templates/configmap.yaml +++ b/charts/postgres-operator/templates/configmap.yaml @@ -9,6 +9,7 @@ metadata: app.kubernetes.io/managed-by: {{ .Release.Service }} app.kubernetes.io/instance: {{ .Release.Name }} data: + pod_service_account_name: {{ include "postgres-pod.serviceAccountName" . }} {{ toYaml .Values.configGeneral | indent 2 }} {{ toYaml .Values.configUsers | indent 2 }} {{ toYaml .Values.configKubernetes | indent 2 }} @@ -19,4 +20,5 @@ data: {{ toYaml .Values.configDebug | indent 2 }} {{ toYaml .Values.configLoggingRestApi | indent 2 }} {{ toYaml .Values.configTeamsApi | indent 2 }} +{{ toYaml .Values.configConnectionPool | indent 2 }} {{- end }} diff --git a/charts/postgres-operator/templates/deployment.yaml b/charts/postgres-operator/templates/deployment.yaml index 1f7e39bbc..2d8eebcb3 100644 --- a/charts/postgres-operator/templates/deployment.yaml +++ b/charts/postgres-operator/templates/deployment.yaml @@ -43,6 +43,10 @@ spec: {{- else }} - name: POSTGRES_OPERATOR_CONFIGURATION_OBJECT value: {{ template "postgres-operator.fullname" . }} + {{- end }} + {{- if .Values.controllerID.create }} + - name: CONTROLLER_ID + value: {{ template "postgres-operator.controllerID" . }} {{- end }} resources: {{ toYaml .Values.resources | indent 10 }} diff --git a/charts/postgres-operator/templates/operatorconfiguration.yaml b/charts/postgres-operator/templates/operatorconfiguration.yaml index 06e9c7605..b52b3d664 100644 --- a/charts/postgres-operator/templates/operatorconfiguration.yaml +++ b/charts/postgres-operator/templates/operatorconfiguration.yaml @@ -13,6 +13,7 @@ configuration: users: {{ toYaml .Values.configUsers | indent 4 }} kubernetes: + pod_service_account_name: {{ include "postgres-pod.serviceAccountName" . }} oauth_token_secret_name: {{ template "postgres-operator.fullname" . 
}} {{ toYaml .Values.configKubernetes | indent 4 }} postgres_pod_resources: @@ -33,4 +34,6 @@ configuration: {{ toYaml .Values.configLoggingRestApi | indent 4 }} scalyr: {{ toYaml .Values.configScalyr | indent 4 }} + connection_pool: +{{ toYaml .Values.configConnectionPool | indent 4 }} {{- end }} diff --git a/charts/postgres-operator/values-crd.yaml b/charts/postgres-operator/values-crd.yaml index 7b1004b2c..79940b236 100644 --- a/charts/postgres-operator/values-crd.yaml +++ b/charts/postgres-operator/values-crd.yaml @@ -103,8 +103,6 @@ configKubernetes: # service account definition as JSON/YAML string to be used by postgres cluster pods # pod_service_account_definition: "" - # name of service account to be used by postgres cluster pods - pod_service_account_name: "postgres-pod" # role binding definition as JSON/YAML string to be used by pod service account # pod_service_account_role_binding_definition: "" @@ -269,6 +267,25 @@ configScalyr: # Memory request value for the Scalyr sidecar scalyr_memory_request: 50Mi +configConnectionPool: + # db schema to install lookup function into + connection_pool_schema: "pooler" + # db user for pooler to use + connection_pool_user: "pooler" + # docker image + connection_pool_image: "registry.opensource.zalan.do/acid/pgbouncer" + # max db connections the pooler should hold + connection_pool_max_db_connections: 60 + # default pooling mode + connection_pool_mode: "transaction" + # number of pooler instances + connection_pool_number_of_instances: 2 + # default resources + connection_pool_default_cpu_request: 500m + connection_pool_default_memory_request: 100Mi + connection_pool_default_cpu_limit: "1" + connection_pool_default_memory_limit: 100Mi + rbac: # Specifies whether RBAC resources should be created create: true @@ -284,6 +301,11 @@ serviceAccount: # If not set and create is true, a name is generated using the fullname template name: +podServiceAccount: + # The name of the ServiceAccount to be used by postgres cluster pods + # If not set a name is generated using the fullname template and "-pod" suffix + name: "postgres-pod" + priorityClassName: "" resources: @@ -305,3 +327,12 @@ tolerations: [] # Node labels for pod assignment # Ref: https://kubernetes.io/docs/user-guide/node-selection/ nodeSelector: {} + +controllerID: + # Specifies whether a controller ID should be defined for the operator + # Note, all postgres manifest must then contain the following annotation to be found by this operator + # "acid.zalan.do/controller": + create: false + # The name of the controller ID to use. 
+ # If not set and create is true, a name is generated using the fullname template + name: diff --git a/charts/postgres-operator/values.yaml b/charts/postgres-operator/values.yaml index 6a78f716c..29f85339d 100644 --- a/charts/postgres-operator/values.yaml +++ b/charts/postgres-operator/values.yaml @@ -96,8 +96,6 @@ configKubernetes: # service account definition as JSON/YAML string to be used by postgres cluster pods # pod_service_account_definition: "" - # name of service account to be used by postgres cluster pods - pod_service_account_name: "postgres-pod" # role binding definition as JSON/YAML string to be used by pod service account # pod_service_account_role_binding_definition: "" @@ -245,6 +243,26 @@ configTeamsApi: # URL of the Teams API service # teams_api_url: http://fake-teams-api.default.svc.cluster.local +# configure connection pooler deployment created by the operator +configConnectionPool: + # db schema to install lookup function into + connection_pool_schema: "pooler" + # db user for pooler to use + connection_pool_user: "pooler" + # docker image + connection_pool_image: "registry.opensource.zalan.do/acid/pgbouncer" + # max db connections the pooler should hold + connection_pool_max_db_connections: 60 + # default pooling mode + connection_pool_mode: "transaction" + # number of pooler instances + connection_pool_number_of_instances: 2 + # default resources + connection_pool_default_cpu_request: 500m + connection_pool_default_memory_request: 100Mi + connection_pool_default_cpu_limit: "1" + connection_pool_default_memory_limit: 100Mi + rbac: # Specifies whether RBAC resources should be created create: true @@ -260,6 +278,11 @@ serviceAccount: # If not set and create is true, a name is generated using the fullname template name: +podServiceAccount: + # The name of the ServiceAccount to be used by postgres cluster pods + # If not set a name is generated using the fullname template and "-pod" suffix + name: "postgres-pod" + priorityClassName: "" resources: @@ -281,3 +304,12 @@ tolerations: [] # Node labels for pod assignment # Ref: https://kubernetes.io/docs/user-guide/node-selection/ nodeSelector: {} + +controllerID: + # Specifies whether a controller ID should be defined for the operator + # Note, all postgres manifest must then contain the following annotation to be found by this operator + # "acid.zalan.do/controller": + create: false + # The name of the controller ID to use. 
+ # If not set and create is true, a name is generated using the fullname template + name: diff --git a/docker/DebugDockerfile b/docker/DebugDockerfile index 76dadf6df..0c11fe3b4 100644 --- a/docker/DebugDockerfile +++ b/docker/DebugDockerfile @@ -3,8 +3,17 @@ MAINTAINER Team ACID @ Zalando # We need root certificates to deal with teams api over https RUN apk --no-cache add ca-certificates go git musl-dev -RUN go get github.com/derekparker/delve/cmd/dlv COPY build/* / -CMD ["/root/go/bin/dlv", "--listen=:7777", "--headless=true", "--api-version=2", "exec", "/postgres-operator"] +RUN addgroup -g 1000 pgo +RUN adduser -D -u 1000 -G pgo -g 'Postgres Operator' pgo + +RUN go get github.com/derekparker/delve/cmd/dlv +RUN cp /root/go/bin/dlv /dlv +RUN chown -R pgo:pgo /dlv + +USER pgo:pgo +RUN ls -l / + +CMD ["/dlv", "--listen=:7777", "--headless=true", "--api-version=2", "exec", "/postgres-operator"] diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md index 92e457d7e..4400cb666 100644 --- a/docs/reference/cluster_manifest.md +++ b/docs/reference/cluster_manifest.md @@ -111,12 +111,12 @@ These parameters are grouped directly under the `spec` key in the manifest. value overrides the `pod_toleration` setting from the operator. Optional. * **podPriorityClassName** - a name of the [priority - class](https://kubernetes.io/docs/concepts/configuration/pod-priority-preemption/#priorityclass) - that should be assigned to the cluster pods. When not specified, the value - is taken from the `pod_priority_class_name` operator parameter, if not set - then the default priority class is taken. The priority class itself must be - defined in advance. Optional. + a name of the [priority + class](https://kubernetes.io/docs/concepts/configuration/pod-priority-preemption/#priorityclass) + that should be assigned to the cluster pods. When not specified, the value + is taken from the `pod_priority_class_name` operator parameter, if not set + then the default priority class is taken. The priority class itself must be + defined in advance. Optional. * **podAnnotations** A map of key value pairs that gets attached as [annotations](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/) @@ -140,6 +140,11 @@ These parameters are grouped directly under the `spec` key in the manifest. is `false`, then no volume will be mounted no matter how operator was configured (so you can override the operator configuration). Optional. +* **enableConnectionPool** + Tells the operator to create a connection pool with a database. If this + field is true, a connection pool deployment will be created even if + `connectionPool` section is empty. Optional, not set by default. + * **enableLogicalBackup** Determines if the logical backup of this cluster should be taken and uploaded to S3. Default: false. Optional. @@ -184,9 +189,9 @@ explanation of `ttl` and `loop_wait` parameters. ``` hostssl all +pamrole all pam ``` - , where pamrole is the name of the role for the pam authentication; any - custom `pg_hba` should include the pam line to avoid breaking pam - authentication. Optional. + where pamrole is the name of the role for the pam authentication; any + custom `pg_hba` should include the pam line to avoid breaking pam + authentication. Optional. * **ttl** Patroni `ttl` parameter value, optional. The default is set by the Spilo @@ -360,6 +365,35 @@ CPU and memory limits for the sidecar container. memory limits for the sidecar container. 
 Optional, overrides the `default_memory_limits` operator configuration parameter. Optional.
 
+## Connection pool
+
+Parameters are grouped under the `connectionPool` top-level key and specify
+configuration for the connection pool. If this section is not empty, a
+connection pool will be created for the database even if `enableConnectionPool`
+is not present.
+
+* **numberOfInstances**
+  How many instances of the connection pool to create.
+
+* **schema**
+  Schema to create the credentials lookup function in.
+
+* **user**
+  User to create for the connection pool to be able to connect to the database.
+
+* **dockerImage**
+  Which docker image to use for the connection pool deployment.
+
+* **maxDBConnections**
+  The maximum number of connections the pooler can hold. This value is divided
+  among the pooler pods.
+
+* **mode**
+  In which mode to run the connection pool, `session` or `transaction`.
+
+* **resources**
+  Resource configuration for the connection pool deployment.
+
 ## Custom TLS certificates
 
 Those parameters are grouped under the `tls` top-level key.
@@ -379,4 +413,4 @@ Those parameters are grouped under the `tls` top-level key.
 
 * **caFile**
   Optional filename to the CA certificate. Useful when the client connects
-  with `sslmode=verify-ca` or `sslmode=verify-full`.
+  with `sslmode=verify-ca` or `sslmode=verify-full`. Default is empty.
diff --git a/docs/reference/operator_parameters.md b/docs/reference/operator_parameters.md
index 52176693e..1ab92a287 100644
--- a/docs/reference/operator_parameters.md
+++ b/docs/reference/operator_parameters.md
@@ -286,11 +286,11 @@ configuration they are grouped under the `kubernetes` key.
   capability. The default is `false`.
 
 * **master_pod_move_timeout**
-   The period of time to wait for the success of migration of master pods from
-   an unschedulable node. The migration includes Patroni switchovers to
-   respective replicas on healthy nodes. The situation where master pods still
-   exist on the old node after this timeout expires has to be fixed manually.
-   The default is 20 minutes.
+  The period of time to wait for the success of migration of master pods from
+  an unschedulable node. The migration includes Patroni switchovers to
+  respective replicas on healthy nodes. The situation where master pods still
+  exist on the old node after this timeout expires has to be fixed manually.
+  The default is 20 minutes.
 
 * **enable_pod_antiaffinity**
   toggles [pod anti affinity](https://kubernetes.io/docs/concepts/configuration/assign-pod-node/)
@@ -596,3 +596,40 @@ scalyr sidecar. In the CRD-based configuration they are grouped under the
 
 * **scalyr_memory_limit**
   Memory limit value for the Scalyr sidecar. The default is `500Mi`.
+
+## Connection pool configuration
+
+Parameters are grouped under the `connection_pool` top-level key and specify
+the default configuration for the connection pool when a postgres manifest
+requests it but does not specify some of the parameters. All of them are
+optional and have reasonable defaults.
+
+* **connection_pool_number_of_instances**
+  How many instances of the connection pool to create. The default is 2, which
+  is also the required minimum.
+
+* **connection_pool_schema**
+  Schema to create the credentials lookup function in. The default is `pooler`.
+
+* **connection_pool_user**
+  User to create for the connection pool to be able to connect to the database.
+  The default is `pooler`.
+
+* **connection_pool_image**
+  Docker image to use for the connection pool deployment.
+  Default: "registry.opensource.zalan.do/acid/pgbouncer"
+
+* **connection_pool_max_db_connections**
+  The maximum number of connections the pooler can hold. This value is divided
+  among the pooler pods. The default is 60, which amounts to 30 connections per
+  pod for the default setup with two instances.
+
+* **connection_pool_mode**
+  Default pool mode, `session` or `transaction`. The default is `transaction`.
+
+* **connection_pool_default_cpu_request**
+  **connection_pool_default_memory_request**
+  **connection_pool_default_cpu_limit**
+  **connection_pool_default_memory_limit**
+  Default resource configuration for the connection pool deployment. The
+  internal default for memory request and limit is `100Mi`, for CPU it is
+  `500m` and `1`.
diff --git a/docs/user.md b/docs/user.md
index 6e71d0404..8c79bb485 100644
--- a/docs/user.md
+++ b/docs/user.md
@@ -30,7 +30,7 @@ spec:
   databases:
     foo: zalando
   postgresql:
-    version: "11"
+    version: "12"
 ```
 
 Once you cloned the Postgres Operator [repository](https://github.com/zalando/postgres-operator)
@@ -512,12 +512,65 @@ monitoring is outside the scope of operator responsibilities. See
 [administrator documentation](administrator.md) for details on how backups are
 executed.
 
+## Connection pool
+
+The operator can create a database-side connection pool for applications where
+an application-side pool is not feasible but the number of connections is
+high. To create a connection pool together with a database, modify the
+manifest:
+
+```yaml
+spec:
+  enableConnectionPool: true
+```
+
+This will tell the operator to create a connection pool with default
+configuration, through which one can access the master via a separate service
+`{cluster-name}-pooler`. In most cases the provided default configuration
+should be good enough.
+
+To configure the connection pool, specify:
+
+```yaml
+spec:
+  connectionPool:
+    # how many instances of the connection pool to create
+    numberOfInstances: 2
+
+    # in which mode to run, session or transaction
+    mode: "transaction"
+
+    # schema which the operator will create to install the credentials lookup
+    # function
+    schema: "pooler"
+
+    # user which the operator will create for the connection pool
+    user: "pooler"
+
+    # resources for each instance
+    resources:
+      requests:
+        cpu: 500m
+        memory: 100Mi
+      limits:
+        cpu: "1"
+        memory: 100Mi
+```
+
+By default `pgbouncer` is used as the connection pool. To find out about pool
+modes, see the [pgbouncer docs](https://www.pgbouncer.org/config.html#pool_mode)
+(the general approach is similar across implementations).
+
+Note that with `pgbouncer` a CPU limit above 1 core is not meaningful, since a
+single instance cannot utilize more than one core (in K8s it is easier to just
+spin up more instances).
+
 ## Custom TLS certificates
 
 By default, the spilo image generates its own TLS certificate during startup.
-This certificate is not secure since it cannot be verified and thus doesn't
-protect from active MITM attacks. In this section we show how a Kubernete
-Secret resources can be loaded with a custom TLS certificate.
+However, this certificate cannot be verified and thus doesn't protect from
+active MITM attacks. In this section we show how to specify a custom TLS
+certificate which is mounted in the database pods via a K8s Secret.
 
 Before applying these changes, the operator must also be configured with the
 `spilo_fsgroup` set to the GID matching the postgres user group.
If the value diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index f6be8a600..cc90aa5e2 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -9,6 +9,10 @@ import yaml from kubernetes import client, config +def to_selector(labels): + return ",".join(["=".join(l) for l in labels.items()]) + + class EndToEndTestCase(unittest.TestCase): ''' Test interaction of the operator with multiple K8s components. @@ -47,7 +51,8 @@ class EndToEndTestCase(unittest.TestCase): for filename in ["operator-service-account-rbac.yaml", "configmap.yaml", "postgres-operator.yaml"]: - k8s.create_with_kubectl("manifests/" + filename) + result = k8s.create_with_kubectl("manifests/" + filename) + print("stdout: {}, stderr: {}".format(result.stdout, result.stderr)) k8s.wait_for_operator_pod_start() @@ -55,9 +60,14 @@ class EndToEndTestCase(unittest.TestCase): 'default', label_selector='name=postgres-operator').items[0].spec.containers[0].image print("Tested operator image: {}".format(actual_operator_image)) # shows up after tests finish - k8s.create_with_kubectl("manifests/minimal-postgres-manifest.yaml") - k8s.wait_for_pod_start('spilo-role=master') - k8s.wait_for_pod_start('spilo-role=replica') + result = k8s.create_with_kubectl("manifests/minimal-postgres-manifest.yaml") + print('stdout: {}, stderr: {}'.format(result.stdout, result.stderr)) + try: + k8s.wait_for_pod_start('spilo-role=master') + k8s.wait_for_pod_start('spilo-role=replica') + except timeout_decorator.TimeoutError: + print('Operator log: {}'.format(k8s.get_operator_log())) + raise @timeout_decorator.timeout(TEST_TIMEOUT_SEC) def test_enable_load_balancer(self): @@ -66,7 +76,7 @@ class EndToEndTestCase(unittest.TestCase): ''' k8s = self.k8s - cluster_label = 'cluster-name=acid-minimal-cluster' + cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' # enable load balancer services pg_patch_enable_lbs = { @@ -178,7 +188,7 @@ class EndToEndTestCase(unittest.TestCase): Lower resource limits below configured minimum and let operator fix it ''' k8s = self.k8s - cluster_label = 'cluster-name=acid-minimal-cluster' + cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' labels = 'spilo-role=master,' + cluster_label _, failover_targets = k8s.get_pg_nodes(cluster_label) @@ -247,7 +257,7 @@ class EndToEndTestCase(unittest.TestCase): Remove node readiness label from master node. This must cause a failover. ''' k8s = self.k8s - cluster_label = 'cluster-name=acid-minimal-cluster' + cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' labels = 'spilo-role=master,' + cluster_label readiness_label = 'lifecycle-status' readiness_value = 'ready' @@ -289,7 +299,7 @@ class EndToEndTestCase(unittest.TestCase): Scale up from 2 to 3 and back to 2 pods by updating the Postgres manifest at runtime. ''' k8s = self.k8s - labels = "cluster-name=acid-minimal-cluster" + labels = "application=spilo,cluster-name=acid-minimal-cluster" k8s.wait_for_pg_to_scale(3) self.assertEqual(3, k8s.count_pods_with_label(labels)) @@ -339,13 +349,99 @@ class EndToEndTestCase(unittest.TestCase): } k8s.update_config(unpatch_custom_service_annotations) + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) + def test_enable_disable_connection_pool(self): + ''' + For a database without connection pool, then turns it on, scale up, + turn off and on again. Test with different ways of doing this (via + enableConnectionPool or connectionPool configuration section). At the + end turn the connection pool off to not interfere with other tests. 
+ ''' + k8s = self.k8s + service_labels = { + 'cluster-name': 'acid-minimal-cluster', + } + pod_labels = dict({ + 'connection-pool': 'acid-minimal-cluster-pooler', + }) + + pod_selector = to_selector(pod_labels) + service_selector = to_selector(service_labels) + + try: + # enable connection pool + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', + 'postgresqls', 'acid-minimal-cluster', + { + 'spec': { + 'enableConnectionPool': True, + } + }) + k8s.wait_for_pod_start(pod_selector) + + pods = k8s.api.core_v1.list_namespaced_pod( + 'default', label_selector=pod_selector + ).items + + self.assertTrue(pods, 'No connection pool pods') + + k8s.wait_for_service(service_selector) + services = k8s.api.core_v1.list_namespaced_service( + 'default', label_selector=service_selector + ).items + services = [ + s for s in services + if s.metadata.name.endswith('pooler') + ] + + self.assertTrue(services, 'No connection pool service') + + # scale up connection pool deployment + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', + 'postgresqls', 'acid-minimal-cluster', + { + 'spec': { + 'connectionPool': { + 'numberOfInstances': 2, + }, + } + }) + + k8s.wait_for_running_pods(pod_selector, 2) + + # turn it off, keeping configuration section + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', + 'postgresqls', 'acid-minimal-cluster', + { + 'spec': { + 'enableConnectionPool': False, + } + }) + k8s.wait_for_pods_to_stop(pod_selector) + + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', + 'postgresqls', 'acid-minimal-cluster', + { + 'spec': { + 'enableConnectionPool': True, + } + }) + k8s.wait_for_pod_start(pod_selector) + except timeout_decorator.TimeoutError: + print('Operator log: {}'.format(k8s.get_operator_log())) + raise + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) def test_taint_based_eviction(self): ''' Add taint "postgres=:NoExecute" to node with master. This must cause a failover. ''' k8s = self.k8s - cluster_label = 'cluster-name=acid-minimal-cluster' + cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' # get nodes of master and replica(s) (expected target of new master) current_master_node, current_replica_nodes = k8s.get_pg_nodes(cluster_label) @@ -473,7 +569,7 @@ class K8s: Wraps around K8 api client and helper methods. 
''' - RETRY_TIMEOUT_SEC = 5 + RETRY_TIMEOUT_SEC = 10 def __init__(self): self.api = K8sApi() @@ -496,12 +592,39 @@ class K8s: # for local execution ~ 10 seconds suffices time.sleep(60) + def get_operator_pod(self): + pods = self.api.core_v1.list_namespaced_pod( + 'default', label_selector='name=postgres-operator' + ).items + + if pods: + return pods[0] + + return None + + def get_operator_log(self): + operator_pod = self.get_operator_pod() + pod_name = operator_pod.metadata.name + return self.api.core_v1.read_namespaced_pod_log( + name=pod_name, + namespace='default' + ) + def wait_for_pod_start(self, pod_labels, namespace='default'): pod_phase = 'No pod running' while pod_phase != 'Running': pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=pod_labels).items if pods: pod_phase = pods[0].status.phase + + if pods and pod_phase != 'Running': + pod_name = pods[0].metadata.name + response = self.api.core_v1.read_namespaced_pod( + name=pod_name, + namespace=namespace + ) + print("Pod description {}".format(response)) + time.sleep(self.RETRY_TIMEOUT_SEC) def get_service_type(self, svc_labels, namespace='default'): @@ -531,10 +654,27 @@ class K8s: _ = self.api.custom_objects_api.patch_namespaced_custom_object( "acid.zalan.do", "v1", namespace, "postgresqls", "acid-minimal-cluster", body) - labels = 'cluster-name=acid-minimal-cluster' + labels = 'application=spilo,cluster-name=acid-minimal-cluster' while self.count_pods_with_label(labels) != number_of_instances: time.sleep(self.RETRY_TIMEOUT_SEC) + def wait_for_running_pods(self, labels, number, namespace=''): + while self.count_pods_with_label(labels) != number: + time.sleep(self.RETRY_TIMEOUT_SEC) + + def wait_for_pods_to_stop(self, labels, namespace=''): + while self.count_pods_with_label(labels) != 0: + time.sleep(self.RETRY_TIMEOUT_SEC) + + def wait_for_service(self, labels, namespace='default'): + def get_services(): + return self.api.core_v1.list_namespaced_service( + namespace, label_selector=labels + ).items + + while not get_services(): + time.sleep(self.RETRY_TIMEOUT_SEC) + def count_pods_with_label(self, labels, namespace='default'): return len(self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items) @@ -571,7 +711,10 @@ class K8s: self.wait_for_operator_pod_start() def create_with_kubectl(self, path): - subprocess.run(["kubectl", "create", "-f", path]) + return subprocess.run( + ["kubectl", "create", "-f", path], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) if __name__ == '__main__': diff --git a/manifests/complete-postgres-manifest.yaml b/manifests/complete-postgres-manifest.yaml index ceb27a5c3..c82f1eac5 100644 --- a/manifests/complete-postgres-manifest.yaml +++ b/manifests/complete-postgres-manifest.yaml @@ -24,7 +24,7 @@ spec: databases: foo: zalando postgresql: - version: "11" + version: "12" parameters: # Expert section shared_buffers: "32MB" max_connections: "10" diff --git a/manifests/configmap.yaml b/manifests/configmap.yaml index c9a37833f..67c3368f3 100644 --- a/manifests/configmap.yaml +++ b/manifests/configmap.yaml @@ -11,6 +11,16 @@ data: cluster_history_entries: "1000" cluster_labels: application:spilo cluster_name_label: cluster-name + # connection_pool_default_cpu_limit: "1" + # connection_pool_default_cpu_request: "500m" + # connection_pool_default_memory_limit: 100Mi + # connection_pool_default_memory_request: 100Mi + connection_pool_image: "registry.opensource.zalan.do/acid/pgbouncer:master-5" + # connection_pool_max_db_connections: 60 + # connection_pool_mode: 
"transaction" + # connection_pool_number_of_instances: 2 + # connection_pool_schema: "pooler" + # connection_pool_user: "pooler" # custom_service_annotations: "keyx:valuez,keya:valuea" # custom_pod_annotations: "keya:valuea,keyb:valueb" db_hosted_zone: db.example.com diff --git a/manifests/minimal-postgres-manifest.yaml b/manifests/minimal-postgres-manifest.yaml index 75dfdf07f..af0add8e6 100644 --- a/manifests/minimal-postgres-manifest.yaml +++ b/manifests/minimal-postgres-manifest.yaml @@ -16,4 +16,4 @@ spec: databases: foo: zalando # dbname: owner postgresql: - version: "11" + version: "12" diff --git a/manifests/operator-service-account-rbac.yaml b/manifests/operator-service-account-rbac.yaml index e5bc49f83..83cd721e7 100644 --- a/manifests/operator-service-account-rbac.yaml +++ b/manifests/operator-service-account-rbac.yaml @@ -129,6 +129,7 @@ rules: - apps resources: - statefulsets + - deployments verbs: - create - delete diff --git a/manifests/operatorconfiguration.crd.yaml b/manifests/operatorconfiguration.crd.yaml index 7bd5c529c..4e6858af8 100644 --- a/manifests/operatorconfiguration.crd.yaml +++ b/manifests/operatorconfiguration.crd.yaml @@ -294,6 +294,47 @@ spec: pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' scalyr_server_url: type: string + connection_pool: + type: object + properties: + connection_pool_schema: + type: string + #default: "pooler" + connection_pool_user: + type: string + #default: "pooler" + connection_pool_image: + type: string + #default: "registry.opensource.zalan.do/acid/pgbouncer" + connection_pool_max_db_connections: + type: integer + #default: 60 + connection_pool_mode: + type: string + enum: + - "session" + - "transaction" + #default: "transaction" + connection_pool_number_of_instances: + type: integer + minimum: 2 + #default: 2 + connection_pool_default_cpu_limit: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + #default: "1" + connection_pool_default_cpu_request: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + #default: "500m" + connection_pool_default_memory_limit: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + #default: "100Mi" + connection_pool_default_memory_request: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + #default: "100Mi" status: type: object additionalProperties: diff --git a/manifests/postgresql-operator-default-configuration.yaml b/manifests/postgresql-operator-default-configuration.yaml index 810694124..9d609713c 100644 --- a/manifests/postgresql-operator-default-configuration.yaml +++ b/manifests/postgresql-operator-default-configuration.yaml @@ -121,3 +121,14 @@ configuration: scalyr_memory_limit: 500Mi scalyr_memory_request: 50Mi # scalyr_server_url: "" + connection_pool: + connection_pool_default_cpu_limit: "1" + connection_pool_default_cpu_request: "500m" + connection_pool_default_memory_limit: 100Mi + connection_pool_default_memory_request: 100Mi + connection_pool_image: "registry.opensource.zalan.do/acid/pgbouncer:master-5" + # connection_pool_max_db_connections: 60 + connection_pool_mode: "transaction" + connection_pool_number_of_instances: 2 + # connection_pool_schema: "pooler" + # connection_pool_user: "pooler" diff --git a/manifests/postgresql.crd.yaml b/manifests/postgresql.crd.yaml index 04c789fb9..06434da14 100644 --- a/manifests/postgresql.crd.yaml +++ b/manifests/postgresql.crd.yaml @@ -70,6 +70,55 @@ spec: uid: format: uuid type: string + connectionPool: + type: object + properties: + dockerImage: + type: string + maxDBConnections: + type: 
integer + mode: + type: string + enum: + - "session" + - "transaction" + numberOfInstances: + type: integer + minimum: 2 + resources: + type: object + required: + - requests + - limits + properties: + limits: + type: object + required: + - cpu + - memory + properties: + cpu: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + memory: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + requests: + type: object + required: + - cpu + - memory + properties: + cpu: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + memory: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + schema: + type: string + user: + type: string databases: type: object additionalProperties: @@ -77,6 +126,8 @@ spec: # Note: usernames specified here as database owners must be declared in the users key of the spec key. dockerImage: type: string + enableConnectionPool: + type: boolean enableLogicalBackup: type: boolean enableMasterLoadBalancer: diff --git a/manifests/standby-manifest.yaml b/manifests/standby-manifest.yaml index 2b621bd10..4c8d09650 100644 --- a/manifests/standby-manifest.yaml +++ b/manifests/standby-manifest.yaml @@ -9,7 +9,7 @@ spec: size: 1Gi numberOfInstances: 1 postgresql: - version: "11" + version: "12" # Make this a standby cluster and provide the s3 bucket path of source cluster for continuous streaming. standby: s3_wal_path: "s3://path/to/bucket/containing/wal/of/source/cluster/" diff --git a/pkg/apis/acid.zalan.do/v1/crds.go b/pkg/apis/acid.zalan.do/v1/crds.go index eff47c255..dc552d3f4 100644 --- a/pkg/apis/acid.zalan.do/v1/crds.go +++ b/pkg/apis/acid.zalan.do/v1/crds.go @@ -105,6 +105,7 @@ var OperatorConfigCRDResourceColumns = []apiextv1beta1.CustomResourceColumnDefin var min0 = 0.0 var min1 = 1.0 +var min2 = 2.0 var minDisable = -1.0 // PostgresCRDResourceValidation to check applied manifest parameters @@ -176,6 +177,76 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{ }, }, }, + "connectionPool": { + Type: "object", + Properties: map[string]apiextv1beta1.JSONSchemaProps{ + "dockerImage": { + Type: "string", + }, + "maxDBConnections": { + Type: "integer", + }, + "mode": { + Type: "string", + Enum: []apiextv1beta1.JSON{ + { + Raw: []byte(`"session"`), + }, + { + Raw: []byte(`"transaction"`), + }, + }, + }, + "numberOfInstances": { + Type: "integer", + Minimum: &min2, + }, + "resources": { + Type: "object", + Required: []string{"requests", "limits"}, + Properties: map[string]apiextv1beta1.JSONSchemaProps{ + "limits": { + Type: "object", + Required: []string{"cpu", "memory"}, + Properties: map[string]apiextv1beta1.JSONSchemaProps{ + "cpu": { + Type: "string", + Description: "Decimal natural followed by m, or decimal natural followed by dot followed by up to three decimal digits (precision used by Kubernetes). Must be greater than 0", + Pattern: "^(\\d+m|\\d+(\\.\\d{1,3})?)$", + }, + "memory": { + Type: "string", + Description: "Plain integer or fixed-point integer using one of these suffixes: E, P, T, G, M, k (with or without a tailing i). Must be greater than 0", + Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$", + }, + }, + }, + "requests": { + Type: "object", + Required: []string{"cpu", "memory"}, + Properties: map[string]apiextv1beta1.JSONSchemaProps{ + "cpu": { + Type: "string", + Description: "Decimal natural followed by m, or decimal natural followed by dot followed by up to three decimal digits (precision used by Kubernetes). 
Must be greater than 0", + Pattern: "^(\\d+m|\\d+(\\.\\d{1,3})?)$", + }, + "memory": { + Type: "string", + Description: "Plain integer or fixed-point integer using one of these suffixes: E, P, T, G, M, k (with or without a tailing i). Must be greater than 0", + Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$", + }, + }, + }, + }, + }, + "schema": { + Type: "string", + }, + "user": { + Type: "string", + }, + }, + }, "databases": { Type: "object", AdditionalProperties: &apiextv1beta1.JSONSchemaPropsOrBool{ @@ -188,6 +259,9 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{ "dockerImage": { Type: "string", }, + "enableConnectionPool": { + Type: "boolean", + }, "enableLogicalBackup": { Type: "boolean", }, @@ -418,7 +492,7 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{ Type: "string", }, "tls": { - Type: "object", + Type: "object", Required: []string{"secretName"}, Properties: map[string]apiextv1beta1.JSONSchemaProps{ "secretName": { @@ -1055,6 +1129,54 @@ var OperatorConfigCRDResourceValidation = apiextv1beta1.CustomResourceValidation }, }, }, + "connection_pool": { + Type: "object", + Properties: map[string]apiextv1beta1.JSONSchemaProps{ + "connection_pool_default_cpu_limit": { + Type: "string", + Pattern: "^(\\d+m|\\d+(\\.\\d{1,3})?)$", + }, + "connection_pool_default_cpu_request": { + Type: "string", + Pattern: "^(\\d+m|\\d+(\\.\\d{1,3})?)$", + }, + "connection_pool_default_memory_limit": { + Type: "string", + Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$", + }, + "connection_pool_default_memory_request": { + Type: "string", + Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$", + }, + "connection_pool_image": { + Type: "string", + }, + "connection_pool_max_db_connections": { + Type: "integer", + }, + "connection_pool_mode": { + Type: "string", + Enum: []apiextv1beta1.JSON{ + { + Raw: []byte(`"session"`), + }, + { + Raw: []byte(`"transaction"`), + }, + }, + }, + "connection_pool_number_of_instances": { + Type: "integer", + Minimum: &min2, + }, + "connection_pool_schema": { + Type: "string", + }, + "connection_pool_user": { + Type: "string", + }, + }, + }, }, }, "status": { diff --git a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go index cd04b2897..3dbe96b7f 100644 --- a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go +++ b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go @@ -1,5 +1,7 @@ package v1 +// Operator configuration CRD definition, please use snake_case for field names. 
+ import ( "github.com/zalando/postgres-operator/pkg/util/config" @@ -151,6 +153,20 @@ type ScalyrConfiguration struct { ScalyrMemoryLimit string `json:"scalyr_memory_limit,omitempty"` } +// Defines default configuration for connection pool +type ConnectionPoolConfiguration struct { + NumberOfInstances *int32 `json:"connection_pool_number_of_instances,omitempty"` + Schema string `json:"connection_pool_schema,omitempty"` + User string `json:"connection_pool_user,omitempty"` + Image string `json:"connection_pool_image,omitempty"` + Mode string `json:"connection_pool_mode,omitempty"` + MaxDBConnections *int32 `json:"connection_pool_max_db_connections,omitempty"` + DefaultCPURequest string `json:"connection_pool_default_cpu_request,omitempty"` + DefaultMemoryRequest string `json:"connection_pool_default_memory_request,omitempty"` + DefaultCPULimit string `json:"connection_pool_default_cpu_limit,omitempty"` + DefaultMemoryLimit string `json:"connection_pool_default_memory_limit,omitempty"` +} + // OperatorLogicalBackupConfiguration defines configuration for logical backup type OperatorLogicalBackupConfiguration struct { Schedule string `json:"logical_backup_schedule,omitempty"` @@ -187,6 +203,7 @@ type OperatorConfigurationData struct { LoggingRESTAPI LoggingRESTAPIConfiguration `json:"logging_rest_api"` Scalyr ScalyrConfiguration `json:"scalyr"` LogicalBackup OperatorLogicalBackupConfiguration `json:"logical_backup"` + ConnectionPool ConnectionPoolConfiguration `json:"connection_pool"` } //Duration shortens this frequently used name diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index 862db6a4e..1784f8235 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -1,5 +1,7 @@ package v1 +// Postgres CRD definition, please use CamelCase for field names. + import ( "time" @@ -27,6 +29,9 @@ type PostgresSpec struct { Patroni `json:"patroni,omitempty"` Resources `json:"resources,omitempty"` + EnableConnectionPool *bool `json:"enableConnectionPool,omitempty"` + ConnectionPool *ConnectionPool `json:"connectionPool,omitempty"` + TeamID string `json:"teamId"` DockerImage string `json:"dockerImage,omitempty"` @@ -162,3 +167,24 @@ type UserFlags []string type PostgresStatus struct { PostgresClusterStatus string `json:"PostgresClusterStatus"` } + +// Options for connection pooler +// +// TODO: prepared snippets of configuration, one can choose via type, e.g. +// pgbouncer-large (with higher resources) or odyssey-small (with smaller +// resources) +// Type string `json:"type,omitempty"` +// +// TODO: figure out what other important parameters of the connection pool it +// makes sense to expose. E.g. pool size (min/max boundaries), max client +// connections etc. 
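As an illustrative aside (not part of the patch itself), a minimal sketch of how the camelCase JSON tags on the `ConnectionPool` type defined right below map a manifest's `connectionPool` fragment onto Go fields; the field and tag names are taken from this diff, everything else is assumed for the example:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Subset of the ConnectionPool type introduced below; tag names copied from the diff.
type connectionPool struct {
	NumberOfInstances *int32 `json:"numberOfInstances,omitempty"`
	Schema            string `json:"schema,omitempty"`
	User              string `json:"user,omitempty"`
	Mode              string `json:"mode,omitempty"`
}

func main() {
	// A connectionPool manifest fragment, serialized as JSON.
	raw := []byte(`{"numberOfInstances": 2, "mode": "transaction", "schema": "pooler"}`)

	var cp connectionPool
	if err := json.Unmarshal(raw, &cp); err != nil {
		panic(err)
	}
	fmt.Println(*cp.NumberOfInstances, cp.Mode, cp.Schema) // 2 transaction pooler
}
```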
+type ConnectionPool struct { + NumberOfInstances *int32 `json:"numberOfInstances,omitempty"` + Schema string `json:"schema,omitempty"` + User string `json:"user,omitempty"` + Mode string `json:"mode,omitempty"` + DockerImage string `json:"dockerImage,omitempty"` + MaxDBConnections *int32 `json:"maxDBConnections,omitempty"` + + Resources `json:"resources,omitempty"` +} diff --git a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go index b95824f14..65a19600a 100644 --- a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go +++ b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go @@ -68,6 +68,59 @@ func (in *CloneDescription) DeepCopy() *CloneDescription { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ConnectionPool) DeepCopyInto(out *ConnectionPool) { + *out = *in + if in.NumberOfInstances != nil { + in, out := &in.NumberOfInstances, &out.NumberOfInstances + *out = new(int32) + **out = **in + } + if in.MaxDBConnections != nil { + in, out := &in.MaxDBConnections, &out.MaxDBConnections + *out = new(int32) + **out = **in + } + out.Resources = in.Resources + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionPool. +func (in *ConnectionPool) DeepCopy() *ConnectionPool { + if in == nil { + return nil + } + out := new(ConnectionPool) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ConnectionPoolConfiguration) DeepCopyInto(out *ConnectionPoolConfiguration) { + *out = *in + if in.NumberOfInstances != nil { + in, out := &in.NumberOfInstances, &out.NumberOfInstances + *out = new(int32) + **out = **in + } + if in.MaxDBConnections != nil { + in, out := &in.MaxDBConnections, &out.MaxDBConnections + *out = new(int32) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionPoolConfiguration. +func (in *ConnectionPoolConfiguration) DeepCopy() *ConnectionPoolConfiguration { + if in == nil { + return nil + } + out := new(ConnectionPoolConfiguration) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *KubernetesMetaConfiguration) DeepCopyInto(out *KubernetesMetaConfiguration) { *out = *in @@ -255,6 +308,7 @@ func (in *OperatorConfigurationData) DeepCopyInto(out *OperatorConfigurationData out.LoggingRESTAPI = in.LoggingRESTAPI out.Scalyr = in.Scalyr out.LogicalBackup = in.LogicalBackup + in.ConnectionPool.DeepCopyInto(&out.ConnectionPool) return } @@ -417,6 +471,16 @@ func (in *PostgresSpec) DeepCopyInto(out *PostgresSpec) { out.Volume = in.Volume in.Patroni.DeepCopyInto(&out.Patroni) out.Resources = in.Resources + if in.EnableConnectionPool != nil { + in, out := &in.EnableConnectionPool, &out.EnableConnectionPool + *out = new(bool) + **out = **in + } + if in.ConnectionPool != nil { + in, out := &in.ConnectionPool, &out.ConnectionPool + *out = new(ConnectionPool) + (*in).DeepCopyInto(*out) + } if in.SpiloFSGroup != nil { in, out := &in.SpiloFSGroup, &out.SpiloFSGroup *out = new(int64) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index d740260d2..dba67c142 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/r3labs/diff" "github.com/sirupsen/logrus" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" @@ -48,11 +49,26 @@ type Config struct { PodServiceAccountRoleBinding *rbacv1.RoleBinding } +// K8S objects that are belongs to a connection pool +type ConnectionPoolObjects struct { + Deployment *appsv1.Deployment + Service *v1.Service + + // It could happen that a connection pool was enabled, but the operator was + // not able to properly process a corresponding event or was restarted. In + // this case we will miss missing/require situation and a lookup function + // will not be installed. To avoid synchronizing it all the time to prevent + // this, we can remember the result in memory at least until the next + // restart. + LookupFunction bool +} + type kubeResources struct { Services map[PostgresRole]*v1.Service Endpoints map[PostgresRole]*v1.Endpoints Secrets map[types.UID]*v1.Secret Statefulset *appsv1.StatefulSet + ConnectionPool *ConnectionPoolObjects PodDisruptionBudget *policybeta1.PodDisruptionBudget //Pods are treated separately //PVCs are treated separately @@ -184,7 +200,8 @@ func (c *Cluster) isNewCluster() bool { func (c *Cluster) initUsers() error { c.setProcessName("initializing users") - // clear our the previous state of the cluster users (in case we are running a sync). + // clear our the previous state of the cluster users (in case we are + // running a sync). c.systemUsers = map[string]spec.PgUser{} c.pgUsers = map[string]spec.PgUser{} @@ -292,8 +309,10 @@ func (c *Cluster) Create() error { } c.logger.Infof("pods are ready") - // create database objects unless we are running without pods or disabled that feature explicitly + // create database objects unless we are running without pods or disabled + // that feature explicitly if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0 || c.Spec.StandbyCluster != nil) { + c.logger.Infof("Create roles") if err = c.createRoles(); err != nil { return fmt.Errorf("could not create users: %v", err) } @@ -316,6 +335,26 @@ func (c *Cluster) Create() error { c.logger.Errorf("could not list resources: %v", err) } + // Create connection pool deployment and services if necessary. Since we + // need to peform some operations with the database itself (e.g. install + // lookup function), do it as the last step, when everything is available. 
+ // + // Do not consider connection pool as a strict requirement, and if + // something fails, report warning + if c.needConnectionPool() { + if c.ConnectionPool != nil { + c.logger.Warning("Connection pool already exists in the cluster") + return nil + } + connPool, err := c.createConnectionPool(c.installLookupFunction) + if err != nil { + c.logger.Warningf("could not create connection pool: %v", err) + return nil + } + c.logger.Infof("connection pool %q has been successfully created", + util.NameFromMeta(connPool.Deployment.ObjectMeta)) + } + return nil } @@ -571,7 +610,11 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { } } - if !reflect.DeepEqual(oldSpec.Spec.Users, newSpec.Spec.Users) { + // connection pool needs one system user created, which is done in + // initUsers. Check if it needs to be called. + sameUsers := reflect.DeepEqual(oldSpec.Spec.Users, newSpec.Spec.Users) + needConnPool := c.needConnectionPoolWorker(&newSpec.Spec) + if !sameUsers || needConnPool { c.logger.Debugf("syncing secrets") if err := c.initUsers(); err != nil { c.logger.Errorf("could not init users: %v", err) @@ -695,6 +738,11 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { } } + // sync connection pool + if err := c.syncConnectionPool(oldSpec, newSpec, c.installLookupFunction); err != nil { + return fmt.Errorf("could not sync connection pool: %v", err) + } + return nil } @@ -746,6 +794,12 @@ func (c *Cluster) Delete() { c.logger.Warningf("could not remove leftover patroni objects; %v", err) } + // Delete connection pool objects anyway, even if it's not mentioned in the + // manifest, just to not keep orphaned components in case if something went + // wrong + if err := c.deleteConnectionPool(); err != nil { + c.logger.Warningf("could not remove connection pool: %v", err) + } } //NeedsRepair returns true if the cluster should be included in the repair scan (based on its in-memory status). @@ -812,6 +866,35 @@ func (c *Cluster) initSystemUsers() { Name: c.OpConfig.ReplicationUsername, Password: util.RandomPassword(constants.PasswordLength), } + + // Connection pool user is an exception, if requested it's going to be + // created by operator as a normal pgUser + if c.needConnectionPool() { + // initialize empty connection pool if not done yet + if c.Spec.ConnectionPool == nil { + c.Spec.ConnectionPool = &acidv1.ConnectionPool{} + } + + username := util.Coalesce( + c.Spec.ConnectionPool.User, + c.OpConfig.ConnectionPool.User) + + // connection pooler application should be able to login with this role + connPoolUser := spec.PgUser{ + Origin: spec.RoleConnectionPool, + Name: username, + Flags: []string{constants.RoleFlagLogin}, + Password: util.RandomPassword(constants.PasswordLength), + } + + if _, exists := c.pgUsers[username]; !exists { + c.pgUsers[username] = connPoolUser + } + + if _, exists := c.systemUsers[constants.ConnectionPoolUserKeyName]; !exists { + c.systemUsers[constants.ConnectionPoolUserKeyName] = connPoolUser + } + } } func (c *Cluster) initRobotUsers() error { @@ -1138,3 +1221,119 @@ func (c *Cluster) deletePatroniClusterConfigMaps() error { return c.deleteClusterObject(get, deleteConfigMapFn, "configmap") } + +// Test if two connection pool configuration needs to be synced. For simplicity +// compare not the actual K8S objects, but the configuration itself and request +// sync if there is any difference. 
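For orientation, a minimal sketch (using a reduced stand-in struct rather than the real `acidv1.ConnectionPool`) of the `r3labs/diff` changelog that the function right below turns into sync reasons:

```go
package main

import (
	"fmt"

	"github.com/r3labs/diff"
)

// Reduced stand-in for the connection pool spec used only for this sketch.
type poolSpec struct {
	Mode              string
	NumberOfInstances int32
}

func main() {
	oldSpec := poolSpec{Mode: "transaction", NumberOfInstances: 2}
	newSpec := poolSpec{Mode: "transaction", NumberOfInstances: 3}

	changelog, err := diff.Diff(oldSpec, newSpec)
	if err != nil {
		panic(err)
	}
	for _, change := range changelog {
		// Prints something like: update [NumberOfInstances] from '2' to '3'
		fmt.Printf("%s %+v from '%+v' to '%+v'\n",
			change.Type, change.Path, change.From, change.To)
	}
}
```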
+func (c *Cluster) needSyncConnPoolSpecs(oldSpec, newSpec *acidv1.ConnectionPool) (sync bool, reasons []string) { + reasons = []string{} + sync = false + + changelog, err := diff.Diff(oldSpec, newSpec) + if err != nil { + c.logger.Infof("Cannot get diff, do not do anything, %+v", err) + return false, reasons + } + + if len(changelog) > 0 { + sync = true + } + + for _, change := range changelog { + msg := fmt.Sprintf("%s %+v from '%+v' to '%+v'", + change.Type, change.Path, change.From, change.To) + reasons = append(reasons, msg) + } + + return sync, reasons +} + +func syncResources(a, b *v1.ResourceRequirements) bool { + for _, res := range []v1.ResourceName{ + v1.ResourceCPU, + v1.ResourceMemory, + } { + if !a.Limits[res].Equal(b.Limits[res]) || + !a.Requests[res].Equal(b.Requests[res]) { + return true + } + } + + return false +} + +// Check if we need to synchronize connection pool deployment due to new +// defaults, that are different from what we see in the DeploymentSpec +func (c *Cluster) needSyncConnPoolDefaults( + spec *acidv1.ConnectionPool, + deployment *appsv1.Deployment) (sync bool, reasons []string) { + + reasons = []string{} + sync = false + + config := c.OpConfig.ConnectionPool + podTemplate := deployment.Spec.Template + poolContainer := podTemplate.Spec.Containers[constants.ConnPoolContainer] + + if spec == nil { + spec = &acidv1.ConnectionPool{} + } + + if spec.NumberOfInstances == nil && + *deployment.Spec.Replicas != *config.NumberOfInstances { + + sync = true + msg := fmt.Sprintf("NumberOfInstances is different (having %d, required %d)", + *deployment.Spec.Replicas, *config.NumberOfInstances) + reasons = append(reasons, msg) + } + + if spec.DockerImage == "" && + poolContainer.Image != config.Image { + + sync = true + msg := fmt.Sprintf("DockerImage is different (having %s, required %s)", + poolContainer.Image, config.Image) + reasons = append(reasons, msg) + } + + expectedResources, err := generateResourceRequirements(spec.Resources, + c.makeDefaultConnPoolResources()) + + // An error to generate expected resources means something is not quite + // right, but for the purpose of robustness do not panic here, just report + // and ignore resources comparison (in the worst case there will be no + // updates for new resource values). 
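As a side note on the comparison invoked right below, a small sketch (with made-up values) of why `syncResources` compares `resource.Quantity` values rather than strings:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// "1" and "1000m" are different notations for the same CPU amount, so the
	// quantities compare as equal even though the strings differ.
	configured := resource.MustParse("1")
	observed := resource.MustParse("1000m")
	fmt.Println(configured.Equal(observed)) // true
}
```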
+ if err == nil && syncResources(&poolContainer.Resources, expectedResources) { + sync = true + msg := fmt.Sprintf("Resources are different (having %+v, required %+v)", + poolContainer.Resources, expectedResources) + reasons = append(reasons, msg) + } + + if err != nil { + c.logger.Warningf("Cannot generate expected resources, %v", err) + } + + for _, env := range poolContainer.Env { + if spec.User == "" && env.Name == "PGUSER" { + ref := env.ValueFrom.SecretKeyRef.LocalObjectReference + + if ref.Name != c.credentialSecretName(config.User) { + sync = true + msg := fmt.Sprintf("Pool user is different (having %s, required %s)", + ref.Name, config.User) + reasons = append(reasons, msg) + } + } + + if spec.Schema == "" && env.Name == "PGSCHEMA" && env.Value != config.Schema { + sync = true + msg := fmt.Sprintf("Pool schema is different (having %s, required %s)", + env.Value, config.Schema) + reasons = append(reasons, msg) + } + } + + return sync, reasons +} diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index 9efbc51c6..a1b361642 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -9,6 +9,7 @@ import ( acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" "github.com/zalando/postgres-operator/pkg/spec" "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/constants" "github.com/zalando/postgres-operator/pkg/util/k8sutil" "github.com/zalando/postgres-operator/pkg/util/teams" v1 "k8s.io/api/core/v1" @@ -704,3 +705,20 @@ func TestServiceAnnotations(t *testing.T) { }) } } + +func TestInitSystemUsers(t *testing.T) { + testName := "Test system users initialization" + + // default cluster without connection pool + cl.initSystemUsers() + if _, exist := cl.systemUsers[constants.ConnectionPoolUserKeyName]; exist { + t.Errorf("%s, connection pool user is present", testName) + } + + // cluster with connection pool + cl.Spec.EnableConnectionPool = boolToPointer(true) + cl.initSystemUsers() + if _, exist := cl.systemUsers[constants.ConnectionPoolUserKeyName]; !exist { + t.Errorf("%s, connection pool user is not present", testName) + } +} diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go index 07ea011a6..bca68c188 100644 --- a/pkg/cluster/database.go +++ b/pkg/cluster/database.go @@ -1,10 +1,12 @@ package cluster import ( + "bytes" "database/sql" "fmt" "net" "strings" + "text/template" "time" "github.com/lib/pq" @@ -28,13 +30,37 @@ const ( getDatabasesSQL = `SELECT datname, pg_get_userbyid(datdba) AS owner FROM pg_database;` createDatabaseSQL = `CREATE DATABASE "%s" OWNER "%s";` alterDatabaseOwnerSQL = `ALTER DATABASE "%s" OWNER TO "%s";` + connectionPoolLookup = ` + CREATE SCHEMA IF NOT EXISTS {{.pool_schema}}; + + CREATE OR REPLACE FUNCTION {{.pool_schema}}.user_lookup( + in i_username text, out uname text, out phash text) + RETURNS record AS $$ + BEGIN + SELECT usename, passwd FROM pg_catalog.pg_shadow + WHERE usename = i_username INTO uname, phash; + RETURN; + END; + $$ LANGUAGE plpgsql SECURITY DEFINER; + + REVOKE ALL ON FUNCTION {{.pool_schema}}.user_lookup(text) + FROM public, {{.pool_user}}; + GRANT EXECUTE ON FUNCTION {{.pool_schema}}.user_lookup(text) + TO {{.pool_user}}; + GRANT USAGE ON SCHEMA {{.pool_schema}} TO {{.pool_user}}; + ` ) -func (c *Cluster) pgConnectionString() string { +func (c *Cluster) pgConnectionString(dbname string) string { password := c.systemUsers[constants.SuperuserKeyName].Password - return fmt.Sprintf("host='%s' dbname=postgres sslmode=require 
user='%s' password='%s' connect_timeout='%d'", + if dbname == "" { + dbname = "postgres" + } + + return fmt.Sprintf("host='%s' dbname='%s' sslmode=require user='%s' password='%s' connect_timeout='%d'", fmt.Sprintf("%s.%s.svc.%s", c.Name, c.Namespace, c.OpConfig.ClusterDomain), + dbname, c.systemUsers[constants.SuperuserKeyName].Name, strings.Replace(password, "$", "\\$", -1), constants.PostgresConnectTimeout/time.Second) @@ -49,13 +75,17 @@ func (c *Cluster) databaseAccessDisabled() bool { } func (c *Cluster) initDbConn() error { + return c.initDbConnWithName("") +} + +func (c *Cluster) initDbConnWithName(dbname string) error { c.setProcessName("initializing db connection") if c.pgDb != nil { return nil } var conn *sql.DB - connstring := c.pgConnectionString() + connstring := c.pgConnectionString(dbname) finalerr := retryutil.Retry(constants.PostgresConnectTimeout, constants.PostgresConnectRetryTimeout, func() (bool, error) { @@ -94,6 +124,10 @@ func (c *Cluster) initDbConn() error { return nil } +func (c *Cluster) connectionIsClosed() bool { + return c.pgDb == nil +} + func (c *Cluster) closeDbConn() (err error) { c.setProcessName("closing db connection") if c.pgDb != nil { @@ -243,3 +277,88 @@ func makeUserFlags(rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin return result } + +// Creates a connection pool credentials lookup function in every database to +// perform remote authentification. +func (c *Cluster) installLookupFunction(poolSchema, poolUser string) error { + var stmtBytes bytes.Buffer + c.logger.Info("Installing lookup function") + + if err := c.initDbConn(); err != nil { + return fmt.Errorf("could not init database connection") + } + defer func() { + if c.connectionIsClosed() { + return + } + + if err := c.closeDbConn(); err != nil { + c.logger.Errorf("could not close database connection: %v", err) + } + }() + + currentDatabases, err := c.getDatabases() + if err != nil { + msg := "could not get databases to install pool lookup function: %v" + return fmt.Errorf(msg, err) + } + + templater := template.Must(template.New("sql").Parse(connectionPoolLookup)) + + for dbname, _ := range currentDatabases { + if dbname == "template0" || dbname == "template1" { + continue + } + + if err := c.initDbConnWithName(dbname); err != nil { + return fmt.Errorf("could not init database connection to %s", dbname) + } + + c.logger.Infof("Install pool lookup function into %s", dbname) + + params := TemplateParams{ + "pool_schema": poolSchema, + "pool_user": poolUser, + } + + if err := templater.Execute(&stmtBytes, params); err != nil { + c.logger.Errorf("could not prepare sql statement %+v: %v", + params, err) + // process other databases + continue + } + + // golang sql will do retries couple of times if pq driver reports + // connections issues (driver.ErrBadConn), but since our query is + // idempotent, we can retry in a view of other errors (e.g. due to + // failover a db is temporary in a read-only mode or so) to make sure + // it was applied. 
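Before the actual retry call, the retry-until-applied idea may be easier to follow in isolation. This is a minimal sketch assuming a plain fixed-interval loop instead of the operator's retryutil helper; the function name is hypothetical.

package example

import (
	"database/sql"
	"fmt"
	"time"
)

// retryExec re-runs an idempotent statement until it succeeds or the
// attempts are exhausted. Because the statement is idempotent, executing it
// again after a transient failure (failover, short read-only window) is safe.
func retryExec(db *sql.DB, stmt string, attempts int, interval time.Duration) error {
	var lastErr error
	for i := 0; i < attempts; i++ {
		if _, lastErr = db.Exec(stmt); lastErr == nil {
			return nil
		}
		time.Sleep(interval)
	}
	return fmt.Errorf("statement not applied after %d attempts: %v", attempts, lastErr)
}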
+ execErr := retryutil.Retry( + constants.PostgresConnectTimeout, + constants.PostgresConnectRetryTimeout, + func() (bool, error) { + if _, err := c.pgDb.Exec(stmtBytes.String()); err != nil { + msg := fmt.Errorf("could not execute sql statement %s: %v", + stmtBytes.String(), err) + return false, msg + } + + return true, nil + }) + + if execErr != nil { + c.logger.Errorf("could not execute after retries %s: %v", + stmtBytes.String(), err) + // process other databases + continue + } + + c.logger.Infof("Pool lookup function installed into %s", dbname) + if err := c.closeDbConn(); err != nil { + c.logger.Errorf("could not close database connection: %v", err) + } + } + + c.ConnectionPool.LookupFunction = true + return nil +} diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 43b5b3636..2c40bb0ba 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -18,6 +18,7 @@ import ( acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" "github.com/zalando/postgres-operator/pkg/spec" + pkgspec "github.com/zalando/postgres-operator/pkg/spec" "github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util/config" "github.com/zalando/postgres-operator/pkg/util/constants" @@ -32,10 +33,12 @@ const ( patroniPGBinariesParameterName = "bin_dir" patroniPGParametersParameterName = "parameters" patroniPGHBAConfParameterName = "pg_hba" + localHost = "127.0.0.1/32" + connectionPoolContainer = "connection-pool" + pgPort = 5432 // the gid of the postgres user in the default spilo image spiloPostgresGID = 103 - localHost = "127.0.0.1/32" ) type pgUser struct { @@ -71,6 +74,10 @@ func (c *Cluster) statefulSetName() string { return c.Name } +func (c *Cluster) connPoolName() string { + return c.Name + "-pooler" +} + func (c *Cluster) endpointName(role PostgresRole) string { name := c.Name if role == Replica { @@ -89,6 +96,28 @@ func (c *Cluster) serviceName(role PostgresRole) string { return name } +func (c *Cluster) serviceAddress(role PostgresRole) string { + service, exist := c.Services[role] + + if exist { + return service.ObjectMeta.Name + } + + c.logger.Warningf("No service for role %s", role) + return "" +} + +func (c *Cluster) servicePort(role PostgresRole) string { + service, exist := c.Services[role] + + if exist { + return fmt.Sprint(service.Spec.Ports[0].Port) + } + + c.logger.Warningf("No service for role %s", role) + return "" +} + func (c *Cluster) podDisruptionBudgetName() string { return c.OpConfig.PDBNameFormat.Format("cluster", c.Name) } @@ -97,10 +126,39 @@ func (c *Cluster) makeDefaultResources() acidv1.Resources { config := c.OpConfig - defaultRequests := acidv1.ResourceDescription{CPU: config.DefaultCPURequest, Memory: config.DefaultMemoryRequest} - defaultLimits := acidv1.ResourceDescription{CPU: config.DefaultCPULimit, Memory: config.DefaultMemoryLimit} + defaultRequests := acidv1.ResourceDescription{ + CPU: config.Resources.DefaultCPURequest, + Memory: config.Resources.DefaultMemoryRequest, + } + defaultLimits := acidv1.ResourceDescription{ + CPU: config.Resources.DefaultCPULimit, + Memory: config.Resources.DefaultMemoryLimit, + } - return acidv1.Resources{ResourceRequests: defaultRequests, ResourceLimits: defaultLimits} + return acidv1.Resources{ + ResourceRequests: defaultRequests, + ResourceLimits: defaultLimits, + } +} + +// Generate default resource section for connection pool deployment, to be used +// if nothing custom is specified in the manifest +func (c *Cluster) makeDefaultConnPoolResources() acidv1.Resources { + 
config := c.OpConfig + + defaultRequests := acidv1.ResourceDescription{ + CPU: config.ConnectionPool.ConnPoolDefaultCPURequest, + Memory: config.ConnectionPool.ConnPoolDefaultMemoryRequest, + } + defaultLimits := acidv1.ResourceDescription{ + CPU: config.ConnectionPool.ConnPoolDefaultCPULimit, + Memory: config.ConnectionPool.ConnPoolDefaultMemoryLimit, + } + + return acidv1.Resources{ + ResourceRequests: defaultRequests, + ResourceLimits: defaultLimits, + } } func generateResourceRequirements(resources acidv1.Resources, defaultResources acidv1.Resources) (*v1.ResourceRequirements, error) { @@ -124,12 +182,12 @@ func generateResourceRequirements(resources acidv1.Resources, defaultResources a return &result, nil } -func fillResourceList(pgSpec acidv1.ResourceDescription, defaults acidv1.ResourceDescription) (v1.ResourceList, error) { +func fillResourceList(spec acidv1.ResourceDescription, defaults acidv1.ResourceDescription) (v1.ResourceList, error) { var err error requests := v1.ResourceList{} - if pgSpec.CPU != "" { - requests[v1.ResourceCPU], err = resource.ParseQuantity(pgSpec.CPU) + if spec.CPU != "" { + requests[v1.ResourceCPU], err = resource.ParseQuantity(spec.CPU) if err != nil { return nil, fmt.Errorf("could not parse CPU quantity: %v", err) } @@ -139,8 +197,8 @@ func fillResourceList(pgSpec acidv1.ResourceDescription, defaults acidv1.Resourc return nil, fmt.Errorf("could not parse default CPU quantity: %v", err) } } - if pgSpec.Memory != "" { - requests[v1.ResourceMemory], err = resource.ParseQuantity(pgSpec.Memory) + if spec.Memory != "" { + requests[v1.ResourceMemory], err = resource.ParseQuantity(spec.Memory) if err != nil { return nil, fmt.Errorf("could not parse memory quantity: %v", err) } @@ -320,7 +378,11 @@ func tolerations(tolerationsSpec *[]v1.Toleration, podToleration map[string]stri return *tolerationsSpec } - if len(podToleration["key"]) > 0 || len(podToleration["operator"]) > 0 || len(podToleration["value"]) > 0 || len(podToleration["effect"]) > 0 { + if len(podToleration["key"]) > 0 || + len(podToleration["operator"]) > 0 || + len(podToleration["value"]) > 0 || + len(podToleration["effect"]) > 0 { + return []v1.Toleration{ { Key: podToleration["key"], @@ -424,9 +486,9 @@ func generateSidecarContainers(sidecars []acidv1.Sidecar, // Check whether or not we're requested to mount an shm volume, // taking into account that PostgreSQL manifest has precedence. 
-func mountShmVolumeNeeded(opConfig config.Config, pgSpec *acidv1.PostgresSpec) *bool { - if pgSpec.ShmVolume != nil && *pgSpec.ShmVolume { - return pgSpec.ShmVolume +func mountShmVolumeNeeded(opConfig config.Config, spec *acidv1.PostgresSpec) *bool { + if spec.ShmVolume != nil && *spec.ShmVolume { + return spec.ShmVolume } return opConfig.ShmVolume @@ -550,10 +612,6 @@ func (c *Cluster) generateSpiloPodEnvVars(uid types.UID, spiloConfiguration stri Name: "KUBERNETES_ROLE_LABEL", Value: c.OpConfig.PodRoleLabel, }, - { - Name: "KUBERNETES_LABELS", - Value: labels.Set(c.OpConfig.ClusterLabels).String(), - }, { Name: "PGPASSWORD_SUPERUSER", ValueFrom: &v1.EnvVarSource{ @@ -589,6 +647,12 @@ func (c *Cluster) generateSpiloPodEnvVars(uid types.UID, spiloConfiguration stri Value: c.OpConfig.PamRoleName, }, } + // Spilo expects cluster labels as JSON + if clusterLabels, err := json.Marshal(labels.Set(c.OpConfig.ClusterLabels)); err != nil { + envVars = append(envVars, v1.EnvVar{Name: "KUBERNETES_LABELS", Value: labels.Set(c.OpConfig.ClusterLabels).String()}) + } else { + envVars = append(envVars, v1.EnvVar{Name: "KUBERNETES_LABELS", Value: string(clusterLabels)}) + } if spiloConfiguration != "" { envVars = append(envVars, v1.EnvVar{Name: "SPILO_CONFIGURATION", Value: spiloConfiguration}) } @@ -767,7 +831,7 @@ func (c *Cluster) getNewPgVersion(container v1.Container, newPgVersion string) ( return newPgVersion, nil } -func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.StatefulSet, error) { +func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.StatefulSet, error) { var ( err error @@ -783,14 +847,14 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat // controller adjusts the default memory request at operator startup - request := pgSpec.Resources.ResourceRequests.Memory + request := spec.Resources.ResourceRequests.Memory if request == "" { - request = c.OpConfig.DefaultMemoryRequest + request = c.OpConfig.Resources.DefaultMemoryRequest } - limit := pgSpec.Resources.ResourceLimits.Memory + limit := spec.Resources.ResourceLimits.Memory if limit == "" { - limit = c.OpConfig.DefaultMemoryLimit + limit = c.OpConfig.Resources.DefaultMemoryLimit } isSmaller, err := util.IsSmallerQuantity(request, limit) @@ -799,7 +863,7 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat } if isSmaller { c.logger.Warningf("The memory request of %v for the Postgres container is increased to match the memory limit of %v.", request, limit) - pgSpec.Resources.ResourceRequests.Memory = limit + spec.Resources.ResourceRequests.Memory = limit } @@ -807,17 +871,17 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat // as this sidecar is managed separately // adjust sidecar containers defined for that particular cluster - for _, sidecar := range pgSpec.Sidecars { + for _, sidecar := range spec.Sidecars { // TODO #413 sidecarRequest := sidecar.Resources.ResourceRequests.Memory if request == "" { - request = c.OpConfig.DefaultMemoryRequest + request = c.OpConfig.Resources.DefaultMemoryRequest } sidecarLimit := sidecar.Resources.ResourceLimits.Memory if limit == "" { - limit = c.OpConfig.DefaultMemoryLimit + limit = c.OpConfig.Resources.DefaultMemoryLimit } isSmaller, err := util.IsSmallerQuantity(sidecarRequest, sidecarLimit) @@ -834,21 +898,21 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat defaultResources := c.makeDefaultResources() - resourceRequirements, 
err := generateResourceRequirements(pgSpec.Resources, defaultResources) + resourceRequirements, err := generateResourceRequirements(spec.Resources, defaultResources) if err != nil { return nil, fmt.Errorf("could not generate resource requirements: %v", err) } - if pgSpec.InitContainers != nil && len(pgSpec.InitContainers) > 0 { + if spec.InitContainers != nil && len(spec.InitContainers) > 0 { if c.OpConfig.EnableInitContainers != nil && !(*c.OpConfig.EnableInitContainers) { c.logger.Warningf("initContainers specified but disabled in configuration - next statefulset creation would fail") } - initContainers = pgSpec.InitContainers + initContainers = spec.InitContainers } customPodEnvVarsList := make([]v1.EnvVar, 0) - if c.OpConfig.PodEnvironmentConfigMap != (spec.NamespacedName{}) { + if c.OpConfig.PodEnvironmentConfigMap != (pkgspec.NamespacedName{}) { var cm *v1.ConfigMap cm, err = c.KubeClient.ConfigMaps(c.OpConfig.PodEnvironmentConfigMap.Namespace).Get(c.OpConfig.PodEnvironmentConfigMap.Name, metav1.GetOptions{}) if err != nil { @@ -866,33 +930,33 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat sort.Slice(customPodEnvVarsList, func(i, j int) bool { return customPodEnvVarsList[i].Name < customPodEnvVarsList[j].Name }) } - if pgSpec.StandbyCluster != nil && pgSpec.StandbyCluster.S3WalPath == "" { + if spec.StandbyCluster != nil && spec.StandbyCluster.S3WalPath == "" { return nil, fmt.Errorf("s3_wal_path is empty for standby cluster") } // backward compatible check for InitContainers - if pgSpec.InitContainersOld != nil { + if spec.InitContainersOld != nil { msg := "Manifest parameter init_containers is deprecated." - if pgSpec.InitContainers == nil { + if spec.InitContainers == nil { c.logger.Warningf("%s Consider using initContainers instead.", msg) - pgSpec.InitContainers = pgSpec.InitContainersOld + spec.InitContainers = spec.InitContainersOld } else { c.logger.Warningf("%s Only value from initContainers is used", msg) } } // backward compatible check for PodPriorityClassName - if pgSpec.PodPriorityClassNameOld != "" { + if spec.PodPriorityClassNameOld != "" { msg := "Manifest parameter pod_priority_class_name is deprecated." 
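Both deprecated manifest parameters are handled with the same precedence rule: the old field is honored only when the new one is unset, and when both are set only the new value is used. A minimal standalone sketch of that rule; the helper name is hypothetical.

package example

// preferNew applies the backward-compatibility rule for a renamed parameter:
// fall back to the deprecated value only if the new field is empty, and warn
// so that users know to migrate their manifests.
func preferNew(newVal, oldVal string, warn func(string)) string {
	if oldVal == "" {
		return newVal
	}
	if newVal == "" {
		warn("deprecated parameter is set, consider using the new one instead")
		return oldVal
	}
	warn("both deprecated and new parameters are set, only the new value is used")
	return newVal
}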
- if pgSpec.PodPriorityClassName == "" { + if spec.PodPriorityClassName == "" { c.logger.Warningf("%s Consider using podPriorityClassName instead.", msg) - pgSpec.PodPriorityClassName = pgSpec.PodPriorityClassNameOld + spec.PodPriorityClassName = spec.PodPriorityClassNameOld } else { c.logger.Warningf("%s Only value from podPriorityClassName is used", msg) } } - spiloConfiguration, err := generateSpiloJSONConfiguration(&pgSpec.PostgresqlParam, &pgSpec.Patroni, c.OpConfig.PamRoleName, c.logger) + spiloConfiguration, err := generateSpiloJSONConfiguration(&spec.PostgresqlParam, &spec.Patroni, c.OpConfig.PamRoleName, c.logger) if err != nil { return nil, fmt.Errorf("could not generate Spilo JSON configuration: %v", err) } @@ -901,24 +965,24 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat spiloEnvVars := c.generateSpiloPodEnvVars( c.Postgresql.GetUID(), spiloConfiguration, - &pgSpec.Clone, - pgSpec.StandbyCluster, + &spec.Clone, + spec.StandbyCluster, customPodEnvVarsList, ) // pickup the docker image for the spilo container - effectiveDockerImage := util.Coalesce(pgSpec.DockerImage, c.OpConfig.DockerImage) + effectiveDockerImage := util.Coalesce(spec.DockerImage, c.OpConfig.DockerImage) // determine the FSGroup for the spilo pod effectiveFSGroup := c.OpConfig.Resources.SpiloFSGroup - if pgSpec.SpiloFSGroup != nil { - effectiveFSGroup = pgSpec.SpiloFSGroup + if spec.SpiloFSGroup != nil { + effectiveFSGroup = spec.SpiloFSGroup } - volumeMounts := generateVolumeMounts(pgSpec.Volume) + volumeMounts := generateVolumeMounts(spec.Volume) // configure TLS with a custom secret volume - if pgSpec.TLS != nil && pgSpec.TLS.SecretName != "" { + if spec.TLS != nil && spec.TLS.SecretName != "" { if effectiveFSGroup == nil { c.logger.Warnf("Setting the default FSGroup to satisfy the TLS configuration") fsGroup := int64(spiloPostgresGID) @@ -931,7 +995,7 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat Name: "tls-secret", VolumeSource: v1.VolumeSource{ Secret: &v1.SecretVolumeSource{ - SecretName: pgSpec.TLS.SecretName, + SecretName: spec.TLS.SecretName, DefaultMode: &defaultMode, }, }, @@ -945,16 +1009,16 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat }) // use the same filenames as Secret resources by default - certFile := ensurePath(pgSpec.TLS.CertificateFile, mountPath, "tls.crt") - privateKeyFile := ensurePath(pgSpec.TLS.PrivateKeyFile, mountPath, "tls.key") + certFile := ensurePath(spec.TLS.CertificateFile, mountPath, "tls.crt") + privateKeyFile := ensurePath(spec.TLS.PrivateKeyFile, mountPath, "tls.key") spiloEnvVars = append( spiloEnvVars, v1.EnvVar{Name: "SSL_CERTIFICATE_FILE", Value: certFile}, v1.EnvVar{Name: "SSL_PRIVATE_KEY_FILE", Value: privateKeyFile}, ) - if pgSpec.TLS.CAFile != "" { - caFile := ensurePath(pgSpec.TLS.CAFile, mountPath, "") + if spec.TLS.CAFile != "" { + caFile := ensurePath(spec.TLS.CAFile, mountPath, "") spiloEnvVars = append( spiloEnvVars, v1.EnvVar{Name: "SSL_CA_FILE", Value: caFile}, @@ -973,7 +1037,7 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat ) // resolve conflicts between operator-global and per-cluster sidecars - sideCars := c.mergeSidecars(pgSpec.Sidecars) + sideCars := c.mergeSidecars(spec.Sidecars) resourceRequirementsScalyrSidecar := makeResources( c.OpConfig.ScalyrCPURequest, @@ -1003,10 +1067,10 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat } } - tolerationSpec := 
tolerations(&pgSpec.Tolerations, c.OpConfig.PodToleration) - effectivePodPriorityClassName := util.Coalesce(pgSpec.PodPriorityClassName, c.OpConfig.PodPriorityClassName) + tolerationSpec := tolerations(&spec.Tolerations, c.OpConfig.PodToleration) + effectivePodPriorityClassName := util.Coalesce(spec.PodPriorityClassName, c.OpConfig.PodPriorityClassName) - annotations := c.generatePodAnnotations(pgSpec) + annotations := c.generatePodAnnotations(spec) // generate pod template for the statefulset, based on the spilo container and sidecars podTemplate, err = generatePodTemplate( @@ -1023,7 +1087,7 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat c.OpConfig.PodServiceAccountName, c.OpConfig.KubeIAMRole, effectivePodPriorityClassName, - mountShmVolumeNeeded(c.OpConfig, pgSpec), + mountShmVolumeNeeded(c.OpConfig, spec), c.OpConfig.EnablePodAntiAffinity, c.OpConfig.PodAntiAffinityTopologyKey, c.OpConfig.AdditionalSecretMount, @@ -1034,12 +1098,12 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat return nil, fmt.Errorf("could not generate pod template: %v", err) } - if volumeClaimTemplate, err = generatePersistentVolumeClaimTemplate(pgSpec.Volume.Size, - pgSpec.Volume.StorageClass); err != nil { + if volumeClaimTemplate, err = generatePersistentVolumeClaimTemplate(spec.Volume.Size, + spec.Volume.StorageClass); err != nil { return nil, fmt.Errorf("could not generate volume claim template: %v", err) } - numberOfInstances := c.getNumberOfInstances(pgSpec) + numberOfInstances := c.getNumberOfInstances(spec) // the operator has domain-specific logic on how to do rolling updates of PG clusters // so we do not use default rolling updates implemented by stateful sets @@ -1076,13 +1140,13 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat return statefulSet, nil } -func (c *Cluster) generatePodAnnotations(pgSpec *acidv1.PostgresSpec) map[string]string { +func (c *Cluster) generatePodAnnotations(spec *acidv1.PostgresSpec) map[string]string { annotations := make(map[string]string) for k, v := range c.OpConfig.CustomPodAnnotations { annotations[k] = v } - if pgSpec != nil || pgSpec.PodAnnotations != nil { - for k, v := range pgSpec.PodAnnotations { + if spec != nil || spec.PodAnnotations != nil { + for k, v := range spec.PodAnnotations { annotations[k] = v } } @@ -1148,13 +1212,13 @@ func (c *Cluster) mergeSidecars(sidecars []acidv1.Sidecar) []acidv1.Sidecar { return result } -func (c *Cluster) getNumberOfInstances(pgSpec *acidv1.PostgresSpec) int32 { +func (c *Cluster) getNumberOfInstances(spec *acidv1.PostgresSpec) int32 { min := c.OpConfig.MinInstances max := c.OpConfig.MaxInstances - cur := pgSpec.NumberOfInstances + cur := spec.NumberOfInstances newcur := cur - if pgSpec.StandbyCluster != nil { + if spec.StandbyCluster != nil { if newcur == 1 { min = newcur max = newcur @@ -1309,15 +1373,15 @@ func (c *Cluster) generateSingleUserSecret(namespace string, pgUser spec.PgUser) return &secret } -func (c *Cluster) shouldCreateLoadBalancerForService(role PostgresRole, pgSpec *acidv1.PostgresSpec) bool { +func (c *Cluster) shouldCreateLoadBalancerForService(role PostgresRole, spec *acidv1.PostgresSpec) bool { switch role { case Replica: // if the value is explicitly set in a Postgresql manifest, follow this setting - if pgSpec.EnableReplicaLoadBalancer != nil { - return *pgSpec.EnableReplicaLoadBalancer + if spec.EnableReplicaLoadBalancer != nil { + return *spec.EnableReplicaLoadBalancer } // otherwise, follow 
the operator configuration @@ -1325,8 +1389,8 @@ func (c *Cluster) shouldCreateLoadBalancerForService(role PostgresRole, pgSpec * case Master: - if pgSpec.EnableMasterLoadBalancer != nil { - return *pgSpec.EnableMasterLoadBalancer + if spec.EnableMasterLoadBalancer != nil { + return *spec.EnableMasterLoadBalancer } return c.OpConfig.EnableMasterLoadBalancer @@ -1337,7 +1401,7 @@ func (c *Cluster) shouldCreateLoadBalancerForService(role PostgresRole, pgSpec * } -func (c *Cluster) generateService(role PostgresRole, pgSpec *acidv1.PostgresSpec) *v1.Service { +func (c *Cluster) generateService(role PostgresRole, spec *acidv1.PostgresSpec) *v1.Service { serviceSpec := v1.ServiceSpec{ Ports: []v1.ServicePort{{Name: "postgresql", Port: 5432, TargetPort: intstr.IntOrString{IntVal: 5432}}}, Type: v1.ServiceTypeClusterIP, @@ -1347,12 +1411,12 @@ func (c *Cluster) generateService(role PostgresRole, pgSpec *acidv1.PostgresSpec serviceSpec.Selector = c.roleLabelsSet(false, role) } - if c.shouldCreateLoadBalancerForService(role, pgSpec) { + if c.shouldCreateLoadBalancerForService(role, spec) { - // pgSpec.AllowedSourceRanges evaluates to the empty slice of zero length + // spec.AllowedSourceRanges evaluates to the empty slice of zero length // when omitted or set to 'null'/empty sequence in the PG manifest - if len(pgSpec.AllowedSourceRanges) > 0 { - serviceSpec.LoadBalancerSourceRanges = pgSpec.AllowedSourceRanges + if len(spec.AllowedSourceRanges) > 0 { + serviceSpec.LoadBalancerSourceRanges = spec.AllowedSourceRanges } else { // safe default value: lock a load balancer only to the local address unless overridden explicitly serviceSpec.LoadBalancerSourceRanges = []string{localHost} @@ -1371,7 +1435,7 @@ func (c *Cluster) generateService(role PostgresRole, pgSpec *acidv1.PostgresSpec Name: c.serviceName(role), Namespace: c.Namespace, Labels: c.roleLabelsSet(true, role), - Annotations: c.generateServiceAnnotations(role, pgSpec), + Annotations: c.generateServiceAnnotations(role, spec), }, Spec: serviceSpec, } @@ -1379,19 +1443,19 @@ func (c *Cluster) generateService(role PostgresRole, pgSpec *acidv1.PostgresSpec return service } -func (c *Cluster) generateServiceAnnotations(role PostgresRole, pgSpec *acidv1.PostgresSpec) map[string]string { +func (c *Cluster) generateServiceAnnotations(role PostgresRole, spec *acidv1.PostgresSpec) map[string]string { annotations := make(map[string]string) for k, v := range c.OpConfig.CustomServiceAnnotations { annotations[k] = v } - if pgSpec != nil || pgSpec.ServiceAnnotations != nil { - for k, v := range pgSpec.ServiceAnnotations { + if spec != nil || spec.ServiceAnnotations != nil { + for k, v := range spec.ServiceAnnotations { annotations[k] = v } } - if c.shouldCreateLoadBalancerForService(role, pgSpec) { + if c.shouldCreateLoadBalancerForService(role, spec) { var dnsName string if role == Master { dnsName = c.masterDNSName() @@ -1779,6 +1843,306 @@ func (c *Cluster) getLogicalBackupJobName() (jobName string) { return "logical-backup-" + c.clusterName().Name } +// Generate pool size related environment variables. +// +// MAX_DB_CONN would specify the global maximum for connections to a target +// database. +// +// MAX_CLIENT_CONN is not configurable at the moment, just set it high enough. +// +// DEFAULT_SIZE is a pool size per db/user (having in mind the use case when +// most of the queries coming through a connection pooler are from the same +// user to the same db). 
In case if we want to spin up more connection pool +// instances, take this into account and maintain the same number of +// connections. +// +// MIN_SIZE is a pool minimal size, to prevent situation when sudden workload +// have to wait for spinning up a new connections. +// +// RESERVE_SIZE is how many additional connections to allow for a pool. +func (c *Cluster) getConnPoolEnvVars(spec *acidv1.PostgresSpec) []v1.EnvVar { + effectiveMode := util.Coalesce( + spec.ConnectionPool.Mode, + c.OpConfig.ConnectionPool.Mode) + + numberOfInstances := spec.ConnectionPool.NumberOfInstances + if numberOfInstances == nil { + numberOfInstances = util.CoalesceInt32( + c.OpConfig.ConnectionPool.NumberOfInstances, + k8sutil.Int32ToPointer(1)) + } + + effectiveMaxDBConn := util.CoalesceInt32( + spec.ConnectionPool.MaxDBConnections, + c.OpConfig.ConnectionPool.MaxDBConnections) + + if effectiveMaxDBConn == nil { + effectiveMaxDBConn = k8sutil.Int32ToPointer( + constants.ConnPoolMaxDBConnections) + } + + maxDBConn := *effectiveMaxDBConn / *numberOfInstances + + defaultSize := maxDBConn / 2 + minSize := defaultSize / 2 + reserveSize := minSize + + return []v1.EnvVar{ + { + Name: "CONNECTION_POOL_PORT", + Value: fmt.Sprint(pgPort), + }, + { + Name: "CONNECTION_POOL_MODE", + Value: effectiveMode, + }, + { + Name: "CONNECTION_POOL_DEFAULT_SIZE", + Value: fmt.Sprint(defaultSize), + }, + { + Name: "CONNECTION_POOL_MIN_SIZE", + Value: fmt.Sprint(minSize), + }, + { + Name: "CONNECTION_POOL_RESERVE_SIZE", + Value: fmt.Sprint(reserveSize), + }, + { + Name: "CONNECTION_POOL_MAX_CLIENT_CONN", + Value: fmt.Sprint(constants.ConnPoolMaxClientConnections), + }, + { + Name: "CONNECTION_POOL_MAX_DB_CONN", + Value: fmt.Sprint(maxDBConn), + }, + } +} + +func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) ( + *v1.PodTemplateSpec, error) { + + gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds()) + resources, err := generateResourceRequirements( + spec.ConnectionPool.Resources, + c.makeDefaultConnPoolResources()) + + effectiveDockerImage := util.Coalesce( + spec.ConnectionPool.DockerImage, + c.OpConfig.ConnectionPool.Image) + + effectiveSchema := util.Coalesce( + spec.ConnectionPool.Schema, + c.OpConfig.ConnectionPool.Schema) + + if err != nil { + return nil, fmt.Errorf("could not generate resource requirements: %v", err) + } + + secretSelector := func(key string) *v1.SecretKeySelector { + effectiveUser := util.Coalesce( + spec.ConnectionPool.User, + c.OpConfig.ConnectionPool.User) + + return &v1.SecretKeySelector{ + LocalObjectReference: v1.LocalObjectReference{ + Name: c.credentialSecretName(effectiveUser), + }, + Key: key, + } + } + + envVars := []v1.EnvVar{ + { + Name: "PGHOST", + Value: c.serviceAddress(Master), + }, + { + Name: "PGPORT", + Value: c.servicePort(Master), + }, + { + Name: "PGUSER", + ValueFrom: &v1.EnvVarSource{ + SecretKeyRef: secretSelector("username"), + }, + }, + // the convention is to use the same schema name as + // connection pool username + { + Name: "PGSCHEMA", + Value: effectiveSchema, + }, + { + Name: "PGPASSWORD", + ValueFrom: &v1.EnvVarSource{ + SecretKeyRef: secretSelector("password"), + }, + }, + } + + envVars = append(envVars, c.getConnPoolEnvVars(spec)...) 
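To make the sizing arithmetic used for these variables concrete, here is a small self-contained example deriving the per-instance numbers for an assumed 60 total database connections shared by 2 pooler instances; the inputs are illustrative only.

package main

import "fmt"

// derivePoolSizes mirrors the integer arithmetic behind the pooler env
// variables: the per-instance connection budget is halved for the default
// per db/user pool size, halved again for the minimal size, and the reserve
// equals the minimal size.
func derivePoolSizes(totalDBConnections, instances int32) (maxDBConn, defaultSize, minSize, reserveSize int32) {
	maxDBConn = totalDBConnections / instances
	defaultSize = maxDBConn / 2
	minSize = defaultSize / 2
	reserveSize = minSize
	return
}

func main() {
	maxConn, defSize, minSize, reserve := derivePoolSizes(60, 2)
	// Prints: per instance 30, default 15, min 7, reserve 7
	fmt.Printf("per instance %d, default %d, min %d, reserve %d\n",
		maxConn, defSize, minSize, reserve)
}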
+ + poolerContainer := v1.Container{ + Name: connectionPoolContainer, + Image: effectiveDockerImage, + ImagePullPolicy: v1.PullIfNotPresent, + Resources: *resources, + Ports: []v1.ContainerPort{ + { + ContainerPort: pgPort, + Protocol: v1.ProtocolTCP, + }, + }, + Env: envVars, + } + + podTemplate := &v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: c.connPoolLabelsSelector().MatchLabels, + Namespace: c.Namespace, + Annotations: c.generatePodAnnotations(spec), + }, + Spec: v1.PodSpec{ + ServiceAccountName: c.OpConfig.PodServiceAccountName, + TerminationGracePeriodSeconds: &gracePeriod, + Containers: []v1.Container{poolerContainer}, + // TODO: add tolerations to scheduler pooler on the same node + // as database + //Tolerations: *tolerationsSpec, + }, + } + + return podTemplate, nil +} + +// Return an array of ownerReferences to make an arbitraty object dependent on +// the StatefulSet. Dependency is made on StatefulSet instead of PostgreSQL CRD +// while the former is represent the actual state, and only it's deletion means +// we delete the cluster (e.g. if CRD was deleted, StatefulSet somehow +// survived, we can't delete an object because it will affect the functioning +// cluster). +func (c *Cluster) ownerReferences() []metav1.OwnerReference { + controller := true + + if c.Statefulset == nil { + c.logger.Warning("Cannot get owner reference, no statefulset") + return []metav1.OwnerReference{} + } + + return []metav1.OwnerReference{ + { + UID: c.Statefulset.ObjectMeta.UID, + APIVersion: "apps/v1", + Kind: "StatefulSet", + Name: c.Statefulset.ObjectMeta.Name, + Controller: &controller, + }, + } +} + +func (c *Cluster) generateConnPoolDeployment(spec *acidv1.PostgresSpec) ( + *appsv1.Deployment, error) { + + // there are two ways to enable connection pooler, either to specify a + // connectionPool section or enableConnectionPool. In the second case + // spec.connectionPool will be nil, so to make it easier to calculate + // default values, initialize it to an empty structure. It could be done + // anywhere, but here is the earliest common entry point between sync and + // create code, so init here. + if spec.ConnectionPool == nil { + spec.ConnectionPool = &acidv1.ConnectionPool{} + } + + podTemplate, err := c.generateConnPoolPodTemplate(spec) + numberOfInstances := spec.ConnectionPool.NumberOfInstances + if numberOfInstances == nil { + numberOfInstances = util.CoalesceInt32( + c.OpConfig.ConnectionPool.NumberOfInstances, + k8sutil.Int32ToPointer(1)) + } + + if *numberOfInstances < constants.ConnPoolMinInstances { + msg := "Adjusted number of connection pool instances from %d to %d" + c.logger.Warningf(msg, numberOfInstances, constants.ConnPoolMinInstances) + + *numberOfInstances = constants.ConnPoolMinInstances + } + + if err != nil { + return nil, err + } + + deployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: c.connPoolName(), + Namespace: c.Namespace, + Labels: c.connPoolLabelsSelector().MatchLabels, + Annotations: map[string]string{}, + // make StatefulSet object its owner to represent the dependency. + // By itself StatefulSet is being deleted with "Orphaned" + // propagation policy, which means that it's deletion will not + // clean up this deployment, but there is a hope that this object + // will be garbage collected if something went wrong and operator + // didn't deleted it. 
+ OwnerReferences: c.ownerReferences(), + }, + Spec: appsv1.DeploymentSpec{ + Replicas: numberOfInstances, + Selector: c.connPoolLabelsSelector(), + Template: *podTemplate, + }, + } + + return deployment, nil +} + +func (c *Cluster) generateConnPoolService(spec *acidv1.PostgresSpec) *v1.Service { + + // there are two ways to enable connection pooler, either to specify a + // connectionPool section or enableConnectionPool. In the second case + // spec.connectionPool will be nil, so to make it easier to calculate + // default values, initialize it to an empty structure. It could be done + // anywhere, but here is the earliest common entry point between sync and + // create code, so init here. + if spec.ConnectionPool == nil { + spec.ConnectionPool = &acidv1.ConnectionPool{} + } + + serviceSpec := v1.ServiceSpec{ + Ports: []v1.ServicePort{ + { + Name: c.connPoolName(), + Port: pgPort, + TargetPort: intstr.IntOrString{StrVal: c.servicePort(Master)}, + }, + }, + Type: v1.ServiceTypeClusterIP, + Selector: map[string]string{ + "connection-pool": c.connPoolName(), + }, + } + + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: c.connPoolName(), + Namespace: c.Namespace, + Labels: c.connPoolLabelsSelector().MatchLabels, + Annotations: map[string]string{}, + // make StatefulSet object its owner to represent the dependency. + // By itself StatefulSet is being deleted with "Orphaned" + // propagation policy, which means that it's deletion will not + // clean up this service, but there is a hope that this object will + // be garbage collected if something went wrong and operator didn't + // deleted it. + OwnerReferences: c.ownerReferences(), + }, + Spec: serviceSpec, + } + + return service +} + func ensurePath(file string, defaultDir string, defaultFile string) string { if file == "" { return path.Join(defaultDir, defaultFile) diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index 25e0f7af4..e04b281ba 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -1,6 +1,8 @@ package cluster import ( + "errors" + "fmt" "reflect" "testing" @@ -13,6 +15,7 @@ import ( "github.com/zalando/postgres-operator/pkg/util/constants" "github.com/zalando/postgres-operator/pkg/util/k8sutil" + appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" policyv1beta1 "k8s.io/api/policy/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -582,6 +585,375 @@ func TestSecretVolume(t *testing.T) { } } +func testResources(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { + cpuReq := podSpec.Spec.Containers[0].Resources.Requests["cpu"] + if cpuReq.String() != cluster.OpConfig.ConnectionPool.ConnPoolDefaultCPURequest { + return fmt.Errorf("CPU request doesn't match, got %s, expected %s", + cpuReq.String(), cluster.OpConfig.ConnectionPool.ConnPoolDefaultCPURequest) + } + + memReq := podSpec.Spec.Containers[0].Resources.Requests["memory"] + if memReq.String() != cluster.OpConfig.ConnectionPool.ConnPoolDefaultMemoryRequest { + return fmt.Errorf("Memory request doesn't match, got %s, expected %s", + memReq.String(), cluster.OpConfig.ConnectionPool.ConnPoolDefaultMemoryRequest) + } + + cpuLim := podSpec.Spec.Containers[0].Resources.Limits["cpu"] + if cpuLim.String() != cluster.OpConfig.ConnectionPool.ConnPoolDefaultCPULimit { + return fmt.Errorf("CPU limit doesn't match, got %s, expected %s", + cpuLim.String(), cluster.OpConfig.ConnectionPool.ConnPoolDefaultCPULimit) + } + + memLim := podSpec.Spec.Containers[0].Resources.Limits["memory"] + if memLim.String() != 
cluster.OpConfig.ConnectionPool.ConnPoolDefaultMemoryLimit { + return fmt.Errorf("Memory limit doesn't match, got %s, expected %s", + memLim.String(), cluster.OpConfig.ConnectionPool.ConnPoolDefaultMemoryLimit) + } + + return nil +} + +func testLabels(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { + poolLabels := podSpec.ObjectMeta.Labels["connection-pool"] + + if poolLabels != cluster.connPoolLabelsSelector().MatchLabels["connection-pool"] { + return fmt.Errorf("Pod labels do not match, got %+v, expected %+v", + podSpec.ObjectMeta.Labels, cluster.connPoolLabelsSelector().MatchLabels) + } + + return nil +} + +func testEnvs(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { + required := map[string]bool{ + "PGHOST": false, + "PGPORT": false, + "PGUSER": false, + "PGSCHEMA": false, + "PGPASSWORD": false, + "CONNECTION_POOL_MODE": false, + "CONNECTION_POOL_PORT": false, + } + + envs := podSpec.Spec.Containers[0].Env + for _, env := range envs { + required[env.Name] = true + } + + for env, value := range required { + if !value { + return fmt.Errorf("Environment variable %s is not present", env) + } + } + + return nil +} + +func testCustomPodTemplate(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { + if podSpec.ObjectMeta.Name != "test-pod-template" { + return fmt.Errorf("Custom pod template is not used, current spec %+v", + podSpec) + } + + return nil +} + +func TestConnPoolPodSpec(t *testing.T) { + testName := "Test connection pool pod template generation" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{ + MaxDBConnections: int32ToPointer(60), + ConnPoolDefaultCPURequest: "100m", + ConnPoolDefaultCPULimit: "100m", + ConnPoolDefaultMemoryRequest: "100Mi", + ConnPoolDefaultMemoryLimit: "100Mi", + }, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + + var clusterNoDefaultRes = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{}, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + + noCheck := func(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { return nil } + + tests := []struct { + subTest string + spec *acidv1.PostgresSpec + expected error + cluster *Cluster + check func(cluster *Cluster, podSpec *v1.PodTemplateSpec) error + }{ + { + subTest: "default configuration", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: noCheck, + }, + { + subTest: "no default resources", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: errors.New(`could not generate resource requirements: could not fill resource requests: could not parse default CPU quantity: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'`), + cluster: clusterNoDefaultRes, + check: noCheck, + }, + { + subTest: "default resources are set", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: testResources, + }, + { + subTest: "labels for service", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: testLabels, + }, + { + subTest: 
"required envs", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: testEnvs, + }, + } + for _, tt := range tests { + podSpec, err := tt.cluster.generateConnPoolPodTemplate(tt.spec) + + if err != tt.expected && err.Error() != tt.expected.Error() { + t.Errorf("%s [%s]: Could not generate pod template,\n %+v, expected\n %+v", + testName, tt.subTest, err, tt.expected) + } + + err = tt.check(cluster, podSpec) + if err != nil { + t.Errorf("%s [%s]: Pod spec is incorrect, %+v", + testName, tt.subTest, err) + } + } +} + +func testDeploymentOwnwerReference(cluster *Cluster, deployment *appsv1.Deployment) error { + owner := deployment.ObjectMeta.OwnerReferences[0] + + if owner.Name != cluster.Statefulset.ObjectMeta.Name { + return fmt.Errorf("Ownere reference is incorrect, got %s, expected %s", + owner.Name, cluster.Statefulset.ObjectMeta.Name) + } + + return nil +} + +func testSelector(cluster *Cluster, deployment *appsv1.Deployment) error { + labels := deployment.Spec.Selector.MatchLabels + expected := cluster.connPoolLabelsSelector().MatchLabels + + if labels["connection-pool"] != expected["connection-pool"] { + return fmt.Errorf("Labels are incorrect, got %+v, expected %+v", + labels, expected) + } + + return nil +} + +func TestConnPoolDeploymentSpec(t *testing.T) { + testName := "Test connection pool deployment spec generation" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{ + ConnPoolDefaultCPURequest: "100m", + ConnPoolDefaultCPULimit: "100m", + ConnPoolDefaultMemoryRequest: "100Mi", + ConnPoolDefaultMemoryLimit: "100Mi", + }, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + cluster.Statefulset = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sts", + }, + } + + noCheck := func(cluster *Cluster, deployment *appsv1.Deployment) error { + return nil + } + + tests := []struct { + subTest string + spec *acidv1.PostgresSpec + expected error + cluster *Cluster + check func(cluster *Cluster, deployment *appsv1.Deployment) error + }{ + { + subTest: "default configuration", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: noCheck, + }, + { + subTest: "owner reference", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: testDeploymentOwnwerReference, + }, + { + subTest: "selector", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: testSelector, + }, + } + for _, tt := range tests { + deployment, err := tt.cluster.generateConnPoolDeployment(tt.spec) + + if err != tt.expected && err.Error() != tt.expected.Error() { + t.Errorf("%s [%s]: Could not generate deployment spec,\n %+v, expected\n %+v", + testName, tt.subTest, err, tt.expected) + } + + err = tt.check(cluster, deployment) + if err != nil { + t.Errorf("%s [%s]: Deployment spec is incorrect, %+v", + testName, tt.subTest, err) + } + } +} + +func testServiceOwnwerReference(cluster *Cluster, service *v1.Service) error { + owner := service.ObjectMeta.OwnerReferences[0] + + if owner.Name != cluster.Statefulset.ObjectMeta.Name { + return fmt.Errorf("Ownere reference is incorrect, got %s, expected %s", + owner.Name, 
cluster.Statefulset.ObjectMeta.Name) + } + + return nil +} + +func testServiceSelector(cluster *Cluster, service *v1.Service) error { + selector := service.Spec.Selector + + if selector["connection-pool"] != cluster.connPoolName() { + return fmt.Errorf("Selector is incorrect, got %s, expected %s", + selector["connection-pool"], cluster.connPoolName()) + } + + return nil +} + +func TestConnPoolServiceSpec(t *testing.T) { + testName := "Test connection pool service spec generation" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{ + ConnPoolDefaultCPURequest: "100m", + ConnPoolDefaultCPULimit: "100m", + ConnPoolDefaultMemoryRequest: "100Mi", + ConnPoolDefaultMemoryLimit: "100Mi", + }, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + cluster.Statefulset = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sts", + }, + } + + noCheck := func(cluster *Cluster, deployment *v1.Service) error { + return nil + } + + tests := []struct { + subTest string + spec *acidv1.PostgresSpec + cluster *Cluster + check func(cluster *Cluster, deployment *v1.Service) error + }{ + { + subTest: "default configuration", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + cluster: cluster, + check: noCheck, + }, + { + subTest: "owner reference", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + cluster: cluster, + check: testServiceOwnwerReference, + }, + { + subTest: "selector", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + cluster: cluster, + check: testServiceSelector, + }, + } + for _, tt := range tests { + service := tt.cluster.generateConnPoolService(tt.spec) + + if err := tt.check(cluster, service); err != nil { + t.Errorf("%s [%s]: Service spec is incorrect, %+v", + testName, tt.subTest, err) + } + } +} + func TestTLS(t *testing.T) { var err error var spec acidv1.PostgresSpec diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index d6c2149bf..2e02a9a83 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -90,6 +90,132 @@ func (c *Cluster) createStatefulSet() (*appsv1.StatefulSet, error) { return statefulSet, nil } +// Prepare the database for connection pool to be used, i.e. install lookup +// function (do it first, because it should be fast and if it didn't succeed, +// it doesn't makes sense to create more K8S objects. At this moment we assume +// that necessary connection pool user exists. +// +// After that create all the objects for connection pool, namely a deployment +// with a chosen pooler and a service to expose it. 
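The installer is passed to createConnectionPool as a function value rather than called directly, which lets the tests further down substitute a mock for the database step. A minimal sketch of that injection pattern, independent of the operator's types; all names here are hypothetical.

package example

import "fmt"

// installFunc prepares a schema/user pair for the pooler; in production it
// runs SQL against the cluster, in tests it can be a no-op.
type installFunc func(schema, user string) error

// createWithLookup runs the (possibly mocked) installer before creating any
// dependent objects, so a failing database step aborts early and cheaply.
func createWithLookup(install installFunc, schema, user string, create func() error) error {
	if err := install(schema, user); err != nil {
		return fmt.Errorf("could not prepare database: %v", err)
	}
	return create()
}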
+func (c *Cluster) createConnectionPool(lookup InstallFunction) (*ConnectionPoolObjects, error) { + var msg string + c.setProcessName("creating connection pool") + + schema := c.Spec.ConnectionPool.Schema + if schema == "" { + schema = c.OpConfig.ConnectionPool.Schema + } + + user := c.Spec.ConnectionPool.User + if user == "" { + user = c.OpConfig.ConnectionPool.User + } + + err := lookup(schema, user) + + if err != nil { + msg = "could not prepare database for connection pool: %v" + return nil, fmt.Errorf(msg, err) + } + + deploymentSpec, err := c.generateConnPoolDeployment(&c.Spec) + if err != nil { + msg = "could not generate deployment for connection pool: %v" + return nil, fmt.Errorf(msg, err) + } + + // client-go does retry 10 times (with NoBackoff by default) when the API + // believe a request can be retried and returns Retry-After header. This + // should be good enough to not think about it here. + deployment, err := c.KubeClient. + Deployments(deploymentSpec.Namespace). + Create(deploymentSpec) + + if err != nil { + return nil, err + } + + serviceSpec := c.generateConnPoolService(&c.Spec) + service, err := c.KubeClient. + Services(serviceSpec.Namespace). + Create(serviceSpec) + + if err != nil { + return nil, err + } + + c.ConnectionPool = &ConnectionPoolObjects{ + Deployment: deployment, + Service: service, + } + c.logger.Debugf("created new connection pool %q, uid: %q", + util.NameFromMeta(deployment.ObjectMeta), deployment.UID) + + return c.ConnectionPool, nil +} + +func (c *Cluster) deleteConnectionPool() (err error) { + c.setProcessName("deleting connection pool") + c.logger.Debugln("deleting connection pool") + + // Lack of connection pooler objects is not a fatal error, just log it if + // it was present before in the manifest + if c.ConnectionPool == nil { + c.logger.Infof("No connection pool to delete") + return nil + } + + // Clean up the deployment object. If deployment resource we've remembered + // is somehow empty, try to delete based on what would we generate + deploymentName := c.connPoolName() + deployment := c.ConnectionPool.Deployment + + if deployment != nil { + deploymentName = deployment.Name + } + + // set delete propagation policy to foreground, so that replica set will be + // also deleted. + policy := metav1.DeletePropagationForeground + options := metav1.DeleteOptions{PropagationPolicy: &policy} + err = c.KubeClient. + Deployments(c.Namespace). + Delete(deploymentName, &options) + + if !k8sutil.ResourceNotFound(err) { + c.logger.Debugf("Connection pool deployment was already deleted") + } else if err != nil { + return fmt.Errorf("could not delete deployment: %v", err) + } + + c.logger.Infof("Connection pool deployment %q has been deleted", deploymentName) + + // Repeat the same for the service object + service := c.ConnectionPool.Service + serviceName := c.connPoolName() + + if service != nil { + serviceName = service.Name + } + + // set delete propagation policy to foreground, so that all the dependant + // will be deleted. + err = c.KubeClient. + Services(c.Namespace). 
+ Delete(serviceName, &options) + + if !k8sutil.ResourceNotFound(err) { + c.logger.Debugf("Connection pool service was already deleted") + } else if err != nil { + return fmt.Errorf("could not delete service: %v", err) + } + + c.logger.Infof("Connection pool service %q has been deleted", serviceName) + + c.ConnectionPool = nil + return nil +} + func getPodIndex(podName string) (int32, error) { parts := strings.Split(podName, "-") if len(parts) == 0 { @@ -674,3 +800,34 @@ func (c *Cluster) GetStatefulSet() *appsv1.StatefulSet { func (c *Cluster) GetPodDisruptionBudget() *policybeta1.PodDisruptionBudget { return c.PodDisruptionBudget } + +// Perform actual patching of a connection pool deployment, assuming that all +// the check were already done before. +func (c *Cluster) updateConnPoolDeployment(oldDeploymentSpec, newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) { + c.setProcessName("updating connection pool") + if c.ConnectionPool == nil || c.ConnectionPool.Deployment == nil { + return nil, fmt.Errorf("there is no connection pool in the cluster") + } + + patchData, err := specPatch(newDeployment.Spec) + if err != nil { + return nil, fmt.Errorf("could not form patch for the deployment: %v", err) + } + + // An update probably requires RetryOnConflict, but since only one operator + // worker at one time will try to update it chances of conflicts are + // minimal. + deployment, err := c.KubeClient. + Deployments(c.ConnectionPool.Deployment.Namespace). + Patch( + c.ConnectionPool.Deployment.Name, + types.MergePatchType, + patchData, "") + if err != nil { + return nil, fmt.Errorf("could not patch deployment: %v", err) + } + + c.ConnectionPool.Deployment = deployment + + return deployment, nil +} diff --git a/pkg/cluster/resources_test.go b/pkg/cluster/resources_test.go new file mode 100644 index 000000000..f06e96e65 --- /dev/null +++ b/pkg/cluster/resources_test.go @@ -0,0 +1,127 @@ +package cluster + +import ( + "testing" + + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/k8sutil" + + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func mockInstallLookupFunction(schema string, user string) error { + return nil +} + +func boolToPointer(value bool) *bool { + return &value +} + +func TestConnPoolCreationAndDeletion(t *testing.T) { + testName := "Test connection pool creation" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{ + ConnPoolDefaultCPURequest: "100m", + ConnPoolDefaultCPULimit: "100m", + ConnPoolDefaultMemoryRequest: "100Mi", + ConnPoolDefaultMemoryLimit: "100Mi", + }, + }, + }, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger) + + cluster.Statefulset = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sts", + }, + } + + cluster.Spec = acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + } + poolResources, err := cluster.createConnectionPool(mockInstallLookupFunction) + + if err != nil { + t.Errorf("%s: Cannot create connection pool, %s, %+v", + testName, err, poolResources) + } + + if poolResources.Deployment == nil { + t.Errorf("%s: Connection pool deployment is empty", testName) + } + + if poolResources.Service == nil { + t.Errorf("%s: Connection pool service is empty", testName) 
+ } + + err = cluster.deleteConnectionPool() + if err != nil { + t.Errorf("%s: Cannot delete connection pool, %s", testName, err) + } +} + +func TestNeedConnPool(t *testing.T) { + testName := "Test how connection pool can be enabled" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{ + ConnPoolDefaultCPURequest: "100m", + ConnPoolDefaultCPULimit: "100m", + ConnPoolDefaultMemoryRequest: "100Mi", + ConnPoolDefaultMemoryLimit: "100Mi", + }, + }, + }, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger) + + cluster.Spec = acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + } + + if !cluster.needConnectionPool() { + t.Errorf("%s: Connection pool is not enabled with full definition", + testName) + } + + cluster.Spec = acidv1.PostgresSpec{ + EnableConnectionPool: boolToPointer(true), + } + + if !cluster.needConnectionPool() { + t.Errorf("%s: Connection pool is not enabled with flag", + testName) + } + + cluster.Spec = acidv1.PostgresSpec{ + EnableConnectionPool: boolToPointer(false), + ConnectionPool: &acidv1.ConnectionPool{}, + } + + if cluster.needConnectionPool() { + t.Errorf("%s: Connection pool is still enabled with flag being false", + testName) + } + + cluster.Spec = acidv1.PostgresSpec{ + EnableConnectionPool: boolToPointer(true), + ConnectionPool: &acidv1.ConnectionPool{}, + } + + if !cluster.needConnectionPool() { + t.Errorf("%s: Connection pool is not enabled with flag and full", + testName) + } +} diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index b04ff863b..a7c933ae7 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -23,6 +23,7 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { c.mu.Lock() defer c.mu.Unlock() + oldSpec := c.Postgresql c.setSpec(newSpec) defer func() { @@ -108,6 +109,11 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { } } + // sync connection pool + if err = c.syncConnectionPool(&oldSpec, newSpec, c.installLookupFunction); err != nil { + return fmt.Errorf("could not sync connection pool: %v", err) + } + return err } @@ -424,7 +430,9 @@ func (c *Cluster) syncSecrets() error { } pwdUser := userMap[secretUsername] // if this secret belongs to the infrastructure role and the password has changed - replace it in the secret - if pwdUser.Password != string(secret.Data["password"]) && pwdUser.Origin == spec.RoleOriginInfrastructure { + if pwdUser.Password != string(secret.Data["password"]) && + pwdUser.Origin == spec.RoleOriginInfrastructure { + c.logger.Debugf("updating the secret %q from the infrastructure roles", secretSpec.Name) if _, err = c.KubeClient.Secrets(secretSpec.Namespace).Update(secretSpec); err != nil { return fmt.Errorf("could not update infrastructure role secret for role %q: %v", secretUsername, err) @@ -468,6 +476,16 @@ func (c *Cluster) syncRoles() (err error) { for _, u := range c.pgUsers { userNames = append(userNames, u.Name) } + + if c.needConnectionPool() { + connPoolUser := c.systemUsers[constants.ConnectionPoolUserKeyName] + userNames = append(userNames, connPoolUser.Name) + + if _, exists := c.pgUsers[connPoolUser.Name]; !exists { + c.pgUsers[connPoolUser.Name] = connPoolUser + } + } + dbUsers, err = c.readPgUsersFromDatabase(userNames) if err != nil { return fmt.Errorf("error getting users from the database: %v", err) @@ -600,3 +618,165 @@ func (c *Cluster) syncLogicalBackupJob() error { 
return nil } + +func (c *Cluster) syncConnectionPool(oldSpec, newSpec *acidv1.Postgresql, lookup InstallFunction) error { + if c.ConnectionPool == nil { + c.ConnectionPool = &ConnectionPoolObjects{} + } + + newNeedConnPool := c.needConnectionPoolWorker(&newSpec.Spec) + oldNeedConnPool := c.needConnectionPoolWorker(&oldSpec.Spec) + + if newNeedConnPool { + // Try to sync in any case. If we didn't needed connection pool before, + // it means we want to create it. If it was already present, still sync + // since it could happen that there is no difference in specs, and all + // the resources are remembered, but the deployment was manualy deleted + // in between + c.logger.Debug("syncing connection pool") + + // in this case also do not forget to install lookup function as for + // creating cluster + if !oldNeedConnPool || !c.ConnectionPool.LookupFunction { + newConnPool := newSpec.Spec.ConnectionPool + + specSchema := "" + specUser := "" + + if newConnPool != nil { + specSchema = newConnPool.Schema + specUser = newConnPool.User + } + + schema := util.Coalesce( + specSchema, + c.OpConfig.ConnectionPool.Schema) + + user := util.Coalesce( + specUser, + c.OpConfig.ConnectionPool.User) + + if err := lookup(schema, user); err != nil { + return err + } + } + + if err := c.syncConnectionPoolWorker(oldSpec, newSpec); err != nil { + c.logger.Errorf("could not sync connection pool: %v", err) + return err + } + } + + if oldNeedConnPool && !newNeedConnPool { + // delete and cleanup resources + if err := c.deleteConnectionPool(); err != nil { + c.logger.Warningf("could not remove connection pool: %v", err) + } + } + + if !oldNeedConnPool && !newNeedConnPool { + // delete and cleanup resources if not empty + if c.ConnectionPool != nil && + (c.ConnectionPool.Deployment != nil || + c.ConnectionPool.Service != nil) { + + if err := c.deleteConnectionPool(); err != nil { + c.logger.Warningf("could not remove connection pool: %v", err) + } + } + } + + return nil +} + +// Synchronize connection pool resources. Effectively we're interested only in +// synchronizing the corresponding deployment, but in case of deployment or +// service is missing, create it. After checking, also remember an object for +// the future references. +func (c *Cluster) syncConnectionPoolWorker(oldSpec, newSpec *acidv1.Postgresql) error { + deployment, err := c.KubeClient. + Deployments(c.Namespace). + Get(c.connPoolName(), metav1.GetOptions{}) + + if err != nil && k8sutil.ResourceNotFound(err) { + msg := "Deployment %s for connection pool synchronization is not found, create it" + c.logger.Warningf(msg, c.connPoolName()) + + deploymentSpec, err := c.generateConnPoolDeployment(&newSpec.Spec) + if err != nil { + msg = "could not generate deployment for connection pool: %v" + return fmt.Errorf(msg, err) + } + + deployment, err := c.KubeClient. + Deployments(deploymentSpec.Namespace). + Create(deploymentSpec) + + if err != nil { + return err + } + + c.ConnectionPool.Deployment = deployment + } else if err != nil { + return fmt.Errorf("could not get connection pool deployment to sync: %v", err) + } else { + c.ConnectionPool.Deployment = deployment + + // actual synchronization + oldConnPool := oldSpec.Spec.ConnectionPool + newConnPool := newSpec.Spec.ConnectionPool + specSync, specReason := c.needSyncConnPoolSpecs(oldConnPool, newConnPool) + defaultsSync, defaultsReason := c.needSyncConnPoolDefaults(newConnPool, deployment) + reason := append(specReason, defaultsReason...) 
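The deployment is only patched when one of the preceding checks fired; their textual reasons exist purely for the log line. A generic sketch of this accumulate-reasons pattern, not the operator's actual helpers:

package example

// needsSync runs a list of independent checks and aggregates the reasons of
// those that reported a difference; the caller acts (and logs the reasons)
// only when at least one check fired.
func needsSync(checks []func() (bool, []string)) (bool, []string) {
	sync := false
	reasons := []string{}
	for _, check := range checks {
		if fired, why := check(); fired {
			sync = true
			reasons = append(reasons, why...)
		}
	}
	return sync, reasons
}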
+		if specSync || defaultsSync {
+			c.logger.Infof("Update connection pool deployment %s, reason: %+v",
+				c.connPoolName(), reason)
+
+			newDeploymentSpec, err := c.generateConnPoolDeployment(&newSpec.Spec)
+			if err != nil {
+				msg := "could not generate deployment for connection pool: %v"
+				return fmt.Errorf(msg, err)
+			}
+
+			oldDeploymentSpec := c.ConnectionPool.Deployment
+
+			deployment, err := c.updateConnPoolDeployment(
+				oldDeploymentSpec,
+				newDeploymentSpec)
+
+			if err != nil {
+				return err
+			}
+
+			c.ConnectionPool.Deployment = deployment
+			return nil
+		}
+	}
+
+	service, err := c.KubeClient.
+		Services(c.Namespace).
+		Get(c.connPoolName(), metav1.GetOptions{})
+
+	if err != nil && k8sutil.ResourceNotFound(err) {
+		msg := "Service %s for connection pool synchronization is not found, create it"
+		c.logger.Warningf(msg, c.connPoolName())
+
+		serviceSpec := c.generateConnPoolService(&newSpec.Spec)
+		service, err := c.KubeClient.
+			Services(serviceSpec.Namespace).
+			Create(serviceSpec)
+
+		if err != nil {
+			return err
+		}
+
+		c.ConnectionPool.Service = service
+	} else if err != nil {
+		return fmt.Errorf("could not get connection pool service to sync: %v", err)
+	} else {
+		// Service updates are not supported and probably not that useful anyway
+		c.ConnectionPool.Service = service
+	}
+
+	return nil
+}
diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go
new file mode 100644
index 000000000..483d4ba58
--- /dev/null
+++ b/pkg/cluster/sync_test.go
@@ -0,0 +1,212 @@
+package cluster
+
+import (
+	"fmt"
+	"testing"
+
+	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
+	"github.com/zalando/postgres-operator/pkg/util/config"
+	"github.com/zalando/postgres-operator/pkg/util/k8sutil"
+
+	appsv1 "k8s.io/api/apps/v1"
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+func int32ToPointer(value int32) *int32 {
+	return &value
+}
+
+func deploymentUpdated(cluster *Cluster, err error) error {
+	if cluster.ConnectionPool.Deployment.Spec.Replicas == nil ||
+		*cluster.ConnectionPool.Deployment.Spec.Replicas != 2 {
+		return fmt.Errorf("Wrong number of instances")
+	}
+
+	return nil
+}
+
+func objectsAreSaved(cluster *Cluster, err error) error {
+	if cluster.ConnectionPool == nil {
+		return fmt.Errorf("Connection pool resources are empty")
+	}
+
+	if cluster.ConnectionPool.Deployment == nil {
+		return fmt.Errorf("Deployment was not saved")
+	}
+
+	if cluster.ConnectionPool.Service == nil {
+		return fmt.Errorf("Service was not saved")
+	}
+
+	return nil
+}
+
+func objectsAreDeleted(cluster *Cluster, err error) error {
+	if cluster.ConnectionPool != nil {
+		return fmt.Errorf("Connection pool was not deleted")
+	}
+
+	return nil
+}
+
+func TestConnPoolSynchronization(t *testing.T) {
+	testName := "Test connection pool synchronization"
+	var cluster = New(
+		Config{
+			OpConfig: config.Config{
+				ProtectedRoles: []string{"admin"},
+				Auth: config.Auth{
+					SuperUsername:       superUserName,
+					ReplicationUsername: replicationUserName,
+				},
+				ConnectionPool: config.ConnectionPool{
+					ConnPoolDefaultCPURequest:    "100m",
+					ConnPoolDefaultCPULimit:      "100m",
+					ConnPoolDefaultMemoryRequest: "100Mi",
+					ConnPoolDefaultMemoryLimit:   "100Mi",
+					NumberOfInstances:            int32ToPointer(1),
+				},
+			},
+		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+
+	cluster.Statefulset = &appsv1.StatefulSet{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "test-sts",
+		},
+	}
+
+	clusterMissingObjects := *cluster
+	clusterMissingObjects.KubeClient = k8sutil.ClientMissingObjects()
+
+
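+	// The fixtures below differ only in their mock clients and starting
+	// state: clusterMissingObjects simulates a cluster where neither the
+	// deployment nor the service exists yet, clusterMock returns existing
+	// objects, clusterDirtyMock starts with leftover ConnectionPool
+	// references although no pooler is requested, and clusterNewDefaultsMock
+	// is used for the case where the operator-level defaults (pooler image,
+	// number of instances) are bumped.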
clusterMock := *cluster + clusterMock.KubeClient = k8sutil.NewMockKubernetesClient() + + clusterDirtyMock := *cluster + clusterDirtyMock.KubeClient = k8sutil.NewMockKubernetesClient() + clusterDirtyMock.ConnectionPool = &ConnectionPoolObjects{ + Deployment: &appsv1.Deployment{}, + Service: &v1.Service{}, + } + + clusterNewDefaultsMock := *cluster + clusterNewDefaultsMock.KubeClient = k8sutil.NewMockKubernetesClient() + cluster.OpConfig.ConnectionPool.Image = "pooler:2.0" + cluster.OpConfig.ConnectionPool.NumberOfInstances = int32ToPointer(2) + + tests := []struct { + subTest string + oldSpec *acidv1.Postgresql + newSpec *acidv1.Postgresql + cluster *Cluster + check func(cluster *Cluster, err error) error + }{ + { + subTest: "create if doesn't exist", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + }, + cluster: &clusterMissingObjects, + check: objectsAreSaved, + }, + { + subTest: "create if doesn't exist with a flag", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + EnableConnectionPool: boolToPointer(true), + }, + }, + cluster: &clusterMissingObjects, + check: objectsAreSaved, + }, + { + subTest: "create from scratch", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + }, + cluster: &clusterMissingObjects, + check: objectsAreSaved, + }, + { + subTest: "delete if not needed", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + cluster: &clusterMock, + check: objectsAreDeleted, + }, + { + subTest: "cleanup if still there", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + cluster: &clusterDirtyMock, + check: objectsAreDeleted, + }, + { + subTest: "update deployment", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{ + NumberOfInstances: int32ToPointer(1), + }, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{ + NumberOfInstances: int32ToPointer(2), + }, + }, + }, + cluster: &clusterMock, + check: deploymentUpdated, + }, + { + subTest: "update image from changed defaults", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + }, + cluster: &clusterNewDefaultsMock, + check: deploymentUpdated, + }, + } + for _, tt := range tests { + err := tt.cluster.syncConnectionPool(tt.oldSpec, tt.newSpec, mockInstallLookupFunction) + + if err := tt.check(tt.cluster, err); err != nil { + t.Errorf("%s [%s]: Could not synchronize, %+v", + testName, tt.subTest, err) + } + } +} diff --git a/pkg/cluster/types.go b/pkg/cluster/types.go index 138b7015c..04d00cb58 100644 --- a/pkg/cluster/types.go +++ b/pkg/cluster/types.go @@ -69,3 +69,7 @@ type ClusterStatus struct { Spec acidv1.PostgresSpec Error error } + +type TemplateParams map[string]interface{} + +type InstallFunction func(schema string, user string) error diff --git a/pkg/cluster/util.go 
b/pkg/cluster/util.go index 8c02fed2e..dc1e93954 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -408,7 +408,32 @@ func (c *Cluster) labelsSet(shouldAddExtraLabels bool) labels.Set { } func (c *Cluster) labelsSelector() *metav1.LabelSelector { - return &metav1.LabelSelector{MatchLabels: c.labelsSet(false), MatchExpressions: nil} + return &metav1.LabelSelector{ + MatchLabels: c.labelsSet(false), + MatchExpressions: nil, + } +} + +// Return connection pool labels selector, which should from one point of view +// inherit most of the labels from the cluster itself, but at the same time +// have e.g. different `application` label, so that recreatePod operation will +// not interfere with it (it lists all the pods via labels, and if there would +// be no difference, it will recreate also pooler pods). +func (c *Cluster) connPoolLabelsSelector() *metav1.LabelSelector { + connPoolLabels := labels.Set(map[string]string{}) + + extraLabels := labels.Set(map[string]string{ + "connection-pool": c.connPoolName(), + "application": "db-connection-pool", + }) + + connPoolLabels = labels.Merge(connPoolLabels, c.labelsSet(false)) + connPoolLabels = labels.Merge(connPoolLabels, extraLabels) + + return &metav1.LabelSelector{ + MatchLabels: connPoolLabels, + MatchExpressions: nil, + } } func (c *Cluster) roleLabelsSet(shouldAddExtraLabels bool, role PostgresRole) labels.Set { @@ -483,3 +508,15 @@ func (c *Cluster) GetSpec() (*acidv1.Postgresql, error) { func (c *Cluster) patroniUsesKubernetes() bool { return c.OpConfig.EtcdHost == "" } + +func (c *Cluster) needConnectionPoolWorker(spec *acidv1.PostgresSpec) bool { + if spec.EnableConnectionPool == nil { + return spec.ConnectionPool != nil + } else { + return *spec.EnableConnectionPool + } +} + +func (c *Cluster) needConnectionPool() bool { + return c.needConnectionPoolWorker(&c.Spec) +} diff --git a/pkg/controller/operator_config.go b/pkg/controller/operator_config.go index 03602c3bd..970eef701 100644 --- a/pkg/controller/operator_config.go +++ b/pkg/controller/operator_config.go @@ -8,6 +8,7 @@ import ( acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" "github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/constants" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -21,6 +22,10 @@ func (c *Controller) readOperatorConfigurationFromCRD(configObjectNamespace, con return config, nil } +func int32ToPointer(value int32) *int32 { + return &value +} + // importConfigurationFromCRD is a transitional function that converts CRD configuration to the one based on the configmap func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigurationData) *config.Config { result := &config.Config{} @@ -143,5 +148,51 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur result.ScalyrCPULimit = fromCRD.Scalyr.ScalyrCPULimit result.ScalyrMemoryLimit = fromCRD.Scalyr.ScalyrMemoryLimit + // Connection pool. Looks like we can't use defaulting in CRD before 1.17, + // so ensure default values here. 
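+	// As an illustration: an operator configuration that leaves the whole
+	// connection_pool section empty should end up with the constants-based
+	// defaults applied below, i.e. at least 2 instances, the "pooler" schema
+	// and user, the default pgbouncer image and transaction mode.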
+ result.ConnectionPool.NumberOfInstances = util.CoalesceInt32( + fromCRD.ConnectionPool.NumberOfInstances, + int32ToPointer(2)) + + result.ConnectionPool.NumberOfInstances = util.MaxInt32( + result.ConnectionPool.NumberOfInstances, + int32ToPointer(2)) + + result.ConnectionPool.Schema = util.Coalesce( + fromCRD.ConnectionPool.Schema, + constants.ConnectionPoolSchemaName) + + result.ConnectionPool.User = util.Coalesce( + fromCRD.ConnectionPool.User, + constants.ConnectionPoolUserName) + + result.ConnectionPool.Image = util.Coalesce( + fromCRD.ConnectionPool.Image, + "registry.opensource.zalan.do/acid/pgbouncer") + + result.ConnectionPool.Mode = util.Coalesce( + fromCRD.ConnectionPool.Mode, + constants.ConnectionPoolDefaultMode) + + result.ConnectionPool.ConnPoolDefaultCPURequest = util.Coalesce( + fromCRD.ConnectionPool.DefaultCPURequest, + constants.ConnectionPoolDefaultCpuRequest) + + result.ConnectionPool.ConnPoolDefaultMemoryRequest = util.Coalesce( + fromCRD.ConnectionPool.DefaultMemoryRequest, + constants.ConnectionPoolDefaultMemoryRequest) + + result.ConnectionPool.ConnPoolDefaultCPULimit = util.Coalesce( + fromCRD.ConnectionPool.DefaultCPULimit, + constants.ConnectionPoolDefaultCpuLimit) + + result.ConnectionPool.ConnPoolDefaultMemoryLimit = util.Coalesce( + fromCRD.ConnectionPool.DefaultMemoryLimit, + constants.ConnectionPoolDefaultMemoryLimit) + + result.ConnectionPool.MaxDBConnections = util.CoalesceInt32( + fromCRD.ConnectionPool.MaxDBConnections, + int32ToPointer(constants.ConnPoolMaxDBConnections)) + return result } diff --git a/pkg/spec/types.go b/pkg/spec/types.go index 3e6bec8db..36783204d 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -23,13 +23,15 @@ const fileWithNamespace = "/var/run/secrets/kubernetes.io/serviceaccount/namespa // RoleOrigin contains the code of the origin of a role type RoleOrigin int -// The rolesOrigin constant values must be sorted by the role priority for resolveNameConflict(...) to work. +// The rolesOrigin constant values must be sorted by the role priority for +// resolveNameConflict(...) to work. 
const ( RoleOriginUnknown RoleOrigin = iota RoleOriginManifest RoleOriginInfrastructure RoleOriginTeamsAPI RoleOriginSystem + RoleConnectionPool ) type syncUserOperation int @@ -178,6 +180,8 @@ func (r RoleOrigin) String() string { return "teams API role" case RoleOriginSystem: return "system role" + case RoleConnectionPool: + return "connection pool role" default: panic(fmt.Sprintf("bogus role origin value %d", r)) } diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index 96d95fa39..403615f06 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -8,6 +8,7 @@ import ( "fmt" "github.com/zalando/postgres-operator/pkg/spec" + "github.com/zalando/postgres-operator/pkg/util/constants" ) // CRD describes CustomResourceDefinition specific configuration parameters @@ -83,6 +84,20 @@ type LogicalBackup struct { LogicalBackupS3SSE string `name:"logical_backup_s3_sse" default:"AES256"` } +// Operator options for connection pooler +type ConnectionPool struct { + NumberOfInstances *int32 `name:"connection_pool_number_of_instances" default:"2"` + Schema string `name:"connection_pool_schema" default:"pooler"` + User string `name:"connection_pool_user" default:"pooler"` + Image string `name:"connection_pool_image" default:"registry.opensource.zalan.do/acid/pgbouncer"` + Mode string `name:"connection_pool_mode" default:"transaction"` + MaxDBConnections *int32 `name:"connection_pool_max_db_connections" default:"60"` + ConnPoolDefaultCPURequest string `name:"connection_pool_default_cpu_request" default:"500m"` + ConnPoolDefaultMemoryRequest string `name:"connection_pool_default_memory_request" default:"100Mi"` + ConnPoolDefaultCPULimit string `name:"connection_pool_default_cpu_limit" default:"1"` + ConnPoolDefaultMemoryLimit string `name:"connection_pool_default_memory_limit" default:"100Mi"` +} + // Config describes operator config type Config struct { CRD @@ -90,6 +105,7 @@ type Config struct { Auth Scalyr LogicalBackup + ConnectionPool WatchedNamespace string `name:"watched_namespace"` // special values: "*" means 'watch all namespaces', the empty string "" means 'watch a namespace where operator is deployed to' EtcdHost string `name:"etcd_host" default:""` // special values: the empty string "" means Patroni will use K8s as a DCS @@ -196,5 +212,10 @@ func validate(cfg *Config) (err error) { if cfg.Workers == 0 { err = fmt.Errorf("number of workers should be higher than 0") } + + if *cfg.ConnectionPool.NumberOfInstances < constants.ConnPoolMinInstances { + msg := "number of connection pool instances should be higher than %d" + err = fmt.Errorf(msg, constants.ConnPoolMinInstances) + } return } diff --git a/pkg/util/constants/pooler.go b/pkg/util/constants/pooler.go new file mode 100644 index 000000000..540d64e2c --- /dev/null +++ b/pkg/util/constants/pooler.go @@ -0,0 +1,18 @@ +package constants + +// Connection pool specific constants +const ( + ConnectionPoolUserName = "pooler" + ConnectionPoolSchemaName = "pooler" + ConnectionPoolDefaultType = "pgbouncer" + ConnectionPoolDefaultMode = "transaction" + ConnectionPoolDefaultCpuRequest = "500m" + ConnectionPoolDefaultCpuLimit = "1" + ConnectionPoolDefaultMemoryRequest = "100Mi" + ConnectionPoolDefaultMemoryLimit = "100Mi" + + ConnPoolContainer = 0 + ConnPoolMaxDBConnections = 60 + ConnPoolMaxClientConnections = 10000 + ConnPoolMinInstances = 2 +) diff --git a/pkg/util/constants/roles.go b/pkg/util/constants/roles.go index 2c20d69db..3d201142c 100644 --- a/pkg/util/constants/roles.go +++ b/pkg/util/constants/roles.go @@ 
-2,15 +2,16 @@ package constants // Roles specific constants const ( - PasswordLength = 64 - SuperuserKeyName = "superuser" - ReplicationUserKeyName = "replication" - RoleFlagSuperuser = "SUPERUSER" - RoleFlagInherit = "INHERIT" - RoleFlagLogin = "LOGIN" - RoleFlagNoLogin = "NOLOGIN" - RoleFlagCreateRole = "CREATEROLE" - RoleFlagCreateDB = "CREATEDB" - RoleFlagReplication = "REPLICATION" - RoleFlagByPassRLS = "BYPASSRLS" + PasswordLength = 64 + SuperuserKeyName = "superuser" + ConnectionPoolUserKeyName = "pooler" + ReplicationUserKeyName = "replication" + RoleFlagSuperuser = "SUPERUSER" + RoleFlagInherit = "INHERIT" + RoleFlagLogin = "LOGIN" + RoleFlagNoLogin = "NOLOGIN" + RoleFlagCreateRole = "CREATEROLE" + RoleFlagCreateDB = "CREATEDB" + RoleFlagReplication = "REPLICATION" + RoleFlagByPassRLS = "BYPASSRLS" ) diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 509b12c19..75b99ec7c 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -9,11 +9,13 @@ import ( batchv1beta1 "k8s.io/api/batch/v1beta1" clientbatchv1beta1 "k8s.io/client-go/kubernetes/typed/batch/v1beta1" + apiappsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" policybeta1 "k8s.io/api/policy/v1beta1" apiextclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" apiextbeta1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" appsv1 "k8s.io/client-go/kubernetes/typed/apps/v1" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -26,6 +28,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +func Int32ToPointer(value int32) *int32 { + return &value +} + // KubernetesClient describes getters for Kubernetes objects type KubernetesClient struct { corev1.SecretsGetter @@ -39,6 +45,7 @@ type KubernetesClient struct { corev1.NamespacesGetter corev1.ServiceAccountsGetter appsv1.StatefulSetsGetter + appsv1.DeploymentsGetter rbacv1.RoleBindingsGetter policyv1beta1.PodDisruptionBudgetsGetter apiextbeta1.CustomResourceDefinitionsGetter @@ -55,6 +62,34 @@ type mockSecret struct { type MockSecretGetter struct { } +type mockDeployment struct { + appsv1.DeploymentInterface +} + +type mockDeploymentNotExist struct { + appsv1.DeploymentInterface +} + +type MockDeploymentGetter struct { +} + +type MockDeploymentNotExistGetter struct { +} + +type mockService struct { + corev1.ServiceInterface +} + +type mockServiceNotExist struct { + corev1.ServiceInterface +} + +type MockServiceGetter struct { +} + +type MockServiceNotExistGetter struct { +} + type mockConfigMap struct { corev1.ConfigMapInterface } @@ -101,6 +136,7 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) { kubeClient.NodesGetter = client.CoreV1() kubeClient.NamespacesGetter = client.CoreV1() kubeClient.StatefulSetsGetter = client.AppsV1() + kubeClient.DeploymentsGetter = client.AppsV1() kubeClient.PodDisruptionBudgetsGetter = client.PolicyV1beta1() kubeClient.RESTClient = client.CoreV1().RESTClient() kubeClient.RoleBindingsGetter = client.RbacV1() @@ -230,19 +266,145 @@ func (c *mockConfigMap) Get(name string, options metav1.GetOptions) (*v1.ConfigM } // Secrets to be mocked -func (c *MockSecretGetter) Secrets(namespace string) corev1.SecretInterface { +func (mock *MockSecretGetter) Secrets(namespace string) corev1.SecretInterface { return &mockSecret{} } // ConfigMaps to be mocked -func (c *MockConfigMapsGetter) ConfigMaps(namespace string) 
corev1.ConfigMapInterface { +func (mock *MockConfigMapsGetter) ConfigMaps(namespace string) corev1.ConfigMapInterface { return &mockConfigMap{} } +func (mock *MockDeploymentGetter) Deployments(namespace string) appsv1.DeploymentInterface { + return &mockDeployment{} +} + +func (mock *MockDeploymentNotExistGetter) Deployments(namespace string) appsv1.DeploymentInterface { + return &mockDeploymentNotExist{} +} + +func (mock *mockDeployment) Create(*apiappsv1.Deployment) (*apiappsv1.Deployment, error) { + return &apiappsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + }, + Spec: apiappsv1.DeploymentSpec{ + Replicas: Int32ToPointer(1), + }, + }, nil +} + +func (mock *mockDeployment) Delete(name string, opts *metav1.DeleteOptions) error { + return nil +} + +func (mock *mockDeployment) Get(name string, opts metav1.GetOptions) (*apiappsv1.Deployment, error) { + return &apiappsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + }, + Spec: apiappsv1.DeploymentSpec{ + Replicas: Int32ToPointer(1), + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + v1.Container{ + Image: "pooler:1.0", + }, + }, + }, + }, + }, + }, nil +} + +func (mock *mockDeployment) Patch(name string, t types.PatchType, data []byte, subres ...string) (*apiappsv1.Deployment, error) { + return &apiappsv1.Deployment{ + Spec: apiappsv1.DeploymentSpec{ + Replicas: Int32ToPointer(2), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + }, + }, nil +} + +func (mock *mockDeploymentNotExist) Get(name string, opts metav1.GetOptions) (*apiappsv1.Deployment, error) { + return nil, &apierrors.StatusError{ + ErrStatus: metav1.Status{ + Reason: metav1.StatusReasonNotFound, + }, + } +} + +func (mock *mockDeploymentNotExist) Create(*apiappsv1.Deployment) (*apiappsv1.Deployment, error) { + return &apiappsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + }, + Spec: apiappsv1.DeploymentSpec{ + Replicas: Int32ToPointer(1), + }, + }, nil +} + +func (mock *MockServiceGetter) Services(namespace string) corev1.ServiceInterface { + return &mockService{} +} + +func (mock *MockServiceNotExistGetter) Services(namespace string) corev1.ServiceInterface { + return &mockServiceNotExist{} +} + +func (mock *mockService) Create(*v1.Service) (*v1.Service, error) { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + }, + }, nil +} + +func (mock *mockService) Delete(name string, opts *metav1.DeleteOptions) error { + return nil +} + +func (mock *mockService) Get(name string, opts metav1.GetOptions) (*v1.Service, error) { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + }, + }, nil +} + +func (mock *mockServiceNotExist) Create(*v1.Service) (*v1.Service, error) { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + }, + }, nil +} + +func (mock *mockServiceNotExist) Get(name string, opts metav1.GetOptions) (*v1.Service, error) { + return nil, &apierrors.StatusError{ + ErrStatus: metav1.Status{ + Reason: metav1.StatusReasonNotFound, + }, + } +} + // NewMockKubernetesClient for other tests func NewMockKubernetesClient() KubernetesClient { return KubernetesClient{ - SecretsGetter: &MockSecretGetter{}, - ConfigMapsGetter: &MockConfigMapsGetter{}, + SecretsGetter: &MockSecretGetter{}, + ConfigMapsGetter: &MockConfigMapsGetter{}, + DeploymentsGetter: &MockDeploymentGetter{}, + ServicesGetter: &MockServiceGetter{}, + } +} + +func ClientMissingObjects() KubernetesClient 
{
+	return KubernetesClient{
+		DeploymentsGetter: &MockDeploymentNotExistGetter{},
+		ServicesGetter:    &MockServiceNotExistGetter{},
 	}
 }
diff --git a/pkg/util/util.go b/pkg/util/util.go
index d9803ab48..46df5d345 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -147,6 +147,40 @@ func Coalesce(val, defaultVal string) string {
 	return val
 }
 
+// CoalesceInt32 works like Coalesce, but for *int32 values: it returns val
+// unless it is nil, in which case defaultVal is returned.
+func CoalesceInt32(val, defaultVal *int32) *int32 {
+	if val == nil {
+		return defaultVal
+	}
+	return val
+}
+
+// testNil returns true if any of the given pointers is nil.
+func testNil(values ...*int32) bool {
+	for _, v := range values {
+		if v == nil {
+			return true
+		}
+	}
+
+	return false
+}
+
+// MaxInt32 returns the maximum of two integers provided via pointers. If
+// either value is nil, the result is nil as well, and the caller needs to
+// check for that.
+func MaxInt32(a, b *int32) *int32 {
+	if testNil(a, b) {
+		return nil
+	}
+
+	if *a > *b {
+		return a
+	}
+
+	return b
+}
+
 // IsSmallerQuantity : checks if first resource is of a smaller quantity than the second
 func IsSmallerQuantity(requestStr, limitStr string) (bool, error) {
diff --git a/ui/manifests/deployment.yaml b/ui/manifests/deployment.yaml
index 477e4d655..6138ca1a8 100644
--- a/ui/manifests/deployment.yaml
+++ b/ui/manifests/deployment.yaml
@@ -20,7 +20,7 @@ spec:
       serviceAccountName: postgres-operator-ui
       containers:
         - name: "service"
-          image: registry.opensource.zalan.do/acid/postgres-operator-ui:v1.3.0
+          image: registry.opensource.zalan.do/acid/postgres-operator-ui:v1.4.0
           ports:
             - containerPort: 8081
               protocol: "TCP"
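The two pointer helpers above are easy to misread, so here is a minimal, self-contained sketch (with made-up input values) of how importConfigurationFromCRD combines them to enforce the floor of two pooler instances:

package main

import "fmt"

func int32ToPointer(v int32) *int32 { return &v }

// coalesceInt32 mirrors util.CoalesceInt32: fall back to the default when unset.
func coalesceInt32(val, defaultVal *int32) *int32 {
	if val == nil {
		return defaultVal
	}
	return val
}

// maxInt32 mirrors util.MaxInt32: nil if either argument is nil, else the larger value.
func maxInt32(a, b *int32) *int32 {
	if a == nil || b == nil {
		return nil
	}
	if *a > *b {
		return a
	}
	return b
}

func main() {
	// A CRD configuration that asks for a single pooler instance...
	fromCRD := int32ToPointer(1)

	// ...first gets the default of 2 applied only if nothing was set,
	// and is then clamped to the minimum of 2 instances.
	n := coalesceInt32(fromCRD, int32ToPointer(2))
	n = maxInt32(n, int32ToPointer(2))

	fmt.Println(*n) // prints 2
}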