From 49158ecb6843181dd984b6ff4f4390c068bec062 Mon Sep 17 00:00:00 2001 From: Rafia Sabih Date: Fri, 13 Nov 2020 14:52:21 +0100 Subject: [PATCH] Connection pooler for replica (#1127) * Enable connection pooler for replica * Refactor code for connection pooler - Move all the relevant code to a separate file - Move all the related tests to a separate file - Avoid using cluster where not required - Simplify the logic in sync and other methods - Cleanup of duplicated or unused code * Fix labels for the replica pods * Update deleteConnectionPooler to include role * Adding test cases and other changes - Fix unit test and delete secret when required only - Make sure we use empty fresh cluster for every test case. * enhance e2e test * Disable pooler in complete manifest as this is source for e2e too an creates unnecessary pooler setups. Co-authored-by: Rafia Sabih Co-authored-by: Jan Mussler --- .../postgres-operator/crds/postgresqls.yaml | 4 +- docs/reference/cluster_manifest.md | 19 +- docs/user.md | 8 +- e2e/scripts/watch_objects.sh | 4 +- e2e/tests/k8s_api.py | 43 +- e2e/tests/test_e2e.py | 239 +++-- manifests/complete-postgres-manifest.yaml | 28 +- manifests/postgresql.crd.yaml | 2 + pkg/apis/acid.zalan.do/v1/crds.go | 3 + pkg/apis/acid.zalan.do/v1/postgresql_type.go | 5 +- pkg/cluster/cluster.go | 183 +--- pkg/cluster/connection_pooler.go | 905 +++++++++++++++++ pkg/cluster/connection_pooler_new_test.go | 45 + pkg/cluster/connection_pooler_test.go | 956 ++++++++++++++++++ pkg/cluster/database.go | 7 +- pkg/cluster/k8sres.go | 306 ------ pkg/cluster/k8sres_test.go | 318 +----- pkg/cluster/resources.go | 198 ---- pkg/cluster/resources_test.go | 127 --- pkg/cluster/sync.go | 206 +--- pkg/cluster/sync_test.go | 264 ----- pkg/cluster/types.go | 2 +- pkg/cluster/util.go | 34 - pkg/controller/postgresql.go | 4 +- ui/operator_ui/main.py | 11 + 25 files changed, 2185 insertions(+), 1736 deletions(-) create mode 100644 pkg/cluster/connection_pooler.go create mode 100644 pkg/cluster/connection_pooler_new_test.go create mode 100644 pkg/cluster/connection_pooler_test.go delete mode 100644 pkg/cluster/resources_test.go delete mode 100644 pkg/cluster/sync_test.go diff --git a/charts/postgres-operator/crds/postgresqls.yaml b/charts/postgres-operator/crds/postgresqls.yaml index 74c8f74b8..9127fa86e 100644 --- a/charts/postgres-operator/crds/postgresqls.yaml +++ b/charts/postgres-operator/crds/postgresqls.yaml @@ -190,6 +190,8 @@ spec: type: string enableConnectionPooler: type: boolean + enableReplicaConnectionPooler: + type: boolean enableLogicalBackup: type: boolean enableMasterLoadBalancer: @@ -603,4 +605,4 @@ spec: status: type: object additionalProperties: - type: string + type: string \ No newline at end of file diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md index 70ab14855..f7ddb6ff1 100644 --- a/docs/reference/cluster_manifest.md +++ b/docs/reference/cluster_manifest.md @@ -151,10 +151,15 @@ These parameters are grouped directly under the `spec` key in the manifest. configured (so you can override the operator configuration). Optional. * **enableConnectionPooler** - Tells the operator to create a connection pooler with a database. If this - field is true, a connection pooler deployment will be created even if + Tells the operator to create a connection pooler with a database for the master + service. If this field is true, a connection pooler deployment will be created even if `connectionPooler` section is empty. Optional, not set by default. 
+* **enableReplicaConnectionPooler** + Tells the operator to create a connection pooler with a database for the replica + service. If this field is true, a connection pooler deployment for replica + will be created even if `connectionPooler` section is empty. Optional, not set by default. + * **enableLogicalBackup** Determines if the logical backup of this cluster should be taken and uploaded to S3. Default: false. Optional. @@ -241,10 +246,10 @@ explanation of `ttl` and `loop_wait` parameters. * **synchronous_mode** Patroni `synchronous_mode` parameter value. The default is set to `false`. Optional. - + * **synchronous_mode_strict** Patroni `synchronous_mode_strict` parameter value. Can be used in addition to `synchronous_mode`. The default is set to `false`. Optional. - + ## Postgres container resources Those parameters define [CPU and memory requests and limits](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/) @@ -397,8 +402,10 @@ CPU and memory limits for the sidecar container. Parameters are grouped under the `connectionPooler` top-level key and specify configuration for connection pooler. If this section is not empty, a connection -pooler will be created for a database even if `enableConnectionPooler` is not -present. +pooler will be created for master service only even if `enableConnectionPooler` +is not present. But if this section is present then it defines the configuration +for both master and replica pooler services (if `enableReplicaConnectionPooler` + is enabled). * **numberOfInstances** How many instances of connection pooler to create. diff --git a/docs/user.md b/docs/user.md index f834c788a..8723a01e4 100644 --- a/docs/user.md +++ b/docs/user.md @@ -807,11 +807,17 @@ manifest: ```yaml spec: enableConnectionPooler: true + enableReplicaConnectionPooler: true ``` This will tell the operator to create a connection pooler with default configuration, through which one can access the master via a separate service -`{cluster-name}-pooler`. In most of the cases the +`{cluster-name}-pooler`. With the first option, connection pooler for master service +is created and with the second option, connection pooler for replica is created. +Note that both of these flags are independent of each other and user can set or +unset any of them as per their requirements without any effect on the other. + +In most of the cases the [default configuration](reference/operator_parameters.md#connection-pooler-configuration) should be good enough. 
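The two manifest flags described above map onto two small predicates in the new pkg/cluster/connection_pooler.go introduced later in this patch. A condensed, self-contained restatement of that logic, for orientation while reading the docs changes (package and import trimmed to what the snippet needs):

```go
package cluster

import (
	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
)

// The master pooler is created when enableConnectionPooler is set to true, or
// when a connectionPooler section exists while the flag is left unset.
func needMasterConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool {
	return (spec.EnableConnectionPooler != nil && *spec.EnableConnectionPooler) ||
		(spec.ConnectionPooler != nil && spec.EnableConnectionPooler == nil)
}

// The replica pooler is created only when asked for explicitly.
func needReplicaConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool {
	return spec.EnableReplicaConnectionPooler != nil && *spec.EnableReplicaConnectionPooler
}
```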
To configure a new connection pooler individually for each Postgres cluster, specify: diff --git a/e2e/scripts/watch_objects.sh b/e2e/scripts/watch_objects.sh index dbd98ffc6..52364f247 100755 --- a/e2e/scripts/watch_objects.sh +++ b/e2e/scripts/watch_objects.sh @@ -8,7 +8,9 @@ kubectl get statefulset -o jsonpath='{.items..metadata.annotations.zalando-postg echo echo echo 'Pods' -kubectl get pods -l application=spilo -l name=postgres-operator -l application=db-connection-pooler -o wide --all-namespaces +kubectl get pods -l application=spilo -o wide --all-namespaces +echo +kubectl get pods -l application=db-connection-pooler -o wide --all-namespaces echo echo 'Statefulsets' kubectl get statefulsets --all-namespaces diff --git a/e2e/tests/k8s_api.py b/e2e/tests/k8s_api.py index 93280dd53..30165e6a0 100644 --- a/e2e/tests/k8s_api.py +++ b/e2e/tests/k8s_api.py @@ -1,19 +1,14 @@ import json -import unittest import time -import timeout_decorator import subprocess import warnings -import os -import yaml -from datetime import datetime from kubernetes import client, config from kubernetes.client.rest import ApiException def to_selector(labels): - return ",".join(["=".join(l) for l in labels.items()]) + return ",".join(["=".join(lbl) for lbl in labels.items()]) class K8sApi: @@ -43,8 +38,8 @@ class K8s: def __init__(self, labels='x=y', namespace='default'): self.api = K8sApi() - self.labels=labels - self.namespace=namespace + self.labels = labels + self.namespace = namespace def get_pg_nodes(self, pg_cluster_name, namespace='default'): master_pod_node = '' @@ -81,7 +76,7 @@ class K8s: 'default', label_selector='name=postgres-operator' ).items - pods = list(filter(lambda x: x.status.phase=='Running', pods)) + pods = list(filter(lambda x: x.status.phase == 'Running', pods)) if len(pods): return pods[0] @@ -110,7 +105,6 @@ class K8s: time.sleep(self.RETRY_TIMEOUT_SEC) - def get_service_type(self, svc_labels, namespace='default'): svc_type = '' svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items @@ -213,8 +207,8 @@ class K8s: self.wait_for_logical_backup_job(expected_num_of_jobs=1) def delete_operator_pod(self, step="Delete operator pod"): - # patching the pod template in the deployment restarts the operator pod - self.api.apps_v1.patch_namespaced_deployment("postgres-operator","default", {"spec":{"template":{"metadata":{"annotations":{"step":"{}-{}".format(step, datetime.fromtimestamp(time.time()))}}}}}) + # patching the pod template in the deployment restarts the operator pod + self.api.apps_v1.patch_namespaced_deployment("postgres-operator", "default", {"spec": {"template": {"metadata": {"annotations": {"step": "{}-{}".format(step, time.time())}}}}}) self.wait_for_operator_pod_start() def update_config(self, config_map_patch, step="Updating operator deployment"): @@ -237,7 +231,7 @@ class K8s: def get_patroni_state(self, pod): r = self.exec_with_kubectl(pod, "patronictl list -f json") - if not r.returncode == 0 or not r.stdout.decode()[0:1]=="[": + if not r.returncode == 0 or not r.stdout.decode()[0:1] == "[": return [] return json.loads(r.stdout.decode()) @@ -248,7 +242,7 @@ class K8s: pod = pod.metadata.name r = self.exec_with_kubectl(pod, "curl localhost:8080/workers/all/status/") - if not r.returncode == 0 or not r.stdout.decode()[0:1]=="{": + if not r.returncode == 0 or not r.stdout.decode()[0:1] == "{": return None return json.loads(r.stdout.decode()) @@ -277,7 +271,7 @@ class K8s: ''' pod = self.api.core_v1.list_namespaced_pod( namespace, 
label_selector="statefulset.kubernetes.io/pod-name=" + pod_name) - + if len(pod.items) == 0: return None return pod.items[0].spec.containers[0].image @@ -305,8 +299,8 @@ class K8sBase: def __init__(self, labels='x=y', namespace='default'): self.api = K8sApi() - self.labels=labels - self.namespace=namespace + self.labels = labels + self.namespace = namespace def get_pg_nodes(self, pg_cluster_labels='cluster-name=acid-minimal-cluster', namespace='default'): master_pod_node = '' @@ -434,10 +428,10 @@ class K8sBase: def count_pdbs_with_label(self, labels, namespace='default'): return len(self.api.policy_v1_beta1.list_namespaced_pod_disruption_budget( namespace, label_selector=labels).items) - + def count_running_pods(self, labels='application=spilo,cluster-name=acid-minimal-cluster', namespace='default'): pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items - return len(list(filter(lambda x: x.status.phase=='Running', pods))) + return len(list(filter(lambda x: x.status.phase == 'Running', pods))) def wait_for_pod_failover(self, failover_targets, labels, namespace='default'): pod_phase = 'Failing over' @@ -484,14 +478,14 @@ class K8sBase: def get_patroni_state(self, pod): r = self.exec_with_kubectl(pod, "patronictl list -f json") - if not r.returncode == 0 or not r.stdout.decode()[0:1]=="[": + if not r.returncode == 0 or not r.stdout.decode()[0:1] == "[": return [] return json.loads(r.stdout.decode()) def get_patroni_running_members(self, pod): result = self.get_patroni_state(pod) - return list(filter(lambda x: x["State"]=="running", result)) - + return list(filter(lambda x: x["State"] == "running", result)) + def get_statefulset_image(self, label_selector="application=spilo,cluster-name=acid-minimal-cluster", namespace='default'): ssets = self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=label_selector, limit=1) if len(ssets.items) == 0: @@ -505,7 +499,7 @@ class K8sBase: ''' pod = self.api.core_v1.list_namespaced_pod( namespace, label_selector="statefulset.kubernetes.io/pod-name=" + pod_name) - + if len(pod.items) == 0: return None return pod.items[0].spec.containers[0].image @@ -514,10 +508,13 @@ class K8sBase: """ Inspiriational classes towards easier writing of end to end tests with one cluster per test case """ + + class K8sOperator(K8sBase): def __init__(self, labels="name=postgres-operator", namespace="default"): super().__init__(labels, namespace) + class K8sPostgres(K8sBase): def __init__(self, labels="cluster-name=acid-minimal-cluster", namespace="default"): super().__init__(labels, namespace) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 0fc60bf42..f863123bd 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -14,8 +14,9 @@ from kubernetes.client.rest import ApiException SPILO_CURRENT = "registry.opensource.zalan.do/acid/spilo-12:1.6-p5" SPILO_LAZY = "registry.opensource.zalan.do/acid/spilo-cdp-12:1.6-p114" + def to_selector(labels): - return ",".join(["=".join(l) for l in labels.items()]) + return ",".join(["=".join(lbl) for lbl in labels.items()]) def clean_list(values): @@ -41,7 +42,7 @@ class EndToEndTestCase(unittest.TestCase): self.assertEqual(y, x, m.format(y)) return True except AssertionError: - retries = retries -1 + retries = retries - 1 if not retries > 0: raise time.sleep(interval) @@ -53,7 +54,7 @@ class EndToEndTestCase(unittest.TestCase): self.assertNotEqual(y, x, m.format(y)) return True except AssertionError: - retries = retries -1 + retries = retries - 1 if not retries > 0: raise 
time.sleep(interval) @@ -64,7 +65,7 @@ class EndToEndTestCase(unittest.TestCase): self.assertTrue(f(), m) return True except AssertionError: - retries = retries -1 + retries = retries - 1 if not retries > 0: raise time.sleep(interval) @@ -104,13 +105,13 @@ class EndToEndTestCase(unittest.TestCase): with open("manifests/postgres-operator.yaml", 'r+') as f: operator_deployment = yaml.safe_load(f) operator_deployment["spec"]["template"]["spec"]["containers"][0]["image"] = os.environ['OPERATOR_IMAGE'] - + with open("manifests/postgres-operator.yaml", 'w') as f: yaml.dump(operator_deployment, f, Dumper=yaml.Dumper) with open("manifests/configmap.yaml", 'r+') as f: - configmap = yaml.safe_load(f) - configmap["data"]["workers"] = "1" + configmap = yaml.safe_load(f) + configmap["data"]["workers"] = "1" with open("manifests/configmap.yaml", 'w') as f: yaml.dump(configmap, f, Dumper=yaml.Dumper) @@ -129,8 +130,8 @@ class EndToEndTestCase(unittest.TestCase): k8s.wait_for_operator_pod_start() # reset taints and tolerations - k8s.api.core_v1.patch_node("postgres-operator-e2e-tests-worker",{"spec":{"taints":[]}}) - k8s.api.core_v1.patch_node("postgres-operator-e2e-tests-worker2",{"spec":{"taints":[]}}) + k8s.api.core_v1.patch_node("postgres-operator-e2e-tests-worker", {"spec": {"taints": []}}) + k8s.api.core_v1.patch_node("postgres-operator-e2e-tests-worker2", {"spec": {"taints": []}}) # make sure we start a new operator on every new run, # this tackles the problem when kind is reused @@ -160,26 +161,76 @@ class EndToEndTestCase(unittest.TestCase): the end turn connection pooler off to not interfere with other tests. ''' k8s = self.k8s - service_labels = { - 'cluster-name': 'acid-minimal-cluster', - } - pod_labels = dict({ - 'connection-pooler': 'acid-minimal-cluster-pooler', - }) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - # enable connection pooler k8s.api.custom_objects_api.patch_namespaced_custom_object( 'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', { 'spec': { 'enableConnectionPooler': True, + 'enableReplicaConnectionPooler': True, } }) - self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(), 2, "Deployment replicas is 2 default") - self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler"), 2, "No pooler pods found") - self.eventuallyEqual(lambda: k8s.count_services_with_label('application=db-connection-pooler,cluster-name=acid-minimal-cluster'), 1, "No pooler service found") + self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(), 2, + "Deployment replicas is 2 default") + self.eventuallyEqual(lambda: k8s.count_running_pods( + "connection-pooler=acid-minimal-cluster-pooler"), + 2, "No pooler pods found") + self.eventuallyEqual(lambda: k8s.count_running_pods( + "connection-pooler=acid-minimal-cluster-pooler-repl"), + 2, "No pooler replica pods found") + self.eventuallyEqual(lambda: k8s.count_services_with_label( + 'application=db-connection-pooler,cluster-name=acid-minimal-cluster'), + 2, "No pooler service found") + + # Turn off only master connection pooler + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', + 'postgresqls', 'acid-minimal-cluster', + { + 'spec': { + 'enableConnectionPooler': False, + 'enableReplicaConnectionPooler': True, + } + }) + + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, + "Operator does not get in sync") + self.eventuallyEqual(lambda: 
k8s.get_deployment_replica_count(name="acid-minimal-cluster-pooler-repl"), 2, + "Deployment replicas is 2 default") + self.eventuallyEqual(lambda: k8s.count_running_pods( + "connection-pooler=acid-minimal-cluster-pooler"), + 0, "Master pooler pods not deleted") + self.eventuallyEqual(lambda: k8s.count_running_pods( + "connection-pooler=acid-minimal-cluster-pooler-repl"), + 2, "Pooler replica pods not found") + self.eventuallyEqual(lambda: k8s.count_services_with_label( + 'application=db-connection-pooler,cluster-name=acid-minimal-cluster'), + 1, "No pooler service found") + + # Turn off only replica connection pooler + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', + 'postgresqls', 'acid-minimal-cluster', + { + 'spec': { + 'enableConnectionPooler': True, + 'enableReplicaConnectionPooler': False, + } + }) + + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, + "Operator does not get in sync") + self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(), 2, + "Deployment replicas is 2 default") + self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler"), + 2, "Master pooler pods not found") + self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler-repl"), + 0, "Pooler replica pods not deleted") + self.eventuallyEqual(lambda: k8s.count_services_with_label('application=db-connection-pooler,cluster-name=acid-minimal-cluster'), + 1, "No pooler service found") # scale up connection pooler deployment k8s.api.custom_objects_api.patch_namespaced_custom_object( @@ -193,8 +244,10 @@ class EndToEndTestCase(unittest.TestCase): } }) - self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(), 3, "Deployment replicas is scaled to 3") - self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler"), 3, "Scale up of pooler pods does not work") + self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(), 3, + "Deployment replicas is scaled to 3") + self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler"), + 3, "Scale up of pooler pods does not work") # turn it off, keeping config should be overwritten by false k8s.api.custom_objects_api.patch_namespaced_custom_object( @@ -202,12 +255,15 @@ class EndToEndTestCase(unittest.TestCase): 'postgresqls', 'acid-minimal-cluster', { 'spec': { - 'enableConnectionPooler': False + 'enableConnectionPooler': False, + 'enableReplicaConnectionPooler': False, } }) - self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler"), 0, "Pooler pods not scaled down") - self.eventuallyEqual(lambda: k8s.count_services_with_label('application=db-connection-pooler,cluster-name=acid-minimal-cluster'), 0, "Pooler service not removed") + self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler"), + 0, "Pooler pods not scaled down") + self.eventuallyEqual(lambda: k8s.count_services_with_label('application=db-connection-pooler,cluster-name=acid-minimal-cluster'), + 0, "Pooler service not removed") # Verify that all the databases have pooler schema installed. 
# Do this via psql, since otherwise we need to deal with @@ -267,7 +323,8 @@ class EndToEndTestCase(unittest.TestCase): 'postgresqls', 'acid-minimal-cluster', { 'spec': { - 'connectionPooler': None + 'connectionPooler': None, + 'EnableReplicaConnectionPooler': False, } }) @@ -281,8 +338,8 @@ class EndToEndTestCase(unittest.TestCase): cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster,spilo-role={}' self.eventuallyEqual(lambda: k8s.get_service_type(cluster_label.format("master")), - 'ClusterIP', - "Expected ClusterIP type initially, found {}") + 'ClusterIP', + "Expected ClusterIP type initially, found {}") try: # enable load balancer services @@ -294,14 +351,14 @@ class EndToEndTestCase(unittest.TestCase): } k8s.api.custom_objects_api.patch_namespaced_custom_object( "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_enable_lbs) - + self.eventuallyEqual(lambda: k8s.get_service_type(cluster_label.format("master")), 'LoadBalancer', - "Expected LoadBalancer service type for master, found {}") + "Expected LoadBalancer service type for master, found {}") self.eventuallyEqual(lambda: k8s.get_service_type(cluster_label.format("replica")), 'LoadBalancer', - "Expected LoadBalancer service type for master, found {}") + "Expected LoadBalancer service type for master, found {}") # disable load balancer services again pg_patch_disable_lbs = { @@ -312,14 +369,14 @@ class EndToEndTestCase(unittest.TestCase): } k8s.api.custom_objects_api.patch_namespaced_custom_object( "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_disable_lbs) - + self.eventuallyEqual(lambda: k8s.get_service_type(cluster_label.format("master")), 'ClusterIP', - "Expected LoadBalancer service type for master, found {}") + "Expected LoadBalancer service type for master, found {}") self.eventuallyEqual(lambda: k8s.get_service_type(cluster_label.format("replica")), 'ClusterIP', - "Expected LoadBalancer service type for master, found {}") + "Expected LoadBalancer service type for master, found {}") except timeout_decorator.TimeoutError: print('Operator log: {}'.format(k8s.get_operator_log())) @@ -333,7 +390,8 @@ class EndToEndTestCase(unittest.TestCase): k8s = self.k8s # update infrastructure roles description secret_name = "postgresql-infrastructure-roles" - roles = "secretname: postgresql-infrastructure-roles-new, userkey: user, rolekey: memberof, passwordkey: password, defaultrolevalue: robot_zmon" + roles = "secretname: postgresql-infrastructure-roles-new, userkey: user,"\ + "rolekey: memberof, passwordkey: password, defaultrolevalue: robot_zmon" patch_infrastructure_roles = { "data": { "infrastructure_roles_secret_name": secret_name, @@ -341,7 +399,8 @@ class EndToEndTestCase(unittest.TestCase): }, } k8s.update_config(patch_infrastructure_roles) - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, + "Operator does not get in sync") try: # check that new roles are represented in the config by requesting the @@ -351,11 +410,12 @@ class EndToEndTestCase(unittest.TestCase): try: operator_pod = k8s.get_operator_pod() get_config_cmd = "wget --quiet -O - localhost:8080/config" - result = k8s.exec_with_kubectl(operator_pod.metadata.name, get_config_cmd) + result = k8s.exec_with_kubectl(operator_pod.metadata.name, + get_config_cmd) try: roles_dict = (json.loads(result.stdout) - .get("controller", {}) - .get("InfrastructureRoles")) + 
.get("controller", {}) + .get("InfrastructureRoles")) except: return False @@ -377,7 +437,6 @@ class EndToEndTestCase(unittest.TestCase): return False self.eventuallyTrue(verify_role, "infrastructure role setup is not loaded") - except timeout_decorator.TimeoutError: print('Operator log: {}'.format(k8s.get_operator_log())) @@ -386,13 +445,15 @@ class EndToEndTestCase(unittest.TestCase): @timeout_decorator.timeout(TEST_TIMEOUT_SEC) def test_lazy_spilo_upgrade(self): ''' - Test lazy upgrade for the Spilo image: operator changes a stateful set but lets pods run with the old image - until they are recreated for reasons other than operator's activity. That works because the operator configures - stateful sets to use "onDelete" pod update policy. + Test lazy upgrade for the Spilo image: operator changes a stateful set + but lets pods run with the old image until they are recreated for + reasons other than operator's activity. That works because the operator + configures stateful sets to use "onDelete" pod update policy. The test covers: 1) enabling lazy upgrade in existing operator deployment - 2) forcing the normal rolling upgrade by changing the operator configmap and restarting its pod + 2) forcing the normal rolling upgrade by changing the operator + configmap and restarting its pod ''' k8s = self.k8s @@ -400,8 +461,10 @@ class EndToEndTestCase(unittest.TestCase): pod0 = 'acid-minimal-cluster-0' pod1 = 'acid-minimal-cluster-1' - self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running") - self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)), 2, "Postgres status did not enter running") + self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, + "No 2 pods running") + self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)), + 2, "Postgres status did not enter running") patch_lazy_spilo_upgrade = { "data": { @@ -409,14 +472,20 @@ class EndToEndTestCase(unittest.TestCase): "enable_lazy_spilo_upgrade": "false" } } - k8s.update_config(patch_lazy_spilo_upgrade, step="Init baseline image version") + k8s.update_config(patch_lazy_spilo_upgrade, + step="Init baseline image version") - self.eventuallyEqual(lambda: k8s.get_statefulset_image(), SPILO_CURRENT, "Stagefulset not updated initially") - self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running") - self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)), 2, "Postgres status did not enter running") + self.eventuallyEqual(lambda: k8s.get_statefulset_image(), SPILO_CURRENT, + "Statefulset not updated initially") + self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, + "No 2 pods running") + self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)), + 2, "Postgres status did not enter running") - self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0), SPILO_CURRENT, "Rolling upgrade was not executed") - self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod1), SPILO_CURRENT, "Rolling upgrade was not executed") + self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0), + SPILO_CURRENT, "Rolling upgrade was not executed") + self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod1), + SPILO_CURRENT, "Rolling upgrade was not executed") # update docker image in config and enable the lazy upgrade conf_image = SPILO_LAZY @@ -426,18 +495,25 @@ class EndToEndTestCase(unittest.TestCase): "enable_lazy_spilo_upgrade": "true" } } - k8s.update_config(patch_lazy_spilo_upgrade,step="patch image and lazy upgrade") - 
self.eventuallyEqual(lambda: k8s.get_statefulset_image(), conf_image, "Statefulset not updated to next Docker image") + k8s.update_config(patch_lazy_spilo_upgrade, + step="patch image and lazy upgrade") + self.eventuallyEqual(lambda: k8s.get_statefulset_image(), conf_image, + "Statefulset not updated to next Docker image") try: # restart the pod to get a container with the new image - k8s.api.core_v1.delete_namespaced_pod(pod0, 'default') - + k8s.api.core_v1.delete_namespaced_pod(pod0, 'default') + # verify only pod-0 which was deleted got new image from statefulset - self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0), conf_image, "Delete pod-0 did not get new spilo image") - self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No two pods running after lazy rolling upgrade") - self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)), 2, "Postgres status did not enter running") - self.assertNotEqual(lambda: k8s.get_effective_pod_image(pod1), SPILO_CURRENT, "pod-1 should not have change Docker image to {}".format(SPILO_CURRENT)) + self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0), + conf_image, "Delete pod-0 did not get new spilo image") + self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, + "No two pods running after lazy rolling upgrade") + self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)), + 2, "Postgres status did not enter running") + self.assertNotEqual(lambda: k8s.get_effective_pod_image(pod1), + SPILO_CURRENT, + "pod-1 should not have change Docker image to {}".format(SPILO_CURRENT)) # clean up unpatch_lazy_spilo_upgrade = { @@ -449,9 +525,14 @@ class EndToEndTestCase(unittest.TestCase): # at this point operator will complete the normal rolling upgrade # so we additonally test if disabling the lazy upgrade - forcing the normal rolling upgrade - works - self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0), conf_image, "Rolling upgrade was not executed", 50, 3) - self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod1), conf_image, "Rolling upgrade was not executed", 50, 3) - self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)), 2, "Postgres status did not enter running") + self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0), + conf_image, "Rolling upgrade was not executed", + 50, 3) + self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod1), + conf_image, "Rolling upgrade was not executed", + 50, 3) + self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)), + 2, "Postgres status did not enter running") except timeout_decorator.TimeoutError: print('Operator log: {}'.format(k8s.get_operator_log())) @@ -505,9 +586,9 @@ class EndToEndTestCase(unittest.TestCase): def get_docker_image(): jobs = k8s.get_logical_backup_job().items return jobs[0].spec.job_template.spec.template.spec.containers[0].image - + self.eventuallyEqual(get_docker_image, image, - "Expected job image {}, found {}".format(image, "{}")) + "Expected job image {}, found {}".format(image, "{}")) # delete the logical backup cron job pg_patch_disable_backup = { @@ -517,7 +598,7 @@ class EndToEndTestCase(unittest.TestCase): } k8s.api.custom_objects_api.patch_namespaced_custom_object( "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_disable_backup) - + self.eventuallyEqual(lambda: len(k8s.get_logical_backup_job().items), 0, "failed to create logical backup job") except timeout_decorator.TimeoutError: @@ -563,21 +644,21 @@ class 
EndToEndTestCase(unittest.TestCase): } k8s.api.custom_objects_api.patch_namespaced_custom_object( "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_resources) - - k8s.patch_statefulset({"metadata":{"annotations":{"zalando-postgres-operator-rolling-update-required": "False"}}}) + + k8s.patch_statefulset({"metadata": {"annotations": {"zalando-postgres-operator-rolling-update-required": "False"}}}) k8s.update_config(patch_min_resource_limits, "Minimum resource test") self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No two pods running after lazy rolling upgrade") self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members()), 2, "Postgres status did not enter running") - + def verify_pod_limits(): pods = k8s.api.core_v1.list_namespaced_pod('default', label_selector="cluster-name=acid-minimal-cluster,application=spilo").items - if len(pods)<2: + if len(pods) < 2: return False - r = pods[0].spec.containers[0].resources.limits['memory']==minMemoryLimit + r = pods[0].spec.containers[0].resources.limits['memory'] == minMemoryLimit r = r and pods[0].spec.containers[0].resources.limits['cpu'] == minCPULimit - r = r and pods[1].spec.containers[0].resources.limits['memory']==minMemoryLimit + r = r and pods[1].spec.containers[0].resources.limits['memory'] == minMemoryLimit r = r and pods[1].spec.containers[0].resources.limits['cpu'] == minCPULimit return r @@ -586,7 +667,7 @@ class EndToEndTestCase(unittest.TestCase): @classmethod def setUp(cls): # cls.k8s.update_config({}, step="Setup") - cls.k8s.patch_statefulset({"meta":{"annotations":{"zalando-postgres-operator-rolling-update-required": False}}}) + cls.k8s.patch_statefulset({"meta": {"annotations": {"zalando-postgres-operator-rolling-update-required": False}}}) pass @timeout_decorator.timeout(TEST_TIMEOUT_SEC) @@ -642,7 +723,7 @@ class EndToEndTestCase(unittest.TestCase): } } } - self.assertTrue(len(failover_targets)>0, "No failover targets available") + self.assertTrue(len(failover_targets) > 0, "No failover targets available") for failover_target in failover_targets: k8s.api.core_v1.patch_node(failover_target, patch_readiness_label) @@ -672,12 +753,12 @@ class EndToEndTestCase(unittest.TestCase): Scale up from 2 to 3 and back to 2 pods by updating the Postgres manifest at runtime. 
''' k8s = self.k8s - pod="acid-minimal-cluster-0" + pod = "acid-minimal-cluster-0" - k8s.scale_cluster(3) + k8s.scale_cluster(3) self.eventuallyEqual(lambda: k8s.count_running_pods(), 3, "Scale up to 3 failed") self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod)), 3, "Not all 3 nodes healthy") - + k8s.scale_cluster(2) self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "Scale down to 2 failed") self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod)), 2, "Not all members 2 healthy") @@ -756,6 +837,7 @@ class EndToEndTestCase(unittest.TestCase): self.eventuallyTrue(lambda: k8s.check_statefulset_annotations(cluster_label, annotations), "Annotations missing") + self.eventuallyTrue(lambda: k8s.check_statefulset_annotations(cluster_label, annotations), "Annotations missing") @timeout_decorator.timeout(TEST_TIMEOUT_SEC) @unittest.skip("Skipping this test until fixed") @@ -798,7 +880,7 @@ class EndToEndTestCase(unittest.TestCase): "toleration": "key:postgres,operator:Exists,effect:NoExecute" } } - + k8s.update_config(patch_toleration_config, step="allow tainted nodes") self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running") @@ -825,13 +907,14 @@ class EndToEndTestCase(unittest.TestCase): } } k8s.update_config(patch_delete_annotations) + time.sleep(25) self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") try: # this delete attempt should be omitted because of missing annotations k8s.api.custom_objects_api.delete_namespaced_custom_object( "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster") - time.sleep(5) + time.sleep(15) self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") # check that pods and services are still there diff --git a/manifests/complete-postgres-manifest.yaml b/manifests/complete-postgres-manifest.yaml index 79d1251e6..e6fb9a43c 100644 --- a/manifests/complete-postgres-manifest.yaml +++ b/manifests/complete-postgres-manifest.yaml @@ -18,7 +18,8 @@ spec: - createdb enableMasterLoadBalancer: false enableReplicaLoadBalancer: false -# enableConnectionPooler: true # not needed when connectionPooler section is present (see below) + enableConnectionPooler: false # enable/disable connection pooler deployment + enableReplicaConnectionPooler: false # set to enable connectionPooler for replica service allowedSourceRanges: # load balancers' source ranges for both master and replica services - 127.0.0.1/32 databases: @@ -126,18 +127,19 @@ spec: # - 01:00-06:00 #UTC # - Sat:00:00-04:00 - connectionPooler: - numberOfInstances: 2 - mode: "transaction" - schema: "pooler" - user: "pooler" - resources: - requests: - cpu: 300m - memory: 100Mi - limits: - cpu: "1" - memory: 100Mi +# overwrite custom properties for connection pooler deployments +# connectionPooler: +# numberOfInstances: 2 +# mode: "transaction" +# schema: "pooler" +# user: "pooler" +# resources: +# requests: +# cpu: 300m +# memory: 100Mi +# limits: +# cpu: "1" +# memory: 100Mi initContainers: - name: date diff --git a/manifests/postgresql.crd.yaml b/manifests/postgresql.crd.yaml index 16bdac564..5ee05f444 100644 --- a/manifests/postgresql.crd.yaml +++ b/manifests/postgresql.crd.yaml @@ -186,6 +186,8 @@ spec: type: string enableConnectionPooler: type: boolean + enableReplicaConnectionPooler: + type: boolean enableLogicalBackup: type: boolean enableMasterLoadBalancer: diff --git a/pkg/apis/acid.zalan.do/v1/crds.go b/pkg/apis/acid.zalan.do/v1/crds.go index 
92b904bae..2ed0d6b01 100644 --- a/pkg/apis/acid.zalan.do/v1/crds.go +++ b/pkg/apis/acid.zalan.do/v1/crds.go @@ -262,6 +262,9 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ "enableConnectionPooler": { Type: "boolean", }, + "enableReplicaConnectionPooler": { + Type: "boolean", + }, "enableLogicalBackup": { Type: "boolean", }, diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index 499a4cfda..a3dc490b5 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -29,8 +29,9 @@ type PostgresSpec struct { Patroni `json:"patroni,omitempty"` Resources `json:"resources,omitempty"` - EnableConnectionPooler *bool `json:"enableConnectionPooler,omitempty"` - ConnectionPooler *ConnectionPooler `json:"connectionPooler,omitempty"` + EnableConnectionPooler *bool `json:"enableConnectionPooler,omitempty"` + EnableReplicaConnectionPooler *bool `json:"enableReplicaConnectionPooler,omitempty"` + ConnectionPooler *ConnectionPooler `json:"connectionPooler,omitempty"` TeamID string `json:"teamId"` DockerImage string `json:"dockerImage,omitempty"` diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 7ec7be176..ee5c44bc9 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -12,9 +12,9 @@ import ( "sync" "time" - "github.com/r3labs/diff" "github.com/sirupsen/logrus" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/scheme" "github.com/zalando/postgres-operator/pkg/spec" pgteams "github.com/zalando/postgres-operator/pkg/teams" @@ -54,26 +54,11 @@ type Config struct { PodServiceAccountRoleBinding *rbacv1.RoleBinding } -// K8S objects that are belongs to a connection pooler -type ConnectionPoolerObjects struct { - Deployment *appsv1.Deployment - Service *v1.Service - - // It could happen that a connection pooler was enabled, but the operator - // was not able to properly process a corresponding event or was restarted. - // In this case we will miss missing/require situation and a lookup function - // will not be installed. To avoid synchronizing it all the time to prevent - // this, we can remember the result in memory at least until the next - // restart. 
- LookupFunction bool -} - type kubeResources struct { Services map[PostgresRole]*v1.Service Endpoints map[PostgresRole]*v1.Endpoints Secrets map[types.UID]*v1.Secret Statefulset *appsv1.StatefulSet - ConnectionPooler *ConnectionPoolerObjects PodDisruptionBudget *policybeta1.PodDisruptionBudget //Pods are treated separately //PVCs are treated separately @@ -103,9 +88,8 @@ type Cluster struct { currentProcess Process processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex - + ConnectionPooler map[PostgresRole]*ConnectionPoolerObjects } - type compareStatefulsetResult struct { match bool replace bool @@ -347,19 +331,7 @@ func (c *Cluster) Create() error { // // Do not consider connection pooler as a strict requirement, and if // something fails, report warning - if c.needConnectionPooler() { - if c.ConnectionPooler != nil { - c.logger.Warning("Connection pooler already exists in the cluster") - return nil - } - connectionPooler, err := c.createConnectionPooler(c.installLookupFunction) - if err != nil { - c.logger.Warningf("could not create connection pooler: %v", err) - return nil - } - c.logger.Infof("connection pooler %q has been successfully created", - util.NameFromMeta(connectionPooler.Deployment.ObjectMeta)) - } + c.createConnectionPooler(c.installLookupFunction) return nil } @@ -626,6 +598,8 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { } }() + logNiceDiff(c.logger, oldSpec, newSpec) + if oldSpec.Spec.PostgresqlParam.PgVersion > newSpec.Spec.PostgresqlParam.PgVersion { c.logger.Warningf("postgresql version change(%q -> %q) has no effect", oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion) @@ -641,7 +615,6 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { // Service if !reflect.DeepEqual(c.generateService(Master, &oldSpec.Spec), c.generateService(Master, &newSpec.Spec)) || !reflect.DeepEqual(c.generateService(Replica, &oldSpec.Spec), c.generateService(Replica, &newSpec.Spec)) { - c.logger.Debugf("syncing services") if err := c.syncServices(); err != nil { c.logger.Errorf("could not sync services: %v", err) updateFailed = true @@ -652,7 +625,8 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { // initUsers. Check if it needs to be called. sameUsers := reflect.DeepEqual(oldSpec.Spec.Users, newSpec.Spec.Users) && reflect.DeepEqual(oldSpec.Spec.PreparedDatabases, newSpec.Spec.PreparedDatabases) - needConnectionPooler := c.needConnectionPoolerWorker(&newSpec.Spec) + needConnectionPooler := needMasterConnectionPoolerWorker(&newSpec.Spec) || + needReplicaConnectionPoolerWorker(&newSpec.Spec) if !sameUsers || needConnectionPooler { c.logger.Debugf("syncing secrets") if err := c.initUsers(); err != nil { @@ -797,10 +771,8 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { // need to process. In the future we may want to do this more careful and // check which databases we need to process, but even repeating the whole // installation process should be good enough. 
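As a reading aid for the hunks around this point: the single `ConnectionPooler` field removed from `kubeResources` is replaced by a per-role map on `Cluster`, and the `LookupFunction` marker mentioned in the comment above now lives on each per-role object (see the new pkg/cluster/connection_pooler.go later in the patch). A trimmed sketch of that shape, keeping only the fields referenced here; the role constants follow the spilo-role label values used elsewhere in the operator:

```go
package cluster

import (
	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
)

type PostgresRole string

const (
	Master  PostgresRole = "master"
	Replica PostgresRole = "replica"
)

// Trimmed view of the struct added in pkg/cluster/connection_pooler.go.
type ConnectionPoolerObjects struct {
	Deployment     *appsv1.Deployment
	Service        *v1.Service
	Role           PostgresRole
	LookupFunction bool // remembers whether the lookup function was installed
}

// The cluster keeps one entry per role; create, sync and delete walk both keys.
var poolerRoles = [2]PostgresRole{Master, Replica}
```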
- c.ConnectionPooler.LookupFunction = false - if _, err := c.syncConnectionPooler(oldSpec, newSpec, - c.installLookupFunction); err != nil { + if _, err := c.syncConnectionPooler(oldSpec, newSpec, c.installLookupFunction); err != nil { c.logger.Errorf("could not sync connection pooler: %v", err) updateFailed = true } @@ -808,6 +780,20 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { return nil } +func syncResources(a, b *v1.ResourceRequirements) bool { + for _, res := range []v1.ResourceName{ + v1.ResourceCPU, + v1.ResourceMemory, + } { + if !a.Limits[res].Equal(b.Limits[res]) || + !a.Requests[res].Equal(b.Requests[res]) { + return true + } + } + + return false +} + // Delete deletes the cluster and cleans up all objects associated with it (including statefulsets). // The deletion order here is somewhat significant, because Patroni, when running with the Kubernetes // DCS, reuses the master's endpoint to store the leader related metadata. If we remove the endpoint @@ -856,9 +842,12 @@ func (c *Cluster) Delete() { // Delete connection pooler objects anyway, even if it's not mentioned in the // manifest, just to not keep orphaned components in case if something went // wrong - if err := c.deleteConnectionPooler(); err != nil { - c.logger.Warningf("could not remove connection pooler: %v", err) + for _, role := range [2]PostgresRole{Master, Replica} { + if err := c.deleteConnectionPooler(role); err != nil { + c.logger.Warningf("could not remove connection pooler: %v", err) + } } + } //NeedsRepair returns true if the cluster should be included in the repair scan (based on its in-memory status). @@ -928,7 +917,7 @@ func (c *Cluster) initSystemUsers() { // Connection pooler user is an exception, if requested it's going to be // created by operator as a normal pgUser - if c.needConnectionPooler() { + if needConnectionPooler(&c.Spec) { // initialize empty connection pooler if not done yet if c.Spec.ConnectionPooler == nil { c.Spec.ConnectionPooler = &acidv1.ConnectionPooler{} @@ -1423,119 +1412,3 @@ func (c *Cluster) deletePatroniClusterConfigMaps() error { return c.deleteClusterObject(get, deleteConfigMapFn, "configmap") } - -// Test if two connection pooler configuration needs to be synced. For simplicity -// compare not the actual K8S objects, but the configuration itself and request -// sync if there is any difference. 
-func (c *Cluster) needSyncConnectionPoolerSpecs(oldSpec, newSpec *acidv1.ConnectionPooler) (sync bool, reasons []string) { - reasons = []string{} - sync = false - - changelog, err := diff.Diff(oldSpec, newSpec) - if err != nil { - c.logger.Infof("Cannot get diff, do not do anything, %+v", err) - return false, reasons - } - - if len(changelog) > 0 { - sync = true - } - - for _, change := range changelog { - msg := fmt.Sprintf("%s %+v from '%+v' to '%+v'", - change.Type, change.Path, change.From, change.To) - reasons = append(reasons, msg) - } - - return sync, reasons -} - -func syncResources(a, b *v1.ResourceRequirements) bool { - for _, res := range []v1.ResourceName{ - v1.ResourceCPU, - v1.ResourceMemory, - } { - if !a.Limits[res].Equal(b.Limits[res]) || - !a.Requests[res].Equal(b.Requests[res]) { - return true - } - } - - return false -} - -// Check if we need to synchronize connection pooler deployment due to new -// defaults, that are different from what we see in the DeploymentSpec -func (c *Cluster) needSyncConnectionPoolerDefaults( - spec *acidv1.ConnectionPooler, - deployment *appsv1.Deployment) (sync bool, reasons []string) { - - reasons = []string{} - sync = false - - config := c.OpConfig.ConnectionPooler - podTemplate := deployment.Spec.Template - poolerContainer := podTemplate.Spec.Containers[constants.ConnectionPoolerContainer] - - if spec == nil { - spec = &acidv1.ConnectionPooler{} - } - - if spec.NumberOfInstances == nil && - *deployment.Spec.Replicas != *config.NumberOfInstances { - - sync = true - msg := fmt.Sprintf("NumberOfInstances is different (having %d, required %d)", - *deployment.Spec.Replicas, *config.NumberOfInstances) - reasons = append(reasons, msg) - } - - if spec.DockerImage == "" && - poolerContainer.Image != config.Image { - - sync = true - msg := fmt.Sprintf("DockerImage is different (having %s, required %s)", - poolerContainer.Image, config.Image) - reasons = append(reasons, msg) - } - - expectedResources, err := generateResourceRequirements(spec.Resources, - c.makeDefaultConnectionPoolerResources()) - - // An error to generate expected resources means something is not quite - // right, but for the purpose of robustness do not panic here, just report - // and ignore resources comparison (in the worst case there will be no - // updates for new resource values). 
- if err == nil && syncResources(&poolerContainer.Resources, expectedResources) { - sync = true - msg := fmt.Sprintf("Resources are different (having %+v, required %+v)", - poolerContainer.Resources, expectedResources) - reasons = append(reasons, msg) - } - - if err != nil { - c.logger.Warningf("Cannot generate expected resources, %v", err) - } - - for _, env := range poolerContainer.Env { - if spec.User == "" && env.Name == "PGUSER" { - ref := env.ValueFrom.SecretKeyRef.LocalObjectReference - - if ref.Name != c.credentialSecretName(config.User) { - sync = true - msg := fmt.Sprintf("pooler user is different (having %s, required %s)", - ref.Name, config.User) - reasons = append(reasons, msg) - } - } - - if spec.Schema == "" && env.Name == "PGSCHEMA" && env.Value != config.Schema { - sync = true - msg := fmt.Sprintf("pooler schema is different (having %s, required %s)", - env.Value, config.Schema) - reasons = append(reasons, msg) - } - } - - return sync, reasons -} diff --git a/pkg/cluster/connection_pooler.go b/pkg/cluster/connection_pooler.go new file mode 100644 index 000000000..0d9171b87 --- /dev/null +++ b/pkg/cluster/connection_pooler.go @@ -0,0 +1,905 @@ +package cluster + +import ( + "context" + "fmt" + "strings" + + "github.com/r3labs/diff" + "github.com/sirupsen/logrus" + acidzalando "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do" + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + + "github.com/zalando/postgres-operator/pkg/util" + "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/constants" + "github.com/zalando/postgres-operator/pkg/util/k8sutil" +) + +// K8S objects that are belong to connection pooler +type ConnectionPoolerObjects struct { + Deployment *appsv1.Deployment + Service *v1.Service + Name string + ClusterName string + Namespace string + Role PostgresRole + // It could happen that a connection pooler was enabled, but the operator + // was not able to properly process a corresponding event or was restarted. + // In this case we will miss missing/require situation and a lookup function + // will not be installed. To avoid synchronizing it all the time to prevent + // this, we can remember the result in memory at least until the next + // restart. 
+ LookupFunction bool + // Careful with referencing cluster.spec this object pointer changes + // during runtime and lifetime of cluster +} + +func (c *Cluster) connectionPoolerName(role PostgresRole) string { + name := c.Name + "-pooler" + if role == Replica { + name = name + "-repl" + } + return name +} + +// isConnectionPoolerEnabled +func needConnectionPooler(spec *acidv1.PostgresSpec) bool { + return needMasterConnectionPoolerWorker(spec) || + needReplicaConnectionPoolerWorker(spec) +} + +func needMasterConnectionPooler(spec *acidv1.PostgresSpec) bool { + return needMasterConnectionPoolerWorker(spec) +} + +func needMasterConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool { + return (nil != spec.EnableConnectionPooler && *spec.EnableConnectionPooler) || + (spec.ConnectionPooler != nil && spec.EnableConnectionPooler == nil) +} + +func needReplicaConnectionPooler(spec *acidv1.PostgresSpec) bool { + return needReplicaConnectionPoolerWorker(spec) +} + +func needReplicaConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool { + return spec.EnableReplicaConnectionPooler != nil && + *spec.EnableReplicaConnectionPooler +} + +// Return connection pooler labels selector, which should from one point of view +// inherit most of the labels from the cluster itself, but at the same time +// have e.g. different `application` label, so that recreatePod operation will +// not interfere with it (it lists all the pods via labels, and if there would +// be no difference, it will recreate also pooler pods). +func (c *Cluster) connectionPoolerLabelsSelector(role PostgresRole) *metav1.LabelSelector { + connectionPoolerLabels := labels.Set(map[string]string{}) + + extraLabels := labels.Set(map[string]string{ + "connection-pooler": c.connectionPoolerName(role), + "application": "db-connection-pooler", + "spilo-role": string(role), + "cluster-name": c.Name, + "Namespace": c.Namespace, + }) + + connectionPoolerLabels = labels.Merge(connectionPoolerLabels, c.labelsSet(false)) + connectionPoolerLabels = labels.Merge(connectionPoolerLabels, extraLabels) + + return &metav1.LabelSelector{ + MatchLabels: connectionPoolerLabels, + MatchExpressions: nil, + } +} + +// Prepare the database for connection pooler to be used, i.e. install lookup +// function (do it first, because it should be fast and if it didn't succeed, +// it doesn't makes sense to create more K8S objects. At this moment we assume +// that necessary connection pooler user exists. +// +// After that create all the objects for connection pooler, namely a deployment +// with a chosen pooler and a service to expose it. + +// have connectionpooler name in the cp object to have it immutable name +// add these cp related functions to a new cp file +// opConfig, cluster, and database name +func (c *Cluster) createConnectionPooler(LookupFunction InstallFunction) (SyncReason, error) { + var reason SyncReason + c.setProcessName("creating connection pooler") + + //this is essentially sync with nil as oldSpec + if reason, err := c.syncConnectionPooler(nil, &c.Postgresql, LookupFunction); err != nil { + return reason, err + } + return reason, nil +} + +// +// Generate pool size related environment variables. +// +// MAX_DB_CONN would specify the global maximum for connections to a target +// database. +// +// MAX_CLIENT_CONN is not configurable at the moment, just set it high enough. +// +// DEFAULT_SIZE is a pool size per db/user (having in mind the use case when +// most of the queries coming through a connection pooler are from the same +// user to the same db). 
In case if we want to spin up more connection pooler +// instances, take this into account and maintain the same number of +// connections. +// +// MIN_SIZE is a pool's minimal size, to prevent situation when sudden workload +// have to wait for spinning up a new connections. +// +// RESERVE_SIZE is how many additional connections to allow for a pooler. +func (c *Cluster) getConnectionPoolerEnvVars() []v1.EnvVar { + spec := &c.Spec + effectiveMode := util.Coalesce( + spec.ConnectionPooler.Mode, + c.OpConfig.ConnectionPooler.Mode) + + numberOfInstances := spec.ConnectionPooler.NumberOfInstances + if numberOfInstances == nil { + numberOfInstances = util.CoalesceInt32( + c.OpConfig.ConnectionPooler.NumberOfInstances, + k8sutil.Int32ToPointer(1)) + } + + effectiveMaxDBConn := util.CoalesceInt32( + spec.ConnectionPooler.MaxDBConnections, + c.OpConfig.ConnectionPooler.MaxDBConnections) + + if effectiveMaxDBConn == nil { + effectiveMaxDBConn = k8sutil.Int32ToPointer( + constants.ConnectionPoolerMaxDBConnections) + } + + maxDBConn := *effectiveMaxDBConn / *numberOfInstances + + defaultSize := maxDBConn / 2 + minSize := defaultSize / 2 + reserveSize := minSize + + return []v1.EnvVar{ + { + Name: "CONNECTION_POOLER_PORT", + Value: fmt.Sprint(pgPort), + }, + { + Name: "CONNECTION_POOLER_MODE", + Value: effectiveMode, + }, + { + Name: "CONNECTION_POOLER_DEFAULT_SIZE", + Value: fmt.Sprint(defaultSize), + }, + { + Name: "CONNECTION_POOLER_MIN_SIZE", + Value: fmt.Sprint(minSize), + }, + { + Name: "CONNECTION_POOLER_RESERVE_SIZE", + Value: fmt.Sprint(reserveSize), + }, + { + Name: "CONNECTION_POOLER_MAX_CLIENT_CONN", + Value: fmt.Sprint(constants.ConnectionPoolerMaxClientConnections), + }, + { + Name: "CONNECTION_POOLER_MAX_DB_CONN", + Value: fmt.Sprint(maxDBConn), + }, + } +} + +func (c *Cluster) generateConnectionPoolerPodTemplate(role PostgresRole) ( + *v1.PodTemplateSpec, error) { + spec := &c.Spec + gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds()) + resources, err := generateResourceRequirements( + spec.ConnectionPooler.Resources, + makeDefaultConnectionPoolerResources(&c.OpConfig)) + + effectiveDockerImage := util.Coalesce( + spec.ConnectionPooler.DockerImage, + c.OpConfig.ConnectionPooler.Image) + + effectiveSchema := util.Coalesce( + spec.ConnectionPooler.Schema, + c.OpConfig.ConnectionPooler.Schema) + + if err != nil { + return nil, fmt.Errorf("could not generate resource requirements: %v", err) + } + + secretSelector := func(key string) *v1.SecretKeySelector { + effectiveUser := util.Coalesce( + spec.ConnectionPooler.User, + c.OpConfig.ConnectionPooler.User) + + return &v1.SecretKeySelector{ + LocalObjectReference: v1.LocalObjectReference{ + Name: c.credentialSecretName(effectiveUser), + }, + Key: key, + } + } + + envVars := []v1.EnvVar{ + { + Name: "PGHOST", + Value: c.serviceAddress(role), + }, + { + Name: "PGPORT", + Value: c.servicePort(role), + }, + { + Name: "PGUSER", + ValueFrom: &v1.EnvVarSource{ + SecretKeyRef: secretSelector("username"), + }, + }, + // the convention is to use the same schema name as + // connection pooler username + { + Name: "PGSCHEMA", + Value: effectiveSchema, + }, + { + Name: "PGPASSWORD", + ValueFrom: &v1.EnvVarSource{ + SecretKeyRef: secretSelector("password"), + }, + }, + } + envVars = append(envVars, c.getConnectionPoolerEnvVars()...) 
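The pool sizing described for getConnectionPoolerEnvVars above is easier to follow with concrete numbers. A standalone sketch of the same arithmetic; the 60-connection budget and 2 instances are illustrative values only, the real defaults come from the operator configuration and constants.ConnectionPoolerMaxDBConnections:

```go
package main

import "fmt"

// Same derivation as getConnectionPoolerEnvVars in this patch: the global
// MAX_DB_CONN budget is split across pooler instances, then halved twice to
// obtain the default and minimal per-db/user pool sizes.
func poolSizes(maxDBConnTotal, instances int32) (maxDBConn, defaultSize, minSize, reserveSize int32) {
	maxDBConn = maxDBConnTotal / instances
	defaultSize = maxDBConn / 2
	minSize = defaultSize / 2
	reserveSize = minSize
	return
}

func main() {
	// Illustrative numbers: a 60-connection budget spread over 2 pooler pods.
	m, d, mn, r := poolSizes(60, 2)
	fmt.Println(m, d, mn, r) // 30 15 7 7
}
```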
+ + poolerContainer := v1.Container{ + Name: connectionPoolerContainer, + Image: effectiveDockerImage, + ImagePullPolicy: v1.PullIfNotPresent, + Resources: *resources, + Ports: []v1.ContainerPort{ + { + ContainerPort: pgPort, + Protocol: v1.ProtocolTCP, + }, + }, + Env: envVars, + ReadinessProbe: &v1.Probe{ + Handler: v1.Handler{ + TCPSocket: &v1.TCPSocketAction{ + Port: intstr.IntOrString{IntVal: pgPort}, + }, + }, + }, + } + + podTemplate := &v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: c.connectionPoolerLabelsSelector(role).MatchLabels, + Namespace: c.Namespace, + Annotations: c.generatePodAnnotations(spec), + }, + Spec: v1.PodSpec{ + ServiceAccountName: c.OpConfig.PodServiceAccountName, + TerminationGracePeriodSeconds: &gracePeriod, + Containers: []v1.Container{poolerContainer}, + // TODO: add tolerations to scheduler pooler on the same node + // as database + //Tolerations: *tolerationsSpec, + }, + } + + return podTemplate, nil +} + +func (c *Cluster) generateConnectionPoolerDeployment(connectionPooler *ConnectionPoolerObjects) ( + *appsv1.Deployment, error) { + spec := &c.Spec + + // there are two ways to enable connection pooler, either to specify a + // connectionPooler section or enableConnectionPooler. In the second case + // spec.connectionPooler will be nil, so to make it easier to calculate + // default values, initialize it to an empty structure. It could be done + // anywhere, but here is the earliest common entry point between sync and + // create code, so init here. + if spec.ConnectionPooler == nil { + spec.ConnectionPooler = &acidv1.ConnectionPooler{} + } + podTemplate, err := c.generateConnectionPoolerPodTemplate(connectionPooler.Role) + + numberOfInstances := spec.ConnectionPooler.NumberOfInstances + if numberOfInstances == nil { + numberOfInstances = util.CoalesceInt32( + c.OpConfig.ConnectionPooler.NumberOfInstances, + k8sutil.Int32ToPointer(1)) + } + + if *numberOfInstances < constants.ConnectionPoolerMinInstances { + msg := "Adjusted number of connection pooler instances from %d to %d" + c.logger.Warningf(msg, numberOfInstances, constants.ConnectionPoolerMinInstances) + + *numberOfInstances = constants.ConnectionPoolerMinInstances + } + + if err != nil { + return nil, err + } + + deployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: connectionPooler.Name, + Namespace: connectionPooler.Namespace, + Labels: c.connectionPoolerLabelsSelector(connectionPooler.Role).MatchLabels, + Annotations: map[string]string{}, + // make StatefulSet object its owner to represent the dependency. + // By itself StatefulSet is being deleted with "Orphaned" + // propagation policy, which means that it's deletion will not + // clean up this deployment, but there is a hope that this object + // will be garbage collected if something went wrong and operator + // didn't deleted it. + OwnerReferences: c.ownerReferences(), + }, + Spec: appsv1.DeploymentSpec{ + Replicas: numberOfInstances, + Selector: c.connectionPoolerLabelsSelector(connectionPooler.Role), + Template: *podTemplate, + }, + } + + return deployment, nil +} + +func (c *Cluster) generateConnectionPoolerService(connectionPooler *ConnectionPoolerObjects) *v1.Service { + + spec := &c.Spec + // there are two ways to enable connection pooler, either to specify a + // connectionPooler section or enableConnectionPooler. In the second case + // spec.connectionPooler will be nil, so to make it easier to calculate + // default values, initialize it to an empty structure. 
It could be done + // anywhere, but here is the earliest common entry point between sync and + // create code, so init here. + if spec.ConnectionPooler == nil { + spec.ConnectionPooler = &acidv1.ConnectionPooler{} + } + + serviceSpec := v1.ServiceSpec{ + Ports: []v1.ServicePort{ + { + Name: connectionPooler.Name, + Port: pgPort, + TargetPort: intstr.IntOrString{StrVal: c.servicePort(connectionPooler.Role)}, + }, + }, + Type: v1.ServiceTypeClusterIP, + Selector: map[string]string{ + "connection-pooler": c.connectionPoolerName(connectionPooler.Role), + }, + } + + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: connectionPooler.Name, + Namespace: connectionPooler.Namespace, + Labels: c.connectionPoolerLabelsSelector(connectionPooler.Role).MatchLabels, + Annotations: map[string]string{}, + // make StatefulSet object its owner to represent the dependency. + // By itself StatefulSet is being deleted with "Orphaned" + // propagation policy, which means that it's deletion will not + // clean up this service, but there is a hope that this object will + // be garbage collected if something went wrong and operator didn't + // deleted it. + OwnerReferences: c.ownerReferences(), + }, + Spec: serviceSpec, + } + + return service +} + +//delete connection pooler +func (c *Cluster) deleteConnectionPooler(role PostgresRole) (err error) { + c.logger.Infof("deleting connection pooler spilo-role=%s", role) + + // Lack of connection pooler objects is not a fatal error, just log it if + // it was present before in the manifest + if c.ConnectionPooler[role] == nil || role == "" { + c.logger.Debugf("no connection pooler to delete") + return nil + } + + // Clean up the deployment object. If deployment resource we've remembered + // is somehow empty, try to delete based on what would we generate + var deployment *appsv1.Deployment + deployment = c.ConnectionPooler[role].Deployment + + policy := metav1.DeletePropagationForeground + options := metav1.DeleteOptions{PropagationPolicy: &policy} + + if deployment != nil { + + // set delete propagation policy to foreground, so that replica set will be + // also deleted. + + err = c.KubeClient. + Deployments(c.Namespace). + Delete(context.TODO(), deployment.Name, options) + + if k8sutil.ResourceNotFound(err) { + c.logger.Debugf("connection pooler deployment was already deleted") + } else if err != nil { + return fmt.Errorf("could not delete connection pooler deployment: %v", err) + } + + c.logger.Infof("connection pooler deployment %s has been deleted for role %s", deployment.Name, role) + } + + // Repeat the same for the service object + var service *v1.Service + service = c.ConnectionPooler[role].Service + if service == nil { + c.logger.Debugf("no connection pooler service object to delete") + } else { + + err = c.KubeClient. + Services(c.Namespace). + Delete(context.TODO(), service.Name, options) + + if k8sutil.ResourceNotFound(err) { + c.logger.Debugf("connection pooler service was already deleted") + } else if err != nil { + return fmt.Errorf("could not delete connection pooler service: %v", err) + } + + c.logger.Infof("connection pooler service %s has been deleted for role %s", service.Name, role) + } + + c.ConnectionPooler[role].Deployment = nil + c.ConnectionPooler[role].Service = nil + return nil +} + +//delete connection pooler +func (c *Cluster) deleteConnectionPoolerSecret() (err error) { + // Repeat the same for the secret object + secretName := c.credentialSecretName(c.OpConfig.ConnectionPooler.User) + + secret, err := c.KubeClient. 
+		Secrets(c.Namespace).
+		Get(context.TODO(), secretName, metav1.GetOptions{})
+
+	if err != nil {
+		c.logger.Debugf("could not get connection pooler secret %s: %v", secretName, err)
+	} else {
+		if err = c.deleteSecret(secret.UID, *secret); err != nil {
+			return fmt.Errorf("could not delete pooler secret: %v", err)
+		}
+	}
+	return nil
+}
+
+// Perform actual patching of a connection pooler deployment, assuming that all
+// the checks were already done before.
+func updateConnectionPoolerDeployment(KubeClient k8sutil.KubernetesClient, newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) {
+	if newDeployment == nil {
+		return nil, fmt.Errorf("there is no connection pooler in the cluster")
+	}
+
+	patchData, err := specPatch(newDeployment.Spec)
+	if err != nil {
+		return nil, fmt.Errorf("could not form patch for the connection pooler deployment: %v", err)
+	}
+
+	// An update probably requires RetryOnConflict, but since only one operator
+	// worker at a time will try to update it, the chances of conflicts are
+	// minimal.
+	deployment, err := KubeClient.
+		Deployments(newDeployment.Namespace).Patch(
+		context.TODO(),
+		newDeployment.Name,
+		types.MergePatchType,
+		patchData,
+		metav1.PatchOptions{},
+		"")
+	if err != nil {
+		return nil, fmt.Errorf("could not patch connection pooler deployment: %v", err)
+	}
+
+	return deployment, nil
+}
+
+// updateConnectionPoolerAnnotations updates the annotations of the connection pooler deployment
+func updateConnectionPoolerAnnotations(KubeClient k8sutil.KubernetesClient, deployment *appsv1.Deployment, annotations map[string]string) (*appsv1.Deployment, error) {
+	patchData, err := metaAnnotationsPatch(annotations)
+	if err != nil {
+		return nil, fmt.Errorf("could not form patch for the connection pooler deployment metadata: %v", err)
+	}
+	result, err := KubeClient.Deployments(deployment.Namespace).Patch(
+		context.TODO(),
+		deployment.Name,
+		types.MergePatchType,
+		[]byte(patchData),
+		metav1.PatchOptions{},
+		"")
+	if err != nil {
+		return nil, fmt.Errorf("could not patch connection pooler annotations %q: %v", patchData, err)
+	}
+	return result, nil
+}
+
+// Test if two connection pooler configurations need to be synced. For simplicity,
+// compare not the actual K8S objects but the configuration itself, and request a
+// sync if there is any difference.
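+// Each collected reason is rendered as "<type> <path> from '<old>' to '<new>'";
+// an illustrative change of NumberOfInstances would therefore show up roughly
+// as "update [NumberOfInstances] from '1' to '2'" (the exact value rendering
+// depends on the diff library).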
+func needSyncConnectionPoolerSpecs(oldSpec, newSpec *acidv1.ConnectionPooler) (sync bool, reasons []string) { + reasons = []string{} + sync = false + + changelog, err := diff.Diff(oldSpec, newSpec) + if err != nil { + //c.logger.Infof("Cannot get diff, do not do anything, %+v", err) + return false, reasons + } + + if len(changelog) > 0 { + sync = true + } + + for _, change := range changelog { + msg := fmt.Sprintf("%s %+v from '%+v' to '%+v'", + change.Type, change.Path, change.From, change.To) + reasons = append(reasons, msg) + } + + return sync, reasons +} + +// Check if we need to synchronize connection pooler deployment due to new +// defaults, that are different from what we see in the DeploymentSpec +func needSyncConnectionPoolerDefaults(Config *Config, spec *acidv1.ConnectionPooler, deployment *appsv1.Deployment) (sync bool, reasons []string) { + + reasons = []string{} + sync = false + + config := Config.OpConfig.ConnectionPooler + podTemplate := deployment.Spec.Template + poolerContainer := podTemplate.Spec.Containers[constants.ConnectionPoolerContainer] + + if spec == nil { + spec = &acidv1.ConnectionPooler{} + } + if spec.NumberOfInstances == nil && + *deployment.Spec.Replicas != *config.NumberOfInstances { + + sync = true + msg := fmt.Sprintf("NumberOfInstances is different (having %d, required %d)", + *deployment.Spec.Replicas, *config.NumberOfInstances) + reasons = append(reasons, msg) + } + + if spec.DockerImage == "" && + poolerContainer.Image != config.Image { + + sync = true + msg := fmt.Sprintf("DockerImage is different (having %s, required %s)", + poolerContainer.Image, config.Image) + reasons = append(reasons, msg) + } + + expectedResources, err := generateResourceRequirements(spec.Resources, + makeDefaultConnectionPoolerResources(&Config.OpConfig)) + + // An error to generate expected resources means something is not quite + // right, but for the purpose of robustness do not panic here, just report + // and ignore resources comparison (in the worst case there will be no + // updates for new resource values). 
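+	// At this point the collected reasons look like, for instance (values are
+	// illustrative only):
+	//   "NumberOfInstances is different (having 1, required 2)"
+	//   "DockerImage is different (having pooler:1.0, required pooler:2.0)"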
+ if err == nil && syncResources(&poolerContainer.Resources, expectedResources) { + sync = true + msg := fmt.Sprintf("Resources are different (having %+v, required %+v)", + poolerContainer.Resources, expectedResources) + reasons = append(reasons, msg) + } + + if err != nil { + return false, reasons + } + + for _, env := range poolerContainer.Env { + if spec.User == "" && env.Name == "PGUSER" { + ref := env.ValueFrom.SecretKeyRef.LocalObjectReference + secretName := Config.OpConfig.SecretNameTemplate.Format( + "username", strings.Replace(config.User, "_", "-", -1), + "cluster", deployment.ClusterName, + "tprkind", acidv1.PostgresCRDResourceKind, + "tprgroup", acidzalando.GroupName) + + if ref.Name != secretName { + sync = true + msg := fmt.Sprintf("pooler user is different (having %s, required %s)", + ref.Name, config.User) + reasons = append(reasons, msg) + } + } + + if spec.Schema == "" && env.Name == "PGSCHEMA" && env.Value != config.Schema { + sync = true + msg := fmt.Sprintf("pooler schema is different (having %s, required %s)", + env.Value, config.Schema) + reasons = append(reasons, msg) + } + } + + return sync, reasons +} + +// Generate default resource section for connection pooler deployment, to be +// used if nothing custom is specified in the manifest +func makeDefaultConnectionPoolerResources(config *config.Config) acidv1.Resources { + + defaultRequests := acidv1.ResourceDescription{ + CPU: config.ConnectionPooler.ConnectionPoolerDefaultCPURequest, + Memory: config.ConnectionPooler.ConnectionPoolerDefaultMemoryRequest, + } + defaultLimits := acidv1.ResourceDescription{ + CPU: config.ConnectionPooler.ConnectionPoolerDefaultCPULimit, + Memory: config.ConnectionPooler.ConnectionPoolerDefaultMemoryLimit, + } + + return acidv1.Resources{ + ResourceRequests: defaultRequests, + ResourceLimits: defaultLimits, + } +} + +func logPoolerEssentials(log *logrus.Entry, oldSpec, newSpec *acidv1.Postgresql) { + var v []string + + var input []*bool + if oldSpec == nil { + input = []*bool{nil, nil, newSpec.Spec.EnableConnectionPooler, newSpec.Spec.EnableReplicaConnectionPooler} + } else { + input = []*bool{oldSpec.Spec.EnableConnectionPooler, oldSpec.Spec.EnableReplicaConnectionPooler, newSpec.Spec.EnableConnectionPooler, newSpec.Spec.EnableReplicaConnectionPooler} + } + + for _, b := range input { + if b == nil { + v = append(v, "nil") + } else { + v = append(v, fmt.Sprintf("%v", *b)) + } + } + + log.Debugf("syncing connection pooler from (%v, %v) to (%v, %v)", v[0], v[1], v[2], v[3]) +} + +func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, LookupFunction InstallFunction) (SyncReason, error) { + logPoolerEssentials(c.logger, oldSpec, newSpec) + + var reason SyncReason + var err error + var newNeedConnectionPooler, oldNeedConnectionPooler bool + oldNeedConnectionPooler = false + + // Check and perform the sync requirements for each of the roles. 
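+	// Behaviour as exercised by connection_pooler_test.go: the master pooler is
+	// wanted when enableConnectionPooler is set or a connectionPooler section is
+	// present, whereas the replica pooler is wanted only when
+	// enableReplicaConnectionPooler is explicitly set to true.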
+ for _, role := range [2]PostgresRole{Master, Replica} { + + if role == Master { + newNeedConnectionPooler = needMasterConnectionPoolerWorker(&newSpec.Spec) + if oldSpec != nil { + oldNeedConnectionPooler = needMasterConnectionPoolerWorker(&oldSpec.Spec) + } + } else { + newNeedConnectionPooler = needReplicaConnectionPoolerWorker(&newSpec.Spec) + if oldSpec != nil { + oldNeedConnectionPooler = needReplicaConnectionPoolerWorker(&oldSpec.Spec) + } + } + + // if the call is via createConnectionPooler, then it is required to initialize + // the structure + if c.ConnectionPooler == nil { + c.ConnectionPooler = map[PostgresRole]*ConnectionPoolerObjects{} + } + if c.ConnectionPooler[role] == nil { + c.ConnectionPooler[role] = &ConnectionPoolerObjects{ + Deployment: nil, + Service: nil, + Name: c.connectionPoolerName(role), + ClusterName: c.ClusterName, + Namespace: c.Namespace, + LookupFunction: false, + Role: role, + } + } + + if newNeedConnectionPooler { + // Try to sync in any case. If we didn't needed connection pooler before, + // it means we want to create it. If it was already present, still sync + // since it could happen that there is no difference in specs, and all + // the resources are remembered, but the deployment was manually deleted + // in between + + // in this case also do not forget to install lookup function as for + // creating cluster + if !oldNeedConnectionPooler || !c.ConnectionPooler[role].LookupFunction { + newConnectionPooler := newSpec.Spec.ConnectionPooler + + specSchema := "" + specUser := "" + + if newConnectionPooler != nil { + specSchema = newConnectionPooler.Schema + specUser = newConnectionPooler.User + } + + schema := util.Coalesce( + specSchema, + c.OpConfig.ConnectionPooler.Schema) + + user := util.Coalesce( + specUser, + c.OpConfig.ConnectionPooler.User) + + if err = LookupFunction(schema, user, role); err != nil { + return NoSync, err + } + } + + if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec, role); err != nil { + c.logger.Errorf("could not sync connection pooler: %v", err) + return reason, err + } + } else { + // delete and cleanup resources if they are already detected + if c.ConnectionPooler[role] != nil && + (c.ConnectionPooler[role].Deployment != nil || + c.ConnectionPooler[role].Service != nil) { + + if err = c.deleteConnectionPooler(role); err != nil { + c.logger.Warningf("could not remove connection pooler: %v", err) + } + } + } + } + if !needMasterConnectionPoolerWorker(&newSpec.Spec) && + !needReplicaConnectionPoolerWorker(&newSpec.Spec) { + if err = c.deleteConnectionPoolerSecret(); err != nil { + c.logger.Warningf("could not remove connection pooler secret: %v", err) + } + } + + return reason, nil +} + +// Synchronize connection pooler resources. Effectively we're interested only in +// synchronizing the corresponding deployment, but in case of deployment or +// service is missing, create it. After checking, also remember an object for +// the future references. +func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql, role PostgresRole) ( + SyncReason, error) { + + deployment, err := c.KubeClient. + Deployments(c.Namespace). 
+ Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}) + + if err != nil && k8sutil.ResourceNotFound(err) { + msg := "deployment %s for connection pooler synchronization is not found, create it" + c.logger.Warningf(msg, c.connectionPoolerName(role)) + + deploymentSpec, err := c.generateConnectionPoolerDeployment(c.ConnectionPooler[role]) + if err != nil { + msg = "could not generate deployment for connection pooler: %v" + return NoSync, fmt.Errorf(msg, err) + } + + deployment, err := c.KubeClient. + Deployments(deploymentSpec.Namespace). + Create(context.TODO(), deploymentSpec, metav1.CreateOptions{}) + + if err != nil { + return NoSync, err + } + c.ConnectionPooler[role].Deployment = deployment + } else if err != nil { + msg := "could not get connection pooler deployment to sync: %v" + return NoSync, fmt.Errorf(msg, err) + } else { + c.ConnectionPooler[role].Deployment = deployment + // actual synchronization + + var oldConnectionPooler *acidv1.ConnectionPooler + + if oldSpec != nil { + oldConnectionPooler = oldSpec.Spec.ConnectionPooler + } + + newConnectionPooler := newSpec.Spec.ConnectionPooler + // sync implementation below assumes that both old and new specs are + // not nil, but it can happen. To avoid any confusion like updating a + // deployment because the specification changed from nil to an empty + // struct (that was initialized somewhere before) replace any nil with + // an empty spec. + if oldConnectionPooler == nil { + oldConnectionPooler = &acidv1.ConnectionPooler{} + } + + if newConnectionPooler == nil { + newConnectionPooler = &acidv1.ConnectionPooler{} + } + + c.logger.Infof("old: %+v, new %+v", oldConnectionPooler, newConnectionPooler) + + var specSync bool + var specReason []string + + if oldSpec != nil { + specSync, specReason = needSyncConnectionPoolerSpecs(oldConnectionPooler, newConnectionPooler) + } + + defaultsSync, defaultsReason := needSyncConnectionPoolerDefaults(&c.Config, newConnectionPooler, deployment) + reason := append(specReason, defaultsReason...) + + if specSync || defaultsSync { + c.logger.Infof("Update connection pooler deployment %s, reason: %+v", + c.connectionPoolerName(role), reason) + newDeploymentSpec, err := c.generateConnectionPoolerDeployment(c.ConnectionPooler[role]) + if err != nil { + msg := "could not generate deployment for connection pooler: %v" + return reason, fmt.Errorf(msg, err) + } + + deployment, err := updateConnectionPoolerDeployment(c.KubeClient, + newDeploymentSpec) + + if err != nil { + return reason, err + } + c.ConnectionPooler[role].Deployment = deployment + } + } + + newAnnotations := c.AnnotationsToPropagate(c.ConnectionPooler[role].Deployment.Annotations) + if newAnnotations != nil { + deployment, err = updateConnectionPoolerAnnotations(c.KubeClient, c.ConnectionPooler[role].Deployment, newAnnotations) + if err != nil { + return nil, err + } + c.ConnectionPooler[role].Deployment = deployment + } + + service, err := c.KubeClient. + Services(c.Namespace). + Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}) + + if err != nil && k8sutil.ResourceNotFound(err) { + msg := "Service %s for connection pooler synchronization is not found, create it" + c.logger.Warningf(msg, c.connectionPoolerName(role)) + + serviceSpec := c.generateConnectionPoolerService(c.ConnectionPooler[role]) + service, err := c.KubeClient. + Services(serviceSpec.Namespace). 
+ Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) + + if err != nil { + return NoSync, err + } + c.ConnectionPooler[role].Service = service + + } else if err != nil { + msg := "could not get connection pooler service to sync: %v" + return NoSync, fmt.Errorf(msg, err) + } else { + // Service updates are not supported and probably not that useful anyway + c.ConnectionPooler[role].Service = service + } + + return NoSync, nil +} diff --git a/pkg/cluster/connection_pooler_new_test.go b/pkg/cluster/connection_pooler_new_test.go new file mode 100644 index 000000000..72b3408e3 --- /dev/null +++ b/pkg/cluster/connection_pooler_new_test.go @@ -0,0 +1,45 @@ +package cluster + +import ( + "testing" + + "context" + + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "k8s.io/apimachinery/pkg/labels" + + "k8s.io/client-go/kubernetes/fake" +) + +func TestFakeClient(t *testing.T) { + clientSet := fake.NewSimpleClientset() + namespace := "default" + + l := labels.Set(map[string]string{ + "application": "spilo", + }) + + deployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-deployment1", + Namespace: namespace, + Labels: l, + }, + } + + clientSet.AppsV1().Deployments(namespace).Create(context.TODO(), deployment, metav1.CreateOptions{}) + + deployment2, _ := clientSet.AppsV1().Deployments(namespace).Get(context.TODO(), "my-deployment1", metav1.GetOptions{}) + + if deployment.ObjectMeta.Name != deployment2.ObjectMeta.Name { + t.Errorf("Deployments are not equal") + } + + deployments, _ := clientSet.AppsV1().Deployments(namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: "application=spilo"}) + + if len(deployments.Items) != 1 { + t.Errorf("Label search does not work") + } +} diff --git a/pkg/cluster/connection_pooler_test.go b/pkg/cluster/connection_pooler_test.go new file mode 100644 index 000000000..b795fe14f --- /dev/null +++ b/pkg/cluster/connection_pooler_test.go @@ -0,0 +1,956 @@ +package cluster + +import ( + "errors" + "fmt" + "strings" + "testing" + + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/k8sutil" + + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func mockInstallLookupFunction(schema string, user string, role PostgresRole) error { + return nil +} + +func boolToPointer(value bool) *bool { + return &value +} + +func int32ToPointer(value int32) *int32 { + return &value +} + +func TestConnectionPoolerCreationAndDeletion(t *testing.T) { + testName := "Test connection pooler creation" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPooler: config.ConnectionPooler{ + ConnectionPoolerDefaultCPURequest: "100m", + ConnectionPoolerDefaultCPULimit: "100m", + ConnectionPoolerDefaultMemoryRequest: "100Mi", + ConnectionPoolerDefaultMemoryLimit: "100Mi", + NumberOfInstances: int32ToPointer(1), + }, + }, + }, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder) + + cluster.Statefulset = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sts", + }, + } + + cluster.Spec = acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableReplicaConnectionPooler: boolToPointer(true), + } + + reason, err := 
cluster.createConnectionPooler(mockInstallLookupFunction) + + if err != nil { + t.Errorf("%s: Cannot create connection pooler, %s, %+v", + testName, err, reason) + } + for _, role := range [2]PostgresRole{Master, Replica} { + if cluster.ConnectionPooler[role] != nil { + if cluster.ConnectionPooler[role].Deployment == nil { + t.Errorf("%s: Connection pooler deployment is empty for role %s", testName, role) + } + + if cluster.ConnectionPooler[role].Service == nil { + t.Errorf("%s: Connection pooler service is empty for role %s", testName, role) + } + } + } + oldSpec := &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + EnableConnectionPooler: boolToPointer(true), + EnableReplicaConnectionPooler: boolToPointer(true), + }, + } + newSpec := &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + EnableConnectionPooler: boolToPointer(false), + EnableReplicaConnectionPooler: boolToPointer(false), + }, + } + + // Delete connection pooler via sync + _, err = cluster.syncConnectionPooler(oldSpec, newSpec, mockInstallLookupFunction) + if err != nil { + t.Errorf("%s: Cannot sync connection pooler, %s", testName, err) + } + + for _, role := range [2]PostgresRole{Master, Replica} { + err = cluster.deleteConnectionPooler(role) + if err != nil { + t.Errorf("%s: Cannot delete connection pooler, %s", testName, err) + } + } +} + +func TestNeedConnectionPooler(t *testing.T) { + testName := "Test how connection pooler can be enabled" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPooler: config.ConnectionPooler{ + ConnectionPoolerDefaultCPURequest: "100m", + ConnectionPoolerDefaultCPULimit: "100m", + ConnectionPoolerDefaultMemoryRequest: "100Mi", + ConnectionPoolerDefaultMemoryLimit: "100Mi", + }, + }, + }, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder) + + cluster.Spec = acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + } + + if !needMasterConnectionPooler(&cluster.Spec) { + t.Errorf("%s: Connection pooler is not enabled with full definition", + testName) + } + + cluster.Spec = acidv1.PostgresSpec{ + EnableConnectionPooler: boolToPointer(true), + } + + if !needMasterConnectionPooler(&cluster.Spec) { + t.Errorf("%s: Connection pooler is not enabled with flag", + testName) + } + + cluster.Spec = acidv1.PostgresSpec{ + EnableConnectionPooler: boolToPointer(false), + ConnectionPooler: &acidv1.ConnectionPooler{}, + } + + if needMasterConnectionPooler(&cluster.Spec) { + t.Errorf("%s: Connection pooler is still enabled with flag being false", + testName) + } + + cluster.Spec = acidv1.PostgresSpec{ + EnableConnectionPooler: boolToPointer(true), + ConnectionPooler: &acidv1.ConnectionPooler{}, + } + + if !needMasterConnectionPooler(&cluster.Spec) { + t.Errorf("%s: Connection pooler is not enabled with flag and full", + testName) + } + + cluster.Spec = acidv1.PostgresSpec{ + EnableConnectionPooler: boolToPointer(false), + EnableReplicaConnectionPooler: boolToPointer(false), + ConnectionPooler: nil, + } + + if needMasterConnectionPooler(&cluster.Spec) { + t.Errorf("%s: Connection pooler is enabled with flag false and nil", + testName) + } + + // Test for replica connection pooler + cluster.Spec = acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + } + + if needReplicaConnectionPooler(&cluster.Spec) { + t.Errorf("%s: Replica Connection pooler is not enabled with full definition", + testName) + } + 
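+	// Unlike the master case above, a bare connectionPooler section alone is not
+	// expected to enable the replica pooler; only the dedicated
+	// enableReplicaConnectionPooler flag (set below) does that.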
+ cluster.Spec = acidv1.PostgresSpec{ + EnableReplicaConnectionPooler: boolToPointer(true), + } + + if !needReplicaConnectionPooler(&cluster.Spec) { + t.Errorf("%s: Replica Connection pooler is not enabled with flag", + testName) + } + + cluster.Spec = acidv1.PostgresSpec{ + EnableReplicaConnectionPooler: boolToPointer(false), + ConnectionPooler: &acidv1.ConnectionPooler{}, + } + + if needReplicaConnectionPooler(&cluster.Spec) { + t.Errorf("%s: Replica Connection pooler is still enabled with flag being false", + testName) + } + + cluster.Spec = acidv1.PostgresSpec{ + EnableReplicaConnectionPooler: boolToPointer(true), + ConnectionPooler: &acidv1.ConnectionPooler{}, + } + + if !needReplicaConnectionPooler(&cluster.Spec) { + t.Errorf("%s: Replica Connection pooler is not enabled with flag and full", + testName) + } +} + +func deploymentUpdated(cluster *Cluster, err error, reason SyncReason) error { + for _, role := range [2]PostgresRole{Master, Replica} { + if cluster.ConnectionPooler[role] != nil && cluster.ConnectionPooler[role].Deployment != nil && + (cluster.ConnectionPooler[role].Deployment.Spec.Replicas == nil || + *cluster.ConnectionPooler[role].Deployment.Spec.Replicas != 2) { + return fmt.Errorf("Wrong number of instances") + } + } + return nil +} + +func objectsAreSaved(cluster *Cluster, err error, reason SyncReason) error { + if cluster.ConnectionPooler == nil { + return fmt.Errorf("Connection pooler resources are empty") + } + + for _, role := range []PostgresRole{Master, Replica} { + if cluster.ConnectionPooler[role].Deployment == nil { + return fmt.Errorf("Deployment was not saved %s", role) + } + + if cluster.ConnectionPooler[role].Service == nil { + return fmt.Errorf("Service was not saved %s", role) + } + } + + return nil +} + +func MasterobjectsAreSaved(cluster *Cluster, err error, reason SyncReason) error { + if cluster.ConnectionPooler == nil { + return fmt.Errorf("Connection pooler resources are empty") + } + + if cluster.ConnectionPooler[Master].Deployment == nil { + return fmt.Errorf("Deployment was not saved") + } + + if cluster.ConnectionPooler[Master].Service == nil { + return fmt.Errorf("Service was not saved") + } + + return nil +} + +func ReplicaobjectsAreSaved(cluster *Cluster, err error, reason SyncReason) error { + if cluster.ConnectionPooler == nil { + return fmt.Errorf("Connection pooler resources are empty") + } + + if cluster.ConnectionPooler[Replica].Deployment == nil { + return fmt.Errorf("Deployment was not saved") + } + + if cluster.ConnectionPooler[Replica].Service == nil { + return fmt.Errorf("Service was not saved") + } + + return nil +} + +func objectsAreDeleted(cluster *Cluster, err error, reason SyncReason) error { + for _, role := range [2]PostgresRole{Master, Replica} { + if cluster.ConnectionPooler[role] != nil && + (cluster.ConnectionPooler[role].Deployment != nil || cluster.ConnectionPooler[role].Service != nil) { + return fmt.Errorf("Connection pooler was not deleted for role %v", role) + } + } + + return nil +} + +func OnlyMasterDeleted(cluster *Cluster, err error, reason SyncReason) error { + + if cluster.ConnectionPooler[Master] != nil && + (cluster.ConnectionPooler[Master].Deployment != nil || cluster.ConnectionPooler[Master].Service != nil) { + return fmt.Errorf("Connection pooler master was not deleted") + } + return nil +} + +func OnlyReplicaDeleted(cluster *Cluster, err error, reason SyncReason) error { + + if cluster.ConnectionPooler[Replica] != nil && + (cluster.ConnectionPooler[Replica].Deployment != nil || 
cluster.ConnectionPooler[Replica].Service != nil) { + return fmt.Errorf("Connection pooler replica was not deleted") + } + return nil +} + +func noEmptySync(cluster *Cluster, err error, reason SyncReason) error { + for _, msg := range reason { + if strings.HasPrefix(msg, "update [] from '' to '") { + return fmt.Errorf("There is an empty reason, %s", msg) + } + } + + return nil +} + +func TestConnectionPoolerSynchronization(t *testing.T) { + testName := "Test connection pooler synchronization" + newCluster := func(client k8sutil.KubernetesClient) *Cluster { + return New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPooler: config.ConnectionPooler{ + ConnectionPoolerDefaultCPURequest: "100m", + ConnectionPoolerDefaultCPULimit: "100m", + ConnectionPoolerDefaultMemoryRequest: "100Mi", + ConnectionPoolerDefaultMemoryLimit: "100Mi", + NumberOfInstances: int32ToPointer(1), + }, + }, + }, client, acidv1.Postgresql{}, logger, eventRecorder) + } + cluster := newCluster(k8sutil.KubernetesClient{}) + + cluster.Statefulset = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sts", + }, + } + + tests := []struct { + subTest string + oldSpec *acidv1.Postgresql + newSpec *acidv1.Postgresql + cluster *Cluster + defaultImage string + defaultInstances int32 + check func(cluster *Cluster, err error, reason SyncReason) error + }{ + { + subTest: "create from scratch", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + }, + cluster: newCluster(k8sutil.ClientMissingObjects()), + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: MasterobjectsAreSaved, + }, + { + subTest: "create if doesn't exist", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + }, + cluster: newCluster(k8sutil.ClientMissingObjects()), + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: MasterobjectsAreSaved, + }, + { + subTest: "create if doesn't exist with a flag", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + EnableConnectionPooler: boolToPointer(true), + }, + }, + cluster: newCluster(k8sutil.ClientMissingObjects()), + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: MasterobjectsAreSaved, + }, + { + subTest: "create no replica with flag", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + EnableReplicaConnectionPooler: boolToPointer(false), + }, + }, + cluster: newCluster(k8sutil.NewMockKubernetesClient()), + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: objectsAreDeleted, + }, + { + subTest: "create replica if doesn't exist with a flag", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableReplicaConnectionPooler: boolToPointer(true), + }, + }, + cluster: newCluster(k8sutil.NewMockKubernetesClient()), + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: ReplicaobjectsAreSaved, + }, + { + subTest: "create both master and replica", + 
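+			// with both flags enabled, each role should end up with a deployment
+			// and a service (verified by objectsAreSaved below)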
oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableReplicaConnectionPooler: boolToPointer(true), + EnableConnectionPooler: boolToPointer(true), + }, + }, + cluster: newCluster(k8sutil.ClientMissingObjects()), + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: objectsAreSaved, + }, + { + subTest: "delete only replica if not needed", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableReplicaConnectionPooler: boolToPointer(true), + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + }, + cluster: newCluster(k8sutil.NewMockKubernetesClient()), + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: OnlyReplicaDeleted, + }, + { + subTest: "delete only master if not needed", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableConnectionPooler: boolToPointer(true), + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + EnableReplicaConnectionPooler: boolToPointer(true), + }, + }, + cluster: newCluster(k8sutil.NewMockKubernetesClient()), + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: OnlyMasterDeleted, + }, + { + subTest: "delete if not needed", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + cluster: newCluster(k8sutil.NewMockKubernetesClient()), + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: objectsAreDeleted, + }, + { + subTest: "cleanup if still there", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + cluster: newCluster(k8sutil.NewMockKubernetesClient()), + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: objectsAreDeleted, + }, + { + subTest: "update deployment", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{ + NumberOfInstances: int32ToPointer(1), + }, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{ + NumberOfInstances: int32ToPointer(2), + }, + }, + }, + cluster: newCluster(k8sutil.NewMockKubernetesClient()), + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: deploymentUpdated, + }, + { + subTest: "update deployment", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{ + NumberOfInstances: int32ToPointer(1), + }, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{ + NumberOfInstances: int32ToPointer(2), + }, + }, + }, + cluster: newCluster(k8sutil.NewMockKubernetesClient()), + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: deploymentUpdated, + }, + { + subTest: "update image from changed defaults", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + }, + cluster: newCluster(k8sutil.NewMockKubernetesClient()), + defaultImage: "pooler:2.0", + defaultInstances: 2, + check: deploymentUpdated, + }, + { + subTest: "there is no sync 
from nil to an empty spec", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + EnableConnectionPooler: boolToPointer(true), + ConnectionPooler: nil, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + EnableConnectionPooler: boolToPointer(true), + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + }, + cluster: newCluster(k8sutil.NewMockKubernetesClient()), + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: noEmptySync, + }, + } + for _, tt := range tests { + tt.cluster.OpConfig.ConnectionPooler.Image = tt.defaultImage + tt.cluster.OpConfig.ConnectionPooler.NumberOfInstances = + int32ToPointer(tt.defaultInstances) + + reason, err := tt.cluster.syncConnectionPooler(tt.oldSpec, + tt.newSpec, mockInstallLookupFunction) + + if err := tt.check(tt.cluster, err, reason); err != nil { + t.Errorf("%s [%s]: Could not synchronize, %+v", + testName, tt.subTest, err) + } + } +} + +func TestConnectionPoolerPodSpec(t *testing.T) { + testName := "Test connection pooler pod template generation" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPooler: config.ConnectionPooler{ + MaxDBConnections: int32ToPointer(60), + ConnectionPoolerDefaultCPURequest: "100m", + ConnectionPoolerDefaultCPULimit: "100m", + ConnectionPoolerDefaultMemoryRequest: "100Mi", + ConnectionPoolerDefaultMemoryLimit: "100Mi", + }, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) + + cluster.Spec = acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableReplicaConnectionPooler: boolToPointer(true), + } + var clusterNoDefaultRes = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPooler: config.ConnectionPooler{}, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) + + clusterNoDefaultRes.Spec = acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableReplicaConnectionPooler: boolToPointer(true), + } + + noCheck := func(cluster *Cluster, podSpec *v1.PodTemplateSpec, role PostgresRole) error { return nil } + + tests := []struct { + subTest string + spec *acidv1.PostgresSpec + expected error + cluster *Cluster + check func(cluster *Cluster, podSpec *v1.PodTemplateSpec, role PostgresRole) error + }{ + { + subTest: "default configuration", + spec: &acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + expected: nil, + cluster: cluster, + check: noCheck, + }, + { + subTest: "no default resources", + spec: &acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + expected: errors.New(`could not generate resource requirements: could not fill resource requests: could not parse default CPU quantity: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'`), + cluster: clusterNoDefaultRes, + check: noCheck, + }, + { + subTest: "default resources are set", + spec: &acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + expected: nil, + cluster: cluster, + check: testResources, + }, + { + subTest: "labels for service", + spec: &acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableReplicaConnectionPooler: boolToPointer(true), + }, + expected: nil, + cluster: cluster, + check: testLabels, + 
}, + { + subTest: "required envs", + spec: &acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + expected: nil, + cluster: cluster, + check: testEnvs, + }, + } + for _, role := range [2]PostgresRole{Master, Replica} { + for _, tt := range tests { + podSpec, err := tt.cluster.generateConnectionPoolerPodTemplate(role) + + if err != tt.expected && err.Error() != tt.expected.Error() { + t.Errorf("%s [%s]: Could not generate pod template,\n %+v, expected\n %+v", + testName, tt.subTest, err, tt.expected) + } + + err = tt.check(cluster, podSpec, role) + if err != nil { + t.Errorf("%s [%s]: Pod spec is incorrect, %+v", + testName, tt.subTest, err) + } + } + } +} + +func TestConnectionPoolerDeploymentSpec(t *testing.T) { + testName := "Test connection pooler deployment spec generation" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPooler: config.ConnectionPooler{ + ConnectionPoolerDefaultCPURequest: "100m", + ConnectionPoolerDefaultCPULimit: "100m", + ConnectionPoolerDefaultMemoryRequest: "100Mi", + ConnectionPoolerDefaultMemoryLimit: "100Mi", + }, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) + cluster.Statefulset = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sts", + }, + } + cluster.ConnectionPooler = map[PostgresRole]*ConnectionPoolerObjects{ + Master: { + Deployment: nil, + Service: nil, + LookupFunction: true, + Name: "", + Role: Master, + }, + } + + noCheck := func(cluster *Cluster, deployment *appsv1.Deployment) error { + return nil + } + + tests := []struct { + subTest string + spec *acidv1.PostgresSpec + expected error + cluster *Cluster + check func(cluster *Cluster, deployment *appsv1.Deployment) error + }{ + { + subTest: "default configuration", + spec: &acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableReplicaConnectionPooler: boolToPointer(true), + }, + expected: nil, + cluster: cluster, + check: noCheck, + }, + { + subTest: "owner reference", + spec: &acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableReplicaConnectionPooler: boolToPointer(true), + }, + expected: nil, + cluster: cluster, + check: testDeploymentOwnwerReference, + }, + { + subTest: "selector", + spec: &acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableReplicaConnectionPooler: boolToPointer(true), + }, + expected: nil, + cluster: cluster, + check: testSelector, + }, + } + for _, tt := range tests { + deployment, err := tt.cluster.generateConnectionPoolerDeployment(cluster.ConnectionPooler[Master]) + + if err != tt.expected && err.Error() != tt.expected.Error() { + t.Errorf("%s [%s]: Could not generate deployment spec,\n %+v, expected\n %+v", + testName, tt.subTest, err, tt.expected) + } + + err = tt.check(cluster, deployment) + if err != nil { + t.Errorf("%s [%s]: Deployment spec is incorrect, %+v", + testName, tt.subTest, err) + } + } +} + +func testResources(cluster *Cluster, podSpec *v1.PodTemplateSpec, role PostgresRole) error { + cpuReq := podSpec.Spec.Containers[0].Resources.Requests["cpu"] + if cpuReq.String() != cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultCPURequest { + return fmt.Errorf("CPU request doesn't match, got %s, expected %s", + cpuReq.String(), cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultCPURequest) + } + + memReq := 
podSpec.Spec.Containers[0].Resources.Requests["memory"] + if memReq.String() != cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultMemoryRequest { + return fmt.Errorf("Memory request doesn't match, got %s, expected %s", + memReq.String(), cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultMemoryRequest) + } + + cpuLim := podSpec.Spec.Containers[0].Resources.Limits["cpu"] + if cpuLim.String() != cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultCPULimit { + return fmt.Errorf("CPU limit doesn't match, got %s, expected %s", + cpuLim.String(), cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultCPULimit) + } + + memLim := podSpec.Spec.Containers[0].Resources.Limits["memory"] + if memLim.String() != cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultMemoryLimit { + return fmt.Errorf("Memory limit doesn't match, got %s, expected %s", + memLim.String(), cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultMemoryLimit) + } + + return nil +} + +func testLabels(cluster *Cluster, podSpec *v1.PodTemplateSpec, role PostgresRole) error { + poolerLabels := podSpec.ObjectMeta.Labels["connection-pooler"] + + if poolerLabels != cluster.connectionPoolerLabelsSelector(role).MatchLabels["connection-pooler"] { + return fmt.Errorf("Pod labels do not match, got %+v, expected %+v", + podSpec.ObjectMeta.Labels, cluster.connectionPoolerLabelsSelector(role).MatchLabels) + } + + return nil +} + +func testSelector(cluster *Cluster, deployment *appsv1.Deployment) error { + labels := deployment.Spec.Selector.MatchLabels + expected := cluster.connectionPoolerLabelsSelector(Master).MatchLabels + + if labels["connection-pooler"] != expected["connection-pooler"] { + return fmt.Errorf("Labels are incorrect, got %+v, expected %+v", + labels, expected) + } + + return nil +} + +func testServiceSelector(cluster *Cluster, service *v1.Service, role PostgresRole) error { + selector := service.Spec.Selector + + if selector["connection-pooler"] != cluster.connectionPoolerName(role) { + return fmt.Errorf("Selector is incorrect, got %s, expected %s", + selector["connection-pooler"], cluster.connectionPoolerName(role)) + } + + return nil +} + +func TestConnectionPoolerServiceSpec(t *testing.T) { + testName := "Test connection pooler service spec generation" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPooler: config.ConnectionPooler{ + ConnectionPoolerDefaultCPURequest: "100m", + ConnectionPoolerDefaultCPULimit: "100m", + ConnectionPoolerDefaultMemoryRequest: "100Mi", + ConnectionPoolerDefaultMemoryLimit: "100Mi", + }, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) + cluster.Statefulset = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sts", + }, + } + cluster.ConnectionPooler = map[PostgresRole]*ConnectionPoolerObjects{ + Master: { + Deployment: nil, + Service: nil, + LookupFunction: false, + Role: Master, + }, + Replica: { + Deployment: nil, + Service: nil, + LookupFunction: false, + Role: Replica, + }, + } + + noCheck := func(cluster *Cluster, deployment *v1.Service, role PostgresRole) error { + return nil + } + + tests := []struct { + subTest string + spec *acidv1.PostgresSpec + cluster *Cluster + check func(cluster *Cluster, deployment *v1.Service, role PostgresRole) error + }{ + { + subTest: "default configuration", + spec: &acidv1.PostgresSpec{ + ConnectionPooler: 
&acidv1.ConnectionPooler{}, + }, + cluster: cluster, + check: noCheck, + }, + { + subTest: "owner reference", + spec: &acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + cluster: cluster, + check: testServiceOwnwerReference, + }, + { + subTest: "selector", + spec: &acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableReplicaConnectionPooler: boolToPointer(true), + }, + cluster: cluster, + check: testServiceSelector, + }, + } + for _, role := range [2]PostgresRole{Master, Replica} { + for _, tt := range tests { + service := tt.cluster.generateConnectionPoolerService(tt.cluster.ConnectionPooler[role]) + + if err := tt.check(cluster, service, role); err != nil { + t.Errorf("%s [%s]: Service spec is incorrect, %+v", + testName, tt.subTest, err) + } + } + } +} diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go index 5e05f443a..760b68d72 100644 --- a/pkg/cluster/database.go +++ b/pkg/cluster/database.go @@ -473,8 +473,8 @@ func (c *Cluster) execCreateOrAlterExtension(extName, schemaName, statement, doi } // Creates a connection pool credentials lookup function in every database to -// perform remote authentification. -func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error { +// perform remote authentication. +func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string, role PostgresRole) error { var stmtBytes bytes.Buffer c.logger.Info("Installing lookup function") @@ -567,12 +567,11 @@ func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error { failedDatabases = append(failedDatabases, dbname) continue } - c.logger.Infof("pooler lookup function installed into %s", dbname) } if len(failedDatabases) == 0 { - c.ConnectionPooler.LookupFunction = true + c.ConnectionPooler[role].LookupFunction = true } return nil diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 2ca4ad4a8..50957e22a 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -75,10 +75,6 @@ func (c *Cluster) statefulSetName() string { return c.Name } -func (c *Cluster) connectionPoolerName() string { - return c.Name + "-pooler" -} - func (c *Cluster) endpointName(role PostgresRole) string { name := c.Name if role == Replica { @@ -142,26 +138,6 @@ func (c *Cluster) makeDefaultResources() acidv1.Resources { } } -// Generate default resource section for connection pooler deployment, to be -// used if nothing custom is specified in the manifest -func (c *Cluster) makeDefaultConnectionPoolerResources() acidv1.Resources { - config := c.OpConfig - - defaultRequests := acidv1.ResourceDescription{ - CPU: config.ConnectionPooler.ConnectionPoolerDefaultCPURequest, - Memory: config.ConnectionPooler.ConnectionPoolerDefaultMemoryRequest, - } - defaultLimits := acidv1.ResourceDescription{ - CPU: config.ConnectionPooler.ConnectionPoolerDefaultCPULimit, - Memory: config.ConnectionPooler.ConnectionPoolerDefaultMemoryLimit, - } - - return acidv1.Resources{ - ResourceRequests: defaultRequests, - ResourceLimits: defaultLimits, - } -} - func generateResourceRequirements(resources acidv1.Resources, defaultResources acidv1.Resources) (*v1.ResourceRequirements, error) { var err error @@ -2052,186 +2028,6 @@ func (c *Cluster) getLogicalBackupJobName() (jobName string) { return "logical-backup-" + c.clusterName().Name } -// Generate pool size related environment variables. -// -// MAX_DB_CONN would specify the global maximum for connections to a target -// database. 
-// -// MAX_CLIENT_CONN is not configurable at the moment, just set it high enough. -// -// DEFAULT_SIZE is a pool size per db/user (having in mind the use case when -// most of the queries coming through a connection pooler are from the same -// user to the same db). In case if we want to spin up more connection pooler -// instances, take this into account and maintain the same number of -// connections. -// -// MIN_SIZE is a pool's minimal size, to prevent situation when sudden workload -// have to wait for spinning up a new connections. -// -// RESERVE_SIZE is how many additional connections to allow for a pooler. -func (c *Cluster) getConnectionPoolerEnvVars(spec *acidv1.PostgresSpec) []v1.EnvVar { - effectiveMode := util.Coalesce( - spec.ConnectionPooler.Mode, - c.OpConfig.ConnectionPooler.Mode) - - numberOfInstances := spec.ConnectionPooler.NumberOfInstances - if numberOfInstances == nil { - numberOfInstances = util.CoalesceInt32( - c.OpConfig.ConnectionPooler.NumberOfInstances, - k8sutil.Int32ToPointer(1)) - } - - effectiveMaxDBConn := util.CoalesceInt32( - spec.ConnectionPooler.MaxDBConnections, - c.OpConfig.ConnectionPooler.MaxDBConnections) - - if effectiveMaxDBConn == nil { - effectiveMaxDBConn = k8sutil.Int32ToPointer( - constants.ConnectionPoolerMaxDBConnections) - } - - maxDBConn := *effectiveMaxDBConn / *numberOfInstances - - defaultSize := maxDBConn / 2 - minSize := defaultSize / 2 - reserveSize := minSize - - return []v1.EnvVar{ - { - Name: "CONNECTION_POOLER_PORT", - Value: fmt.Sprint(pgPort), - }, - { - Name: "CONNECTION_POOLER_MODE", - Value: effectiveMode, - }, - { - Name: "CONNECTION_POOLER_DEFAULT_SIZE", - Value: fmt.Sprint(defaultSize), - }, - { - Name: "CONNECTION_POOLER_MIN_SIZE", - Value: fmt.Sprint(minSize), - }, - { - Name: "CONNECTION_POOLER_RESERVE_SIZE", - Value: fmt.Sprint(reserveSize), - }, - { - Name: "CONNECTION_POOLER_MAX_CLIENT_CONN", - Value: fmt.Sprint(constants.ConnectionPoolerMaxClientConnections), - }, - { - Name: "CONNECTION_POOLER_MAX_DB_CONN", - Value: fmt.Sprint(maxDBConn), - }, - } -} - -func (c *Cluster) generateConnectionPoolerPodTemplate(spec *acidv1.PostgresSpec) ( - *v1.PodTemplateSpec, error) { - - gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds()) - resources, err := generateResourceRequirements( - spec.ConnectionPooler.Resources, - c.makeDefaultConnectionPoolerResources()) - - effectiveDockerImage := util.Coalesce( - spec.ConnectionPooler.DockerImage, - c.OpConfig.ConnectionPooler.Image) - - effectiveSchema := util.Coalesce( - spec.ConnectionPooler.Schema, - c.OpConfig.ConnectionPooler.Schema) - - if err != nil { - return nil, fmt.Errorf("could not generate resource requirements: %v", err) - } - - secretSelector := func(key string) *v1.SecretKeySelector { - effectiveUser := util.Coalesce( - spec.ConnectionPooler.User, - c.OpConfig.ConnectionPooler.User) - - return &v1.SecretKeySelector{ - LocalObjectReference: v1.LocalObjectReference{ - Name: c.credentialSecretName(effectiveUser), - }, - Key: key, - } - } - - envVars := []v1.EnvVar{ - { - Name: "PGHOST", - Value: c.serviceAddress(Master), - }, - { - Name: "PGPORT", - Value: c.servicePort(Master), - }, - { - Name: "PGUSER", - ValueFrom: &v1.EnvVarSource{ - SecretKeyRef: secretSelector("username"), - }, - }, - // the convention is to use the same schema name as - // connection pooler username - { - Name: "PGSCHEMA", - Value: effectiveSchema, - }, - { - Name: "PGPASSWORD", - ValueFrom: &v1.EnvVarSource{ - SecretKeyRef: secretSelector("password"), - }, - }, - } - - envVars 
= append(envVars, c.getConnectionPoolerEnvVars(spec)...) - - poolerContainer := v1.Container{ - Name: connectionPoolerContainer, - Image: effectiveDockerImage, - ImagePullPolicy: v1.PullIfNotPresent, - Resources: *resources, - Ports: []v1.ContainerPort{ - { - ContainerPort: pgPort, - Protocol: v1.ProtocolTCP, - }, - }, - Env: envVars, - ReadinessProbe: &v1.Probe{ - Handler: v1.Handler{ - TCPSocket: &v1.TCPSocketAction{ - Port: intstr.IntOrString{IntVal: pgPort}, - }, - }, - }, - } - - podTemplate := &v1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: c.connectionPoolerLabelsSelector().MatchLabels, - Namespace: c.Namespace, - Annotations: c.generatePodAnnotations(spec), - }, - Spec: v1.PodSpec{ - ServiceAccountName: c.OpConfig.PodServiceAccountName, - TerminationGracePeriodSeconds: &gracePeriod, - Containers: []v1.Container{poolerContainer}, - // TODO: add tolerations to scheduler pooler on the same node - // as database - //Tolerations: *tolerationsSpec, - }, - } - - return podTemplate, nil -} - // Return an array of ownerReferences to make an arbitraty object dependent on // the StatefulSet. Dependency is made on StatefulSet instead of PostgreSQL CRD // while the former is represent the actual state, and only it's deletion means @@ -2257,108 +2053,6 @@ func (c *Cluster) ownerReferences() []metav1.OwnerReference { } } -func (c *Cluster) generateConnectionPoolerDeployment(spec *acidv1.PostgresSpec) ( - *appsv1.Deployment, error) { - - // there are two ways to enable connection pooler, either to specify a - // connectionPooler section or enableConnectionPooler. In the second case - // spec.connectionPooler will be nil, so to make it easier to calculate - // default values, initialize it to an empty structure. It could be done - // anywhere, but here is the earliest common entry point between sync and - // create code, so init here. - if spec.ConnectionPooler == nil { - spec.ConnectionPooler = &acidv1.ConnectionPooler{} - } - - podTemplate, err := c.generateConnectionPoolerPodTemplate(spec) - numberOfInstances := spec.ConnectionPooler.NumberOfInstances - if numberOfInstances == nil { - numberOfInstances = util.CoalesceInt32( - c.OpConfig.ConnectionPooler.NumberOfInstances, - k8sutil.Int32ToPointer(1)) - } - - if *numberOfInstances < constants.ConnectionPoolerMinInstances { - msg := "Adjusted number of connection pooler instances from %d to %d" - c.logger.Warningf(msg, numberOfInstances, constants.ConnectionPoolerMinInstances) - - *numberOfInstances = constants.ConnectionPoolerMinInstances - } - - if err != nil { - return nil, err - } - - deployment := &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{ - Name: c.connectionPoolerName(), - Namespace: c.Namespace, - Labels: c.connectionPoolerLabelsSelector().MatchLabels, - Annotations: map[string]string{}, - // make StatefulSet object its owner to represent the dependency. - // By itself StatefulSet is being deleted with "Orphaned" - // propagation policy, which means that it's deletion will not - // clean up this deployment, but there is a hope that this object - // will be garbage collected if something went wrong and operator - // didn't deleted it. 
- OwnerReferences: c.ownerReferences(), - }, - Spec: appsv1.DeploymentSpec{ - Replicas: numberOfInstances, - Selector: c.connectionPoolerLabelsSelector(), - Template: *podTemplate, - }, - } - - return deployment, nil -} - -func (c *Cluster) generateConnectionPoolerService(spec *acidv1.PostgresSpec) *v1.Service { - - // there are two ways to enable connection pooler, either to specify a - // connectionPooler section or enableConnectionPooler. In the second case - // spec.connectionPooler will be nil, so to make it easier to calculate - // default values, initialize it to an empty structure. It could be done - // anywhere, but here is the earliest common entry point between sync and - // create code, so init here. - if spec.ConnectionPooler == nil { - spec.ConnectionPooler = &acidv1.ConnectionPooler{} - } - - serviceSpec := v1.ServiceSpec{ - Ports: []v1.ServicePort{ - { - Name: c.connectionPoolerName(), - Port: pgPort, - TargetPort: intstr.IntOrString{StrVal: c.servicePort(Master)}, - }, - }, - Type: v1.ServiceTypeClusterIP, - Selector: map[string]string{ - "connection-pooler": c.connectionPoolerName(), - }, - } - - service := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: c.connectionPoolerName(), - Namespace: c.Namespace, - Labels: c.connectionPoolerLabelsSelector().MatchLabels, - Annotations: map[string]string{}, - // make StatefulSet object its owner to represent the dependency. - // By itself StatefulSet is being deleted with "Orphaned" - // propagation policy, which means that it's deletion will not - // clean up this service, but there is a hope that this object will - // be garbage collected if something went wrong and operator didn't - // deleted it. - OwnerReferences: c.ownerReferences(), - }, - Spec: serviceSpec, - } - - return service -} - func ensurePath(file string, defaultDir string, defaultFile string) string { if file == "" { return path.Join(defaultDir, defaultFile) diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index f1c0e968b..52c10cb8b 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -2,7 +2,6 @@ package cluster import ( "context" - "errors" "fmt" "reflect" "sort" @@ -840,46 +839,7 @@ func TestPodEnvironmentSecretVariables(t *testing.T) { } -func testResources(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { - cpuReq := podSpec.Spec.Containers[0].Resources.Requests["cpu"] - if cpuReq.String() != cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultCPURequest { - return fmt.Errorf("CPU request doesn't match, got %s, expected %s", - cpuReq.String(), cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultCPURequest) - } - - memReq := podSpec.Spec.Containers[0].Resources.Requests["memory"] - if memReq.String() != cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultMemoryRequest { - return fmt.Errorf("Memory request doesn't match, got %s, expected %s", - memReq.String(), cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultMemoryRequest) - } - - cpuLim := podSpec.Spec.Containers[0].Resources.Limits["cpu"] - if cpuLim.String() != cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultCPULimit { - return fmt.Errorf("CPU limit doesn't match, got %s, expected %s", - cpuLim.String(), cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultCPULimit) - } - - memLim := podSpec.Spec.Containers[0].Resources.Limits["memory"] - if memLim.String() != cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultMemoryLimit { - return fmt.Errorf("Memory limit doesn't match, got %s, expected %s", - memLim.String(), 
cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultMemoryLimit) - } - - return nil -} - -func testLabels(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { - poolerLabels := podSpec.ObjectMeta.Labels["connection-pooler"] - - if poolerLabels != cluster.connectionPoolerLabelsSelector().MatchLabels["connection-pooler"] { - return fmt.Errorf("Pod labels do not match, got %+v, expected %+v", - podSpec.ObjectMeta.Labels, cluster.connectionPoolerLabelsSelector().MatchLabels) - } - - return nil -} - -func testEnvs(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { +func testEnvs(cluster *Cluster, podSpec *v1.PodTemplateSpec, role PostgresRole) error { required := map[string]bool{ "PGHOST": false, "PGPORT": false, @@ -913,109 +873,6 @@ func testCustomPodTemplate(cluster *Cluster, podSpec *v1.PodTemplateSpec) error return nil } -func TestConnectionPoolerPodSpec(t *testing.T) { - testName := "Test connection pooler pod template generation" - var cluster = New( - Config{ - OpConfig: config.Config{ - ProtectedRoles: []string{"admin"}, - Auth: config.Auth{ - SuperUsername: superUserName, - ReplicationUsername: replicationUserName, - }, - ConnectionPooler: config.ConnectionPooler{ - MaxDBConnections: int32ToPointer(60), - ConnectionPoolerDefaultCPURequest: "100m", - ConnectionPoolerDefaultCPULimit: "100m", - ConnectionPoolerDefaultMemoryRequest: "100Mi", - ConnectionPoolerDefaultMemoryLimit: "100Mi", - }, - }, - }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) - - var clusterNoDefaultRes = New( - Config{ - OpConfig: config.Config{ - ProtectedRoles: []string{"admin"}, - Auth: config.Auth{ - SuperUsername: superUserName, - ReplicationUsername: replicationUserName, - }, - ConnectionPooler: config.ConnectionPooler{}, - }, - }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) - - noCheck := func(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { return nil } - - tests := []struct { - subTest string - spec *acidv1.PostgresSpec - expected error - cluster *Cluster - check func(cluster *Cluster, podSpec *v1.PodTemplateSpec) error - }{ - { - subTest: "default configuration", - spec: &acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - expected: nil, - cluster: cluster, - check: noCheck, - }, - { - subTest: "no default resources", - spec: &acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - expected: errors.New(`could not generate resource requirements: could not fill resource requests: could not parse default CPU quantity: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'`), - cluster: clusterNoDefaultRes, - check: noCheck, - }, - { - subTest: "default resources are set", - spec: &acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - expected: nil, - cluster: cluster, - check: testResources, - }, - { - subTest: "labels for service", - spec: &acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - expected: nil, - cluster: cluster, - check: testLabels, - }, - { - subTest: "required envs", - spec: &acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - expected: nil, - cluster: cluster, - check: testEnvs, - }, - } - for _, tt := range tests { - podSpec, err := tt.cluster.generateConnectionPoolerPodTemplate(tt.spec) - - if err != tt.expected && err.Error() != tt.expected.Error() { - t.Errorf("%s [%s]: Could not generate pod template,\n %+v, expected\n %+v", - testName, tt.subTest, err, tt.expected) - 
} - - err = tt.check(cluster, podSpec) - if err != nil { - t.Errorf("%s [%s]: Pod spec is incorrect, %+v", - testName, tt.subTest, err) - } - } -} - func testDeploymentOwnwerReference(cluster *Cluster, deployment *appsv1.Deployment) error { owner := deployment.ObjectMeta.OwnerReferences[0] @@ -1027,98 +884,7 @@ func testDeploymentOwnwerReference(cluster *Cluster, deployment *appsv1.Deployme return nil } -func testSelector(cluster *Cluster, deployment *appsv1.Deployment) error { - labels := deployment.Spec.Selector.MatchLabels - expected := cluster.connectionPoolerLabelsSelector().MatchLabels - - if labels["connection-pooler"] != expected["connection-pooler"] { - return fmt.Errorf("Labels are incorrect, got %+v, expected %+v", - labels, expected) - } - - return nil -} - -func TestConnectionPoolerDeploymentSpec(t *testing.T) { - testName := "Test connection pooler deployment spec generation" - var cluster = New( - Config{ - OpConfig: config.Config{ - ProtectedRoles: []string{"admin"}, - Auth: config.Auth{ - SuperUsername: superUserName, - ReplicationUsername: replicationUserName, - }, - ConnectionPooler: config.ConnectionPooler{ - ConnectionPoolerDefaultCPURequest: "100m", - ConnectionPoolerDefaultCPULimit: "100m", - ConnectionPoolerDefaultMemoryRequest: "100Mi", - ConnectionPoolerDefaultMemoryLimit: "100Mi", - }, - }, - }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) - cluster.Statefulset = &appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-sts", - }, - } - - noCheck := func(cluster *Cluster, deployment *appsv1.Deployment) error { - return nil - } - - tests := []struct { - subTest string - spec *acidv1.PostgresSpec - expected error - cluster *Cluster - check func(cluster *Cluster, deployment *appsv1.Deployment) error - }{ - { - subTest: "default configuration", - spec: &acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - expected: nil, - cluster: cluster, - check: noCheck, - }, - { - subTest: "owner reference", - spec: &acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - expected: nil, - cluster: cluster, - check: testDeploymentOwnwerReference, - }, - { - subTest: "selector", - spec: &acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - expected: nil, - cluster: cluster, - check: testSelector, - }, - } - for _, tt := range tests { - deployment, err := tt.cluster.generateConnectionPoolerDeployment(tt.spec) - - if err != tt.expected && err.Error() != tt.expected.Error() { - t.Errorf("%s [%s]: Could not generate deployment spec,\n %+v, expected\n %+v", - testName, tt.subTest, err, tt.expected) - } - - err = tt.check(cluster, deployment) - if err != nil { - t.Errorf("%s [%s]: Deployment spec is incorrect, %+v", - testName, tt.subTest, err) - } - } -} - -func testServiceOwnwerReference(cluster *Cluster, service *v1.Service) error { +func testServiceOwnwerReference(cluster *Cluster, service *v1.Service, role PostgresRole) error { owner := service.ObjectMeta.OwnerReferences[0] if owner.Name != cluster.Statefulset.ObjectMeta.Name { @@ -1129,86 +895,6 @@ func testServiceOwnwerReference(cluster *Cluster, service *v1.Service) error { return nil } -func testServiceSelector(cluster *Cluster, service *v1.Service) error { - selector := service.Spec.Selector - - if selector["connection-pooler"] != cluster.connectionPoolerName() { - return fmt.Errorf("Selector is incorrect, got %s, expected %s", - selector["connection-pooler"], cluster.connectionPoolerName()) - } - - return nil -} - -func 
TestConnectionPoolerServiceSpec(t *testing.T) { - testName := "Test connection pooler service spec generation" - var cluster = New( - Config{ - OpConfig: config.Config{ - ProtectedRoles: []string{"admin"}, - Auth: config.Auth{ - SuperUsername: superUserName, - ReplicationUsername: replicationUserName, - }, - ConnectionPooler: config.ConnectionPooler{ - ConnectionPoolerDefaultCPURequest: "100m", - ConnectionPoolerDefaultCPULimit: "100m", - ConnectionPoolerDefaultMemoryRequest: "100Mi", - ConnectionPoolerDefaultMemoryLimit: "100Mi", - }, - }, - }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) - cluster.Statefulset = &appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-sts", - }, - } - - noCheck := func(cluster *Cluster, deployment *v1.Service) error { - return nil - } - - tests := []struct { - subTest string - spec *acidv1.PostgresSpec - cluster *Cluster - check func(cluster *Cluster, deployment *v1.Service) error - }{ - { - subTest: "default configuration", - spec: &acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - cluster: cluster, - check: noCheck, - }, - { - subTest: "owner reference", - spec: &acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - cluster: cluster, - check: testServiceOwnwerReference, - }, - { - subTest: "selector", - spec: &acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - cluster: cluster, - check: testServiceSelector, - }, - } - for _, tt := range tests { - service := tt.cluster.generateConnectionPoolerService(tt.spec) - - if err := tt.check(cluster, service); err != nil { - t.Errorf("%s [%s]: Service spec is incorrect, %+v", - testName, tt.subTest, err) - } - } -} - func TestTLS(t *testing.T) { var err error var spec acidv1.PostgresSpec diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 4fb2c13c6..bcc568adc 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -94,150 +94,6 @@ func (c *Cluster) createStatefulSet() (*appsv1.StatefulSet, error) { return statefulSet, nil } -// Prepare the database for connection pooler to be used, i.e. install lookup -// function (do it first, because it should be fast and if it didn't succeed, -// it doesn't makes sense to create more K8S objects. At this moment we assume -// that necessary connection pooler user exists. -// -// After that create all the objects for connection pooler, namely a deployment -// with a chosen pooler and a service to expose it. -func (c *Cluster) createConnectionPooler(lookup InstallFunction) (*ConnectionPoolerObjects, error) { - var msg string - c.setProcessName("creating connection pooler") - - if c.ConnectionPooler == nil { - c.ConnectionPooler = &ConnectionPoolerObjects{} - } - - schema := c.Spec.ConnectionPooler.Schema - - if schema == "" { - schema = c.OpConfig.ConnectionPooler.Schema - } - - user := c.Spec.ConnectionPooler.User - if user == "" { - user = c.OpConfig.ConnectionPooler.User - } - - err := lookup(schema, user) - - if err != nil { - msg = "could not prepare database for connection pooler: %v" - return nil, fmt.Errorf(msg, err) - } - - deploymentSpec, err := c.generateConnectionPoolerDeployment(&c.Spec) - if err != nil { - msg = "could not generate deployment for connection pooler: %v" - return nil, fmt.Errorf(msg, err) - } - - // client-go does retry 10 times (with NoBackoff by default) when the API - // believe a request can be retried and returns Retry-After header. This - // should be good enough to not think about it here. 
- deployment, err := c.KubeClient. - Deployments(deploymentSpec.Namespace). - Create(context.TODO(), deploymentSpec, metav1.CreateOptions{}) - - if err != nil { - return nil, err - } - - serviceSpec := c.generateConnectionPoolerService(&c.Spec) - service, err := c.KubeClient. - Services(serviceSpec.Namespace). - Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) - - if err != nil { - return nil, err - } - - c.ConnectionPooler = &ConnectionPoolerObjects{ - Deployment: deployment, - Service: service, - } - c.logger.Debugf("created new connection pooler %q, uid: %q", - util.NameFromMeta(deployment.ObjectMeta), deployment.UID) - - return c.ConnectionPooler, nil -} - -func (c *Cluster) deleteConnectionPooler() (err error) { - c.setProcessName("deleting connection pooler") - c.logger.Debugln("deleting connection pooler") - - // Lack of connection pooler objects is not a fatal error, just log it if - // it was present before in the manifest - if c.ConnectionPooler == nil { - c.logger.Infof("No connection pooler to delete") - return nil - } - - // Clean up the deployment object. If deployment resource we've remembered - // is somehow empty, try to delete based on what would we generate - deploymentName := c.connectionPoolerName() - deployment := c.ConnectionPooler.Deployment - - if deployment != nil { - deploymentName = deployment.Name - } - - // set delete propagation policy to foreground, so that replica set will be - // also deleted. - policy := metav1.DeletePropagationForeground - options := metav1.DeleteOptions{PropagationPolicy: &policy} - err = c.KubeClient. - Deployments(c.Namespace). - Delete(context.TODO(), deploymentName, options) - - if k8sutil.ResourceNotFound(err) { - c.logger.Debugf("Connection pooler deployment was already deleted") - } else if err != nil { - return fmt.Errorf("could not delete deployment: %v", err) - } - - c.logger.Infof("Connection pooler deployment %q has been deleted", deploymentName) - - // Repeat the same for the service object - service := c.ConnectionPooler.Service - serviceName := c.connectionPoolerName() - - if service != nil { - serviceName = service.Name - } - - err = c.KubeClient. - Services(c.Namespace). - Delete(context.TODO(), serviceName, options) - - if k8sutil.ResourceNotFound(err) { - c.logger.Debugf("Connection pooler service was already deleted") - } else if err != nil { - return fmt.Errorf("could not delete service: %v", err) - } - - c.logger.Infof("Connection pooler service %q has been deleted", serviceName) - - // Repeat the same for the secret object - secretName := c.credentialSecretName(c.OpConfig.ConnectionPooler.User) - - secret, err := c.KubeClient. - Secrets(c.Namespace). - Get(context.TODO(), secretName, metav1.GetOptions{}) - - if err != nil { - c.logger.Debugf("could not get connection pooler secret %q: %v", secretName, err) - } else { - if err = c.deleteSecret(secret.UID, *secret); err != nil { - return fmt.Errorf("could not delete pooler secret: %v", err) - } - } - - c.ConnectionPooler = nil - return nil -} - func getPodIndex(podName string) (int32, error) { parts := strings.Split(podName, "-") if len(parts) == 0 { @@ -852,57 +708,3 @@ func (c *Cluster) GetStatefulSet() *appsv1.StatefulSet { func (c *Cluster) GetPodDisruptionBudget() *policybeta1.PodDisruptionBudget { return c.PodDisruptionBudget } - -// Perform actual patching of a connection pooler deployment, assuming that all -// the check were already done before. 
-func (c *Cluster) updateConnectionPoolerDeployment(oldDeploymentSpec, newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) { - c.setProcessName("updating connection pooler") - if c.ConnectionPooler == nil || c.ConnectionPooler.Deployment == nil { - return nil, fmt.Errorf("there is no connection pooler in the cluster") - } - - patchData, err := specPatch(newDeployment.Spec) - if err != nil { - return nil, fmt.Errorf("could not form patch for the deployment: %v", err) - } - - // An update probably requires RetryOnConflict, but since only one operator - // worker at one time will try to update it chances of conflicts are - // minimal. - deployment, err := c.KubeClient. - Deployments(c.ConnectionPooler.Deployment.Namespace).Patch( - context.TODO(), - c.ConnectionPooler.Deployment.Name, - types.MergePatchType, - patchData, - metav1.PatchOptions{}, - "") - if err != nil { - return nil, fmt.Errorf("could not patch deployment: %v", err) - } - - c.ConnectionPooler.Deployment = deployment - - return deployment, nil -} - -//updateConnectionPoolerAnnotations updates the annotations of connection pooler deployment -func (c *Cluster) updateConnectionPoolerAnnotations(annotations map[string]string) (*appsv1.Deployment, error) { - c.logger.Debugf("updating connection pooler annotations") - patchData, err := metaAnnotationsPatch(annotations) - if err != nil { - return nil, fmt.Errorf("could not form patch for the deployment metadata: %v", err) - } - result, err := c.KubeClient.Deployments(c.ConnectionPooler.Deployment.Namespace).Patch( - context.TODO(), - c.ConnectionPooler.Deployment.Name, - types.MergePatchType, - []byte(patchData), - metav1.PatchOptions{}, - "") - if err != nil { - return nil, fmt.Errorf("could not patch connection pooler annotations %q: %v", patchData, err) - } - return result, nil - -} diff --git a/pkg/cluster/resources_test.go b/pkg/cluster/resources_test.go deleted file mode 100644 index 9739cc354..000000000 --- a/pkg/cluster/resources_test.go +++ /dev/null @@ -1,127 +0,0 @@ -package cluster - -import ( - "testing" - - acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" - "github.com/zalando/postgres-operator/pkg/util/config" - "github.com/zalando/postgres-operator/pkg/util/k8sutil" - - appsv1 "k8s.io/api/apps/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -func mockInstallLookupFunction(schema string, user string) error { - return nil -} - -func boolToPointer(value bool) *bool { - return &value -} - -func TestConnectionPoolerCreationAndDeletion(t *testing.T) { - testName := "Test connection pooler creation" - var cluster = New( - Config{ - OpConfig: config.Config{ - ProtectedRoles: []string{"admin"}, - Auth: config.Auth{ - SuperUsername: superUserName, - ReplicationUsername: replicationUserName, - }, - ConnectionPooler: config.ConnectionPooler{ - ConnectionPoolerDefaultCPURequest: "100m", - ConnectionPoolerDefaultCPULimit: "100m", - ConnectionPoolerDefaultMemoryRequest: "100Mi", - ConnectionPoolerDefaultMemoryLimit: "100Mi", - }, - }, - }, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder) - - cluster.Statefulset = &appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-sts", - }, - } - - cluster.Spec = acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - } - poolerResources, err := cluster.createConnectionPooler(mockInstallLookupFunction) - - if err != nil { - t.Errorf("%s: Cannot create connection pooler, %s, %+v", - testName, err, poolerResources) - } - - if 
poolerResources.Deployment == nil { - t.Errorf("%s: Connection pooler deployment is empty", testName) - } - - if poolerResources.Service == nil { - t.Errorf("%s: Connection pooler service is empty", testName) - } - - err = cluster.deleteConnectionPooler() - if err != nil { - t.Errorf("%s: Cannot delete connection pooler, %s", testName, err) - } -} - -func TestNeedConnectionPooler(t *testing.T) { - testName := "Test how connection pooler can be enabled" - var cluster = New( - Config{ - OpConfig: config.Config{ - ProtectedRoles: []string{"admin"}, - Auth: config.Auth{ - SuperUsername: superUserName, - ReplicationUsername: replicationUserName, - }, - ConnectionPooler: config.ConnectionPooler{ - ConnectionPoolerDefaultCPURequest: "100m", - ConnectionPoolerDefaultCPULimit: "100m", - ConnectionPoolerDefaultMemoryRequest: "100Mi", - ConnectionPoolerDefaultMemoryLimit: "100Mi", - }, - }, - }, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder) - - cluster.Spec = acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - } - - if !cluster.needConnectionPooler() { - t.Errorf("%s: Connection pooler is not enabled with full definition", - testName) - } - - cluster.Spec = acidv1.PostgresSpec{ - EnableConnectionPooler: boolToPointer(true), - } - - if !cluster.needConnectionPooler() { - t.Errorf("%s: Connection pooler is not enabled with flag", - testName) - } - - cluster.Spec = acidv1.PostgresSpec{ - EnableConnectionPooler: boolToPointer(false), - ConnectionPooler: &acidv1.ConnectionPooler{}, - } - - if cluster.needConnectionPooler() { - t.Errorf("%s: Connection pooler is still enabled with flag being false", - testName) - } - - cluster.Spec = acidv1.PostgresSpec{ - EnableConnectionPooler: boolToPointer(true), - ConnectionPooler: &acidv1.ConnectionPooler{}, - } - - if !cluster.needConnectionPooler() { - t.Errorf("%s: Connection pooler is not enabled with flag and full", - testName) - } -} diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 61be7919d..e91adf757 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -43,15 +43,12 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { return err } - c.logger.Debugf("syncing secrets") - //TODO: mind the secrets of the deleted/new users if err = c.syncSecrets(); err != nil { err = fmt.Errorf("could not sync secrets: %v", err) return err } - c.logger.Debugf("syncing services") if err = c.syncServices(); err != nil { err = fmt.Errorf("could not sync services: %v", err) return err @@ -469,6 +466,7 @@ func (c *Cluster) syncSecrets() error { err error secret *v1.Secret ) + c.logger.Info("syncing secrets") c.setProcessName("syncing secrets") secrets := c.generateUserSecrets() @@ -547,7 +545,7 @@ func (c *Cluster) syncRoles() (err error) { userNames = append(userNames, u.Name) } - if c.needConnectionPooler() { + if needMasterConnectionPooler(&c.Spec) || needReplicaConnectionPooler(&c.Spec) { connectionPoolerUser := c.systemUsers[constants.ConnectionPoolerUserKeyName] userNames = append(userNames, connectionPoolerUser.Name) @@ -825,203 +823,3 @@ func (c *Cluster) syncLogicalBackupJob() error { return nil } - -func (c *Cluster) syncConnectionPooler(oldSpec, - newSpec *acidv1.Postgresql, - lookup InstallFunction) (SyncReason, error) { - - var reason SyncReason - var err error - - if c.ConnectionPooler == nil { - c.ConnectionPooler = &ConnectionPoolerObjects{ - LookupFunction: false, - } - } - - newNeedConnectionPooler := c.needConnectionPoolerWorker(&newSpec.Spec) - oldNeedConnectionPooler := 
c.needConnectionPoolerWorker(&oldSpec.Spec) - - if newNeedConnectionPooler { - // Try to sync in any case. If we didn't needed connection pooler before, - // it means we want to create it. If it was already present, still sync - // since it could happen that there is no difference in specs, and all - // the resources are remembered, but the deployment was manually deleted - // in between - c.logger.Debug("syncing connection pooler") - - // in this case also do not forget to install lookup function as for - // creating cluster - if !oldNeedConnectionPooler || !c.ConnectionPooler.LookupFunction { - newConnectionPooler := newSpec.Spec.ConnectionPooler - - specSchema := "" - specUser := "" - - if newConnectionPooler != nil { - specSchema = newConnectionPooler.Schema - specUser = newConnectionPooler.User - } - - schema := util.Coalesce( - specSchema, - c.OpConfig.ConnectionPooler.Schema) - - user := util.Coalesce( - specUser, - c.OpConfig.ConnectionPooler.User) - - if err = lookup(schema, user); err != nil { - return NoSync, err - } - } else { - // Lookup function installation seems to be a fragile point, so - // let's log for debugging if we skip it - msg := "Skip lookup function installation, old: %d, already installed %d" - c.logger.Debug(msg, oldNeedConnectionPooler, c.ConnectionPooler.LookupFunction) - } - - if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec); err != nil { - c.logger.Errorf("could not sync connection pooler: %v", err) - return reason, err - } - } - - if oldNeedConnectionPooler && !newNeedConnectionPooler { - // delete and cleanup resources - if err = c.deleteConnectionPooler(); err != nil { - c.logger.Warningf("could not remove connection pooler: %v", err) - } - } - - if !oldNeedConnectionPooler && !newNeedConnectionPooler { - // delete and cleanup resources if not empty - if c.ConnectionPooler != nil && - (c.ConnectionPooler.Deployment != nil || - c.ConnectionPooler.Service != nil) { - - if err = c.deleteConnectionPooler(); err != nil { - c.logger.Warningf("could not remove connection pooler: %v", err) - } - } - } - - return reason, nil -} - -// Synchronize connection pooler resources. Effectively we're interested only in -// synchronizing the corresponding deployment, but in case of deployment or -// service is missing, create it. After checking, also remember an object for -// the future references. -func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql) ( - SyncReason, error) { - - deployment, err := c.KubeClient. - Deployments(c.Namespace). - Get(context.TODO(), c.connectionPoolerName(), metav1.GetOptions{}) - - if err != nil && k8sutil.ResourceNotFound(err) { - msg := "Deployment %s for connection pooler synchronization is not found, create it" - c.logger.Warningf(msg, c.connectionPoolerName()) - - deploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec) - if err != nil { - msg = "could not generate deployment for connection pooler: %v" - return NoSync, fmt.Errorf(msg, err) - } - - deployment, err := c.KubeClient. - Deployments(deploymentSpec.Namespace). 
- Create(context.TODO(), deploymentSpec, metav1.CreateOptions{}) - - if err != nil { - return NoSync, err - } - - c.ConnectionPooler.Deployment = deployment - } else if err != nil { - msg := "could not get connection pooler deployment to sync: %v" - return NoSync, fmt.Errorf(msg, err) - } else { - c.ConnectionPooler.Deployment = deployment - - // actual synchronization - oldConnectionPooler := oldSpec.Spec.ConnectionPooler - newConnectionPooler := newSpec.Spec.ConnectionPooler - - // sync implementation below assumes that both old and new specs are - // not nil, but it can happen. To avoid any confusion like updating a - // deployment because the specification changed from nil to an empty - // struct (that was initialized somewhere before) replace any nil with - // an empty spec. - if oldConnectionPooler == nil { - oldConnectionPooler = &acidv1.ConnectionPooler{} - } - - if newConnectionPooler == nil { - newConnectionPooler = &acidv1.ConnectionPooler{} - } - - logNiceDiff(c.logger, oldConnectionPooler, newConnectionPooler) - - specSync, specReason := c.needSyncConnectionPoolerSpecs(oldConnectionPooler, newConnectionPooler) - defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(newConnectionPooler, deployment) - reason := append(specReason, defaultsReason...) - if specSync || defaultsSync { - c.logger.Infof("Update connection pooler deployment %s, reason: %+v", - c.connectionPoolerName(), reason) - - newDeploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec) - if err != nil { - msg := "could not generate deployment for connection pooler: %v" - return reason, fmt.Errorf(msg, err) - } - - oldDeploymentSpec := c.ConnectionPooler.Deployment - - deployment, err := c.updateConnectionPoolerDeployment( - oldDeploymentSpec, - newDeploymentSpec) - - if err != nil { - return reason, err - } - - c.ConnectionPooler.Deployment = deployment - return reason, nil - } - } - - newAnnotations := c.AnnotationsToPropagate(c.ConnectionPooler.Deployment.Annotations) - if newAnnotations != nil { - c.updateConnectionPoolerAnnotations(newAnnotations) - } - - service, err := c.KubeClient. - Services(c.Namespace). - Get(context.TODO(), c.connectionPoolerName(), metav1.GetOptions{}) - - if err != nil && k8sutil.ResourceNotFound(err) { - msg := "Service %s for connection pooler synchronization is not found, create it" - c.logger.Warningf(msg, c.connectionPoolerName()) - - serviceSpec := c.generateConnectionPoolerService(&newSpec.Spec) - service, err := c.KubeClient. - Services(serviceSpec.Namespace). 
- Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) - - if err != nil { - return NoSync, err - } - - c.ConnectionPooler.Service = service - } else if err != nil { - msg := "could not get connection pooler service to sync: %v" - return NoSync, fmt.Errorf(msg, err) - } else { - // Service updates are not supported and probably not that useful anyway - c.ConnectionPooler.Service = service - } - - return NoSync, nil -} diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go deleted file mode 100644 index d9248ae33..000000000 --- a/pkg/cluster/sync_test.go +++ /dev/null @@ -1,264 +0,0 @@ -package cluster - -import ( - "fmt" - "strings" - "testing" - - acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" - "github.com/zalando/postgres-operator/pkg/util/config" - "github.com/zalando/postgres-operator/pkg/util/k8sutil" - - appsv1 "k8s.io/api/apps/v1" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -func int32ToPointer(value int32) *int32 { - return &value -} - -func deploymentUpdated(cluster *Cluster, err error, reason SyncReason) error { - if cluster.ConnectionPooler.Deployment.Spec.Replicas == nil || - *cluster.ConnectionPooler.Deployment.Spec.Replicas != 2 { - return fmt.Errorf("Wrong nubmer of instances") - } - - return nil -} - -func objectsAreSaved(cluster *Cluster, err error, reason SyncReason) error { - if cluster.ConnectionPooler == nil { - return fmt.Errorf("Connection pooler resources are empty") - } - - if cluster.ConnectionPooler.Deployment == nil { - return fmt.Errorf("Deployment was not saved") - } - - if cluster.ConnectionPooler.Service == nil { - return fmt.Errorf("Service was not saved") - } - - return nil -} - -func objectsAreDeleted(cluster *Cluster, err error, reason SyncReason) error { - if cluster.ConnectionPooler != nil { - return fmt.Errorf("Connection pooler was not deleted") - } - - return nil -} - -func noEmptySync(cluster *Cluster, err error, reason SyncReason) error { - for _, msg := range reason { - if strings.HasPrefix(msg, "update [] from '' to '") { - return fmt.Errorf("There is an empty reason, %s", msg) - } - } - - return nil -} - -func TestConnectionPoolerSynchronization(t *testing.T) { - testName := "Test connection pooler synchronization" - newCluster := func() *Cluster { - return New( - Config{ - OpConfig: config.Config{ - ProtectedRoles: []string{"admin"}, - Auth: config.Auth{ - SuperUsername: superUserName, - ReplicationUsername: replicationUserName, - }, - ConnectionPooler: config.ConnectionPooler{ - ConnectionPoolerDefaultCPURequest: "100m", - ConnectionPoolerDefaultCPULimit: "100m", - ConnectionPoolerDefaultMemoryRequest: "100Mi", - ConnectionPoolerDefaultMemoryLimit: "100Mi", - NumberOfInstances: int32ToPointer(1), - }, - }, - }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) - } - cluster := newCluster() - - cluster.Statefulset = &appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-sts", - }, - } - - clusterMissingObjects := newCluster() - clusterMissingObjects.KubeClient = k8sutil.ClientMissingObjects() - - clusterMock := newCluster() - clusterMock.KubeClient = k8sutil.NewMockKubernetesClient() - - clusterDirtyMock := newCluster() - clusterDirtyMock.KubeClient = k8sutil.NewMockKubernetesClient() - clusterDirtyMock.ConnectionPooler = &ConnectionPoolerObjects{ - Deployment: &appsv1.Deployment{}, - Service: &v1.Service{}, - } - - clusterNewDefaultsMock := newCluster() - clusterNewDefaultsMock.KubeClient = k8sutil.NewMockKubernetesClient() - - 
tests := []struct { - subTest string - oldSpec *acidv1.Postgresql - newSpec *acidv1.Postgresql - cluster *Cluster - defaultImage string - defaultInstances int32 - check func(cluster *Cluster, err error, reason SyncReason) error - }{ - { - subTest: "create if doesn't exist", - oldSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - }, - newSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - }, - cluster: clusterMissingObjects, - defaultImage: "pooler:1.0", - defaultInstances: 1, - check: objectsAreSaved, - }, - { - subTest: "create if doesn't exist with a flag", - oldSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{}, - }, - newSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - EnableConnectionPooler: boolToPointer(true), - }, - }, - cluster: clusterMissingObjects, - defaultImage: "pooler:1.0", - defaultInstances: 1, - check: objectsAreSaved, - }, - { - subTest: "create from scratch", - oldSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{}, - }, - newSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - }, - cluster: clusterMissingObjects, - defaultImage: "pooler:1.0", - defaultInstances: 1, - check: objectsAreSaved, - }, - { - subTest: "delete if not needed", - oldSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - }, - newSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{}, - }, - cluster: clusterMock, - defaultImage: "pooler:1.0", - defaultInstances: 1, - check: objectsAreDeleted, - }, - { - subTest: "cleanup if still there", - oldSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{}, - }, - newSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{}, - }, - cluster: clusterDirtyMock, - defaultImage: "pooler:1.0", - defaultInstances: 1, - check: objectsAreDeleted, - }, - { - subTest: "update deployment", - oldSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{ - NumberOfInstances: int32ToPointer(1), - }, - }, - }, - newSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{ - NumberOfInstances: int32ToPointer(2), - }, - }, - }, - cluster: clusterMock, - defaultImage: "pooler:1.0", - defaultInstances: 1, - check: deploymentUpdated, - }, - { - subTest: "update image from changed defaults", - oldSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - }, - newSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - }, - cluster: clusterNewDefaultsMock, - defaultImage: "pooler:2.0", - defaultInstances: 2, - check: deploymentUpdated, - }, - { - subTest: "there is no sync from nil to an empty spec", - oldSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - EnableConnectionPooler: boolToPointer(true), - ConnectionPooler: nil, - }, - }, - newSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - EnableConnectionPooler: boolToPointer(true), - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - }, - cluster: clusterMock, - defaultImage: "pooler:1.0", - defaultInstances: 1, - check: noEmptySync, - }, - } - for _, tt := range tests { - tt.cluster.OpConfig.ConnectionPooler.Image = tt.defaultImage - tt.cluster.OpConfig.ConnectionPooler.NumberOfInstances = - int32ToPointer(tt.defaultInstances) - - reason, err := 
tt.cluster.syncConnectionPooler(tt.oldSpec, - tt.newSpec, mockInstallLookupFunction) - - if err := tt.check(tt.cluster, err, reason); err != nil { - t.Errorf("%s [%s]: Could not synchronize, %+v", - testName, tt.subTest, err) - } - } -} diff --git a/pkg/cluster/types.go b/pkg/cluster/types.go index 199914ccc..8aa519817 100644 --- a/pkg/cluster/types.go +++ b/pkg/cluster/types.go @@ -72,7 +72,7 @@ type ClusterStatus struct { type TemplateParams map[string]interface{} -type InstallFunction func(schema string, user string) error +type InstallFunction func(schema string, user string, role PostgresRole) error type SyncReason []string diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index b8ddb7087..a2fdcb08e 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -449,28 +449,6 @@ func (c *Cluster) labelsSelector() *metav1.LabelSelector { } } -// Return connection pooler labels selector, which should from one point of view -// inherit most of the labels from the cluster itself, but at the same time -// have e.g. different `application` label, so that recreatePod operation will -// not interfere with it (it lists all the pods via labels, and if there would -// be no difference, it will recreate also pooler pods). -func (c *Cluster) connectionPoolerLabelsSelector() *metav1.LabelSelector { - connectionPoolerLabels := labels.Set(map[string]string{}) - - extraLabels := labels.Set(map[string]string{ - "connection-pooler": c.connectionPoolerName(), - "application": "db-connection-pooler", - }) - - connectionPoolerLabels = labels.Merge(connectionPoolerLabels, c.labelsSet(false)) - connectionPoolerLabels = labels.Merge(connectionPoolerLabels, extraLabels) - - return &metav1.LabelSelector{ - MatchLabels: connectionPoolerLabels, - MatchExpressions: nil, - } -} - func (c *Cluster) roleLabelsSet(shouldAddExtraLabels bool, role PostgresRole) labels.Set { lbls := c.labelsSet(shouldAddExtraLabels) lbls[c.OpConfig.PodRoleLabel] = string(role) @@ -553,18 +531,6 @@ func (c *Cluster) patroniKubernetesUseConfigMaps() bool { return c.OpConfig.KubernetesUseConfigMaps } -func (c *Cluster) needConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool { - if spec.EnableConnectionPooler == nil { - return spec.ConnectionPooler != nil - } else { - return *spec.EnableConnectionPooler - } -} - -func (c *Cluster) needConnectionPooler() bool { - return c.needConnectionPoolerWorker(&c.Spec) -} - // Earlier arguments take priority func mergeContainers(containers ...[]v1.Container) ([]v1.Container, []string) { containerNameTaken := map[string]bool{} diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 09ed83dc0..4b5d68fe5 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -473,7 +473,7 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1. if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil { lg.Errorf("error while queueing cluster event: %v", clusterEvent) } - lg.Infof("%q event has been queued", eventType) + lg.Infof("%s event has been queued", eventType) if eventType != EventDelete { return @@ -494,7 +494,7 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1. 
if err != nil { lg.Warningf("could not delete event from the queue: %v", err) } else { - lg.Debugf("event %q has been discarded for the cluster", evType) + lg.Debugf("event %s has been discarded for the cluster", evType) } } } diff --git a/ui/operator_ui/main.py b/ui/operator_ui/main.py index d159bee2d..f1242fa37 100644 --- a/ui/operator_ui/main.py +++ b/ui/operator_ui/main.py @@ -620,6 +620,17 @@ def update_postgresql(namespace: str, cluster: str): if 'enableConnectionPooler' in o['spec']: del o['spec']['enableConnectionPooler'] + if 'enableReplicaConnectionPooler' in postgresql['spec']: + cp = postgresql['spec']['enableReplicaConnectionPooler'] + if not cp: + if 'enableReplicaConnectionPooler' in o['spec']: + del o['spec']['enableReplicaConnectionPooler'] + else: + spec['enableReplicaConnectionPooler'] = True + else: + if 'enableReplicaConnectionPooler' in o['spec']: + del o['spec']['enableReplicaConnectionPooler'] + if 'enableReplicaLoadBalancer' in postgresql['spec']: rlb = postgresql['spec']['enableReplicaLoadBalancer'] if not rlb: