rename pooler parts and add example to manifest

This commit is contained in:
Felix Kunde 2020-03-27 13:43:35 +01:00
parent ba9cf68650
commit ecbe3b5e1d
35 changed files with 638 additions and 624 deletions

View File

@ -318,44 +318,44 @@ spec:
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
scalyr_server_url: scalyr_server_url:
type: string type: string
connection_pool: connection_pooler:
type: object type: object
properties: properties:
connection_pool_schema: connection_pooler_schema:
type: string type: string
#default: "pooler" #default: "pooler"
connection_pool_user: connection_pooler_user:
type: string type: string
#default: "pooler" #default: "pooler"
connection_pool_image: connection_pooler_image:
type: string type: string
#default: "registry.opensource.zalan.do/acid/pgbouncer" #default: "registry.opensource.zalan.do/acid/pgbouncer"
connection_pool_max_db_connections: connection_pooler_max_db_connections:
type: integer type: integer
#default: 60 #default: 60
connection_pool_mode: connection_pooler_mode:
type: string type: string
enum: enum:
- "session" - "session"
- "transaction" - "transaction"
#default: "transaction" #default: "transaction"
connection_pool_number_of_instances: connection_pooler_number_of_instances:
type: integer type: integer
minimum: 2 minimum: 2
#default: 2 #default: 2
connection_pool_default_cpu_limit: connection_pooler_default_cpu_limit:
type: string type: string
pattern: '^(\d+m|\d+(\.\d{1,3})?)$' pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
#default: "1" #default: "1"
connection_pool_default_cpu_request: connection_pooler_default_cpu_request:
type: string type: string
pattern: '^(\d+m|\d+(\.\d{1,3})?)$' pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
#default: "500m" #default: "500m"
connection_pool_default_memory_limit: connection_pooler_default_memory_limit:
type: string type: string
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
#default: "100Mi" #default: "100Mi"
connection_pool_default_memory_request: connection_pooler_default_memory_request:
type: string type: string
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
#default: "100Mi" #default: "100Mi"

View File

@ -106,7 +106,7 @@ spec:
uid: uid:
format: uuid format: uuid
type: string type: string
connectionPool: connectionPooler:
type: object type: object
properties: properties:
dockerImage: dockerImage:
@ -162,7 +162,7 @@ spec:
# Note: usernames specified here as database owners must be declared in the users key of the spec key. # Note: usernames specified here as database owners must be declared in the users key of the spec key.
dockerImage: dockerImage:
type: string type: string
enableConnectionPool: enableConnectionPooler:
type: boolean type: boolean
enableLogicalBackup: enableLogicalBackup:
type: boolean type: boolean

View File

@ -20,5 +20,5 @@ data:
{{ toYaml .Values.configDebug | indent 2 }} {{ toYaml .Values.configDebug | indent 2 }}
{{ toYaml .Values.configLoggingRestApi | indent 2 }} {{ toYaml .Values.configLoggingRestApi | indent 2 }}
{{ toYaml .Values.configTeamsApi | indent 2 }} {{ toYaml .Values.configTeamsApi | indent 2 }}
{{ toYaml .Values.configConnectionPool | indent 2 }} {{ toYaml .Values.configConnectionPooler | indent 2 }}
{{- end }} {{- end }}

View File

@ -34,6 +34,6 @@ configuration:
{{ toYaml .Values.configLoggingRestApi | indent 4 }} {{ toYaml .Values.configLoggingRestApi | indent 4 }}
scalyr: scalyr:
{{ toYaml .Values.configScalyr | indent 4 }} {{ toYaml .Values.configScalyr | indent 4 }}
connection_pool: connection_pooler:
{{ toYaml .Values.configConnectionPool | indent 4 }} {{ toYaml .Values.configConnectionPooler | indent 4 }}
{{- end }} {{- end }}

View File

@ -267,24 +267,24 @@ configScalyr:
# Memory request value for the Scalyr sidecar # Memory request value for the Scalyr sidecar
scalyr_memory_request: 50Mi scalyr_memory_request: 50Mi
configConnectionPool: configConnectionPooler:
# db schema to install lookup function into # db schema to install lookup function into
connection_pool_schema: "pooler" connection_pooler_schema: "pooler"
# db user for pooler to use # db user for pooler to use
connection_pool_user: "pooler" connection_pooler_user: "pooler"
# docker image # docker image
connection_pool_image: "registry.opensource.zalan.do/acid/pgbouncer" connection_pooler_image: "registry.opensource.zalan.do/acid/pgbouncer"
# max db connections the pooler should hold # max db connections the pooler should hold
connection_pool_max_db_connections: 60 connection_pooler_max_db_connections: 60
# default pooling mode # default pooling mode
connection_pool_mode: "transaction" connection_pooler_mode: "transaction"
# number of pooler instances # number of pooler instances
connection_pool_number_of_instances: 2 connection_pooler_number_of_instances: 2
# default resources # default resources
connection_pool_default_cpu_request: 500m connection_pooler_default_cpu_request: 500m
connection_pool_default_memory_request: 100Mi connection_pooler_default_memory_request: 100Mi
connection_pool_default_cpu_limit: "1" connection_pooler_default_cpu_limit: "1"
connection_pool_default_memory_limit: 100Mi connection_pooler_default_memory_limit: 100Mi
rbac: rbac:
# Specifies whether RBAC resources should be created # Specifies whether RBAC resources should be created

View File

@ -244,24 +244,24 @@ configTeamsApi:
# teams_api_url: http://fake-teams-api.default.svc.cluster.local # teams_api_url: http://fake-teams-api.default.svc.cluster.local
# configure connection pooler deployment created by the operator # configure connection pooler deployment created by the operator
configConnectionPool: configConnectionPooler:
# db schema to install lookup function into # db schema to install lookup function into
connection_pool_schema: "pooler" connection_pooler_schema: "pooler"
# db user for pooler to use # db user for pooler to use
connection_pool_user: "pooler" connection_pooler_user: "pooler"
# docker image # docker image
connection_pool_image: "registry.opensource.zalan.do/acid/pgbouncer" connection_pooler_image: "registry.opensource.zalan.do/acid/pgbouncer"
# max db connections the pooler should hold # max db connections the pooler should hold
connection_pool_max_db_connections: 60 connection_pooler_max_db_connections: 60
# default pooling mode # default pooling mode
connection_pool_mode: "transaction" connection_pooler_mode: "transaction"
# number of pooler instances # number of pooler instances
connection_pool_number_of_instances: 2 connection_pooler_number_of_instances: 2
# default resources # default resources
connection_pool_default_cpu_request: 500m connection_pooler_default_cpu_request: 500m
connection_pool_default_memory_request: 100Mi connection_pooler_default_memory_request: 100Mi
connection_pool_default_cpu_limit: "1" connection_pooler_default_cpu_limit: "1"
connection_pool_default_memory_limit: 100Mi connection_pooler_default_memory_limit: 100Mi
rbac: rbac:
# Specifies whether RBAC resources should be created # Specifies whether RBAC resources should be created

View File

@ -140,10 +140,10 @@ These parameters are grouped directly under the `spec` key in the manifest.
is `false`, then no volume will be mounted no matter how operator was is `false`, then no volume will be mounted no matter how operator was
configured (so you can override the operator configuration). Optional. configured (so you can override the operator configuration). Optional.
* **enableConnectionPool** * **enableConnectionPooler**
Tells the operator to create a connection pool with a database. If this Tells the operator to create a connection pooler with a database. If this
field is true, a connection pool deployment will be created even if field is true, a connection pooler deployment will be created even if
`connectionPool` section is empty. Optional, not set by default. `connectionPooler` section is empty. Optional, not set by default.
* **enableLogicalBackup** * **enableLogicalBackup**
Determines if the logical backup of this cluster should be taken and uploaded Determines if the logical backup of this cluster should be taken and uploaded
@ -365,34 +365,34 @@ CPU and memory limits for the sidecar container.
memory limits for the sidecar container. Optional, overrides the memory limits for the sidecar container. Optional, overrides the
`default_memory_limits` operator configuration parameter. Optional. `default_memory_limits` operator configuration parameter. Optional.
## Connection pool ## Connection pooler
Parameters are grouped under the `connectionPool` top-level key and specify Parameters are grouped under the `connectionPooler` top-level key and specify
configuration for connection pool. If this section is not empty, a connection configuration for connection pooler. If this section is not empty, a connection
pool will be created for a database even if `enableConnectionPool` is not pooler will be created for a database even if `enableConnectionPooler` is not
present. present.
* **numberOfInstances** * **numberOfInstances**
How many instances of connection pool to create. How many instances of connection pooler to create.
* **schema** * **schema**
Schema to create for credentials lookup function. Schema to create for credentials lookup function.
* **user** * **user**
User to create for connection pool to be able to connect to a database. User to create for connection pooler to be able to connect to a database.
* **dockerImage** * **dockerImage**
Which docker image to use for connection pool deployment. Which docker image to use for connection pooler deployment.
* **maxDBConnections** * **maxDBConnections**
How many connections the pooler can max hold. This value is divided among the How many connections the pooler can max hold. This value is divided among the
pooler pods. pooler pods.
* **mode** * **mode**
In which mode to run connection pool, transaction or session. In which mode to run connection pooler, transaction or session.
* **resources** * **resources**
Resource configuration for connection pool deployment. Resource configuration for connection pooler deployment.
## Custom TLS certificates ## Custom TLS certificates

View File

@ -597,39 +597,39 @@ scalyr sidecar. In the CRD-based configuration they are grouped under the
* **scalyr_memory_limit** * **scalyr_memory_limit**
Memory limit value for the Scalyr sidecar. The default is `500Mi`. Memory limit value for the Scalyr sidecar. The default is `500Mi`.
## Connection pool configuration ## Connection pooler configuration
Parameters are grouped under the `connection_pool` top-level key and specify Parameters are grouped under the `connection_pooler` top-level key and specify
default configuration for connection pool, if a postgres manifest requests it default configuration for connection pooler, if a postgres manifest requests it
but do not specify some of the parameters. All of them are optional with the but do not specify some of the parameters. All of them are optional with the
operator being able to provide some reasonable defaults. operator being able to provide some reasonable defaults.
* **connection_pool_number_of_instances** * **connection_pooler_number_of_instances**
How many instances of connection pool to create. Default is 2 which is also How many instances of connection pooler to create. Default is 2 which is also
the required minimum. the required minimum.
* **connection_pool_schema** * **connection_pooler_schema**
Schema to create for credentials lookup function. Default is `pooler`. Schema to create for credentials lookup function. Default is `pooler`.
* **connection_pool_user** * **connection_pooler_user**
User to create for connection pool to be able to connect to a database. User to create for connection pooler to be able to connect to a database.
Default is `pooler`. Default is `pooler`.
* **connection_pool_image** * **connection_pooler_image**
Docker image to use for connection pool deployment. Docker image to use for connection pooler deployment.
Default: "registry.opensource.zalan.do/acid/pgbouncer" Default: "registry.opensource.zalan.do/acid/pgbouncer"
* **connection_pool_max_db_connections** * **connection_pooler_max_db_connections**
How many connections the pooler can max hold. This value is divided among the How many connections the pooler can max hold. This value is divided among the
pooler pods. Default is 60 which will make up 30 connections per pod for the pooler pods. Default is 60 which will make up 30 connections per pod for the
default setup with two instances. default setup with two instances.
* **connection_pool_mode** * **connection_pooler_mode**
Default pool mode, `session` or `transaction`. Default is `transaction`. Default pooler mode, `session` or `transaction`. Default is `transaction`.
* **connection_pool_default_cpu_request** * **connection_pooler_default_cpu_request**
**connection_pool_default_memory_reques** **connection_pooler_default_memory_reques**
**connection_pool_default_cpu_limit** **connection_pooler_default_cpu_limit**
**connection_pool_default_memory_limit** **connection_pooler_default_memory_limit**
Default resource configuration for connection pool deployment. The internal Default resource configuration for connection pooler deployment. The internal
default for memory request and limit is `100Mi`, for CPU it is `500m` and `1`. default for memory request and limit is `100Mi`, for CPU it is `500m` and `1`.

View File

@ -512,29 +512,29 @@ monitoring is outside the scope of operator responsibilities. See
[administrator documentation](administrator.md) for details on how backups are [administrator documentation](administrator.md) for details on how backups are
executed. executed.
## Connection pool ## Connection pooler
The operator can create a database side connection pool for those applications, The operator can create a database side connection pooler for those applications,
where an application side pool is not feasible, but a number of connections is where an application side pooler is not feasible, but a number of connections is
high. To create a connection pool together with a database, modify the high. To create a connection pooler together with a database, modify the
manifest: manifest:
```yaml ```yaml
spec: spec:
enableConnectionPool: true enableConnectionPooler: true
``` ```
This will tell the operator to create a connection pool with default This will tell the operator to create a connection pooler with default
configuration, through which one can access the master via a separate service configuration, through which one can access the master via a separate service
`{cluster-name}-pooler`. In most of the cases provided default configuration `{cluster-name}-pooler`. In most of the cases provided default configuration
should be good enough. should be good enough.
To configure a new connection pool, specify: To configure a new connection pooler, specify:
``` ```
spec: spec:
connectionPool: connectionPooler:
# how many instances of connection pool to create # how many instances of connection pooler to create
number_of_instances: 2 number_of_instances: 2
# in which mode to run, session or transaction # in which mode to run, session or transaction
@ -544,7 +544,7 @@ spec:
# function # function
schema: "pooler" schema: "pooler"
# user, which operator will create for connection pool # user, which operator will create for connection pooler
user: "pooler" user: "pooler"
# resources for each instance # resources for each instance
@ -557,9 +557,9 @@ spec:
memory: 100Mi memory: 100Mi
``` ```
By default `pgbouncer` is used to create a connection pool. To find out about By default `pgbouncer` is used to create a connection pooler. To find out about
pool modes see [docs](https://www.pgbouncer.org/config.html#pool_mode) (but it pooler modes see [docs](https://www.pgbouncer.org/config.html#pooler_mode) (but
should be general approach between different implementation). it should be general approach between different implementation).
Note, that using `pgbouncer` means meaningful resource CPU limit should be less Note, that using `pgbouncer` means meaningful resource CPU limit should be less
than 1 core (there is a way to utilize more than one, but in K8S it's easier than 1 core (there is a way to utilize more than one, but in K8S it's easier

View File

@ -350,32 +350,32 @@ class EndToEndTestCase(unittest.TestCase):
k8s.update_config(unpatch_custom_service_annotations) k8s.update_config(unpatch_custom_service_annotations)
@timeout_decorator.timeout(TEST_TIMEOUT_SEC) @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_enable_disable_connection_pool(self): def test_enable_disable_connection_pooler(self):
''' '''
For a database without connection pool, then turns it on, scale up, For a database without connection pooler, then turns it on, scale up,
turn off and on again. Test with different ways of doing this (via turn off and on again. Test with different ways of doing this (via
enableConnectionPool or connectionPool configuration section). At the enableConnectionPooler or connectionPooler configuration section). At the
end turn the connection pool off to not interfere with other tests. end turn the connection pooler off to not interfere with other tests.
''' '''
k8s = self.k8s k8s = self.k8s
service_labels = { service_labels = {
'cluster-name': 'acid-minimal-cluster', 'cluster-name': 'acid-minimal-cluster',
} }
pod_labels = dict({ pod_labels = dict({
'connection-pool': 'acid-minimal-cluster-pooler', 'connection-pooler': 'acid-minimal-cluster-pooler',
}) })
pod_selector = to_selector(pod_labels) pod_selector = to_selector(pod_labels)
service_selector = to_selector(service_labels) service_selector = to_selector(service_labels)
try: try:
# enable connection pool # enable connection pooler
k8s.api.custom_objects_api.patch_namespaced_custom_object( k8s.api.custom_objects_api.patch_namespaced_custom_object(
'acid.zalan.do', 'v1', 'default', 'acid.zalan.do', 'v1', 'default',
'postgresqls', 'acid-minimal-cluster', 'postgresqls', 'acid-minimal-cluster',
{ {
'spec': { 'spec': {
'enableConnectionPool': True, 'enableConnectionPooler': True,
} }
}) })
k8s.wait_for_pod_start(pod_selector) k8s.wait_for_pod_start(pod_selector)
@ -384,7 +384,7 @@ class EndToEndTestCase(unittest.TestCase):
'default', label_selector=pod_selector 'default', label_selector=pod_selector
).items ).items
self.assertTrue(pods, 'No connection pool pods') self.assertTrue(pods, 'No connection pooler pods')
k8s.wait_for_service(service_selector) k8s.wait_for_service(service_selector)
services = k8s.api.core_v1.list_namespaced_service( services = k8s.api.core_v1.list_namespaced_service(
@ -395,15 +395,15 @@ class EndToEndTestCase(unittest.TestCase):
if s.metadata.name.endswith('pooler') if s.metadata.name.endswith('pooler')
] ]
self.assertTrue(services, 'No connection pool service') self.assertTrue(services, 'No connection pooler service')
# scale up connection pool deployment # scale up connection pooler deployment
k8s.api.custom_objects_api.patch_namespaced_custom_object( k8s.api.custom_objects_api.patch_namespaced_custom_object(
'acid.zalan.do', 'v1', 'default', 'acid.zalan.do', 'v1', 'default',
'postgresqls', 'acid-minimal-cluster', 'postgresqls', 'acid-minimal-cluster',
{ {
'spec': { 'spec': {
'connectionPool': { 'connectionPooler': {
'numberOfInstances': 2, 'numberOfInstances': 2,
}, },
} }
@ -417,7 +417,7 @@ class EndToEndTestCase(unittest.TestCase):
'postgresqls', 'acid-minimal-cluster', 'postgresqls', 'acid-minimal-cluster',
{ {
'spec': { 'spec': {
'enableConnectionPool': False, 'enableConnectionPooler': False,
} }
}) })
k8s.wait_for_pods_to_stop(pod_selector) k8s.wait_for_pods_to_stop(pod_selector)
@ -427,7 +427,7 @@ class EndToEndTestCase(unittest.TestCase):
'postgresqls', 'acid-minimal-cluster', 'postgresqls', 'acid-minimal-cluster',
{ {
'spec': { 'spec': {
'enableConnectionPool': True, 'enableConnectionPooler': True,
} }
}) })
k8s.wait_for_pod_start(pod_selector) k8s.wait_for_pod_start(pod_selector)

View File

@ -19,6 +19,7 @@ spec:
- createdb - createdb
enableMasterLoadBalancer: false enableMasterLoadBalancer: false
enableReplicaLoadBalancer: false enableReplicaLoadBalancer: false
# enableConnectionPooler: true # not needed when connectionPooler section is present (see below)
allowedSourceRanges: # load balancers' source ranges for both master and replica services allowedSourceRanges: # load balancers' source ranges for both master and replica services
- 127.0.0.1/32 - 127.0.0.1/32
databases: databases:
@ -85,6 +86,19 @@ spec:
# - 01:00-06:00 #UTC # - 01:00-06:00 #UTC
# - Sat:00:00-04:00 # - Sat:00:00-04:00
connectionPooler:
number_of_instances: 2
mode: "transaction"
schema: "pooler"
user: "pooler"
resources:
requests:
cpu: 300m
memory: 100Mi
limits:
cpu: "1"
memory: 100Mi
initContainers: initContainers:
- name: date - name: date
image: busybox image: busybox

View File

@ -11,16 +11,16 @@ data:
cluster_history_entries: "1000" cluster_history_entries: "1000"
cluster_labels: application:spilo cluster_labels: application:spilo
cluster_name_label: cluster-name cluster_name_label: cluster-name
# connection_pool_default_cpu_limit: "1" # connection_pooler_default_cpu_limit: "1"
# connection_pool_default_cpu_request: "500m" # connection_pooler_default_cpu_request: "500m"
# connection_pool_default_memory_limit: 100Mi # connection_pooler_default_memory_limit: 100Mi
# connection_pool_default_memory_request: 100Mi # connection_pooler_default_memory_request: 100Mi
connection_pool_image: "registry.opensource.zalan.do/acid/pgbouncer:master-5" connection_pooler_image: "registry.opensource.zalan.do/acid/pgbouncer:master-5"
# connection_pool_max_db_connections: 60 # connection_pooler_max_db_connections: 60
# connection_pool_mode: "transaction" # connection_pooler_mode: "transaction"
# connection_pool_number_of_instances: 2 # connection_pooler_number_of_instances: 2
# connection_pool_schema: "pooler" # connection_pooler_schema: "pooler"
# connection_pool_user: "pooler" # connection_pooler_user: "pooler"
# custom_service_annotations: "keyx:valuez,keya:valuea" # custom_service_annotations: "keyx:valuez,keya:valuea"
# custom_pod_annotations: "keya:valuea,keyb:valueb" # custom_pod_annotations: "keya:valuea,keyb:valueb"
db_hosted_zone: db.example.com db_hosted_zone: db.example.com

View File

@ -294,44 +294,44 @@ spec:
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
scalyr_server_url: scalyr_server_url:
type: string type: string
connection_pool: connection_pooler:
type: object type: object
properties: properties:
connection_pool_schema: connection_pooler_schema:
type: string type: string
#default: "pooler" #default: "pooler"
connection_pool_user: connection_pooler_user:
type: string type: string
#default: "pooler" #default: "pooler"
connection_pool_image: connection_pooler_image:
type: string type: string
#default: "registry.opensource.zalan.do/acid/pgbouncer" #default: "registry.opensource.zalan.do/acid/pgbouncer"
connection_pool_max_db_connections: connection_pooler_max_db_connections:
type: integer type: integer
#default: 60 #default: 60
connection_pool_mode: connection_pooler_mode:
type: string type: string
enum: enum:
- "session" - "session"
- "transaction" - "transaction"
#default: "transaction" #default: "transaction"
connection_pool_number_of_instances: connection_pooler_number_of_instances:
type: integer type: integer
minimum: 2 minimum: 2
#default: 2 #default: 2
connection_pool_default_cpu_limit: connection_pooler_default_cpu_limit:
type: string type: string
pattern: '^(\d+m|\d+(\.\d{1,3})?)$' pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
#default: "1" #default: "1"
connection_pool_default_cpu_request: connection_pooler_default_cpu_request:
type: string type: string
pattern: '^(\d+m|\d+(\.\d{1,3})?)$' pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
#default: "500m" #default: "500m"
connection_pool_default_memory_limit: connection_pooler_default_memory_limit:
type: string type: string
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
#default: "100Mi" #default: "100Mi"
connection_pool_default_memory_request: connection_pooler_default_memory_request:
type: string type: string
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
#default: "100Mi" #default: "100Mi"

View File

@ -121,14 +121,14 @@ configuration:
scalyr_memory_limit: 500Mi scalyr_memory_limit: 500Mi
scalyr_memory_request: 50Mi scalyr_memory_request: 50Mi
# scalyr_server_url: "" # scalyr_server_url: ""
connection_pool: connection_pooler:
connection_pool_default_cpu_limit: "1" connection_pooler_default_cpu_limit: "1"
connection_pool_default_cpu_request: "500m" connection_pooler_default_cpu_request: "500m"
connection_pool_default_memory_limit: 100Mi connection_pooler_default_memory_limit: 100Mi
connection_pool_default_memory_request: 100Mi connection_pooler_default_memory_request: 100Mi
connection_pool_image: "registry.opensource.zalan.do/acid/pgbouncer:master-5" connection_pooler_image: "registry.opensource.zalan.do/acid/pgbouncer:master-5"
# connection_pool_max_db_connections: 60 # connection_pooler_max_db_connections: 60
connection_pool_mode: "transaction" connection_pooler_mode: "transaction"
connection_pool_number_of_instances: 2 connection_pooler_number_of_instances: 2
# connection_pool_schema: "pooler" # connection_pooler_schema: "pooler"
# connection_pool_user: "pooler" # connection_pooler_user: "pooler"

View File

@ -70,7 +70,7 @@ spec:
uid: uid:
format: uuid format: uuid
type: string type: string
connectionPool: connectionPooler:
type: object type: object
properties: properties:
dockerImage: dockerImage:
@ -126,7 +126,7 @@ spec:
# Note: usernames specified here as database owners must be declared in the users key of the spec key. # Note: usernames specified here as database owners must be declared in the users key of the spec key.
dockerImage: dockerImage:
type: string type: string
enableConnectionPool: enableConnectionPooler:
type: boolean type: boolean
enableLogicalBackup: enableLogicalBackup:
type: boolean type: boolean

View File

@ -177,7 +177,7 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{
}, },
}, },
}, },
"connectionPool": { "connectionPooler": {
Type: "object", Type: "object",
Properties: map[string]apiextv1beta1.JSONSchemaProps{ Properties: map[string]apiextv1beta1.JSONSchemaProps{
"dockerImage": { "dockerImage": {
@ -259,7 +259,7 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{
"dockerImage": { "dockerImage": {
Type: "string", Type: "string",
}, },
"enableConnectionPool": { "enableConnectionPooler": {
Type: "boolean", Type: "boolean",
}, },
"enableLogicalBackup": { "enableLogicalBackup": {
@ -1129,32 +1129,32 @@ var OperatorConfigCRDResourceValidation = apiextv1beta1.CustomResourceValidation
}, },
}, },
}, },
"connection_pool": { "connection_pooler": {
Type: "object", Type: "object",
Properties: map[string]apiextv1beta1.JSONSchemaProps{ Properties: map[string]apiextv1beta1.JSONSchemaProps{
"connection_pool_default_cpu_limit": { "connection_pooler_default_cpu_limit": {
Type: "string", Type: "string",
Pattern: "^(\\d+m|\\d+(\\.\\d{1,3})?)$", Pattern: "^(\\d+m|\\d+(\\.\\d{1,3})?)$",
}, },
"connection_pool_default_cpu_request": { "connection_pooler_default_cpu_request": {
Type: "string", Type: "string",
Pattern: "^(\\d+m|\\d+(\\.\\d{1,3})?)$", Pattern: "^(\\d+m|\\d+(\\.\\d{1,3})?)$",
}, },
"connection_pool_default_memory_limit": { "connection_pooler_default_memory_limit": {
Type: "string", Type: "string",
Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$", Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$",
}, },
"connection_pool_default_memory_request": { "connection_pooler_default_memory_request": {
Type: "string", Type: "string",
Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$", Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$",
}, },
"connection_pool_image": { "connection_pooler_image": {
Type: "string", Type: "string",
}, },
"connection_pool_max_db_connections": { "connection_pooler_max_db_connections": {
Type: "integer", Type: "integer",
}, },
"connection_pool_mode": { "connection_pooler_mode": {
Type: "string", Type: "string",
Enum: []apiextv1beta1.JSON{ Enum: []apiextv1beta1.JSON{
{ {
@ -1165,14 +1165,14 @@ var OperatorConfigCRDResourceValidation = apiextv1beta1.CustomResourceValidation
}, },
}, },
}, },
"connection_pool_number_of_instances": { "connection_pooler_number_of_instances": {
Type: "integer", Type: "integer",
Minimum: &min2, Minimum: &min2,
}, },
"connection_pool_schema": { "connection_pooler_schema": {
Type: "string", Type: "string",
}, },
"connection_pool_user": { "connection_pooler_user": {
Type: "string", Type: "string",
}, },
}, },

View File

@ -153,18 +153,18 @@ type ScalyrConfiguration struct {
ScalyrMemoryLimit string `json:"scalyr_memory_limit,omitempty"` ScalyrMemoryLimit string `json:"scalyr_memory_limit,omitempty"`
} }
// Defines default configuration for connection pool // Defines default configuration for connection pooler
type ConnectionPoolConfiguration struct { type ConnectionPoolerConfiguration struct {
NumberOfInstances *int32 `json:"connection_pool_number_of_instances,omitempty"` NumberOfInstances *int32 `json:"connection_pooler_number_of_instances,omitempty"`
Schema string `json:"connection_pool_schema,omitempty"` Schema string `json:"connection_pooler_schema,omitempty"`
User string `json:"connection_pool_user,omitempty"` User string `json:"connection_pooler_user,omitempty"`
Image string `json:"connection_pool_image,omitempty"` Image string `json:"connection_pooler_image,omitempty"`
Mode string `json:"connection_pool_mode,omitempty"` Mode string `json:"connection_pooler_mode,omitempty"`
MaxDBConnections *int32 `json:"connection_pool_max_db_connections,omitempty"` MaxDBConnections *int32 `json:"connection_pooler_max_db_connections,omitempty"`
DefaultCPURequest string `json:"connection_pool_default_cpu_request,omitempty"` DefaultCPURequest string `json:"connection_pooler_default_cpu_request,omitempty"`
DefaultMemoryRequest string `json:"connection_pool_default_memory_request,omitempty"` DefaultMemoryRequest string `json:"connection_pooler_default_memory_request,omitempty"`
DefaultCPULimit string `json:"connection_pool_default_cpu_limit,omitempty"` DefaultCPULimit string `json:"connection_pooler_default_cpu_limit,omitempty"`
DefaultMemoryLimit string `json:"connection_pool_default_memory_limit,omitempty"` DefaultMemoryLimit string `json:"connection_pooler_default_memory_limit,omitempty"`
} }
// OperatorLogicalBackupConfiguration defines configuration for logical backup // OperatorLogicalBackupConfiguration defines configuration for logical backup
@ -203,7 +203,7 @@ type OperatorConfigurationData struct {
LoggingRESTAPI LoggingRESTAPIConfiguration `json:"logging_rest_api"` LoggingRESTAPI LoggingRESTAPIConfiguration `json:"logging_rest_api"`
Scalyr ScalyrConfiguration `json:"scalyr"` Scalyr ScalyrConfiguration `json:"scalyr"`
LogicalBackup OperatorLogicalBackupConfiguration `json:"logical_backup"` LogicalBackup OperatorLogicalBackupConfiguration `json:"logical_backup"`
ConnectionPool ConnectionPoolConfiguration `json:"connection_pool"` ConnectionPooler ConnectionPoolerConfiguration `json:"connection_pooler"`
} }
//Duration shortens this frequently used name //Duration shortens this frequently used name

View File

@ -29,8 +29,8 @@ type PostgresSpec struct {
Patroni `json:"patroni,omitempty"` Patroni `json:"patroni,omitempty"`
Resources `json:"resources,omitempty"` Resources `json:"resources,omitempty"`
EnableConnectionPool *bool `json:"enableConnectionPool,omitempty"` EnableConnectionPooler *bool `json:"enableConnectionPooler,omitempty"`
ConnectionPool *ConnectionPool `json:"connectionPool,omitempty"` ConnectionPooler *ConnectionPooler `json:"connectionPooler,omitempty"`
TeamID string `json:"teamId"` TeamID string `json:"teamId"`
DockerImage string `json:"dockerImage,omitempty"` DockerImage string `json:"dockerImage,omitempty"`
@ -175,10 +175,10 @@ type PostgresStatus struct {
// resources) // resources)
// Type string `json:"type,omitempty"` // Type string `json:"type,omitempty"`
// //
// TODO: figure out what other important parameters of the connection pool it // TODO: figure out what other important parameters of the connection pooler it
// makes sense to expose. E.g. pool size (min/max boundaries), max client // makes sense to expose. E.g. pool size (min/max boundaries), max client
// connections etc. // connections etc.
type ConnectionPool struct { type ConnectionPooler struct {
NumberOfInstances *int32 `json:"numberOfInstances,omitempty"` NumberOfInstances *int32 `json:"numberOfInstances,omitempty"`
Schema string `json:"schema,omitempty"` Schema string `json:"schema,omitempty"`
User string `json:"user,omitempty"` User string `json:"user,omitempty"`

View File

@ -69,7 +69,7 @@ func (in *CloneDescription) DeepCopy() *CloneDescription {
} }
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ConnectionPool) DeepCopyInto(out *ConnectionPool) { func (in *ConnectionPooler) DeepCopyInto(out *ConnectionPooler) {
*out = *in *out = *in
if in.NumberOfInstances != nil { if in.NumberOfInstances != nil {
in, out := &in.NumberOfInstances, &out.NumberOfInstances in, out := &in.NumberOfInstances, &out.NumberOfInstances
@ -85,18 +85,18 @@ func (in *ConnectionPool) DeepCopyInto(out *ConnectionPool) {
return return
} }
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionPool. // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionPooler.
func (in *ConnectionPool) DeepCopy() *ConnectionPool { func (in *ConnectionPooler) DeepCopy() *ConnectionPooler {
if in == nil { if in == nil {
return nil return nil
} }
out := new(ConnectionPool) out := new(ConnectionPooler)
in.DeepCopyInto(out) in.DeepCopyInto(out)
return out return out
} }
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ConnectionPoolConfiguration) DeepCopyInto(out *ConnectionPoolConfiguration) { func (in *ConnectionPoolerConfiguration) DeepCopyInto(out *ConnectionPoolerConfiguration) {
*out = *in *out = *in
if in.NumberOfInstances != nil { if in.NumberOfInstances != nil {
in, out := &in.NumberOfInstances, &out.NumberOfInstances in, out := &in.NumberOfInstances, &out.NumberOfInstances
@ -111,12 +111,12 @@ func (in *ConnectionPoolConfiguration) DeepCopyInto(out *ConnectionPoolConfigura
return return
} }
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionPoolConfiguration. // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionPoolerConfiguration.
func (in *ConnectionPoolConfiguration) DeepCopy() *ConnectionPoolConfiguration { func (in *ConnectionPoolerConfiguration) DeepCopy() *ConnectionPoolerConfiguration {
if in == nil { if in == nil {
return nil return nil
} }
out := new(ConnectionPoolConfiguration) out := new(ConnectionPoolerConfiguration)
in.DeepCopyInto(out) in.DeepCopyInto(out)
return out return out
} }
@ -308,7 +308,7 @@ func (in *OperatorConfigurationData) DeepCopyInto(out *OperatorConfigurationData
out.LoggingRESTAPI = in.LoggingRESTAPI out.LoggingRESTAPI = in.LoggingRESTAPI
out.Scalyr = in.Scalyr out.Scalyr = in.Scalyr
out.LogicalBackup = in.LogicalBackup out.LogicalBackup = in.LogicalBackup
in.ConnectionPool.DeepCopyInto(&out.ConnectionPool) in.ConnectionPooler.DeepCopyInto(&out.ConnectionPooler)
return return
} }
@ -471,14 +471,14 @@ func (in *PostgresSpec) DeepCopyInto(out *PostgresSpec) {
out.Volume = in.Volume out.Volume = in.Volume
in.Patroni.DeepCopyInto(&out.Patroni) in.Patroni.DeepCopyInto(&out.Patroni)
out.Resources = in.Resources out.Resources = in.Resources
if in.EnableConnectionPool != nil { if in.EnableConnectionPooler != nil {
in, out := &in.EnableConnectionPool, &out.EnableConnectionPool in, out := &in.EnableConnectionPooler, &out.EnableConnectionPooler
*out = new(bool) *out = new(bool)
**out = **in **out = **in
} }
if in.ConnectionPool != nil { if in.ConnectionPooler != nil {
in, out := &in.ConnectionPool, &out.ConnectionPool in, out := &in.ConnectionPooler, &out.ConnectionPooler
*out = new(ConnectionPool) *out = new(ConnectionPooler)
(*in).DeepCopyInto(*out) (*in).DeepCopyInto(*out)
} }
if in.SpiloFSGroup != nil { if in.SpiloFSGroup != nil {

View File

@ -49,14 +49,14 @@ type Config struct {
PodServiceAccountRoleBinding *rbacv1.RoleBinding PodServiceAccountRoleBinding *rbacv1.RoleBinding
} }
// K8S objects that are belongs to a connection pool // K8S objects that are belongs to a connection pooler
type ConnectionPoolObjects struct { type ConnectionPoolerObjects struct {
Deployment *appsv1.Deployment Deployment *appsv1.Deployment
Service *v1.Service Service *v1.Service
// It could happen that a connection pool was enabled, but the operator was // It could happen that a connection pooler was enabled, but the operator
// not able to properly process a corresponding event or was restarted. In // was not able to properly process a corresponding event or was restarted.
// this case we will miss missing/require situation and a lookup function // In this case we will miss missing/require situation and a lookup function
// will not be installed. To avoid synchronizing it all the time to prevent // will not be installed. To avoid synchronizing it all the time to prevent
// this, we can remember the result in memory at least until the next // this, we can remember the result in memory at least until the next
// restart. // restart.
@ -68,7 +68,7 @@ type kubeResources struct {
Endpoints map[PostgresRole]*v1.Endpoints Endpoints map[PostgresRole]*v1.Endpoints
Secrets map[types.UID]*v1.Secret Secrets map[types.UID]*v1.Secret
Statefulset *appsv1.StatefulSet Statefulset *appsv1.StatefulSet
ConnectionPool *ConnectionPoolObjects ConnectionPooler *ConnectionPoolerObjects
PodDisruptionBudget *policybeta1.PodDisruptionBudget PodDisruptionBudget *policybeta1.PodDisruptionBudget
//Pods are treated separately //Pods are treated separately
//PVCs are treated separately //PVCs are treated separately
@ -335,24 +335,24 @@ func (c *Cluster) Create() error {
c.logger.Errorf("could not list resources: %v", err) c.logger.Errorf("could not list resources: %v", err)
} }
// Create connection pool deployment and services if necessary. Since we // Create connection pooler deployment and services if necessary. Since we
// need to peform some operations with the database itself (e.g. install // need to perform some operations with the database itself (e.g. install
// lookup function), do it as the last step, when everything is available. // lookup function), do it as the last step, when everything is available.
// //
// Do not consider connection pool as a strict requirement, and if // Do not consider connection pooler as a strict requirement, and if
// something fails, report warning // something fails, report warning
if c.needConnectionPool() { if c.needConnectionPooler() {
if c.ConnectionPool != nil { if c.ConnectionPooler != nil {
c.logger.Warning("Connection pool already exists in the cluster") c.logger.Warning("Connection pooler already exists in the cluster")
return nil return nil
} }
connPool, err := c.createConnectionPool(c.installLookupFunction) connectionPooler, err := c.createConnectionPooler(c.installLookupFunction)
if err != nil { if err != nil {
c.logger.Warningf("could not create connection pool: %v", err) c.logger.Warningf("could not create connection pooler: %v", err)
return nil return nil
} }
c.logger.Infof("connection pool %q has been successfully created", c.logger.Infof("connection pooler %q has been successfully created",
util.NameFromMeta(connPool.Deployment.ObjectMeta)) util.NameFromMeta(connectionPooler.Deployment.ObjectMeta))
} }
return nil return nil
@ -610,11 +610,11 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
} }
} }
// connection pool needs one system user created, which is done in // connection pooler needs one system user created, which is done in
// initUsers. Check if it needs to be called. // initUsers. Check if it needs to be called.
sameUsers := reflect.DeepEqual(oldSpec.Spec.Users, newSpec.Spec.Users) sameUsers := reflect.DeepEqual(oldSpec.Spec.Users, newSpec.Spec.Users)
needConnPool := c.needConnectionPoolWorker(&newSpec.Spec) needConnectionPooler := c.needConnectionPoolerWorker(&newSpec.Spec)
if !sameUsers || needConnPool { if !sameUsers || needConnectionPooler {
c.logger.Debugf("syncing secrets") c.logger.Debugf("syncing secrets")
if err := c.initUsers(); err != nil { if err := c.initUsers(); err != nil {
c.logger.Errorf("could not init users: %v", err) c.logger.Errorf("could not init users: %v", err)
@ -738,9 +738,9 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
} }
} }
// sync connection pool // sync connection pooler
if err := c.syncConnectionPool(oldSpec, newSpec, c.installLookupFunction); err != nil { if err := c.syncConnectionPooler(oldSpec, newSpec, c.installLookupFunction); err != nil {
return fmt.Errorf("could not sync connection pool: %v", err) return fmt.Errorf("could not sync connection pooler: %v", err)
} }
return nil return nil
@ -794,11 +794,11 @@ func (c *Cluster) Delete() {
c.logger.Warningf("could not remove leftover patroni objects; %v", err) c.logger.Warningf("could not remove leftover patroni objects; %v", err)
} }
// Delete connection pool objects anyway, even if it's not mentioned in the // Delete connection pooler objects anyway, even if it's not mentioned in the
// manifest, just to not keep orphaned components in case if something went // manifest, just to not keep orphaned components in case if something went
// wrong // wrong
if err := c.deleteConnectionPool(); err != nil { if err := c.deleteConnectionPooler(); err != nil {
c.logger.Warningf("could not remove connection pool: %v", err) c.logger.Warningf("could not remove connection pooler: %v", err)
} }
} }
@ -867,32 +867,32 @@ func (c *Cluster) initSystemUsers() {
Password: util.RandomPassword(constants.PasswordLength), Password: util.RandomPassword(constants.PasswordLength),
} }
// Connection pool user is an exception, if requested it's going to be // Connection pooler user is an exception, if requested it's going to be
// created by operator as a normal pgUser // created by operator as a normal pgUser
if c.needConnectionPool() { if c.needConnectionPooler() {
// initialize empty connection pool if not done yet // initialize empty connection pooler if not done yet
if c.Spec.ConnectionPool == nil { if c.Spec.ConnectionPooler == nil {
c.Spec.ConnectionPool = &acidv1.ConnectionPool{} c.Spec.ConnectionPooler = &acidv1.ConnectionPooler{}
} }
username := util.Coalesce( username := util.Coalesce(
c.Spec.ConnectionPool.User, c.Spec.ConnectionPooler.User,
c.OpConfig.ConnectionPool.User) c.OpConfig.ConnectionPooler.User)
// connection pooler application should be able to login with this role // connection pooler application should be able to login with this role
connPoolUser := spec.PgUser{ connectionPoolerUser := spec.PgUser{
Origin: spec.RoleConnectionPool, Origin: spec.RoleConnectionPooler,
Name: username, Name: username,
Flags: []string{constants.RoleFlagLogin}, Flags: []string{constants.RoleFlagLogin},
Password: util.RandomPassword(constants.PasswordLength), Password: util.RandomPassword(constants.PasswordLength),
} }
if _, exists := c.pgUsers[username]; !exists { if _, exists := c.pgUsers[username]; !exists {
c.pgUsers[username] = connPoolUser c.pgUsers[username] = connectionPoolerUser
} }
if _, exists := c.systemUsers[constants.ConnectionPoolUserKeyName]; !exists { if _, exists := c.systemUsers[constants.ConnectionPoolerUserKeyName]; !exists {
c.systemUsers[constants.ConnectionPoolUserKeyName] = connPoolUser c.systemUsers[constants.ConnectionPoolerUserKeyName] = connectionPoolerUser
} }
} }
} }
@ -1222,10 +1222,10 @@ func (c *Cluster) deletePatroniClusterConfigMaps() error {
return c.deleteClusterObject(get, deleteConfigMapFn, "configmap") return c.deleteClusterObject(get, deleteConfigMapFn, "configmap")
} }
// Test if two connection pool configuration needs to be synced. For simplicity // Test if two connection pooler configuration needs to be synced. For simplicity
// compare not the actual K8S objects, but the configuration itself and request // compare not the actual K8S objects, but the configuration itself and request
// sync if there is any difference. // sync if there is any difference.
func (c *Cluster) needSyncConnPoolSpecs(oldSpec, newSpec *acidv1.ConnectionPool) (sync bool, reasons []string) { func (c *Cluster) needSyncConnectionPoolerSpecs(oldSpec, newSpec *acidv1.ConnectionPooler) (sync bool, reasons []string) {
reasons = []string{} reasons = []string{}
sync = false sync = false
@ -1262,21 +1262,21 @@ func syncResources(a, b *v1.ResourceRequirements) bool {
return false return false
} }
// Check if we need to synchronize connection pool deployment due to new // Check if we need to synchronize connection pooler deployment due to new
// defaults, that are different from what we see in the DeploymentSpec // defaults, that are different from what we see in the DeploymentSpec
func (c *Cluster) needSyncConnPoolDefaults( func (c *Cluster) needSyncConnectionPoolerDefaults(
spec *acidv1.ConnectionPool, spec *acidv1.ConnectionPooler,
deployment *appsv1.Deployment) (sync bool, reasons []string) { deployment *appsv1.Deployment) (sync bool, reasons []string) {
reasons = []string{} reasons = []string{}
sync = false sync = false
config := c.OpConfig.ConnectionPool config := c.OpConfig.ConnectionPooler
podTemplate := deployment.Spec.Template podTemplate := deployment.Spec.Template
poolContainer := podTemplate.Spec.Containers[constants.ConnPoolContainer] poolerContainer := podTemplate.Spec.Containers[constants.ConnectionPoolerContainer]
if spec == nil { if spec == nil {
spec = &acidv1.ConnectionPool{} spec = &acidv1.ConnectionPooler{}
} }
if spec.NumberOfInstances == nil && if spec.NumberOfInstances == nil &&
@ -1289,25 +1289,25 @@ func (c *Cluster) needSyncConnPoolDefaults(
} }
if spec.DockerImage == "" && if spec.DockerImage == "" &&
poolContainer.Image != config.Image { poolerContainer.Image != config.Image {
sync = true sync = true
msg := fmt.Sprintf("DockerImage is different (having %s, required %s)", msg := fmt.Sprintf("DockerImage is different (having %s, required %s)",
poolContainer.Image, config.Image) poolerContainer.Image, config.Image)
reasons = append(reasons, msg) reasons = append(reasons, msg)
} }
expectedResources, err := generateResourceRequirements(spec.Resources, expectedResources, err := generateResourceRequirements(spec.Resources,
c.makeDefaultConnPoolResources()) c.makeDefaultConnectionPoolerResources())
// An error to generate expected resources means something is not quite // An error to generate expected resources means something is not quite
// right, but for the purpose of robustness do not panic here, just report // right, but for the purpose of robustness do not panic here, just report
// and ignore resources comparison (in the worst case there will be no // and ignore resources comparison (in the worst case there will be no
// updates for new resource values). // updates for new resource values).
if err == nil && syncResources(&poolContainer.Resources, expectedResources) { if err == nil && syncResources(&poolerContainer.Resources, expectedResources) {
sync = true sync = true
msg := fmt.Sprintf("Resources are different (having %+v, required %+v)", msg := fmt.Sprintf("Resources are different (having %+v, required %+v)",
poolContainer.Resources, expectedResources) poolerContainer.Resources, expectedResources)
reasons = append(reasons, msg) reasons = append(reasons, msg)
} }
@ -1315,13 +1315,13 @@ func (c *Cluster) needSyncConnPoolDefaults(
c.logger.Warningf("Cannot generate expected resources, %v", err) c.logger.Warningf("Cannot generate expected resources, %v", err)
} }
for _, env := range poolContainer.Env { for _, env := range poolerContainer.Env {
if spec.User == "" && env.Name == "PGUSER" { if spec.User == "" && env.Name == "PGUSER" {
ref := env.ValueFrom.SecretKeyRef.LocalObjectReference ref := env.ValueFrom.SecretKeyRef.LocalObjectReference
if ref.Name != c.credentialSecretName(config.User) { if ref.Name != c.credentialSecretName(config.User) {
sync = true sync = true
msg := fmt.Sprintf("Pool user is different (having %s, required %s)", msg := fmt.Sprintf("pooler user is different (having %s, required %s)",
ref.Name, config.User) ref.Name, config.User)
reasons = append(reasons, msg) reasons = append(reasons, msg)
} }
@ -1329,7 +1329,7 @@ func (c *Cluster) needSyncConnPoolDefaults(
if spec.Schema == "" && env.Name == "PGSCHEMA" && env.Value != config.Schema { if spec.Schema == "" && env.Name == "PGSCHEMA" && env.Value != config.Schema {
sync = true sync = true
msg := fmt.Sprintf("Pool schema is different (having %s, required %s)", msg := fmt.Sprintf("pooler schema is different (having %s, required %s)",
env.Value, config.Schema) env.Value, config.Schema)
reasons = append(reasons, msg) reasons = append(reasons, msg)
} }

View File

@ -709,16 +709,16 @@ func TestServiceAnnotations(t *testing.T) {
func TestInitSystemUsers(t *testing.T) { func TestInitSystemUsers(t *testing.T) {
testName := "Test system users initialization" testName := "Test system users initialization"
// default cluster without connection pool // default cluster without connection pooler
cl.initSystemUsers() cl.initSystemUsers()
if _, exist := cl.systemUsers[constants.ConnectionPoolUserKeyName]; exist { if _, exist := cl.systemUsers[constants.ConnectionPoolerUserKeyName]; exist {
t.Errorf("%s, connection pool user is present", testName) t.Errorf("%s, connection pooler user is present", testName)
} }
// cluster with connection pool // cluster with connection pooler
cl.Spec.EnableConnectionPool = boolToPointer(true) cl.Spec.EnableConnectionPooler = boolToPointer(true)
cl.initSystemUsers() cl.initSystemUsers()
if _, exist := cl.systemUsers[constants.ConnectionPoolUserKeyName]; !exist { if _, exist := cl.systemUsers[constants.ConnectionPoolerUserKeyName]; !exist {
t.Errorf("%s, connection pool user is not present", testName) t.Errorf("%s, connection pooler user is not present", testName)
} }
} }

View File

@ -30,10 +30,10 @@ const (
getDatabasesSQL = `SELECT datname, pg_get_userbyid(datdba) AS owner FROM pg_database;` getDatabasesSQL = `SELECT datname, pg_get_userbyid(datdba) AS owner FROM pg_database;`
createDatabaseSQL = `CREATE DATABASE "%s" OWNER "%s";` createDatabaseSQL = `CREATE DATABASE "%s" OWNER "%s";`
alterDatabaseOwnerSQL = `ALTER DATABASE "%s" OWNER TO "%s";` alterDatabaseOwnerSQL = `ALTER DATABASE "%s" OWNER TO "%s";`
connectionPoolLookup = ` connectionPoolerLookup = `
CREATE SCHEMA IF NOT EXISTS {{.pool_schema}}; CREATE SCHEMA IF NOT EXISTS {{.pooler_schema}};
CREATE OR REPLACE FUNCTION {{.pool_schema}}.user_lookup( CREATE OR REPLACE FUNCTION {{.pooler_schema}}.user_lookup(
in i_username text, out uname text, out phash text) in i_username text, out uname text, out phash text)
RETURNS record AS $$ RETURNS record AS $$
BEGIN BEGIN
@ -43,11 +43,11 @@ const (
END; END;
$$ LANGUAGE plpgsql SECURITY DEFINER; $$ LANGUAGE plpgsql SECURITY DEFINER;
REVOKE ALL ON FUNCTION {{.pool_schema}}.user_lookup(text) REVOKE ALL ON FUNCTION {{.pooler_schema}}.user_lookup(text)
FROM public, {{.pool_user}}; FROM public, {{.pooler_user}};
GRANT EXECUTE ON FUNCTION {{.pool_schema}}.user_lookup(text) GRANT EXECUTE ON FUNCTION {{.pooler_schema}}.user_lookup(text)
TO {{.pool_user}}; TO {{.pooler_user}};
GRANT USAGE ON SCHEMA {{.pool_schema}} TO {{.pool_user}}; GRANT USAGE ON SCHEMA {{.pooler_schema}} TO {{.pooler_user}};
` `
) )
@ -278,9 +278,9 @@ func makeUserFlags(rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin
return result return result
} }
// Creates a connection pool credentials lookup function in every database to // Creates a connection pooler credentials lookup function in every database to
// perform remote authentification. // perform remote authentication.
func (c *Cluster) installLookupFunction(poolSchema, poolUser string) error { func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error {
var stmtBytes bytes.Buffer var stmtBytes bytes.Buffer
c.logger.Info("Installing lookup function") c.logger.Info("Installing lookup function")
@ -299,11 +299,11 @@ func (c *Cluster) installLookupFunction(poolSchema, poolUser string) error {
currentDatabases, err := c.getDatabases() currentDatabases, err := c.getDatabases()
if err != nil { if err != nil {
msg := "could not get databases to install pool lookup function: %v" msg := "could not get databases to install pooler lookup function: %v"
return fmt.Errorf(msg, err) return fmt.Errorf(msg, err)
} }
templater := template.Must(template.New("sql").Parse(connectionPoolLookup)) templater := template.Must(template.New("sql").Parse(connectionPoolerLookup))
for dbname, _ := range currentDatabases { for dbname, _ := range currentDatabases {
if dbname == "template0" || dbname == "template1" { if dbname == "template0" || dbname == "template1" {
@ -314,11 +314,11 @@ func (c *Cluster) installLookupFunction(poolSchema, poolUser string) error {
return fmt.Errorf("could not init database connection to %s", dbname) return fmt.Errorf("could not init database connection to %s", dbname)
} }
c.logger.Infof("Install pool lookup function into %s", dbname) c.logger.Infof("Install pooler lookup function into %s", dbname)
params := TemplateParams{ params := TemplateParams{
"pool_schema": poolSchema, "pooler_schema": poolerSchema,
"pool_user": poolUser, "pooler_user": poolerUser,
} }
if err := templater.Execute(&stmtBytes, params); err != nil { if err := templater.Execute(&stmtBytes, params); err != nil {
@ -353,12 +353,12 @@ func (c *Cluster) installLookupFunction(poolSchema, poolUser string) error {
continue continue
} }
c.logger.Infof("Pool lookup function installed into %s", dbname) c.logger.Infof("pooler lookup function installed into %s", dbname)
if err := c.closeDbConn(); err != nil { if err := c.closeDbConn(); err != nil {
c.logger.Errorf("could not close database connection: %v", err) c.logger.Errorf("could not close database connection: %v", err)
} }
} }
c.ConnectionPool.LookupFunction = true c.ConnectionPooler.LookupFunction = true
return nil return nil
} }

View File

@ -34,7 +34,7 @@ const (
patroniPGParametersParameterName = "parameters" patroniPGParametersParameterName = "parameters"
patroniPGHBAConfParameterName = "pg_hba" patroniPGHBAConfParameterName = "pg_hba"
localHost = "127.0.0.1/32" localHost = "127.0.0.1/32"
connectionPoolContainer = "connection-pool" connectionPoolerContainer = "connection-pooler"
pgPort = 5432 pgPort = 5432
// the gid of the postgres user in the default spilo image // the gid of the postgres user in the default spilo image
@ -74,7 +74,7 @@ func (c *Cluster) statefulSetName() string {
return c.Name return c.Name
} }
func (c *Cluster) connPoolName() string { func (c *Cluster) connectionPoolerName() string {
return c.Name + "-pooler" return c.Name + "-pooler"
} }
@ -141,18 +141,18 @@ func (c *Cluster) makeDefaultResources() acidv1.Resources {
} }
} }
// Generate default resource section for connection pool deployment, to be used // Generate default resource section for connection pooler deployment, to be
// if nothing custom is specified in the manifest // used if nothing custom is specified in the manifest
func (c *Cluster) makeDefaultConnPoolResources() acidv1.Resources { func (c *Cluster) makeDefaultConnectionPoolerResources() acidv1.Resources {
config := c.OpConfig config := c.OpConfig
defaultRequests := acidv1.ResourceDescription{ defaultRequests := acidv1.ResourceDescription{
CPU: config.ConnectionPool.ConnPoolDefaultCPURequest, CPU: config.ConnectionPooler.ConnectionPoolerDefaultCPURequest,
Memory: config.ConnectionPool.ConnPoolDefaultMemoryRequest, Memory: config.ConnectionPooler.ConnectionPoolerDefaultMemoryRequest,
} }
defaultLimits := acidv1.ResourceDescription{ defaultLimits := acidv1.ResourceDescription{
CPU: config.ConnectionPool.ConnPoolDefaultCPULimit, CPU: config.ConnectionPooler.ConnectionPoolerDefaultCPULimit,
Memory: config.ConnectionPool.ConnPoolDefaultMemoryLimit, Memory: config.ConnectionPooler.ConnectionPoolerDefaultMemoryLimit,
} }
return acidv1.Resources{ return acidv1.Resources{
@ -1852,33 +1852,33 @@ func (c *Cluster) getLogicalBackupJobName() (jobName string) {
// //
// DEFAULT_SIZE is a pool size per db/user (having in mind the use case when // DEFAULT_SIZE is a pool size per db/user (having in mind the use case when
// most of the queries coming through a connection pooler are from the same // most of the queries coming through a connection pooler are from the same
// user to the same db). In case if we want to spin up more connection pool // user to the same db). In case if we want to spin up more connection pooler
// instances, take this into account and maintain the same number of // instances, take this into account and maintain the same number of
// connections. // connections.
// //
// MIN_SIZE is a pool minimal size, to prevent situation when sudden workload // MIN_SIZE is a pool's minimal size, to prevent situation when sudden workload
// have to wait for spinning up a new connections. // have to wait for spinning up a new connections.
// //
// RESERVE_SIZE is how many additional connections to allow for a pool. // RESERVE_SIZE is how many additional connections to allow for a pooler.
func (c *Cluster) getConnPoolEnvVars(spec *acidv1.PostgresSpec) []v1.EnvVar { func (c *Cluster) getConnectionPoolerEnvVars(spec *acidv1.PostgresSpec) []v1.EnvVar {
effectiveMode := util.Coalesce( effectiveMode := util.Coalesce(
spec.ConnectionPool.Mode, spec.ConnectionPooler.Mode,
c.OpConfig.ConnectionPool.Mode) c.OpConfig.ConnectionPooler.Mode)
numberOfInstances := spec.ConnectionPool.NumberOfInstances numberOfInstances := spec.ConnectionPooler.NumberOfInstances
if numberOfInstances == nil { if numberOfInstances == nil {
numberOfInstances = util.CoalesceInt32( numberOfInstances = util.CoalesceInt32(
c.OpConfig.ConnectionPool.NumberOfInstances, c.OpConfig.ConnectionPooler.NumberOfInstances,
k8sutil.Int32ToPointer(1)) k8sutil.Int32ToPointer(1))
} }
effectiveMaxDBConn := util.CoalesceInt32( effectiveMaxDBConn := util.CoalesceInt32(
spec.ConnectionPool.MaxDBConnections, spec.ConnectionPooler.MaxDBConnections,
c.OpConfig.ConnectionPool.MaxDBConnections) c.OpConfig.ConnectionPooler.MaxDBConnections)
if effectiveMaxDBConn == nil { if effectiveMaxDBConn == nil {
effectiveMaxDBConn = k8sutil.Int32ToPointer( effectiveMaxDBConn = k8sutil.Int32ToPointer(
constants.ConnPoolMaxDBConnections) constants.ConnectionPoolerMaxDBConnections)
} }
maxDBConn := *effectiveMaxDBConn / *numberOfInstances maxDBConn := *effectiveMaxDBConn / *numberOfInstances
@ -1889,51 +1889,51 @@ func (c *Cluster) getConnPoolEnvVars(spec *acidv1.PostgresSpec) []v1.EnvVar {
return []v1.EnvVar{ return []v1.EnvVar{
{ {
Name: "CONNECTION_POOL_PORT", Name: "CONNECTION_POOLER_PORT",
Value: fmt.Sprint(pgPort), Value: fmt.Sprint(pgPort),
}, },
{ {
Name: "CONNECTION_POOL_MODE", Name: "CONNECTION_POOLER_MODE",
Value: effectiveMode, Value: effectiveMode,
}, },
{ {
Name: "CONNECTION_POOL_DEFAULT_SIZE", Name: "CONNECTION_POOLER_DEFAULT_SIZE",
Value: fmt.Sprint(defaultSize), Value: fmt.Sprint(defaultSize),
}, },
{ {
Name: "CONNECTION_POOL_MIN_SIZE", Name: "CONNECTION_POOLER_MIN_SIZE",
Value: fmt.Sprint(minSize), Value: fmt.Sprint(minSize),
}, },
{ {
Name: "CONNECTION_POOL_RESERVE_SIZE", Name: "CONNECTION_POOLER_RESERVE_SIZE",
Value: fmt.Sprint(reserveSize), Value: fmt.Sprint(reserveSize),
}, },
{ {
Name: "CONNECTION_POOL_MAX_CLIENT_CONN", Name: "CONNECTION_POOLER_MAX_CLIENT_CONN",
Value: fmt.Sprint(constants.ConnPoolMaxClientConnections), Value: fmt.Sprint(constants.ConnectionPoolerMaxClientConnections),
}, },
{ {
Name: "CONNECTION_POOL_MAX_DB_CONN", Name: "CONNECTION_POOLER_MAX_DB_CONN",
Value: fmt.Sprint(maxDBConn), Value: fmt.Sprint(maxDBConn),
}, },
} }
} }
func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) ( func (c *Cluster) generateConnectionPoolerPodTemplate(spec *acidv1.PostgresSpec) (
*v1.PodTemplateSpec, error) { *v1.PodTemplateSpec, error) {
gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds()) gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds())
resources, err := generateResourceRequirements( resources, err := generateResourceRequirements(
spec.ConnectionPool.Resources, spec.ConnectionPooler.Resources,
c.makeDefaultConnPoolResources()) c.makeDefaultConnectionPoolerResources())
effectiveDockerImage := util.Coalesce( effectiveDockerImage := util.Coalesce(
spec.ConnectionPool.DockerImage, spec.ConnectionPooler.DockerImage,
c.OpConfig.ConnectionPool.Image) c.OpConfig.ConnectionPooler.Image)
effectiveSchema := util.Coalesce( effectiveSchema := util.Coalesce(
spec.ConnectionPool.Schema, spec.ConnectionPooler.Schema,
c.OpConfig.ConnectionPool.Schema) c.OpConfig.ConnectionPooler.Schema)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not generate resource requirements: %v", err) return nil, fmt.Errorf("could not generate resource requirements: %v", err)
@ -1941,8 +1941,8 @@ func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) (
secretSelector := func(key string) *v1.SecretKeySelector { secretSelector := func(key string) *v1.SecretKeySelector {
effectiveUser := util.Coalesce( effectiveUser := util.Coalesce(
spec.ConnectionPool.User, spec.ConnectionPooler.User,
c.OpConfig.ConnectionPool.User) c.OpConfig.ConnectionPooler.User)
return &v1.SecretKeySelector{ return &v1.SecretKeySelector{
LocalObjectReference: v1.LocalObjectReference{ LocalObjectReference: v1.LocalObjectReference{
@ -1968,7 +1968,7 @@ func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) (
}, },
}, },
// the convention is to use the same schema name as // the convention is to use the same schema name as
// connection pool username // connection pooler username
{ {
Name: "PGSCHEMA", Name: "PGSCHEMA",
Value: effectiveSchema, Value: effectiveSchema,
@ -1981,10 +1981,10 @@ func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) (
}, },
} }
envVars = append(envVars, c.getConnPoolEnvVars(spec)...) envVars = append(envVars, c.getConnectionPoolerEnvVars(spec)...)
poolerContainer := v1.Container{ poolerContainer := v1.Container{
Name: connectionPoolContainer, Name: connectionPoolerContainer,
Image: effectiveDockerImage, Image: effectiveDockerImage,
ImagePullPolicy: v1.PullIfNotPresent, ImagePullPolicy: v1.PullIfNotPresent,
Resources: *resources, Resources: *resources,
@ -1999,7 +1999,7 @@ func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) (
podTemplate := &v1.PodTemplateSpec{ podTemplate := &v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Labels: c.connPoolLabelsSelector().MatchLabels, Labels: c.connectionPoolerLabelsSelector().MatchLabels,
Namespace: c.Namespace, Namespace: c.Namespace,
Annotations: c.generatePodAnnotations(spec), Annotations: c.generatePodAnnotations(spec),
}, },
@ -2041,32 +2041,32 @@ func (c *Cluster) ownerReferences() []metav1.OwnerReference {
} }
} }
func (c *Cluster) generateConnPoolDeployment(spec *acidv1.PostgresSpec) ( func (c *Cluster) generateConnectionPoolerDeployment(spec *acidv1.PostgresSpec) (
*appsv1.Deployment, error) { *appsv1.Deployment, error) {
// there are two ways to enable connection pooler, either to specify a // there are two ways to enable connection pooler, either to specify a
// connectionPool section or enableConnectionPool. In the second case // connectionPooler section or enableConnectionPooler. In the second case
// spec.connectionPool will be nil, so to make it easier to calculate // spec.connectionPooler will be nil, so to make it easier to calculate
// default values, initialize it to an empty structure. It could be done // default values, initialize it to an empty structure. It could be done
// anywhere, but here is the earliest common entry point between sync and // anywhere, but here is the earliest common entry point between sync and
// create code, so init here. // create code, so init here.
if spec.ConnectionPool == nil { if spec.ConnectionPooler == nil {
spec.ConnectionPool = &acidv1.ConnectionPool{} spec.ConnectionPooler = &acidv1.ConnectionPooler{}
} }
podTemplate, err := c.generateConnPoolPodTemplate(spec) podTemplate, err := c.generateConnectionPoolerPodTemplate(spec)
numberOfInstances := spec.ConnectionPool.NumberOfInstances numberOfInstances := spec.ConnectionPooler.NumberOfInstances
if numberOfInstances == nil { if numberOfInstances == nil {
numberOfInstances = util.CoalesceInt32( numberOfInstances = util.CoalesceInt32(
c.OpConfig.ConnectionPool.NumberOfInstances, c.OpConfig.ConnectionPooler.NumberOfInstances,
k8sutil.Int32ToPointer(1)) k8sutil.Int32ToPointer(1))
} }
if *numberOfInstances < constants.ConnPoolMinInstances { if *numberOfInstances < constants.ConnectionPoolerMinInstances {
msg := "Adjusted number of connection pool instances from %d to %d" msg := "Adjusted number of connection pooler instances from %d to %d"
c.logger.Warningf(msg, numberOfInstances, constants.ConnPoolMinInstances) c.logger.Warningf(msg, numberOfInstances, constants.ConnectionPoolerMinInstances)
*numberOfInstances = constants.ConnPoolMinInstances *numberOfInstances = constants.ConnectionPoolerMinInstances
} }
if err != nil { if err != nil {
@ -2075,9 +2075,9 @@ func (c *Cluster) generateConnPoolDeployment(spec *acidv1.PostgresSpec) (
deployment := &appsv1.Deployment{ deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: c.connPoolName(), Name: c.connectionPoolerName(),
Namespace: c.Namespace, Namespace: c.Namespace,
Labels: c.connPoolLabelsSelector().MatchLabels, Labels: c.connectionPoolerLabelsSelector().MatchLabels,
Annotations: map[string]string{}, Annotations: map[string]string{},
// make StatefulSet object its owner to represent the dependency. // make StatefulSet object its owner to represent the dependency.
// By itself StatefulSet is being deleted with "Orphaned" // By itself StatefulSet is being deleted with "Orphaned"
@ -2089,7 +2089,7 @@ func (c *Cluster) generateConnPoolDeployment(spec *acidv1.PostgresSpec) (
}, },
Spec: appsv1.DeploymentSpec{ Spec: appsv1.DeploymentSpec{
Replicas: numberOfInstances, Replicas: numberOfInstances,
Selector: c.connPoolLabelsSelector(), Selector: c.connectionPoolerLabelsSelector(),
Template: *podTemplate, Template: *podTemplate,
}, },
} }
@ -2097,37 +2097,37 @@ func (c *Cluster) generateConnPoolDeployment(spec *acidv1.PostgresSpec) (
return deployment, nil return deployment, nil
} }
func (c *Cluster) generateConnPoolService(spec *acidv1.PostgresSpec) *v1.Service { func (c *Cluster) generateConnectionPoolerService(spec *acidv1.PostgresSpec) *v1.Service {
// there are two ways to enable connection pooler, either to specify a // there are two ways to enable connection pooler, either to specify a
// connectionPool section or enableConnectionPool. In the second case // connectionPooler section or enableConnectionPooler. In the second case
// spec.connectionPool will be nil, so to make it easier to calculate // spec.connectionPooler will be nil, so to make it easier to calculate
// default values, initialize it to an empty structure. It could be done // default values, initialize it to an empty structure. It could be done
// anywhere, but here is the earliest common entry point between sync and // anywhere, but here is the earliest common entry point between sync and
// create code, so init here. // create code, so init here.
if spec.ConnectionPool == nil { if spec.ConnectionPooler == nil {
spec.ConnectionPool = &acidv1.ConnectionPool{} spec.ConnectionPooler = &acidv1.ConnectionPooler{}
} }
serviceSpec := v1.ServiceSpec{ serviceSpec := v1.ServiceSpec{
Ports: []v1.ServicePort{ Ports: []v1.ServicePort{
{ {
Name: c.connPoolName(), Name: c.connectionPoolerName(),
Port: pgPort, Port: pgPort,
TargetPort: intstr.IntOrString{StrVal: c.servicePort(Master)}, TargetPort: intstr.IntOrString{StrVal: c.servicePort(Master)},
}, },
}, },
Type: v1.ServiceTypeClusterIP, Type: v1.ServiceTypeClusterIP,
Selector: map[string]string{ Selector: map[string]string{
"connection-pool": c.connPoolName(), "connection-pooler": c.connectionPoolerName(),
}, },
} }
service := &v1.Service{ service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: c.connPoolName(), Name: c.connectionPoolerName(),
Namespace: c.Namespace, Namespace: c.Namespace,
Labels: c.connPoolLabelsSelector().MatchLabels, Labels: c.connectionPoolerLabelsSelector().MatchLabels,
Annotations: map[string]string{}, Annotations: map[string]string{},
// make StatefulSet object its owner to represent the dependency. // make StatefulSet object its owner to represent the dependency.
// By itself StatefulSet is being deleted with "Orphaned" // By itself StatefulSet is being deleted with "Orphaned"

View File

@ -587,38 +587,38 @@ func TestSecretVolume(t *testing.T) {
func testResources(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { func testResources(cluster *Cluster, podSpec *v1.PodTemplateSpec) error {
cpuReq := podSpec.Spec.Containers[0].Resources.Requests["cpu"] cpuReq := podSpec.Spec.Containers[0].Resources.Requests["cpu"]
if cpuReq.String() != cluster.OpConfig.ConnectionPool.ConnPoolDefaultCPURequest { if cpuReq.String() != cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultCPURequest {
return fmt.Errorf("CPU request doesn't match, got %s, expected %s", return fmt.Errorf("CPU request doesn't match, got %s, expected %s",
cpuReq.String(), cluster.OpConfig.ConnectionPool.ConnPoolDefaultCPURequest) cpuReq.String(), cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultCPURequest)
} }
memReq := podSpec.Spec.Containers[0].Resources.Requests["memory"] memReq := podSpec.Spec.Containers[0].Resources.Requests["memory"]
if memReq.String() != cluster.OpConfig.ConnectionPool.ConnPoolDefaultMemoryRequest { if memReq.String() != cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultMemoryRequest {
return fmt.Errorf("Memory request doesn't match, got %s, expected %s", return fmt.Errorf("Memory request doesn't match, got %s, expected %s",
memReq.String(), cluster.OpConfig.ConnectionPool.ConnPoolDefaultMemoryRequest) memReq.String(), cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultMemoryRequest)
} }
cpuLim := podSpec.Spec.Containers[0].Resources.Limits["cpu"] cpuLim := podSpec.Spec.Containers[0].Resources.Limits["cpu"]
if cpuLim.String() != cluster.OpConfig.ConnectionPool.ConnPoolDefaultCPULimit { if cpuLim.String() != cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultCPULimit {
return fmt.Errorf("CPU limit doesn't match, got %s, expected %s", return fmt.Errorf("CPU limit doesn't match, got %s, expected %s",
cpuLim.String(), cluster.OpConfig.ConnectionPool.ConnPoolDefaultCPULimit) cpuLim.String(), cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultCPULimit)
} }
memLim := podSpec.Spec.Containers[0].Resources.Limits["memory"] memLim := podSpec.Spec.Containers[0].Resources.Limits["memory"]
if memLim.String() != cluster.OpConfig.ConnectionPool.ConnPoolDefaultMemoryLimit { if memLim.String() != cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultMemoryLimit {
return fmt.Errorf("Memory limit doesn't match, got %s, expected %s", return fmt.Errorf("Memory limit doesn't match, got %s, expected %s",
memLim.String(), cluster.OpConfig.ConnectionPool.ConnPoolDefaultMemoryLimit) memLim.String(), cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultMemoryLimit)
} }
return nil return nil
} }
func testLabels(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { func testLabels(cluster *Cluster, podSpec *v1.PodTemplateSpec) error {
poolLabels := podSpec.ObjectMeta.Labels["connection-pool"] poolerLabels := podSpec.ObjectMeta.Labels["connection-pooler"]
if poolLabels != cluster.connPoolLabelsSelector().MatchLabels["connection-pool"] { if poolerLabels != cluster.connectionPoolerLabelsSelector().MatchLabels["connection-pooler"] {
return fmt.Errorf("Pod labels do not match, got %+v, expected %+v", return fmt.Errorf("Pod labels do not match, got %+v, expected %+v",
podSpec.ObjectMeta.Labels, cluster.connPoolLabelsSelector().MatchLabels) podSpec.ObjectMeta.Labels, cluster.connectionPoolerLabelsSelector().MatchLabels)
} }
return nil return nil
@ -631,8 +631,8 @@ func testEnvs(cluster *Cluster, podSpec *v1.PodTemplateSpec) error {
"PGUSER": false, "PGUSER": false,
"PGSCHEMA": false, "PGSCHEMA": false,
"PGPASSWORD": false, "PGPASSWORD": false,
"CONNECTION_POOL_MODE": false, "CONNECTION_POOLER_MODE": false,
"CONNECTION_POOL_PORT": false, "CONNECTION_POOLER_PORT": false,
} }
envs := podSpec.Spec.Containers[0].Env envs := podSpec.Spec.Containers[0].Env
@ -658,8 +658,8 @@ func testCustomPodTemplate(cluster *Cluster, podSpec *v1.PodTemplateSpec) error
return nil return nil
} }
func TestConnPoolPodSpec(t *testing.T) { func TestConnectionPoolerPodSpec(t *testing.T) {
testName := "Test connection pool pod template generation" testName := "Test connection pooler pod template generation"
var cluster = New( var cluster = New(
Config{ Config{
OpConfig: config.Config{ OpConfig: config.Config{
@ -668,12 +668,12 @@ func TestConnPoolPodSpec(t *testing.T) {
SuperUsername: superUserName, SuperUsername: superUserName,
ReplicationUsername: replicationUserName, ReplicationUsername: replicationUserName,
}, },
ConnectionPool: config.ConnectionPool{ ConnectionPooler: config.ConnectionPooler{
MaxDBConnections: int32ToPointer(60), MaxDBConnections: int32ToPointer(60),
ConnPoolDefaultCPURequest: "100m", ConnectionPoolerDefaultCPURequest: "100m",
ConnPoolDefaultCPULimit: "100m", ConnectionPoolerDefaultCPULimit: "100m",
ConnPoolDefaultMemoryRequest: "100Mi", ConnectionPoolerDefaultMemoryRequest: "100Mi",
ConnPoolDefaultMemoryLimit: "100Mi", ConnectionPoolerDefaultMemoryLimit: "100Mi",
}, },
}, },
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
@ -686,7 +686,7 @@ func TestConnPoolPodSpec(t *testing.T) {
SuperUsername: superUserName, SuperUsername: superUserName,
ReplicationUsername: replicationUserName, ReplicationUsername: replicationUserName,
}, },
ConnectionPool: config.ConnectionPool{}, ConnectionPooler: config.ConnectionPooler{},
}, },
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
@ -702,7 +702,7 @@ func TestConnPoolPodSpec(t *testing.T) {
{ {
subTest: "default configuration", subTest: "default configuration",
spec: &acidv1.PostgresSpec{ spec: &acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{}, ConnectionPooler: &acidv1.ConnectionPooler{},
}, },
expected: nil, expected: nil,
cluster: cluster, cluster: cluster,
@ -711,7 +711,7 @@ func TestConnPoolPodSpec(t *testing.T) {
{ {
subTest: "no default resources", subTest: "no default resources",
spec: &acidv1.PostgresSpec{ spec: &acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{}, ConnectionPooler: &acidv1.ConnectionPooler{},
}, },
expected: errors.New(`could not generate resource requirements: could not fill resource requests: could not parse default CPU quantity: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'`), expected: errors.New(`could not generate resource requirements: could not fill resource requests: could not parse default CPU quantity: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'`),
cluster: clusterNoDefaultRes, cluster: clusterNoDefaultRes,
@ -720,7 +720,7 @@ func TestConnPoolPodSpec(t *testing.T) {
{ {
subTest: "default resources are set", subTest: "default resources are set",
spec: &acidv1.PostgresSpec{ spec: &acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{}, ConnectionPooler: &acidv1.ConnectionPooler{},
}, },
expected: nil, expected: nil,
cluster: cluster, cluster: cluster,
@ -729,7 +729,7 @@ func TestConnPoolPodSpec(t *testing.T) {
{ {
subTest: "labels for service", subTest: "labels for service",
spec: &acidv1.PostgresSpec{ spec: &acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{}, ConnectionPooler: &acidv1.ConnectionPooler{},
}, },
expected: nil, expected: nil,
cluster: cluster, cluster: cluster,
@ -738,7 +738,7 @@ func TestConnPoolPodSpec(t *testing.T) {
{ {
subTest: "required envs", subTest: "required envs",
spec: &acidv1.PostgresSpec{ spec: &acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{}, ConnectionPooler: &acidv1.ConnectionPooler{},
}, },
expected: nil, expected: nil,
cluster: cluster, cluster: cluster,
@ -746,7 +746,7 @@ func TestConnPoolPodSpec(t *testing.T) {
}, },
} }
for _, tt := range tests { for _, tt := range tests {
podSpec, err := tt.cluster.generateConnPoolPodTemplate(tt.spec) podSpec, err := tt.cluster.generateConnectionPoolerPodTemplate(tt.spec)
if err != tt.expected && err.Error() != tt.expected.Error() { if err != tt.expected && err.Error() != tt.expected.Error() {
t.Errorf("%s [%s]: Could not generate pod template,\n %+v, expected\n %+v", t.Errorf("%s [%s]: Could not generate pod template,\n %+v, expected\n %+v",
@ -774,9 +774,9 @@ func testDeploymentOwnwerReference(cluster *Cluster, deployment *appsv1.Deployme
func testSelector(cluster *Cluster, deployment *appsv1.Deployment) error { func testSelector(cluster *Cluster, deployment *appsv1.Deployment) error {
labels := deployment.Spec.Selector.MatchLabels labels := deployment.Spec.Selector.MatchLabels
expected := cluster.connPoolLabelsSelector().MatchLabels expected := cluster.connectionPoolerLabelsSelector().MatchLabels
if labels["connection-pool"] != expected["connection-pool"] { if labels["connection-pooler"] != expected["connection-pooler"] {
return fmt.Errorf("Labels are incorrect, got %+v, expected %+v", return fmt.Errorf("Labels are incorrect, got %+v, expected %+v",
labels, expected) labels, expected)
} }
@ -784,8 +784,8 @@ func testSelector(cluster *Cluster, deployment *appsv1.Deployment) error {
return nil return nil
} }
func TestConnPoolDeploymentSpec(t *testing.T) { func TestConnectionPoolerDeploymentSpec(t *testing.T) {
testName := "Test connection pool deployment spec generation" testName := "Test connection pooler deployment spec generation"
var cluster = New( var cluster = New(
Config{ Config{
OpConfig: config.Config{ OpConfig: config.Config{
@ -794,11 +794,11 @@ func TestConnPoolDeploymentSpec(t *testing.T) {
SuperUsername: superUserName, SuperUsername: superUserName,
ReplicationUsername: replicationUserName, ReplicationUsername: replicationUserName,
}, },
ConnectionPool: config.ConnectionPool{ ConnectionPooler: config.ConnectionPooler{
ConnPoolDefaultCPURequest: "100m", ConnectionPoolerDefaultCPURequest: "100m",
ConnPoolDefaultCPULimit: "100m", ConnectionPoolerDefaultCPULimit: "100m",
ConnPoolDefaultMemoryRequest: "100Mi", ConnectionPoolerDefaultMemoryRequest: "100Mi",
ConnPoolDefaultMemoryLimit: "100Mi", ConnectionPoolerDefaultMemoryLimit: "100Mi",
}, },
}, },
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
@ -822,7 +822,7 @@ func TestConnPoolDeploymentSpec(t *testing.T) {
{ {
subTest: "default configuration", subTest: "default configuration",
spec: &acidv1.PostgresSpec{ spec: &acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{}, ConnectionPooler: &acidv1.ConnectionPooler{},
}, },
expected: nil, expected: nil,
cluster: cluster, cluster: cluster,
@ -831,7 +831,7 @@ func TestConnPoolDeploymentSpec(t *testing.T) {
{ {
subTest: "owner reference", subTest: "owner reference",
spec: &acidv1.PostgresSpec{ spec: &acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{}, ConnectionPooler: &acidv1.ConnectionPooler{},
}, },
expected: nil, expected: nil,
cluster: cluster, cluster: cluster,
@ -840,7 +840,7 @@ func TestConnPoolDeploymentSpec(t *testing.T) {
{ {
subTest: "selector", subTest: "selector",
spec: &acidv1.PostgresSpec{ spec: &acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{}, ConnectionPooler: &acidv1.ConnectionPooler{},
}, },
expected: nil, expected: nil,
cluster: cluster, cluster: cluster,
@ -848,7 +848,7 @@ func TestConnPoolDeploymentSpec(t *testing.T) {
}, },
} }
for _, tt := range tests { for _, tt := range tests {
deployment, err := tt.cluster.generateConnPoolDeployment(tt.spec) deployment, err := tt.cluster.generateConnectionPoolerDeployment(tt.spec)
if err != tt.expected && err.Error() != tt.expected.Error() { if err != tt.expected && err.Error() != tt.expected.Error() {
t.Errorf("%s [%s]: Could not generate deployment spec,\n %+v, expected\n %+v", t.Errorf("%s [%s]: Could not generate deployment spec,\n %+v, expected\n %+v",
@ -877,16 +877,16 @@ func testServiceOwnwerReference(cluster *Cluster, service *v1.Service) error {
func testServiceSelector(cluster *Cluster, service *v1.Service) error { func testServiceSelector(cluster *Cluster, service *v1.Service) error {
selector := service.Spec.Selector selector := service.Spec.Selector
if selector["connection-pool"] != cluster.connPoolName() { if selector["connection-pooler"] != cluster.connectionPoolerName() {
return fmt.Errorf("Selector is incorrect, got %s, expected %s", return fmt.Errorf("Selector is incorrect, got %s, expected %s",
selector["connection-pool"], cluster.connPoolName()) selector["connection-pooler"], cluster.connectionPoolerName())
} }
return nil return nil
} }
func TestConnPoolServiceSpec(t *testing.T) { func TestConnectionPoolerServiceSpec(t *testing.T) {
testName := "Test connection pool service spec generation" testName := "Test connection pooler service spec generation"
var cluster = New( var cluster = New(
Config{ Config{
OpConfig: config.Config{ OpConfig: config.Config{
@ -895,11 +895,11 @@ func TestConnPoolServiceSpec(t *testing.T) {
SuperUsername: superUserName, SuperUsername: superUserName,
ReplicationUsername: replicationUserName, ReplicationUsername: replicationUserName,
}, },
ConnectionPool: config.ConnectionPool{ ConnectionPooler: config.ConnectionPooler{
ConnPoolDefaultCPURequest: "100m", ConnectionPoolerDefaultCPURequest: "100m",
ConnPoolDefaultCPULimit: "100m", ConnectionPoolerDefaultCPULimit: "100m",
ConnPoolDefaultMemoryRequest: "100Mi", ConnectionPoolerDefaultMemoryRequest: "100Mi",
ConnPoolDefaultMemoryLimit: "100Mi", ConnectionPoolerDefaultMemoryLimit: "100Mi",
}, },
}, },
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
@ -922,7 +922,7 @@ func TestConnPoolServiceSpec(t *testing.T) {
{ {
subTest: "default configuration", subTest: "default configuration",
spec: &acidv1.PostgresSpec{ spec: &acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{}, ConnectionPooler: &acidv1.ConnectionPooler{},
}, },
cluster: cluster, cluster: cluster,
check: noCheck, check: noCheck,
@ -930,7 +930,7 @@ func TestConnPoolServiceSpec(t *testing.T) {
{ {
subTest: "owner reference", subTest: "owner reference",
spec: &acidv1.PostgresSpec{ spec: &acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{}, ConnectionPooler: &acidv1.ConnectionPooler{},
}, },
cluster: cluster, cluster: cluster,
check: testServiceOwnwerReference, check: testServiceOwnwerReference,
@ -938,14 +938,14 @@ func TestConnPoolServiceSpec(t *testing.T) {
{ {
subTest: "selector", subTest: "selector",
spec: &acidv1.PostgresSpec{ spec: &acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{}, ConnectionPooler: &acidv1.ConnectionPooler{},
}, },
cluster: cluster, cluster: cluster,
check: testServiceSelector, check: testServiceSelector,
}, },
} }
for _, tt := range tests { for _, tt := range tests {
service := tt.cluster.generateConnPoolService(tt.spec) service := tt.cluster.generateConnectionPoolerService(tt.spec)
if err := tt.check(cluster, service); err != nil { if err := tt.check(cluster, service); err != nil {
t.Errorf("%s [%s]: Service spec is incorrect, %+v", t.Errorf("%s [%s]: Service spec is incorrect, %+v",

View File

@ -90,37 +90,37 @@ func (c *Cluster) createStatefulSet() (*appsv1.StatefulSet, error) {
return statefulSet, nil return statefulSet, nil
} }
// Prepare the database for connection pool to be used, i.e. install lookup // Prepare the database for connection pooler to be used, i.e. install lookup
// function (do it first, because it should be fast and if it didn't succeed, // function (do it first, because it should be fast and if it didn't succeed,
// it doesn't makes sense to create more K8S objects. At this moment we assume // it doesn't makes sense to create more K8S objects. At this moment we assume
// that necessary connection pool user exists. // that necessary connection pooler user exists.
// //
// After that create all the objects for connection pool, namely a deployment // After that create all the objects for connection pooler, namely a deployment
// with a chosen pooler and a service to expose it. // with a chosen pooler and a service to expose it.
func (c *Cluster) createConnectionPool(lookup InstallFunction) (*ConnectionPoolObjects, error) { func (c *Cluster) createConnectionPooler(lookup InstallFunction) (*ConnectionPoolerObjects, error) {
var msg string var msg string
c.setProcessName("creating connection pool") c.setProcessName("creating connection pooler")
schema := c.Spec.ConnectionPool.Schema schema := c.Spec.ConnectionPooler.Schema
if schema == "" { if schema == "" {
schema = c.OpConfig.ConnectionPool.Schema schema = c.OpConfig.ConnectionPooler.Schema
} }
user := c.Spec.ConnectionPool.User user := c.Spec.ConnectionPooler.User
if user == "" { if user == "" {
user = c.OpConfig.ConnectionPool.User user = c.OpConfig.ConnectionPooler.User
} }
err := lookup(schema, user) err := lookup(schema, user)
if err != nil { if err != nil {
msg = "could not prepare database for connection pool: %v" msg = "could not prepare database for connection pooler: %v"
return nil, fmt.Errorf(msg, err) return nil, fmt.Errorf(msg, err)
} }
deploymentSpec, err := c.generateConnPoolDeployment(&c.Spec) deploymentSpec, err := c.generateConnectionPoolerDeployment(&c.Spec)
if err != nil { if err != nil {
msg = "could not generate deployment for connection pool: %v" msg = "could not generate deployment for connection pooler: %v"
return nil, fmt.Errorf(msg, err) return nil, fmt.Errorf(msg, err)
} }
@ -135,7 +135,7 @@ func (c *Cluster) createConnectionPool(lookup InstallFunction) (*ConnectionPoolO
return nil, err return nil, err
} }
serviceSpec := c.generateConnPoolService(&c.Spec) serviceSpec := c.generateConnectionPoolerService(&c.Spec)
service, err := c.KubeClient. service, err := c.KubeClient.
Services(serviceSpec.Namespace). Services(serviceSpec.Namespace).
Create(serviceSpec) Create(serviceSpec)
@ -144,31 +144,31 @@ func (c *Cluster) createConnectionPool(lookup InstallFunction) (*ConnectionPoolO
return nil, err return nil, err
} }
c.ConnectionPool = &ConnectionPoolObjects{ c.ConnectionPooler = &ConnectionPoolerObjects{
Deployment: deployment, Deployment: deployment,
Service: service, Service: service,
} }
c.logger.Debugf("created new connection pool %q, uid: %q", c.logger.Debugf("created new connection pooler %q, uid: %q",
util.NameFromMeta(deployment.ObjectMeta), deployment.UID) util.NameFromMeta(deployment.ObjectMeta), deployment.UID)
return c.ConnectionPool, nil return c.ConnectionPooler, nil
} }
func (c *Cluster) deleteConnectionPool() (err error) { func (c *Cluster) deleteConnectionPooler() (err error) {
c.setProcessName("deleting connection pool") c.setProcessName("deleting connection pooler")
c.logger.Debugln("deleting connection pool") c.logger.Debugln("deleting connection pooler")
// Lack of connection pooler objects is not a fatal error, just log it if // Lack of connection pooler objects is not a fatal error, just log it if
// it was present before in the manifest // it was present before in the manifest
if c.ConnectionPool == nil { if c.ConnectionPooler == nil {
c.logger.Infof("No connection pool to delete") c.logger.Infof("No connection pooler to delete")
return nil return nil
} }
// Clean up the deployment object. If deployment resource we've remembered // Clean up the deployment object. If deployment resource we've remembered
// is somehow empty, try to delete based on what would we generate // is somehow empty, try to delete based on what would we generate
deploymentName := c.connPoolName() deploymentName := c.connectionPoolerName()
deployment := c.ConnectionPool.Deployment deployment := c.ConnectionPooler.Deployment
if deployment != nil { if deployment != nil {
deploymentName = deployment.Name deploymentName = deployment.Name
@ -183,16 +183,16 @@ func (c *Cluster) deleteConnectionPool() (err error) {
Delete(deploymentName, &options) Delete(deploymentName, &options)
if !k8sutil.ResourceNotFound(err) { if !k8sutil.ResourceNotFound(err) {
c.logger.Debugf("Connection pool deployment was already deleted") c.logger.Debugf("Connection pooler deployment was already deleted")
} else if err != nil { } else if err != nil {
return fmt.Errorf("could not delete deployment: %v", err) return fmt.Errorf("could not delete deployment: %v", err)
} }
c.logger.Infof("Connection pool deployment %q has been deleted", deploymentName) c.logger.Infof("Connection pooler deployment %q has been deleted", deploymentName)
// Repeat the same for the service object // Repeat the same for the service object
service := c.ConnectionPool.Service service := c.ConnectionPooler.Service
serviceName := c.connPoolName() serviceName := c.connectionPoolerName()
if service != nil { if service != nil {
serviceName = service.Name serviceName = service.Name
@ -205,14 +205,14 @@ func (c *Cluster) deleteConnectionPool() (err error) {
Delete(serviceName, &options) Delete(serviceName, &options)
if !k8sutil.ResourceNotFound(err) { if !k8sutil.ResourceNotFound(err) {
c.logger.Debugf("Connection pool service was already deleted") c.logger.Debugf("Connection pooler service was already deleted")
} else if err != nil { } else if err != nil {
return fmt.Errorf("could not delete service: %v", err) return fmt.Errorf("could not delete service: %v", err)
} }
c.logger.Infof("Connection pool service %q has been deleted", serviceName) c.logger.Infof("Connection pooler service %q has been deleted", serviceName)
c.ConnectionPool = nil c.ConnectionPooler = nil
return nil return nil
} }
@ -801,12 +801,12 @@ func (c *Cluster) GetPodDisruptionBudget() *policybeta1.PodDisruptionBudget {
return c.PodDisruptionBudget return c.PodDisruptionBudget
} }
// Perform actual patching of a connection pool deployment, assuming that all // Perform actual patching of a connection pooler deployment, assuming that all
// the check were already done before. // the check were already done before.
func (c *Cluster) updateConnPoolDeployment(oldDeploymentSpec, newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) { func (c *Cluster) updateConnectionPoolerDeployment(oldDeploymentSpec, newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) {
c.setProcessName("updating connection pool") c.setProcessName("updating connection pooler")
if c.ConnectionPool == nil || c.ConnectionPool.Deployment == nil { if c.ConnectionPooler == nil || c.ConnectionPooler.Deployment == nil {
return nil, fmt.Errorf("there is no connection pool in the cluster") return nil, fmt.Errorf("there is no connection pooler in the cluster")
} }
patchData, err := specPatch(newDeployment.Spec) patchData, err := specPatch(newDeployment.Spec)
@ -818,16 +818,16 @@ func (c *Cluster) updateConnPoolDeployment(oldDeploymentSpec, newDeployment *app
// worker at one time will try to update it chances of conflicts are // worker at one time will try to update it chances of conflicts are
// minimal. // minimal.
deployment, err := c.KubeClient. deployment, err := c.KubeClient.
Deployments(c.ConnectionPool.Deployment.Namespace). Deployments(c.ConnectionPooler.Deployment.Namespace).
Patch( Patch(
c.ConnectionPool.Deployment.Name, c.ConnectionPooler.Deployment.Name,
types.MergePatchType, types.MergePatchType,
patchData, "") patchData, "")
if err != nil { if err != nil {
return nil, fmt.Errorf("could not patch deployment: %v", err) return nil, fmt.Errorf("could not patch deployment: %v", err)
} }
c.ConnectionPool.Deployment = deployment c.ConnectionPooler.Deployment = deployment
return deployment, nil return deployment, nil
} }

View File

@ -19,8 +19,8 @@ func boolToPointer(value bool) *bool {
return &value return &value
} }
func TestConnPoolCreationAndDeletion(t *testing.T) { func TestConnectionPoolerCreationAndDeletion(t *testing.T) {
testName := "Test connection pool creation" testName := "Test connection pooler creation"
var cluster = New( var cluster = New(
Config{ Config{
OpConfig: config.Config{ OpConfig: config.Config{
@ -29,11 +29,11 @@ func TestConnPoolCreationAndDeletion(t *testing.T) {
SuperUsername: superUserName, SuperUsername: superUserName,
ReplicationUsername: replicationUserName, ReplicationUsername: replicationUserName,
}, },
ConnectionPool: config.ConnectionPool{ ConnectionPooler: config.ConnectionPooler{
ConnPoolDefaultCPURequest: "100m", ConnectionPoolerDefaultCPURequest: "100m",
ConnPoolDefaultCPULimit: "100m", ConnectionPoolerDefaultCPULimit: "100m",
ConnPoolDefaultMemoryRequest: "100Mi", ConnectionPoolerDefaultMemoryRequest: "100Mi",
ConnPoolDefaultMemoryLimit: "100Mi", ConnectionPoolerDefaultMemoryLimit: "100Mi",
}, },
}, },
}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger) }, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger)
@ -45,31 +45,31 @@ func TestConnPoolCreationAndDeletion(t *testing.T) {
} }
cluster.Spec = acidv1.PostgresSpec{ cluster.Spec = acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{}, ConnectionPooler: &acidv1.ConnectionPooler{},
} }
poolResources, err := cluster.createConnectionPool(mockInstallLookupFunction) poolerResources, err := cluster.createConnectionPooler(mockInstallLookupFunction)
if err != nil { if err != nil {
t.Errorf("%s: Cannot create connection pool, %s, %+v", t.Errorf("%s: Cannot create connection pooler, %s, %+v",
testName, err, poolResources) testName, err, poolerResources)
} }
if poolResources.Deployment == nil { if poolerResources.Deployment == nil {
t.Errorf("%s: Connection pool deployment is empty", testName) t.Errorf("%s: Connection pooler deployment is empty", testName)
} }
if poolResources.Service == nil { if poolerResources.Service == nil {
t.Errorf("%s: Connection pool service is empty", testName) t.Errorf("%s: Connection pooler service is empty", testName)
} }
err = cluster.deleteConnectionPool() err = cluster.deleteConnectionPooler()
if err != nil { if err != nil {
t.Errorf("%s: Cannot delete connection pool, %s", testName, err) t.Errorf("%s: Cannot delete connection pooler, %s", testName, err)
} }
} }
func TestNeedConnPool(t *testing.T) { func TestNeedConnectionPooler(t *testing.T) {
testName := "Test how connection pool can be enabled" testName := "Test how connection pooler can be enabled"
var cluster = New( var cluster = New(
Config{ Config{
OpConfig: config.Config{ OpConfig: config.Config{
@ -78,50 +78,50 @@ func TestNeedConnPool(t *testing.T) {
SuperUsername: superUserName, SuperUsername: superUserName,
ReplicationUsername: replicationUserName, ReplicationUsername: replicationUserName,
}, },
ConnectionPool: config.ConnectionPool{ ConnectionPooler: config.ConnectionPooler{
ConnPoolDefaultCPURequest: "100m", ConnectionPoolerDefaultCPURequest: "100m",
ConnPoolDefaultCPULimit: "100m", ConnectionPoolerDefaultCPULimit: "100m",
ConnPoolDefaultMemoryRequest: "100Mi", ConnectionPoolerDefaultMemoryRequest: "100Mi",
ConnPoolDefaultMemoryLimit: "100Mi", ConnectionPoolerDefaultMemoryLimit: "100Mi",
}, },
}, },
}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger) }, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger)
cluster.Spec = acidv1.PostgresSpec{ cluster.Spec = acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{}, ConnectionPooler: &acidv1.ConnectionPooler{},
} }
if !cluster.needConnectionPool() { if !cluster.needConnectionPooler() {
t.Errorf("%s: Connection pool is not enabled with full definition", t.Errorf("%s: Connection pooler is not enabled with full definition",
testName) testName)
} }
cluster.Spec = acidv1.PostgresSpec{ cluster.Spec = acidv1.PostgresSpec{
EnableConnectionPool: boolToPointer(true), EnableConnectionPooler: boolToPointer(true),
} }
if !cluster.needConnectionPool() { if !cluster.needConnectionPooler() {
t.Errorf("%s: Connection pool is not enabled with flag", t.Errorf("%s: Connection pooler is not enabled with flag",
testName) testName)
} }
cluster.Spec = acidv1.PostgresSpec{ cluster.Spec = acidv1.PostgresSpec{
EnableConnectionPool: boolToPointer(false), EnableConnectionPooler: boolToPointer(false),
ConnectionPool: &acidv1.ConnectionPool{}, ConnectionPooler: &acidv1.ConnectionPooler{},
} }
if cluster.needConnectionPool() { if cluster.needConnectionPooler() {
t.Errorf("%s: Connection pool is still enabled with flag being false", t.Errorf("%s: Connection pooler is still enabled with flag being false",
testName) testName)
} }
cluster.Spec = acidv1.PostgresSpec{ cluster.Spec = acidv1.PostgresSpec{
EnableConnectionPool: boolToPointer(true), EnableConnectionPooler: boolToPointer(true),
ConnectionPool: &acidv1.ConnectionPool{}, ConnectionPooler: &acidv1.ConnectionPooler{},
} }
if !cluster.needConnectionPool() { if !cluster.needConnectionPooler() {
t.Errorf("%s: Connection pool is not enabled with flag and full", t.Errorf("%s: Connection pooler is not enabled with flag and full",
testName) testName)
} }
} }

View File

@ -109,9 +109,9 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
} }
} }
// sync connection pool // sync connection pooler
if err = c.syncConnectionPool(&oldSpec, newSpec, c.installLookupFunction); err != nil { if err = c.syncConnectionPooler(&oldSpec, newSpec, c.installLookupFunction); err != nil {
return fmt.Errorf("could not sync connection pool: %v", err) return fmt.Errorf("could not sync connection pooler: %v", err)
} }
return err return err
@ -477,12 +477,12 @@ func (c *Cluster) syncRoles() (err error) {
userNames = append(userNames, u.Name) userNames = append(userNames, u.Name)
} }
if c.needConnectionPool() { if c.needConnectionPooler() {
connPoolUser := c.systemUsers[constants.ConnectionPoolUserKeyName] connectionPoolerUser := c.systemUsers[constants.ConnectionPoolerUserKeyName]
userNames = append(userNames, connPoolUser.Name) userNames = append(userNames, connectionPoolerUser.Name)
if _, exists := c.pgUsers[connPoolUser.Name]; !exists { if _, exists := c.pgUsers[connectionPoolerUser.Name]; !exists {
c.pgUsers[connPoolUser.Name] = connPoolUser c.pgUsers[connectionPoolerUser.Name] = connectionPoolerUser
} }
} }
@ -619,69 +619,69 @@ func (c *Cluster) syncLogicalBackupJob() error {
return nil return nil
} }
func (c *Cluster) syncConnectionPool(oldSpec, newSpec *acidv1.Postgresql, lookup InstallFunction) error { func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, lookup InstallFunction) error {
if c.ConnectionPool == nil { if c.ConnectionPooler == nil {
c.ConnectionPool = &ConnectionPoolObjects{} c.ConnectionPooler = &ConnectionPoolerObjects{}
} }
newNeedConnPool := c.needConnectionPoolWorker(&newSpec.Spec) newNeedConnectionPooler := c.needConnectionPoolerWorker(&newSpec.Spec)
oldNeedConnPool := c.needConnectionPoolWorker(&oldSpec.Spec) oldNeedConnectionPooler := c.needConnectionPoolerWorker(&oldSpec.Spec)
if newNeedConnPool { if newNeedConnectionPooler {
// Try to sync in any case. If we didn't needed connection pool before, // Try to sync in any case. If we didn't needed connection pooler before,
// it means we want to create it. If it was already present, still sync // it means we want to create it. If it was already present, still sync
// since it could happen that there is no difference in specs, and all // since it could happen that there is no difference in specs, and all
// the resources are remembered, but the deployment was manualy deleted // the resources are remembered, but the deployment was manually deleted
// in between // in between
c.logger.Debug("syncing connection pool") c.logger.Debug("syncing connection pooler")
// in this case also do not forget to install lookup function as for // in this case also do not forget to install lookup function as for
// creating cluster // creating cluster
if !oldNeedConnPool || !c.ConnectionPool.LookupFunction { if !oldNeedConnectionPooler || !c.ConnectionPooler.LookupFunction {
newConnPool := newSpec.Spec.ConnectionPool newConnectionPooler := newSpec.Spec.ConnectionPooler
specSchema := "" specSchema := ""
specUser := "" specUser := ""
if newConnPool != nil { if newConnectionPooler != nil {
specSchema = newConnPool.Schema specSchema = newConnectionPooler.Schema
specUser = newConnPool.User specUser = newConnectionPooler.User
} }
schema := util.Coalesce( schema := util.Coalesce(
specSchema, specSchema,
c.OpConfig.ConnectionPool.Schema) c.OpConfig.ConnectionPooler.Schema)
user := util.Coalesce( user := util.Coalesce(
specUser, specUser,
c.OpConfig.ConnectionPool.User) c.OpConfig.ConnectionPooler.User)
if err := lookup(schema, user); err != nil { if err := lookup(schema, user); err != nil {
return err return err
} }
} }
if err := c.syncConnectionPoolWorker(oldSpec, newSpec); err != nil { if err := c.syncConnectionPoolerWorker(oldSpec, newSpec); err != nil {
c.logger.Errorf("could not sync connection pool: %v", err) c.logger.Errorf("could not sync connection pooler: %v", err)
return err return err
} }
} }
if oldNeedConnPool && !newNeedConnPool { if oldNeedConnectionPooler && !newNeedConnectionPooler {
// delete and cleanup resources // delete and cleanup resources
if err := c.deleteConnectionPool(); err != nil { if err := c.deleteConnectionPooler(); err != nil {
c.logger.Warningf("could not remove connection pool: %v", err) c.logger.Warningf("could not remove connection pooler: %v", err)
} }
} }
if !oldNeedConnPool && !newNeedConnPool { if !oldNeedConnectionPooler && !newNeedConnectionPooler {
// delete and cleanup resources if not empty // delete and cleanup resources if not empty
if c.ConnectionPool != nil && if c.ConnectionPooler != nil &&
(c.ConnectionPool.Deployment != nil || (c.ConnectionPooler.Deployment != nil ||
c.ConnectionPool.Service != nil) { c.ConnectionPooler.Service != nil) {
if err := c.deleteConnectionPool(); err != nil { if err := c.deleteConnectionPooler(); err != nil {
c.logger.Warningf("could not remove connection pool: %v", err) c.logger.Warningf("could not remove connection pooler: %v", err)
} }
} }
} }
@ -689,22 +689,22 @@ func (c *Cluster) syncConnectionPool(oldSpec, newSpec *acidv1.Postgresql, lookup
return nil return nil
} }
// Synchronize connection pool resources. Effectively we're interested only in // Synchronize connection pooler resources. Effectively we're interested only in
// synchronizing the corresponding deployment, but in case of deployment or // synchronizing the corresponding deployment, but in case of deployment or
// service is missing, create it. After checking, also remember an object for // service is missing, create it. After checking, also remember an object for
// the future references. // the future references.
func (c *Cluster) syncConnectionPoolWorker(oldSpec, newSpec *acidv1.Postgresql) error { func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql) error {
deployment, err := c.KubeClient. deployment, err := c.KubeClient.
Deployments(c.Namespace). Deployments(c.Namespace).
Get(c.connPoolName(), metav1.GetOptions{}) Get(c.connectionPoolerName(), metav1.GetOptions{})
if err != nil && k8sutil.ResourceNotFound(err) { if err != nil && k8sutil.ResourceNotFound(err) {
msg := "Deployment %s for connection pool synchronization is not found, create it" msg := "Deployment %s for connection pooler synchronization is not found, create it"
c.logger.Warningf(msg, c.connPoolName()) c.logger.Warningf(msg, c.connectionPoolerName())
deploymentSpec, err := c.generateConnPoolDeployment(&newSpec.Spec) deploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec)
if err != nil { if err != nil {
msg = "could not generate deployment for connection pool: %v" msg = "could not generate deployment for connection pooler: %v"
return fmt.Errorf(msg, err) return fmt.Errorf(msg, err)
} }
@ -716,31 +716,31 @@ func (c *Cluster) syncConnectionPoolWorker(oldSpec, newSpec *acidv1.Postgresql)
return err return err
} }
c.ConnectionPool.Deployment = deployment c.ConnectionPooler.Deployment = deployment
} else if err != nil { } else if err != nil {
return fmt.Errorf("could not get connection pool deployment to sync: %v", err) return fmt.Errorf("could not get connection pooler deployment to sync: %v", err)
} else { } else {
c.ConnectionPool.Deployment = deployment c.ConnectionPooler.Deployment = deployment
// actual synchronization // actual synchronization
oldConnPool := oldSpec.Spec.ConnectionPool oldConnectionPooler := oldSpec.Spec.ConnectionPooler
newConnPool := newSpec.Spec.ConnectionPool newConnectionPooler := newSpec.Spec.ConnectionPooler
specSync, specReason := c.needSyncConnPoolSpecs(oldConnPool, newConnPool) specSync, specReason := c.needSyncConnectionPoolerSpecs(oldConnectionPooler, newConnectionPooler)
defaultsSync, defaultsReason := c.needSyncConnPoolDefaults(newConnPool, deployment) defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(newConnectionPooler, deployment)
reason := append(specReason, defaultsReason...) reason := append(specReason, defaultsReason...)
if specSync || defaultsSync { if specSync || defaultsSync {
c.logger.Infof("Update connection pool deployment %s, reason: %+v", c.logger.Infof("Update connection pooler deployment %s, reason: %+v",
c.connPoolName(), reason) c.connectionPoolerName(), reason)
newDeploymentSpec, err := c.generateConnPoolDeployment(&newSpec.Spec) newDeploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec)
if err != nil { if err != nil {
msg := "could not generate deployment for connection pool: %v" msg := "could not generate deployment for connection pooler: %v"
return fmt.Errorf(msg, err) return fmt.Errorf(msg, err)
} }
oldDeploymentSpec := c.ConnectionPool.Deployment oldDeploymentSpec := c.ConnectionPooler.Deployment
deployment, err := c.updateConnPoolDeployment( deployment, err := c.updateConnectionPoolerDeployment(
oldDeploymentSpec, oldDeploymentSpec,
newDeploymentSpec) newDeploymentSpec)
@ -748,20 +748,20 @@ func (c *Cluster) syncConnectionPoolWorker(oldSpec, newSpec *acidv1.Postgresql)
return err return err
} }
c.ConnectionPool.Deployment = deployment c.ConnectionPooler.Deployment = deployment
return nil return nil
} }
} }
service, err := c.KubeClient. service, err := c.KubeClient.
Services(c.Namespace). Services(c.Namespace).
Get(c.connPoolName(), metav1.GetOptions{}) Get(c.connectionPoolerName(), metav1.GetOptions{})
if err != nil && k8sutil.ResourceNotFound(err) { if err != nil && k8sutil.ResourceNotFound(err) {
msg := "Service %s for connection pool synchronization is not found, create it" msg := "Service %s for connection pooler synchronization is not found, create it"
c.logger.Warningf(msg, c.connPoolName()) c.logger.Warningf(msg, c.connectionPoolerName())
serviceSpec := c.generateConnPoolService(&newSpec.Spec) serviceSpec := c.generateConnectionPoolerService(&newSpec.Spec)
service, err := c.KubeClient. service, err := c.KubeClient.
Services(serviceSpec.Namespace). Services(serviceSpec.Namespace).
Create(serviceSpec) Create(serviceSpec)
@ -770,12 +770,12 @@ func (c *Cluster) syncConnectionPoolWorker(oldSpec, newSpec *acidv1.Postgresql)
return err return err
} }
c.ConnectionPool.Service = service c.ConnectionPooler.Service = service
} else if err != nil { } else if err != nil {
return fmt.Errorf("could not get connection pool service to sync: %v", err) return fmt.Errorf("could not get connection pooler service to sync: %v", err)
} else { } else {
// Service updates are not supported and probably not that useful anyway // Service updates are not supported and probably not that useful anyway
c.ConnectionPool.Service = service c.ConnectionPooler.Service = service
} }
return nil return nil

View File

@ -18,8 +18,8 @@ func int32ToPointer(value int32) *int32 {
} }
func deploymentUpdated(cluster *Cluster, err error) error { func deploymentUpdated(cluster *Cluster, err error) error {
if cluster.ConnectionPool.Deployment.Spec.Replicas == nil || if cluster.ConnectionPooler.Deployment.Spec.Replicas == nil ||
*cluster.ConnectionPool.Deployment.Spec.Replicas != 2 { *cluster.ConnectionPooler.Deployment.Spec.Replicas != 2 {
return fmt.Errorf("Wrong nubmer of instances") return fmt.Errorf("Wrong nubmer of instances")
} }
@ -27,15 +27,15 @@ func deploymentUpdated(cluster *Cluster, err error) error {
} }
func objectsAreSaved(cluster *Cluster, err error) error { func objectsAreSaved(cluster *Cluster, err error) error {
if cluster.ConnectionPool == nil { if cluster.ConnectionPooler == nil {
return fmt.Errorf("Connection pool resources are empty") return fmt.Errorf("Connection pooler resources are empty")
} }
if cluster.ConnectionPool.Deployment == nil { if cluster.ConnectionPooler.Deployment == nil {
return fmt.Errorf("Deployment was not saved") return fmt.Errorf("Deployment was not saved")
} }
if cluster.ConnectionPool.Service == nil { if cluster.ConnectionPooler.Service == nil {
return fmt.Errorf("Service was not saved") return fmt.Errorf("Service was not saved")
} }
@ -43,15 +43,15 @@ func objectsAreSaved(cluster *Cluster, err error) error {
} }
func objectsAreDeleted(cluster *Cluster, err error) error { func objectsAreDeleted(cluster *Cluster, err error) error {
if cluster.ConnectionPool != nil { if cluster.ConnectionPooler != nil {
return fmt.Errorf("Connection pool was not deleted") return fmt.Errorf("Connection pooler was not deleted")
} }
return nil return nil
} }
func TestConnPoolSynchronization(t *testing.T) { func TestConnectionPoolerSynchronization(t *testing.T) {
testName := "Test connection pool synchronization" testName := "Test connection pooler synchronization"
var cluster = New( var cluster = New(
Config{ Config{
OpConfig: config.Config{ OpConfig: config.Config{
@ -60,11 +60,11 @@ func TestConnPoolSynchronization(t *testing.T) {
SuperUsername: superUserName, SuperUsername: superUserName,
ReplicationUsername: replicationUserName, ReplicationUsername: replicationUserName,
}, },
ConnectionPool: config.ConnectionPool{ ConnectionPooler: config.ConnectionPooler{
ConnPoolDefaultCPURequest: "100m", ConnectionPoolerDefaultCPURequest: "100m",
ConnPoolDefaultCPULimit: "100m", ConnectionPoolerDefaultCPULimit: "100m",
ConnPoolDefaultMemoryRequest: "100Mi", ConnectionPoolerDefaultMemoryRequest: "100Mi",
ConnPoolDefaultMemoryLimit: "100Mi", ConnectionPoolerDefaultMemoryLimit: "100Mi",
NumberOfInstances: int32ToPointer(1), NumberOfInstances: int32ToPointer(1),
}, },
}, },
@ -84,15 +84,15 @@ func TestConnPoolSynchronization(t *testing.T) {
clusterDirtyMock := *cluster clusterDirtyMock := *cluster
clusterDirtyMock.KubeClient = k8sutil.NewMockKubernetesClient() clusterDirtyMock.KubeClient = k8sutil.NewMockKubernetesClient()
clusterDirtyMock.ConnectionPool = &ConnectionPoolObjects{ clusterDirtyMock.ConnectionPooler = &ConnectionPoolerObjects{
Deployment: &appsv1.Deployment{}, Deployment: &appsv1.Deployment{},
Service: &v1.Service{}, Service: &v1.Service{},
} }
clusterNewDefaultsMock := *cluster clusterNewDefaultsMock := *cluster
clusterNewDefaultsMock.KubeClient = k8sutil.NewMockKubernetesClient() clusterNewDefaultsMock.KubeClient = k8sutil.NewMockKubernetesClient()
cluster.OpConfig.ConnectionPool.Image = "pooler:2.0" cluster.OpConfig.ConnectionPooler.Image = "pooler:2.0"
cluster.OpConfig.ConnectionPool.NumberOfInstances = int32ToPointer(2) cluster.OpConfig.ConnectionPooler.NumberOfInstances = int32ToPointer(2)
tests := []struct { tests := []struct {
subTest string subTest string
@ -105,12 +105,12 @@ func TestConnPoolSynchronization(t *testing.T) {
subTest: "create if doesn't exist", subTest: "create if doesn't exist",
oldSpec: &acidv1.Postgresql{ oldSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{ Spec: acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{}, ConnectionPooler: &acidv1.ConnectionPooler{},
}, },
}, },
newSpec: &acidv1.Postgresql{ newSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{ Spec: acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{}, ConnectionPooler: &acidv1.ConnectionPooler{},
}, },
}, },
cluster: &clusterMissingObjects, cluster: &clusterMissingObjects,
@ -123,7 +123,7 @@ func TestConnPoolSynchronization(t *testing.T) {
}, },
newSpec: &acidv1.Postgresql{ newSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{ Spec: acidv1.PostgresSpec{
EnableConnectionPool: boolToPointer(true), EnableConnectionPooler: boolToPointer(true),
}, },
}, },
cluster: &clusterMissingObjects, cluster: &clusterMissingObjects,
@ -136,7 +136,7 @@ func TestConnPoolSynchronization(t *testing.T) {
}, },
newSpec: &acidv1.Postgresql{ newSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{ Spec: acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{}, ConnectionPooler: &acidv1.ConnectionPooler{},
}, },
}, },
cluster: &clusterMissingObjects, cluster: &clusterMissingObjects,
@ -146,7 +146,7 @@ func TestConnPoolSynchronization(t *testing.T) {
subTest: "delete if not needed", subTest: "delete if not needed",
oldSpec: &acidv1.Postgresql{ oldSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{ Spec: acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{}, ConnectionPooler: &acidv1.ConnectionPooler{},
}, },
}, },
newSpec: &acidv1.Postgresql{ newSpec: &acidv1.Postgresql{
@ -170,14 +170,14 @@ func TestConnPoolSynchronization(t *testing.T) {
subTest: "update deployment", subTest: "update deployment",
oldSpec: &acidv1.Postgresql{ oldSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{ Spec: acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{ ConnectionPooler: &acidv1.ConnectionPooler{
NumberOfInstances: int32ToPointer(1), NumberOfInstances: int32ToPointer(1),
}, },
}, },
}, },
newSpec: &acidv1.Postgresql{ newSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{ Spec: acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{ ConnectionPooler: &acidv1.ConnectionPooler{
NumberOfInstances: int32ToPointer(2), NumberOfInstances: int32ToPointer(2),
}, },
}, },
@ -189,12 +189,12 @@ func TestConnPoolSynchronization(t *testing.T) {
subTest: "update image from changed defaults", subTest: "update image from changed defaults",
oldSpec: &acidv1.Postgresql{ oldSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{ Spec: acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{}, ConnectionPooler: &acidv1.ConnectionPooler{},
}, },
}, },
newSpec: &acidv1.Postgresql{ newSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{ Spec: acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{}, ConnectionPooler: &acidv1.ConnectionPooler{},
}, },
}, },
cluster: &clusterNewDefaultsMock, cluster: &clusterNewDefaultsMock,
@ -202,7 +202,7 @@ func TestConnPoolSynchronization(t *testing.T) {
}, },
} }
for _, tt := range tests { for _, tt := range tests {
err := tt.cluster.syncConnectionPool(tt.oldSpec, tt.newSpec, mockInstallLookupFunction) err := tt.cluster.syncConnectionPooler(tt.oldSpec, tt.newSpec, mockInstallLookupFunction)
if err := tt.check(tt.cluster, err); err != nil { if err := tt.check(tt.cluster, err); err != nil {
t.Errorf("%s [%s]: Could not synchronize, %+v", t.Errorf("%s [%s]: Could not synchronize, %+v",

View File

@ -414,24 +414,24 @@ func (c *Cluster) labelsSelector() *metav1.LabelSelector {
} }
} }
// Return connection pool labels selector, which should from one point of view // Return connection pooler labels selector, which should from one point of view
// inherit most of the labels from the cluster itself, but at the same time // inherit most of the labels from the cluster itself, but at the same time
// have e.g. different `application` label, so that recreatePod operation will // have e.g. different `application` label, so that recreatePod operation will
// not interfere with it (it lists all the pods via labels, and if there would // not interfere with it (it lists all the pods via labels, and if there would
// be no difference, it will recreate also pooler pods). // be no difference, it will recreate also pooler pods).
func (c *Cluster) connPoolLabelsSelector() *metav1.LabelSelector { func (c *Cluster) connectionPoolerLabelsSelector() *metav1.LabelSelector {
connPoolLabels := labels.Set(map[string]string{}) connectionPoolerLabels := labels.Set(map[string]string{})
extraLabels := labels.Set(map[string]string{ extraLabels := labels.Set(map[string]string{
"connection-pool": c.connPoolName(), "connection-pooler": c.connectionPoolerName(),
"application": "db-connection-pool", "application": "db-connection-pooler",
}) })
connPoolLabels = labels.Merge(connPoolLabels, c.labelsSet(false)) connectionPoolerLabels = labels.Merge(connectionPoolerLabels, c.labelsSet(false))
connPoolLabels = labels.Merge(connPoolLabels, extraLabels) connectionPoolerLabels = labels.Merge(connectionPoolerLabels, extraLabels)
return &metav1.LabelSelector{ return &metav1.LabelSelector{
MatchLabels: connPoolLabels, MatchLabels: connectionPoolerLabels,
MatchExpressions: nil, MatchExpressions: nil,
} }
} }
@ -509,14 +509,14 @@ func (c *Cluster) patroniUsesKubernetes() bool {
return c.OpConfig.EtcdHost == "" return c.OpConfig.EtcdHost == ""
} }
func (c *Cluster) needConnectionPoolWorker(spec *acidv1.PostgresSpec) bool { func (c *Cluster) needConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool {
if spec.EnableConnectionPool == nil { if spec.EnableConnectionPooler == nil {
return spec.ConnectionPool != nil return spec.ConnectionPooler != nil
} else { } else {
return *spec.EnableConnectionPool return *spec.EnableConnectionPooler
} }
} }
func (c *Cluster) needConnectionPool() bool { func (c *Cluster) needConnectionPooler() bool {
return c.needConnectionPoolWorker(&c.Spec) return c.needConnectionPoolerWorker(&c.Spec)
} }

View File

@ -148,51 +148,51 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur
result.ScalyrCPULimit = fromCRD.Scalyr.ScalyrCPULimit result.ScalyrCPULimit = fromCRD.Scalyr.ScalyrCPULimit
result.ScalyrMemoryLimit = fromCRD.Scalyr.ScalyrMemoryLimit result.ScalyrMemoryLimit = fromCRD.Scalyr.ScalyrMemoryLimit
// Connection pool. Looks like we can't use defaulting in CRD before 1.17, // Connection pooler. Looks like we can't use defaulting in CRD before 1.17,
// so ensure default values here. // so ensure default values here.
result.ConnectionPool.NumberOfInstances = util.CoalesceInt32( result.ConnectionPooler.NumberOfInstances = util.CoalesceInt32(
fromCRD.ConnectionPool.NumberOfInstances, fromCRD.ConnectionPooler.NumberOfInstances,
int32ToPointer(2)) int32ToPointer(2))
result.ConnectionPool.NumberOfInstances = util.MaxInt32( result.ConnectionPooler.NumberOfInstances = util.MaxInt32(
result.ConnectionPool.NumberOfInstances, result.ConnectionPooler.NumberOfInstances,
int32ToPointer(2)) int32ToPointer(2))
result.ConnectionPool.Schema = util.Coalesce( result.ConnectionPooler.Schema = util.Coalesce(
fromCRD.ConnectionPool.Schema, fromCRD.ConnectionPooler.Schema,
constants.ConnectionPoolSchemaName) constants.ConnectionPoolerSchemaName)
result.ConnectionPool.User = util.Coalesce( result.ConnectionPooler.User = util.Coalesce(
fromCRD.ConnectionPool.User, fromCRD.ConnectionPooler.User,
constants.ConnectionPoolUserName) constants.ConnectionPoolerUserName)
result.ConnectionPool.Image = util.Coalesce( result.ConnectionPooler.Image = util.Coalesce(
fromCRD.ConnectionPool.Image, fromCRD.ConnectionPooler.Image,
"registry.opensource.zalan.do/acid/pgbouncer") "registry.opensource.zalan.do/acid/pgbouncer")
result.ConnectionPool.Mode = util.Coalesce( result.ConnectionPooler.Mode = util.Coalesce(
fromCRD.ConnectionPool.Mode, fromCRD.ConnectionPooler.Mode,
constants.ConnectionPoolDefaultMode) constants.ConnectionPoolerDefaultMode)
result.ConnectionPool.ConnPoolDefaultCPURequest = util.Coalesce( result.ConnectionPooler.ConnectionPoolerDefaultCPURequest = util.Coalesce(
fromCRD.ConnectionPool.DefaultCPURequest, fromCRD.ConnectionPooler.DefaultCPURequest,
constants.ConnectionPoolDefaultCpuRequest) constants.ConnectionPoolerDefaultCpuRequest)
result.ConnectionPool.ConnPoolDefaultMemoryRequest = util.Coalesce( result.ConnectionPooler.ConnectionPoolerDefaultMemoryRequest = util.Coalesce(
fromCRD.ConnectionPool.DefaultMemoryRequest, fromCRD.ConnectionPooler.DefaultMemoryRequest,
constants.ConnectionPoolDefaultMemoryRequest) constants.ConnectionPoolerDefaultMemoryRequest)
result.ConnectionPool.ConnPoolDefaultCPULimit = util.Coalesce( result.ConnectionPooler.ConnectionPoolerDefaultCPULimit = util.Coalesce(
fromCRD.ConnectionPool.DefaultCPULimit, fromCRD.ConnectionPooler.DefaultCPULimit,
constants.ConnectionPoolDefaultCpuLimit) constants.ConnectionPoolerDefaultCpuLimit)
result.ConnectionPool.ConnPoolDefaultMemoryLimit = util.Coalesce( result.ConnectionPooler.ConnectionPoolerDefaultMemoryLimit = util.Coalesce(
fromCRD.ConnectionPool.DefaultMemoryLimit, fromCRD.ConnectionPooler.DefaultMemoryLimit,
constants.ConnectionPoolDefaultMemoryLimit) constants.ConnectionPoolerDefaultMemoryLimit)
result.ConnectionPool.MaxDBConnections = util.CoalesceInt32( result.ConnectionPooler.MaxDBConnections = util.CoalesceInt32(
fromCRD.ConnectionPool.MaxDBConnections, fromCRD.ConnectionPooler.MaxDBConnections,
int32ToPointer(constants.ConnPoolMaxDBConnections)) int32ToPointer(constants.ConnectionPoolerMaxDBConnections))
return result return result
} }

View File

@ -55,7 +55,7 @@ func (f *genericInformer) Lister() cache.GenericLister {
} }
// ForResource gives generic access to a shared informer of the matching type // ForResource gives generic access to a shared informer of the matching type
// TODO extend this to unknown resources with a client pool // TODO extend this to unknown resources with a client pooler
func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource) (GenericInformer, error) { func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource) (GenericInformer, error) {
switch resource { switch resource {
// Group=acid.zalan.do, Version=v1 // Group=acid.zalan.do, Version=v1

View File

@ -31,7 +31,7 @@ const (
RoleOriginInfrastructure RoleOriginInfrastructure
RoleOriginTeamsAPI RoleOriginTeamsAPI
RoleOriginSystem RoleOriginSystem
RoleConnectionPool RoleConnectionPooler
) )
type syncUserOperation int type syncUserOperation int
@ -180,8 +180,8 @@ func (r RoleOrigin) String() string {
return "teams API role" return "teams API role"
case RoleOriginSystem: case RoleOriginSystem:
return "system role" return "system role"
case RoleConnectionPool: case RoleConnectionPooler:
return "connection pool role" return "connection pooler role"
default: default:
panic(fmt.Sprintf("bogus role origin value %d", r)) panic(fmt.Sprintf("bogus role origin value %d", r))
} }

View File

@ -85,17 +85,17 @@ type LogicalBackup struct {
} }
// Operator options for connection pooler // Operator options for connection pooler
type ConnectionPool struct { type ConnectionPooler struct {
NumberOfInstances *int32 `name:"connection_pool_number_of_instances" default:"2"` NumberOfInstances *int32 `name:"connection_pooler_number_of_instances" default:"2"`
Schema string `name:"connection_pool_schema" default:"pooler"` Schema string `name:"connection_pooler_schema" default:"pooler"`
User string `name:"connection_pool_user" default:"pooler"` User string `name:"connection_pooler_user" default:"pooler"`
Image string `name:"connection_pool_image" default:"registry.opensource.zalan.do/acid/pgbouncer"` Image string `name:"connection_pooler_image" default:"registry.opensource.zalan.do/acid/pgbouncer"`
Mode string `name:"connection_pool_mode" default:"transaction"` Mode string `name:"connection_pooler_mode" default:"transaction"`
MaxDBConnections *int32 `name:"connection_pool_max_db_connections" default:"60"` MaxDBConnections *int32 `name:"connection_pooler_max_db_connections" default:"60"`
ConnPoolDefaultCPURequest string `name:"connection_pool_default_cpu_request" default:"500m"` ConnectionPoolerDefaultCPURequest string `name:"connection_pooler_default_cpu_request" default:"500m"`
ConnPoolDefaultMemoryRequest string `name:"connection_pool_default_memory_request" default:"100Mi"` ConnectionPoolerDefaultMemoryRequest string `name:"connection_pooler_default_memory_request" default:"100Mi"`
ConnPoolDefaultCPULimit string `name:"connection_pool_default_cpu_limit" default:"1"` ConnectionPoolerDefaultCPULimit string `name:"connection_pooler_default_cpu_limit" default:"1"`
ConnPoolDefaultMemoryLimit string `name:"connection_pool_default_memory_limit" default:"100Mi"` ConnectionPoolerDefaultMemoryLimit string `name:"connection_pooler_default_memory_limit" default:"100Mi"`
} }
// Config describes operator config // Config describes operator config
@ -105,7 +105,7 @@ type Config struct {
Auth Auth
Scalyr Scalyr
LogicalBackup LogicalBackup
ConnectionPool ConnectionPooler
WatchedNamespace string `name:"watched_namespace"` // special values: "*" means 'watch all namespaces', the empty string "" means 'watch a namespace where operator is deployed to' WatchedNamespace string `name:"watched_namespace"` // special values: "*" means 'watch all namespaces', the empty string "" means 'watch a namespace where operator is deployed to'
EtcdHost string `name:"etcd_host" default:""` // special values: the empty string "" means Patroni will use K8s as a DCS EtcdHost string `name:"etcd_host" default:""` // special values: the empty string "" means Patroni will use K8s as a DCS
@ -213,9 +213,9 @@ func validate(cfg *Config) (err error) {
err = fmt.Errorf("number of workers should be higher than 0") err = fmt.Errorf("number of workers should be higher than 0")
} }
if *cfg.ConnectionPool.NumberOfInstances < constants.ConnPoolMinInstances { if *cfg.ConnectionPooler.NumberOfInstances < constants.ConnectionPoolerMinInstances {
msg := "number of connection pool instances should be higher than %d" msg := "number of connection pooler instances should be higher than %d"
err = fmt.Errorf(msg, constants.ConnPoolMinInstances) err = fmt.Errorf(msg, constants.ConnectionPoolerMinInstances)
} }
return return
} }

View File

@ -1,18 +1,18 @@
package constants package constants
// Connection pool specific constants // Connection pooler specific constants
const ( const (
ConnectionPoolUserName = "pooler" ConnectionPoolerUserName = "pooler"
ConnectionPoolSchemaName = "pooler" ConnectionPoolerSchemaName = "pooler"
ConnectionPoolDefaultType = "pgbouncer" ConnectionPoolerDefaultType = "pgbouncer"
ConnectionPoolDefaultMode = "transaction" ConnectionPoolerDefaultMode = "transaction"
ConnectionPoolDefaultCpuRequest = "500m" ConnectionPoolerDefaultCpuRequest = "500m"
ConnectionPoolDefaultCpuLimit = "1" ConnectionPoolerDefaultCpuLimit = "1"
ConnectionPoolDefaultMemoryRequest = "100Mi" ConnectionPoolerDefaultMemoryRequest = "100Mi"
ConnectionPoolDefaultMemoryLimit = "100Mi" ConnectionPoolerDefaultMemoryLimit = "100Mi"
ConnPoolContainer = 0 ConnectionPoolerContainer = 0
ConnPoolMaxDBConnections = 60 ConnectionPoolerMaxDBConnections = 60
ConnPoolMaxClientConnections = 10000 ConnectionPoolerMaxClientConnections = 10000
ConnPoolMinInstances = 2 ConnectionPoolerMinInstances = 2
) )

View File

@ -4,7 +4,7 @@ package constants
const ( const (
PasswordLength = 64 PasswordLength = 64
SuperuserKeyName = "superuser" SuperuserKeyName = "superuser"
ConnectionPoolUserKeyName = "pooler" ConnectionPoolerUserKeyName = "pooler"
ReplicationUserKeyName = "replication" ReplicationUserKeyName = "replication"
RoleFlagSuperuser = "SUPERUSER" RoleFlagSuperuser = "SUPERUSER"
RoleFlagInherit = "INHERIT" RoleFlagInherit = "INHERIT"