merge with master and revert spec naming in k8sres

This commit is contained in:
Felix Kunde 2020-03-25 15:15:59 +01:00
commit b3aeaec33d
46 changed files with 3024 additions and 146 deletions

View File

@ -318,6 +318,47 @@ spec:
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
scalyr_server_url:
type: string
connection_pool:
type: object
properties:
connection_pool_schema:
type: string
#default: "pooler"
connection_pool_user:
type: string
#default: "pooler"
connection_pool_image:
type: string
#default: "registry.opensource.zalan.do/acid/pgbouncer"
connection_pool_max_db_connections:
type: integer
#default: 60
connection_pool_mode:
type: string
enum:
- "session"
- "transaction"
#default: "transaction"
connection_pool_number_of_instances:
type: integer
minimum: 2
#default: 2
connection_pool_default_cpu_limit:
type: string
pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
#default: "1"
connection_pool_default_cpu_request:
type: string
pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
#default: "500m"
connection_pool_default_memory_limit:
type: string
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
#default: "100Mi"
connection_pool_default_memory_request:
type: string
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
#default: "100Mi"
status:
type: object
additionalProperties:

View File

@ -106,6 +106,55 @@ spec:
uid:
format: uuid
type: string
connectionPool:
type: object
properties:
dockerImage:
type: string
maxDBConnections:
type: integer
mode:
type: string
enum:
- "session"
- "transaction"
numberOfInstances:
type: integer
minimum: 2
resources:
type: object
required:
- requests
- limits
properties:
limits:
type: object
required:
- cpu
- memory
properties:
cpu:
type: string
pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
memory:
type: string
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
requests:
type: object
required:
- cpu
- memory
properties:
cpu:
type: string
pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
memory:
type: string
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
schema:
type: string
user:
type: string
databases:
type: object
additionalProperties:
@ -113,6 +162,8 @@ spec:
# Note: usernames specified here as database owners must be declared in the users key of the spec key.
dockerImage:
type: string
enableConnectionPool:
type: boolean
enableLogicalBackup:
type: boolean
enableMasterLoadBalancer:

View File

@ -31,6 +31,20 @@ Create a service account name.
{{ default (include "postgres-operator.fullname" .) .Values.serviceAccount.name }}
{{- end -}}
{{/*
Create a pod service account name.
*/}}
{{- define "postgres-pod.serviceAccountName" -}}
{{ default (printf "%s-%v" (include "postgres-operator.fullname" .) "pod") .Values.podServiceAccount.name }}
{{- end -}}
{{/*
Create a controller ID.
*/}}
{{- define "postgres-operator.controllerID" -}}
{{ default (include "postgres-operator.fullname" .) .Values.controllerID.name }}
{{- end -}}
{{/*
Create chart name and version as used by the chart label.
*/}}

View File

@ -2,7 +2,7 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: postgres-pod
name: {{ include "postgres-pod.serviceAccountName" . }}
labels:
app.kubernetes.io/name: {{ template "postgres-operator.name" . }}
helm.sh/chart: {{ template "postgres-operator.chart" . }}

View File

@ -128,6 +128,7 @@ rules:
- apps
resources:
- statefulsets
- deployments
verbs:
- create
- delete

View File

@ -9,6 +9,7 @@ metadata:
app.kubernetes.io/managed-by: {{ .Release.Service }}
app.kubernetes.io/instance: {{ .Release.Name }}
data:
pod_service_account_name: {{ include "postgres-pod.serviceAccountName" . }}
{{ toYaml .Values.configGeneral | indent 2 }}
{{ toYaml .Values.configUsers | indent 2 }}
{{ toYaml .Values.configKubernetes | indent 2 }}
@ -19,4 +20,5 @@ data:
{{ toYaml .Values.configDebug | indent 2 }}
{{ toYaml .Values.configLoggingRestApi | indent 2 }}
{{ toYaml .Values.configTeamsApi | indent 2 }}
{{ toYaml .Values.configConnectionPool | indent 2 }}
{{- end }}

View File

@ -43,6 +43,10 @@ spec:
{{- else }}
- name: POSTGRES_OPERATOR_CONFIGURATION_OBJECT
value: {{ template "postgres-operator.fullname" . }}
{{- end }}
{{- if .Values.controllerID.create }}
- name: CONTROLLER_ID
value: {{ template "postgres-operator.controllerID" . }}
{{- end }}
resources:
{{ toYaml .Values.resources | indent 10 }}

View File

@ -13,6 +13,7 @@ configuration:
users:
{{ toYaml .Values.configUsers | indent 4 }}
kubernetes:
pod_service_account_name: {{ include "postgres-pod.serviceAccountName" . }}
oauth_token_secret_name: {{ template "postgres-operator.fullname" . }}
{{ toYaml .Values.configKubernetes | indent 4 }}
postgres_pod_resources:
@ -33,4 +34,6 @@ configuration:
{{ toYaml .Values.configLoggingRestApi | indent 4 }}
scalyr:
{{ toYaml .Values.configScalyr | indent 4 }}
connection_pool:
{{ toYaml .Values.configConnectionPool | indent 4 }}
{{- end }}

View File

@ -103,8 +103,6 @@ configKubernetes:
# service account definition as JSON/YAML string to be used by postgres cluster pods
# pod_service_account_definition: ""
# name of service account to be used by postgres cluster pods
pod_service_account_name: "postgres-pod"
# role binding definition as JSON/YAML string to be used by pod service account
# pod_service_account_role_binding_definition: ""
@ -269,6 +267,25 @@ configScalyr:
# Memory request value for the Scalyr sidecar
scalyr_memory_request: 50Mi
configConnectionPool:
# db schema to install lookup function into
connection_pool_schema: "pooler"
# db user for pooler to use
connection_pool_user: "pooler"
# docker image
connection_pool_image: "registry.opensource.zalan.do/acid/pgbouncer"
# max db connections the pooler should hold
connection_pool_max_db_connections: 60
# default pooling mode
connection_pool_mode: "transaction"
# number of pooler instances
connection_pool_number_of_instances: 2
# default resources
connection_pool_default_cpu_request: 500m
connection_pool_default_memory_request: 100Mi
connection_pool_default_cpu_limit: "1"
connection_pool_default_memory_limit: 100Mi
rbac:
# Specifies whether RBAC resources should be created
create: true
@ -284,6 +301,11 @@ serviceAccount:
# If not set and create is true, a name is generated using the fullname template
name:
podServiceAccount:
# The name of the ServiceAccount to be used by postgres cluster pods
# If not set a name is generated using the fullname template and "-pod" suffix
name: "postgres-pod"
priorityClassName: ""
resources:
@ -305,3 +327,12 @@ tolerations: []
# Node labels for pod assignment
# Ref: https://kubernetes.io/docs/user-guide/node-selection/
nodeSelector: {}
controllerID:
# Specifies whether a controller ID should be defined for the operator
# Note, all postgres manifest must then contain the following annotation to be found by this operator
# "acid.zalan.do/controller": <controller-ID-of-the-operator>
create: false
# The name of the controller ID to use.
# If not set and create is true, a name is generated using the fullname template
name:

View File

@ -96,8 +96,6 @@ configKubernetes:
# service account definition as JSON/YAML string to be used by postgres cluster pods
# pod_service_account_definition: ""
# name of service account to be used by postgres cluster pods
pod_service_account_name: "postgres-pod"
# role binding definition as JSON/YAML string to be used by pod service account
# pod_service_account_role_binding_definition: ""
@ -245,6 +243,26 @@ configTeamsApi:
# URL of the Teams API service
# teams_api_url: http://fake-teams-api.default.svc.cluster.local
# configure connection pooler deployment created by the operator
configConnectionPool:
# db schema to install lookup function into
connection_pool_schema: "pooler"
# db user for pooler to use
connection_pool_user: "pooler"
# docker image
connection_pool_image: "registry.opensource.zalan.do/acid/pgbouncer"
# max db connections the pooler should hold
connection_pool_max_db_connections: 60
# default pooling mode
connection_pool_mode: "transaction"
# number of pooler instances
connection_pool_number_of_instances: 2
# default resources
connection_pool_default_cpu_request: 500m
connection_pool_default_memory_request: 100Mi
connection_pool_default_cpu_limit: "1"
connection_pool_default_memory_limit: 100Mi
rbac:
# Specifies whether RBAC resources should be created
create: true
@ -260,6 +278,11 @@ serviceAccount:
# If not set and create is true, a name is generated using the fullname template
name:
podServiceAccount:
# The name of the ServiceAccount to be used by postgres cluster pods
# If not set a name is generated using the fullname template and "-pod" suffix
name: "postgres-pod"
priorityClassName: ""
resources:
@ -281,3 +304,12 @@ tolerations: []
# Node labels for pod assignment
# Ref: https://kubernetes.io/docs/user-guide/node-selection/
nodeSelector: {}
controllerID:
# Specifies whether a controller ID should be defined for the operator
# Note, all postgres manifest must then contain the following annotation to be found by this operator
# "acid.zalan.do/controller": <controller-ID-of-the-operator>
create: false
# The name of the controller ID to use.
# If not set and create is true, a name is generated using the fullname template
name:

View File

@ -3,8 +3,17 @@ MAINTAINER Team ACID @ Zalando <team-acid@zalando.de>
# We need root certificates to deal with teams api over https
RUN apk --no-cache add ca-certificates go git musl-dev
RUN go get github.com/derekparker/delve/cmd/dlv
COPY build/* /
CMD ["/root/go/bin/dlv", "--listen=:7777", "--headless=true", "--api-version=2", "exec", "/postgres-operator"]
RUN addgroup -g 1000 pgo
RUN adduser -D -u 1000 -G pgo -g 'Postgres Operator' pgo
RUN go get github.com/derekparker/delve/cmd/dlv
RUN cp /root/go/bin/dlv /dlv
RUN chown -R pgo:pgo /dlv
USER pgo:pgo
RUN ls -l /
CMD ["/dlv", "--listen=:7777", "--headless=true", "--api-version=2", "exec", "/postgres-operator"]

View File

@ -111,12 +111,12 @@ These parameters are grouped directly under the `spec` key in the manifest.
value overrides the `pod_toleration` setting from the operator. Optional.
* **podPriorityClassName**
a name of the [priority
class](https://kubernetes.io/docs/concepts/configuration/pod-priority-preemption/#priorityclass)
that should be assigned to the cluster pods. When not specified, the value
is taken from the `pod_priority_class_name` operator parameter, if not set
then the default priority class is taken. The priority class itself must be
defined in advance. Optional.
a name of the [priority
class](https://kubernetes.io/docs/concepts/configuration/pod-priority-preemption/#priorityclass)
that should be assigned to the cluster pods. When not specified, the value
is taken from the `pod_priority_class_name` operator parameter, if not set
then the default priority class is taken. The priority class itself must be
defined in advance. Optional.
* **podAnnotations**
A map of key value pairs that gets attached as [annotations](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/)
@ -140,6 +140,11 @@ These parameters are grouped directly under the `spec` key in the manifest.
is `false`, then no volume will be mounted no matter how operator was
configured (so you can override the operator configuration). Optional.
* **enableConnectionPool**
Tells the operator to create a connection pool with a database. If this
field is true, a connection pool deployment will be created even if
`connectionPool` section is empty. Optional, not set by default.
* **enableLogicalBackup**
Determines if the logical backup of this cluster should be taken and uploaded
to S3. Default: false. Optional.
@ -184,9 +189,9 @@ explanation of `ttl` and `loop_wait` parameters.
```
hostssl all +pamrole all pam
```
, where pamrole is the name of the role for the pam authentication; any
custom `pg_hba` should include the pam line to avoid breaking pam
authentication. Optional.
where pamrole is the name of the role for the pam authentication; any
custom `pg_hba` should include the pam line to avoid breaking pam
authentication. Optional.
* **ttl**
Patroni `ttl` parameter value, optional. The default is set by the Spilo
@ -360,6 +365,35 @@ CPU and memory limits for the sidecar container.
memory limits for the sidecar container. Optional, overrides the
`default_memory_limits` operator configuration parameter. Optional.
## Connection pool
Parameters are grouped under the `connectionPool` top-level key and specify
configuration for connection pool. If this section is not empty, a connection
pool will be created for a database even if `enableConnectionPool` is not
present.
* **numberOfInstances**
How many instances of connection pool to create.
* **schema**
Schema to create for credentials lookup function.
* **user**
User to create for connection pool to be able to connect to a database.
* **dockerImage**
Which docker image to use for connection pool deployment.
* **maxDBConnections**
How many connections the pooler can max hold. This value is divided among the
pooler pods.
* **mode**
In which mode to run connection pool, transaction or session.
* **resources**
Resource configuration for connection pool deployment.
## Custom TLS certificates
Those parameters are grouped under the `tls` top-level key.
@ -379,4 +413,4 @@ Those parameters are grouped under the `tls` top-level key.
* **caFile**
Optional filename to the CA certificate. Useful when the client connects
with `sslmode=verify-ca` or `sslmode=verify-full`.
with `sslmode=verify-ca` or `sslmode=verify-full`. Default is empty.

View File

@ -286,11 +286,11 @@ configuration they are grouped under the `kubernetes` key.
capability. The default is `false`.
* **master_pod_move_timeout**
The period of time to wait for the success of migration of master pods from
an unschedulable node. The migration includes Patroni switchovers to
respective replicas on healthy nodes. The situation where master pods still
exist on the old node after this timeout expires has to be fixed manually.
The default is 20 minutes.
The period of time to wait for the success of migration of master pods from
an unschedulable node. The migration includes Patroni switchovers to
respective replicas on healthy nodes. The situation where master pods still
exist on the old node after this timeout expires has to be fixed manually.
The default is 20 minutes.
* **enable_pod_antiaffinity**
toggles [pod anti affinity](https://kubernetes.io/docs/concepts/configuration/assign-pod-node/)
@ -596,3 +596,40 @@ scalyr sidecar. In the CRD-based configuration they are grouped under the
* **scalyr_memory_limit**
Memory limit value for the Scalyr sidecar. The default is `500Mi`.
## Connection pool configuration
Parameters are grouped under the `connection_pool` top-level key and specify
default configuration for connection pool, if a postgres manifest requests it
but do not specify some of the parameters. All of them are optional with the
operator being able to provide some reasonable defaults.
* **connection_pool_number_of_instances**
How many instances of connection pool to create. Default is 2 which is also
the required minimum.
* **connection_pool_schema**
Schema to create for credentials lookup function. Default is `pooler`.
* **connection_pool_user**
User to create for connection pool to be able to connect to a database.
Default is `pooler`.
* **connection_pool_image**
Docker image to use for connection pool deployment.
Default: "registry.opensource.zalan.do/acid/pgbouncer"
* **connection_pool_max_db_connections**
How many connections the pooler can max hold. This value is divided among the
pooler pods. Default is 60 which will make up 30 connections per pod for the
default setup with two instances.
* **connection_pool_mode**
Default pool mode, `session` or `transaction`. Default is `transaction`.
* **connection_pool_default_cpu_request**
**connection_pool_default_memory_reques**
**connection_pool_default_cpu_limit**
**connection_pool_default_memory_limit**
Default resource configuration for connection pool deployment. The internal
default for memory request and limit is `100Mi`, for CPU it is `500m` and `1`.

View File

@ -30,7 +30,7 @@ spec:
databases:
foo: zalando
postgresql:
version: "11"
version: "12"
```
Once you cloned the Postgres Operator [repository](https://github.com/zalando/postgres-operator)
@ -512,12 +512,65 @@ monitoring is outside the scope of operator responsibilities. See
[administrator documentation](administrator.md) for details on how backups are
executed.
## Connection pool
The operator can create a database side connection pool for those applications,
where an application side pool is not feasible, but a number of connections is
high. To create a connection pool together with a database, modify the
manifest:
```yaml
spec:
enableConnectionPool: true
```
This will tell the operator to create a connection pool with default
configuration, through which one can access the master via a separate service
`{cluster-name}-pooler`. In most of the cases provided default configuration
should be good enough.
To configure a new connection pool, specify:
```
spec:
connectionPool:
# how many instances of connection pool to create
number_of_instances: 2
# in which mode to run, session or transaction
mode: "transaction"
# schema, which operator will create to install credentials lookup
# function
schema: "pooler"
# user, which operator will create for connection pool
user: "pooler"
# resources for each instance
resources:
requests:
cpu: 500m
memory: 100Mi
limits:
cpu: "1"
memory: 100Mi
```
By default `pgbouncer` is used to create a connection pool. To find out about
pool modes see [docs](https://www.pgbouncer.org/config.html#pool_mode) (but it
should be general approach between different implementation).
Note, that using `pgbouncer` means meaningful resource CPU limit should be less
than 1 core (there is a way to utilize more than one, but in K8S it's easier
just to spin up more instances).
## Custom TLS certificates
By default, the spilo image generates its own TLS certificate during startup.
This certificate is not secure since it cannot be verified and thus doesn't
protect from active MITM attacks. In this section we show how a Kubernete
Secret resources can be loaded with a custom TLS certificate.
However, this certificate cannot be verified and thus doesn't protect from
active MITM attacks. In this section we show how to specify a custom TLS
certificate which is mounted in the database pods via a K8s Secret.
Before applying these changes, the operator must also be configured with the
`spilo_fsgroup` set to the GID matching the postgres user group. If the value

View File

@ -9,6 +9,10 @@ import yaml
from kubernetes import client, config
def to_selector(labels):
return ",".join(["=".join(l) for l in labels.items()])
class EndToEndTestCase(unittest.TestCase):
'''
Test interaction of the operator with multiple K8s components.
@ -47,7 +51,8 @@ class EndToEndTestCase(unittest.TestCase):
for filename in ["operator-service-account-rbac.yaml",
"configmap.yaml",
"postgres-operator.yaml"]:
k8s.create_with_kubectl("manifests/" + filename)
result = k8s.create_with_kubectl("manifests/" + filename)
print("stdout: {}, stderr: {}".format(result.stdout, result.stderr))
k8s.wait_for_operator_pod_start()
@ -55,9 +60,14 @@ class EndToEndTestCase(unittest.TestCase):
'default', label_selector='name=postgres-operator').items[0].spec.containers[0].image
print("Tested operator image: {}".format(actual_operator_image)) # shows up after tests finish
k8s.create_with_kubectl("manifests/minimal-postgres-manifest.yaml")
k8s.wait_for_pod_start('spilo-role=master')
k8s.wait_for_pod_start('spilo-role=replica')
result = k8s.create_with_kubectl("manifests/minimal-postgres-manifest.yaml")
print('stdout: {}, stderr: {}'.format(result.stdout, result.stderr))
try:
k8s.wait_for_pod_start('spilo-role=master')
k8s.wait_for_pod_start('spilo-role=replica')
except timeout_decorator.TimeoutError:
print('Operator log: {}'.format(k8s.get_operator_log()))
raise
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_enable_load_balancer(self):
@ -66,7 +76,7 @@ class EndToEndTestCase(unittest.TestCase):
'''
k8s = self.k8s
cluster_label = 'cluster-name=acid-minimal-cluster'
cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
# enable load balancer services
pg_patch_enable_lbs = {
@ -178,7 +188,7 @@ class EndToEndTestCase(unittest.TestCase):
Lower resource limits below configured minimum and let operator fix it
'''
k8s = self.k8s
cluster_label = 'cluster-name=acid-minimal-cluster'
cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
labels = 'spilo-role=master,' + cluster_label
_, failover_targets = k8s.get_pg_nodes(cluster_label)
@ -247,7 +257,7 @@ class EndToEndTestCase(unittest.TestCase):
Remove node readiness label from master node. This must cause a failover.
'''
k8s = self.k8s
cluster_label = 'cluster-name=acid-minimal-cluster'
cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
labels = 'spilo-role=master,' + cluster_label
readiness_label = 'lifecycle-status'
readiness_value = 'ready'
@ -289,7 +299,7 @@ class EndToEndTestCase(unittest.TestCase):
Scale up from 2 to 3 and back to 2 pods by updating the Postgres manifest at runtime.
'''
k8s = self.k8s
labels = "cluster-name=acid-minimal-cluster"
labels = "application=spilo,cluster-name=acid-minimal-cluster"
k8s.wait_for_pg_to_scale(3)
self.assertEqual(3, k8s.count_pods_with_label(labels))
@ -339,13 +349,99 @@ class EndToEndTestCase(unittest.TestCase):
}
k8s.update_config(unpatch_custom_service_annotations)
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_enable_disable_connection_pool(self):
'''
For a database without connection pool, then turns it on, scale up,
turn off and on again. Test with different ways of doing this (via
enableConnectionPool or connectionPool configuration section). At the
end turn the connection pool off to not interfere with other tests.
'''
k8s = self.k8s
service_labels = {
'cluster-name': 'acid-minimal-cluster',
}
pod_labels = dict({
'connection-pool': 'acid-minimal-cluster-pooler',
})
pod_selector = to_selector(pod_labels)
service_selector = to_selector(service_labels)
try:
# enable connection pool
k8s.api.custom_objects_api.patch_namespaced_custom_object(
'acid.zalan.do', 'v1', 'default',
'postgresqls', 'acid-minimal-cluster',
{
'spec': {
'enableConnectionPool': True,
}
})
k8s.wait_for_pod_start(pod_selector)
pods = k8s.api.core_v1.list_namespaced_pod(
'default', label_selector=pod_selector
).items
self.assertTrue(pods, 'No connection pool pods')
k8s.wait_for_service(service_selector)
services = k8s.api.core_v1.list_namespaced_service(
'default', label_selector=service_selector
).items
services = [
s for s in services
if s.metadata.name.endswith('pooler')
]
self.assertTrue(services, 'No connection pool service')
# scale up connection pool deployment
k8s.api.custom_objects_api.patch_namespaced_custom_object(
'acid.zalan.do', 'v1', 'default',
'postgresqls', 'acid-minimal-cluster',
{
'spec': {
'connectionPool': {
'numberOfInstances': 2,
},
}
})
k8s.wait_for_running_pods(pod_selector, 2)
# turn it off, keeping configuration section
k8s.api.custom_objects_api.patch_namespaced_custom_object(
'acid.zalan.do', 'v1', 'default',
'postgresqls', 'acid-minimal-cluster',
{
'spec': {
'enableConnectionPool': False,
}
})
k8s.wait_for_pods_to_stop(pod_selector)
k8s.api.custom_objects_api.patch_namespaced_custom_object(
'acid.zalan.do', 'v1', 'default',
'postgresqls', 'acid-minimal-cluster',
{
'spec': {
'enableConnectionPool': True,
}
})
k8s.wait_for_pod_start(pod_selector)
except timeout_decorator.TimeoutError:
print('Operator log: {}'.format(k8s.get_operator_log()))
raise
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_taint_based_eviction(self):
'''
Add taint "postgres=:NoExecute" to node with master. This must cause a failover.
'''
k8s = self.k8s
cluster_label = 'cluster-name=acid-minimal-cluster'
cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
# get nodes of master and replica(s) (expected target of new master)
current_master_node, current_replica_nodes = k8s.get_pg_nodes(cluster_label)
@ -473,7 +569,7 @@ class K8s:
Wraps around K8 api client and helper methods.
'''
RETRY_TIMEOUT_SEC = 5
RETRY_TIMEOUT_SEC = 10
def __init__(self):
self.api = K8sApi()
@ -496,12 +592,39 @@ class K8s:
# for local execution ~ 10 seconds suffices
time.sleep(60)
def get_operator_pod(self):
pods = self.api.core_v1.list_namespaced_pod(
'default', label_selector='name=postgres-operator'
).items
if pods:
return pods[0]
return None
def get_operator_log(self):
operator_pod = self.get_operator_pod()
pod_name = operator_pod.metadata.name
return self.api.core_v1.read_namespaced_pod_log(
name=pod_name,
namespace='default'
)
def wait_for_pod_start(self, pod_labels, namespace='default'):
pod_phase = 'No pod running'
while pod_phase != 'Running':
pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=pod_labels).items
if pods:
pod_phase = pods[0].status.phase
if pods and pod_phase != 'Running':
pod_name = pods[0].metadata.name
response = self.api.core_v1.read_namespaced_pod(
name=pod_name,
namespace=namespace
)
print("Pod description {}".format(response))
time.sleep(self.RETRY_TIMEOUT_SEC)
def get_service_type(self, svc_labels, namespace='default'):
@ -531,10 +654,27 @@ class K8s:
_ = self.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", namespace, "postgresqls", "acid-minimal-cluster", body)
labels = 'cluster-name=acid-minimal-cluster'
labels = 'application=spilo,cluster-name=acid-minimal-cluster'
while self.count_pods_with_label(labels) != number_of_instances:
time.sleep(self.RETRY_TIMEOUT_SEC)
def wait_for_running_pods(self, labels, number, namespace=''):
while self.count_pods_with_label(labels) != number:
time.sleep(self.RETRY_TIMEOUT_SEC)
def wait_for_pods_to_stop(self, labels, namespace=''):
while self.count_pods_with_label(labels) != 0:
time.sleep(self.RETRY_TIMEOUT_SEC)
def wait_for_service(self, labels, namespace='default'):
def get_services():
return self.api.core_v1.list_namespaced_service(
namespace, label_selector=labels
).items
while not get_services():
time.sleep(self.RETRY_TIMEOUT_SEC)
def count_pods_with_label(self, labels, namespace='default'):
return len(self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items)
@ -571,7 +711,10 @@ class K8s:
self.wait_for_operator_pod_start()
def create_with_kubectl(self, path):
subprocess.run(["kubectl", "create", "-f", path])
return subprocess.run(
["kubectl", "create", "-f", path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
if __name__ == '__main__':

View File

@ -24,7 +24,7 @@ spec:
databases:
foo: zalando
postgresql:
version: "11"
version: "12"
parameters: # Expert section
shared_buffers: "32MB"
max_connections: "10"

View File

@ -11,6 +11,16 @@ data:
cluster_history_entries: "1000"
cluster_labels: application:spilo
cluster_name_label: cluster-name
# connection_pool_default_cpu_limit: "1"
# connection_pool_default_cpu_request: "500m"
# connection_pool_default_memory_limit: 100Mi
# connection_pool_default_memory_request: 100Mi
connection_pool_image: "registry.opensource.zalan.do/acid/pgbouncer:master-5"
# connection_pool_max_db_connections: 60
# connection_pool_mode: "transaction"
# connection_pool_number_of_instances: 2
# connection_pool_schema: "pooler"
# connection_pool_user: "pooler"
# custom_service_annotations: "keyx:valuez,keya:valuea"
# custom_pod_annotations: "keya:valuea,keyb:valueb"
db_hosted_zone: db.example.com

View File

@ -16,4 +16,4 @@ spec:
databases:
foo: zalando # dbname: owner
postgresql:
version: "11"
version: "12"

View File

@ -129,6 +129,7 @@ rules:
- apps
resources:
- statefulsets
- deployments
verbs:
- create
- delete

View File

@ -294,6 +294,47 @@ spec:
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
scalyr_server_url:
type: string
connection_pool:
type: object
properties:
connection_pool_schema:
type: string
#default: "pooler"
connection_pool_user:
type: string
#default: "pooler"
connection_pool_image:
type: string
#default: "registry.opensource.zalan.do/acid/pgbouncer"
connection_pool_max_db_connections:
type: integer
#default: 60
connection_pool_mode:
type: string
enum:
- "session"
- "transaction"
#default: "transaction"
connection_pool_number_of_instances:
type: integer
minimum: 2
#default: 2
connection_pool_default_cpu_limit:
type: string
pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
#default: "1"
connection_pool_default_cpu_request:
type: string
pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
#default: "500m"
connection_pool_default_memory_limit:
type: string
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
#default: "100Mi"
connection_pool_default_memory_request:
type: string
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
#default: "100Mi"
status:
type: object
additionalProperties:

View File

@ -121,3 +121,14 @@ configuration:
scalyr_memory_limit: 500Mi
scalyr_memory_request: 50Mi
# scalyr_server_url: ""
connection_pool:
connection_pool_default_cpu_limit: "1"
connection_pool_default_cpu_request: "500m"
connection_pool_default_memory_limit: 100Mi
connection_pool_default_memory_request: 100Mi
connection_pool_image: "registry.opensource.zalan.do/acid/pgbouncer:master-5"
# connection_pool_max_db_connections: 60
connection_pool_mode: "transaction"
connection_pool_number_of_instances: 2
# connection_pool_schema: "pooler"
# connection_pool_user: "pooler"

View File

@ -70,6 +70,55 @@ spec:
uid:
format: uuid
type: string
connectionPool:
type: object
properties:
dockerImage:
type: string
maxDBConnections:
type: integer
mode:
type: string
enum:
- "session"
- "transaction"
numberOfInstances:
type: integer
minimum: 2
resources:
type: object
required:
- requests
- limits
properties:
limits:
type: object
required:
- cpu
- memory
properties:
cpu:
type: string
pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
memory:
type: string
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
requests:
type: object
required:
- cpu
- memory
properties:
cpu:
type: string
pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
memory:
type: string
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
schema:
type: string
user:
type: string
databases:
type: object
additionalProperties:
@ -77,6 +126,8 @@ spec:
# Note: usernames specified here as database owners must be declared in the users key of the spec key.
dockerImage:
type: string
enableConnectionPool:
type: boolean
enableLogicalBackup:
type: boolean
enableMasterLoadBalancer:

View File

@ -9,7 +9,7 @@ spec:
size: 1Gi
numberOfInstances: 1
postgresql:
version: "11"
version: "12"
# Make this a standby cluster and provide the s3 bucket path of source cluster for continuous streaming.
standby:
s3_wal_path: "s3://path/to/bucket/containing/wal/of/source/cluster/"

View File

@ -105,6 +105,7 @@ var OperatorConfigCRDResourceColumns = []apiextv1beta1.CustomResourceColumnDefin
var min0 = 0.0
var min1 = 1.0
var min2 = 2.0
var minDisable = -1.0
// PostgresCRDResourceValidation to check applied manifest parameters
@ -176,6 +177,76 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{
},
},
},
"connectionPool": {
Type: "object",
Properties: map[string]apiextv1beta1.JSONSchemaProps{
"dockerImage": {
Type: "string",
},
"maxDBConnections": {
Type: "integer",
},
"mode": {
Type: "string",
Enum: []apiextv1beta1.JSON{
{
Raw: []byte(`"session"`),
},
{
Raw: []byte(`"transaction"`),
},
},
},
"numberOfInstances": {
Type: "integer",
Minimum: &min2,
},
"resources": {
Type: "object",
Required: []string{"requests", "limits"},
Properties: map[string]apiextv1beta1.JSONSchemaProps{
"limits": {
Type: "object",
Required: []string{"cpu", "memory"},
Properties: map[string]apiextv1beta1.JSONSchemaProps{
"cpu": {
Type: "string",
Description: "Decimal natural followed by m, or decimal natural followed by dot followed by up to three decimal digits (precision used by Kubernetes). Must be greater than 0",
Pattern: "^(\\d+m|\\d+(\\.\\d{1,3})?)$",
},
"memory": {
Type: "string",
Description: "Plain integer or fixed-point integer using one of these suffixes: E, P, T, G, M, k (with or without a tailing i). Must be greater than 0",
Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$",
},
},
},
"requests": {
Type: "object",
Required: []string{"cpu", "memory"},
Properties: map[string]apiextv1beta1.JSONSchemaProps{
"cpu": {
Type: "string",
Description: "Decimal natural followed by m, or decimal natural followed by dot followed by up to three decimal digits (precision used by Kubernetes). Must be greater than 0",
Pattern: "^(\\d+m|\\d+(\\.\\d{1,3})?)$",
},
"memory": {
Type: "string",
Description: "Plain integer or fixed-point integer using one of these suffixes: E, P, T, G, M, k (with or without a tailing i). Must be greater than 0",
Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$",
},
},
},
},
},
"schema": {
Type: "string",
},
"user": {
Type: "string",
},
},
},
"databases": {
Type: "object",
AdditionalProperties: &apiextv1beta1.JSONSchemaPropsOrBool{
@ -188,6 +259,9 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{
"dockerImage": {
Type: "string",
},
"enableConnectionPool": {
Type: "boolean",
},
"enableLogicalBackup": {
Type: "boolean",
},
@ -418,7 +492,7 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{
Type: "string",
},
"tls": {
Type: "object",
Type: "object",
Required: []string{"secretName"},
Properties: map[string]apiextv1beta1.JSONSchemaProps{
"secretName": {
@ -1055,6 +1129,54 @@ var OperatorConfigCRDResourceValidation = apiextv1beta1.CustomResourceValidation
},
},
},
"connection_pool": {
Type: "object",
Properties: map[string]apiextv1beta1.JSONSchemaProps{
"connection_pool_default_cpu_limit": {
Type: "string",
Pattern: "^(\\d+m|\\d+(\\.\\d{1,3})?)$",
},
"connection_pool_default_cpu_request": {
Type: "string",
Pattern: "^(\\d+m|\\d+(\\.\\d{1,3})?)$",
},
"connection_pool_default_memory_limit": {
Type: "string",
Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$",
},
"connection_pool_default_memory_request": {
Type: "string",
Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$",
},
"connection_pool_image": {
Type: "string",
},
"connection_pool_max_db_connections": {
Type: "integer",
},
"connection_pool_mode": {
Type: "string",
Enum: []apiextv1beta1.JSON{
{
Raw: []byte(`"session"`),
},
{
Raw: []byte(`"transaction"`),
},
},
},
"connection_pool_number_of_instances": {
Type: "integer",
Minimum: &min2,
},
"connection_pool_schema": {
Type: "string",
},
"connection_pool_user": {
Type: "string",
},
},
},
},
},
"status": {

View File

@ -1,5 +1,7 @@
package v1
// Operator configuration CRD definition, please use snake_case for field names.
import (
"github.com/zalando/postgres-operator/pkg/util/config"
@ -151,6 +153,20 @@ type ScalyrConfiguration struct {
ScalyrMemoryLimit string `json:"scalyr_memory_limit,omitempty"`
}
// Defines default configuration for connection pool
type ConnectionPoolConfiguration struct {
NumberOfInstances *int32 `json:"connection_pool_number_of_instances,omitempty"`
Schema string `json:"connection_pool_schema,omitempty"`
User string `json:"connection_pool_user,omitempty"`
Image string `json:"connection_pool_image,omitempty"`
Mode string `json:"connection_pool_mode,omitempty"`
MaxDBConnections *int32 `json:"connection_pool_max_db_connections,omitempty"`
DefaultCPURequest string `json:"connection_pool_default_cpu_request,omitempty"`
DefaultMemoryRequest string `json:"connection_pool_default_memory_request,omitempty"`
DefaultCPULimit string `json:"connection_pool_default_cpu_limit,omitempty"`
DefaultMemoryLimit string `json:"connection_pool_default_memory_limit,omitempty"`
}
// OperatorLogicalBackupConfiguration defines configuration for logical backup
type OperatorLogicalBackupConfiguration struct {
Schedule string `json:"logical_backup_schedule,omitempty"`
@ -187,6 +203,7 @@ type OperatorConfigurationData struct {
LoggingRESTAPI LoggingRESTAPIConfiguration `json:"logging_rest_api"`
Scalyr ScalyrConfiguration `json:"scalyr"`
LogicalBackup OperatorLogicalBackupConfiguration `json:"logical_backup"`
ConnectionPool ConnectionPoolConfiguration `json:"connection_pool"`
}
//Duration shortens this frequently used name

View File

@ -1,5 +1,7 @@
package v1
// Postgres CRD definition, please use CamelCase for field names.
import (
"time"
@ -27,6 +29,9 @@ type PostgresSpec struct {
Patroni `json:"patroni,omitempty"`
Resources `json:"resources,omitempty"`
EnableConnectionPool *bool `json:"enableConnectionPool,omitempty"`
ConnectionPool *ConnectionPool `json:"connectionPool,omitempty"`
TeamID string `json:"teamId"`
DockerImage string `json:"dockerImage,omitempty"`
@ -162,3 +167,24 @@ type UserFlags []string
type PostgresStatus struct {
PostgresClusterStatus string `json:"PostgresClusterStatus"`
}
// Options for connection pooler
//
// TODO: prepared snippets of configuration, one can choose via type, e.g.
// pgbouncer-large (with higher resources) or odyssey-small (with smaller
// resources)
// Type string `json:"type,omitempty"`
//
// TODO: figure out what other important parameters of the connection pool it
// makes sense to expose. E.g. pool size (min/max boundaries), max client
// connections etc.
type ConnectionPool struct {
NumberOfInstances *int32 `json:"numberOfInstances,omitempty"`
Schema string `json:"schema,omitempty"`
User string `json:"user,omitempty"`
Mode string `json:"mode,omitempty"`
DockerImage string `json:"dockerImage,omitempty"`
MaxDBConnections *int32 `json:"maxDBConnections,omitempty"`
Resources `json:"resources,omitempty"`
}

View File

@ -68,6 +68,59 @@ func (in *CloneDescription) DeepCopy() *CloneDescription {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ConnectionPool) DeepCopyInto(out *ConnectionPool) {
*out = *in
if in.NumberOfInstances != nil {
in, out := &in.NumberOfInstances, &out.NumberOfInstances
*out = new(int32)
**out = **in
}
if in.MaxDBConnections != nil {
in, out := &in.MaxDBConnections, &out.MaxDBConnections
*out = new(int32)
**out = **in
}
out.Resources = in.Resources
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionPool.
func (in *ConnectionPool) DeepCopy() *ConnectionPool {
if in == nil {
return nil
}
out := new(ConnectionPool)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ConnectionPoolConfiguration) DeepCopyInto(out *ConnectionPoolConfiguration) {
*out = *in
if in.NumberOfInstances != nil {
in, out := &in.NumberOfInstances, &out.NumberOfInstances
*out = new(int32)
**out = **in
}
if in.MaxDBConnections != nil {
in, out := &in.MaxDBConnections, &out.MaxDBConnections
*out = new(int32)
**out = **in
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionPoolConfiguration.
func (in *ConnectionPoolConfiguration) DeepCopy() *ConnectionPoolConfiguration {
if in == nil {
return nil
}
out := new(ConnectionPoolConfiguration)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *KubernetesMetaConfiguration) DeepCopyInto(out *KubernetesMetaConfiguration) {
*out = *in
@ -255,6 +308,7 @@ func (in *OperatorConfigurationData) DeepCopyInto(out *OperatorConfigurationData
out.LoggingRESTAPI = in.LoggingRESTAPI
out.Scalyr = in.Scalyr
out.LogicalBackup = in.LogicalBackup
in.ConnectionPool.DeepCopyInto(&out.ConnectionPool)
return
}
@ -417,6 +471,16 @@ func (in *PostgresSpec) DeepCopyInto(out *PostgresSpec) {
out.Volume = in.Volume
in.Patroni.DeepCopyInto(&out.Patroni)
out.Resources = in.Resources
if in.EnableConnectionPool != nil {
in, out := &in.EnableConnectionPool, &out.EnableConnectionPool
*out = new(bool)
**out = **in
}
if in.ConnectionPool != nil {
in, out := &in.ConnectionPool, &out.ConnectionPool
*out = new(ConnectionPool)
(*in).DeepCopyInto(*out)
}
if in.SpiloFSGroup != nil {
in, out := &in.SpiloFSGroup, &out.SpiloFSGroup
*out = new(int64)

View File

@ -11,6 +11,7 @@ import (
"sync"
"time"
"github.com/r3labs/diff"
"github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
@ -48,11 +49,26 @@ type Config struct {
PodServiceAccountRoleBinding *rbacv1.RoleBinding
}
// K8S objects that are belongs to a connection pool
type ConnectionPoolObjects struct {
Deployment *appsv1.Deployment
Service *v1.Service
// It could happen that a connection pool was enabled, but the operator was
// not able to properly process a corresponding event or was restarted. In
// this case we will miss missing/require situation and a lookup function
// will not be installed. To avoid synchronizing it all the time to prevent
// this, we can remember the result in memory at least until the next
// restart.
LookupFunction bool
}
type kubeResources struct {
Services map[PostgresRole]*v1.Service
Endpoints map[PostgresRole]*v1.Endpoints
Secrets map[types.UID]*v1.Secret
Statefulset *appsv1.StatefulSet
ConnectionPool *ConnectionPoolObjects
PodDisruptionBudget *policybeta1.PodDisruptionBudget
//Pods are treated separately
//PVCs are treated separately
@ -184,7 +200,8 @@ func (c *Cluster) isNewCluster() bool {
func (c *Cluster) initUsers() error {
c.setProcessName("initializing users")
// clear our the previous state of the cluster users (in case we are running a sync).
// clear our the previous state of the cluster users (in case we are
// running a sync).
c.systemUsers = map[string]spec.PgUser{}
c.pgUsers = map[string]spec.PgUser{}
@ -292,8 +309,10 @@ func (c *Cluster) Create() error {
}
c.logger.Infof("pods are ready")
// create database objects unless we are running without pods or disabled that feature explicitly
// create database objects unless we are running without pods or disabled
// that feature explicitly
if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0 || c.Spec.StandbyCluster != nil) {
c.logger.Infof("Create roles")
if err = c.createRoles(); err != nil {
return fmt.Errorf("could not create users: %v", err)
}
@ -316,6 +335,26 @@ func (c *Cluster) Create() error {
c.logger.Errorf("could not list resources: %v", err)
}
// Create connection pool deployment and services if necessary. Since we
// need to peform some operations with the database itself (e.g. install
// lookup function), do it as the last step, when everything is available.
//
// Do not consider connection pool as a strict requirement, and if
// something fails, report warning
if c.needConnectionPool() {
if c.ConnectionPool != nil {
c.logger.Warning("Connection pool already exists in the cluster")
return nil
}
connPool, err := c.createConnectionPool(c.installLookupFunction)
if err != nil {
c.logger.Warningf("could not create connection pool: %v", err)
return nil
}
c.logger.Infof("connection pool %q has been successfully created",
util.NameFromMeta(connPool.Deployment.ObjectMeta))
}
return nil
}
@ -571,7 +610,11 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
}
}
if !reflect.DeepEqual(oldSpec.Spec.Users, newSpec.Spec.Users) {
// connection pool needs one system user created, which is done in
// initUsers. Check if it needs to be called.
sameUsers := reflect.DeepEqual(oldSpec.Spec.Users, newSpec.Spec.Users)
needConnPool := c.needConnectionPoolWorker(&newSpec.Spec)
if !sameUsers || needConnPool {
c.logger.Debugf("syncing secrets")
if err := c.initUsers(); err != nil {
c.logger.Errorf("could not init users: %v", err)
@ -695,6 +738,11 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
}
}
// sync connection pool
if err := c.syncConnectionPool(oldSpec, newSpec, c.installLookupFunction); err != nil {
return fmt.Errorf("could not sync connection pool: %v", err)
}
return nil
}
@ -746,6 +794,12 @@ func (c *Cluster) Delete() {
c.logger.Warningf("could not remove leftover patroni objects; %v", err)
}
// Delete connection pool objects anyway, even if it's not mentioned in the
// manifest, just to not keep orphaned components in case if something went
// wrong
if err := c.deleteConnectionPool(); err != nil {
c.logger.Warningf("could not remove connection pool: %v", err)
}
}
//NeedsRepair returns true if the cluster should be included in the repair scan (based on its in-memory status).
@ -812,6 +866,35 @@ func (c *Cluster) initSystemUsers() {
Name: c.OpConfig.ReplicationUsername,
Password: util.RandomPassword(constants.PasswordLength),
}
// Connection pool user is an exception, if requested it's going to be
// created by operator as a normal pgUser
if c.needConnectionPool() {
// initialize empty connection pool if not done yet
if c.Spec.ConnectionPool == nil {
c.Spec.ConnectionPool = &acidv1.ConnectionPool{}
}
username := util.Coalesce(
c.Spec.ConnectionPool.User,
c.OpConfig.ConnectionPool.User)
// connection pooler application should be able to login with this role
connPoolUser := spec.PgUser{
Origin: spec.RoleConnectionPool,
Name: username,
Flags: []string{constants.RoleFlagLogin},
Password: util.RandomPassword(constants.PasswordLength),
}
if _, exists := c.pgUsers[username]; !exists {
c.pgUsers[username] = connPoolUser
}
if _, exists := c.systemUsers[constants.ConnectionPoolUserKeyName]; !exists {
c.systemUsers[constants.ConnectionPoolUserKeyName] = connPoolUser
}
}
}
func (c *Cluster) initRobotUsers() error {
@ -1138,3 +1221,119 @@ func (c *Cluster) deletePatroniClusterConfigMaps() error {
return c.deleteClusterObject(get, deleteConfigMapFn, "configmap")
}
// Test if two connection pool configuration needs to be synced. For simplicity
// compare not the actual K8S objects, but the configuration itself and request
// sync if there is any difference.
func (c *Cluster) needSyncConnPoolSpecs(oldSpec, newSpec *acidv1.ConnectionPool) (sync bool, reasons []string) {
reasons = []string{}
sync = false
changelog, err := diff.Diff(oldSpec, newSpec)
if err != nil {
c.logger.Infof("Cannot get diff, do not do anything, %+v", err)
return false, reasons
}
if len(changelog) > 0 {
sync = true
}
for _, change := range changelog {
msg := fmt.Sprintf("%s %+v from '%+v' to '%+v'",
change.Type, change.Path, change.From, change.To)
reasons = append(reasons, msg)
}
return sync, reasons
}
func syncResources(a, b *v1.ResourceRequirements) bool {
for _, res := range []v1.ResourceName{
v1.ResourceCPU,
v1.ResourceMemory,
} {
if !a.Limits[res].Equal(b.Limits[res]) ||
!a.Requests[res].Equal(b.Requests[res]) {
return true
}
}
return false
}
// Check if we need to synchronize connection pool deployment due to new
// defaults, that are different from what we see in the DeploymentSpec
func (c *Cluster) needSyncConnPoolDefaults(
spec *acidv1.ConnectionPool,
deployment *appsv1.Deployment) (sync bool, reasons []string) {
reasons = []string{}
sync = false
config := c.OpConfig.ConnectionPool
podTemplate := deployment.Spec.Template
poolContainer := podTemplate.Spec.Containers[constants.ConnPoolContainer]
if spec == nil {
spec = &acidv1.ConnectionPool{}
}
if spec.NumberOfInstances == nil &&
*deployment.Spec.Replicas != *config.NumberOfInstances {
sync = true
msg := fmt.Sprintf("NumberOfInstances is different (having %d, required %d)",
*deployment.Spec.Replicas, *config.NumberOfInstances)
reasons = append(reasons, msg)
}
if spec.DockerImage == "" &&
poolContainer.Image != config.Image {
sync = true
msg := fmt.Sprintf("DockerImage is different (having %s, required %s)",
poolContainer.Image, config.Image)
reasons = append(reasons, msg)
}
expectedResources, err := generateResourceRequirements(spec.Resources,
c.makeDefaultConnPoolResources())
// An error to generate expected resources means something is not quite
// right, but for the purpose of robustness do not panic here, just report
// and ignore resources comparison (in the worst case there will be no
// updates for new resource values).
if err == nil && syncResources(&poolContainer.Resources, expectedResources) {
sync = true
msg := fmt.Sprintf("Resources are different (having %+v, required %+v)",
poolContainer.Resources, expectedResources)
reasons = append(reasons, msg)
}
if err != nil {
c.logger.Warningf("Cannot generate expected resources, %v", err)
}
for _, env := range poolContainer.Env {
if spec.User == "" && env.Name == "PGUSER" {
ref := env.ValueFrom.SecretKeyRef.LocalObjectReference
if ref.Name != c.credentialSecretName(config.User) {
sync = true
msg := fmt.Sprintf("Pool user is different (having %s, required %s)",
ref.Name, config.User)
reasons = append(reasons, msg)
}
}
if spec.Schema == "" && env.Name == "PGSCHEMA" && env.Value != config.Schema {
sync = true
msg := fmt.Sprintf("Pool schema is different (having %s, required %s)",
env.Value, config.Schema)
reasons = append(reasons, msg)
}
}
return sync, reasons
}

View File

@ -9,6 +9,7 @@ import (
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
"github.com/zalando/postgres-operator/pkg/spec"
"github.com/zalando/postgres-operator/pkg/util/config"
"github.com/zalando/postgres-operator/pkg/util/constants"
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
"github.com/zalando/postgres-operator/pkg/util/teams"
v1 "k8s.io/api/core/v1"
@ -704,3 +705,20 @@ func TestServiceAnnotations(t *testing.T) {
})
}
}
func TestInitSystemUsers(t *testing.T) {
testName := "Test system users initialization"
// default cluster without connection pool
cl.initSystemUsers()
if _, exist := cl.systemUsers[constants.ConnectionPoolUserKeyName]; exist {
t.Errorf("%s, connection pool user is present", testName)
}
// cluster with connection pool
cl.Spec.EnableConnectionPool = boolToPointer(true)
cl.initSystemUsers()
if _, exist := cl.systemUsers[constants.ConnectionPoolUserKeyName]; !exist {
t.Errorf("%s, connection pool user is not present", testName)
}
}

View File

@ -1,10 +1,12 @@
package cluster
import (
"bytes"
"database/sql"
"fmt"
"net"
"strings"
"text/template"
"time"
"github.com/lib/pq"
@ -28,13 +30,37 @@ const (
getDatabasesSQL = `SELECT datname, pg_get_userbyid(datdba) AS owner FROM pg_database;`
createDatabaseSQL = `CREATE DATABASE "%s" OWNER "%s";`
alterDatabaseOwnerSQL = `ALTER DATABASE "%s" OWNER TO "%s";`
connectionPoolLookup = `
CREATE SCHEMA IF NOT EXISTS {{.pool_schema}};
CREATE OR REPLACE FUNCTION {{.pool_schema}}.user_lookup(
in i_username text, out uname text, out phash text)
RETURNS record AS $$
BEGIN
SELECT usename, passwd FROM pg_catalog.pg_shadow
WHERE usename = i_username INTO uname, phash;
RETURN;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
REVOKE ALL ON FUNCTION {{.pool_schema}}.user_lookup(text)
FROM public, {{.pool_user}};
GRANT EXECUTE ON FUNCTION {{.pool_schema}}.user_lookup(text)
TO {{.pool_user}};
GRANT USAGE ON SCHEMA {{.pool_schema}} TO {{.pool_user}};
`
)
func (c *Cluster) pgConnectionString() string {
func (c *Cluster) pgConnectionString(dbname string) string {
password := c.systemUsers[constants.SuperuserKeyName].Password
return fmt.Sprintf("host='%s' dbname=postgres sslmode=require user='%s' password='%s' connect_timeout='%d'",
if dbname == "" {
dbname = "postgres"
}
return fmt.Sprintf("host='%s' dbname='%s' sslmode=require user='%s' password='%s' connect_timeout='%d'",
fmt.Sprintf("%s.%s.svc.%s", c.Name, c.Namespace, c.OpConfig.ClusterDomain),
dbname,
c.systemUsers[constants.SuperuserKeyName].Name,
strings.Replace(password, "$", "\\$", -1),
constants.PostgresConnectTimeout/time.Second)
@ -49,13 +75,17 @@ func (c *Cluster) databaseAccessDisabled() bool {
}
func (c *Cluster) initDbConn() error {
return c.initDbConnWithName("")
}
func (c *Cluster) initDbConnWithName(dbname string) error {
c.setProcessName("initializing db connection")
if c.pgDb != nil {
return nil
}
var conn *sql.DB
connstring := c.pgConnectionString()
connstring := c.pgConnectionString(dbname)
finalerr := retryutil.Retry(constants.PostgresConnectTimeout, constants.PostgresConnectRetryTimeout,
func() (bool, error) {
@ -94,6 +124,10 @@ func (c *Cluster) initDbConn() error {
return nil
}
func (c *Cluster) connectionIsClosed() bool {
return c.pgDb == nil
}
func (c *Cluster) closeDbConn() (err error) {
c.setProcessName("closing db connection")
if c.pgDb != nil {
@ -243,3 +277,88 @@ func makeUserFlags(rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin
return result
}
// Creates a connection pool credentials lookup function in every database to
// perform remote authentification.
func (c *Cluster) installLookupFunction(poolSchema, poolUser string) error {
var stmtBytes bytes.Buffer
c.logger.Info("Installing lookup function")
if err := c.initDbConn(); err != nil {
return fmt.Errorf("could not init database connection")
}
defer func() {
if c.connectionIsClosed() {
return
}
if err := c.closeDbConn(); err != nil {
c.logger.Errorf("could not close database connection: %v", err)
}
}()
currentDatabases, err := c.getDatabases()
if err != nil {
msg := "could not get databases to install pool lookup function: %v"
return fmt.Errorf(msg, err)
}
templater := template.Must(template.New("sql").Parse(connectionPoolLookup))
for dbname, _ := range currentDatabases {
if dbname == "template0" || dbname == "template1" {
continue
}
if err := c.initDbConnWithName(dbname); err != nil {
return fmt.Errorf("could not init database connection to %s", dbname)
}
c.logger.Infof("Install pool lookup function into %s", dbname)
params := TemplateParams{
"pool_schema": poolSchema,
"pool_user": poolUser,
}
if err := templater.Execute(&stmtBytes, params); err != nil {
c.logger.Errorf("could not prepare sql statement %+v: %v",
params, err)
// process other databases
continue
}
// golang sql will do retries couple of times if pq driver reports
// connections issues (driver.ErrBadConn), but since our query is
// idempotent, we can retry in a view of other errors (e.g. due to
// failover a db is temporary in a read-only mode or so) to make sure
// it was applied.
execErr := retryutil.Retry(
constants.PostgresConnectTimeout,
constants.PostgresConnectRetryTimeout,
func() (bool, error) {
if _, err := c.pgDb.Exec(stmtBytes.String()); err != nil {
msg := fmt.Errorf("could not execute sql statement %s: %v",
stmtBytes.String(), err)
return false, msg
}
return true, nil
})
if execErr != nil {
c.logger.Errorf("could not execute after retries %s: %v",
stmtBytes.String(), err)
// process other databases
continue
}
c.logger.Infof("Pool lookup function installed into %s", dbname)
if err := c.closeDbConn(); err != nil {
c.logger.Errorf("could not close database connection: %v", err)
}
}
c.ConnectionPool.LookupFunction = true
return nil
}

View File

@ -18,6 +18,7 @@ import (
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
"github.com/zalando/postgres-operator/pkg/spec"
pkgspec "github.com/zalando/postgres-operator/pkg/spec"
"github.com/zalando/postgres-operator/pkg/util"
"github.com/zalando/postgres-operator/pkg/util/config"
"github.com/zalando/postgres-operator/pkg/util/constants"
@ -32,10 +33,12 @@ const (
patroniPGBinariesParameterName = "bin_dir"
patroniPGParametersParameterName = "parameters"
patroniPGHBAConfParameterName = "pg_hba"
localHost = "127.0.0.1/32"
connectionPoolContainer = "connection-pool"
pgPort = 5432
// the gid of the postgres user in the default spilo image
spiloPostgresGID = 103
localHost = "127.0.0.1/32"
)
type pgUser struct {
@ -71,6 +74,10 @@ func (c *Cluster) statefulSetName() string {
return c.Name
}
func (c *Cluster) connPoolName() string {
return c.Name + "-pooler"
}
func (c *Cluster) endpointName(role PostgresRole) string {
name := c.Name
if role == Replica {
@ -89,6 +96,28 @@ func (c *Cluster) serviceName(role PostgresRole) string {
return name
}
func (c *Cluster) serviceAddress(role PostgresRole) string {
service, exist := c.Services[role]
if exist {
return service.ObjectMeta.Name
}
c.logger.Warningf("No service for role %s", role)
return ""
}
func (c *Cluster) servicePort(role PostgresRole) string {
service, exist := c.Services[role]
if exist {
return fmt.Sprint(service.Spec.Ports[0].Port)
}
c.logger.Warningf("No service for role %s", role)
return ""
}
func (c *Cluster) podDisruptionBudgetName() string {
return c.OpConfig.PDBNameFormat.Format("cluster", c.Name)
}
@ -97,10 +126,39 @@ func (c *Cluster) makeDefaultResources() acidv1.Resources {
config := c.OpConfig
defaultRequests := acidv1.ResourceDescription{CPU: config.DefaultCPURequest, Memory: config.DefaultMemoryRequest}
defaultLimits := acidv1.ResourceDescription{CPU: config.DefaultCPULimit, Memory: config.DefaultMemoryLimit}
defaultRequests := acidv1.ResourceDescription{
CPU: config.Resources.DefaultCPURequest,
Memory: config.Resources.DefaultMemoryRequest,
}
defaultLimits := acidv1.ResourceDescription{
CPU: config.Resources.DefaultCPULimit,
Memory: config.Resources.DefaultMemoryLimit,
}
return acidv1.Resources{ResourceRequests: defaultRequests, ResourceLimits: defaultLimits}
return acidv1.Resources{
ResourceRequests: defaultRequests,
ResourceLimits: defaultLimits,
}
}
// Generate default resource section for connection pool deployment, to be used
// if nothing custom is specified in the manifest
func (c *Cluster) makeDefaultConnPoolResources() acidv1.Resources {
config := c.OpConfig
defaultRequests := acidv1.ResourceDescription{
CPU: config.ConnectionPool.ConnPoolDefaultCPURequest,
Memory: config.ConnectionPool.ConnPoolDefaultMemoryRequest,
}
defaultLimits := acidv1.ResourceDescription{
CPU: config.ConnectionPool.ConnPoolDefaultCPULimit,
Memory: config.ConnectionPool.ConnPoolDefaultMemoryLimit,
}
return acidv1.Resources{
ResourceRequests: defaultRequests,
ResourceLimits: defaultLimits,
}
}
func generateResourceRequirements(resources acidv1.Resources, defaultResources acidv1.Resources) (*v1.ResourceRequirements, error) {
@ -124,12 +182,12 @@ func generateResourceRequirements(resources acidv1.Resources, defaultResources a
return &result, nil
}
func fillResourceList(pgSpec acidv1.ResourceDescription, defaults acidv1.ResourceDescription) (v1.ResourceList, error) {
func fillResourceList(spec acidv1.ResourceDescription, defaults acidv1.ResourceDescription) (v1.ResourceList, error) {
var err error
requests := v1.ResourceList{}
if pgSpec.CPU != "" {
requests[v1.ResourceCPU], err = resource.ParseQuantity(pgSpec.CPU)
if spec.CPU != "" {
requests[v1.ResourceCPU], err = resource.ParseQuantity(spec.CPU)
if err != nil {
return nil, fmt.Errorf("could not parse CPU quantity: %v", err)
}
@ -139,8 +197,8 @@ func fillResourceList(pgSpec acidv1.ResourceDescription, defaults acidv1.Resourc
return nil, fmt.Errorf("could not parse default CPU quantity: %v", err)
}
}
if pgSpec.Memory != "" {
requests[v1.ResourceMemory], err = resource.ParseQuantity(pgSpec.Memory)
if spec.Memory != "" {
requests[v1.ResourceMemory], err = resource.ParseQuantity(spec.Memory)
if err != nil {
return nil, fmt.Errorf("could not parse memory quantity: %v", err)
}
@ -320,7 +378,11 @@ func tolerations(tolerationsSpec *[]v1.Toleration, podToleration map[string]stri
return *tolerationsSpec
}
if len(podToleration["key"]) > 0 || len(podToleration["operator"]) > 0 || len(podToleration["value"]) > 0 || len(podToleration["effect"]) > 0 {
if len(podToleration["key"]) > 0 ||
len(podToleration["operator"]) > 0 ||
len(podToleration["value"]) > 0 ||
len(podToleration["effect"]) > 0 {
return []v1.Toleration{
{
Key: podToleration["key"],
@ -424,9 +486,9 @@ func generateSidecarContainers(sidecars []acidv1.Sidecar,
// Check whether or not we're requested to mount an shm volume,
// taking into account that PostgreSQL manifest has precedence.
func mountShmVolumeNeeded(opConfig config.Config, pgSpec *acidv1.PostgresSpec) *bool {
if pgSpec.ShmVolume != nil && *pgSpec.ShmVolume {
return pgSpec.ShmVolume
func mountShmVolumeNeeded(opConfig config.Config, spec *acidv1.PostgresSpec) *bool {
if spec.ShmVolume != nil && *spec.ShmVolume {
return spec.ShmVolume
}
return opConfig.ShmVolume
@ -550,10 +612,6 @@ func (c *Cluster) generateSpiloPodEnvVars(uid types.UID, spiloConfiguration stri
Name: "KUBERNETES_ROLE_LABEL",
Value: c.OpConfig.PodRoleLabel,
},
{
Name: "KUBERNETES_LABELS",
Value: labels.Set(c.OpConfig.ClusterLabels).String(),
},
{
Name: "PGPASSWORD_SUPERUSER",
ValueFrom: &v1.EnvVarSource{
@ -589,6 +647,12 @@ func (c *Cluster) generateSpiloPodEnvVars(uid types.UID, spiloConfiguration stri
Value: c.OpConfig.PamRoleName,
},
}
// Spilo expects cluster labels as JSON
if clusterLabels, err := json.Marshal(labels.Set(c.OpConfig.ClusterLabels)); err != nil {
envVars = append(envVars, v1.EnvVar{Name: "KUBERNETES_LABELS", Value: labels.Set(c.OpConfig.ClusterLabels).String()})
} else {
envVars = append(envVars, v1.EnvVar{Name: "KUBERNETES_LABELS", Value: string(clusterLabels)})
}
if spiloConfiguration != "" {
envVars = append(envVars, v1.EnvVar{Name: "SPILO_CONFIGURATION", Value: spiloConfiguration})
}
@ -767,7 +831,7 @@ func (c *Cluster) getNewPgVersion(container v1.Container, newPgVersion string) (
return newPgVersion, nil
}
func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.StatefulSet, error) {
func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.StatefulSet, error) {
var (
err error
@ -783,14 +847,14 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat
// controller adjusts the default memory request at operator startup
request := pgSpec.Resources.ResourceRequests.Memory
request := spec.Resources.ResourceRequests.Memory
if request == "" {
request = c.OpConfig.DefaultMemoryRequest
request = c.OpConfig.Resources.DefaultMemoryRequest
}
limit := pgSpec.Resources.ResourceLimits.Memory
limit := spec.Resources.ResourceLimits.Memory
if limit == "" {
limit = c.OpConfig.DefaultMemoryLimit
limit = c.OpConfig.Resources.DefaultMemoryLimit
}
isSmaller, err := util.IsSmallerQuantity(request, limit)
@ -799,7 +863,7 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat
}
if isSmaller {
c.logger.Warningf("The memory request of %v for the Postgres container is increased to match the memory limit of %v.", request, limit)
pgSpec.Resources.ResourceRequests.Memory = limit
spec.Resources.ResourceRequests.Memory = limit
}
@ -807,17 +871,17 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat
// as this sidecar is managed separately
// adjust sidecar containers defined for that particular cluster
for _, sidecar := range pgSpec.Sidecars {
for _, sidecar := range spec.Sidecars {
// TODO #413
sidecarRequest := sidecar.Resources.ResourceRequests.Memory
if request == "" {
request = c.OpConfig.DefaultMemoryRequest
request = c.OpConfig.Resources.DefaultMemoryRequest
}
sidecarLimit := sidecar.Resources.ResourceLimits.Memory
if limit == "" {
limit = c.OpConfig.DefaultMemoryLimit
limit = c.OpConfig.Resources.DefaultMemoryLimit
}
isSmaller, err := util.IsSmallerQuantity(sidecarRequest, sidecarLimit)
@ -834,21 +898,21 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat
defaultResources := c.makeDefaultResources()
resourceRequirements, err := generateResourceRequirements(pgSpec.Resources, defaultResources)
resourceRequirements, err := generateResourceRequirements(spec.Resources, defaultResources)
if err != nil {
return nil, fmt.Errorf("could not generate resource requirements: %v", err)
}
if pgSpec.InitContainers != nil && len(pgSpec.InitContainers) > 0 {
if spec.InitContainers != nil && len(spec.InitContainers) > 0 {
if c.OpConfig.EnableInitContainers != nil && !(*c.OpConfig.EnableInitContainers) {
c.logger.Warningf("initContainers specified but disabled in configuration - next statefulset creation would fail")
}
initContainers = pgSpec.InitContainers
initContainers = spec.InitContainers
}
customPodEnvVarsList := make([]v1.EnvVar, 0)
if c.OpConfig.PodEnvironmentConfigMap != (spec.NamespacedName{}) {
if c.OpConfig.PodEnvironmentConfigMap != (pkgspec.NamespacedName{}) {
var cm *v1.ConfigMap
cm, err = c.KubeClient.ConfigMaps(c.OpConfig.PodEnvironmentConfigMap.Namespace).Get(c.OpConfig.PodEnvironmentConfigMap.Name, metav1.GetOptions{})
if err != nil {
@ -866,33 +930,33 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat
sort.Slice(customPodEnvVarsList,
func(i, j int) bool { return customPodEnvVarsList[i].Name < customPodEnvVarsList[j].Name })
}
if pgSpec.StandbyCluster != nil && pgSpec.StandbyCluster.S3WalPath == "" {
if spec.StandbyCluster != nil && spec.StandbyCluster.S3WalPath == "" {
return nil, fmt.Errorf("s3_wal_path is empty for standby cluster")
}
// backward compatible check for InitContainers
if pgSpec.InitContainersOld != nil {
if spec.InitContainersOld != nil {
msg := "Manifest parameter init_containers is deprecated."
if pgSpec.InitContainers == nil {
if spec.InitContainers == nil {
c.logger.Warningf("%s Consider using initContainers instead.", msg)
pgSpec.InitContainers = pgSpec.InitContainersOld
spec.InitContainers = spec.InitContainersOld
} else {
c.logger.Warningf("%s Only value from initContainers is used", msg)
}
}
// backward compatible check for PodPriorityClassName
if pgSpec.PodPriorityClassNameOld != "" {
if spec.PodPriorityClassNameOld != "" {
msg := "Manifest parameter pod_priority_class_name is deprecated."
if pgSpec.PodPriorityClassName == "" {
if spec.PodPriorityClassName == "" {
c.logger.Warningf("%s Consider using podPriorityClassName instead.", msg)
pgSpec.PodPriorityClassName = pgSpec.PodPriorityClassNameOld
spec.PodPriorityClassName = spec.PodPriorityClassNameOld
} else {
c.logger.Warningf("%s Only value from podPriorityClassName is used", msg)
}
}
spiloConfiguration, err := generateSpiloJSONConfiguration(&pgSpec.PostgresqlParam, &pgSpec.Patroni, c.OpConfig.PamRoleName, c.logger)
spiloConfiguration, err := generateSpiloJSONConfiguration(&spec.PostgresqlParam, &spec.Patroni, c.OpConfig.PamRoleName, c.logger)
if err != nil {
return nil, fmt.Errorf("could not generate Spilo JSON configuration: %v", err)
}
@ -901,24 +965,24 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat
spiloEnvVars := c.generateSpiloPodEnvVars(
c.Postgresql.GetUID(),
spiloConfiguration,
&pgSpec.Clone,
pgSpec.StandbyCluster,
&spec.Clone,
spec.StandbyCluster,
customPodEnvVarsList,
)
// pickup the docker image for the spilo container
effectiveDockerImage := util.Coalesce(pgSpec.DockerImage, c.OpConfig.DockerImage)
effectiveDockerImage := util.Coalesce(spec.DockerImage, c.OpConfig.DockerImage)
// determine the FSGroup for the spilo pod
effectiveFSGroup := c.OpConfig.Resources.SpiloFSGroup
if pgSpec.SpiloFSGroup != nil {
effectiveFSGroup = pgSpec.SpiloFSGroup
if spec.SpiloFSGroup != nil {
effectiveFSGroup = spec.SpiloFSGroup
}
volumeMounts := generateVolumeMounts(pgSpec.Volume)
volumeMounts := generateVolumeMounts(spec.Volume)
// configure TLS with a custom secret volume
if pgSpec.TLS != nil && pgSpec.TLS.SecretName != "" {
if spec.TLS != nil && spec.TLS.SecretName != "" {
if effectiveFSGroup == nil {
c.logger.Warnf("Setting the default FSGroup to satisfy the TLS configuration")
fsGroup := int64(spiloPostgresGID)
@ -931,7 +995,7 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat
Name: "tls-secret",
VolumeSource: v1.VolumeSource{
Secret: &v1.SecretVolumeSource{
SecretName: pgSpec.TLS.SecretName,
SecretName: spec.TLS.SecretName,
DefaultMode: &defaultMode,
},
},
@ -945,16 +1009,16 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat
})
// use the same filenames as Secret resources by default
certFile := ensurePath(pgSpec.TLS.CertificateFile, mountPath, "tls.crt")
privateKeyFile := ensurePath(pgSpec.TLS.PrivateKeyFile, mountPath, "tls.key")
certFile := ensurePath(spec.TLS.CertificateFile, mountPath, "tls.crt")
privateKeyFile := ensurePath(spec.TLS.PrivateKeyFile, mountPath, "tls.key")
spiloEnvVars = append(
spiloEnvVars,
v1.EnvVar{Name: "SSL_CERTIFICATE_FILE", Value: certFile},
v1.EnvVar{Name: "SSL_PRIVATE_KEY_FILE", Value: privateKeyFile},
)
if pgSpec.TLS.CAFile != "" {
caFile := ensurePath(pgSpec.TLS.CAFile, mountPath, "")
if spec.TLS.CAFile != "" {
caFile := ensurePath(spec.TLS.CAFile, mountPath, "")
spiloEnvVars = append(
spiloEnvVars,
v1.EnvVar{Name: "SSL_CA_FILE", Value: caFile},
@ -973,7 +1037,7 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat
)
// resolve conflicts between operator-global and per-cluster sidecars
sideCars := c.mergeSidecars(pgSpec.Sidecars)
sideCars := c.mergeSidecars(spec.Sidecars)
resourceRequirementsScalyrSidecar := makeResources(
c.OpConfig.ScalyrCPURequest,
@ -1003,10 +1067,10 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat
}
}
tolerationSpec := tolerations(&pgSpec.Tolerations, c.OpConfig.PodToleration)
effectivePodPriorityClassName := util.Coalesce(pgSpec.PodPriorityClassName, c.OpConfig.PodPriorityClassName)
tolerationSpec := tolerations(&spec.Tolerations, c.OpConfig.PodToleration)
effectivePodPriorityClassName := util.Coalesce(spec.PodPriorityClassName, c.OpConfig.PodPriorityClassName)
annotations := c.generatePodAnnotations(pgSpec)
annotations := c.generatePodAnnotations(spec)
// generate pod template for the statefulset, based on the spilo container and sidecars
podTemplate, err = generatePodTemplate(
@ -1023,7 +1087,7 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat
c.OpConfig.PodServiceAccountName,
c.OpConfig.KubeIAMRole,
effectivePodPriorityClassName,
mountShmVolumeNeeded(c.OpConfig, pgSpec),
mountShmVolumeNeeded(c.OpConfig, spec),
c.OpConfig.EnablePodAntiAffinity,
c.OpConfig.PodAntiAffinityTopologyKey,
c.OpConfig.AdditionalSecretMount,
@ -1034,12 +1098,12 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat
return nil, fmt.Errorf("could not generate pod template: %v", err)
}
if volumeClaimTemplate, err = generatePersistentVolumeClaimTemplate(pgSpec.Volume.Size,
pgSpec.Volume.StorageClass); err != nil {
if volumeClaimTemplate, err = generatePersistentVolumeClaimTemplate(spec.Volume.Size,
spec.Volume.StorageClass); err != nil {
return nil, fmt.Errorf("could not generate volume claim template: %v", err)
}
numberOfInstances := c.getNumberOfInstances(pgSpec)
numberOfInstances := c.getNumberOfInstances(spec)
// the operator has domain-specific logic on how to do rolling updates of PG clusters
// so we do not use default rolling updates implemented by stateful sets
@ -1076,13 +1140,13 @@ func (c *Cluster) generateStatefulSet(pgSpec *acidv1.PostgresSpec) (*appsv1.Stat
return statefulSet, nil
}
func (c *Cluster) generatePodAnnotations(pgSpec *acidv1.PostgresSpec) map[string]string {
func (c *Cluster) generatePodAnnotations(spec *acidv1.PostgresSpec) map[string]string {
annotations := make(map[string]string)
for k, v := range c.OpConfig.CustomPodAnnotations {
annotations[k] = v
}
if pgSpec != nil || pgSpec.PodAnnotations != nil {
for k, v := range pgSpec.PodAnnotations {
if spec != nil || spec.PodAnnotations != nil {
for k, v := range spec.PodAnnotations {
annotations[k] = v
}
}
@ -1148,13 +1212,13 @@ func (c *Cluster) mergeSidecars(sidecars []acidv1.Sidecar) []acidv1.Sidecar {
return result
}
func (c *Cluster) getNumberOfInstances(pgSpec *acidv1.PostgresSpec) int32 {
func (c *Cluster) getNumberOfInstances(spec *acidv1.PostgresSpec) int32 {
min := c.OpConfig.MinInstances
max := c.OpConfig.MaxInstances
cur := pgSpec.NumberOfInstances
cur := spec.NumberOfInstances
newcur := cur
if pgSpec.StandbyCluster != nil {
if spec.StandbyCluster != nil {
if newcur == 1 {
min = newcur
max = newcur
@ -1309,15 +1373,15 @@ func (c *Cluster) generateSingleUserSecret(namespace string, pgUser spec.PgUser)
return &secret
}
func (c *Cluster) shouldCreateLoadBalancerForService(role PostgresRole, pgSpec *acidv1.PostgresSpec) bool {
func (c *Cluster) shouldCreateLoadBalancerForService(role PostgresRole, spec *acidv1.PostgresSpec) bool {
switch role {
case Replica:
// if the value is explicitly set in a Postgresql manifest, follow this setting
if pgSpec.EnableReplicaLoadBalancer != nil {
return *pgSpec.EnableReplicaLoadBalancer
if spec.EnableReplicaLoadBalancer != nil {
return *spec.EnableReplicaLoadBalancer
}
// otherwise, follow the operator configuration
@ -1325,8 +1389,8 @@ func (c *Cluster) shouldCreateLoadBalancerForService(role PostgresRole, pgSpec *
case Master:
if pgSpec.EnableMasterLoadBalancer != nil {
return *pgSpec.EnableMasterLoadBalancer
if spec.EnableMasterLoadBalancer != nil {
return *spec.EnableMasterLoadBalancer
}
return c.OpConfig.EnableMasterLoadBalancer
@ -1337,7 +1401,7 @@ func (c *Cluster) shouldCreateLoadBalancerForService(role PostgresRole, pgSpec *
}
func (c *Cluster) generateService(role PostgresRole, pgSpec *acidv1.PostgresSpec) *v1.Service {
func (c *Cluster) generateService(role PostgresRole, spec *acidv1.PostgresSpec) *v1.Service {
serviceSpec := v1.ServiceSpec{
Ports: []v1.ServicePort{{Name: "postgresql", Port: 5432, TargetPort: intstr.IntOrString{IntVal: 5432}}},
Type: v1.ServiceTypeClusterIP,
@ -1347,12 +1411,12 @@ func (c *Cluster) generateService(role PostgresRole, pgSpec *acidv1.PostgresSpec
serviceSpec.Selector = c.roleLabelsSet(false, role)
}
if c.shouldCreateLoadBalancerForService(role, pgSpec) {
if c.shouldCreateLoadBalancerForService(role, spec) {
// pgSpec.AllowedSourceRanges evaluates to the empty slice of zero length
// spec.AllowedSourceRanges evaluates to the empty slice of zero length
// when omitted or set to 'null'/empty sequence in the PG manifest
if len(pgSpec.AllowedSourceRanges) > 0 {
serviceSpec.LoadBalancerSourceRanges = pgSpec.AllowedSourceRanges
if len(spec.AllowedSourceRanges) > 0 {
serviceSpec.LoadBalancerSourceRanges = spec.AllowedSourceRanges
} else {
// safe default value: lock a load balancer only to the local address unless overridden explicitly
serviceSpec.LoadBalancerSourceRanges = []string{localHost}
@ -1371,7 +1435,7 @@ func (c *Cluster) generateService(role PostgresRole, pgSpec *acidv1.PostgresSpec
Name: c.serviceName(role),
Namespace: c.Namespace,
Labels: c.roleLabelsSet(true, role),
Annotations: c.generateServiceAnnotations(role, pgSpec),
Annotations: c.generateServiceAnnotations(role, spec),
},
Spec: serviceSpec,
}
@ -1379,19 +1443,19 @@ func (c *Cluster) generateService(role PostgresRole, pgSpec *acidv1.PostgresSpec
return service
}
func (c *Cluster) generateServiceAnnotations(role PostgresRole, pgSpec *acidv1.PostgresSpec) map[string]string {
func (c *Cluster) generateServiceAnnotations(role PostgresRole, spec *acidv1.PostgresSpec) map[string]string {
annotations := make(map[string]string)
for k, v := range c.OpConfig.CustomServiceAnnotations {
annotations[k] = v
}
if pgSpec != nil || pgSpec.ServiceAnnotations != nil {
for k, v := range pgSpec.ServiceAnnotations {
if spec != nil || spec.ServiceAnnotations != nil {
for k, v := range spec.ServiceAnnotations {
annotations[k] = v
}
}
if c.shouldCreateLoadBalancerForService(role, pgSpec) {
if c.shouldCreateLoadBalancerForService(role, spec) {
var dnsName string
if role == Master {
dnsName = c.masterDNSName()
@ -1779,6 +1843,306 @@ func (c *Cluster) getLogicalBackupJobName() (jobName string) {
return "logical-backup-" + c.clusterName().Name
}
// Generate pool size related environment variables.
//
// MAX_DB_CONN would specify the global maximum for connections to a target
// database.
//
// MAX_CLIENT_CONN is not configurable at the moment, just set it high enough.
//
// DEFAULT_SIZE is a pool size per db/user (having in mind the use case when
// most of the queries coming through a connection pooler are from the same
// user to the same db). In case if we want to spin up more connection pool
// instances, take this into account and maintain the same number of
// connections.
//
// MIN_SIZE is a pool minimal size, to prevent situation when sudden workload
// have to wait for spinning up a new connections.
//
// RESERVE_SIZE is how many additional connections to allow for a pool.
func (c *Cluster) getConnPoolEnvVars(spec *acidv1.PostgresSpec) []v1.EnvVar {
effectiveMode := util.Coalesce(
spec.ConnectionPool.Mode,
c.OpConfig.ConnectionPool.Mode)
numberOfInstances := spec.ConnectionPool.NumberOfInstances
if numberOfInstances == nil {
numberOfInstances = util.CoalesceInt32(
c.OpConfig.ConnectionPool.NumberOfInstances,
k8sutil.Int32ToPointer(1))
}
effectiveMaxDBConn := util.CoalesceInt32(
spec.ConnectionPool.MaxDBConnections,
c.OpConfig.ConnectionPool.MaxDBConnections)
if effectiveMaxDBConn == nil {
effectiveMaxDBConn = k8sutil.Int32ToPointer(
constants.ConnPoolMaxDBConnections)
}
maxDBConn := *effectiveMaxDBConn / *numberOfInstances
defaultSize := maxDBConn / 2
minSize := defaultSize / 2
reserveSize := minSize
return []v1.EnvVar{
{
Name: "CONNECTION_POOL_PORT",
Value: fmt.Sprint(pgPort),
},
{
Name: "CONNECTION_POOL_MODE",
Value: effectiveMode,
},
{
Name: "CONNECTION_POOL_DEFAULT_SIZE",
Value: fmt.Sprint(defaultSize),
},
{
Name: "CONNECTION_POOL_MIN_SIZE",
Value: fmt.Sprint(minSize),
},
{
Name: "CONNECTION_POOL_RESERVE_SIZE",
Value: fmt.Sprint(reserveSize),
},
{
Name: "CONNECTION_POOL_MAX_CLIENT_CONN",
Value: fmt.Sprint(constants.ConnPoolMaxClientConnections),
},
{
Name: "CONNECTION_POOL_MAX_DB_CONN",
Value: fmt.Sprint(maxDBConn),
},
}
}
func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) (
*v1.PodTemplateSpec, error) {
gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds())
resources, err := generateResourceRequirements(
spec.ConnectionPool.Resources,
c.makeDefaultConnPoolResources())
effectiveDockerImage := util.Coalesce(
spec.ConnectionPool.DockerImage,
c.OpConfig.ConnectionPool.Image)
effectiveSchema := util.Coalesce(
spec.ConnectionPool.Schema,
c.OpConfig.ConnectionPool.Schema)
if err != nil {
return nil, fmt.Errorf("could not generate resource requirements: %v", err)
}
secretSelector := func(key string) *v1.SecretKeySelector {
effectiveUser := util.Coalesce(
spec.ConnectionPool.User,
c.OpConfig.ConnectionPool.User)
return &v1.SecretKeySelector{
LocalObjectReference: v1.LocalObjectReference{
Name: c.credentialSecretName(effectiveUser),
},
Key: key,
}
}
envVars := []v1.EnvVar{
{
Name: "PGHOST",
Value: c.serviceAddress(Master),
},
{
Name: "PGPORT",
Value: c.servicePort(Master),
},
{
Name: "PGUSER",
ValueFrom: &v1.EnvVarSource{
SecretKeyRef: secretSelector("username"),
},
},
// the convention is to use the same schema name as
// connection pool username
{
Name: "PGSCHEMA",
Value: effectiveSchema,
},
{
Name: "PGPASSWORD",
ValueFrom: &v1.EnvVarSource{
SecretKeyRef: secretSelector("password"),
},
},
}
envVars = append(envVars, c.getConnPoolEnvVars(spec)...)
poolerContainer := v1.Container{
Name: connectionPoolContainer,
Image: effectiveDockerImage,
ImagePullPolicy: v1.PullIfNotPresent,
Resources: *resources,
Ports: []v1.ContainerPort{
{
ContainerPort: pgPort,
Protocol: v1.ProtocolTCP,
},
},
Env: envVars,
}
podTemplate := &v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: c.connPoolLabelsSelector().MatchLabels,
Namespace: c.Namespace,
Annotations: c.generatePodAnnotations(spec),
},
Spec: v1.PodSpec{
ServiceAccountName: c.OpConfig.PodServiceAccountName,
TerminationGracePeriodSeconds: &gracePeriod,
Containers: []v1.Container{poolerContainer},
// TODO: add tolerations to scheduler pooler on the same node
// as database
//Tolerations: *tolerationsSpec,
},
}
return podTemplate, nil
}
// Return an array of ownerReferences to make an arbitraty object dependent on
// the StatefulSet. Dependency is made on StatefulSet instead of PostgreSQL CRD
// while the former is represent the actual state, and only it's deletion means
// we delete the cluster (e.g. if CRD was deleted, StatefulSet somehow
// survived, we can't delete an object because it will affect the functioning
// cluster).
func (c *Cluster) ownerReferences() []metav1.OwnerReference {
controller := true
if c.Statefulset == nil {
c.logger.Warning("Cannot get owner reference, no statefulset")
return []metav1.OwnerReference{}
}
return []metav1.OwnerReference{
{
UID: c.Statefulset.ObjectMeta.UID,
APIVersion: "apps/v1",
Kind: "StatefulSet",
Name: c.Statefulset.ObjectMeta.Name,
Controller: &controller,
},
}
}
func (c *Cluster) generateConnPoolDeployment(spec *acidv1.PostgresSpec) (
*appsv1.Deployment, error) {
// there are two ways to enable connection pooler, either to specify a
// connectionPool section or enableConnectionPool. In the second case
// spec.connectionPool will be nil, so to make it easier to calculate
// default values, initialize it to an empty structure. It could be done
// anywhere, but here is the earliest common entry point between sync and
// create code, so init here.
if spec.ConnectionPool == nil {
spec.ConnectionPool = &acidv1.ConnectionPool{}
}
podTemplate, err := c.generateConnPoolPodTemplate(spec)
numberOfInstances := spec.ConnectionPool.NumberOfInstances
if numberOfInstances == nil {
numberOfInstances = util.CoalesceInt32(
c.OpConfig.ConnectionPool.NumberOfInstances,
k8sutil.Int32ToPointer(1))
}
if *numberOfInstances < constants.ConnPoolMinInstances {
msg := "Adjusted number of connection pool instances from %d to %d"
c.logger.Warningf(msg, numberOfInstances, constants.ConnPoolMinInstances)
*numberOfInstances = constants.ConnPoolMinInstances
}
if err != nil {
return nil, err
}
deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: c.connPoolName(),
Namespace: c.Namespace,
Labels: c.connPoolLabelsSelector().MatchLabels,
Annotations: map[string]string{},
// make StatefulSet object its owner to represent the dependency.
// By itself StatefulSet is being deleted with "Orphaned"
// propagation policy, which means that it's deletion will not
// clean up this deployment, but there is a hope that this object
// will be garbage collected if something went wrong and operator
// didn't deleted it.
OwnerReferences: c.ownerReferences(),
},
Spec: appsv1.DeploymentSpec{
Replicas: numberOfInstances,
Selector: c.connPoolLabelsSelector(),
Template: *podTemplate,
},
}
return deployment, nil
}
func (c *Cluster) generateConnPoolService(spec *acidv1.PostgresSpec) *v1.Service {
// there are two ways to enable connection pooler, either to specify a
// connectionPool section or enableConnectionPool. In the second case
// spec.connectionPool will be nil, so to make it easier to calculate
// default values, initialize it to an empty structure. It could be done
// anywhere, but here is the earliest common entry point between sync and
// create code, so init here.
if spec.ConnectionPool == nil {
spec.ConnectionPool = &acidv1.ConnectionPool{}
}
serviceSpec := v1.ServiceSpec{
Ports: []v1.ServicePort{
{
Name: c.connPoolName(),
Port: pgPort,
TargetPort: intstr.IntOrString{StrVal: c.servicePort(Master)},
},
},
Type: v1.ServiceTypeClusterIP,
Selector: map[string]string{
"connection-pool": c.connPoolName(),
},
}
service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: c.connPoolName(),
Namespace: c.Namespace,
Labels: c.connPoolLabelsSelector().MatchLabels,
Annotations: map[string]string{},
// make StatefulSet object its owner to represent the dependency.
// By itself StatefulSet is being deleted with "Orphaned"
// propagation policy, which means that it's deletion will not
// clean up this service, but there is a hope that this object will
// be garbage collected if something went wrong and operator didn't
// deleted it.
OwnerReferences: c.ownerReferences(),
},
Spec: serviceSpec,
}
return service
}
func ensurePath(file string, defaultDir string, defaultFile string) string {
if file == "" {
return path.Join(defaultDir, defaultFile)

View File

@ -1,6 +1,8 @@
package cluster
import (
"errors"
"fmt"
"reflect"
"testing"
@ -13,6 +15,7 @@ import (
"github.com/zalando/postgres-operator/pkg/util/constants"
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -582,6 +585,375 @@ func TestSecretVolume(t *testing.T) {
}
}
func testResources(cluster *Cluster, podSpec *v1.PodTemplateSpec) error {
cpuReq := podSpec.Spec.Containers[0].Resources.Requests["cpu"]
if cpuReq.String() != cluster.OpConfig.ConnectionPool.ConnPoolDefaultCPURequest {
return fmt.Errorf("CPU request doesn't match, got %s, expected %s",
cpuReq.String(), cluster.OpConfig.ConnectionPool.ConnPoolDefaultCPURequest)
}
memReq := podSpec.Spec.Containers[0].Resources.Requests["memory"]
if memReq.String() != cluster.OpConfig.ConnectionPool.ConnPoolDefaultMemoryRequest {
return fmt.Errorf("Memory request doesn't match, got %s, expected %s",
memReq.String(), cluster.OpConfig.ConnectionPool.ConnPoolDefaultMemoryRequest)
}
cpuLim := podSpec.Spec.Containers[0].Resources.Limits["cpu"]
if cpuLim.String() != cluster.OpConfig.ConnectionPool.ConnPoolDefaultCPULimit {
return fmt.Errorf("CPU limit doesn't match, got %s, expected %s",
cpuLim.String(), cluster.OpConfig.ConnectionPool.ConnPoolDefaultCPULimit)
}
memLim := podSpec.Spec.Containers[0].Resources.Limits["memory"]
if memLim.String() != cluster.OpConfig.ConnectionPool.ConnPoolDefaultMemoryLimit {
return fmt.Errorf("Memory limit doesn't match, got %s, expected %s",
memLim.String(), cluster.OpConfig.ConnectionPool.ConnPoolDefaultMemoryLimit)
}
return nil
}
func testLabels(cluster *Cluster, podSpec *v1.PodTemplateSpec) error {
poolLabels := podSpec.ObjectMeta.Labels["connection-pool"]
if poolLabels != cluster.connPoolLabelsSelector().MatchLabels["connection-pool"] {
return fmt.Errorf("Pod labels do not match, got %+v, expected %+v",
podSpec.ObjectMeta.Labels, cluster.connPoolLabelsSelector().MatchLabels)
}
return nil
}
func testEnvs(cluster *Cluster, podSpec *v1.PodTemplateSpec) error {
required := map[string]bool{
"PGHOST": false,
"PGPORT": false,
"PGUSER": false,
"PGSCHEMA": false,
"PGPASSWORD": false,
"CONNECTION_POOL_MODE": false,
"CONNECTION_POOL_PORT": false,
}
envs := podSpec.Spec.Containers[0].Env
for _, env := range envs {
required[env.Name] = true
}
for env, value := range required {
if !value {
return fmt.Errorf("Environment variable %s is not present", env)
}
}
return nil
}
func testCustomPodTemplate(cluster *Cluster, podSpec *v1.PodTemplateSpec) error {
if podSpec.ObjectMeta.Name != "test-pod-template" {
return fmt.Errorf("Custom pod template is not used, current spec %+v",
podSpec)
}
return nil
}
func TestConnPoolPodSpec(t *testing.T) {
testName := "Test connection pool pod template generation"
var cluster = New(
Config{
OpConfig: config.Config{
ProtectedRoles: []string{"admin"},
Auth: config.Auth{
SuperUsername: superUserName,
ReplicationUsername: replicationUserName,
},
ConnectionPool: config.ConnectionPool{
MaxDBConnections: int32ToPointer(60),
ConnPoolDefaultCPURequest: "100m",
ConnPoolDefaultCPULimit: "100m",
ConnPoolDefaultMemoryRequest: "100Mi",
ConnPoolDefaultMemoryLimit: "100Mi",
},
},
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
var clusterNoDefaultRes = New(
Config{
OpConfig: config.Config{
ProtectedRoles: []string{"admin"},
Auth: config.Auth{
SuperUsername: superUserName,
ReplicationUsername: replicationUserName,
},
ConnectionPool: config.ConnectionPool{},
},
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
noCheck := func(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { return nil }
tests := []struct {
subTest string
spec *acidv1.PostgresSpec
expected error
cluster *Cluster
check func(cluster *Cluster, podSpec *v1.PodTemplateSpec) error
}{
{
subTest: "default configuration",
spec: &acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{},
},
expected: nil,
cluster: cluster,
check: noCheck,
},
{
subTest: "no default resources",
spec: &acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{},
},
expected: errors.New(`could not generate resource requirements: could not fill resource requests: could not parse default CPU quantity: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'`),
cluster: clusterNoDefaultRes,
check: noCheck,
},
{
subTest: "default resources are set",
spec: &acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{},
},
expected: nil,
cluster: cluster,
check: testResources,
},
{
subTest: "labels for service",
spec: &acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{},
},
expected: nil,
cluster: cluster,
check: testLabels,
},
{
subTest: "required envs",
spec: &acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{},
},
expected: nil,
cluster: cluster,
check: testEnvs,
},
}
for _, tt := range tests {
podSpec, err := tt.cluster.generateConnPoolPodTemplate(tt.spec)
if err != tt.expected && err.Error() != tt.expected.Error() {
t.Errorf("%s [%s]: Could not generate pod template,\n %+v, expected\n %+v",
testName, tt.subTest, err, tt.expected)
}
err = tt.check(cluster, podSpec)
if err != nil {
t.Errorf("%s [%s]: Pod spec is incorrect, %+v",
testName, tt.subTest, err)
}
}
}
func testDeploymentOwnwerReference(cluster *Cluster, deployment *appsv1.Deployment) error {
owner := deployment.ObjectMeta.OwnerReferences[0]
if owner.Name != cluster.Statefulset.ObjectMeta.Name {
return fmt.Errorf("Ownere reference is incorrect, got %s, expected %s",
owner.Name, cluster.Statefulset.ObjectMeta.Name)
}
return nil
}
func testSelector(cluster *Cluster, deployment *appsv1.Deployment) error {
labels := deployment.Spec.Selector.MatchLabels
expected := cluster.connPoolLabelsSelector().MatchLabels
if labels["connection-pool"] != expected["connection-pool"] {
return fmt.Errorf("Labels are incorrect, got %+v, expected %+v",
labels, expected)
}
return nil
}
func TestConnPoolDeploymentSpec(t *testing.T) {
testName := "Test connection pool deployment spec generation"
var cluster = New(
Config{
OpConfig: config.Config{
ProtectedRoles: []string{"admin"},
Auth: config.Auth{
SuperUsername: superUserName,
ReplicationUsername: replicationUserName,
},
ConnectionPool: config.ConnectionPool{
ConnPoolDefaultCPURequest: "100m",
ConnPoolDefaultCPULimit: "100m",
ConnPoolDefaultMemoryRequest: "100Mi",
ConnPoolDefaultMemoryLimit: "100Mi",
},
},
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
cluster.Statefulset = &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: "test-sts",
},
}
noCheck := func(cluster *Cluster, deployment *appsv1.Deployment) error {
return nil
}
tests := []struct {
subTest string
spec *acidv1.PostgresSpec
expected error
cluster *Cluster
check func(cluster *Cluster, deployment *appsv1.Deployment) error
}{
{
subTest: "default configuration",
spec: &acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{},
},
expected: nil,
cluster: cluster,
check: noCheck,
},
{
subTest: "owner reference",
spec: &acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{},
},
expected: nil,
cluster: cluster,
check: testDeploymentOwnwerReference,
},
{
subTest: "selector",
spec: &acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{},
},
expected: nil,
cluster: cluster,
check: testSelector,
},
}
for _, tt := range tests {
deployment, err := tt.cluster.generateConnPoolDeployment(tt.spec)
if err != tt.expected && err.Error() != tt.expected.Error() {
t.Errorf("%s [%s]: Could not generate deployment spec,\n %+v, expected\n %+v",
testName, tt.subTest, err, tt.expected)
}
err = tt.check(cluster, deployment)
if err != nil {
t.Errorf("%s [%s]: Deployment spec is incorrect, %+v",
testName, tt.subTest, err)
}
}
}
func testServiceOwnwerReference(cluster *Cluster, service *v1.Service) error {
owner := service.ObjectMeta.OwnerReferences[0]
if owner.Name != cluster.Statefulset.ObjectMeta.Name {
return fmt.Errorf("Ownere reference is incorrect, got %s, expected %s",
owner.Name, cluster.Statefulset.ObjectMeta.Name)
}
return nil
}
func testServiceSelector(cluster *Cluster, service *v1.Service) error {
selector := service.Spec.Selector
if selector["connection-pool"] != cluster.connPoolName() {
return fmt.Errorf("Selector is incorrect, got %s, expected %s",
selector["connection-pool"], cluster.connPoolName())
}
return nil
}
func TestConnPoolServiceSpec(t *testing.T) {
testName := "Test connection pool service spec generation"
var cluster = New(
Config{
OpConfig: config.Config{
ProtectedRoles: []string{"admin"},
Auth: config.Auth{
SuperUsername: superUserName,
ReplicationUsername: replicationUserName,
},
ConnectionPool: config.ConnectionPool{
ConnPoolDefaultCPURequest: "100m",
ConnPoolDefaultCPULimit: "100m",
ConnPoolDefaultMemoryRequest: "100Mi",
ConnPoolDefaultMemoryLimit: "100Mi",
},
},
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
cluster.Statefulset = &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: "test-sts",
},
}
noCheck := func(cluster *Cluster, deployment *v1.Service) error {
return nil
}
tests := []struct {
subTest string
spec *acidv1.PostgresSpec
cluster *Cluster
check func(cluster *Cluster, deployment *v1.Service) error
}{
{
subTest: "default configuration",
spec: &acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{},
},
cluster: cluster,
check: noCheck,
},
{
subTest: "owner reference",
spec: &acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{},
},
cluster: cluster,
check: testServiceOwnwerReference,
},
{
subTest: "selector",
spec: &acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{},
},
cluster: cluster,
check: testServiceSelector,
},
}
for _, tt := range tests {
service := tt.cluster.generateConnPoolService(tt.spec)
if err := tt.check(cluster, service); err != nil {
t.Errorf("%s [%s]: Service spec is incorrect, %+v",
testName, tt.subTest, err)
}
}
}
func TestTLS(t *testing.T) {
var err error
var spec acidv1.PostgresSpec

View File

@ -90,6 +90,132 @@ func (c *Cluster) createStatefulSet() (*appsv1.StatefulSet, error) {
return statefulSet, nil
}
// Prepare the database for connection pool to be used, i.e. install lookup
// function (do it first, because it should be fast and if it didn't succeed,
// it doesn't makes sense to create more K8S objects. At this moment we assume
// that necessary connection pool user exists.
//
// After that create all the objects for connection pool, namely a deployment
// with a chosen pooler and a service to expose it.
func (c *Cluster) createConnectionPool(lookup InstallFunction) (*ConnectionPoolObjects, error) {
var msg string
c.setProcessName("creating connection pool")
schema := c.Spec.ConnectionPool.Schema
if schema == "" {
schema = c.OpConfig.ConnectionPool.Schema
}
user := c.Spec.ConnectionPool.User
if user == "" {
user = c.OpConfig.ConnectionPool.User
}
err := lookup(schema, user)
if err != nil {
msg = "could not prepare database for connection pool: %v"
return nil, fmt.Errorf(msg, err)
}
deploymentSpec, err := c.generateConnPoolDeployment(&c.Spec)
if err != nil {
msg = "could not generate deployment for connection pool: %v"
return nil, fmt.Errorf(msg, err)
}
// client-go does retry 10 times (with NoBackoff by default) when the API
// believe a request can be retried and returns Retry-After header. This
// should be good enough to not think about it here.
deployment, err := c.KubeClient.
Deployments(deploymentSpec.Namespace).
Create(deploymentSpec)
if err != nil {
return nil, err
}
serviceSpec := c.generateConnPoolService(&c.Spec)
service, err := c.KubeClient.
Services(serviceSpec.Namespace).
Create(serviceSpec)
if err != nil {
return nil, err
}
c.ConnectionPool = &ConnectionPoolObjects{
Deployment: deployment,
Service: service,
}
c.logger.Debugf("created new connection pool %q, uid: %q",
util.NameFromMeta(deployment.ObjectMeta), deployment.UID)
return c.ConnectionPool, nil
}
func (c *Cluster) deleteConnectionPool() (err error) {
c.setProcessName("deleting connection pool")
c.logger.Debugln("deleting connection pool")
// Lack of connection pooler objects is not a fatal error, just log it if
// it was present before in the manifest
if c.ConnectionPool == nil {
c.logger.Infof("No connection pool to delete")
return nil
}
// Clean up the deployment object. If deployment resource we've remembered
// is somehow empty, try to delete based on what would we generate
deploymentName := c.connPoolName()
deployment := c.ConnectionPool.Deployment
if deployment != nil {
deploymentName = deployment.Name
}
// set delete propagation policy to foreground, so that replica set will be
// also deleted.
policy := metav1.DeletePropagationForeground
options := metav1.DeleteOptions{PropagationPolicy: &policy}
err = c.KubeClient.
Deployments(c.Namespace).
Delete(deploymentName, &options)
if !k8sutil.ResourceNotFound(err) {
c.logger.Debugf("Connection pool deployment was already deleted")
} else if err != nil {
return fmt.Errorf("could not delete deployment: %v", err)
}
c.logger.Infof("Connection pool deployment %q has been deleted", deploymentName)
// Repeat the same for the service object
service := c.ConnectionPool.Service
serviceName := c.connPoolName()
if service != nil {
serviceName = service.Name
}
// set delete propagation policy to foreground, so that all the dependant
// will be deleted.
err = c.KubeClient.
Services(c.Namespace).
Delete(serviceName, &options)
if !k8sutil.ResourceNotFound(err) {
c.logger.Debugf("Connection pool service was already deleted")
} else if err != nil {
return fmt.Errorf("could not delete service: %v", err)
}
c.logger.Infof("Connection pool service %q has been deleted", serviceName)
c.ConnectionPool = nil
return nil
}
func getPodIndex(podName string) (int32, error) {
parts := strings.Split(podName, "-")
if len(parts) == 0 {
@ -674,3 +800,34 @@ func (c *Cluster) GetStatefulSet() *appsv1.StatefulSet {
func (c *Cluster) GetPodDisruptionBudget() *policybeta1.PodDisruptionBudget {
return c.PodDisruptionBudget
}
// Perform actual patching of a connection pool deployment, assuming that all
// the check were already done before.
func (c *Cluster) updateConnPoolDeployment(oldDeploymentSpec, newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) {
c.setProcessName("updating connection pool")
if c.ConnectionPool == nil || c.ConnectionPool.Deployment == nil {
return nil, fmt.Errorf("there is no connection pool in the cluster")
}
patchData, err := specPatch(newDeployment.Spec)
if err != nil {
return nil, fmt.Errorf("could not form patch for the deployment: %v", err)
}
// An update probably requires RetryOnConflict, but since only one operator
// worker at one time will try to update it chances of conflicts are
// minimal.
deployment, err := c.KubeClient.
Deployments(c.ConnectionPool.Deployment.Namespace).
Patch(
c.ConnectionPool.Deployment.Name,
types.MergePatchType,
patchData, "")
if err != nil {
return nil, fmt.Errorf("could not patch deployment: %v", err)
}
c.ConnectionPool.Deployment = deployment
return deployment, nil
}

View File

@ -0,0 +1,127 @@
package cluster
import (
"testing"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
"github.com/zalando/postgres-operator/pkg/util/config"
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func mockInstallLookupFunction(schema string, user string) error {
return nil
}
func boolToPointer(value bool) *bool {
return &value
}
func TestConnPoolCreationAndDeletion(t *testing.T) {
testName := "Test connection pool creation"
var cluster = New(
Config{
OpConfig: config.Config{
ProtectedRoles: []string{"admin"},
Auth: config.Auth{
SuperUsername: superUserName,
ReplicationUsername: replicationUserName,
},
ConnectionPool: config.ConnectionPool{
ConnPoolDefaultCPURequest: "100m",
ConnPoolDefaultCPULimit: "100m",
ConnPoolDefaultMemoryRequest: "100Mi",
ConnPoolDefaultMemoryLimit: "100Mi",
},
},
}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger)
cluster.Statefulset = &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: "test-sts",
},
}
cluster.Spec = acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{},
}
poolResources, err := cluster.createConnectionPool(mockInstallLookupFunction)
if err != nil {
t.Errorf("%s: Cannot create connection pool, %s, %+v",
testName, err, poolResources)
}
if poolResources.Deployment == nil {
t.Errorf("%s: Connection pool deployment is empty", testName)
}
if poolResources.Service == nil {
t.Errorf("%s: Connection pool service is empty", testName)
}
err = cluster.deleteConnectionPool()
if err != nil {
t.Errorf("%s: Cannot delete connection pool, %s", testName, err)
}
}
func TestNeedConnPool(t *testing.T) {
testName := "Test how connection pool can be enabled"
var cluster = New(
Config{
OpConfig: config.Config{
ProtectedRoles: []string{"admin"},
Auth: config.Auth{
SuperUsername: superUserName,
ReplicationUsername: replicationUserName,
},
ConnectionPool: config.ConnectionPool{
ConnPoolDefaultCPURequest: "100m",
ConnPoolDefaultCPULimit: "100m",
ConnPoolDefaultMemoryRequest: "100Mi",
ConnPoolDefaultMemoryLimit: "100Mi",
},
},
}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger)
cluster.Spec = acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{},
}
if !cluster.needConnectionPool() {
t.Errorf("%s: Connection pool is not enabled with full definition",
testName)
}
cluster.Spec = acidv1.PostgresSpec{
EnableConnectionPool: boolToPointer(true),
}
if !cluster.needConnectionPool() {
t.Errorf("%s: Connection pool is not enabled with flag",
testName)
}
cluster.Spec = acidv1.PostgresSpec{
EnableConnectionPool: boolToPointer(false),
ConnectionPool: &acidv1.ConnectionPool{},
}
if cluster.needConnectionPool() {
t.Errorf("%s: Connection pool is still enabled with flag being false",
testName)
}
cluster.Spec = acidv1.PostgresSpec{
EnableConnectionPool: boolToPointer(true),
ConnectionPool: &acidv1.ConnectionPool{},
}
if !cluster.needConnectionPool() {
t.Errorf("%s: Connection pool is not enabled with flag and full",
testName)
}
}

View File

@ -23,6 +23,7 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
c.mu.Lock()
defer c.mu.Unlock()
oldSpec := c.Postgresql
c.setSpec(newSpec)
defer func() {
@ -108,6 +109,11 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
}
}
// sync connection pool
if err = c.syncConnectionPool(&oldSpec, newSpec, c.installLookupFunction); err != nil {
return fmt.Errorf("could not sync connection pool: %v", err)
}
return err
}
@ -424,7 +430,9 @@ func (c *Cluster) syncSecrets() error {
}
pwdUser := userMap[secretUsername]
// if this secret belongs to the infrastructure role and the password has changed - replace it in the secret
if pwdUser.Password != string(secret.Data["password"]) && pwdUser.Origin == spec.RoleOriginInfrastructure {
if pwdUser.Password != string(secret.Data["password"]) &&
pwdUser.Origin == spec.RoleOriginInfrastructure {
c.logger.Debugf("updating the secret %q from the infrastructure roles", secretSpec.Name)
if _, err = c.KubeClient.Secrets(secretSpec.Namespace).Update(secretSpec); err != nil {
return fmt.Errorf("could not update infrastructure role secret for role %q: %v", secretUsername, err)
@ -468,6 +476,16 @@ func (c *Cluster) syncRoles() (err error) {
for _, u := range c.pgUsers {
userNames = append(userNames, u.Name)
}
if c.needConnectionPool() {
connPoolUser := c.systemUsers[constants.ConnectionPoolUserKeyName]
userNames = append(userNames, connPoolUser.Name)
if _, exists := c.pgUsers[connPoolUser.Name]; !exists {
c.pgUsers[connPoolUser.Name] = connPoolUser
}
}
dbUsers, err = c.readPgUsersFromDatabase(userNames)
if err != nil {
return fmt.Errorf("error getting users from the database: %v", err)
@ -600,3 +618,165 @@ func (c *Cluster) syncLogicalBackupJob() error {
return nil
}
func (c *Cluster) syncConnectionPool(oldSpec, newSpec *acidv1.Postgresql, lookup InstallFunction) error {
if c.ConnectionPool == nil {
c.ConnectionPool = &ConnectionPoolObjects{}
}
newNeedConnPool := c.needConnectionPoolWorker(&newSpec.Spec)
oldNeedConnPool := c.needConnectionPoolWorker(&oldSpec.Spec)
if newNeedConnPool {
// Try to sync in any case. If we didn't needed connection pool before,
// it means we want to create it. If it was already present, still sync
// since it could happen that there is no difference in specs, and all
// the resources are remembered, but the deployment was manualy deleted
// in between
c.logger.Debug("syncing connection pool")
// in this case also do not forget to install lookup function as for
// creating cluster
if !oldNeedConnPool || !c.ConnectionPool.LookupFunction {
newConnPool := newSpec.Spec.ConnectionPool
specSchema := ""
specUser := ""
if newConnPool != nil {
specSchema = newConnPool.Schema
specUser = newConnPool.User
}
schema := util.Coalesce(
specSchema,
c.OpConfig.ConnectionPool.Schema)
user := util.Coalesce(
specUser,
c.OpConfig.ConnectionPool.User)
if err := lookup(schema, user); err != nil {
return err
}
}
if err := c.syncConnectionPoolWorker(oldSpec, newSpec); err != nil {
c.logger.Errorf("could not sync connection pool: %v", err)
return err
}
}
if oldNeedConnPool && !newNeedConnPool {
// delete and cleanup resources
if err := c.deleteConnectionPool(); err != nil {
c.logger.Warningf("could not remove connection pool: %v", err)
}
}
if !oldNeedConnPool && !newNeedConnPool {
// delete and cleanup resources if not empty
if c.ConnectionPool != nil &&
(c.ConnectionPool.Deployment != nil ||
c.ConnectionPool.Service != nil) {
if err := c.deleteConnectionPool(); err != nil {
c.logger.Warningf("could not remove connection pool: %v", err)
}
}
}
return nil
}
// Synchronize connection pool resources. Effectively we're interested only in
// synchronizing the corresponding deployment, but in case of deployment or
// service is missing, create it. After checking, also remember an object for
// the future references.
func (c *Cluster) syncConnectionPoolWorker(oldSpec, newSpec *acidv1.Postgresql) error {
deployment, err := c.KubeClient.
Deployments(c.Namespace).
Get(c.connPoolName(), metav1.GetOptions{})
if err != nil && k8sutil.ResourceNotFound(err) {
msg := "Deployment %s for connection pool synchronization is not found, create it"
c.logger.Warningf(msg, c.connPoolName())
deploymentSpec, err := c.generateConnPoolDeployment(&newSpec.Spec)
if err != nil {
msg = "could not generate deployment for connection pool: %v"
return fmt.Errorf(msg, err)
}
deployment, err := c.KubeClient.
Deployments(deploymentSpec.Namespace).
Create(deploymentSpec)
if err != nil {
return err
}
c.ConnectionPool.Deployment = deployment
} else if err != nil {
return fmt.Errorf("could not get connection pool deployment to sync: %v", err)
} else {
c.ConnectionPool.Deployment = deployment
// actual synchronization
oldConnPool := oldSpec.Spec.ConnectionPool
newConnPool := newSpec.Spec.ConnectionPool
specSync, specReason := c.needSyncConnPoolSpecs(oldConnPool, newConnPool)
defaultsSync, defaultsReason := c.needSyncConnPoolDefaults(newConnPool, deployment)
reason := append(specReason, defaultsReason...)
if specSync || defaultsSync {
c.logger.Infof("Update connection pool deployment %s, reason: %+v",
c.connPoolName(), reason)
newDeploymentSpec, err := c.generateConnPoolDeployment(&newSpec.Spec)
if err != nil {
msg := "could not generate deployment for connection pool: %v"
return fmt.Errorf(msg, err)
}
oldDeploymentSpec := c.ConnectionPool.Deployment
deployment, err := c.updateConnPoolDeployment(
oldDeploymentSpec,
newDeploymentSpec)
if err != nil {
return err
}
c.ConnectionPool.Deployment = deployment
return nil
}
}
service, err := c.KubeClient.
Services(c.Namespace).
Get(c.connPoolName(), metav1.GetOptions{})
if err != nil && k8sutil.ResourceNotFound(err) {
msg := "Service %s for connection pool synchronization is not found, create it"
c.logger.Warningf(msg, c.connPoolName())
serviceSpec := c.generateConnPoolService(&newSpec.Spec)
service, err := c.KubeClient.
Services(serviceSpec.Namespace).
Create(serviceSpec)
if err != nil {
return err
}
c.ConnectionPool.Service = service
} else if err != nil {
return fmt.Errorf("could not get connection pool service to sync: %v", err)
} else {
// Service updates are not supported and probably not that useful anyway
c.ConnectionPool.Service = service
}
return nil
}

212
pkg/cluster/sync_test.go Normal file
View File

@ -0,0 +1,212 @@
package cluster
import (
"fmt"
"testing"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
"github.com/zalando/postgres-operator/pkg/util/config"
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func int32ToPointer(value int32) *int32 {
return &value
}
func deploymentUpdated(cluster *Cluster, err error) error {
if cluster.ConnectionPool.Deployment.Spec.Replicas == nil ||
*cluster.ConnectionPool.Deployment.Spec.Replicas != 2 {
return fmt.Errorf("Wrong nubmer of instances")
}
return nil
}
func objectsAreSaved(cluster *Cluster, err error) error {
if cluster.ConnectionPool == nil {
return fmt.Errorf("Connection pool resources are empty")
}
if cluster.ConnectionPool.Deployment == nil {
return fmt.Errorf("Deployment was not saved")
}
if cluster.ConnectionPool.Service == nil {
return fmt.Errorf("Service was not saved")
}
return nil
}
func objectsAreDeleted(cluster *Cluster, err error) error {
if cluster.ConnectionPool != nil {
return fmt.Errorf("Connection pool was not deleted")
}
return nil
}
func TestConnPoolSynchronization(t *testing.T) {
testName := "Test connection pool synchronization"
var cluster = New(
Config{
OpConfig: config.Config{
ProtectedRoles: []string{"admin"},
Auth: config.Auth{
SuperUsername: superUserName,
ReplicationUsername: replicationUserName,
},
ConnectionPool: config.ConnectionPool{
ConnPoolDefaultCPURequest: "100m",
ConnPoolDefaultCPULimit: "100m",
ConnPoolDefaultMemoryRequest: "100Mi",
ConnPoolDefaultMemoryLimit: "100Mi",
NumberOfInstances: int32ToPointer(1),
},
},
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
cluster.Statefulset = &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: "test-sts",
},
}
clusterMissingObjects := *cluster
clusterMissingObjects.KubeClient = k8sutil.ClientMissingObjects()
clusterMock := *cluster
clusterMock.KubeClient = k8sutil.NewMockKubernetesClient()
clusterDirtyMock := *cluster
clusterDirtyMock.KubeClient = k8sutil.NewMockKubernetesClient()
clusterDirtyMock.ConnectionPool = &ConnectionPoolObjects{
Deployment: &appsv1.Deployment{},
Service: &v1.Service{},
}
clusterNewDefaultsMock := *cluster
clusterNewDefaultsMock.KubeClient = k8sutil.NewMockKubernetesClient()
cluster.OpConfig.ConnectionPool.Image = "pooler:2.0"
cluster.OpConfig.ConnectionPool.NumberOfInstances = int32ToPointer(2)
tests := []struct {
subTest string
oldSpec *acidv1.Postgresql
newSpec *acidv1.Postgresql
cluster *Cluster
check func(cluster *Cluster, err error) error
}{
{
subTest: "create if doesn't exist",
oldSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{},
},
},
newSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{},
},
},
cluster: &clusterMissingObjects,
check: objectsAreSaved,
},
{
subTest: "create if doesn't exist with a flag",
oldSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{},
},
newSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{
EnableConnectionPool: boolToPointer(true),
},
},
cluster: &clusterMissingObjects,
check: objectsAreSaved,
},
{
subTest: "create from scratch",
oldSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{},
},
newSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{},
},
},
cluster: &clusterMissingObjects,
check: objectsAreSaved,
},
{
subTest: "delete if not needed",
oldSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{},
},
},
newSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{},
},
cluster: &clusterMock,
check: objectsAreDeleted,
},
{
subTest: "cleanup if still there",
oldSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{},
},
newSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{},
},
cluster: &clusterDirtyMock,
check: objectsAreDeleted,
},
{
subTest: "update deployment",
oldSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{
NumberOfInstances: int32ToPointer(1),
},
},
},
newSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{
NumberOfInstances: int32ToPointer(2),
},
},
},
cluster: &clusterMock,
check: deploymentUpdated,
},
{
subTest: "update image from changed defaults",
oldSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{},
},
},
newSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{},
},
},
cluster: &clusterNewDefaultsMock,
check: deploymentUpdated,
},
}
for _, tt := range tests {
err := tt.cluster.syncConnectionPool(tt.oldSpec, tt.newSpec, mockInstallLookupFunction)
if err := tt.check(tt.cluster, err); err != nil {
t.Errorf("%s [%s]: Could not synchronize, %+v",
testName, tt.subTest, err)
}
}
}

View File

@ -69,3 +69,7 @@ type ClusterStatus struct {
Spec acidv1.PostgresSpec
Error error
}
type TemplateParams map[string]interface{}
type InstallFunction func(schema string, user string) error

View File

@ -408,7 +408,32 @@ func (c *Cluster) labelsSet(shouldAddExtraLabels bool) labels.Set {
}
func (c *Cluster) labelsSelector() *metav1.LabelSelector {
return &metav1.LabelSelector{MatchLabels: c.labelsSet(false), MatchExpressions: nil}
return &metav1.LabelSelector{
MatchLabels: c.labelsSet(false),
MatchExpressions: nil,
}
}
// Return connection pool labels selector, which should from one point of view
// inherit most of the labels from the cluster itself, but at the same time
// have e.g. different `application` label, so that recreatePod operation will
// not interfere with it (it lists all the pods via labels, and if there would
// be no difference, it will recreate also pooler pods).
func (c *Cluster) connPoolLabelsSelector() *metav1.LabelSelector {
connPoolLabels := labels.Set(map[string]string{})
extraLabels := labels.Set(map[string]string{
"connection-pool": c.connPoolName(),
"application": "db-connection-pool",
})
connPoolLabels = labels.Merge(connPoolLabels, c.labelsSet(false))
connPoolLabels = labels.Merge(connPoolLabels, extraLabels)
return &metav1.LabelSelector{
MatchLabels: connPoolLabels,
MatchExpressions: nil,
}
}
func (c *Cluster) roleLabelsSet(shouldAddExtraLabels bool, role PostgresRole) labels.Set {
@ -483,3 +508,15 @@ func (c *Cluster) GetSpec() (*acidv1.Postgresql, error) {
func (c *Cluster) patroniUsesKubernetes() bool {
return c.OpConfig.EtcdHost == ""
}
func (c *Cluster) needConnectionPoolWorker(spec *acidv1.PostgresSpec) bool {
if spec.EnableConnectionPool == nil {
return spec.ConnectionPool != nil
} else {
return *spec.EnableConnectionPool
}
}
func (c *Cluster) needConnectionPool() bool {
return c.needConnectionPoolWorker(&c.Spec)
}

View File

@ -8,6 +8,7 @@ import (
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
"github.com/zalando/postgres-operator/pkg/util"
"github.com/zalando/postgres-operator/pkg/util/config"
"github.com/zalando/postgres-operator/pkg/util/constants"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@ -21,6 +22,10 @@ func (c *Controller) readOperatorConfigurationFromCRD(configObjectNamespace, con
return config, nil
}
func int32ToPointer(value int32) *int32 {
return &value
}
// importConfigurationFromCRD is a transitional function that converts CRD configuration to the one based on the configmap
func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigurationData) *config.Config {
result := &config.Config{}
@ -143,5 +148,51 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur
result.ScalyrCPULimit = fromCRD.Scalyr.ScalyrCPULimit
result.ScalyrMemoryLimit = fromCRD.Scalyr.ScalyrMemoryLimit
// Connection pool. Looks like we can't use defaulting in CRD before 1.17,
// so ensure default values here.
result.ConnectionPool.NumberOfInstances = util.CoalesceInt32(
fromCRD.ConnectionPool.NumberOfInstances,
int32ToPointer(2))
result.ConnectionPool.NumberOfInstances = util.MaxInt32(
result.ConnectionPool.NumberOfInstances,
int32ToPointer(2))
result.ConnectionPool.Schema = util.Coalesce(
fromCRD.ConnectionPool.Schema,
constants.ConnectionPoolSchemaName)
result.ConnectionPool.User = util.Coalesce(
fromCRD.ConnectionPool.User,
constants.ConnectionPoolUserName)
result.ConnectionPool.Image = util.Coalesce(
fromCRD.ConnectionPool.Image,
"registry.opensource.zalan.do/acid/pgbouncer")
result.ConnectionPool.Mode = util.Coalesce(
fromCRD.ConnectionPool.Mode,
constants.ConnectionPoolDefaultMode)
result.ConnectionPool.ConnPoolDefaultCPURequest = util.Coalesce(
fromCRD.ConnectionPool.DefaultCPURequest,
constants.ConnectionPoolDefaultCpuRequest)
result.ConnectionPool.ConnPoolDefaultMemoryRequest = util.Coalesce(
fromCRD.ConnectionPool.DefaultMemoryRequest,
constants.ConnectionPoolDefaultMemoryRequest)
result.ConnectionPool.ConnPoolDefaultCPULimit = util.Coalesce(
fromCRD.ConnectionPool.DefaultCPULimit,
constants.ConnectionPoolDefaultCpuLimit)
result.ConnectionPool.ConnPoolDefaultMemoryLimit = util.Coalesce(
fromCRD.ConnectionPool.DefaultMemoryLimit,
constants.ConnectionPoolDefaultMemoryLimit)
result.ConnectionPool.MaxDBConnections = util.CoalesceInt32(
fromCRD.ConnectionPool.MaxDBConnections,
int32ToPointer(constants.ConnPoolMaxDBConnections))
return result
}

View File

@ -23,13 +23,15 @@ const fileWithNamespace = "/var/run/secrets/kubernetes.io/serviceaccount/namespa
// RoleOrigin contains the code of the origin of a role
type RoleOrigin int
// The rolesOrigin constant values must be sorted by the role priority for resolveNameConflict(...) to work.
// The rolesOrigin constant values must be sorted by the role priority for
// resolveNameConflict(...) to work.
const (
RoleOriginUnknown RoleOrigin = iota
RoleOriginManifest
RoleOriginInfrastructure
RoleOriginTeamsAPI
RoleOriginSystem
RoleConnectionPool
)
type syncUserOperation int
@ -178,6 +180,8 @@ func (r RoleOrigin) String() string {
return "teams API role"
case RoleOriginSystem:
return "system role"
case RoleConnectionPool:
return "connection pool role"
default:
panic(fmt.Sprintf("bogus role origin value %d", r))
}

View File

@ -8,6 +8,7 @@ import (
"fmt"
"github.com/zalando/postgres-operator/pkg/spec"
"github.com/zalando/postgres-operator/pkg/util/constants"
)
// CRD describes CustomResourceDefinition specific configuration parameters
@ -83,6 +84,20 @@ type LogicalBackup struct {
LogicalBackupS3SSE string `name:"logical_backup_s3_sse" default:"AES256"`
}
// Operator options for connection pooler
type ConnectionPool struct {
NumberOfInstances *int32 `name:"connection_pool_number_of_instances" default:"2"`
Schema string `name:"connection_pool_schema" default:"pooler"`
User string `name:"connection_pool_user" default:"pooler"`
Image string `name:"connection_pool_image" default:"registry.opensource.zalan.do/acid/pgbouncer"`
Mode string `name:"connection_pool_mode" default:"transaction"`
MaxDBConnections *int32 `name:"connection_pool_max_db_connections" default:"60"`
ConnPoolDefaultCPURequest string `name:"connection_pool_default_cpu_request" default:"500m"`
ConnPoolDefaultMemoryRequest string `name:"connection_pool_default_memory_request" default:"100Mi"`
ConnPoolDefaultCPULimit string `name:"connection_pool_default_cpu_limit" default:"1"`
ConnPoolDefaultMemoryLimit string `name:"connection_pool_default_memory_limit" default:"100Mi"`
}
// Config describes operator config
type Config struct {
CRD
@ -90,6 +105,7 @@ type Config struct {
Auth
Scalyr
LogicalBackup
ConnectionPool
WatchedNamespace string `name:"watched_namespace"` // special values: "*" means 'watch all namespaces', the empty string "" means 'watch a namespace where operator is deployed to'
EtcdHost string `name:"etcd_host" default:""` // special values: the empty string "" means Patroni will use K8s as a DCS
@ -196,5 +212,10 @@ func validate(cfg *Config) (err error) {
if cfg.Workers == 0 {
err = fmt.Errorf("number of workers should be higher than 0")
}
if *cfg.ConnectionPool.NumberOfInstances < constants.ConnPoolMinInstances {
msg := "number of connection pool instances should be higher than %d"
err = fmt.Errorf(msg, constants.ConnPoolMinInstances)
}
return
}

View File

@ -0,0 +1,18 @@
package constants
// Connection pool specific constants
const (
ConnectionPoolUserName = "pooler"
ConnectionPoolSchemaName = "pooler"
ConnectionPoolDefaultType = "pgbouncer"
ConnectionPoolDefaultMode = "transaction"
ConnectionPoolDefaultCpuRequest = "500m"
ConnectionPoolDefaultCpuLimit = "1"
ConnectionPoolDefaultMemoryRequest = "100Mi"
ConnectionPoolDefaultMemoryLimit = "100Mi"
ConnPoolContainer = 0
ConnPoolMaxDBConnections = 60
ConnPoolMaxClientConnections = 10000
ConnPoolMinInstances = 2
)

View File

@ -2,15 +2,16 @@ package constants
// Roles specific constants
const (
PasswordLength = 64
SuperuserKeyName = "superuser"
ReplicationUserKeyName = "replication"
RoleFlagSuperuser = "SUPERUSER"
RoleFlagInherit = "INHERIT"
RoleFlagLogin = "LOGIN"
RoleFlagNoLogin = "NOLOGIN"
RoleFlagCreateRole = "CREATEROLE"
RoleFlagCreateDB = "CREATEDB"
RoleFlagReplication = "REPLICATION"
RoleFlagByPassRLS = "BYPASSRLS"
PasswordLength = 64
SuperuserKeyName = "superuser"
ConnectionPoolUserKeyName = "pooler"
ReplicationUserKeyName = "replication"
RoleFlagSuperuser = "SUPERUSER"
RoleFlagInherit = "INHERIT"
RoleFlagLogin = "LOGIN"
RoleFlagNoLogin = "NOLOGIN"
RoleFlagCreateRole = "CREATEROLE"
RoleFlagCreateDB = "CREATEDB"
RoleFlagReplication = "REPLICATION"
RoleFlagByPassRLS = "BYPASSRLS"
)

View File

@ -9,11 +9,13 @@ import (
batchv1beta1 "k8s.io/api/batch/v1beta1"
clientbatchv1beta1 "k8s.io/client-go/kubernetes/typed/batch/v1beta1"
apiappsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
policybeta1 "k8s.io/api/policy/v1beta1"
apiextclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apiextbeta1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
appsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
@ -26,6 +28,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func Int32ToPointer(value int32) *int32 {
return &value
}
// KubernetesClient describes getters for Kubernetes objects
type KubernetesClient struct {
corev1.SecretsGetter
@ -39,6 +45,7 @@ type KubernetesClient struct {
corev1.NamespacesGetter
corev1.ServiceAccountsGetter
appsv1.StatefulSetsGetter
appsv1.DeploymentsGetter
rbacv1.RoleBindingsGetter
policyv1beta1.PodDisruptionBudgetsGetter
apiextbeta1.CustomResourceDefinitionsGetter
@ -55,6 +62,34 @@ type mockSecret struct {
type MockSecretGetter struct {
}
type mockDeployment struct {
appsv1.DeploymentInterface
}
type mockDeploymentNotExist struct {
appsv1.DeploymentInterface
}
type MockDeploymentGetter struct {
}
type MockDeploymentNotExistGetter struct {
}
type mockService struct {
corev1.ServiceInterface
}
type mockServiceNotExist struct {
corev1.ServiceInterface
}
type MockServiceGetter struct {
}
type MockServiceNotExistGetter struct {
}
type mockConfigMap struct {
corev1.ConfigMapInterface
}
@ -101,6 +136,7 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
kubeClient.NodesGetter = client.CoreV1()
kubeClient.NamespacesGetter = client.CoreV1()
kubeClient.StatefulSetsGetter = client.AppsV1()
kubeClient.DeploymentsGetter = client.AppsV1()
kubeClient.PodDisruptionBudgetsGetter = client.PolicyV1beta1()
kubeClient.RESTClient = client.CoreV1().RESTClient()
kubeClient.RoleBindingsGetter = client.RbacV1()
@ -230,19 +266,145 @@ func (c *mockConfigMap) Get(name string, options metav1.GetOptions) (*v1.ConfigM
}
// Secrets to be mocked
func (c *MockSecretGetter) Secrets(namespace string) corev1.SecretInterface {
func (mock *MockSecretGetter) Secrets(namespace string) corev1.SecretInterface {
return &mockSecret{}
}
// ConfigMaps to be mocked
func (c *MockConfigMapsGetter) ConfigMaps(namespace string) corev1.ConfigMapInterface {
func (mock *MockConfigMapsGetter) ConfigMaps(namespace string) corev1.ConfigMapInterface {
return &mockConfigMap{}
}
func (mock *MockDeploymentGetter) Deployments(namespace string) appsv1.DeploymentInterface {
return &mockDeployment{}
}
func (mock *MockDeploymentNotExistGetter) Deployments(namespace string) appsv1.DeploymentInterface {
return &mockDeploymentNotExist{}
}
func (mock *mockDeployment) Create(*apiappsv1.Deployment) (*apiappsv1.Deployment, error) {
return &apiappsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-deployment",
},
Spec: apiappsv1.DeploymentSpec{
Replicas: Int32ToPointer(1),
},
}, nil
}
func (mock *mockDeployment) Delete(name string, opts *metav1.DeleteOptions) error {
return nil
}
func (mock *mockDeployment) Get(name string, opts metav1.GetOptions) (*apiappsv1.Deployment, error) {
return &apiappsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-deployment",
},
Spec: apiappsv1.DeploymentSpec{
Replicas: Int32ToPointer(1),
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
v1.Container{
Image: "pooler:1.0",
},
},
},
},
},
}, nil
}
func (mock *mockDeployment) Patch(name string, t types.PatchType, data []byte, subres ...string) (*apiappsv1.Deployment, error) {
return &apiappsv1.Deployment{
Spec: apiappsv1.DeploymentSpec{
Replicas: Int32ToPointer(2),
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-deployment",
},
}, nil
}
func (mock *mockDeploymentNotExist) Get(name string, opts metav1.GetOptions) (*apiappsv1.Deployment, error) {
return nil, &apierrors.StatusError{
ErrStatus: metav1.Status{
Reason: metav1.StatusReasonNotFound,
},
}
}
func (mock *mockDeploymentNotExist) Create(*apiappsv1.Deployment) (*apiappsv1.Deployment, error) {
return &apiappsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-deployment",
},
Spec: apiappsv1.DeploymentSpec{
Replicas: Int32ToPointer(1),
},
}, nil
}
func (mock *MockServiceGetter) Services(namespace string) corev1.ServiceInterface {
return &mockService{}
}
func (mock *MockServiceNotExistGetter) Services(namespace string) corev1.ServiceInterface {
return &mockServiceNotExist{}
}
func (mock *mockService) Create(*v1.Service) (*v1.Service, error) {
return &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test-service",
},
}, nil
}
func (mock *mockService) Delete(name string, opts *metav1.DeleteOptions) error {
return nil
}
func (mock *mockService) Get(name string, opts metav1.GetOptions) (*v1.Service, error) {
return &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test-service",
},
}, nil
}
func (mock *mockServiceNotExist) Create(*v1.Service) (*v1.Service, error) {
return &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test-service",
},
}, nil
}
func (mock *mockServiceNotExist) Get(name string, opts metav1.GetOptions) (*v1.Service, error) {
return nil, &apierrors.StatusError{
ErrStatus: metav1.Status{
Reason: metav1.StatusReasonNotFound,
},
}
}
// NewMockKubernetesClient for other tests
func NewMockKubernetesClient() KubernetesClient {
return KubernetesClient{
SecretsGetter: &MockSecretGetter{},
ConfigMapsGetter: &MockConfigMapsGetter{},
SecretsGetter: &MockSecretGetter{},
ConfigMapsGetter: &MockConfigMapsGetter{},
DeploymentsGetter: &MockDeploymentGetter{},
ServicesGetter: &MockServiceGetter{},
}
}
func ClientMissingObjects() KubernetesClient {
return KubernetesClient{
DeploymentsGetter: &MockDeploymentNotExistGetter{},
ServicesGetter: &MockServiceNotExistGetter{},
}
}

View File

@ -147,6 +147,40 @@ func Coalesce(val, defaultVal string) string {
return val
}
// Yeah, golang
func CoalesceInt32(val, defaultVal *int32) *int32 {
if val == nil {
return defaultVal
}
return val
}
// Test if any of the values is nil
func testNil(values ...*int32) bool {
for _, v := range values {
if v == nil {
return true
}
}
return false
}
// Return maximum of two integers provided via pointers. If one value is not
// defined, return the other one. If both are not defined, result is also
// undefined, caller needs to check for that.
func MaxInt32(a, b *int32) *int32 {
if testNil(a, b) {
return nil
}
if *a > *b {
return a
}
return b
}
// IsSmallerQuantity : checks if first resource is of a smaller quantity than the second
func IsSmallerQuantity(requestStr, limitStr string) (bool, error) {

View File

@ -20,7 +20,7 @@ spec:
serviceAccountName: postgres-operator-ui
containers:
- name: "service"
image: registry.opensource.zalan.do/acid/postgres-operator-ui:v1.3.0
image: registry.opensource.zalan.do/acid/postgres-operator-ui:v1.4.0
ports:
- containerPort: 8081
protocol: "TCP"