Merge branch 'master' into observed-generation

This commit is contained in:
Felix Kunde 2026-06-01 18:08:59 +02:00 committed by GitHub
commit 3e6d0ddbb9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
53 changed files with 5048 additions and 557 deletions

View File

@ -34,6 +34,12 @@ jobs:
OPERATOR_IMAGE="${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${GITHUB_REF/refs\/tags\//}"
echo "OPERATOR_IMAGE=$OPERATOR_IMAGE" >> $GITHUB_OUTPUT
- name: Define pooler image name
id: image_pooler
run: |
POOLER_IMAGE="${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}/pgbouncer:${GITHUB_REF/refs\/tags\//}"
echo "POOLER_IMAGE=$POOLER_IMAGE" >> $GITHUB_OUTPUT
- name: Define UI image name
id: image_ui
run: |
@ -69,6 +75,15 @@ jobs:
tags: "${{ steps.image.outputs.OPERATOR_IMAGE }}"
platforms: linux/amd64,linux/arm64
- name: Build and push multiarch pooler image to ghcr
uses: docker/build-push-action@v3
with:
context: pooler
push: true
build-args: BASE_IMAGE=alpine:3.22
tags: "${{ steps.image_pooler.outputs.POOLER_IMAGE }}"
platforms: linux/amd64,linux/arm64
- name: Build and push multiarch ui image to ghcr
uses: docker/build-push-action@v3
with:

View File

@ -1,4 +1,4 @@
.PHONY: clean local test linux macos mocks docker push e2e
.PHONY: clean local test linux macos mocks docker pooler push e2e
BINARY ?= postgres-operator
BUILD_FLAGS ?= -v
@ -49,6 +49,7 @@ endif
PATH := $(GOPATH)/bin:$(PATH)
SHELL := env PATH="$(PATH)" $(SHELL)
IMAGE_TAG := $(IMAGE):$(TAG)$(CDP_TAG)$(DEBUG_FRESH)$(DEBUG_POSTFIX)
POOLER_TAG := $(IMAGE)/pgbouncer:$(TAG)$(CDP_TAG)$(DEBUG_FRESH)$(DEBUG_POSTFIX)
default: local
@ -78,6 +79,9 @@ $(GENERATED_CRDS): $(GENERATED)
local: ${SOURCES} $(GENERATED_CRDS)
CGO_ENABLED=${CGO_ENABLED} go build -o build/${BINARY} $(LOCAL_BUILD_FLAGS) -ldflags "$(LDFLAGS)" $(SOURCES)
wasm: ${SOURCES} $(GENERATED_CRDS)
GOOS=wasip1 GOARCH=wasm CGO_ENABLED=${CGO_ENABLED} go build -o build/${BINARY}.wasm ${BUILD_FLAGS} -ldflags "$(LDFLAGS)" $(SOURCES)
linux: ${SOURCES} $(GENERATED_CRDS)
GOOS=linux GOARCH=amd64 CGO_ENABLED=${CGO_ENABLED} go build -o build/linux/${BINARY} ${BUILD_FLAGS} -ldflags "$(LDFLAGS)" $(SOURCES)
@ -92,6 +96,9 @@ docker: $(GENERATED_CRDS) ${DOCKERDIR}/${DOCKERFILE}
echo "git describe $(shell git describe --tags --always --dirty)"
docker build --rm -t "$(IMAGE_TAG)" -f "${DOCKERDIR}/${DOCKERFILE}" --build-arg VERSION="${VERSION}" --build-arg BASE_IMAGE="${BASE_IMAGE}" .
# Build the pgbouncer connection pooler image, tagged as $(POOLER_TAG), using
# the pooler/ directory as docker build context.
# The commands are chained with `&&` so that docker build is never started
# from the wrong directory when `cd pooler` fails.
pooler:
	cd pooler && docker build --rm -t "$(POOLER_TAG)" --build-arg VERSION="${VERSION}" --build-arg BASE_IMAGE="${BASE_IMAGE}" .
indocker-race:
docker run --rm -v "${GOPATH}":"${GOPATH}" -e GOPATH="${GOPATH}" -e RACE=1 -w ${PWD} golang:1.25.3 bash -c "make linux"
@ -110,5 +117,5 @@ test: mocks $(GENERATED) $(GENERATED_CRDS)
codegen: $(GENERATED)
e2e: docker # build operator image to be tested
e2e: docker pooler # build operator and pooler images to be tested
	# $(MAKE) instead of a literal `make` hands command-line flags and the
	# jobserver down to the sub-make; `&&` stops if the e2e directory is missing
	cd e2e && $(MAKE) e2etest

View File

@ -79,6 +79,9 @@ spec:
enable_lazy_spilo_upgrade:
type: boolean
default: false
enable_maintenance_windows:
type: boolean
default: true
enable_pgversion_env_var:
type: boolean
default: true
@ -669,7 +672,7 @@ spec:
default: "pooler"
connection_pooler_image:
type: string
default: "registry.opensource.zalan.do/acid/pgbouncer:master-32"
default: "ghcr.io/zalando/postgres-operator/pgbouncer:latest"
connection_pooler_max_db_connections:
type: integer
default: 60

File diff suppressed because it is too large Load Diff

View File

@ -37,6 +37,10 @@ spec:
- name: {{ .Chart.Name }}
image: "{{ .Values.image.registry }}/{{ .Values.image.repository }}:{{ .Values.image.tag }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
{{- if .Values.extraArgs }}
args:
{{ toYaml .Values.extraArgs | indent 8 }}
{{- end }}
env:
{{- if .Values.enableJsonLogging }}
- name: ENABLE_JSON_LOGGING

View File

@ -18,6 +18,9 @@ configTarget: "OperatorConfigurationCRD"
# JSON logging format
enableJsonLogging: false
# Command-line options for the operator
extraArgs: []
# general configuration parameters
configGeneral:
# the deployment should create/update the CRDs
@ -27,6 +30,8 @@ configGeneral:
- "all"
# update only the statefulsets without immediately doing the rolling update
enable_lazy_spilo_upgrade: false
# toggle to use the maintenance windows feature
enable_maintenance_windows: true
# set the PGVERSION env var instead of providing the version via postgresql.bin_dir in SPILO_CONFIGURATION
enable_pgversion_env_var: true
# start any new database pod without limitations on shm memory
@ -441,7 +446,7 @@ configConnectionPooler:
# db user for pooler to use
connection_pooler_user: "pooler"
# docker image
connection_pooler_image: "registry.opensource.zalan.do/acid/pgbouncer:master-32"
connection_pooler_image: "ghcr.io/zalando/postgres-operator/pgbouncer:latest"
# max db connections the pooler should hold
connection_pooler_max_db_connections: 60
# default pooling mode

View File

@ -4,6 +4,7 @@ allow_concurrent_steps: true
build_env: &BUILD_ENV
PYTHON_BASE_IMAGE: container-registry.zalando.net/library/python-3.11-slim
ALPINE_BASE_IMAGE: container-registry.zalando.net/library/alpine-3
UBUNTU_BASE_IMAGE: container-registry.zalando.net/library/ubuntu-22.04
MULTI_ARCH_REGISTRY: container-registry-test.zalando.net/acid
pipeline:
@ -42,6 +43,33 @@ pipeline:
-f docker/Dockerfile \
--push .
# Builds the multi-arch pgbouncer pooler image from the pooler/ directory and
# pushes it to ${MULTI_ARCH_REGISTRY}.
- id: build-pooler
  env:
    <<: *BUILD_ENV
  type: script
  vm_config:
    type: linux
  commands:
    - desc: Build image
      cmd: |
        cd pooler
        # An empty CDP_SOURCE_BRANCH selects the "pgbouncer" image name (which
        # is promoted below); any other build goes to "pgbouncer-test".
        # The variable is quoted so the test stays well-formed for any value.
        if [ -z "${CDP_SOURCE_BRANCH}" ]; then
          IMAGE=${MULTI_ARCH_REGISTRY}/pgbouncer
        else
          IMAGE=${MULTI_ARCH_REGISTRY}/pgbouncer-test
        fi

        docker buildx create --config /etc/cdp-buildkitd.toml --driver-opt network=host --bootstrap --use
        docker buildx build --platform "linux/amd64,linux/arm64" \
                            --build-arg BASE_IMAGE="${ALPINE_BASE_IMAGE}" \
                            -t "${IMAGE}:${CDP_BUILD_VERSION}" \
                            --push .

        if [ -z "${CDP_SOURCE_BRANCH}" ]; then
          cdp-promote-image "${IMAGE}:${CDP_BUILD_VERSION}"
        fi
- id: build-operator-ui
env:
<<: *BUILD_ENV
@ -99,6 +127,7 @@ pipeline:
docker buildx create --config /etc/cdp-buildkitd.toml --driver-opt network=host --bootstrap --use
docker buildx build --platform linux/amd64,linux/arm64 \
--build-arg BASE_IMAGE="${UBUNTU_BASE_IMAGE}" \
-t ${IMAGE}:${CDP_BUILD_VERSION} \
--push .

View File

@ -65,7 +65,10 @@ the `PGVERSION` environment variable is set for the database pods. Since
In-place major version upgrades can be configured to be executed by the
operator with the `major_version_upgrade_mode` option. By default, it is
enabled (mode: `manual`). In any case, altering the version in the manifest
will trigger a rolling update of pods to update the `PGVERSION` env variable.
will update the desired `PGVERSION`. If `maintenanceWindows` are configured,
major-version-related pod rotation is deferred until the next maintenance
window. Without maintenance windows, the operator will trigger a rolling
update of pods to apply the new `PGVERSION`.
Spilo's [`configure_spilo`](https://github.com/zalando/spilo/blob/master/postgres-appliance/scripts/configure_spilo.py)
script will notice the version mismatch but start the current version again.
@ -92,10 +95,11 @@ Thus, the `full` mode can create drift between desired and actual state.
### Upgrade during maintenance windows
When `maintenanceWindows` are defined in the Postgres manifest the operator
will trigger a major version upgrade only during these periods. Make sure they
are at least twice as long as your configured `resync_period` to guarantee
that operator actions can be triggered.
When `maintenanceWindows` are defined in the Postgres manifest or in the global
config the operator will trigger major-version-related pod rotation and the
major version upgrade only during these periods. Make sure they are at least
twice as long as your configured `resync_period` to guarantee that operator
actions can be triggered.
### Upgrade annotations
@ -887,15 +891,13 @@ cluster manifest. In the case any of these variables are omitted from the
manifest, the operator configuration settings `enable_master_load_balancer` and
`enable_replica_load_balancer` apply. Note that the operator settings affect
all Postgresql services running in all namespaces watched by the operator.
If load balancing is enabled two default annotations will be applied to its
services:
If load balancing is enabled the following default annotation will be applied to
its services:
- `external-dns.alpha.kubernetes.io/hostname` with the value defined by the
operator configs `master_dns_name_format` and `replica_dns_name_format`.
This value can't be overwritten. If its value needs to change, this
MUST be done by changing the DNS format operator config parameters.
- `service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout` with
a default value of "3600".
There are multiple options to specify service annotations that will be merged
with each other and override in the following order (where latter take

View File

@ -118,7 +118,9 @@ These parameters are grouped directly under the `spec` key in the manifest.
a list which defines specific time frames when certain maintenance operations
such as automatic major upgrades or master pod migration are allowed to happen.
Accepted formats are "01:00-06:00" for daily maintenance windows or
"Sat:00:00-04:00" for specific days, with all times in UTC.
"Sat:00:00-04:00" for specific days, with all times in UTC. Note, when the
global config option `enable_maintenance_windows` is false, the specified
windows will be ignored.
* **users**
a map of usernames to user flags for the users that should be created in the

View File

@ -23,6 +23,12 @@ The following command-line options are supported for the operator:
off can be overridden by the aforementioned operator configuration
option.
* **-kubeqps**
set the maximum number of Kubernetes API requests per second. Default is 10.
* **-kubeburst**
set the burst limit for Kubernetes API requests, allowing temporary spikes beyond the configured QPS. Default is 20.
In addition to that, standard [glog
flags](https://godoc.org/github.com/golang/glog) are also supported. For
instance, one may want to add `-alsologtostderr` and `-v=8` to debug the

View File

@ -173,6 +173,9 @@ Those are top-level keys, containing both leaf keys and groups.
the thresholds. The value must be `"true"` to be effective. The default is empty
which means the feature is disabled.
* **enable_maintenance_windows**
toggle for using the maintenance windows feature. Default is `"true"`.
* **maintenance_windows**
a list which defines specific time frames when certain maintenance
operations such as automatic major upgrades or master pod migration are
@ -897,6 +900,19 @@ grouped under the `logical_backup` key.
* **logical_backup_cronjob_environment_secret**
Reference to a Kubernetes secret whose keys will be added as environment variables to the cronjob. Default: ""
The following environment variables can be passed to the logical backup
cronjob via `logical_backup_cronjob_environment_secret` to control
connectivity checks before the backup starts:
* **LOGICAL_BACKUP_CONNECT_RETRIES**
Number of times to retry connecting to the target PostgreSQL pod before
giving up. This is useful when NetworkPolicy enforcement introduces a
short delay before a newly-created pod's IP is allowed through ingress
rules on the destination node. Default: "10"
* **LOGICAL_BACKUP_CONNECT_RETRY_DELAY**
Delay in seconds between connectivity retries. Default: "2"
## Debugging the operator
Options to aid debugging of the operator itself. Grouped under the `debug` key.
@ -1059,7 +1075,7 @@ operator being able to provide some reasonable defaults.
* **connection_pooler_image**
Docker image to use for connection pooler deployment.
Default: "registry.opensource.zalan.do/acid/pgbouncer"
Default: "ghcr.io/zalando/postgres-operator/pgbouncer:latest"
* **connection_pooler_max_db_connections**
How many connections the pooler can max hold. This value is divided among the

View File

@ -714,7 +714,7 @@ but Kubernetes will not spin up the pod if the requested HugePages cannot be all
For more information on HugePages in Kubernetes, see also
[https://kubernetes.io/docs/tasks/manage-hugepages/scheduling-hugepages/](https://kubernetes.io/docs/tasks/manage-hugepages/scheduling-hugepages/)
## Use taints, tolerations and node affinity for dedicated PostgreSQL nodes
## Use taints, tolerations, node affinity and topology spread constraints for dedicated PostgreSQL nodes
To ensure Postgres pods are running on nodes without any other application pods,
you can use [taints and tolerations](https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/)
@ -755,6 +755,23 @@ spec:
If you need to define a `nodeAffinity` for all your Postgres clusters use the
`node_readiness_label` [configuration](administrator.md#node-readiness-labels).
If you need PostgreSQL Pods to run on separate nodes, you can use the
[topologySpreadConstraints](https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/) to control how they are distributed across your cluster.
This ensures they are spread among failure domains such as
regions, zones, nodes, or other user-defined topology domains.
```yaml
apiVersion: "acid.zalan.do/v1"
kind: postgresql
metadata:
name: acid-minimal-cluster
spec:
topologySpreadConstraints:
- maxSkew: 1
topologyKey: topology.kubernetes.io/zone
whenUnsatisfiable: DoNotSchedule
```
## In-place major version upgrade
Starting with Spilo 14, operator supports in-place major version upgrade to a
@ -1064,7 +1081,7 @@ spec:
- all
volumeSource:
emptyDir: {}
sidecars:
sidecars:
- name: "container-name"
image: "company/image:tag"
volumeMounts:

View File

@ -3,6 +3,7 @@
export cluster_name="postgres-operator-e2e-tests"
export kubeconfig_path="/tmp/kind-config-${cluster_name}"
export operator_image="ghcr.io/zalando/postgres-operator:latest"
export pooler_image="ghcr.io/zalando/postgres-operator/pgbouncer:latest"
export e2e_test_runner_image="ghcr.io/zalando/postgres-operator-e2e-tests-runner:latest"
docker run -it --entrypoint /bin/bash --network=host -e "TERM=xterm-256color" \
@ -11,4 +12,5 @@ docker run -it --entrypoint /bin/bash --network=host -e "TERM=xterm-256color" \
--mount type=bind,source="$(readlink -f tests)",target=/tests \
--mount type=bind,source="$(readlink -f exec.sh)",target=/exec.sh \
--mount type=bind,source="$(readlink -f scripts)",target=/scripts \
-e OPERATOR_IMAGE="${operator_image}" "${e2e_test_runner_image}"
-e OPERATOR_IMAGE="${operator_image}" -e POOLER_IMAGE="${pooler_image}" \
"${e2e_test_runner_image}"

View File

@ -26,17 +26,33 @@ echo "Kubeconfig path: ${kubeconfig_path}"
function pull_images(){
operator_tag=$(git describe --tags --always --dirty)
image_name="ghcr.io/zalando/postgres-operator:${operator_tag}"
if [[ -z $(docker images -q "${image_name}") ]]
then
if ! docker pull "${image_name}"
then
echo "Failed to pull operator image: ${image_name}"
exit 1
components=("postgres-operator" "pooler")
image_urls=("ghcr.io/zalando/postgres-operator:${operator_tag}" "ghcr.io/zalando/postgres-operator/pgbouncer:${operator_tag}")
for i in "${!components[@]}"; do
component="${components[$i]}"
image="${image_urls[$i]}"
if [[ -z $(docker images -q "$image") ]]; then
echo "Pulling $component image: $image"
if ! docker pull "$image"; then
echo "Failed to pull $component image: $image"
exit 1
fi
else
echo "$component image already exists: $image"
fi
fi
operator_image="${image_name}"
echo "Using operator image: ${operator_image}"
# Set variables for later use
if [[ "$component" == "postgres-operator" ]]; then
operator_image="$image"
elif [[ "$component" == "pooler" ]]; then
pooler_image="$image"
fi
done
echo "Using operator image: $operator_image"
echo "Using pooler image: $pooler_image"
}
function start_kind(){
@ -55,10 +71,11 @@ function start_kind(){
kind load docker-image "${spilo_image}" --name ${cluster_name}
}
function load_operator_image() {
echo "Loading operator image"
function load_operator_images() {
echo "Loading operator images"
export KUBECONFIG="${kubeconfig_path}"
kind load docker-image "${operator_image}" --name ${cluster_name}
kind load docker-image "${pooler_image}" --name ${cluster_name}
}
function set_kind_api_server_ip(){
@ -85,7 +102,8 @@ function run_tests(){
--mount type=bind,source="$(readlink -f tests)",target=/tests \
--mount type=bind,source="$(readlink -f exec.sh)",target=/exec.sh \
--mount type=bind,source="$(readlink -f scripts)",target=/scripts \
-e OPERATOR_IMAGE="${operator_image}" "${e2e_test_runner_image}" ${E2E_TEST_CASE-} $@
-e OPERATOR_IMAGE="${operator_image}" -e POOLER_IMAGE="${pooler_image}" \
"${e2e_test_runner_image}" ${E2E_TEST_CASE-} $@
}
function cleanup(){
@ -100,7 +118,7 @@ function main(){
[[ -z ${NOCLEANUP-} ]] && trap "cleanup" QUIT TERM EXIT
pull_images
[[ ! -f ${kubeconfig_path} ]] && start_kind
load_operator_image
load_operator_images
set_kind_api_server_ip
generate_certificate

View File

@ -129,6 +129,7 @@ class EndToEndTestCase(unittest.TestCase):
configmap["data"]["workers"] = "1"
configmap["data"]["docker_image"] = SPILO_CURRENT
configmap["data"]["major_version_upgrade_mode"] = "full"
configmap["data"]["connection_pooler_image"] = os.environ['POOLER_IMAGE']
with open("manifests/configmap.yaml", 'w') as f:
yaml.dump(configmap, f, Dumper=yaml.Dumper)
@ -711,7 +712,7 @@ class EndToEndTestCase(unittest.TestCase):
self.eventuallyEqual(lambda: k8s.count_running_pods(master_pooler_label), 2, "No pooler pods found")
self.eventuallyEqual(lambda: k8s.count_running_pods(replica_pooler_label), 2, "No pooler replica pods found")
self.eventuallyEqual(lambda: k8s.count_services_with_label(pooler_label), 2, "No pooler service found")
self.eventuallyEqual(lambda: k8s.count_secrets_with_label(pooler_label), 1, "Pooler secret not created")
self.eventuallyEqual(lambda: k8s.count_secrets_with_label(pooler_label), 3, "Not all pooler secrets found")
# TLS still enabled so check existing env variables and volume mounts
self.eventuallyEqual(lambda: k8s.count_pods_with_env_variable("CONNECTION_POOLER_CLIENT_TLS_CRT", pooler_label), 4, "TLS env variable CONNECTION_POOLER_CLIENT_TLS_CRT missing in pooler pods")
@ -737,14 +738,12 @@ class EndToEndTestCase(unittest.TestCase):
master_annotations = {
"external-dns.alpha.kubernetes.io/hostname": "acid-minimal-cluster-pooler.default.db.example.com",
"service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600",
}
self.eventuallyTrue(lambda: k8s.check_service_annotations(
master_pooler_label+","+pooler_label, master_annotations), "Wrong annotations")
replica_annotations = {
"external-dns.alpha.kubernetes.io/hostname": "acid-minimal-cluster-pooler-repl.default.db.example.com",
"service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600",
}
self.eventuallyTrue(lambda: k8s.check_service_annotations(
replica_pooler_label+","+pooler_label, replica_annotations), "Wrong annotations")
@ -771,7 +770,7 @@ class EndToEndTestCase(unittest.TestCase):
self.eventuallyEqual(lambda: k8s.count_services_with_label(pooler_label),
1, "No pooler service found")
self.eventuallyEqual(lambda: k8s.count_secrets_with_label(pooler_label),
1, "Secret not created")
2, "Not all pooler secrets created")
# Turn off only replica connection pooler
k8s.api.custom_objects_api.patch_namespaced_custom_object(
@ -799,7 +798,7 @@ class EndToEndTestCase(unittest.TestCase):
'ClusterIP',
"Expected LoadBalancer service type for master, found {}")
self.eventuallyEqual(lambda: k8s.count_secrets_with_label(pooler_label),
1, "Secret not created")
2, "Not all pooler secrets created")
# scale up connection pooler deployment
k8s.api.custom_objects_api.patch_namespaced_custom_object(
@ -834,8 +833,8 @@ class EndToEndTestCase(unittest.TestCase):
0, "Pooler pods not scaled down")
self.eventuallyEqual(lambda: k8s.count_services_with_label(pooler_label),
0, "Pooler service not removed")
self.eventuallyEqual(lambda: k8s.count_secrets_with_label('application=spilo,cluster-name=acid-minimal-cluster'),
4, "Secrets not deleted")
self.eventuallyEqual(lambda: k8s.count_secrets_with_label(pooler_label),
0, "Not all pooler secrets deleted")
# Verify that all the databases have pooler schema installed.
# Do this via psql, since otherwise we need to deal with
@ -946,7 +945,7 @@ class EndToEndTestCase(unittest.TestCase):
},
}
}
old_sts_creation_timestamp = sts.metadata.creation_timestamp
k8s.api.apps_v1.patch_namespaced_stateful_set(sts.metadata.name, sts.metadata.namespace, annotation_patch)
old_svc_creation_timestamp = svc.metadata.creation_timestamp
@ -1384,7 +1383,7 @@ class EndToEndTestCase(unittest.TestCase):
}
k8s.update_config(patch_scaled_policy_retain)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
# decrease the number of instances
k8s.api.custom_objects_api.patch_namespaced_custom_object(
'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', pg_patch_scale_down_instances)
@ -1661,7 +1660,6 @@ class EndToEndTestCase(unittest.TestCase):
# toggle pod anti affinity to move replica away from master node
self.assert_distributed_pods(master_nodes)
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_overwrite_pooler_deployment(self):
pooler_name = 'acid-minimal-cluster-pooler'
@ -1828,7 +1826,7 @@ class EndToEndTestCase(unittest.TestCase):
},
}
k8s.api.core_v1.patch_namespaced_secret(
name="foo-user.acid-minimal-cluster.credentials.postgresql.acid.zalan.do",
name="foo-user.acid-minimal-cluster.credentials.postgresql.acid.zalan.do",
namespace="default",
body=secret_fake_rotation)
@ -1845,7 +1843,7 @@ class EndToEndTestCase(unittest.TestCase):
"enable_password_rotation": "true",
"inherited_annotations": "environment",
"password_rotation_interval": "30",
"password_rotation_user_retention": "30", # should be set to 60
"password_rotation_user_retention": "30", # should be set to 60
},
}
k8s.update_config(enable_password_rotation)
@ -1914,7 +1912,7 @@ class EndToEndTestCase(unittest.TestCase):
self.assertTrue("environment" in db_user_secret.metadata.annotations, "Added annotation was not propagated to secret")
# disable password rotation for all other users (foo_user)
# and pick smaller intervals to see if the third fake rotation user is dropped
# and pick smaller intervals to see if the third fake rotation user is dropped
enable_password_rotation = {
"data": {
"enable_password_rotation": "false",
@ -2420,6 +2418,90 @@ class EndToEndTestCase(unittest.TestCase):
# toggle pod anti affinity to move replica away from master node
self.assert_distributed_pods(master_nodes)
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_topology_spread_constraints(self):
'''
Enable topologySpreadConstraints for pods
'''
k8s = self.k8s
cluster_labels = "application=spilo,cluster-name=acid-minimal-cluster"
# Verify we are in good state from potential previous tests
self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running")
# patch the pvc retention policy to enable delete when scale down
patch_scaled_policy_delete = {
"data": {
"persistent_volume_claim_retention_policy": "when_deleted:retain,when_scaled:delete"
}
}
k8s.update_config(patch_scaled_policy_delete)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
master_nodes, replica_nodes = k8s.get_cluster_nodes()
self.assertNotEqual(master_nodes, [])
self.assertNotEqual(replica_nodes, [])
# Patch label to nodes for topologySpreadConstraints
patch_node_label = {
"metadata": {
"labels": {
"topology.kubernetes.io/zone": "zalando"
}
}
}
k8s.api.core_v1.patch_node(master_nodes[0], patch_node_label)
k8s.api.core_v1.patch_node(replica_nodes[0], patch_node_label)
# Patch topologySpreadConstraint and scale-out postgresql pods to postgresqls manifest.
patch_topologySpreadConstraint_config = {
"spec": {
"numberOfInstances": 6,
"topologySpreadConstraint": [
{
"maxskew": 1,
"topologyKey": "topology.kubernetes.io/zone",
"whenUnsatisfiable": "DoNotSchedule"
}
]
}
}
k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default",
"postgresqls", "acid-minimal-cluster",
patch_topologySpreadConstraint_config)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
self.eventuallyEqual(lambda: k8s.count_pods_with_label(cluster_labels), 6, "Postgresql StatefulSet are scale to 6")
self.eventuallyEqual(lambda: k8s.count_running_pods(), 6, "All pods are running")
worker_node_1 = 0
worker_node_2 = 0
pods = k8s.api.core_v1.list_namespaced_pod('default', label_selector=cluster_labels)
for pod in pods.items:
if pod.spec.node_name == 'postgres-operator-e2e-tests-worker':
worker_node_1 += 1
elif pod.spec.node_name == 'postgres-operator-e2e-tests-worker2':
worker_node_2 += 1
self.assertEqual(worker_node_1, worker_node_2)
self.assertEqual(worker_node_1, 3)
self.assertEqual(worker_node_2, 3)
# Reset configurations
patch_topologySpreadConstraint_config = {
"spec": {
"numberOfInstances": 2,
"topologySpreadConstraint": []
}
}
k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default",
"postgresqls", "acid-minimal-cluster",
patch_topologySpreadConstraint_config)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
self.eventuallyEqual(lambda: k8s.count_pods_with_label(cluster_labels), 2, "Postgresql StatefulSet are scale to 2")
self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "All pods are running")
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_zz_cluster_deletion(self):
'''
@ -2495,7 +2577,7 @@ class EndToEndTestCase(unittest.TestCase):
self.eventuallyEqual(lambda: k8s.count_deployments_with_label(cluster_label), 0, "Deployments not deleted")
self.eventuallyEqual(lambda: k8s.count_pdbs_with_label(cluster_label), 0, "Pod disruption budget not deleted")
self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 8, "Secrets were deleted although disabled in config")
self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 3, "PVCs were deleted although disabled in config")
self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 2, "PVCs were deleted although disabled in config")
except timeout_decorator.TimeoutError:
print('Operator log: {}'.format(k8s.get_operator_log()))
@ -2537,7 +2619,7 @@ class EndToEndTestCase(unittest.TestCase):
# if nodes are different we can quit here
if master_nodes[0] not in replica_nodes:
return True
return True
# enable pod anti affinity in config map which should trigger movement of replica
patch_enable_antiaffinity = {
@ -2561,7 +2643,7 @@ class EndToEndTestCase(unittest.TestCase):
}
k8s.update_config(patch_disable_antiaffinity, "disable antiaffinity")
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_labels)
k8s.wait_for_running_pods(cluster_labels, 2)
@ -2572,7 +2654,7 @@ class EndToEndTestCase(unittest.TestCase):
# if nodes are different we can quit here
for target_node in target_nodes:
if (target_node not in master_nodes or target_node not in replica_nodes) and master_nodes[0] in replica_nodes:
print('Pods run on the same node')
print('Pods run on the same node')
return False
except timeout_decorator.TimeoutError:

4
go.mod
View File

@ -6,11 +6,11 @@ require (
github.com/Masterminds/semver v1.5.0
github.com/aws/aws-sdk-go v1.55.8
github.com/golang/mock v1.6.0
github.com/lib/pq v1.10.9
github.com/lib/pq v1.11.2
github.com/motomux/pretty v0.0.0-20161209205251-b2aad2c9a95d
github.com/pkg/errors v0.9.1
github.com/r3labs/diff v1.1.0
github.com/sirupsen/logrus v1.9.3
github.com/sirupsen/logrus v1.9.4
github.com/stretchr/testify v1.11.1
golang.org/x/crypto v0.45.0
gopkg.in/yaml.v2 v2.4.0

10
go.sum
View File

@ -71,8 +71,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lib/pq v1.11.2 h1:x6gxUeu39V0BHZiugWe8LXZYZ+Utk7hSJGThs8sdzfs=
github.com/lib/pq v1.11.2/go.mod h1:/p+8NSbOcwzAEI7wiMXFlgydTwcgTr3OSKMsD2BitpA=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
@ -111,8 +111,8 @@ github.com/r3labs/diff v1.1.0/go.mod h1:7WjXasNzi0vJetRcB/RqNl5dlIsmXcTTLmF5IoH6
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/sirupsen/logrus v1.9.4 h1:TsZE7l11zFCLZnZ+teH4Umoq5BhEIfIzfRDZ1Uzql2w=
github.com/sirupsen/logrus v1.9.4/go.mod h1:ftWc9WdOfJ0a92nsE2jF5u5ZwH8Bv2zdeOC42RjbV2g=
github.com/spf13/cobra v1.9.1 h1:CXSaggrXdbHK9CF+8ywj8Amf7PBRmPCOJugH954Nnlo=
github.com/spf13/cobra v1.9.1/go.mod h1:nDyEzZ8ogv936Cinf6g1RU9MRY64Ir93oCnqb9wxYW0=
github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o=
@ -122,7 +122,6 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
@ -166,7 +165,6 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
# Copyright 2017 The Kubernetes Authors.
#

View File

@ -1,4 +1,4 @@
ARG BASE_IMAGE=registry.opensource.zalan.do/library/ubuntu-22.04:latest
ARG BASE_IMAGE=ubuntu:22.04
FROM ${BASE_IMAGE}
LABEL maintainer="Team ACID @ Zalando <team-acid@zalando.de>"

View File

@ -183,6 +183,25 @@ function get_master_pod {
get_pods "labelSelector=${CLUSTER_NAME_LABEL}%3D${SCOPE},spilo-role%3Dmaster" | tee | head -n 1
}
# Wait for TCP connectivity to the target PostgreSQL pod.
# When NetworkPolicy is enforced via iptables, a newly-created pod's IP may not
# yet be present in the destination node's ingress allow lists, causing
# cross-node connections to be rejected until the next policy sync.
function wait_for_pg {
    # Probe the target PostgreSQL endpoint with pg_isready until it answers.
    # Tunables (environment):
    #   LOGICAL_BACKUP_CONNECT_RETRIES      max number of probes (default 10)
    #   LOGICAL_BACKUP_CONNECT_RETRY_DELAY  seconds to pause after a failed probe (default 2)
    # Returns 0 on the first successful probe, 1 when every attempt failed.
    local max_attempts=${LOGICAL_BACKUP_CONNECT_RETRIES:-10}
    local pause=${LOGICAL_BACKUP_CONNECT_RETRY_DELAY:-2}
    local port=${PGPORT:-5432}
    local attempt=1

    while (( attempt <= max_attempts )); do
        # -q keeps pg_isready silent; only its exit status is of interest
        "$PG_BIN"/pg_isready -h "$PGHOST" -p "$port" -q 2>/dev/null && return 0

        echo "waiting for $PGHOST:$port to become reachable (attempt $attempt/$max_attempts)..."
        sleep "$pause"
        attempt=$(( attempt + 1 ))
    done

    echo "ERROR: $PGHOST:$port not reachable after $((max_attempts * pause))s"
    return 1
}
CURRENT_NODENAME=$(get_current_pod | jq .items[].spec.nodeName --raw-output)
export CURRENT_NODENAME
@ -197,6 +216,8 @@ for search in "${search_strategy[@]}"; do
done
wait_for_pg
set -x
if [ "$LOGICAL_BACKUP_PROVIDER" == "az" ]; then
dump | compress > /tmp/azure-backup.sql.gz

View File

@ -232,6 +232,12 @@ spec:
# values:
# - enabled
# Add topology spread constraint to distribute PostgreSQL pods across all nodes labeled with "topology.kubernetes.io/zone".
# topologySpreadConstraints:
# - maxSkew: 1
# topologyKey: topology.kubernetes.io/zone
# whenUnsatisfiable: DoNotSchedule
# Enables change data capture streams for defined database tables
# streams:
# - applicationId: test-app

View File

@ -17,7 +17,7 @@ data:
connection_pooler_default_cpu_request: "500m"
connection_pooler_default_memory_limit: 100Mi
connection_pooler_default_memory_request: 100Mi
connection_pooler_image: "registry.opensource.zalan.do/acid/pgbouncer:master-32"
connection_pooler_image: "ghcr.io/zalando/postgres-operator/pgbouncer:latest"
connection_pooler_max_db_connections: "60"
connection_pooler_mode: "transaction"
connection_pooler_number_of_instances: "2"
@ -46,6 +46,7 @@ data:
enable_ebs_gp3_migration_max_size: "1000"
enable_init_containers: "true"
enable_lazy_spilo_upgrade: "false"
enable_maintenance_windows: "true"
enable_master_load_balancer: "false"
enable_master_pooler_load_balancer: "false"
enable_password_rotation: "false"

View File

@ -23,7 +23,7 @@ spec:
serviceAccountName: postgres-operator
containers:
- name: postgres-operator
image: registry.opensource.zalan.do/acid/pgbouncer:master-32
image: ghcr.io/zalando/postgres-operator/pgbouncer:latest
imagePullPolicy: IfNotPresent
resources:
requests:

View File

@ -77,6 +77,9 @@ spec:
enable_lazy_spilo_upgrade:
type: boolean
default: false
enable_maintenance_windows:
type: boolean
default: true
enable_pgversion_env_var:
type: boolean
default: true
@ -667,7 +670,7 @@ spec:
default: "pooler"
connection_pooler_image:
type: string
default: "registry.opensource.zalan.do/acid/pgbouncer:master-32"
default: "ghcr.io/zalando/postgres-operator/pgbouncer:latest"
connection_pooler_max_db_connections:
type: integer
default: 60

View File

@ -8,6 +8,7 @@ configuration:
# crd_categories:
# - all
# enable_lazy_spilo_upgrade: false
enable_maintenance_windows: true
enable_pgversion_env_var: true
# enable_shm_volume: true
enable_spilo_wal_path_compat: false
@ -217,7 +218,7 @@ configuration:
connection_pooler_default_cpu_request: "500m"
connection_pooler_default_memory_limit: 100Mi
connection_pooler_default_memory_request: 100Mi
connection_pooler_image: "registry.opensource.zalan.do/acid/pgbouncer:master-32"
connection_pooler_image: "ghcr.io/zalando/postgres-operator/pgbouncer:latest"
# connection_pooler_max_db_connections: 60
connection_pooler_mode: "transaction"
connection_pooler_number_of_instances: 2

View File

@ -4056,6 +4056,22 @@ spec:
type: string
type: object
type: array
topologySpreadConstraints:
type: array
nullable: true
items:
type: object
properties:
maxSkew:
type: integer
format: int32
topologyKey:
type: string
whenUnsatisfiable:
type: string
enum:
- DoNotSchedule
- ScheduleAnyway
useLoadBalancer:
description: |-
deprecated load balancer settings maintained for backward compatibility

View File

@ -56,6 +56,7 @@ var OperatorConfigCRDResourceColumns = []apiextv1.CustomResourceColumnDefinition
}
var min1 = 1.0
var minLength1 int64 = 1
var minDisable = -1.0
// OperatorConfigCRDResourceValidation to check applied manifest parameters
@ -105,6 +106,9 @@ var OperatorConfigCRDResourceValidation = apiextv1.CustomResourceValidation{
"enable_lazy_spilo_upgrade": {
Type: "boolean",
},
"enable_maintenance_windows": {
Type: "boolean",
},
"enable_shm_volume": {
Type: "boolean",
},

View File

@ -266,6 +266,7 @@ type OperatorConfigurationData struct {
Workers uint32 `json:"workers,omitempty"`
ResyncPeriod Duration `json:"resync_period,omitempty"`
RepairPeriod Duration `json:"repair_period,omitempty"`
EnableMaintenanceWindows *bool `json:"enable_maintenance_windows,omitempty"`
MaintenanceWindows []MaintenanceWindow `json:"maintenance_windows,omitempty"`
SetMemoryRequestToLimit bool `json:"set_memory_request_to_limit,omitempty"`
ShmVolume *bool `json:"enable_shm_volume,omitempty"`

View File

@ -92,17 +92,18 @@ type PostgresSpec struct {
Clone *CloneDescription `json:"clone,omitempty"`
// Note: usernames specified here as database owners must be declared
// in the users key of the spec key.
Databases map[string]string `json:"databases,omitempty"`
PreparedDatabases map[string]PreparedDatabase `json:"preparedDatabases,omitempty"`
SchedulerName *string `json:"schedulerName,omitempty"`
NodeAffinity *v1.NodeAffinity `json:"nodeAffinity,omitempty"`
Tolerations []v1.Toleration `json:"tolerations,omitempty"`
Sidecars []Sidecar `json:"sidecars,omitempty"`
InitContainers []v1.Container `json:"initContainers,omitempty"`
PodPriorityClassName string `json:"podPriorityClassName,omitempty"`
ShmVolume *bool `json:"enableShmVolume,omitempty"`
EnableLogicalBackup bool `json:"enableLogicalBackup,omitempty"`
LogicalBackupRetention string `json:"logicalBackupRetention,omitempty"`
Databases map[string]string `json:"databases,omitempty"`
PreparedDatabases map[string]PreparedDatabase `json:"preparedDatabases,omitempty"`
SchedulerName *string `json:"schedulerName,omitempty"`
NodeAffinity *v1.NodeAffinity `json:"nodeAffinity,omitempty"`
TopologySpreadConstraints []v1.TopologySpreadConstraint `json:"topologySpreadConstraints,omitempty"`
Tolerations []v1.Toleration `json:"tolerations,omitempty"`
Sidecars []Sidecar `json:"sidecars,omitempty"`
InitContainers []v1.Container `json:"initContainers,omitempty"`
PodPriorityClassName string `json:"podPriorityClassName,omitempty"`
ShmVolume *bool `json:"enableShmVolume,omitempty"`
EnableLogicalBackup bool `json:"enableLogicalBackup,omitempty"`
LogicalBackupRetention string `json:"logicalBackupRetention,omitempty"`
// +kubebuilder:validation:Pattern=`^(\d+|\*)(/\d+)?(\s+(\d+|\*)(/\d+)?){4}$`
LogicalBackupSchedule string `json:"logicalBackupSchedule,omitempty"`
StandbyCluster *StandbyDescription `json:"standby,omitempty"`

View File

@ -433,6 +433,11 @@ func (in *OperatorConfigurationData) DeepCopyInto(out *OperatorConfigurationData
*out = make([]string, len(*in))
copy(*out, *in)
}
if in.EnableMaintenanceWindows != nil {
in, out := &in.EnableMaintenanceWindows, &out.EnableMaintenanceWindows
*out = new(bool)
**out = **in
}
if in.MaintenanceWindows != nil {
in, out := &in.MaintenanceWindows, &out.MaintenanceWindows
*out = make([]MaintenanceWindow, len(*in))
@ -796,6 +801,13 @@ func (in *PostgresSpec) DeepCopyInto(out *PostgresSpec) {
*out = new(corev1.NodeAffinity)
(*in).DeepCopyInto(*out)
}
if in.TopologySpreadConstraints != nil {
in, out := &in.TopologySpreadConstraints, &out.TopologySpreadConstraints
*out = make([]corev1.TopologySpreadConstraint, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.Tolerations != nil {
in, out := &in.Tolerations, &out.Tolerations
*out = make([]corev1.Toleration, len(*in))

View File

@ -3,6 +3,7 @@ package cluster
// Postgres CustomResourceDefinition object i.e. Spilo
import (
"context"
"database/sql"
"encoding/json"
"fmt"
@ -71,7 +72,7 @@ type kubeResources struct {
CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget
LogicalBackupJob *batchv1.CronJob
Streams map[string]*zalandov1.FabricEventStream
//Pods are treated separately
// Pods are treated separately
}
// Cluster describes postgresql cluster
@ -96,7 +97,7 @@ type Cluster struct {
teamsAPIClient teams.Interface
oauthTokenGetter OAuthTokenGetter
KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place?
KubeClient k8sutil.KubernetesClient // TODO: move clients to the better place?
currentProcess Process
processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
@ -150,7 +151,8 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
PatroniEndpoints: make(map[string]*v1.Endpoints),
PatroniConfigMaps: make(map[string]*v1.ConfigMap),
VolumeClaims: make(map[types.UID]*v1.PersistentVolumeClaim),
Streams: make(map[string]*zalandov1.FabricEventStream)},
Streams: make(map[string]*zalandov1.FabricEventStream),
},
userSyncStrategy: users.DefaultUserSyncStrategy{
PasswordEncryption: passwordEncryption,
RoleDeletionSuffix: cfg.OpConfig.RoleDeletionSuffix,
@ -445,7 +447,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
var match, needsRollUpdate, needsReplace bool
match = true
//TODO: improve me
// TODO: improve me
if *c.Statefulset.Spec.Replicas != *statefulSet.Spec.Replicas {
match = false
reasons = append(reasons, "new statefulset's number of replicas does not match the current one")
@ -504,6 +506,11 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
needsRollUpdate = true
reasons = append(reasons, "new statefulset's pod affinity does not match the current one")
}
if !reflect.DeepEqual(c.Statefulset.Spec.Template.Spec.TopologySpreadConstraints, statefulSet.Spec.Template.Spec.TopologySpreadConstraints) {
needsReplace = true
needsRollUpdate = true
reasons = append(reasons, "new statefulset's pod topologySpreadConstraints does not match the current one")
}
if len(c.Statefulset.Spec.Template.Spec.Tolerations) != len(statefulSet.Spec.Template.Spec.Tolerations) {
needsReplace = true
needsRollUpdate = true
@ -677,7 +684,6 @@ func compareResourcesAssumeFirstNotNil(a *v1.ResourceRequirements, b *v1.Resourc
}
}
return true
}
func compareEnv(a, b []v1.EnvVar) bool {
@ -712,9 +718,7 @@ func compareEnv(a, b []v1.EnvVar) bool {
}
func compareSpiloConfiguration(configa, configb string) bool {
var (
oa, ob spiloConfiguration
)
var oa, ob spiloConfiguration
var err error
err = json.Unmarshal([]byte(configa), &oa)
@ -823,7 +827,6 @@ func (c *Cluster) compareAnnotations(old, new map[string]string, removedList *[]
}
return reason != "", reason
}
func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) {
@ -900,7 +903,7 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) *compareLog
}
func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBudget) (bool, string) {
//TODO: improve comparison
// TODO: improve comparison
if !reflect.DeepEqual(new.Spec, cur.Spec) {
return false, "new PDB's spec does not match the current one"
}
@ -949,8 +952,17 @@ func (c *Cluster) removeFinalizer() error {
}
c.logger.Infof("removing finalizer %s", finalizerName)
finalizers := util.RemoveString(c.ObjectMeta.Finalizers, finalizerName)
newSpec, err := c.KubeClient.SetFinalizer(c.clusterName(), c.DeepCopy(), finalizers)
// Fetch the latest version of the object to avoid resourceVersion conflicts
clusterName := c.clusterName()
latestPg, err := c.KubeClient.PostgresqlsGetter.Postgresqls(clusterName.Namespace).Get(
context.TODO(), clusterName.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error fetching latest postgresql for finalizer removal: %v", err)
}
finalizers := util.RemoveString(latestPg.ObjectMeta.Finalizers, finalizerName)
newSpec, err := c.KubeClient.SetFinalizer(clusterName, latestPg, finalizers)
if err != nil {
return fmt.Errorf("error removing finalizer: %v", err)
}
@ -1073,7 +1085,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
}
c.logger.Debug("syncing secrets")
//TODO: mind the secrets of the deleted/new users
// TODO: mind the secrets of the deleted/new users
if err := c.syncSecrets(); err != nil {
c.logger.Errorf("could not sync secrets: %v", err)
updateFailed = true
@ -1111,7 +1123,6 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
// logical backup job
func() {
// create if it did not exist
if !oldSpec.Spec.EnableLogicalBackup && newSpec.Spec.EnableLogicalBackup {
c.logger.Debug("creating backup cron job")
@ -1139,7 +1150,6 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
updateFailed = true
}
}
}()
// Roles and Databases
@ -1216,7 +1226,7 @@ func syncResources(a, b *v1.ResourceRequirements) bool {
// before the pods, it will be re-created by the current master pod and will remain, obstructing the
// creation of the new cluster with the same name. Therefore, the endpoints should be deleted last.
func (c *Cluster) Delete() error {
var anyErrors = false
anyErrors := false
c.mu.Lock()
defer c.mu.Unlock()
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources")
@ -1293,7 +1303,7 @@ func (c *Cluster) Delete() error {
// If we are done deleting our various resources we remove the finalizer to let K8S finally delete the Postgres CR
if anyErrors {
c.eventRecorder.Event(c.GetReference(), v1.EventTypeWarning, "Delete", "some resources could be successfully deleted yet")
return fmt.Errorf("some error(s) occured when deleting resources, NOT removing finalizer yet")
return fmt.Errorf("some error(s) occurred when deleting resources, NOT removing finalizer yet")
}
if err := c.removeFinalizer(); err != nil {
return fmt.Errorf("done cleaning up, but error when removing finalizer: %v", err)
@ -1307,7 +1317,6 @@ func (c *Cluster) NeedsRepair() (bool, acidv1.PostgresStatus) {
c.specMu.RLock()
defer c.specMu.RUnlock()
return !c.Status.Success(), c.Status
}
// ReceivePodEvent is called back by the controller in order to add the cluster's pod event to the queue.
@ -1416,7 +1425,6 @@ func (c *Cluster) initSystemUsers() {
}
func (c *Cluster) initPreparedDatabaseRoles() error {
if c.Spec.PreparedDatabases != nil && len(c.Spec.PreparedDatabases) == 0 { // TODO: add option to disable creating such a default DB
c.Spec.PreparedDatabases = map[string]acidv1.PreparedDatabase{strings.Replace(c.Name, "-", "_", -1): {}}
}
@ -1482,10 +1490,9 @@ func (c *Cluster) initPreparedDatabaseRoles() error {
}
func (c *Cluster) initDefaultRoles(defaultRoles map[string]string, admin, prefix, searchPath, secretNamespace string) error {
for defaultRole, inherits := range defaultRoles {
namespace := c.Namespace
//if namespaced secrets are allowed
// if namespaced secrets are allowed
if secretNamespace != "" {
if c.Config.OpConfig.EnableCrossNamespaceSecret {
namespace = secretNamespace
@ -1553,7 +1560,7 @@ func (c *Cluster) initRobotUsers() error {
}
}
//if namespaced secrets are allowed
// if namespaced secrets are allowed
if c.Config.OpConfig.EnableCrossNamespaceSecret {
if strings.Contains(username, ".") {
splits := strings.Split(username, ".")
@ -1604,7 +1611,6 @@ func (c *Cluster) initAdditionalOwnerRoles() {
func (c *Cluster) initTeamMembers(teamID string, isPostgresSuperuserTeam bool) error {
teamMembers, err := c.getTeamMembers(teamID)
if err != nil {
return fmt.Errorf("could not get list of team members for team %q: %v", teamID, err)
}
@ -1643,7 +1649,6 @@ func (c *Cluster) initTeamMembers(teamID string, isPostgresSuperuserTeam bool) e
}
func (c *Cluster) initHumanUsers() error {
var clusterIsOwnedBySuperuserTeam bool
superuserTeams := []string{}

View File

@ -680,8 +680,7 @@ func TestServiceAnnotations(t *testing.T) {
operatorAnnotations: make(map[string]string),
serviceAnnotations: make(map[string]string),
expect: map[string]string{
"external-dns.alpha.kubernetes.io/hostname": "acid-test-stg.test.db.example.com,test-stg.acid.db.example.com",
"service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600",
"external-dns.alpha.kubernetes.io/hostname": "acid-test-stg.test.db.example.com,test-stg.acid.db.example.com",
},
},
{
@ -702,8 +701,7 @@ func TestServiceAnnotations(t *testing.T) {
operatorAnnotations: make(map[string]string),
serviceAnnotations: make(map[string]string),
expect: map[string]string{
"external-dns.alpha.kubernetes.io/hostname": "acid-test-stg.test.db.example.com,test-stg.acid.db.example.com",
"service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600",
"external-dns.alpha.kubernetes.io/hostname": "acid-test-stg.test.db.example.com,test-stg.acid.db.example.com",
},
},
{
@ -714,8 +712,7 @@ func TestServiceAnnotations(t *testing.T) {
operatorAnnotations: make(map[string]string),
serviceAnnotations: map[string]string{"foo": "bar"},
expect: map[string]string{
"external-dns.alpha.kubernetes.io/hostname": "acid-test-stg.test.db.example.com,test-stg.acid.db.example.com",
"service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600",
"external-dns.alpha.kubernetes.io/hostname": "acid-test-stg.test.db.example.com,test-stg.acid.db.example.com",
"foo": "bar",
},
},
@ -737,8 +734,7 @@ func TestServiceAnnotations(t *testing.T) {
operatorAnnotations: map[string]string{"foo": "bar"},
serviceAnnotations: make(map[string]string),
expect: map[string]string{
"external-dns.alpha.kubernetes.io/hostname": "acid-test-stg.test.db.example.com,test-stg.acid.db.example.com",
"service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600",
"external-dns.alpha.kubernetes.io/hostname": "acid-test-stg.test.db.example.com,test-stg.acid.db.example.com",
"foo": "bar",
},
},
@ -780,8 +776,7 @@ func TestServiceAnnotations(t *testing.T) {
"external-dns.alpha.kubernetes.io/hostname": "wrong.external-dns-name.example.com",
},
expect: map[string]string{
"external-dns.alpha.kubernetes.io/hostname": "acid-test-stg.test.db.example.com,test-stg.acid.db.example.com",
"service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600",
"external-dns.alpha.kubernetes.io/hostname": "acid-test-stg.test.db.example.com,test-stg.acid.db.example.com",
},
},
{
@ -792,8 +787,7 @@ func TestServiceAnnotations(t *testing.T) {
serviceAnnotations: make(map[string]string),
operatorAnnotations: make(map[string]string),
expect: map[string]string{
"external-dns.alpha.kubernetes.io/hostname": "acid-test-stg.test.db.example.com,test-stg.acid.db.example.com",
"service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600",
"external-dns.alpha.kubernetes.io/hostname": "acid-test-stg.test.db.example.com,test-stg.acid.db.example.com",
},
},
{
@ -835,8 +829,7 @@ func TestServiceAnnotations(t *testing.T) {
operatorAnnotations: make(map[string]string),
serviceAnnotations: make(map[string]string),
expect: map[string]string{
"external-dns.alpha.kubernetes.io/hostname": "acid-test-stg-repl.test.db.example.com,test-stg-repl.acid.db.example.com",
"service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600",
"external-dns.alpha.kubernetes.io/hostname": "acid-test-stg-repl.test.db.example.com,test-stg-repl.acid.db.example.com",
},
},
{
@ -857,8 +850,7 @@ func TestServiceAnnotations(t *testing.T) {
operatorAnnotations: make(map[string]string),
serviceAnnotations: make(map[string]string),
expect: map[string]string{
"external-dns.alpha.kubernetes.io/hostname": "acid-test-stg-repl.test.db.example.com,test-stg-repl.acid.db.example.com",
"service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600",
"external-dns.alpha.kubernetes.io/hostname": "acid-test-stg-repl.test.db.example.com,test-stg-repl.acid.db.example.com",
},
},
{
@ -869,8 +861,7 @@ func TestServiceAnnotations(t *testing.T) {
operatorAnnotations: make(map[string]string),
serviceAnnotations: map[string]string{"foo": "bar"},
expect: map[string]string{
"external-dns.alpha.kubernetes.io/hostname": "acid-test-stg-repl.test.db.example.com,test-stg-repl.acid.db.example.com",
"service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600",
"external-dns.alpha.kubernetes.io/hostname": "acid-test-stg-repl.test.db.example.com,test-stg-repl.acid.db.example.com",
"foo": "bar",
},
},
@ -892,8 +883,7 @@ func TestServiceAnnotations(t *testing.T) {
operatorAnnotations: map[string]string{"foo": "bar"},
serviceAnnotations: make(map[string]string),
expect: map[string]string{
"external-dns.alpha.kubernetes.io/hostname": "acid-test-stg-repl.test.db.example.com,test-stg-repl.acid.db.example.com",
"service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600",
"external-dns.alpha.kubernetes.io/hostname": "acid-test-stg-repl.test.db.example.com,test-stg-repl.acid.db.example.com",
"foo": "bar",
},
},
@ -935,8 +925,7 @@ func TestServiceAnnotations(t *testing.T) {
"external-dns.alpha.kubernetes.io/hostname": "wrong.external-dns-name.example.com",
},
expect: map[string]string{
"external-dns.alpha.kubernetes.io/hostname": "acid-test-stg-repl.test.db.example.com,test-stg-repl.acid.db.example.com",
"service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600",
"external-dns.alpha.kubernetes.io/hostname": "acid-test-stg-repl.test.db.example.com,test-stg-repl.acid.db.example.com",
},
},
{
@ -947,8 +936,7 @@ func TestServiceAnnotations(t *testing.T) {
serviceAnnotations: make(map[string]string),
operatorAnnotations: make(map[string]string),
expect: map[string]string{
"external-dns.alpha.kubernetes.io/hostname": "acid-test-stg-repl.test.db.example.com,test-stg-repl.acid.db.example.com",
"service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600",
"external-dns.alpha.kubernetes.io/hostname": "acid-test-stg-repl.test.db.example.com,test-stg-repl.acid.db.example.com",
},
},
{
@ -1377,7 +1365,6 @@ func TestCompareServices(t *testing.T) {
serviceWithOwnerReference := newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
},
v1.ServiceTypeClusterIP,
[]string{"128.141.0.0/16", "137.138.0.0/16"},
@ -1406,7 +1393,6 @@ func TestCompareServices(t *testing.T) {
current: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
},
v1.ServiceTypeClusterIP,
[]string{"128.141.0.0/16", "137.138.0.0/16"},
@ -1414,7 +1400,6 @@ func TestCompareServices(t *testing.T) {
new: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
},
v1.ServiceTypeClusterIP,
[]string{"128.141.0.0/16", "137.138.0.0/16"},
@ -1426,7 +1411,6 @@ func TestCompareServices(t *testing.T) {
current: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
},
v1.ServiceTypeClusterIP,
[]string{"128.141.0.0/16", "137.138.0.0/16"},
@ -1434,7 +1418,6 @@ func TestCompareServices(t *testing.T) {
new: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
},
v1.ServiceTypeLoadBalancer,
[]string{"128.141.0.0/16", "137.138.0.0/16"},
@ -1447,7 +1430,6 @@ func TestCompareServices(t *testing.T) {
current: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
},
v1.ServiceTypeLoadBalancer,
[]string{"128.141.0.0/16", "137.138.0.0/16"},
@ -1455,7 +1437,6 @@ func TestCompareServices(t *testing.T) {
new: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
},
v1.ServiceTypeLoadBalancer,
[]string{"185.249.56.0/22"},
@ -1468,7 +1449,6 @@ func TestCompareServices(t *testing.T) {
current: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
},
v1.ServiceTypeLoadBalancer,
[]string{"128.141.0.0/16", "137.138.0.0/16"},
@ -1476,7 +1456,6 @@ func TestCompareServices(t *testing.T) {
new: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
},
v1.ServiceTypeLoadBalancer,
[]string{},
@ -1489,7 +1468,6 @@ func TestCompareServices(t *testing.T) {
current: newService(
map[string]string{
constants.ZalandoDNSNameAnnotation: "clstr.acid.zalan.do",
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
},
v1.ServiceTypeClusterIP,
[]string{"128.141.0.0/16", "137.138.0.0/16"},
@ -1603,8 +1581,8 @@ func newCronJob(image, schedule string, vars []v1.EnvVar, mounts []v1.VolumeMoun
func TestCompareLogicalBackupJob(t *testing.T) {
img1 := "registry.opensource.zalan.do/acid/logical-backup:v1.0"
img2 := "registry.opensource.zalan.do/acid/logical-backup:v2.0"
img1 := "ghcr.io/zalando/postgres-operator/logical-backup:v1.14.0"
img2 := "ghcr.io/zalando/postgres-operator/logical-backup:v1.15.1"
clientSet := fake.NewSimpleClientset()
acidClientSet := fakeacidv1.NewSimpleClientset()

View File

@ -31,6 +31,7 @@ var poolerRunAsGroup = int64(101)
// ConnectionPoolerObjects holds the K8s objects that belong to a connection pooler
type ConnectionPoolerObjects struct {
AuthSecret *v1.Secret
Deployment *appsv1.Deployment
Service *v1.Service
Name string
@ -167,6 +168,38 @@ func (c *Cluster) createConnectionPooler(LookupFunction InstallFunction) (SyncRe
return reason, nil
}
// generateUserlist renders the content of pgBouncer's userlist.txt auth file.
//
// The result holds one `"name" "password"` line for the connection pooler
// system user, followed by one line for every infrastructure role that has a
// non-empty password. Double quotes embedded in a name or password are doubled
// (`"` becomes `""`), which is how the pgBouncer auth file format escapes them;
// without this a password containing a quote would produce a malformed file.
//
// Infrastructure roles are written in map iteration order, so the order of
// those lines is not stable between calls.
func (c *Cluster) generateUserlist() string {
	// quote wraps a field in double quotes and escapes embedded quotes.
	quote := func(field string) string {
		return `"` + strings.Replace(field, `"`, `""`, -1) + `"`
	}

	var sb strings.Builder

	poolerAdminUser := c.systemUsers[constants.ConnectionPoolerUserKeyName]
	fmt.Fprintf(&sb, "%s %s\n", quote(poolerAdminUser.Name), quote(poolerAdminUser.Password))

	for roleName, infraRole := range c.InfrastructureRoles {
		// roles without a password cannot be authenticated via the userlist
		if infraRole.Password == "" {
			continue
		}
		fmt.Fprintf(&sb, "%s %s\n", quote(roleName), quote(infraRole.Password))
	}

	return sb.String()
}
// generateConnectionPoolerAuthSecret builds the Secret object that carries the
// pgBouncer user list for the given connection pooler. The secret is named
// "<pooler name>-u", lives in the pooler's namespace and is owned by the
// cluster. It only constructs the object; nothing is sent to the API server.
func (c *Cluster) generateConnectionPoolerAuthSecret(connectionPooler *ConnectionPoolerObjects) *v1.Secret {
	meta := metav1.ObjectMeta{
		Labels:          c.connectionPoolerLabels(connectionPooler.Role, true).MatchLabels,
		Name:            fmt.Sprintf("%s-u", connectionPooler.Name),
		Namespace:       connectionPooler.Namespace,
		Annotations:     c.annotationsSet(nil),
		OwnerReferences: c.ownerReferences(),
	}

	authSecret := &v1.Secret{
		ObjectMeta: meta,
		Type:       v1.SecretTypeOpaque,
	}
	// StringData takes the user list as plain text; Kubernetes handles the
	// encoding into the secret's data.
	authSecret.StringData = map[string]string{
		"userlist.txt": c.generateUserlist(),
	}

	return authSecret
}
// Generate pool size related environment variables.
//
// MAX_DB_CONN would specify the global maximum for connections to a target
@ -320,6 +353,18 @@ func (c *Cluster) generateConnectionPoolerPodTemplate(role PostgresRole) (
}
envVars = append(envVars, c.getConnectionPoolerEnvVars()...)
infraRolesList := make([]string, 0)
for infraRoleName := range c.InfrastructureRoles {
infraRolesList = append(infraRolesList, infraRoleName)
}
if len(infraRolesList) > 0 {
envVars = append(envVars, v1.EnvVar{
Name: "INFRASTRUCTURE_ROLES",
Value: strings.Join(infraRolesList, ","),
})
}
poolerContainer := v1.Container{
Name: connectionPoolerContainer,
Image: effectiveDockerImage,
@ -343,12 +388,29 @@ func (c *Cluster) generateConnectionPoolerPodTemplate(role PostgresRole) (
},
}
var poolerVolumes []v1.Volume
var volumeMounts []v1.VolumeMount
// mount secret volume with userlist.txt for pgBouncer to authenticate users
poolerVolumes = append(poolerVolumes, v1.Volume{
Name: fmt.Sprintf("%s-u", c.connectionPoolerName(role)),
VolumeSource: v1.VolumeSource{
Secret: &v1.SecretVolumeSource{
SecretName: fmt.Sprintf("%s-u", c.connectionPoolerName(role)),
},
},
})
volumeMounts = append(volumeMounts, v1.VolumeMount{
Name: fmt.Sprintf("%s-u", c.connectionPoolerName(role)),
MountPath: "/etc/pgbouncer/userlist.txt",
SubPath: "userlist.txt",
ReadOnly: true,
})
// If the cluster has custom TLS certificates configured, we do the following:
// 1. Add environment variables to tell pgBouncer where to find the TLS certificates
// 2. Reference the secret in a volume
// 3. Mount the volume to the container at /tls
var poolerVolumes []v1.Volume
var volumeMounts []v1.VolumeMount
if spec.TLS != nil && spec.TLS.SecretName != "" {
getPoolerTLSEnv := func(k string) string {
keyName := ""
@ -533,10 +595,6 @@ func (c *Cluster) generatePoolerServiceAnnotations(role PostgresRole, spec *acid
annotations := c.getCustomServiceAnnotations(role, spec)
if c.shouldCreateLoadBalancerForPoolerService(role, spec) {
// set ELB Timeout annotation with default value
if _, ok := annotations[constants.ElbTimeoutAnnotationName]; !ok {
annotations[constants.ElbTimeoutAnnotationName] = constants.ElbTimeoutAnnotationValue
}
// -repl suffix will be added by replicaDNSName
clusterNameWithPoolerSuffix := c.connectionPoolerName(Master)
if role == Master {
@ -639,12 +697,31 @@ func (c *Cluster) deleteConnectionPooler(role PostgresRole) (err error) {
c.logger.Infof("connection pooler service %s has been deleted for role %s", service.Name, role)
}
// Repeat the same for the auth secret
authSecret := c.ConnectionPooler[role].AuthSecret
if authSecret == nil {
c.logger.Debug("no connection pooler auth secret to delete")
} else {
err := c.KubeClient.
Secrets(c.Namespace).
Delete(context.TODO(), authSecret.Name, metav1.DeleteOptions{})
if k8sutil.ResourceNotFound(err) {
c.logger.Debugf("connection pooler auth secret %s for role %s has already been deleted", authSecret.Name, role)
} else if err != nil {
return fmt.Errorf("could not delete connection pooler auth secret: %v", err)
}
c.logger.Infof("connection pooler auth secret %s has been deleted for role %s", authSecret.Name, role)
}
c.ConnectionPooler[role].AuthSecret = nil
c.ConnectionPooler[role].Deployment = nil
c.ConnectionPooler[role].Service = nil
return nil
}
// delete connection pooler
// delete connection pooler secret
func (c *Cluster) deleteConnectionPoolerSecret() (err error) {
// Repeat the same for the secret object
secretName := c.credentialSecretName(c.OpConfig.ConnectionPooler.User)
@ -660,6 +737,7 @@ func (c *Cluster) deleteConnectionPoolerSecret() (err error) {
return fmt.Errorf("could not delete pooler secret: %v", err)
}
}
return nil
}
@ -912,7 +990,7 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look
// in this case also do not forget to install lookup function
// skip installation in standby clusters, since they are read-only
if !c.ConnectionPooler[role].LookupFunction && c.Spec.StandbyCluster == nil {
if !c.ConnectionPooler[role].LookupFunction && !isStandbyCluster(&newSpec.Spec) {
connectionPooler := c.Spec.ConnectionPooler
specSchema := ""
specUser := ""
@ -975,11 +1053,42 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
pods []v1.Pod
service *v1.Service
newService *v1.Service
authSecret *v1.Secret
newAuthSecret *v1.Secret
err error
)
updatedPodAnnotations := map[string]*string{}
syncReason := make([]string, 0)
// create extra secret for connection pooler authentication
newAuthSecret = c.generateConnectionPoolerAuthSecret(c.ConnectionPooler[role])
if authSecret, err = c.KubeClient.Secrets(c.Namespace).Get(context.TODO(), fmt.Sprintf("%s-u", c.connectionPoolerName(role)), metav1.GetOptions{}); err == nil {
c.ConnectionPooler[role].AuthSecret = authSecret
// make sure existing annotations are preserved
newAuthSecret.Annotations = c.annotationsSet(authSecret.Annotations)
authSecret, err = c.KubeClient.Secrets(authSecret.Namespace).Update(context.TODO(), newAuthSecret, metav1.UpdateOptions{})
if err != nil {
return NoSync, fmt.Errorf("could not update connection pooler auth secret: %v", err)
}
c.ConnectionPooler[role].AuthSecret = authSecret
} else if !k8sutil.ResourceNotFound(err) {
return NoSync, fmt.Errorf("could not get auth secret for connection pooler to sync: %v", err)
}
if k8sutil.ResourceNotFound(err) {
c.logger.Warningf("auth secret %s for connection pooler is not found, create it", fmt.Sprintf("%s-u", c.connectionPoolerName(role)))
authSecret, err = c.KubeClient.
Secrets(newAuthSecret.Namespace).
Create(context.TODO(), newAuthSecret, metav1.CreateOptions{})
if err != nil {
return NoSync, err
}
c.ConnectionPooler[role].AuthSecret = authSecret
}
// next the pooler deployment
deployment, err = c.KubeClient.
Deployments(c.Namespace).
Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{})

View File

@ -30,6 +30,7 @@ func newFakeK8sPoolerTestClient() (k8sutil.KubernetesClient, *fake.Clientset) {
StatefulSetsGetter: clientSet.AppsV1(),
DeploymentsGetter: clientSet.AppsV1(),
ServicesGetter: clientSet.CoreV1(),
SecretsGetter: clientSet.CoreV1(),
}, clientSet
}
@ -803,6 +804,7 @@ func TestConnectionPoolerDeploymentSpec(t *testing.T) {
}
cluster.ConnectionPooler = map[PostgresRole]*ConnectionPoolerObjects{
Master: {
AuthSecret: nil,
Deployment: nil,
Service: nil,
LookupFunction: true,
@ -1019,6 +1021,7 @@ func TestPoolerTLS(t *testing.T) {
// create pooler resources
cluster.ConnectionPooler = map[PostgresRole]*ConnectionPoolerObjects{}
cluster.ConnectionPooler[Master] = &ConnectionPoolerObjects{
AuthSecret: nil,
Deployment: nil,
Service: nil,
Name: cluster.connectionPoolerName(Master),
@ -1089,12 +1092,14 @@ func TestConnectionPoolerServiceSpec(t *testing.T) {
}
cluster.ConnectionPooler = map[PostgresRole]*ConnectionPoolerObjects{
Master: {
AuthSecret: nil,
Deployment: nil,
Service: nil,
LookupFunction: false,
Role: Master,
},
Replica: {
AuthSecret: nil,
Deployment: nil,
Service: nil,
LookupFunction: false,

View File

@ -612,6 +612,13 @@ func generatePodAntiAffinity(podAffinityTerm v1.PodAffinityTerm, preferredDuring
return podAntiAffinity
}
// generateTopologySpreadConstraints returns the topology spread constraints to
// put on a pod template, making sure every constraint has a label selector.
//
// A constraint that does not define its own LabelSelector gets one that matches
// the given labels. A constraint that already carries a LabelSelector is kept
// as is, so an explicitly configured selector is never overwritten.
//
// The result is a new slice: the input (which callers pass straight from the
// Postgres spec) is never modified. An empty or nil input is returned unchanged.
func generateTopologySpreadConstraints(labels labels.Set, topologySpreadConstraints []v1.TopologySpreadConstraint) []v1.TopologySpreadConstraint {
	if len(topologySpreadConstraints) == 0 {
		return topologySpreadConstraints
	}

	result := make([]v1.TopologySpreadConstraint, len(topologySpreadConstraints))
	// Iterate by index: the value variable of a range loop is a copy, so
	// assigning to it would leave the returned constraints untouched.
	for i := range topologySpreadConstraints {
		result[i] = topologySpreadConstraints[i]
		if result[i].LabelSelector == nil {
			result[i].LabelSelector = &metav1.LabelSelector{MatchLabels: labels}
		}
	}
	return result
}
func tolerations(tolerationsSpec *[]v1.Toleration, podToleration map[string]string) []v1.Toleration {
// allow to override tolerations by postgresql manifest
if len(*tolerationsSpec) > 0 {
@ -817,10 +824,8 @@ func (c *Cluster) generatePodTemplate(
initContainers []v1.Container,
sidecarContainers []v1.Container,
sharePgSocketWithSidecars *bool,
topologySpreadConstraintsSpec []v1.TopologySpreadConstraint,
tolerationsSpec *[]v1.Toleration,
spiloRunAsUser *int64,
spiloRunAsGroup *int64,
spiloFSGroup *int64,
nodeAffinity *v1.Affinity,
schedulerName *string,
terminateGracePeriod int64,
@ -839,18 +844,22 @@ func (c *Cluster) generatePodTemplate(
terminateGracePeriodSeconds := terminateGracePeriod
containers := []v1.Container{*spiloContainer}
containers = append(containers, sidecarContainers...)
securityContext := v1.PodSecurityContext{}
if spiloRunAsUser != nil {
securityContext.RunAsUser = spiloRunAsUser
securityContext := v1.PodSecurityContext{
RunAsUser: c.OpConfig.Resources.SpiloRunAsUser,
RunAsGroup: c.OpConfig.Resources.SpiloRunAsGroup,
FSGroup: c.OpConfig.Resources.SpiloFSGroup,
}
if spiloRunAsGroup != nil {
securityContext.RunAsGroup = spiloRunAsGroup
if c.Spec.SpiloRunAsUser != nil {
securityContext.RunAsUser = c.Spec.SpiloRunAsUser
}
if spiloFSGroup != nil {
securityContext.FSGroup = spiloFSGroup
if c.Spec.SpiloRunAsGroup != nil {
securityContext.RunAsGroup = c.Spec.SpiloRunAsGroup
}
if c.Spec.SpiloFSGroup != nil {
securityContext.FSGroup = c.Spec.SpiloFSGroup
}
podSpec := v1.PodSpec{
@ -886,6 +895,8 @@ func (c *Cluster) generatePodTemplate(
podSpec.PriorityClassName = priorityClassName
}
podSpec.TopologySpreadConstraints = generateTopologySpreadConstraints(labels, topologySpreadConstraintsSpec)
if sharePgSocketWithSidecars != nil && *sharePgSocketWithSidecars {
addVarRunVolume(&podSpec)
}
@ -1352,22 +1363,6 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
// pickup the docker image for the spilo container
effectiveDockerImage := util.Coalesce(spec.DockerImage, c.OpConfig.DockerImage)
// determine the User, Group and FSGroup for the spilo pod
effectiveRunAsUser := c.OpConfig.Resources.SpiloRunAsUser
if spec.SpiloRunAsUser != nil {
effectiveRunAsUser = spec.SpiloRunAsUser
}
effectiveRunAsGroup := c.OpConfig.Resources.SpiloRunAsGroup
if spec.SpiloRunAsGroup != nil {
effectiveRunAsGroup = spec.SpiloRunAsGroup
}
effectiveFSGroup := c.OpConfig.Resources.SpiloFSGroup
if spec.SpiloFSGroup != nil {
effectiveFSGroup = spec.SpiloFSGroup
}
volumeMounts := generateVolumeMounts(spec.Volume)
// configure TLS with a custom secret volume
@ -1484,10 +1479,8 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
initContainers,
sidecarContainers,
c.OpConfig.SharePgSocketWithSidecars,
spec.TopologySpreadConstraints,
&tolerationSpec,
effectiveRunAsUser,
effectiveRunAsGroup,
effectiveFSGroup,
c.nodeAffinity(c.OpConfig.NodeReadinessLabel, spec.NodeAffinity),
spec.SchedulerName,
int64(c.OpConfig.PodTerminateGracePeriod.Seconds()),
@ -2047,11 +2040,6 @@ func (c *Cluster) generateServiceAnnotations(role PostgresRole, spec *acidv1.Pos
if c.shouldCreateLoadBalancerForService(role, spec) {
dnsName := c.dnsName(role)
// Just set ELB Timeout annotation with default value, if it does not
// have a custom value
if _, ok := annotations[constants.ElbTimeoutAnnotationName]; !ok {
annotations[constants.ElbTimeoutAnnotationName] = constants.ElbTimeoutAnnotationValue
}
// External DNS name annotation is not customizable
annotations[constants.ZalandoDNSNameAnnotation] = dnsName
}
@ -2369,6 +2357,8 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1.CronJob, error) {
tolerationsSpec := tolerations(&spec.Tolerations, c.OpConfig.PodToleration)
topologySpreadConstraintsSpec := generateTopologySpreadConstraints(labels, spec.TopologySpreadConstraints)
// re-use the method that generates DB pod templates
if podTemplate, err = c.generatePodTemplate(
c.Namespace,
@ -2378,10 +2368,8 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1.CronJob, error) {
[]v1.Container{},
[]v1.Container{},
util.False(),
topologySpreadConstraintsSpec,
&tolerationsSpec,
nil,
nil,
nil,
c.nodeAffinity(c.OpConfig.NodeReadinessLabel, nil),
nil,
int64(c.OpConfig.PodTerminateGracePeriod.Seconds()),

View File

@ -2967,6 +2967,7 @@ func newLBFakeClient() (k8sutil.KubernetesClient, *fake.Clientset) {
DeploymentsGetter: clientSet.AppsV1(),
PodsGetter: clientSet.CoreV1(),
ServicesGetter: clientSet.CoreV1(),
SecretsGetter: clientSet.CoreV1(),
}, clientSet
}
@ -4271,3 +4272,56 @@ func TestGenerateCapabilities(t *testing.T) {
}
}
}
// TestTopologySpreadConstraints builds a statefulset from a Postgres spec that
// contains one topology spread constraint and checks that the constraint shows
// up in the generated pod template.
func TestTopologySpreadConstraints(t *testing.T) {
clusterName := "acid-test-cluster"
namespace := "default"
// `cluster` on the next lines is not the local variable declared further
// down (that one does not exist yet at this point), so it must resolve to an
// identifier declared outside this function.
// NOTE(review): presumably a package-level test fixture — confirm.
labelSelector := &metav1.LabelSelector{
MatchLabels: cluster.labelsSet(true),
}
pg := acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName,
Namespace: namespace,
},
Spec: acidv1.PostgresSpec{
NumberOfInstances: 1,
Resources: &acidv1.Resources{
ResourceRequests: acidv1.ResourceDescription{CPU: k8sutil.StringToPointer("1"), Memory: k8sutil.StringToPointer("10")},
ResourceLimits: acidv1.ResourceDescription{CPU: k8sutil.StringToPointer("1"), Memory: k8sutil.StringToPointer("10")},
},
Volume: acidv1.Volume{
Size: "1G",
},
// NOTE(review): the input constraint already carries the very
// LabelSelector that is asserted below, so the assertion would also
// pass if the generated pod template simply carried the input
// constraint through unchanged. Consider leaving the selector unset
// here to verify that it is filled in during generation.
TopologySpreadConstraints: []v1.TopologySpreadConstraint{
{
MaxSkew: 1,
TopologyKey: "topology.kubernetes.io/zone",
WhenUnsatisfiable: v1.DoNotSchedule,
LabelSelector: labelSelector,
},
},
},
}
// from here on `cluster` is a fresh local cluster shadowing the outer one
cluster := New(
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
},
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
cluster.Name = clusterName
cluster.Namespace = namespace
// the return value of this call is discarded
cluster.labelsSet(true)
s, err := cluster.generateStatefulSet(&pg.Spec)
assert.NoError(t, err)
// the pod template must contain a constraint equal to the one from the spec
assert.Contains(t, s.Spec.Template.Spec.TopologySpreadConstraints, v1.TopologySpreadConstraint{
MaxSkew: int32(1),
TopologyKey: "topology.kubernetes.io/zone",
WhenUnsatisfiable: v1.DoNotSchedule,
LabelSelector: labelSelector,
},
)
}

View File

@ -376,6 +376,36 @@ func (c *Cluster) getPatroniMemberData(pod *v1.Pod) (patroni.MemberData, error)
return memberData, nil
}
// podIsNotRunning reports whether the pod is known to be unhealthy.
//
// The decision is made in two steps:
//   - an empty phase means no status has been reported yet; such a pod is
//     given the benefit of the doubt and is NOT treated as non-running
//   - any phase other than Running counts as non-running
//   - a pod in phase Running still counts as non-running as soon as one of its
//     reported containers is waiting or terminated (CreateContainerConfigError,
//     CrashLoopBackOff, ImagePullBackOff and the like)
func podIsNotRunning(pod *v1.Pod) bool {
	switch pod.Status.Phase {
	case "":
		// nothing reported so far, do not jump to conclusions
		return false
	case v1.PodRunning:
		// the phase looks fine, the containers decide below
	default:
		return true
	}

	statuses := pod.Status.ContainerStatuses
	for i := range statuses {
		state := statuses[i].State
		if state.Waiting != nil || state.Terminated != nil {
			return true
		}
	}

	return false
}
// allPodsRunning reports whether none of the given pods is flagged by
// podIsNotRunning. An empty list therefore yields true.
func (c *Cluster) allPodsRunning(pods []v1.Pod) bool {
	for idx := 0; idx < len(pods); idx++ {
		current := &pods[idx]
		if podIsNotRunning(current) {
			// one unhealthy pod is enough to answer the question
			return false
		}
	}
	return true
}
func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
stopCh := make(chan struct{})
ch := c.registerPodSubscriber(podName)
@ -444,7 +474,8 @@ func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.Namesp
// switchover if
// 1. we have not observed a new master pod when re-creating former replicas
// 2. we know possible switchover targets even when no replicas were recreated
if newMasterPod == nil && len(replicas) > 0 {
// 3. the master pod is actually running (can't switchover a dead master)
if newMasterPod == nil && len(replicas) > 0 && !podIsNotRunning(masterPod) {
masterCandidate, err := c.getSwitchoverCandidate(masterPod)
if err != nil {
// do not recreate master now so it will keep the update flag and switchover will be retried on next sync
@ -455,6 +486,9 @@ func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.Namesp
}
} else if newMasterPod == nil && len(replicas) == 0 {
c.logger.Warningf("cannot perform switch over before re-creating the pod: no replicas")
} else if podIsNotRunning(masterPod) {
c.logger.Warningf("master pod %q is not running, skipping switchover and recreating directly",
util.NameFromMeta(masterPod.ObjectMeta))
}
c.logger.Infof("recreating old master pod %q", util.NameFromMeta(masterPod.ObjectMeta))

View File

@ -15,6 +15,7 @@ import (
"github.com/zalando/postgres-operator/pkg/util/config"
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
"github.com/zalando/postgres-operator/pkg/util/patroni"
v1 "k8s.io/api/core/v1"
)
func TestGetSwitchoverCandidate(t *testing.T) {
@ -112,3 +113,302 @@ func TestGetSwitchoverCandidate(t *testing.T) {
}
}
}
// TestPodIsNotRunning is a table-driven test for podIsNotRunning. Each case
// provides a pod with a hand-crafted status and the verdict expected for it:
// pods without status or with only running containers are expected to yield
// false, pods in a phase other than Running or with a waiting/terminated
// container are expected to yield true.
func TestPodIsNotRunning(t *testing.T) {
tests := []struct {
subtest string
pod v1.Pod
expected bool
}{
{
// empty status (phase not set)
subtest: "pod with no status reported yet",
pod: v1.Pod{
Status: v1.PodStatus{},
},
expected: false,
},
{
subtest: "pod running with all containers ready",
pod: v1.Pod{
Status: v1.PodStatus{
Phase: v1.PodRunning,
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Running: &v1.ContainerStateRunning{},
},
},
},
},
},
expected: false,
},
{
// phase differs from Running, no container statuses needed
subtest: "pod in pending phase",
pod: v1.Pod{
Status: v1.PodStatus{
Phase: v1.PodPending,
},
},
expected: true,
},
{
subtest: "pod running but container in CreateContainerConfigError",
pod: v1.Pod{
Status: v1.PodStatus{
Phase: v1.PodRunning,
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Waiting: &v1.ContainerStateWaiting{
Reason: "CreateContainerConfigError",
Message: `secret "some-secret" not found`,
},
},
},
},
},
},
expected: true,
},
{
subtest: "pod running but container in CrashLoopBackOff",
pod: v1.Pod{
Status: v1.PodStatus{
Phase: v1.PodRunning,
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Waiting: &v1.ContainerStateWaiting{
Reason: "CrashLoopBackOff",
},
},
},
},
},
},
expected: true,
},
{
subtest: "pod running but container terminated",
pod: v1.Pod{
Status: v1.PodStatus{
Phase: v1.PodRunning,
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
ExitCode: 137,
},
},
},
},
},
},
expected: true,
},
{
// a single waiting container is enough, even next to a running one
subtest: "pod running with mixed container states - one healthy one broken",
pod: v1.Pod{
Status: v1.PodStatus{
Phase: v1.PodRunning,
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Running: &v1.ContainerStateRunning{},
},
},
{
State: v1.ContainerState{
Waiting: &v1.ContainerStateWaiting{
Reason: "CreateContainerConfigError",
},
},
},
},
},
},
expected: true,
},
{
subtest: "pod in failed phase",
pod: v1.Pod{
Status: v1.PodStatus{
Phase: v1.PodFailed,
},
},
expected: true,
},
{
subtest: "pod running with multiple healthy containers",
pod: v1.Pod{
Status: v1.PodStatus{
Phase: v1.PodRunning,
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Running: &v1.ContainerStateRunning{},
},
},
{
State: v1.ContainerState{
Running: &v1.ContainerStateRunning{},
},
},
},
},
},
expected: false,
},
{
subtest: "pod running with ImagePullBackOff",
pod: v1.Pod{
Status: v1.PodStatus{
Phase: v1.PodRunning,
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Waiting: &v1.ContainerStateWaiting{
Reason: "ImagePullBackOff",
},
},
},
},
},
},
expected: true,
},
}
// run every case as its own subtest, named after the case description
for _, tt := range tests {
t.Run(tt.subtest, func(t *testing.T) {
result := podIsNotRunning(&tt.pod)
if result != tt.expected {
t.Errorf("podIsNotRunning() = %v, expected %v", result, tt.expected)
}
})
}
}
// TestAllPodsRunning is a table-driven test for Cluster.allPodsRunning. It
// feeds lists of pods with hand-crafted statuses into the method and compares
// the result with the expected verdict for the whole list.
func TestAllPodsRunning(t *testing.T) {
client, _ := newFakeK8sSyncClient()
// cluster under test, created with a fake Kubernetes client
var cluster = New(
Config{
OpConfig: config.Config{
Resources: config.Resources{
ClusterLabels: map[string]string{"application": "spilo"},
ClusterNameLabel: "cluster-name",
PodRoleLabel: "spilo-role",
},
},
}, client, acidv1.Postgresql{}, logger, eventRecorder)
tests := []struct {
subtest string
pods []v1.Pod
expected bool
}{
{
subtest: "all pods running",
pods: []v1.Pod{
{
Status: v1.PodStatus{
Phase: v1.PodRunning,
ContainerStatuses: []v1.ContainerStatus{
{State: v1.ContainerState{Running: &v1.ContainerStateRunning{}}},
},
},
},
{
Status: v1.PodStatus{
Phase: v1.PodRunning,
ContainerStatuses: []v1.ContainerStatus{
{State: v1.ContainerState{Running: &v1.ContainerStateRunning{}}},
},
},
},
},
expected: true,
},
{
// the second pod has a waiting container
subtest: "one pod not running",
pods: []v1.Pod{
{
Status: v1.PodStatus{
Phase: v1.PodRunning,
ContainerStatuses: []v1.ContainerStatus{
{State: v1.ContainerState{Running: &v1.ContainerStateRunning{}}},
},
},
},
{
Status: v1.PodStatus{
Phase: v1.PodRunning,
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Waiting: &v1.ContainerStateWaiting{
Reason: "CreateContainerConfigError",
},
},
},
},
},
},
},
expected: false,
},
{
subtest: "all pods not running",
pods: []v1.Pod{
{
Status: v1.PodStatus{
Phase: v1.PodPending,
},
},
{
Status: v1.PodStatus{
Phase: v1.PodRunning,
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Waiting: &v1.ContainerStateWaiting{
Reason: "CrashLoopBackOff",
},
},
},
},
},
},
},
expected: false,
},
{
// no pods at all: expected to be reported as "all running"
subtest: "empty pod list",
pods: []v1.Pod{},
expected: true,
},
{
// pods with an empty status are expected to count as running
subtest: "pods with no status reported yet",
pods: []v1.Pod{
{
Status: v1.PodStatus{},
},
{
Status: v1.PodStatus{},
},
},
expected: true,
},
}
// run every case as its own subtest, named after the case description
for _, tt := range tests {
t.Run(tt.subtest, func(t *testing.T) {
result := cluster.allPodsRunning(tt.pods)
if result != tt.expected {
t.Errorf("allPodsRunning() = %v, expected %v", result, tt.expected)
}
})
}
}

View File

@ -41,6 +41,12 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
defer c.mu.Unlock()
oldSpec := c.Postgresql
if !c.isInMaintenanceWindow(newSpec.Spec.MaintenanceWindows) {
// do not apply any major version related changes yet
newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
}
c.setSpec(newSpec)
defer func() {
@ -97,11 +103,6 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
}
}
if !c.isInMaintenanceWindow(newSpec.Spec.MaintenanceWindows) {
// do not apply any major version related changes yet
newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
}
if err = c.syncStatefulSet(); err != nil {
if !k8sutil.ResourceAlreadyExists(err) {
err = fmt.Errorf("could not sync statefulsets: %v", err)
@ -718,14 +719,26 @@ func (c *Cluster) syncStatefulSet() error {
if configPatched, restartPrimaryFirst, restartWait, err = c.syncPatroniConfig(pods, c.Spec.Patroni, requiredPgParameters); err != nil {
c.logger.Warningf("Patroni config updated? %v - errors during config sync: %v", configPatched, err)
postponeReasons = append(postponeReasons, "errors during Patroni config sync")
isSafeToRecreatePods = false
// Only mark unsafe if all pods are running. If some pods are not running,
// Patroni API errors are expected and should not block pod recreation,
// which is the only way to fix non-running pods.
if c.allPodsRunning(pods) {
isSafeToRecreatePods = false
} else {
c.logger.Warningf("ignoring Patroni config sync errors because some pods are not running")
}
}
// restart Postgres where it is still pending
if err = c.restartInstances(pods, restartWait, restartPrimaryFirst); err != nil {
c.logger.Errorf("errors while restarting Postgres in pods via Patroni API: %v", err)
postponeReasons = append(postponeReasons, "errors while restarting Postgres via Patroni API")
isSafeToRecreatePods = false
// Same logic: don't let unreachable non-running pods block recreation.
if c.allPodsRunning(pods) {
isSafeToRecreatePods = false
} else {
c.logger.Warningf("ignoring Patroni restart errors because some pods are not running")
}
}
// if we get here we also need to re-create the pods (either leftovers from the old

View File

@ -675,7 +675,9 @@ func isStandbyCluster(spec *acidv1.PostgresSpec) bool {
}
func (c *Cluster) isInMaintenanceWindow(specMaintenanceWindows []acidv1.MaintenanceWindow) bool {
if len(specMaintenanceWindows) == 0 && len(c.OpConfig.MaintenanceWindows) == 0 {
ignoreMaintenanceWindows := c.OpConfig.EnableMaintenanceWindows != nil && !*c.OpConfig.EnableMaintenanceWindows
noWindowsDefined := len(specMaintenanceWindows) == 0 && len(c.OpConfig.MaintenanceWindows) == 0
if noWindowsDefined || ignoreMaintenanceWindows {
return true
}
now := time.Now()

View File

@ -660,6 +660,7 @@ func TestIsInMaintenanceWindow(t *testing.T) {
cluster := New(
Config{
OpConfig: config.Config{
EnableMaintenanceWindows: util.True(),
Resources: config.Resources{
ClusterLabels: map[string]string{"application": "spilo"},
ClusterNameLabel: "cluster-name",
@ -683,12 +684,27 @@ func TestIsInMaintenanceWindow(t *testing.T) {
name string
windows []acidv1.MaintenanceWindow
configWindows []string
windowsFlag bool
expected bool
}{
{
name: "no maintenance windows",
windows: nil,
configWindows: nil,
windowsFlag: true,
expected: true,
},
{
name: "maintenance windows diabled",
windows: []acidv1.MaintenanceWindow{
{
Everyday: true,
StartTime: mustParseTime("00:00"),
EndTime: mustParseTime("23:59"),
},
},
configWindows: nil,
windowsFlag: false,
expected: true,
},
{
@ -701,6 +717,7 @@ func TestIsInMaintenanceWindow(t *testing.T) {
},
},
configWindows: nil,
windowsFlag: true,
expected: true,
},
{
@ -713,6 +730,7 @@ func TestIsInMaintenanceWindow(t *testing.T) {
},
},
configWindows: nil,
windowsFlag: true,
expected: true,
},
{
@ -724,24 +742,35 @@ func TestIsInMaintenanceWindow(t *testing.T) {
EndTime: mustParseTime(futureTimeEndFormatted),
},
},
expected: false,
windowsFlag: true,
expected: false,
},
{
name: "global maintenance windows with future interval time",
windows: nil,
configWindows: []string{fmt.Sprintf("%s-%s", futureTimeStartFormatted, futureTimeEndFormatted)},
windowsFlag: true,
expected: false,
},
{
name: "global maintenance windows all day",
windows: nil,
configWindows: []string{"00:00-02:00", "02:00-23:59"},
windowsFlag: true,
expected: true,
},
{
name: "global maintenance windows ignored",
windows: nil,
configWindows: []string{"00:00-02:00", "02:00-23:59"},
windowsFlag: false,
expected: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cluster.OpConfig.EnableMaintenanceWindows = &tt.windowsFlag
cluster.OpConfig.MaintenanceWindows = tt.configWindows
cluster.Spec.MaintenanceWindows = tt.windows
if cluster.isInMaintenanceWindow(cluster.Spec.MaintenanceWindows) != tt.expected {

View File

@ -39,7 +39,7 @@ func (c *Cluster) syncVolumes() error {
} else {
err = c.syncUnderlyingEBSVolume()
if err != nil {
c.logger.Errorf("errors occured during EBS volume adjustments: %v", err)
c.logger.Errorf("errors occurred during EBS volume adjustments: %v", err)
}
}
}

View File

@ -277,7 +277,7 @@ func (c *Controller) initRoleBinding() {
}`, c.PodServiceAccount.Name, c.PodServiceAccount.Name, c.PodServiceAccount.Name)
c.opConfig.PodServiceAccountRoleBindingDefinition = compactValue(stringValue)
}
c.logger.Info("Parse role bindings")
// re-uses k8s internal parsing. See k8s client-go issue #193 for explanation
decode := scheme.Codecs.UniversalDeserializer().Decode
obj, groupVersionKind, err := decode([]byte(c.opConfig.PodServiceAccountRoleBindingDefinition), nil, nil)

View File

@ -51,6 +51,7 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur
result.ShmVolume = util.CoalesceBool(fromCRD.ShmVolume, util.True())
result.SidecarImages = fromCRD.SidecarImages
result.SidecarContainers = fromCRD.SidecarContainers
result.EnableMaintenanceWindows = util.CoalesceBool(fromCRD.EnableMaintenanceWindows, util.True())
if len(fromCRD.MaintenanceWindows) > 0 {
result.MaintenanceWindows = make([]string, 0, len(fromCRD.MaintenanceWindows))
for _, window := range fromCRD.MaintenanceWindows {
@ -274,7 +275,7 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur
result.ConnectionPooler.Image = util.Coalesce(
fromCRD.ConnectionPooler.Image,
"registry.opensource.zalan.do/acid/pgbouncer")
"ghcr.io/zalando/postgres-operator/pgbouncer:latest")
result.ConnectionPooler.Mode = util.Coalesce(
fromCRD.ConnectionPooler.Mode,

View File

@ -259,13 +259,26 @@ func (c *Controller) processEvent(event ClusterEvent) {
lg.Infoln("cluster has been created")
case EventUpdate:
lg.Infoln("update of the cluster started")
if !clusterFound {
lg.Warningln("cluster does not exist")
return
}
c.curWorkerCluster.Store(event.WorkerID, cl)
// Check if this cluster has been marked for deletion
if !event.NewSpec.ObjectMeta.DeletionTimestamp.IsZero() {
lg.Infof("cluster has a DeletionTimestamp of %s, starting deletion now.", event.NewSpec.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
if err = cl.Delete(); err != nil {
cl.Error = fmt.Sprintf("error deleting cluster and its resources: %v", err)
c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
lg.Error(cl.Error)
return
}
lg.Infoln("cluster has been deleted via update event")
return
}
lg.Infoln("update of the cluster started")
err = cl.Update(event.OldSpec, event.NewSpec)
if err != nil {
cl.Error = fmt.Sprintf("could not update cluster: %v", err)
@ -380,7 +393,6 @@ func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{},
}
func (c *Controller) warnOnDeprecatedPostgreSQLSpecParameters(spec *acidv1.PostgresSpec) {
deprecate := func(deprecated, replacement string) {
c.logger.Warningf("parameter %q is deprecated. Consider setting %q instead", deprecated, replacement)
}
@ -426,7 +438,7 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.
clusterError string
)
if informerOldSpec != nil { //update, delete
if informerOldSpec != nil { // update, delete
uid = informerOldSpec.GetUID()
clusterName = util.NameFromMeta(informerOldSpec.ObjectMeta)
@ -441,7 +453,7 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.
} else {
clusterError = informerOldSpec.Error
}
} else { //add, sync
} else { // add, sync
uid = informerNewSpec.GetUID()
clusterName = util.NameFromMeta(informerNewSpec.ObjectMeta)
clusterError = informerNewSpec.Error
@ -552,7 +564,19 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) {
pgOld := c.postgresqlCheck(prev)
pgNew := c.postgresqlCheck(cur)
if pgOld != nil && pgNew != nil {
// Avoid the inifinite recursion for status updates
clusterName := util.NameFromMeta(pgNew.ObjectMeta)
// Check if DeletionTimestamp was set (resource marked for deletion)
deletionTimestampChanged := pgOld.ObjectMeta.DeletionTimestamp.IsZero() && !pgNew.ObjectMeta.DeletionTimestamp.IsZero()
if deletionTimestampChanged {
c.logger.WithField("cluster-name", clusterName).Infof(
"UPDATE event: DeletionTimestamp set to %s, queueing event",
pgNew.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
c.queueClusterEvent(pgOld, pgNew, EventUpdate)
return
}
// Avoid the infinite recursion for status updates
if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) {
if reflect.DeepEqual(pgNew.Annotations, pgOld.Annotations) {
return
@ -591,7 +615,6 @@ or config maps.
The operator does not sync accounts/role bindings after creation.
*/
func (c *Controller) submitRBACCredentials(event ClusterEvent) error {
namespace := event.NewSpec.GetNamespace()
if err := c.createPodServiceAccount(namespace); err != nil {
@ -605,7 +628,6 @@ func (c *Controller) submitRBACCredentials(event ClusterEvent) error {
}
func (c *Controller) createPodServiceAccount(namespace string) error {
podServiceAccountName := c.opConfig.PodServiceAccountName
_, err := c.KubeClient.ServiceAccounts(namespace).Get(context.TODO(), podServiceAccountName, metav1.GetOptions{})
if k8sutil.ResourceNotFound(err) {
@ -628,7 +650,6 @@ func (c *Controller) createPodServiceAccount(namespace string) error {
}
func (c *Controller) createRoleBindings(namespace string) error {
podServiceAccountName := c.opConfig.PodServiceAccountName
podServiceAccountRoleBindingName := c.PodServiceAccountRoleBinding.Name

View File

@ -155,7 +155,7 @@ type ConnectionPooler struct {
NumberOfInstances *int32 `name:"connection_pooler_number_of_instances" default:"2"`
Schema string `name:"connection_pooler_schema" default:"pooler"`
User string `name:"connection_pooler_user" default:"pooler"`
Image string `name:"connection_pooler_image" default:"registry.opensource.zalan.do/acid/pgbouncer"`
Image string `name:"connection_pooler_image" default:"ghcr.io/zalando/postgres-operator/pgbouncer:latest"`
Mode string `name:"connection_pooler_mode" default:"transaction"`
MaxDBConnections *int32 `name:"connection_pooler_max_db_connections" default:"60"`
ConnectionPoolerDefaultCPURequest string `name:"connection_pooler_default_cpu_request"`
@ -173,14 +173,15 @@ type Config struct {
LogicalBackup
ConnectionPooler
WatchedNamespace string `name:"watched_namespace"` // special values: "*" means 'watch all namespaces', the empty string "" means 'watch a namespace where operator is deployed to'
KubernetesUseConfigMaps bool `name:"kubernetes_use_configmaps" default:"false"`
EtcdHost string `name:"etcd_host" default:""` // special values: the empty string "" means Patroni will use K8s as a DCS
MaintenanceWindows []string `name:"maintenance_windows"`
DockerImage string `name:"docker_image" default:"ghcr.io/zalando/spilo-18:4.1-p1"`
SidecarImages map[string]string `name:"sidecar_docker_images"` // deprecated in favour of SidecarContainers
SidecarContainers []v1.Container `name:"sidecars"`
PodServiceAccountName string `name:"pod_service_account_name" default:"postgres-pod"`
WatchedNamespace string `name:"watched_namespace"` // special values: "*" means 'watch all namespaces', the empty string "" means 'watch a namespace where operator is deployed to'
KubernetesUseConfigMaps bool `name:"kubernetes_use_configmaps" default:"false"`
EtcdHost string `name:"etcd_host" default:""` // special values: the empty string "" means Patroni will use K8s as a DCS
EnableMaintenanceWindows *bool `name:"enable_maintenance_windows" default:"true"`
MaintenanceWindows []string `name:"maintenance_windows"`
DockerImage string `name:"docker_image" default:"ghcr.io/zalando/spilo-18:4.1-p1"`
SidecarImages map[string]string `name:"sidecar_docker_images"` // deprecated in favour of SidecarContainers
SidecarContainers []v1.Container `name:"sidecars"`
PodServiceAccountName string `name:"pod_service_account_name" default:"postgres-pod"`
// value of this string must be valid JSON or YAML; see initPodServiceAccount
PodServiceAccountDefinition string `name:"pod_service_account_definition" default:""`
PodServiceAccountRoleBindingDefinition string `name:"pod_service_account_role_binding_definition" default:""`

View File

@ -3,8 +3,6 @@ package constants
// Names and values in Kubernetes annotation for services, statefulsets and volumes
const (
ZalandoDNSNameAnnotation = "external-dns.alpha.kubernetes.io/hostname"
ElbTimeoutAnnotationName = "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout"
ElbTimeoutAnnotationValue = "3600"
KubeIAmAnnotation = "iam.amazonaws.com/role"
VolumeStorateProvisionerAnnotation = "pv.kubernetes.io/provisioned-by"
PostgresqlControllerAnnotationKey = "acid.zalan.do/controller"

54
pooler/Dockerfile Normal file
View File

@ -0,0 +1,54 @@
# Two-stage build of the pgbouncer connection pooler image:
# the first stage compiles pgbouncer from source, the second stage copies the
# resulting binary into a fresh base image together with the config template
# and the entrypoint script.
ARG BASE_IMAGE=alpine:3.22
# --- stage 1: toolchain and pgbouncer build ---
FROM ${BASE_IMAGE} AS build_stage
RUN apk add -U --no-cache \
autoconf \
automake \
curl \
gcc \
libc-dev \
libevent \
libevent-dev \
libtool \
make \
openssl-dev \
pkgconfig \
git
WORKDIR /src
# Fetch the pgbouncer sources and check out the tag described by the most
# recent tagged commit.
# NOTE(review): the clone is shallow and single-branch (--depth 1), so tags may
# not be present locally for `git rev-list --tags` / `git describe --tags` to
# find; in that case the checkout would stay on the cloned branch head.
# Confirm that this really builds the latest tagged release.
RUN git clone --single-branch --depth 1 https://github.com/pgbouncer/pgbouncer.git . && \
git checkout $(git describe --tags $(git rev-list --tags --max-count=1))
RUN git submodule init && git submodule update
# Build and install into /pgbouncer; the sed call drops the dist_man_MANS line
# from the generated Makefile before running make.
# NOTE(review): presumably this avoids building the man pages — confirm.
RUN ./autogen.sh && \
./configure --prefix=/pgbouncer --with-libevent=/usr/lib && \
sed -i '/dist_man_MANS/d' Makefile && \
make && \
make install
# --- stage 2: runtime image ---
FROM ${BASE_IMAGE}
RUN apk -U upgrade --no-cache \
&& apk --no-cache add bash c-ares ca-certificates gettext libevent openssl postgresql-client
# dedicated system user/group (uid 100, gid 101) plus the directories used for
# config, logs, pid file and certificates
RUN addgroup -g 101 -S pgbouncer && \
adduser -u 100 -S pgbouncer -G pgbouncer && \
mkdir -p /etc/pgbouncer /var/log/pgbouncer /var/run/pgbouncer /etc/ssl/certs
COPY --from=build_stage /pgbouncer/bin/pgbouncer /bin/pgbouncer
COPY pgbouncer.ini.tmpl /etc/pgbouncer/
COPY entrypoint.sh /entrypoint.sh
# the entrypoint runs as the pgbouncer user and writes the rendered config and
# the certificate files/links, so that user has to own these directories
RUN chown -R pgbouncer:pgbouncer \
/var/log/pgbouncer \
/var/run/pgbouncer \
/etc/pgbouncer \
/etc/ssl/certs \
&& chmod +x /entrypoint.sh
USER pgbouncer:pgbouncer
WORKDIR /etc/pgbouncer
ENTRYPOINT ["/bin/sh", "/entrypoint.sh"]

19
pooler/entrypoint.sh Executable file
View File

@ -0,0 +1,19 @@
#!/bin/sh
#
# Entrypoint of the pgbouncer connection pooler container.
#
# 1. Provides the TLS material at the fixed paths referenced by the pgbouncer
#    config: either a freshly generated self-signed certificate, or symlinks
#    to the files named by CONNECTION_POOLER_CLIENT_TLS_CRT,
#    CONNECTION_POOLER_CLIENT_TLS_KEY and (optionally)
#    CONNECTION_POOLER_CLIENT_CA_FILE.
# 2. Renders pgbouncer.ini from its template by substituting environment
#    variables.
# 3. Replaces this shell with pgbouncer.

# -e: abort on the first failing command, -x: trace commands
set -ex

if [ -z "${CONNECTION_POOLER_CLIENT_TLS_CRT}" ]; then
    # no certificate provided: fall back to a self-signed one
    openssl req -nodes -new -x509 -subj /CN=spilo.dummy.org \
        -keyout /etc/ssl/certs/pgbouncer.key \
        -out /etc/ssl/certs/pgbouncer.crt
else
    # All paths are quoted so that values containing whitespace are not split
    # into several arguments.
    ln -s "${CONNECTION_POOLER_CLIENT_TLS_CRT}" /etc/ssl/certs/pgbouncer.crt
    # A certificate without a key is unusable: stop with a clear message
    # instead of letting ln create a stray link in the working directory.
    ln -s "${CONNECTION_POOLER_CLIENT_TLS_KEY:?must be set together with CONNECTION_POOLER_CLIENT_TLS_CRT}" \
        /etc/ssl/certs/pgbouncer.key
    if [ -n "${CONNECTION_POOLER_CLIENT_CA_FILE}" ]; then
        ln -s "${CONNECTION_POOLER_CLIENT_CA_FILE}" /etc/ssl/certs/ca.crt
    fi
fi

envsubst < /etc/pgbouncer/pgbouncer.ini.tmpl > /etc/pgbouncer/pgbouncer.ini

# exec so that pgbouncer takes over the process of this script
exec /bin/pgbouncer /etc/pgbouncer/pgbouncer.ini

70
pooler/pgbouncer.ini.tmpl Normal file
View File

@ -0,0 +1,70 @@
# vim: set ft=dosini:
[databases]
* = host=$PGHOST port=$PGPORT auth_user=$PGUSER
postgres = host=$PGHOST port=$PGPORT auth_user=$PGUSER
[pgbouncer]
pool_mode = $CONNECTION_POOLER_MODE
listen_port = $CONNECTION_POOLER_PORT
listen_addr = *
admin_users = $PGUSER
stats_users = $INFRASTRUCTURE_ROLES
auth_dbname = postgres
auth_file = /etc/pgbouncer/userlist.txt
auth_query = SELECT * FROM $PGSCHEMA.user_lookup($1)
auth_type = md5
logfile = /var/log/pgbouncer/pgbouncer.log
pidfile = /var/run/pgbouncer/pgbouncer.pid
server_tls_sslmode = require
server_tls_ca_file = /etc/ssl/certs/pgbouncer.crt
server_tls_protocols = secure
client_tls_sslmode = require
client_tls_key_file = /etc/ssl/certs/pgbouncer.key
client_tls_cert_file = /etc/ssl/certs/pgbouncer.crt
log_connections = 0
log_disconnections = 0
# Number of prepared statements to cache on a server connection (zero value
# disables support of prepared statements).
max_prepared_statements = 200
# How many server connections to allow per user/database pair.
default_pool_size = $CONNECTION_POOLER_DEFAULT_SIZE
# Add more server connections to pool if below this number. Improves behavior
# when usual load comes suddenly back after period of total inactivity.
#
# NOTE: This value is per pool, i.e. a pair of (db, user), not a global one.
# Which means on the higher level it has to be calculated from the max allowed
# database connections and number of databases and users. If not taken into
# account, then for too many users or databases PgBouncer will go crazy
# opening/evicting connections. For now disable it.
#
# min_pool_size = $CONNECTION_POOLER_MIN_SIZE
# How many additional connections to allow to a pool
reserve_pool_size = $CONNECTION_POOLER_RESERVE_SIZE
# Maximum number of client connections allowed.
max_client_conn = $CONNECTION_POOLER_MAX_CLIENT_CONN
# Do not allow more than this many connections per database (regardless of
# pool, i.e. user)
max_db_connections = $CONNECTION_POOLER_MAX_DB_CONN
# If a client has been in "idle in transaction" state longer, it will be
# disconnected. [seconds]
idle_transaction_timeout = 600
# If login to the server failed, because of a failure from connect() or from
# authentication, the pooler waits this long before retrying to connect.
# Default is 15. [seconds]
server_login_retry = 5
# To ignore extra parameter in startup packet. By default only 'database' and
# 'user' are allowed, all others raise error. This is needed to tolerate
# overenthusiastic JDBC wanting to unconditionally set 'extra_float_digits=2'
# in startup packet.
ignore_startup_parameters = extra_float_digits,options

View File

@ -77,7 +77,7 @@ spec:
"17",
"16",
"15",
"14",
"14"
]
}
# Example of settings to make snapshot view working in the ui when using AWS

View File

@ -321,11 +321,18 @@ def read_basebackups(
suffix = '' if uid == 'base' else '/' + uid
backups = []
# Reuse a single S3 client configured with AWS_ENDPOINT so MinIO /
# other S3-compatible backends are hit for list+get calls too. The
# previous plain client('s3') fell back to the default AWS endpoint
# and returned empty data against a custom endpoint; read_stored_clusters
# and read_versions already pass endpoint_url=AWS_ENDPOINT (#3078).
s3_client = client('s3', endpoint_url=AWS_ENDPOINT)
for vp in postgresql_versions:
backup_prefix = f'{prefix}{pg_cluster}{suffix}/wal/{vp}/basebackups_005/'
logger.info(f"{bucket}/{backup_prefix}")
paginator = client('s3').get_paginator('list_objects_v2')
paginator = s3_client.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=bucket, Prefix=backup_prefix)
for page in pages:
@ -334,7 +341,7 @@ def read_basebackups(
if not key.endswith("backup_stop_sentinel.json"):
continue
response = client('s3').get_object(Bucket=bucket, Key=key)
response = s3_client.get_object(Bucket=bucket, Key=key)
backup_info = loads(response["Body"].read().decode("utf-8"))
last_modified = response["LastModified"].astimezone(timezone.utc).isoformat()

View File

@ -11,4 +11,4 @@ kubernetes==11.0.0
python-json-logger==2.0.7
requests==2.32.4
stups-tokens>=1.1.19
werkzeug==3.1.5
werkzeug==3.1.6