Merge branch 'master' into master

This commit is contained in:
Felix Kunde 2025-07-15 11:26:20 +02:00 committed by GitHub
commit 9d95785a2b
69 changed files with 1191 additions and 363 deletions

View File

@ -66,8 +66,6 @@ podLabels:
extraEnvs: extraEnvs:
[] []
# Example of settings to make the snapshot view work in the UI when using AWS # Example of settings to make the snapshot view work in the UI when using AWS
# - name: WALE_S3_ENDPOINT
# value: https+path://s3.us-east-1.amazonaws.com:443
# - name: SPILO_S3_BACKUP_PREFIX # - name: SPILO_S3_BACKUP_PREFIX
# value: spilo/ # value: spilo/
# - name: AWS_ACCESS_KEY_ID # - name: AWS_ACCESS_KEY_ID
@ -87,8 +85,6 @@ extraEnvs:
# key: AWS_DEFAULT_REGION # key: AWS_DEFAULT_REGION
# - name: SPILO_S3_BACKUP_BUCKET # - name: SPILO_S3_BACKUP_BUCKET
# value: <s3 bucket used by the operator> # value: <s3 bucket used by the operator>
# - name: "USE_AWS_INSTANCE_PROFILE"
# value: "true"
# configure UI service # configure UI service
service: service:

View File

@ -364,7 +364,7 @@ configLogicalBackup:
# logical_backup_memory_request: "" # logical_backup_memory_request: ""
# image for pods of the logical backup job (example runs pg_dumpall) # image for pods of the logical backup job (example runs pg_dumpall)
logical_backup_docker_image: "ghcr.io/zalando/postgres-operator/logical-backup:v1.13.0" logical_backup_docker_image: "ghcr.io/zalando/postgres-operator/logical-backup:v1.14.0"
# path of google cloud service account json file # path of google cloud service account json file
# logical_backup_google_application_credentials: "" # logical_backup_google_application_credentials: ""

View File

@ -384,7 +384,7 @@ exceptions:
The interval of days can be set with `password_rotation_interval` (default The interval of days can be set with `password_rotation_interval` (default
`90` = 90 days, minimum 1). On each rotation the user name and password values `90` = 90 days, minimum 1). On each rotation the user name and password values
are replaced in the K8s secret. They belong to a newly created user named after are replaced in the K8s secret. They belong to a newly created user named after
the original role plus rotation date in YYMMDD format. All priviliges are the original role plus rotation date in YYMMDD format. All privileges are
inherited meaning that migration scripts should still grant and revoke rights inherited meaning that migration scripts should still grant and revoke rights
against the original role. The timestamp of the next rotation (in RFC 3339 against the original role. The timestamp of the next rotation (in RFC 3339
format, UTC timezone) is written to the secret as well. Note, if the rotation format, UTC timezone) is written to the secret as well. Note, if the rotation
@ -564,7 +564,7 @@ manifest affinity.
``` ```
If `node_readiness_label_merge` is set to `"OR"` (default) the readiness label If `node_readiness_label_merge` is set to `"OR"` (default) the readiness label
affinty will be appended with its own expressions block: affinity will be appended with its own expressions block:
```yaml ```yaml
affinity: affinity:
@ -620,22 +620,34 @@ By default the topology key for the pod anti affinity is set to
`kubernetes.io/hostname`, you can set another topology key e.g. `kubernetes.io/hostname`, you can set another topology key e.g.
`failure-domain.beta.kubernetes.io/zone`. See [built-in node labels](https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#interlude-built-in-node-labels) for available topology keys. `failure-domain.beta.kubernetes.io/zone`. See [built-in node labels](https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#interlude-built-in-node-labels) for available topology keys.
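As a sketch, the ConfigMap-based operator configuration could set a zone-level topology key like this (assuming the `enable_pod_antiaffinity` and `pod_antiaffinity_topology_key` options of your operator version; the zone key is the example from above):
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: postgres-operator
data:
  # enable pod anti affinity and spread pods across zones instead of hosts
  enable_pod_antiaffinity: "true"
  pod_antiaffinity_topology_key: "failure-domain.beta.kubernetes.io/zone"
```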
## Pod Disruption Budget ## Pod Disruption Budgets
By default the operator uses a PodDisruptionBudget (PDB) to protect the cluster By default the operator creates two PodDisruptionBudgets (PDBs) to protect the cluster
from voluntarily disruptions and hence unwanted DB downtime. The `MinAvailable` from voluntary disruptions and hence unwanted DB downtime: the so-called primary PDB and
parameter of the PDB is set to `1` which prevents killing masters in single-node a PDB for critical operations.
clusters and/or the last remaining running instance in a multi-node cluster.
### Primary PDB
The `MinAvailable` parameter of this PDB is set to `1` and, if `pdb_master_label_selector`
is enabled, the label selector includes the `spilo-role=master` condition, which prevents killing
masters in single-node clusters and/or the last remaining running instance in a multi-node
cluster.
### PDB for critical operations
The `MinAvailable` parameter of this PDB is equal to the `numberOfInstances` set in the
cluster manifest, while the label selector includes the `critical-operation=true` condition. This
makes it possible to protect all pods of a cluster, given they are labeled accordingly.
For example, the operator labels all Spilo pods with `critical-operation=true` during a major
version upgrade run. You may want to protect cluster pods during other critical operations
by assigning the label to pods yourself or using other means of automation.
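For illustration, the PDB for critical operations of a three-instance cluster named `acid-minimal-cluster` might look roughly like this (a sketch; the exact name, labels and selector are determined by the operator and may differ):
```yaml
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: postgres-acid-minimal-cluster-critical-op-pdb
spec:
  # equals numberOfInstances from the cluster manifest
  minAvailable: 3
  selector:
    matchLabels:
      cluster-name: acid-minimal-cluster
      critical-operation: "true"
```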
The PDB is only relaxed in two scenarios: The PDB is only relaxed in two scenarios:
* If a cluster is scaled down to `0` instances (e.g. for draining nodes) * If a cluster is scaled down to `0` instances (e.g. for draining nodes)
* If the PDB is disabled in the configuration (`enable_pod_disruption_budget`) * If the PDB is disabled in the configuration (`enable_pod_disruption_budget`)
The PDB is still in place having `MinAvailable` set to `0`. If enabled it will The PDBs are still in place having `MinAvailable` set to `0`. Disabling PDBs
be automatically set to `1` on scale up. Disabling PDBs helps avoiding blocking helps avoid blocking Kubernetes upgrades in managed K8s environments at the
Kubernetes upgrades in managed K8s environments at the cost of prolonged DB cost of prolonged DB downtime. See PR [#384](https://github.com/zalando/postgres-operator/pull/384)
downtime. See PR [#384](https://github.com/zalando/postgres-operator/pull/384)
for the use case. for the use case.
## Add cluster-specific labels ## Add cluster-specific labels
@ -1128,7 +1140,7 @@ metadata:
iam.gke.io/gcp-service-account: <GCP_SERVICE_ACCOUNT_NAME>@<GCP_PROJECT_ID>.iam.gserviceaccount.com iam.gke.io/gcp-service-account: <GCP_SERVICE_ACCOUNT_NAME>@<GCP_PROJECT_ID>.iam.gserviceaccount.com
``` ```
2. Specify the new custom service account in your [operator paramaters](./reference/operator_parameters.md) 2. Specify the new custom service account in your [operator parameters](./reference/operator_parameters.md)
If using manual deployment or kustomize, this is done by setting If using manual deployment or kustomize, this is done by setting
`pod_service_account_name` in your configuration file specified in the `pod_service_account_name` in your configuration file specified in the

View File

@ -230,7 +230,7 @@ kubectl delete postgresql acid-minimal-cluster
``` ```
This should remove the associated StatefulSet, database Pods, Services and This should remove the associated StatefulSet, database Pods, Services and
Endpoints. The PersistentVolumes are released and the PodDisruptionBudget is Endpoints. The PersistentVolumes are released and the PodDisruptionBudgets are
deleted. Secrets however are not deleted and backups will remain in place. deleted. Secrets however are not deleted and backups will remain in place.
When deleting a cluster while it is still starting up or got stuck during that When deleting a cluster while it is still starting up or got stuck during that

View File

@ -116,9 +116,9 @@ These parameters are grouped directly under the `spec` key in the manifest.
* **maintenanceWindows** * **maintenanceWindows**
a list which defines specific time frames when certain maintenance operations a list which defines specific time frames when certain maintenance operations
are allowed. So far, it is only implemented for automatic major version such as automatic major upgrades or master pod migration. Accepted formats
upgrades. Accepted formats are "01:00-06:00" for daily maintenance windows or are "01:00-06:00" for daily maintenance windows or "Sat:00:00-04:00" for specific
"Sat:00:00-04:00" for specific days, with all times in UTC. days, with all times in UTC.
* **users** * **users**
a map of usernames to user flags for the users that should be created in the a map of usernames to user flags for the users that should be created in the
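As a sketch, the two window formats described under `maintenanceWindows` above can be combined in a cluster manifest like this (cluster name and windows are examples):
```yaml
apiVersion: "acid.zalan.do/v1"
kind: postgresql
metadata:
  name: acid-minimal-cluster
spec:
  maintenanceWindows:
  - "01:00-06:00"        # every day between 01:00 and 06:00 UTC
  - "Sat:00:00-04:00"    # Saturdays only, 00:00 to 04:00 UTC
```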
@ -247,7 +247,7 @@ These parameters are grouped directly under the `spec` key in the manifest.
[kubernetes volumeSource](https://godoc.org/k8s.io/api/core/v1#VolumeSource). [kubernetes volumeSource](https://godoc.org/k8s.io/api/core/v1#VolumeSource).
It allows you to mount existing PersistentVolumeClaims, ConfigMaps and Secrets inside the StatefulSet. It allows you to mount existing PersistentVolumeClaims, ConfigMaps and Secrets inside the StatefulSet.
Also an `emptyDir` volume can be shared between initContainer and statefulSet. Also an `emptyDir` volume can be shared between initContainer and statefulSet.
Additionaly, you can provide a `SubPath` for volume mount (a file in a configMap source volume, for example). Additionally, you can provide a `SubPath` for volume mount (a file in a configMap source volume, for example).
Set `isSubPathExpr` to true if you want to include [API environment variables](https://kubernetes.io/docs/concepts/storage/volumes/#using-subpath-expanded-environment). Set `isSubPathExpr` to true if you want to include [API environment variables](https://kubernetes.io/docs/concepts/storage/volumes/#using-subpath-expanded-environment).
You can also specify in which container the additional Volumes will be mounted with the `targetContainers` array option. You can also specify in which container the additional Volumes will be mounted with the `targetContainers` array option.
If `targetContainers` is empty, additional volumes will be mounted only in the `postgres` container. If `targetContainers` is empty, additional volumes will be mounted only in the `postgres` container.
@ -257,7 +257,7 @@ These parameters are grouped directly under the `spec` key in the manifest.
## Prepared Databases ## Prepared Databases
The operator can create databases with default owner, reader and writer roles The operator can create databases with default owner, reader and writer roles
without the need to specifiy them under `users` or `databases` sections. Those without the need to specify them under `users` or `databases` sections. Those
parameters are grouped under the `preparedDatabases` top-level key. For more parameters are grouped under the `preparedDatabases` top-level key. For more
information, see [user docs](../user.md#prepared-databases-with-roles-and-default-privileges). information, see [user docs](../user.md#prepared-databases-with-roles-and-default-privileges).
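A minimal sketch of such a section in the cluster manifest (database and schema names are illustrative):
```yaml
spec:
  preparedDatabases:
    foo:
      schemas:
        bar: {}
```
This would create a database `foo` with a schema `bar` plus the default owner, reader and writer roles.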

View File

@ -209,7 +209,7 @@ under the `users` key.
For all `LOGIN` roles that are not database owners the operator can rotate For all `LOGIN` roles that are not database owners the operator can rotate
credentials in the corresponding K8s secrets by replacing the username and credentials in the corresponding K8s secrets by replacing the username and
password. This means, new users will be added on each rotation inheriting password. This means, new users will be added on each rotation inheriting
all priviliges from the original roles. The rotation date (in YYMMDD format) all privileges from the original roles. The rotation date (in YYMMDD format)
is appended to the names of the new user. The timestamp of the next rotation is appended to the names of the new user. The timestamp of the next rotation
is written to the secret. The default is `false`. is written to the secret. The default is `false`.
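To illustrate, after a rotation the decoded contents of such a secret could look roughly like this (the secret name, the `nextRotation` key and all values are assumptions for the sketch):
```yaml
apiVersion: v1
kind: Secret
metadata:
  name: foo-user.acid-minimal-cluster.credentials.postgresql.acid.zalan.do
stringData:
  username: foo_user250715             # original role name plus rotation date in YYMMDD format
  password: aV3ryR4nd0mP4ssw0rd        # newly generated on each rotation
  nextRotation: "2025-10-13T08:00:00Z" # RFC 3339, UTC
```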
@ -334,13 +334,13 @@ configuration they are grouped under the `kubernetes` key.
pod namespace). pod namespace).
* **pdb_name_format** * **pdb_name_format**
defines the template for PDB (Pod Disruption Budget) names created by the defines the template for the primary PDB (Pod Disruption Budget) name created by the
operator. The default is `postgres-{cluster}-pdb`, where `{cluster}` is operator. The default is `postgres-{cluster}-pdb`, where `{cluster}` is
replaced by the cluster name. Only the `{cluster}` placeholder is allowed in replaced by the cluster name. Only the `{cluster}` placeholder is allowed in
the template. the template.
* **pdb_master_label_selector** * **pdb_master_label_selector**
By default the PDB will match the master role hence preventing nodes to be By default the primary PDB will match the master role, hence preventing nodes from being
drained if the node_readiness_label is not used. If this option is set to drained if the node_readiness_label is not used. If this option is set to
`false` the `spilo-role=master` selector will not be added to the PDB. `false` the `spilo-role=master` selector will not be added to the PDB.
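In ConfigMap form the two options could be set like this (the values shown are the documented defaults):
```yaml
data:
  # template for the primary PDB name; only {cluster} may be used
  pdb_name_format: "postgres-{cluster}-pdb"
  # set to "false" to drop the spilo-role=master selector from the primary PDB
  pdb_master_label_selector: "true"
```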
@ -552,7 +552,7 @@ configuration they are grouped under the `kubernetes` key.
pods with `InitialDelaySeconds: 6`, `PeriodSeconds: 10`, `TimeoutSeconds: 5`, pods with `InitialDelaySeconds: 6`, `PeriodSeconds: 10`, `TimeoutSeconds: 5`,
`SuccessThreshold: 1` and `FailureThreshold: 3`. When enabling readiness `SuccessThreshold: 1` and `FailureThreshold: 3`. When enabling readiness
probes it is recommended to switch the `pod_management_policy` to `parallel` probes it is recommended to switch the `pod_management_policy` to `parallel`
to avoid unneccesary waiting times in case of multiple instances failing. to avoid unnecessary waiting times in case of multiple instances failing.
The default is `false`. The default is `false`.
* **storage_resize_mode** * **storage_resize_mode**
@ -701,7 +701,7 @@ In the CRD-based configuration they are grouped under the `load_balancer` key.
replaced by the cluster name, `{namespace}` is replaced with the namespace replaced by the cluster name, `{namespace}` is replaced with the namespace
and `{hostedzone}` is replaced with the hosted zone (the value of the and `{hostedzone}` is replaced with the hosted zone (the value of the
`db_hosted_zone` parameter). The `{team}` placeholder can still be used, `db_hosted_zone` parameter). The `{team}` placeholder can still be used,
although it is not recommened because the team of a cluster can change. although it is not recommended because the team of a cluster can change.
If the cluster name starts with the `teamId` it will also be part of the If the cluster name starts with the `teamId` it will also be part of the
DNS, anyway. No other placeholders are allowed! DNS, anyway. No other placeholders are allowed!
@ -720,7 +720,7 @@ In the CRD-based configuration they are grouped under the `load_balancer` key.
is replaced by the cluster name, `{namespace}` is replaced with the is replaced by the cluster name, `{namespace}` is replaced with the
namespace and `{hostedzone}` is replaced with the hosted zone (the value of namespace and `{hostedzone}` is replaced with the hosted zone (the value of
the `db_hosted_zone` parameter). The `{team}` placeholder can still be used, the `db_hosted_zone` parameter). The `{team}` placeholder can still be used,
although it is not recommened because the team of a cluster can change. although it is not recommended because the team of a cluster can change.
If the cluster name starts with the `teamId` it will also be part of the If the cluster name starts with the `teamId` it will also be part of the
DNS, anyway. No other placeholders are allowed! DNS, anyway. No other placeholders are allowed!
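As a sketch, a ConfigMap-based configuration could combine the allowed placeholders like this (the templates shown are illustrative, not necessarily the defaults):
```yaml
data:
  db_hosted_zone: "db.example.com"
  master_dns_name_format: "{cluster}.{namespace}.{hostedzone}"
  replica_dns_name_format: "{cluster}-repl.{namespace}.{hostedzone}"
```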

View File

@ -900,7 +900,7 @@ the PostgreSQL version between source and target cluster has to be the same.
To start a cluster as standby, add the following `standby` section in the YAML To start a cluster as standby, add the following `standby` section in the YAML
file. You can stream changes from archived WAL files (AWS S3 or Google Cloud file. You can stream changes from archived WAL files (AWS S3 or Google Cloud
Storage) or from a remote primary. Only one option can be specfied in the Storage) or from a remote primary. Only one option can be specified in the
manifest: manifest:
```yaml ```yaml
@ -911,7 +911,7 @@ spec:
For GCS, you have to define STANDBY_GOOGLE_APPLICATION_CREDENTIALS as a For GCS, you have to define STANDBY_GOOGLE_APPLICATION_CREDENTIALS as a
[custom pod environment variable](administrator.md#custom-pod-environment-variables). [custom pod environment variable](administrator.md#custom-pod-environment-variables).
It is not set from the config to allow for overridding. It is not set from the config to allow for overriding.
```yaml ```yaml
spec: spec:
@ -1282,7 +1282,7 @@ minutes if the certificates have changed and reloads postgres accordingly.
### TLS certificates for connection pooler ### TLS certificates for connection pooler
By default, the pgBouncer image generates its own TLS certificate like Spilo. By default, the pgBouncer image generates its own TLS certificate like Spilo.
When the `tls` section is specfied in the manifest it will be used for the When the `tls` section is specified in the manifest it will be used for the
connection pooler pod(s) as well. The security context options are hard coded connection pooler pod(s) as well. The security context options are hard coded
to `runAsUser: 100` and `runAsGroup: 101`. The `fsGroup` will be the same to `runAsUser: 100` and `runAsGroup: 101`. The `fsGroup` will be the same
as for Spilo. as for Spilo.

View File

@ -1187,7 +1187,7 @@ class EndToEndTestCase(unittest.TestCase):
Test major version upgrade: with full upgrade, maintenance window, and annotation Test major version upgrade: with full upgrade, maintenance window, and annotation
""" """
def check_version(): def check_version():
p = k8s.patroni_rest("acid-upgrade-test-0", "") p = k8s.patroni_rest("acid-upgrade-test-0", "") or {}
version = p.get("server_version", 0) // 10000 version = p.get("server_version", 0) // 10000
return version return version
@ -1237,7 +1237,7 @@ class EndToEndTestCase(unittest.TestCase):
# should not upgrade because current time is not in maintenanceWindow # should not upgrade because current time is not in maintenanceWindow
current_time = datetime.now() current_time = datetime.now()
maintenance_window_future = f"{(current_time+timedelta(minutes=60)).strftime('%H:%M')}-{(current_time+timedelta(minutes=120)).strftime('%H:%M')}" maintenance_window_future = f"{(current_time+timedelta(minutes=60)).strftime('%H:%M')}-{(current_time+timedelta(minutes=120)).strftime('%H:%M')}"
pg_patch_version_15 = { pg_patch_version_15_outside_mw = {
"spec": { "spec": {
"postgresql": { "postgresql": {
"version": "15" "version": "15"
@ -1248,10 +1248,10 @@ class EndToEndTestCase(unittest.TestCase):
} }
} }
k8s.api.custom_objects_api.patch_namespaced_custom_object( k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15) "acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15_outside_mw)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
k8s.wait_for_pod_failover(master_nodes, 'spilo-role=master,' + cluster_label) # no pod replacement outside of the maintenance window
k8s.wait_for_pod_start('spilo-role=master,' + cluster_label) k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
self.eventuallyEqual(check_version, 14, "Version should not be upgraded") self.eventuallyEqual(check_version, 14, "Version should not be upgraded")
@ -1259,12 +1259,12 @@ class EndToEndTestCase(unittest.TestCase):
second_annotations = get_annotations() second_annotations = get_annotations()
self.assertIsNone(second_annotations.get("last-major-upgrade-failure"), "Annotation for last upgrade's failure should not be set") self.assertIsNone(second_annotations.get("last-major-upgrade-failure"), "Annotation for last upgrade's failure should not be set")
# change the version again to trigger operator sync # change maintenanceWindows to current
maintenance_window_current = f"{(current_time-timedelta(minutes=30)).strftime('%H:%M')}-{(current_time+timedelta(minutes=30)).strftime('%H:%M')}" maintenance_window_current = f"{(current_time-timedelta(minutes=30)).strftime('%H:%M')}-{(current_time+timedelta(minutes=30)).strftime('%H:%M')}"
pg_patch_version_16 = { pg_patch_version_15_in_mw = {
"spec": { "spec": {
"postgresql": { "postgresql": {
"version": "16" "version": "15"
}, },
"maintenanceWindows": [ "maintenanceWindows": [
maintenance_window_current maintenance_window_current
@ -1273,13 +1273,13 @@ class EndToEndTestCase(unittest.TestCase):
} }
k8s.api.custom_objects_api.patch_namespaced_custom_object( k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_16) "acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15_in_mw)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
k8s.wait_for_pod_failover(master_nodes, 'spilo-role=replica,' + cluster_label) k8s.wait_for_pod_failover(master_nodes, 'spilo-role=master,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=master,' + cluster_label) k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
self.eventuallyEqual(check_version, 16, "Version should be upgraded from 14 to 16") self.eventuallyEqual(check_version, 15, "Version should be upgraded from 14 to 15")
# check if annotation for last upgrade's success is updated after second upgrade # check if annotation for last upgrade's success is updated after second upgrade
third_annotations = get_annotations() third_annotations = get_annotations()
@ -1303,20 +1303,20 @@ class EndToEndTestCase(unittest.TestCase):
"acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_17) "acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_17)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
k8s.wait_for_pod_failover(master_nodes, 'spilo-role=master,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
self.eventuallyEqual(check_version, 16, "Version should not be upgraded because annotation for last upgrade's failure is set")
# change the version back to 15 and should remove failure annotation
k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
k8s.wait_for_pod_failover(master_nodes, 'spilo-role=replica,' + cluster_label) k8s.wait_for_pod_failover(master_nodes, 'spilo-role=replica,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=master,' + cluster_label) k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
self.eventuallyEqual(check_version, 15, "Version should not be upgraded because annotation for last upgrade's failure is set")
# change the version back to 15 and should remove failure annotation
k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15_in_mw)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
self.eventuallyEqual(check_version, 15, "Version should not be upgraded from 15")
fourth_annotations = get_annotations() fourth_annotations = get_annotations()
self.assertIsNone(fourth_annotations.get("last-major-upgrade-failure"), "Annotation for last upgrade's failure is not removed") self.assertIsNone(fourth_annotations.get("last-major-upgrade-failure"), "Annotation for last upgrade's failure is not removed")
@ -1752,9 +1752,13 @@ class EndToEndTestCase(unittest.TestCase):
Test password rotation and removal of users due to retention policy Test password rotation and removal of users due to retention policy
''' '''
k8s = self.k8s k8s = self.k8s
cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
leader = k8s.get_cluster_leader_pod() leader = k8s.get_cluster_leader_pod()
today = date.today() today = date.today()
# remember number of secrets to make sure it stays the same
secret_count = k8s.count_secrets_with_label(cluster_label)
# enable password rotation for owner of foo database # enable password rotation for owner of foo database
pg_patch_rotation_single_users = { pg_patch_rotation_single_users = {
"spec": { "spec": {
@ -1810,6 +1814,7 @@ class EndToEndTestCase(unittest.TestCase):
enable_password_rotation = { enable_password_rotation = {
"data": { "data": {
"enable_password_rotation": "true", "enable_password_rotation": "true",
"inherited_annotations": "environment",
"password_rotation_interval": "30", "password_rotation_interval": "30",
"password_rotation_user_retention": "30", # should be set to 60 "password_rotation_user_retention": "30", # should be set to 60
}, },
@ -1856,13 +1861,29 @@ class EndToEndTestCase(unittest.TestCase):
self.eventuallyEqual(lambda: len(self.query_database_with_user(leader.metadata.name, "postgres", "SELECT 1", "foo_user")), 1, self.eventuallyEqual(lambda: len(self.query_database_with_user(leader.metadata.name, "postgres", "SELECT 1", "foo_user")), 1,
"Could not connect to the database with rotation user {}".format(rotation_user), 10, 5) "Could not connect to the database with rotation user {}".format(rotation_user), 10, 5)
# add annotation which triggers syncSecrets call
pg_annotation_patch = {
"metadata": {
"annotations": {
"environment": "test",
}
}
}
k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_annotation_patch)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
time.sleep(10)
self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), secret_count, "Unexpected number of secrets")
# check if rotation has been ignored for user from test_cross_namespace_secrets test # check if rotation has been ignored for user from test_cross_namespace_secrets test
db_user_secret = k8s.get_secret(username="test.db_user", namespace="test") db_user_secret = k8s.get_secret(username="test.db_user", namespace="test")
secret_username = str(base64.b64decode(db_user_secret.data["username"]), 'utf-8') secret_username = str(base64.b64decode(db_user_secret.data["username"]), 'utf-8')
self.assertEqual("test.db_user", secret_username, self.assertEqual("test.db_user", secret_username,
"Unexpected username in secret of test.db_user: expected {}, got {}".format("test.db_user", secret_username)) "Unexpected username in secret of test.db_user: expected {}, got {}".format("test.db_user", secret_username))
# check if annotation for secret has been updated
self.assertTrue("environment" in db_user_secret.metadata.annotations, "Added annotation was not propagated to secret")
# disable password rotation for all other users (foo_user) # disable password rotation for all other users (foo_user)
# and pick smaller intervals to see if the third fake rotation user is dropped # and pick smaller intervals to see if the third fake rotation user is dropped
enable_password_rotation = { enable_password_rotation = {
@ -2100,7 +2121,7 @@ class EndToEndTestCase(unittest.TestCase):
patch_sset_propagate_annotations = { patch_sset_propagate_annotations = {
"data": { "data": {
"downscaler_annotations": "deployment-time,downscaler/*", "downscaler_annotations": "deployment-time,downscaler/*",
"inherited_annotations": "owned-by", "inherited_annotations": "environment,owned-by",
} }
} }
k8s.update_config(patch_sset_propagate_annotations) k8s.update_config(patch_sset_propagate_annotations)
@ -2547,7 +2568,10 @@ class EndToEndTestCase(unittest.TestCase):
self.assertTrue(self.has_postgresql_owner_reference(config_ep.metadata.owner_references, inverse), "config endpoint owner reference check failed") self.assertTrue(self.has_postgresql_owner_reference(config_ep.metadata.owner_references, inverse), "config endpoint owner reference check failed")
pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-pdb".format(cluster_name), cluster_namespace) pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-pdb".format(cluster_name), cluster_namespace)
self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod disruption owner reference check failed") self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "primary pod disruption budget owner reference check failed")
pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-critical-op-pdb".format(cluster_name), cluster_namespace)
self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod disruption budget for critical operations owner reference check failed")
pg_secret = k8s.api.core_v1.read_namespaced_secret("postgres.{}.credentials.postgresql.acid.zalan.do".format(cluster_name), cluster_namespace) pg_secret = k8s.api.core_v1.read_namespaced_secret("postgres.{}.credentials.postgresql.acid.zalan.do".format(cluster_name), cluster_namespace)
self.assertTrue(self.has_postgresql_owner_reference(pg_secret.metadata.owner_references, inverse), "postgres secret owner reference check failed") self.assertTrue(self.has_postgresql_owner_reference(pg_secret.metadata.owner_references, inverse), "postgres secret owner reference check failed")

View File

@ -2,7 +2,7 @@
// +build !ignore_autogenerated // +build !ignore_autogenerated
/* /*
Copyright 2024 Compose, Zalando SE Copyright 2025 Compose, Zalando SE
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal of this software and associated documentation files (the "Software"), to deal

View File

@ -59,16 +59,17 @@ type Config struct {
} }
type kubeResources struct { type kubeResources struct {
Services map[PostgresRole]*v1.Service Services map[PostgresRole]*v1.Service
Endpoints map[PostgresRole]*v1.Endpoints Endpoints map[PostgresRole]*v1.Endpoints
PatroniEndpoints map[string]*v1.Endpoints PatroniEndpoints map[string]*v1.Endpoints
PatroniConfigMaps map[string]*v1.ConfigMap PatroniConfigMaps map[string]*v1.ConfigMap
Secrets map[types.UID]*v1.Secret Secrets map[types.UID]*v1.Secret
Statefulset *appsv1.StatefulSet Statefulset *appsv1.StatefulSet
VolumeClaims map[types.UID]*v1.PersistentVolumeClaim VolumeClaims map[types.UID]*v1.PersistentVolumeClaim
PodDisruptionBudget *policyv1.PodDisruptionBudget PrimaryPodDisruptionBudget *policyv1.PodDisruptionBudget
LogicalBackupJob *batchv1.CronJob CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget
Streams map[string]*zalandov1.FabricEventStream LogicalBackupJob *batchv1.CronJob
Streams map[string]*zalandov1.FabricEventStream
//Pods are treated separately //Pods are treated separately
} }
@ -105,10 +106,17 @@ type Cluster struct {
} }
type compareStatefulsetResult struct { type compareStatefulsetResult struct {
match bool match bool
replace bool replace bool
rollingUpdate bool rollingUpdate bool
reasons []string reasons []string
deletedPodAnnotations []string
}
type compareLogicalBackupJobResult struct {
match bool
reasons []string
deletedPodAnnotations []string
} }
// New creates a new cluster. This function should be called from a controller. // New creates a new cluster. This function should be called from a controller.
@ -336,14 +344,10 @@ func (c *Cluster) Create() (err error) {
c.logger.Infof("secrets have been successfully created") c.logger.Infof("secrets have been successfully created")
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Secrets", "The secrets have been successfully created") c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Secrets", "The secrets have been successfully created")
if c.PodDisruptionBudget != nil { if err = c.createPodDisruptionBudgets(); err != nil {
return fmt.Errorf("pod disruption budget already exists in the cluster") return fmt.Errorf("could not create pod disruption budgets: %v", err)
} }
pdb, err := c.createPodDisruptionBudget() c.logger.Info("pod disruption budgets have been successfully created")
if err != nil {
return fmt.Errorf("could not create pod disruption budget: %v", err)
}
c.logger.Infof("pod disruption budget %q has been successfully created", util.NameFromMeta(pdb.ObjectMeta))
if c.Statefulset != nil { if c.Statefulset != nil {
return fmt.Errorf("statefulset already exists in the cluster") return fmt.Errorf("statefulset already exists in the cluster")
@ -431,6 +435,7 @@ func (c *Cluster) Create() (err error) {
} }
func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compareStatefulsetResult { func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compareStatefulsetResult {
deletedPodAnnotations := []string{}
reasons := make([]string, 0) reasons := make([]string, 0)
var match, needsRollUpdate, needsReplace bool var match, needsRollUpdate, needsReplace bool
@ -445,7 +450,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
needsReplace = true needsReplace = true
reasons = append(reasons, "new statefulset's ownerReferences do not match") reasons = append(reasons, "new statefulset's ownerReferences do not match")
} }
if changed, reason := c.compareAnnotations(c.Statefulset.Annotations, statefulSet.Annotations); changed { if changed, reason := c.compareAnnotations(c.Statefulset.Annotations, statefulSet.Annotations, nil); changed {
match = false match = false
needsReplace = true needsReplace = true
reasons = append(reasons, "new statefulset's annotations do not match: "+reason) reasons = append(reasons, "new statefulset's annotations do not match: "+reason)
@ -519,7 +524,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
} }
} }
if changed, reason := c.compareAnnotations(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations); changed { if changed, reason := c.compareAnnotations(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations, &deletedPodAnnotations); changed {
match = false match = false
needsReplace = true needsReplace = true
reasons = append(reasons, "new statefulset's pod template metadata annotations does not match "+reason) reasons = append(reasons, "new statefulset's pod template metadata annotations does not match "+reason)
@ -541,7 +546,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
reasons = append(reasons, fmt.Sprintf("new statefulset's name for volume %d does not match the current one", i)) reasons = append(reasons, fmt.Sprintf("new statefulset's name for volume %d does not match the current one", i))
continue continue
} }
if changed, reason := c.compareAnnotations(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations); changed { if changed, reason := c.compareAnnotations(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations, nil); changed {
needsReplace = true needsReplace = true
reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q do not match the current ones: %s", name, reason)) reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q do not match the current ones: %s", name, reason))
} }
@ -579,7 +584,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
match = false match = false
} }
return &compareStatefulsetResult{match: match, reasons: reasons, rollingUpdate: needsRollUpdate, replace: needsReplace} return &compareStatefulsetResult{match: match, reasons: reasons, rollingUpdate: needsRollUpdate, replace: needsReplace, deletedPodAnnotations: deletedPodAnnotations}
} }
type containerCondition func(a, b v1.Container) bool type containerCondition func(a, b v1.Container) bool
@ -781,7 +786,7 @@ func volumeMountExists(mount v1.VolumeMount, mounts []v1.VolumeMount) bool {
return false return false
} }
func (c *Cluster) compareAnnotations(old, new map[string]string) (bool, string) { func (c *Cluster) compareAnnotations(old, new map[string]string, removedList *[]string) (bool, string) {
reason := "" reason := ""
ignoredAnnotations := make(map[string]bool) ignoredAnnotations := make(map[string]bool)
for _, ignore := range c.OpConfig.IgnoredAnnotations { for _, ignore := range c.OpConfig.IgnoredAnnotations {
@ -794,6 +799,9 @@ func (c *Cluster) compareAnnotations(old, new map[string]string) (bool, string)
} }
if _, ok := new[key]; !ok { if _, ok := new[key]; !ok {
reason += fmt.Sprintf(" Removed %q.", key) reason += fmt.Sprintf(" Removed %q.", key)
if removedList != nil {
*removedList = append(*removedList, key)
}
} }
} }
@ -836,41 +844,46 @@ func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) {
return true, "" return true, ""
} }
func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool, reason string) { func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) *compareLogicalBackupJobResult {
deletedPodAnnotations := []string{}
reasons := make([]string, 0)
match := true
if cur.Spec.Schedule != new.Spec.Schedule { if cur.Spec.Schedule != new.Spec.Schedule {
return false, fmt.Sprintf("new job's schedule %q does not match the current one %q", match = false
new.Spec.Schedule, cur.Spec.Schedule) reasons = append(reasons, fmt.Sprintf("new job's schedule %q does not match the current one %q", new.Spec.Schedule, cur.Spec.Schedule))
} }
newImage := new.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image newImage := new.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image
curImage := cur.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image curImage := cur.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image
if newImage != curImage { if newImage != curImage {
return false, fmt.Sprintf("new job's image %q does not match the current one %q", match = false
newImage, curImage) reasons = append(reasons, fmt.Sprintf("new job's image %q does not match the current one %q", newImage, curImage))
} }
newPodAnnotation := new.Spec.JobTemplate.Spec.Template.Annotations newPodAnnotation := new.Spec.JobTemplate.Spec.Template.Annotations
curPodAnnotation := cur.Spec.JobTemplate.Spec.Template.Annotations curPodAnnotation := cur.Spec.JobTemplate.Spec.Template.Annotations
if changed, reason := c.compareAnnotations(curPodAnnotation, newPodAnnotation); changed { if changed, reason := c.compareAnnotations(curPodAnnotation, newPodAnnotation, &deletedPodAnnotations); changed {
return false, fmt.Sprintf("new job's pod template metadata annotations does not match " + reason) match = false
reasons = append(reasons, fmt.Sprint("new job's pod template metadata annotations do not match "+reason))
} }
newPgVersion := getPgVersion(new) newPgVersion := getPgVersion(new)
curPgVersion := getPgVersion(cur) curPgVersion := getPgVersion(cur)
if newPgVersion != curPgVersion { if newPgVersion != curPgVersion {
return false, fmt.Sprintf("new job's env PG_VERSION %q does not match the current one %q", match = false
newPgVersion, curPgVersion) reasons = append(reasons, fmt.Sprintf("new job's env PG_VERSION %q does not match the current one %q", newPgVersion, curPgVersion))
} }
needsReplace := false needsReplace := false
reasons := make([]string, 0) contReasons := make([]string, 0)
needsReplace, reasons = c.compareContainers("cronjob container", cur.Spec.JobTemplate.Spec.Template.Spec.Containers, new.Spec.JobTemplate.Spec.Template.Spec.Containers, needsReplace, reasons) needsReplace, contReasons = c.compareContainers("cronjob container", cur.Spec.JobTemplate.Spec.Template.Spec.Containers, new.Spec.JobTemplate.Spec.Template.Spec.Containers, needsReplace, contReasons)
if needsReplace { if needsReplace {
return false, fmt.Sprintf("logical backup container specs do not match: %v", strings.Join(reasons, `', '`)) match = false
reasons = append(reasons, fmt.Sprintf("logical backup container specs do not match: %v", strings.Join(contReasons, `', '`)))
} }
return true, "" return &compareLogicalBackupJobResult{match: match, reasons: reasons, deletedPodAnnotations: deletedPodAnnotations}
} }
func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBudget) (bool, string) { func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBudget) (bool, string) {
@ -881,7 +894,7 @@ func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBud
if !reflect.DeepEqual(new.ObjectMeta.OwnerReferences, cur.ObjectMeta.OwnerReferences) { if !reflect.DeepEqual(new.ObjectMeta.OwnerReferences, cur.ObjectMeta.OwnerReferences) {
return false, "new PDB's owner references do not match the current ones" return false, "new PDB's owner references do not match the current ones"
} }
if changed, reason := c.compareAnnotations(cur.Annotations, new.Annotations); changed { if changed, reason := c.compareAnnotations(cur.Annotations, new.Annotations, nil); changed {
return false, "new PDB's annotations do not match the current ones:" + reason return false, "new PDB's annotations do not match the current ones:" + reason
} }
return true, "" return true, ""
@ -957,6 +970,11 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
defer c.mu.Unlock() defer c.mu.Unlock()
c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdating) c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdating)
if !isInMaintenanceWindow(newSpec.Spec.MaintenanceWindows) {
// do not apply any major version related changes yet
newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
}
c.setSpec(newSpec) c.setSpec(newSpec)
defer func() { defer func() {
@ -1016,10 +1034,18 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
// only when streams were not specified in oldSpec but in newSpec // only when streams were not specified in oldSpec but in newSpec
needStreamUser := len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0 needStreamUser := len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0
annotationsChanged, _ := c.compareAnnotations(oldSpec.Annotations, newSpec.Annotations)
initUsers := !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser initUsers := !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser
if initUsers {
// if inherited annotations differ secrets have to be synced on update
newAnnotations := c.annotationsSet(nil)
oldAnnotations := make(map[string]string)
for _, secret := range c.Secrets {
oldAnnotations = secret.ObjectMeta.Annotations
break
}
annotationsChanged, _ := c.compareAnnotations(oldAnnotations, newAnnotations, nil)
if initUsers || annotationsChanged {
c.logger.Debug("initialize users") c.logger.Debug("initialize users")
if err := c.initUsers(); err != nil { if err := c.initUsers(); err != nil {
c.logger.Errorf("could not init users - skipping sync of secrets and databases: %v", err) c.logger.Errorf("could not init users - skipping sync of secrets and databases: %v", err)
@ -1027,8 +1053,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
updateFailed = true updateFailed = true
return return
} }
}
if initUsers || annotationsChanged {
c.logger.Debug("syncing secrets") c.logger.Debug("syncing secrets")
//TODO: mind the secrets of the deleted/new users //TODO: mind the secrets of the deleted/new users
if err := c.syncSecrets(); err != nil { if err := c.syncSecrets(); err != nil {
@ -1060,9 +1085,9 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
} }
} }
// pod disruption budget // pod disruption budgets
if err := c.syncPodDisruptionBudget(true); err != nil { if err := c.syncPodDisruptionBudgets(true); err != nil {
c.logger.Errorf("could not sync pod disruption budget: %v", err) c.logger.Errorf("could not sync pod disruption budgets: %v", err)
updateFailed = true updateFailed = true
} }
@ -1135,6 +1160,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
// streams // streams
if len(newSpec.Spec.Streams) > 0 || len(oldSpec.Spec.Streams) != len(newSpec.Spec.Streams) { if len(newSpec.Spec.Streams) > 0 || len(oldSpec.Spec.Streams) != len(newSpec.Spec.Streams) {
c.logger.Debug("syncing streams")
if err := c.syncStreams(); err != nil { if err := c.syncStreams(); err != nil {
c.logger.Errorf("could not sync streams: %v", err) c.logger.Errorf("could not sync streams: %v", err)
updateFailed = true updateFailed = true
@ -1207,10 +1233,10 @@ func (c *Cluster) Delete() error {
c.logger.Info("not deleting secrets because disabled in configuration") c.logger.Info("not deleting secrets because disabled in configuration")
} }
if err := c.deletePodDisruptionBudget(); err != nil { if err := c.deletePodDisruptionBudgets(); err != nil {
anyErrors = true anyErrors = true
c.logger.Warningf("could not delete pod disruption budget: %v", err) c.logger.Warningf("could not delete pod disruption budgets: %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete pod disruption budget: %v", err) c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete pod disruption budgets: %v", err)
} }
for _, role := range []PostgresRole{Master, Replica} { for _, role := range []PostgresRole{Master, Replica} {
@ -1709,16 +1735,17 @@ func (c *Cluster) GetCurrentProcess() Process {
// GetStatus provides status of the cluster // GetStatus provides status of the cluster
func (c *Cluster) GetStatus() *ClusterStatus { func (c *Cluster) GetStatus() *ClusterStatus {
status := &ClusterStatus{ status := &ClusterStatus{
Cluster: c.Name, Cluster: c.Name,
Namespace: c.Namespace, Namespace: c.Namespace,
Team: c.Spec.TeamID, Team: c.Spec.TeamID,
Status: c.Status, Status: c.Status,
Spec: c.Spec, Spec: c.Spec,
MasterService: c.GetServiceMaster(), MasterService: c.GetServiceMaster(),
ReplicaService: c.GetServiceReplica(), ReplicaService: c.GetServiceReplica(),
StatefulSet: c.GetStatefulSet(), StatefulSet: c.GetStatefulSet(),
PodDisruptionBudget: c.GetPodDisruptionBudget(), PrimaryPodDisruptionBudget: c.GetPrimaryPodDisruptionBudget(),
CurrentProcess: c.GetCurrentProcess(), CriticalOpPodDisruptionBudget: c.GetCriticalOpPodDisruptionBudget(),
CurrentProcess: c.GetCurrentProcess(),
Error: fmt.Errorf("error: %s", c.Error), Error: fmt.Errorf("error: %s", c.Error),
} }
@ -1731,18 +1758,58 @@ func (c *Cluster) GetStatus() *ClusterStatus {
return status return status
} }
// Switchover does a switchover (via Patroni) to a candidate pod func (c *Cluster) GetSwitchoverSchedule() string {
func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) error { var possibleSwitchover, schedule time.Time
now := time.Now().UTC()
for _, window := range c.Spec.MaintenanceWindows {
// in the best case it is possible today
possibleSwitchover = time.Date(now.Year(), now.Month(), now.Day(), window.StartTime.Hour(), window.StartTime.Minute(), 0, 0, time.UTC)
if window.Everyday {
if now.After(possibleSwitchover) {
// we are already past the time for today, try tomorrow
possibleSwitchover = possibleSwitchover.AddDate(0, 0, 1)
}
} else {
if now.Weekday() != window.Weekday {
// get closest possible time for this window
possibleSwitchover = possibleSwitchover.AddDate(0, 0, int((7+window.Weekday-now.Weekday())%7))
} else if now.After(possibleSwitchover) {
// we are already past the time for today, try next week
possibleSwitchover = possibleSwitchover.AddDate(0, 0, 7)
}
}
if (schedule == time.Time{}) || possibleSwitchover.Before(schedule) {
schedule = possibleSwitchover
}
}
return schedule.Format("2006-01-02T15:04+00")
}
// Switchover does a switchover (via Patroni) to a candidate pod
func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName, scheduled bool) error {
var err error var err error
c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate)
stopCh := make(chan struct{}) stopCh := make(chan struct{})
ch := c.registerPodSubscriber(candidate) ch := c.registerPodSubscriber(candidate)
defer c.unregisterPodSubscriber(candidate) defer c.unregisterPodSubscriber(candidate)
defer close(stopCh) defer close(stopCh)
if err = c.patroni.Switchover(curMaster, candidate.Name); err == nil { var scheduled_at string
if scheduled {
scheduled_at = c.GetSwitchoverSchedule()
} else {
c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate)
scheduled_at = ""
}
if err = c.patroni.Switchover(curMaster, candidate.Name, scheduled_at); err == nil {
if scheduled {
c.logger.Infof("switchover from %q to %q is scheduled at %s", curMaster.Name, candidate, scheduled_at)
return nil
}
c.logger.Debugf("successfully switched over from %q to %q", curMaster.Name, candidate) c.logger.Debugf("successfully switched over from %q to %q", curMaster.Name, candidate)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Successfully switched over from %q to %q", curMaster.Name, candidate) c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Successfully switched over from %q to %q", curMaster.Name, candidate)
_, err = c.waitForPodLabel(ch, stopCh, nil) _, err = c.waitForPodLabel(ch, stopCh, nil)
@ -1750,6 +1817,9 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
err = fmt.Errorf("could not get master pod label: %v", err) err = fmt.Errorf("could not get master pod label: %v", err)
} }
} else { } else {
if scheduled {
return fmt.Errorf("could not schedule switchover: %v", err)
}
err = fmt.Errorf("could not switch over from %q to %q: %v", curMaster.Name, candidate, err) err = fmt.Errorf("could not switch over from %q to %q: %v", curMaster.Name, candidate, err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switchover from %q to %q FAILED: %v", curMaster.Name, candidate, err) c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switchover from %q to %q FAILED: %v", curMaster.Name, candidate, err)
} }

View File

@ -1680,12 +1680,20 @@ func TestCompareLogicalBackupJob(t *testing.T) {
} }
} }
match, reason := cluster.compareLogicalBackupJob(currentCronJob, desiredCronJob) cmp := cluster.compareLogicalBackupJob(currentCronJob, desiredCronJob)
if match != tt.match { if cmp.match != tt.match {
t.Errorf("%s - unexpected match result %t when comparing cronjobs %#v and %#v", t.Name(), match, currentCronJob, desiredCronJob) t.Errorf("%s - unexpected match result %t when comparing cronjobs %#v and %#v", t.Name(), cmp.match, currentCronJob, desiredCronJob)
} else { } else if !cmp.match {
if !strings.HasPrefix(reason, tt.reason) { found := false
t.Errorf("%s - expected reason prefix %s, found %s", t.Name(), tt.reason, reason) for _, reason := range cmp.reasons {
if strings.HasPrefix(reason, tt.reason) {
found = true
break
}
found = false
}
if !found {
t.Errorf("%s - expected reason prefix %s, not found in %#v", t.Name(), tt.reason, cmp.reasons)
} }
} }
}) })
@ -2057,3 +2065,91 @@ func TestCompareVolumeMounts(t *testing.T) {
}) })
} }
} }
func TestGetSwitchoverSchedule(t *testing.T) {
now := time.Now()
futureTimeStart := now.Add(1 * time.Hour)
futureWindowTimeStart := futureTimeStart.Format("15:04")
futureWindowTimeEnd := now.Add(2 * time.Hour).Format("15:04")
pastTimeStart := now.Add(-2 * time.Hour)
pastWindowTimeStart := pastTimeStart.Format("15:04")
pastWindowTimeEnd := now.Add(-1 * time.Hour).Format("15:04")
tests := []struct {
name string
windows []acidv1.MaintenanceWindow
expected string
}{
{
name: "everyday maintenance windows is later today",
windows: []acidv1.MaintenanceWindow{
{
Everyday: true,
StartTime: mustParseTime(futureWindowTimeStart),
EndTime: mustParseTime(futureWindowTimeEnd),
},
},
expected: futureTimeStart.Format("2006-01-02T15:04+00"),
},
{
name: "everyday maintenance window is tomorrow",
windows: []acidv1.MaintenanceWindow{
{
Everyday: true,
StartTime: mustParseTime(pastWindowTimeStart),
EndTime: mustParseTime(pastWindowTimeEnd),
},
},
expected: pastTimeStart.AddDate(0, 0, 1).Format("2006-01-02T15:04+00"),
},
{
name: "weekday maintenance windows is later today",
windows: []acidv1.MaintenanceWindow{
{
Weekday: now.Weekday(),
StartTime: mustParseTime(futureWindowTimeStart),
EndTime: mustParseTime(futureWindowTimeEnd),
},
},
expected: futureTimeStart.Format("2006-01-02T15:04+00"),
},
{
name: "weekday maintenance windows is passed for today",
windows: []acidv1.MaintenanceWindow{
{
Weekday: now.Weekday(),
StartTime: mustParseTime(pastWindowTimeStart),
EndTime: mustParseTime(pastWindowTimeEnd),
},
},
expected: pastTimeStart.AddDate(0, 0, 7).Format("2006-01-02T15:04+00"),
},
{
name: "choose the earliest window",
windows: []acidv1.MaintenanceWindow{
{
Weekday: now.AddDate(0, 0, 2).Weekday(),
StartTime: mustParseTime(futureWindowTimeStart),
EndTime: mustParseTime(futureWindowTimeEnd),
},
{
Everyday: true,
StartTime: mustParseTime(pastWindowTimeStart),
EndTime: mustParseTime(pastWindowTimeEnd),
},
},
expected: pastTimeStart.AddDate(0, 0, 1).Format("2006-01-02T15:04+00"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cluster.Spec.MaintenanceWindows = tt.windows
schedule := cluster.GetSwitchoverSchedule()
if schedule != tt.expected {
t.Errorf("Expected GetSwitchoverSchedule to return %s, returned: %s", tt.expected, schedule)
}
})
}
}

View File

@ -2,6 +2,7 @@ package cluster
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"reflect" "reflect"
"strings" "strings"
@ -977,6 +978,7 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
err error err error
) )
updatedPodAnnotations := map[string]*string{}
syncReason := make([]string, 0) syncReason := make([]string, 0)
deployment, err = c.KubeClient. deployment, err = c.KubeClient.
Deployments(c.Namespace). Deployments(c.Namespace).
@ -1038,9 +1040,27 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
} }
newPodAnnotations := c.annotationsSet(c.generatePodAnnotations(&c.Spec)) newPodAnnotations := c.annotationsSet(c.generatePodAnnotations(&c.Spec))
if changed, reason := c.compareAnnotations(deployment.Spec.Template.Annotations, newPodAnnotations); changed { deletedPodAnnotations := []string{}
if changed, reason := c.compareAnnotations(deployment.Spec.Template.Annotations, newPodAnnotations, &deletedPodAnnotations); changed {
specSync = true specSync = true
syncReason = append(syncReason, []string{"new connection pooler's pod template annotations do not match the current ones: " + reason}...) syncReason = append(syncReason, []string{"new connection pooler's pod template annotations do not match the current ones: " + reason}...)
for _, anno := range deletedPodAnnotations {
updatedPodAnnotations[anno] = nil
}
templateMetadataReq := map[string]map[string]map[string]map[string]map[string]*string{
"spec": {"template": {"metadata": {"annotations": updatedPodAnnotations}}}}
patch, err := json.Marshal(templateMetadataReq)
if err != nil {
return nil, fmt.Errorf("could not marshal ObjectMeta for %s connection pooler's pod template: %v", role, err)
}
deployment, err = c.KubeClient.Deployments(c.Namespace).Patch(context.TODO(),
deployment.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "")
if err != nil {
c.logger.Errorf("failed to patch %s connection pooler's pod template: %v", role, err)
return nil, err
}
deployment.Spec.Template.Annotations = newPodAnnotations deployment.Spec.Template.Annotations = newPodAnnotations
} }
@ -1064,7 +1084,7 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
} }
newAnnotations := c.AnnotationsToPropagate(c.annotationsSet(nil)) // including the downscaling annotations newAnnotations := c.AnnotationsToPropagate(c.annotationsSet(nil)) // including the downscaling annotations
if changed, _ := c.compareAnnotations(deployment.Annotations, newAnnotations); changed { if changed, _ := c.compareAnnotations(deployment.Annotations, newAnnotations, nil); changed {
deployment, err = patchConnectionPoolerAnnotations(c.KubeClient, deployment, newAnnotations) deployment, err = patchConnectionPoolerAnnotations(c.KubeClient, deployment, newAnnotations)
if err != nil { if err != nil {
return nil, err return nil, err
@ -1098,14 +1118,20 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
if err != nil { if err != nil {
return nil, fmt.Errorf("could not delete pooler pod: %v", err) return nil, fmt.Errorf("could not delete pooler pod: %v", err)
} }
} else if changed, _ := c.compareAnnotations(pod.Annotations, deployment.Spec.Template.Annotations); changed { } else if changed, _ := c.compareAnnotations(pod.Annotations, deployment.Spec.Template.Annotations, nil); changed {
patchData, err := metaAnnotationsPatch(deployment.Spec.Template.Annotations) metadataReq := map[string]map[string]map[string]*string{"metadata": {}}
if err != nil {
return nil, fmt.Errorf("could not form patch for pooler's pod annotations: %v", err) for anno, val := range deployment.Spec.Template.Annotations {
updatedPodAnnotations[anno] = &val
} }
_, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) metadataReq["metadata"]["annotations"] = updatedPodAnnotations
patch, err := json.Marshal(metadataReq)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not patch annotations for pooler's pod %q: %v", pod.Name, err) return nil, fmt.Errorf("could not marshal ObjectMeta for %s connection pooler's pods: %v", role, err)
}
_, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
if err != nil {
return nil, fmt.Errorf("could not patch annotations for %s connection pooler's pod %q: %v", role, pod.Name, err)
} }
} }
} }
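Two related changes run through this hunk: compareAnnotations now takes a third argument (a *[]string that, when non-nil, collects the keys of annotations removed from the desired state), and those keys are stripped from the live object by a strategic merge patch that sets them to null. A standalone sketch of the patch payload built above; "obsolete-annotation" is a made-up key, "no-scale-down" is the pod annotation used in the sync test further down:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// keys reported as deleted map to nil pointers; nil marshals to JSON null,
	// which a strategic merge patch interprets as "remove this annotation"
	updated := map[string]*string{}
	for _, anno := range []string{"obsolete-annotation"} {
		updated[anno] = nil
	}
	keep := "true"
	updated["no-scale-down"] = &keep

	patch, _ := json.Marshal(map[string]map[string]map[string]map[string]map[string]*string{
		"spec": {"template": {"metadata": {"annotations": updated}}},
	})
	// {"spec":{"template":{"metadata":{"annotations":{"no-scale-down":"true","obsolete-annotation":null}}}}}
	fmt.Println(string(patch))
}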

View File

@ -109,10 +109,15 @@ func (c *Cluster) servicePort(role PostgresRole) int32 {
return pgPort return pgPort
} }
func (c *Cluster) podDisruptionBudgetName() string { func (c *Cluster) PrimaryPodDisruptionBudgetName() string {
return c.OpConfig.PDBNameFormat.Format("cluster", c.Name) return c.OpConfig.PDBNameFormat.Format("cluster", c.Name)
} }
func (c *Cluster) criticalOpPodDisruptionBudgetName() string {
pdbTemplate := config.StringTemplate("postgres-{cluster}-critical-op-pdb")
return pdbTemplate.Format("cluster", c.Name)
}
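With the rename there are now two budgets per cluster: the primary one keeps the configurable pdb_name_format (default "postgres-{cluster}-pdb"), while the critical-operations one uses the fixed "postgres-{cluster}-critical-op-pdb" template. For a cluster named myapp-database this produces the names asserted in the tests below; a trivial sketch with plain string substitution standing in for config.StringTemplate.Format:

package main

import (
	"fmt"
	"strings"
)

func main() {
	cluster := "myapp-database"
	// stand-in for config.StringTemplate.Format("cluster", cluster)
	format := func(tpl string) string { return strings.ReplaceAll(tpl, "{cluster}", cluster) }
	fmt.Println(format("postgres-{cluster}-pdb"))             // postgres-myapp-database-pdb
	fmt.Println(format("postgres-{cluster}-critical-op-pdb")) // postgres-myapp-database-critical-op-pdb
}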
func makeDefaultResources(config *config.Config) acidv1.Resources { func makeDefaultResources(config *config.Config) acidv1.Resources {
defaultRequests := acidv1.ResourceDescription{ defaultRequests := acidv1.ResourceDescription{
@ -1005,6 +1010,9 @@ func (c *Cluster) generateSpiloPodEnvVars(
if c.patroniUsesKubernetes() { if c.patroniUsesKubernetes() {
envVars = append(envVars, v1.EnvVar{Name: "DCS_ENABLE_KUBERNETES_API", Value: "true"}) envVars = append(envVars, v1.EnvVar{Name: "DCS_ENABLE_KUBERNETES_API", Value: "true"})
if c.OpConfig.EnablePodDisruptionBudget != nil && *c.OpConfig.EnablePodDisruptionBudget {
envVars = append(envVars, v1.EnvVar{Name: "KUBERNETES_BOOTSTRAP_LABELS", Value: "{\"critical-operation\":\"true\"}"})
}
} else { } else {
envVars = append(envVars, v1.EnvVar{Name: "ETCD_HOST", Value: c.OpConfig.EtcdHost}) envVars = append(envVars, v1.EnvVar{Name: "ETCD_HOST", Value: c.OpConfig.EtcdHost})
} }
@ -2207,7 +2215,7 @@ func (c *Cluster) generateStandbyEnvironment(description *acidv1.StandbyDescript
return result return result
} }
func (c *Cluster) generatePodDisruptionBudget() *policyv1.PodDisruptionBudget { func (c *Cluster) generatePrimaryPodDisruptionBudget() *policyv1.PodDisruptionBudget {
minAvailable := intstr.FromInt(1) minAvailable := intstr.FromInt(1)
pdbEnabled := c.OpConfig.EnablePodDisruptionBudget pdbEnabled := c.OpConfig.EnablePodDisruptionBudget
pdbMasterLabelSelector := c.OpConfig.PDBMasterLabelSelector pdbMasterLabelSelector := c.OpConfig.PDBMasterLabelSelector
@ -2225,7 +2233,36 @@ func (c *Cluster) generatePodDisruptionBudget() *policyv1.PodDisruptionBudget {
return &policyv1.PodDisruptionBudget{ return &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: c.podDisruptionBudgetName(), Name: c.PrimaryPodDisruptionBudgetName(),
Namespace: c.Namespace,
Labels: c.labelsSet(true),
Annotations: c.annotationsSet(nil),
OwnerReferences: c.ownerReferences(),
},
Spec: policyv1.PodDisruptionBudgetSpec{
MinAvailable: &minAvailable,
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
},
}
}
func (c *Cluster) generateCriticalOpPodDisruptionBudget() *policyv1.PodDisruptionBudget {
minAvailable := intstr.FromInt32(c.Spec.NumberOfInstances)
pdbEnabled := c.OpConfig.EnablePodDisruptionBudget
// if PodDisruptionBudget is disabled or if there are no DB pods, set the budget to 0.
if (pdbEnabled != nil && !(*pdbEnabled)) || c.Spec.NumberOfInstances <= 0 {
minAvailable = intstr.FromInt(0)
}
labels := c.labelsSet(false)
labels["critical-operation"] = "true"
return &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: c.criticalOpPodDisruptionBudgetName(),
Namespace: c.Namespace, Namespace: c.Namespace,
Labels: c.labelsSet(true), Labels: c.labelsSet(true),
Annotations: c.annotationsSet(nil), Annotations: c.annotationsSet(nil),

View File

@ -2349,22 +2349,34 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
} }
} }
testLabelsAndSelectors := func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error { testLabelsAndSelectors := func(isPrimary bool) func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error {
masterLabelSelectorDisabled := cluster.OpConfig.PDBMasterLabelSelector != nil && !*cluster.OpConfig.PDBMasterLabelSelector return func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error {
if podDisruptionBudget.ObjectMeta.Namespace != "myapp" { masterLabelSelectorDisabled := cluster.OpConfig.PDBMasterLabelSelector != nil && !*cluster.OpConfig.PDBMasterLabelSelector
return fmt.Errorf("Object Namespace incorrect.") if podDisruptionBudget.ObjectMeta.Namespace != "myapp" {
} return fmt.Errorf("Object Namespace incorrect.")
if !reflect.DeepEqual(podDisruptionBudget.Labels, map[string]string{"team": "myapp", "cluster-name": "myapp-database"}) { }
return fmt.Errorf("Labels incorrect.") expectedLabels := map[string]string{"team": "myapp", "cluster-name": "myapp-database"}
} if !reflect.DeepEqual(podDisruptionBudget.Labels, expectedLabels) {
if !masterLabelSelectorDisabled && return fmt.Errorf("Labels incorrect, got %#v, expected %#v", podDisruptionBudget.Labels, expectedLabels)
!reflect.DeepEqual(podDisruptionBudget.Spec.Selector, &metav1.LabelSelector{ }
MatchLabels: map[string]string{"spilo-role": "master", "cluster-name": "myapp-database"}}) { if !masterLabelSelectorDisabled {
if isPrimary {
expectedLabels := &metav1.LabelSelector{
MatchLabels: map[string]string{"spilo-role": "master", "cluster-name": "myapp-database"}}
if !reflect.DeepEqual(podDisruptionBudget.Spec.Selector, expectedLabels) {
return fmt.Errorf("MatchLabels incorrect, got %#v, expected %#v", podDisruptionBudget.Spec.Selector, expectedLabels)
}
} else {
expectedLabels := &metav1.LabelSelector{
MatchLabels: map[string]string{"cluster-name": "myapp-database", "critical-operation": "true"}}
if !reflect.DeepEqual(podDisruptionBudget.Spec.Selector, expectedLabels) {
return fmt.Errorf("MatchLabels incorrect, got %#v, expected %#v", podDisruptionBudget.Spec.Selector, expectedLabels)
}
}
}
return fmt.Errorf("MatchLabels incorrect.") return nil
} }
return nil
} }
testPodDisruptionBudgetOwnerReference := func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error { testPodDisruptionBudgetOwnerReference := func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error {
@ -2400,7 +2412,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
testPodDisruptionBudgetOwnerReference, testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-pdb"), hasName("postgres-myapp-database-pdb"),
hasMinAvailable(1), hasMinAvailable(1),
testLabelsAndSelectors, testLabelsAndSelectors(true),
}, },
}, },
{ {
@ -2417,7 +2429,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
testPodDisruptionBudgetOwnerReference, testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-pdb"), hasName("postgres-myapp-database-pdb"),
hasMinAvailable(0), hasMinAvailable(0),
testLabelsAndSelectors, testLabelsAndSelectors(true),
}, },
}, },
{ {
@ -2434,7 +2446,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
testPodDisruptionBudgetOwnerReference, testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-pdb"), hasName("postgres-myapp-database-pdb"),
hasMinAvailable(0), hasMinAvailable(0),
testLabelsAndSelectors, testLabelsAndSelectors(true),
}, },
}, },
{ {
@ -2451,7 +2463,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
testPodDisruptionBudgetOwnerReference, testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-databass-budget"), hasName("postgres-myapp-database-databass-budget"),
hasMinAvailable(1), hasMinAvailable(1),
testLabelsAndSelectors, testLabelsAndSelectors(true),
}, },
}, },
{ {
@ -2468,7 +2480,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
testPodDisruptionBudgetOwnerReference, testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-pdb"), hasName("postgres-myapp-database-pdb"),
hasMinAvailable(1), hasMinAvailable(1),
testLabelsAndSelectors, testLabelsAndSelectors(true),
}, },
}, },
{ {
@ -2485,13 +2497,99 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
testPodDisruptionBudgetOwnerReference, testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-pdb"), hasName("postgres-myapp-database-pdb"),
hasMinAvailable(1), hasMinAvailable(1),
testLabelsAndSelectors, testLabelsAndSelectors(true),
}, },
}, },
} }
for _, tt := range tests { for _, tt := range tests {
result := tt.spec.generatePodDisruptionBudget() result := tt.spec.generatePrimaryPodDisruptionBudget()
for _, check := range tt.check {
err := check(tt.spec, result)
if err != nil {
t.Errorf("%s [%s]: PodDisruptionBudget spec is incorrect, %+v",
testName, tt.scenario, err)
}
}
}
testCriticalOp := []struct {
scenario string
spec *Cluster
check []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error
}{
{
scenario: "With multiple instances",
spec: New(
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb"}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
logger,
eventRecorder),
check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-critical-op-pdb"),
hasMinAvailable(3),
testLabelsAndSelectors(false),
},
},
{
scenario: "With zero instances",
spec: New(
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb"}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 0}},
logger,
eventRecorder),
check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-critical-op-pdb"),
hasMinAvailable(0),
testLabelsAndSelectors(false),
},
},
{
scenario: "With PodDisruptionBudget disabled",
spec: New(
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.False()}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
logger,
eventRecorder),
check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-critical-op-pdb"),
hasMinAvailable(0),
testLabelsAndSelectors(false),
},
},
{
scenario: "With OwnerReference enabled",
spec: New(
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role", EnableOwnerReferences: util.True()}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.True()}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
logger,
eventRecorder),
check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-critical-op-pdb"),
hasMinAvailable(3),
testLabelsAndSelectors(false),
},
},
}
for _, tt := range testCriticalOp {
result := tt.spec.generateCriticalOpPodDisruptionBudget()
for _, check := range tt.check { for _, check := range tt.check {
err := check(tt.spec, result) err := check(tt.spec, result)
if err != nil { if err != nil {

View File

@ -106,6 +106,22 @@ func (c *Cluster) removeFailuresAnnotation() error {
return nil return nil
} }
func (c *Cluster) criticalOperationLabel(pods []v1.Pod, value *string) error {
metadataReq := map[string]map[string]map[string]*string{"metadata": {"labels": {"critical-operation": value}}}
patchReq, err := json.Marshal(metadataReq)
if err != nil {
return fmt.Errorf("could not marshal ObjectMeta: %v", err)
}
for _, pod := range pods {
_, err = c.KubeClient.Pods(c.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patchReq, metav1.PatchOptions{})
if err != nil {
return err
}
}
return nil
}
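criticalOperationLabel is used symmetrically: before the upgrade the label is set by passing a pointer to "true", afterwards it is cleared by passing nil, which marshals to null and removes the key via the strategic merge patch. A standalone sketch of the two patch bodies:

package main

import (
	"encoding/json"
	"fmt"
)

func criticalOpPatch(value *string) []byte {
	// nil marshals to JSON null, which strategic merge patch treats as deletion
	req := map[string]map[string]map[string]*string{
		"metadata": {"labels": {"critical-operation": value}},
	}
	patch, _ := json.Marshal(req)
	return patch
}

func main() {
	on := "true"
	fmt.Println(string(criticalOpPatch(&on))) // {"metadata":{"labels":{"critical-operation":"true"}}}
	fmt.Println(string(criticalOpPatch(nil))) // {"metadata":{"labels":{"critical-operation":null}}}
}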
/* /*
Execute upgrade when mode is set to manual or full or when the owning team is allowed for upgrade (and mode is "off"). Execute upgrade when mode is set to manual or full or when the owning team is allowed for upgrade (and mode is "off").
@ -129,17 +145,13 @@ func (c *Cluster) majorVersionUpgrade() error {
return nil return nil
} }
if !isInMainternanceWindow(c.Spec.MaintenanceWindows) {
c.logger.Infof("skipping major version upgrade, not in maintenance window")
return nil
}
pods, err := c.listPods() pods, err := c.listPods()
if err != nil { if err != nil {
return err return err
} }
allRunning := true allRunning := true
isStandbyCluster := false
var masterPod *v1.Pod var masterPod *v1.Pod
@ -147,8 +159,9 @@ func (c *Cluster) majorVersionUpgrade() error {
ps, _ := c.patroni.GetMemberData(&pod) ps, _ := c.patroni.GetMemberData(&pod)
if ps.Role == "standby_leader" { if ps.Role == "standby_leader" {
c.logger.Errorf("skipping major version upgrade for %s/%s standby cluster. Re-deploy standby cluster with the required Postgres version specified", c.Namespace, c.Name) isStandbyCluster = true
return nil c.currentMajorVersion = ps.ServerVersion
break
} }
if ps.State != "running" { if ps.State != "running" {
@ -175,6 +188,9 @@ func (c *Cluster) majorVersionUpgrade() error {
} }
c.logger.Infof("recheck cluster version is already up to date. current: %d, min desired: %d", c.currentMajorVersion, desiredVersion) c.logger.Infof("recheck cluster version is already up to date. current: %d, min desired: %d", c.currentMajorVersion, desiredVersion)
return nil return nil
} else if isStandbyCluster {
c.logger.Warnf("skipping major version upgrade for %s/%s standby cluster. Re-deploy standby cluster with the required Postgres version specified", c.Namespace, c.Name)
return nil
} }
if _, exists := c.ObjectMeta.Annotations[majorVersionUpgradeFailureAnnotation]; exists { if _, exists := c.ObjectMeta.Annotations[majorVersionUpgradeFailureAnnotation]; exists {
@ -182,6 +198,11 @@ func (c *Cluster) majorVersionUpgrade() error {
return nil return nil
} }
if !isInMaintenanceWindow(c.Spec.MaintenanceWindows) {
c.logger.Infof("skipping major version upgrade, not in maintenance window")
return nil
}
members, err := c.patroni.GetClusterMembers(masterPod) members, err := c.patroni.GetClusterMembers(masterPod)
if err != nil { if err != nil {
c.logger.Error("could not get cluster members data from Patroni API, skipping major version upgrade") c.logger.Error("could not get cluster members data from Patroni API, skipping major version upgrade")
@ -219,6 +240,17 @@ func (c *Cluster) majorVersionUpgrade() error {
if allRunning && masterPod != nil { if allRunning && masterPod != nil {
c.logger.Infof("healthy cluster ready to upgrade, current: %d desired: %d", c.currentMajorVersion, desiredVersion) c.logger.Infof("healthy cluster ready to upgrade, current: %d desired: %d", c.currentMajorVersion, desiredVersion)
if c.currentMajorVersion < desiredVersion { if c.currentMajorVersion < desiredVersion {
defer func() error {
if err = c.criticalOperationLabel(pods, nil); err != nil {
return fmt.Errorf("failed to remove critical-operation label: %s", err)
}
return nil
}()
val := "true"
if err = c.criticalOperationLabel(pods, &val); err != nil {
return fmt.Errorf("failed to assign critical-operation label: %s", err)
}
podName := &spec.NamespacedName{Namespace: masterPod.Namespace, Name: masterPod.Name} podName := &spec.NamespacedName{Namespace: masterPod.Namespace, Name: masterPod.Name}
c.logger.Infof("triggering major version upgrade on pod %s of %d pods", masterPod.Name, numberOfPods) c.logger.Infof("triggering major version upgrade on pod %s of %d pods", masterPod.Name, numberOfPods)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Major Version Upgrade", "starting major version upgrade on pod %s of %d pods", masterPod.Name, numberOfPods) c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Major Version Upgrade", "starting major version upgrade on pod %s of %d pods", masterPod.Name, numberOfPods)

View File

@ -280,11 +280,16 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error {
return fmt.Errorf("could not move pod: %v", err) return fmt.Errorf("could not move pod: %v", err)
} }
scheduleSwitchover := false
if !isInMaintenanceWindow(c.Spec.MaintenanceWindows) {
c.logger.Infof("postponing switchover, not in maintenance window")
scheduleSwitchover = true
}
err = retryutil.Retry(1*time.Minute, 5*time.Minute, err = retryutil.Retry(1*time.Minute, 5*time.Minute,
func() (bool, error) { func() (bool, error) {
err := c.Switchover(oldMaster, masterCandidateName) err := c.Switchover(oldMaster, masterCandidateName, scheduleSwitchover)
if err != nil { if err != nil {
c.logger.Errorf("could not failover to pod %q: %v", masterCandidateName, err) c.logger.Errorf("could not switchover to pod %q: %v", masterCandidateName, err)
return false, nil return false, nil
} }
return true, nil return true, nil
@ -445,7 +450,7 @@ func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.Namesp
// do not recreate master now so it will keep the update flag and switchover will be retried on next sync // do not recreate master now so it will keep the update flag and switchover will be retried on next sync
return fmt.Errorf("skipping switchover: %v", err) return fmt.Errorf("skipping switchover: %v", err)
} }
if err := c.Switchover(masterPod, masterCandidate); err != nil { if err := c.Switchover(masterPod, masterCandidate, false); err != nil {
return fmt.Errorf("could not perform switch over: %v", err) return fmt.Errorf("could not perform switch over: %v", err)
} }
} else if newMasterPod == nil && len(replicas) == 0 { } else if newMasterPod == nil && len(replicas) == 0 {
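Switchover gains a boolean here: MigrateMasterPod passes true when the call happens outside a maintenance window, and the switchover is then handed to Patroni with a schedule instead of running immediately (preScaleDown in the next file passes an empty schedule string for the immediate case). The wiring inside Switchover is not part of this diff; a hedged sketch of the decision, reusing the schedule format from the GetSwitchoverSchedule tests above:

package main

import (
	"fmt"
	"time"
)

// chooseSchedule mirrors the new flag: outside a maintenance window the
// switchover is postponed to the next window start, otherwise it runs now.
// Illustrative only; the operator passes the resulting string as the third
// argument of patroni.Switchover.
func chooseSchedule(inMaintenanceWindow bool, nextWindowStart time.Time) string {
	if inMaintenanceWindow {
		return "" // empty schedule = switch over immediately
	}
	return nextWindowStart.UTC().Format("2006-01-02T15:04+00")
}

func main() {
	next := time.Date(2025, 7, 16, 1, 0, 0, 0, time.UTC)
	fmt.Printf("immediate: %q\n", chooseSchedule(true, next))
	fmt.Printf("scheduled: %q\n", chooseSchedule(false, next)) // "2025-07-16T01:00+00"
}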

View File

@ -23,8 +23,13 @@ const (
) )
func (c *Cluster) listResources() error { func (c *Cluster) listResources() error {
if c.PodDisruptionBudget != nil { if c.PrimaryPodDisruptionBudget != nil {
c.logger.Infof("found pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta), c.PodDisruptionBudget.UID) c.logger.Infof("found primary pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta), c.PrimaryPodDisruptionBudget.UID)
}
if c.CriticalOpPodDisruptionBudget != nil {
c.logger.Infof("found pod disruption budget for critical operations: %q (uid: %q)", util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta), c.CriticalOpPodDisruptionBudget.UID)
} }
if c.Statefulset != nil { if c.Statefulset != nil {
@ -162,8 +167,8 @@ func (c *Cluster) preScaleDown(newStatefulSet *appsv1.StatefulSet) error {
return fmt.Errorf("pod %q does not belong to cluster", podName) return fmt.Errorf("pod %q does not belong to cluster", podName)
} }
if err := c.patroni.Switchover(&masterPod[0], masterCandidatePod.Name); err != nil { if err := c.patroni.Switchover(&masterPod[0], masterCandidatePod.Name, ""); err != nil {
return fmt.Errorf("could not failover: %v", err) return fmt.Errorf("could not switchover: %v", err)
} }
return nil return nil
@ -329,7 +334,7 @@ func (c *Cluster) updateService(role PostgresRole, oldService *v1.Service, newSe
} }
} }
if changed, _ := c.compareAnnotations(oldService.Annotations, newService.Annotations); changed { if changed, _ := c.compareAnnotations(oldService.Annotations, newService.Annotations, nil); changed {
patchData, err := metaAnnotationsPatch(newService.Annotations) patchData, err := metaAnnotationsPatch(newService.Annotations)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not form patch for service %q annotations: %v", oldService.Name, err) return nil, fmt.Errorf("could not form patch for service %q annotations: %v", oldService.Name, err)
@ -417,59 +422,128 @@ func (c *Cluster) generateEndpointSubsets(role PostgresRole) []v1.EndpointSubset
return result return result
} }
func (c *Cluster) createPodDisruptionBudget() (*policyv1.PodDisruptionBudget, error) { func (c *Cluster) createPrimaryPodDisruptionBudget() error {
podDisruptionBudgetSpec := c.generatePodDisruptionBudget() c.logger.Debug("creating primary pod disruption budget")
if c.PrimaryPodDisruptionBudget != nil {
c.logger.Warning("primary pod disruption budget already exists in the cluster")
return nil
}
podDisruptionBudgetSpec := c.generatePrimaryPodDisruptionBudget()
podDisruptionBudget, err := c.KubeClient. podDisruptionBudget, err := c.KubeClient.
PodDisruptionBudgets(podDisruptionBudgetSpec.Namespace). PodDisruptionBudgets(podDisruptionBudgetSpec.Namespace).
Create(context.TODO(), podDisruptionBudgetSpec, metav1.CreateOptions{}) Create(context.TODO(), podDisruptionBudgetSpec, metav1.CreateOptions{})
if err != nil { if err != nil {
return nil, err return err
} }
c.PodDisruptionBudget = podDisruptionBudget c.logger.Infof("primary pod disruption budget %q has been successfully created", util.NameFromMeta(podDisruptionBudget.ObjectMeta))
c.PrimaryPodDisruptionBudget = podDisruptionBudget
return podDisruptionBudget, nil return nil
} }
func (c *Cluster) updatePodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error { func (c *Cluster) createCriticalOpPodDisruptionBudget() error {
if c.PodDisruptionBudget == nil { c.logger.Debug("creating pod disruption budget for critical operations")
return fmt.Errorf("there is no pod disruption budget in the cluster") if c.CriticalOpPodDisruptionBudget != nil {
c.logger.Warning("pod disruption budget for critical operations already exists in the cluster")
return nil
} }
if err := c.deletePodDisruptionBudget(); err != nil { podDisruptionBudgetSpec := c.generateCriticalOpPodDisruptionBudget()
return fmt.Errorf("could not delete pod disruption budget: %v", err) podDisruptionBudget, err := c.KubeClient.
PodDisruptionBudgets(podDisruptionBudgetSpec.Namespace).
Create(context.TODO(), podDisruptionBudgetSpec, metav1.CreateOptions{})
if err != nil {
return err
}
c.logger.Infof("pod disruption budget for critical operations %q has been successfully created", util.NameFromMeta(podDisruptionBudget.ObjectMeta))
c.CriticalOpPodDisruptionBudget = podDisruptionBudget
return nil
}
func (c *Cluster) createPodDisruptionBudgets() error {
errors := make([]string, 0)
err := c.createPrimaryPodDisruptionBudget()
if err != nil {
errors = append(errors, fmt.Sprintf("could not create primary pod disruption budget: %v", err))
}
err = c.createCriticalOpPodDisruptionBudget()
if err != nil {
errors = append(errors, fmt.Sprintf("could not create pod disruption budget for critical operations: %v", err))
}
if len(errors) > 0 {
return fmt.Errorf("%v", strings.Join(errors, `', '`))
}
return nil
}
func (c *Cluster) updatePrimaryPodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error {
c.logger.Debug("updating primary pod disruption budget")
if c.PrimaryPodDisruptionBudget == nil {
return fmt.Errorf("there is no primary pod disruption budget in the cluster")
}
if err := c.deletePrimaryPodDisruptionBudget(); err != nil {
return fmt.Errorf("could not delete primary pod disruption budget: %v", err)
} }
newPdb, err := c.KubeClient. newPdb, err := c.KubeClient.
PodDisruptionBudgets(pdb.Namespace). PodDisruptionBudgets(pdb.Namespace).
Create(context.TODO(), pdb, metav1.CreateOptions{}) Create(context.TODO(), pdb, metav1.CreateOptions{})
if err != nil { if err != nil {
return fmt.Errorf("could not create pod disruption budget: %v", err) return fmt.Errorf("could not create primary pod disruption budget: %v", err)
} }
c.PodDisruptionBudget = newPdb c.PrimaryPodDisruptionBudget = newPdb
return nil return nil
} }
func (c *Cluster) deletePodDisruptionBudget() error { func (c *Cluster) updateCriticalOpPodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error {
c.logger.Debug("deleting pod disruption budget") c.logger.Debug("updating pod disruption budget for critical operations")
if c.PodDisruptionBudget == nil { if c.CriticalOpPodDisruptionBudget == nil {
c.logger.Debug("there is no pod disruption budget in the cluster") return fmt.Errorf("there is no pod disruption budget for critical operations in the cluster")
}
if err := c.deleteCriticalOpPodDisruptionBudget(); err != nil {
return fmt.Errorf("could not delete pod disruption budget for critical operations: %v", err)
}
newPdb, err := c.KubeClient.
PodDisruptionBudgets(pdb.Namespace).
Create(context.TODO(), pdb, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("could not create pod disruption budget for critical operations: %v", err)
}
c.CriticalOpPodDisruptionBudget = newPdb
return nil
}
func (c *Cluster) deletePrimaryPodDisruptionBudget() error {
c.logger.Debug("deleting primary pod disruption budget")
if c.PrimaryPodDisruptionBudget == nil {
c.logger.Debug("there is no primary pod disruption budget in the cluster")
return nil return nil
} }
pdbName := util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta) pdbName := util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta)
err := c.KubeClient. err := c.KubeClient.
PodDisruptionBudgets(c.PodDisruptionBudget.Namespace). PodDisruptionBudgets(c.PrimaryPodDisruptionBudget.Namespace).
Delete(context.TODO(), c.PodDisruptionBudget.Name, c.deleteOptions) Delete(context.TODO(), c.PrimaryPodDisruptionBudget.Name, c.deleteOptions)
if k8sutil.ResourceNotFound(err) { if k8sutil.ResourceNotFound(err) {
c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta)) c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta))
} else if err != nil { } else if err != nil {
return fmt.Errorf("could not delete PodDisruptionBudget: %v", err) return fmt.Errorf("could not delete primary pod disruption budget: %v", err)
} }
c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta)) c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta))
c.PodDisruptionBudget = nil c.PrimaryPodDisruptionBudget = nil
err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
func() (bool, error) { func() (bool, error) {
@ -483,12 +557,67 @@ func (c *Cluster) deletePodDisruptionBudget() error {
return false, err2 return false, err2
}) })
if err != nil { if err != nil {
return fmt.Errorf("could not delete pod disruption budget: %v", err) return fmt.Errorf("could not delete primary pod disruption budget: %v", err)
} }
return nil return nil
} }
func (c *Cluster) deleteCriticalOpPodDisruptionBudget() error {
c.logger.Debug("deleting pod disruption budget for critical operations")
if c.CriticalOpPodDisruptionBudget == nil {
c.logger.Debug("there is no pod disruption budget for critical operations in the cluster")
return nil
}
pdbName := util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta)
err := c.KubeClient.
PodDisruptionBudgets(c.CriticalOpPodDisruptionBudget.Namespace).
Delete(context.TODO(), c.CriticalOpPodDisruptionBudget.Name, c.deleteOptions)
if k8sutil.ResourceNotFound(err) {
c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta))
} else if err != nil {
return fmt.Errorf("could not delete pod disruption budget for critical operations: %v", err)
}
c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta))
c.CriticalOpPodDisruptionBudget = nil
err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
func() (bool, error) {
_, err2 := c.KubeClient.PodDisruptionBudgets(pdbName.Namespace).Get(context.TODO(), pdbName.Name, metav1.GetOptions{})
if err2 == nil {
return false, nil
}
if k8sutil.ResourceNotFound(err2) {
return true, nil
}
return false, err2
})
if err != nil {
return fmt.Errorf("could not delete pod disruption budget for critical operations: %v", err)
}
return nil
}
func (c *Cluster) deletePodDisruptionBudgets() error {
errors := make([]string, 0)
if err := c.deletePrimaryPodDisruptionBudget(); err != nil {
errors = append(errors, fmt.Sprintf("%v", err))
}
if err := c.deleteCriticalOpPodDisruptionBudget(); err != nil {
errors = append(errors, fmt.Sprintf("%v", err))
}
if len(errors) > 0 {
return fmt.Errorf("%v", strings.Join(errors, `', '`))
}
return nil
}
func (c *Cluster) deleteEndpoint(role PostgresRole) error { func (c *Cluster) deleteEndpoint(role PostgresRole) error {
c.setProcessName("deleting endpoint") c.setProcessName("deleting endpoint")
c.logger.Debugf("deleting %s endpoint", role) c.logger.Debugf("deleting %s endpoint", role)
@ -705,7 +834,12 @@ func (c *Cluster) GetStatefulSet() *appsv1.StatefulSet {
return c.Statefulset return c.Statefulset
} }
// GetPodDisruptionBudget returns cluster's kubernetes PodDisruptionBudget // GetPrimaryPodDisruptionBudget returns cluster's primary kubernetes PodDisruptionBudget
func (c *Cluster) GetPodDisruptionBudget() *policyv1.PodDisruptionBudget { func (c *Cluster) GetPrimaryPodDisruptionBudget() *policyv1.PodDisruptionBudget {
return c.PodDisruptionBudget return c.PrimaryPodDisruptionBudget
}
// GetCriticalOpPodDisruptionBudget returns cluster's kubernetes PodDisruptionBudget for critical operations
func (c *Cluster) GetCriticalOpPodDisruptionBudget() *policyv1.PodDisruptionBudget {
return c.CriticalOpPodDisruptionBudget
} }

View File

@ -114,10 +114,10 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za
} }
for slotName, slotAndPublication := range databaseSlotsList { for slotName, slotAndPublication := range databaseSlotsList {
tables := slotAndPublication.Publication newTables := slotAndPublication.Publication
tableNames := make([]string, len(tables)) tableNames := make([]string, len(newTables))
i := 0 i := 0
for t := range tables { for t := range newTables {
tableName, schemaName := getTableSchema(t) tableName, schemaName := getTableSchema(t)
tableNames[i] = fmt.Sprintf("%s.%s", schemaName, tableName) tableNames[i] = fmt.Sprintf("%s.%s", schemaName, tableName)
i++ i++
@ -126,6 +126,12 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za
tableList := strings.Join(tableNames, ", ") tableList := strings.Join(tableNames, ", ")
currentTables, exists := currentPublications[slotName] currentTables, exists := currentPublications[slotName]
// if newTables is empty it means that its definition was removed from the streams section
// but when the slot is still defined in the manifest we should sync its publication, too
// by reusing the current tables we make sure the publication is not emptied
if len(newTables) == 0 {
tableList = currentTables
}
if !exists { if !exists {
createPublications[slotName] = tableList createPublications[slotName] = tableList
} else if currentTables != tableList { } else if currentTables != tableList {
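tableList is the comma-separated schema.table list that later feeds the publication DDL; when a stream definition disappears but its slot is still declared in the manifest, the empty desired list is replaced by the current one so the existing publication is left untouched rather than emptied. A small sketch of the construction and the reuse rule (bare table names defaulting to the public schema is an assumption about getTableSchema, which is not shown here):

package main

import (
	"fmt"
	"sort"
	"strings"
)

func buildTableList(newTables map[string]struct{}, currentTables string) string {
	names := make([]string, 0, len(newTables))
	for t := range newTables {
		schema, table := "public", t
		if i := strings.Index(t, "."); i >= 0 {
			schema, table = t[:i], t[i+1:]
		}
		names = append(names, fmt.Sprintf("%s.%s", schema, table))
	}
	sort.Strings(names) // sorted only to make this sketch's output deterministic
	tableList := strings.Join(names, ", ")
	if len(newTables) == 0 {
		// stream definition removed, slot still in the manifest: keep the publication as is
		tableList = currentTables
	}
	return tableList
}

func main() {
	fmt.Println(buildTableList(map[string]struct{}{"data.foo": {}, "bar": {}}, ""))
	fmt.Println(buildTableList(nil, "data.foo, public.bar")) // current tables reused
}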
@ -350,16 +356,8 @@ func (c *Cluster) syncStreams() error {
return nil return nil
} }
databaseSlots := make(map[string]map[string]zalandov1.Slot) // create map with every database and empty slot definition
slotsToSync := make(map[string]map[string]string) // we need it to detect removal of streams from databases
requiredPatroniConfig := c.Spec.Patroni
if len(requiredPatroniConfig.Slots) > 0 {
for slotName, slotConfig := range requiredPatroniConfig.Slots {
slotsToSync[slotName] = slotConfig
}
}
if err := c.initDbConn(); err != nil { if err := c.initDbConn(); err != nil {
return fmt.Errorf("could not init database connection") return fmt.Errorf("could not init database connection")
} }
@ -372,13 +370,28 @@ func (c *Cluster) syncStreams() error {
if err != nil { if err != nil {
return fmt.Errorf("could not get list of databases: %v", err) return fmt.Errorf("could not get list of databases: %v", err)
} }
// get database name with empty list of slot, except template0 and template1 databaseSlots := make(map[string]map[string]zalandov1.Slot)
for dbName := range listDatabases { for dbName := range listDatabases {
if dbName != "template0" && dbName != "template1" { if dbName != "template0" && dbName != "template1" {
databaseSlots[dbName] = map[string]zalandov1.Slot{} databaseSlots[dbName] = map[string]zalandov1.Slot{}
} }
} }
// need to take explicitly defined slots into account when syncing Patroni config
slotsToSync := make(map[string]map[string]string)
requiredPatroniConfig := c.Spec.Patroni
if len(requiredPatroniConfig.Slots) > 0 {
for slotName, slotConfig := range requiredPatroniConfig.Slots {
slotsToSync[slotName] = slotConfig
if _, exists := databaseSlots[slotConfig["database"]]; exists {
databaseSlots[slotConfig["database"]][slotName] = zalandov1.Slot{
Slot: slotConfig,
Publication: make(map[string]acidv1.StreamTable),
}
}
}
}
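Slots declared directly under patroni.slots in the manifest are now also seeded into databaseSlots with an empty publication map, so a slot that exists only in the manifest still has its publication reconciled. A sketch of the shape of such an entry, with made-up names and the zalandov1.Slot struct reduced to plain maps:

package main

import "fmt"

func main() {
	// one entry of spec.patroni.slots as the operator sees it (map[string]map[string]string);
	// slot and database names are invented for the example
	slotsToSync := map[string]map[string]string{
		"my_manual_slot": {"database": "foo", "plugin": "pgoutput", "type": "logical"},
	}
	// databaseSlots groups slots per existing database; manifest slots start
	// with an empty publication, stream definitions may add tables later
	databaseSlots := map[string]map[string]map[string]string{"foo": {}}
	for name, cfg := range slotsToSync {
		if _, exists := databaseSlots[cfg["database"]]; exists {
			databaseSlots[cfg["database"]][name] = cfg
		}
	}
	fmt.Println(databaseSlots)
}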
// get list of required slots and publications, group by database // get list of required slots and publications, group by database
for _, stream := range c.Spec.Streams { for _, stream := range c.Spec.Streams {
if _, exists := databaseSlots[stream.Database]; !exists { if _, exists := databaseSlots[stream.Database]; !exists {
@ -391,13 +404,13 @@ func (c *Cluster) syncStreams() error {
"type": "logical", "type": "logical",
} }
slotName := getSlotName(stream.Database, stream.ApplicationId) slotName := getSlotName(stream.Database, stream.ApplicationId)
if _, exists := databaseSlots[stream.Database][slotName]; !exists { slotAndPublication, exists := databaseSlots[stream.Database][slotName]
if !exists {
databaseSlots[stream.Database][slotName] = zalandov1.Slot{ databaseSlots[stream.Database][slotName] = zalandov1.Slot{
Slot: slot, Slot: slot,
Publication: stream.Tables, Publication: stream.Tables,
} }
} else { } else {
slotAndPublication := databaseSlots[stream.Database][slotName]
streamTables := slotAndPublication.Publication streamTables := slotAndPublication.Publication
for tableName, table := range stream.Tables { for tableName, table := range stream.Tables {
if _, exists := streamTables[tableName]; !exists { if _, exists := streamTables[tableName]; !exists {
@ -492,16 +505,17 @@ func (c *Cluster) syncStream(appId string) error {
continue continue
} }
streamExists = true streamExists = true
c.Streams[appId] = &stream
desiredStreams := c.generateFabricEventStream(appId) desiredStreams := c.generateFabricEventStream(appId)
if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) { if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) {
c.logger.Infof("owner references of event streams with applicationId %s do not match the current ones", appId) c.logger.Infof("owner references of event streams with applicationId %s do not match the current ones", appId)
stream.ObjectMeta.OwnerReferences = desiredStreams.ObjectMeta.OwnerReferences stream.ObjectMeta.OwnerReferences = desiredStreams.ObjectMeta.OwnerReferences
c.setProcessName("updating event streams with applicationId %s", appId) c.setProcessName("updating event streams with applicationId %s", appId)
stream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), &stream, metav1.UpdateOptions{}) updatedStream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), &stream, metav1.UpdateOptions{})
if err != nil { if err != nil {
return fmt.Errorf("could not update event streams with applicationId %s: %v", appId, err) return fmt.Errorf("could not update event streams with applicationId %s: %v", appId, err)
} }
c.Streams[appId] = stream c.Streams[appId] = updatedStream
} }
if match, reason := c.compareStreams(&stream, desiredStreams); !match { if match, reason := c.compareStreams(&stream, desiredStreams); !match {
c.logger.Infof("updating event streams with applicationId %s: %s", appId, reason) c.logger.Infof("updating event streams with applicationId %s: %s", appId, reason)
@ -545,7 +559,7 @@ func (c *Cluster) compareStreams(curEventStreams, newEventStreams *zalandov1.Fab
for newKey, newValue := range newEventStreams.Annotations { for newKey, newValue := range newEventStreams.Annotations {
desiredAnnotations[newKey] = newValue desiredAnnotations[newKey] = newValue
} }
if changed, reason := c.compareAnnotations(curEventStreams.ObjectMeta.Annotations, desiredAnnotations); changed { if changed, reason := c.compareAnnotations(curEventStreams.ObjectMeta.Annotations, desiredAnnotations, nil); changed {
match = false match = false
reasons = append(reasons, fmt.Sprintf("new streams annotations do not match: %s", reason)) reasons = append(reasons, fmt.Sprintf("new streams annotations do not match: %s", reason))
} }

View File

@ -97,6 +97,11 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
} }
} }
if !isInMaintenanceWindow(newSpec.Spec.MaintenanceWindows) {
// do not apply any major version related changes yet
newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
}
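Pinning PgVersion back to the old value outside a maintenance window keeps the statefulset sync from rolling out a new image before majorVersionUpgrade is allowed to run. isInMaintenanceWindow itself is not part of this diff; a minimal sketch of what such a check can look like, assuming windows carry an Everyday flag, a Weekday and start/end times of day, and that no windows means "always allowed":

package main

import (
	"fmt"
	"time"
)

type maintenanceWindow struct {
	Everyday  bool
	Weekday   time.Weekday
	StartTime time.Time // only the clock part matters
	EndTime   time.Time
}

func inMaintenanceWindow(windows []maintenanceWindow, now time.Time) bool {
	if len(windows) == 0 {
		return true
	}
	utc := now.UTC()
	clock := time.Date(0, 1, 1, utc.Hour(), utc.Minute(), 0, 0, time.UTC)
	for _, w := range windows {
		if !w.Everyday && w.Weekday != utc.Weekday() {
			continue
		}
		start := time.Date(0, 1, 1, w.StartTime.Hour(), w.StartTime.Minute(), 0, 0, time.UTC)
		end := time.Date(0, 1, 1, w.EndTime.Hour(), w.EndTime.Minute(), 0, 0, time.UTC)
		if !clock.Before(start) && !clock.After(end) {
			return true
		}
	}
	return false
}

func main() {
	start, _ := time.Parse("15:04", "01:00")
	end, _ := time.Parse("15:04", "05:00")
	windows := []maintenanceWindow{{Everyday: true, StartTime: start, EndTime: end}}
	fmt.Println(inMaintenanceWindow(windows, time.Date(2025, 7, 16, 2, 30, 0, 0, time.UTC))) // true
	fmt.Println(inMaintenanceWindow(windows, time.Date(2025, 7, 16, 12, 0, 0, 0, time.UTC))) // false
}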
if err = c.syncStatefulSet(); err != nil { if err = c.syncStatefulSet(); err != nil {
if !k8sutil.ResourceAlreadyExists(err) { if !k8sutil.ResourceAlreadyExists(err) {
err = fmt.Errorf("could not sync statefulsets: %v", err) err = fmt.Errorf("could not sync statefulsets: %v", err)
@ -112,8 +117,8 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
} }
c.logger.Debug("syncing pod disruption budgets") c.logger.Debug("syncing pod disruption budgets")
if err = c.syncPodDisruptionBudget(false); err != nil { if err = c.syncPodDisruptionBudgets(false); err != nil {
err = fmt.Errorf("could not sync pod disruption budget: %v", err) err = fmt.Errorf("could not sync pod disruption budgets: %v", err)
return err return err
} }
@ -148,7 +153,10 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
return fmt.Errorf("could not sync connection pooler: %v", err) return fmt.Errorf("could not sync connection pooler: %v", err)
} }
if len(c.Spec.Streams) > 0 { // sync if manifest stream count is different from stream CR count
// the counts can legitimately differ because manifest streams are grouped by application id
// but comparing them still catches removals that were missed on update
if len(c.Spec.Streams) != len(c.Streams) {
c.logger.Debug("syncing streams") c.logger.Debug("syncing streams")
if err = c.syncStreams(); err != nil { if err = c.syncStreams(); err != nil {
err = fmt.Errorf("could not sync streams: %v", err) err = fmt.Errorf("could not sync streams: %v", err)
@ -230,7 +238,7 @@ func (c *Cluster) syncPatroniConfigMap(suffix string) error {
maps.Copy(annotations, cm.Annotations) maps.Copy(annotations, cm.Annotations)
// Patroni can add extra annotations so incl. current annotations in desired annotations // Patroni can add extra annotations so incl. current annotations in desired annotations
desiredAnnotations := c.annotationsSet(cm.Annotations) desiredAnnotations := c.annotationsSet(cm.Annotations)
if changed, _ := c.compareAnnotations(annotations, desiredAnnotations); changed { if changed, _ := c.compareAnnotations(annotations, desiredAnnotations, nil); changed {
patchData, err := metaAnnotationsPatch(desiredAnnotations) patchData, err := metaAnnotationsPatch(desiredAnnotations)
if err != nil { if err != nil {
return fmt.Errorf("could not form patch for %s config map: %v", configMapName, err) return fmt.Errorf("could not form patch for %s config map: %v", configMapName, err)
@ -275,7 +283,7 @@ func (c *Cluster) syncPatroniEndpoint(suffix string) error {
maps.Copy(annotations, ep.Annotations) maps.Copy(annotations, ep.Annotations)
// Patroni can add extra annotations so incl. current annotations in desired annotations // Patroni can add extra annotations so incl. current annotations in desired annotations
desiredAnnotations := c.annotationsSet(ep.Annotations) desiredAnnotations := c.annotationsSet(ep.Annotations)
if changed, _ := c.compareAnnotations(annotations, desiredAnnotations); changed { if changed, _ := c.compareAnnotations(annotations, desiredAnnotations, nil); changed {
patchData, err := metaAnnotationsPatch(desiredAnnotations) patchData, err := metaAnnotationsPatch(desiredAnnotations)
if err != nil { if err != nil {
return fmt.Errorf("could not form patch for %s endpoint: %v", endpointName, err) return fmt.Errorf("could not form patch for %s endpoint: %v", endpointName, err)
@ -320,7 +328,7 @@ func (c *Cluster) syncPatroniService() error {
maps.Copy(annotations, svc.Annotations) maps.Copy(annotations, svc.Annotations)
// Patroni can add extra annotations so incl. current annotations in desired annotations // Patroni can add extra annotations so incl. current annotations in desired annotations
desiredAnnotations := c.annotationsSet(svc.Annotations) desiredAnnotations := c.annotationsSet(svc.Annotations)
if changed, _ := c.compareAnnotations(annotations, desiredAnnotations); changed { if changed, _ := c.compareAnnotations(annotations, desiredAnnotations, nil); changed {
patchData, err := metaAnnotationsPatch(desiredAnnotations) patchData, err := metaAnnotationsPatch(desiredAnnotations)
if err != nil { if err != nil {
return fmt.Errorf("could not form patch for %s service: %v", serviceName, err) return fmt.Errorf("could not form patch for %s service: %v", serviceName, err)
@ -412,7 +420,7 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error {
return fmt.Errorf("could not update %s endpoint: %v", role, err) return fmt.Errorf("could not update %s endpoint: %v", role, err)
} }
} else { } else {
if changed, _ := c.compareAnnotations(ep.Annotations, desiredEp.Annotations); changed { if changed, _ := c.compareAnnotations(ep.Annotations, desiredEp.Annotations, nil); changed {
patchData, err := metaAnnotationsPatch(desiredEp.Annotations) patchData, err := metaAnnotationsPatch(desiredEp.Annotations)
if err != nil { if err != nil {
return fmt.Errorf("could not form patch for %s endpoint: %v", role, err) return fmt.Errorf("could not form patch for %s endpoint: %v", role, err)
@ -447,22 +455,22 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error {
return nil return nil
} }
func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { func (c *Cluster) syncPrimaryPodDisruptionBudget(isUpdate bool) error {
var ( var (
pdb *policyv1.PodDisruptionBudget pdb *policyv1.PodDisruptionBudget
err error err error
) )
if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.podDisruptionBudgetName(), metav1.GetOptions{}); err == nil { if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.PrimaryPodDisruptionBudgetName(), metav1.GetOptions{}); err == nil {
c.PodDisruptionBudget = pdb c.PrimaryPodDisruptionBudget = pdb
newPDB := c.generatePodDisruptionBudget() newPDB := c.generatePrimaryPodDisruptionBudget()
match, reason := c.comparePodDisruptionBudget(pdb, newPDB) match, reason := c.comparePodDisruptionBudget(pdb, newPDB)
if !match { if !match {
c.logPDBChanges(pdb, newPDB, isUpdate, reason) c.logPDBChanges(pdb, newPDB, isUpdate, reason)
if err = c.updatePodDisruptionBudget(newPDB); err != nil { if err = c.updatePrimaryPodDisruptionBudget(newPDB); err != nil {
return err return err
} }
} else { } else {
c.PodDisruptionBudget = pdb c.PrimaryPodDisruptionBudget = pdb
} }
return nil return nil
@ -471,21 +479,74 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error {
return fmt.Errorf("could not get pod disruption budget: %v", err) return fmt.Errorf("could not get pod disruption budget: %v", err)
} }
// no existing pod disruption budget, create new one // no existing pod disruption budget, create new one
c.logger.Infof("could not find the cluster's pod disruption budget") c.logger.Infof("could not find the primary pod disruption budget")
if pdb, err = c.createPodDisruptionBudget(); err != nil { if err = c.createPrimaryPodDisruptionBudget(); err != nil {
if !k8sutil.ResourceAlreadyExists(err) { if !k8sutil.ResourceAlreadyExists(err) {
return fmt.Errorf("could not create pod disruption budget: %v", err) return fmt.Errorf("could not create primary pod disruption budget: %v", err)
} }
c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta)) c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta))
if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.podDisruptionBudgetName(), metav1.GetOptions{}); err != nil { if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.PrimaryPodDisruptionBudgetName(), metav1.GetOptions{}); err != nil {
return fmt.Errorf("could not fetch existing %q pod disruption budget", util.NameFromMeta(pdb.ObjectMeta)) return fmt.Errorf("could not fetch existing %q pod disruption budget", util.NameFromMeta(pdb.ObjectMeta))
} }
} }
c.logger.Infof("created missing pod disruption budget %q", util.NameFromMeta(pdb.ObjectMeta)) return nil
c.PodDisruptionBudget = pdb }
func (c *Cluster) syncCriticalOpPodDisruptionBudget(isUpdate bool) error {
var (
pdb *policyv1.PodDisruptionBudget
err error
)
if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.criticalOpPodDisruptionBudgetName(), metav1.GetOptions{}); err == nil {
c.CriticalOpPodDisruptionBudget = pdb
newPDB := c.generateCriticalOpPodDisruptionBudget()
match, reason := c.comparePodDisruptionBudget(pdb, newPDB)
if !match {
c.logPDBChanges(pdb, newPDB, isUpdate, reason)
if err = c.updateCriticalOpPodDisruptionBudget(newPDB); err != nil {
return err
}
} else {
c.CriticalOpPodDisruptionBudget = pdb
}
return nil
}
if !k8sutil.ResourceNotFound(err) {
return fmt.Errorf("could not get pod disruption budget: %v", err)
}
// no existing pod disruption budget, create new one
c.logger.Infof("could not find pod disruption budget for critical operations")
if err = c.createCriticalOpPodDisruptionBudget(); err != nil {
if !k8sutil.ResourceAlreadyExists(err) {
return fmt.Errorf("could not create pod disruption budget for critical operations: %v", err)
}
c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta))
if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.criticalOpPodDisruptionBudgetName(), metav1.GetOptions{}); err != nil {
return fmt.Errorf("could not fetch existing %q pod disruption budget", util.NameFromMeta(pdb.ObjectMeta))
}
}
return nil
}
func (c *Cluster) syncPodDisruptionBudgets(isUpdate bool) error {
errors := make([]string, 0)
if err := c.syncPrimaryPodDisruptionBudget(isUpdate); err != nil {
errors = append(errors, fmt.Sprintf("%v", err))
}
if err := c.syncCriticalOpPodDisruptionBudget(isUpdate); err != nil {
errors = append(errors, fmt.Sprintf("%v", err))
}
if len(errors) > 0 {
return fmt.Errorf("%v", strings.Join(errors, `', '`))
}
return nil return nil
} }
@ -497,6 +558,7 @@ func (c *Cluster) syncStatefulSet() error {
) )
podsToRecreate := make([]v1.Pod, 0) podsToRecreate := make([]v1.Pod, 0)
isSafeToRecreatePods := true isSafeToRecreatePods := true
postponeReasons := make([]string, 0)
switchoverCandidates := make([]spec.NamespacedName, 0) switchoverCandidates := make([]spec.NamespacedName, 0)
pods, err := c.listPods() pods, err := c.listPods()
@ -561,13 +623,22 @@ func (c *Cluster) syncStatefulSet() error {
cmp := c.compareStatefulSetWith(desiredSts) cmp := c.compareStatefulSetWith(desiredSts)
if !cmp.rollingUpdate { if !cmp.rollingUpdate {
updatedPodAnnotations := map[string]*string{}
for _, anno := range cmp.deletedPodAnnotations {
updatedPodAnnotations[anno] = nil
}
for anno, val := range desiredSts.Spec.Template.Annotations {
updatedPodAnnotations[anno] = &val
}
metadataReq := map[string]map[string]map[string]*string{"metadata": {"annotations": updatedPodAnnotations}}
patch, err := json.Marshal(metadataReq)
if err != nil {
return fmt.Errorf("could not form patch for pod annotations: %v", err)
}
for _, pod := range pods { for _, pod := range pods {
if changed, _ := c.compareAnnotations(pod.Annotations, desiredSts.Spec.Template.Annotations); changed { if changed, _ := c.compareAnnotations(pod.Annotations, desiredSts.Spec.Template.Annotations, nil); changed {
patchData, err := metaAnnotationsPatch(desiredSts.Spec.Template.Annotations) _, err = c.KubeClient.Pods(c.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
if err != nil {
return fmt.Errorf("could not form patch for pod %q annotations: %v", pod.Name, err)
}
_, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
if err != nil { if err != nil {
return fmt.Errorf("could not patch annotations for pod %q: %v", pod.Name, err) return fmt.Errorf("could not patch annotations for pod %q: %v", pod.Name, err)
} }
@ -646,12 +717,14 @@ func (c *Cluster) syncStatefulSet() error {
c.logger.Debug("syncing Patroni config") c.logger.Debug("syncing Patroni config")
if configPatched, restartPrimaryFirst, restartWait, err = c.syncPatroniConfig(pods, c.Spec.Patroni, requiredPgParameters); err != nil { if configPatched, restartPrimaryFirst, restartWait, err = c.syncPatroniConfig(pods, c.Spec.Patroni, requiredPgParameters); err != nil {
c.logger.Warningf("Patroni config updated? %v - errors during config sync: %v", configPatched, err) c.logger.Warningf("Patroni config updated? %v - errors during config sync: %v", configPatched, err)
postponeReasons = append(postponeReasons, "errors during Patroni config sync")
isSafeToRecreatePods = false isSafeToRecreatePods = false
} }
// restart Postgres where it is still pending // restart Postgres where it is still pending
if err = c.restartInstances(pods, restartWait, restartPrimaryFirst); err != nil { if err = c.restartInstances(pods, restartWait, restartPrimaryFirst); err != nil {
c.logger.Errorf("errors while restarting Postgres in pods via Patroni API: %v", err) c.logger.Errorf("errors while restarting Postgres in pods via Patroni API: %v", err)
postponeReasons = append(postponeReasons, "errors while restarting Postgres via Patroni API")
isSafeToRecreatePods = false isSafeToRecreatePods = false
} }
@ -666,7 +739,7 @@ func (c *Cluster) syncStatefulSet() error {
} }
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated") c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
} else { } else {
c.logger.Warningf("postpone pod recreation until next sync because of errors during config sync") c.logger.Warningf("postpone pod recreation until next sync - reason: %s", strings.Join(postponeReasons, `', '`))
} }
} }
@ -1142,7 +1215,7 @@ func (c *Cluster) updateSecret(
c.Secrets[secret.UID] = secret c.Secrets[secret.UID] = secret
} }
if changed, _ := c.compareAnnotations(secret.Annotations, generatedSecret.Annotations); changed { if changed, _ := c.compareAnnotations(secret.Annotations, generatedSecret.Annotations, nil); changed {
patchData, err := metaAnnotationsPatch(generatedSecret.Annotations) patchData, err := metaAnnotationsPatch(generatedSecret.Annotations)
if err != nil { if err != nil {
return fmt.Errorf("could not form patch for secret %q annotations: %v", secret.Name, err) return fmt.Errorf("could not form patch for secret %q annotations: %v", secret.Name, err)
@ -1587,19 +1660,38 @@ func (c *Cluster) syncLogicalBackupJob() error {
} }
c.logger.Infof("logical backup job %s updated", c.getLogicalBackupJobName()) c.logger.Infof("logical backup job %s updated", c.getLogicalBackupJobName())
} }
if match, reason := c.compareLogicalBackupJob(job, desiredJob); !match { if cmp := c.compareLogicalBackupJob(job, desiredJob); !cmp.match {
c.logger.Infof("logical job %s is not in the desired state and needs to be updated", c.logger.Infof("logical job %s is not in the desired state and needs to be updated",
c.getLogicalBackupJobName(), c.getLogicalBackupJobName(),
) )
if reason != "" { if len(cmp.reasons) != 0 {
c.logger.Infof("reason: %s", reason) for _, reason := range cmp.reasons {
c.logger.Infof("reason: %s", reason)
}
}
if len(cmp.deletedPodAnnotations) != 0 {
templateMetadataReq := map[string]map[string]map[string]map[string]map[string]map[string]map[string]*string{
"spec": {"jobTemplate": {"spec": {"template": {"metadata": {"annotations": {}}}}}}}
for _, anno := range cmp.deletedPodAnnotations {
templateMetadataReq["spec"]["jobTemplate"]["spec"]["template"]["metadata"]["annotations"][anno] = nil
}
patch, err := json.Marshal(templateMetadataReq)
if err != nil {
return fmt.Errorf("could not marshal ObjectMeta for logical backup job %q pod template: %v", jobName, err)
}
job, err = c.KubeClient.CronJobs(c.Namespace).Patch(context.TODO(), jobName, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "")
if err != nil {
c.logger.Errorf("failed to remove annotations from the logical backup job %q pod template: %v", jobName, err)
return err
}
} }
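The removal patch for the logical backup job descends one level deeper than the Deployment case above, because a CronJob's pod template sits under spec.jobTemplate.spec.template. A minimal sketch of the payload, deleting the custom pod annotation "foo" used in the sync test below:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	req := map[string]map[string]map[string]map[string]map[string]map[string]map[string]*string{
		"spec": {"jobTemplate": {"spec": {"template": {"metadata": {"annotations": {"foo": nil}}}}}},
	}
	patch, _ := json.Marshal(req)
	// {"spec":{"jobTemplate":{"spec":{"template":{"metadata":{"annotations":{"foo":null}}}}}}}
	fmt.Println(string(patch))
}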
if err = c.patchLogicalBackupJob(desiredJob); err != nil { if err = c.patchLogicalBackupJob(desiredJob); err != nil {
return fmt.Errorf("could not update logical backup job to match desired state: %v", err) return fmt.Errorf("could not update logical backup job to match desired state: %v", err)
} }
c.logger.Info("the logical backup job is synced") c.logger.Info("the logical backup job is synced")
} }
if changed, _ := c.compareAnnotations(job.Annotations, desiredJob.Annotations); changed { if changed, _ := c.compareAnnotations(job.Annotations, desiredJob.Annotations, nil); changed {
patchData, err := metaAnnotationsPatch(desiredJob.Annotations) patchData, err := metaAnnotationsPatch(desiredJob.Annotations)
if err != nil { if err != nil {
return fmt.Errorf("could not form patch for the logical backup job %q: %v", jobName, err) return fmt.Errorf("could not form patch for the logical backup job %q: %v", jobName, err)

View File

@ -142,6 +142,181 @@ func TestSyncStatefulSetsAnnotations(t *testing.T) {
} }
} }
func TestPodAnnotationsSync(t *testing.T) {
clusterName := "acid-test-cluster-2"
namespace := "default"
podAnnotation := "no-scale-down"
podAnnotations := map[string]string{podAnnotation: "true"}
customPodAnnotation := "foo"
customPodAnnotations := map[string]string{customPodAnnotation: "true"}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockClient := mocks.NewMockHTTPClient(ctrl)
client, _ := newFakeK8sAnnotationsClient()
pg := acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName,
Namespace: namespace,
},
Spec: acidv1.PostgresSpec{
Volume: acidv1.Volume{
Size: "1Gi",
},
EnableConnectionPooler: boolToPointer(true),
EnableLogicalBackup: true,
EnableReplicaConnectionPooler: boolToPointer(true),
PodAnnotations: podAnnotations,
NumberOfInstances: 2,
},
}
var cluster = New(
Config{
OpConfig: config.Config{
PatroniAPICheckInterval: time.Duration(1),
PatroniAPICheckTimeout: time.Duration(5),
PodManagementPolicy: "ordered_ready",
CustomPodAnnotations: customPodAnnotations,
ConnectionPooler: config.ConnectionPooler{
ConnectionPoolerDefaultCPURequest: "100m",
ConnectionPoolerDefaultCPULimit: "100m",
ConnectionPoolerDefaultMemoryRequest: "100Mi",
ConnectionPoolerDefaultMemoryLimit: "100Mi",
NumberOfInstances: k8sutil.Int32ToPointer(1),
},
Resources: config.Resources{
ClusterLabels: map[string]string{"application": "spilo"},
ClusterNameLabel: "cluster-name",
DefaultCPURequest: "300m",
DefaultCPULimit: "300m",
DefaultMemoryRequest: "300Mi",
DefaultMemoryLimit: "300Mi",
MaxInstances: -1,
PodRoleLabel: "spilo-role",
ResourceCheckInterval: time.Duration(3),
ResourceCheckTimeout: time.Duration(10),
},
},
}, client, pg, logger, eventRecorder)
configJson := `{"postgresql": {"parameters": {"log_min_duration_statement": 200, "max_connections": 50}}}, "ttl": 20}`
response := http.Response{
StatusCode: 200,
Body: io.NopCloser(bytes.NewReader([]byte(configJson))),
}
mockClient.EXPECT().Do(gomock.Any()).Return(&response, nil).AnyTimes()
cluster.patroni = patroni.New(patroniLogger, mockClient)
cluster.Name = clusterName
cluster.Namespace = namespace
clusterOptions := clusterLabelsOptions(cluster)
// create a statefulset
_, err := cluster.createStatefulSet()
assert.NoError(t, err)
// create pods
podsList := createPods(cluster)
for _, pod := range podsList {
_, err = cluster.KubeClient.Pods(namespace).Create(context.TODO(), &pod, metav1.CreateOptions{})
assert.NoError(t, err)
}
// create connection pooler
_, err = cluster.createConnectionPooler(mockInstallLookupFunction)
assert.NoError(t, err)
// create cron job
err = cluster.createLogicalBackupJob()
assert.NoError(t, err)
annotateResources(cluster)
err = cluster.Sync(&cluster.Postgresql)
assert.NoError(t, err)
// 1. PodAnnotations set
stsList, err := cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions)
assert.NoError(t, err)
for _, sts := range stsList.Items {
for _, annotation := range []string{podAnnotation, customPodAnnotation} {
assert.Contains(t, sts.Spec.Template.Annotations, annotation)
}
}
for _, role := range []PostgresRole{Master, Replica} {
deploy, err := cluster.KubeClient.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{})
assert.NoError(t, err)
for _, annotation := range []string{podAnnotation, customPodAnnotation} {
assert.Contains(t, deploy.Spec.Template.Annotations, annotation,
fmt.Sprintf("pooler deployment pod template %s should contain annotation %s, found %#v",
deploy.Name, annotation, deploy.Spec.Template.Annotations))
}
}
podList, err := cluster.KubeClient.Pods(namespace).List(context.TODO(), clusterOptions)
assert.NoError(t, err)
for _, pod := range podList.Items {
for _, annotation := range []string{podAnnotation, customPodAnnotation} {
assert.Contains(t, pod.Annotations, annotation,
fmt.Sprintf("pod %s should contain annotation %s, found %#v", pod.Name, annotation, pod.Annotations))
}
}
cronJobList, err := cluster.KubeClient.CronJobs(namespace).List(context.TODO(), clusterOptions)
assert.NoError(t, err)
for _, cronJob := range cronJobList.Items {
for _, annotation := range []string{podAnnotation, customPodAnnotation} {
assert.Contains(t, cronJob.Spec.JobTemplate.Spec.Template.Annotations, annotation,
fmt.Sprintf("logical backup cron job's pod template should contain annotation %s, found %#v",
annotation, cronJob.Spec.JobTemplate.Spec.Template.Annotations))
}
}
// 2. PodAnnotations removed
newSpec := cluster.Postgresql.DeepCopy()
newSpec.Spec.PodAnnotations = nil
cluster.OpConfig.CustomPodAnnotations = nil
err = cluster.Sync(newSpec)
assert.NoError(t, err)
stsList, err = cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions)
assert.NoError(t, err)
for _, sts := range stsList.Items {
for _, annotation := range []string{podAnnotation, customPodAnnotation} {
assert.NotContains(t, sts.Spec.Template.Annotations, annotation)
}
}
for _, role := range []PostgresRole{Master, Replica} {
deploy, err := cluster.KubeClient.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{})
assert.NoError(t, err)
for _, annotation := range []string{podAnnotation, customPodAnnotation} {
assert.NotContains(t, deploy.Spec.Template.Annotations, annotation,
fmt.Sprintf("pooler deployment pod template %s should not contain annotation %s, found %#v",
deploy.Name, annotation, deploy.Spec.Template.Annotations))
}
}
podList, err = cluster.KubeClient.Pods(namespace).List(context.TODO(), clusterOptions)
assert.NoError(t, err)
for _, pod := range podList.Items {
for _, annotation := range []string{podAnnotation, customPodAnnotation} {
assert.NotContains(t, pod.Annotations, annotation,
fmt.Sprintf("pod %s should not contain annotation %s, found %#v", pod.Name, annotation, pod.Annotations))
}
}
cronJobList, err = cluster.KubeClient.CronJobs(namespace).List(context.TODO(), clusterOptions)
assert.NoError(t, err)
for _, cronJob := range cronJobList.Items {
for _, annotation := range []string{podAnnotation, customPodAnnotation} {
assert.NotContains(t, cronJob.Spec.JobTemplate.Spec.Template.Annotations, annotation,
fmt.Sprintf("logical backup cron job's pod template should not contain annotation %s, found %#v",
annotation, cronJob.Spec.JobTemplate.Spec.Template.Annotations))
}
}
}
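The test relies on helpers such as `createPods`, `annotateResources` and `clusterLabelsOptions` that are defined elsewhere in the test file. As a rough idea of what the label filtering does, here is a hypothetical reconstruction of the last one, assuming the apimachinery `labels` and `metav1` packages and the operator's usual label scheme (not the test's actual code):

```go
// clusterLabelsOptionsSketch builds ListOptions that select only objects
// belonging to this cluster, e.g. "application=spilo,cluster-name=acid-test-cluster-2".
func clusterLabelsOptionsSketch(c *Cluster) metav1.ListOptions {
	set := map[string]string{}
	for k, v := range c.OpConfig.ClusterLabels {
		set[k] = v
	}
	set[c.OpConfig.ClusterNameLabel] = c.Name
	return metav1.ListOptions{LabelSelector: labels.Set(set).String()}
}
```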
func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {
testName := "test config comparison" testName := "test config comparison"
client, _ := newFakeK8sSyncClient() client, _ := newFakeK8sSyncClient()

View File

@ -58,15 +58,16 @@ type WorkerStatus struct {
// ClusterStatus describes status of the cluster // ClusterStatus describes status of the cluster
type ClusterStatus struct { type ClusterStatus struct {
Team string Team string
Cluster string Cluster string
Namespace string Namespace string
MasterService *v1.Service MasterService *v1.Service
ReplicaService *v1.Service ReplicaService *v1.Service
MasterEndpoint *v1.Endpoints MasterEndpoint *v1.Endpoints
ReplicaEndpoint *v1.Endpoints ReplicaEndpoint *v1.Endpoints
StatefulSet *appsv1.StatefulSet StatefulSet *appsv1.StatefulSet
PodDisruptionBudget *policyv1.PodDisruptionBudget PrimaryPodDisruptionBudget *policyv1.PodDisruptionBudget
CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget
CurrentProcess Process CurrentProcess Process
Worker uint32 Worker uint32

View File

@ -663,7 +663,7 @@ func parseResourceRequirements(resourcesRequirement v1.ResourceRequirements) (ac
return resources, nil return resources, nil
} }
func isInMainternanceWindow(specMaintenanceWindows []acidv1.MaintenanceWindow) bool { func isInMaintenanceWindow(specMaintenanceWindows []acidv1.MaintenanceWindow) bool {
if len(specMaintenanceWindows) == 0 { if len(specMaintenanceWindows) == 0 {
return true return true
} }
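Only the spelling fix of the function name is visible in this hunk; an empty window list means maintenance is always allowed. For context, a check of this kind typically requires the current UTC time to fall into at least one configured window. The sketch below is illustrative only — the field names `Everyday`, `Weekday`, `StartTime` and `EndTime` are assumptions about the `MaintenanceWindow` shape, not the operator's actual type:

```go
// maintenanceWindow is an assumed, simplified maintenance window.
type maintenanceWindow struct {
	Everyday           bool
	Weekday            time.Weekday
	StartTime, EndTime time.Time // only the clock part matters
}

// inMaintenanceWindowSketch mirrors the empty-list behaviour shown above and
// otherwise checks weekday and time-of-day against each window.
func inMaintenanceWindowSketch(windows []maintenanceWindow, now time.Time) bool {
	if len(windows) == 0 {
		return true
	}
	now = now.UTC()
	minutes := func(t time.Time) int { return t.Hour()*60 + t.Minute() }
	for _, w := range windows {
		if !w.Everyday && now.Weekday() != w.Weekday {
			continue
		}
		if minutes(now) >= minutes(w.StartTime) && minutes(now) <= minutes(w.EndTime) {
			return true
		}
	}
	return false
}
```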

View File

@ -247,18 +247,18 @@ func createPods(cluster *Cluster) []v1.Pod {
for i, role := range []PostgresRole{Master, Replica} { for i, role := range []PostgresRole{Master, Replica} {
podsList = append(podsList, v1.Pod{ podsList = append(podsList, v1.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%d", clusterName, i), Name: fmt.Sprintf("%s-%d", cluster.Name, i),
Namespace: namespace, Namespace: namespace,
Labels: map[string]string{ Labels: map[string]string{
"application": "spilo", "application": "spilo",
"cluster-name": clusterName, "cluster-name": cluster.Name,
"spilo-role": string(role), "spilo-role": string(role),
}, },
}, },
}) })
podsList = append(podsList, v1.Pod{ podsList = append(podsList, v1.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-pooler-%s", clusterName, role), Name: fmt.Sprintf("%s-pooler-%s", cluster.Name, role),
Namespace: namespace, Namespace: namespace,
Labels: cluster.connectionPoolerLabels(role, true).MatchLabels, Labels: cluster.connectionPoolerLabels(role, true).MatchLabels,
}, },
@ -329,7 +329,7 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster,
if err != nil { if err != nil {
return nil, err return nil, err
} }
_, err = cluster.createPodDisruptionBudget() err = cluster.createPodDisruptionBudgets()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -705,8 +705,8 @@ func TestIsInMaintenanceWindow(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
cluster.Spec.MaintenanceWindows = tt.windows cluster.Spec.MaintenanceWindows = tt.windows
if isInMainternanceWindow(cluster.Spec.MaintenanceWindows) != tt.expected { if isInMaintenanceWindow(cluster.Spec.MaintenanceWindows) != tt.expected {
t.Errorf("Expected isInMainternanceWindow to return %t", tt.expected) t.Errorf("Expected isInMaintenanceWindow to return %t", tt.expected)
} }
}) })
} }

View File

@ -225,7 +225,7 @@ func (c *Cluster) syncVolumeClaims() error {
} }
newAnnotations := c.annotationsSet(nil) newAnnotations := c.annotationsSet(nil)
if changed, _ := c.compareAnnotations(pvc.Annotations, newAnnotations); changed { if changed, _ := c.compareAnnotations(pvc.Annotations, newAnnotations, nil); changed {
patchData, err := metaAnnotationsPatch(newAnnotations) patchData, err := metaAnnotationsPatch(newAnnotations)
if err != nil { if err != nil {
return fmt.Errorf("could not form patch for the persistent volume claim for volume %q: %v", pvc.Name, err) return fmt.Errorf("could not form patch for the persistent volume claim for volume %q: %v", pvc.Name, err)

View File

@ -1,5 +1,5 @@
/* /*
Copyright 2024 Compose, Zalando SE Copyright 2025 Compose, Zalando SE
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal of this software and associated documentation files (the "Software"), to deal

View File

@ -20,19 +20,19 @@ import (
) )
const ( const (
failoverPath = "/failover" switchoverPath = "/switchover"
configPath = "/config" configPath = "/config"
clusterPath = "/cluster" clusterPath = "/cluster"
statusPath = "/patroni" statusPath = "/patroni"
restartPath = "/restart" restartPath = "/restart"
ApiPort = 8008 ApiPort = 8008
timeout = 30 * time.Second timeout = 30 * time.Second
) )
// Interface describe patroni methods // Interface describe patroni methods
type Interface interface { type Interface interface {
GetClusterMembers(master *v1.Pod) ([]ClusterMember, error) GetClusterMembers(master *v1.Pod) ([]ClusterMember, error)
Switchover(master *v1.Pod, candidate string) error Switchover(master *v1.Pod, candidate string, scheduled_at string) error
SetPostgresParameters(server *v1.Pod, options map[string]string) error SetPostgresParameters(server *v1.Pod, options map[string]string) error
SetStandbyClusterParameters(server *v1.Pod, options map[string]interface{}) error SetStandbyClusterParameters(server *v1.Pod, options map[string]interface{}) error
GetMemberData(server *v1.Pod) (MemberData, error) GetMemberData(server *v1.Pod) (MemberData, error)
@ -103,7 +103,7 @@ func (p *Patroni) httpPostOrPatch(method string, url string, body *bytes.Buffer)
} }
}() }()
if resp.StatusCode != http.StatusOK { if resp.StatusCode < http.StatusOK || resp.StatusCode >= 300 {
bodyBytes, err := io.ReadAll(resp.Body) bodyBytes, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
return fmt.Errorf("could not read response: %v", err) return fmt.Errorf("could not read response: %v", err)
@ -128,7 +128,7 @@ func (p *Patroni) httpGet(url string) (string, error) {
return "", fmt.Errorf("could not read response: %v", err) return "", fmt.Errorf("could not read response: %v", err)
} }
if response.StatusCode != http.StatusOK { if response.StatusCode < http.StatusOK || response.StatusCode >= 300 {
return string(bodyBytes), fmt.Errorf("patroni returned '%d'", response.StatusCode) return string(bodyBytes), fmt.Errorf("patroni returned '%d'", response.StatusCode)
} }
@ -136,9 +136,9 @@ func (p *Patroni) httpGet(url string) (string, error) {
} }
// Switchover by calling Patroni REST API // Switchover by calling Patroni REST API
func (p *Patroni) Switchover(master *v1.Pod, candidate string) error { func (p *Patroni) Switchover(master *v1.Pod, candidate string, scheduled_at string) error {
buf := &bytes.Buffer{} buf := &bytes.Buffer{}
err := json.NewEncoder(buf).Encode(map[string]string{"leader": master.Name, "member": candidate}) err := json.NewEncoder(buf).Encode(map[string]string{"leader": master.Name, "member": candidate, "scheduled_at": scheduled_at})
if err != nil { if err != nil {
return fmt.Errorf("could not encode json: %v", err) return fmt.Errorf("could not encode json: %v", err)
} }
@ -146,7 +146,7 @@ func (p *Patroni) Switchover(master *v1.Pod, candidate string) error {
if err != nil { if err != nil {
return err return err
} }
return p.httpPostOrPatch(http.MethodPost, apiURLString+failoverPath, buf) return p.httpPostOrPatch(http.MethodPost, apiURLString+switchoverPath, buf)
} }
//TODO: add an option call /patroni to check if it is necessary to restart the server //TODO: add an option call /patroni to check if it is necessary to restart the server
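Two behavioural changes sit in this hunk: the client now posts to Patroni's `/switchover` endpoint instead of `/failover`, and `Switchover` accepts a `scheduled_at` timestamp. The relaxed status check above (any 2xx instead of exactly 200) matches that, since Patroni typically answers 202 Accepted for a scheduled switchover. A small usage sketch against the updated interface — the wrapper, pod and candidate names are placeholders:

```go
// scheduleSwitchover asks Patroni to switch over to the given candidate after
// a delay; a zero delay leaves scheduled_at empty for an immediate switchover.
func scheduleSwitchover(p patroni.Interface, master *v1.Pod, candidate string, delay time.Duration) error {
	scheduledAt := ""
	if delay > 0 {
		scheduledAt = time.Now().Add(delay).UTC().Format(time.RFC3339)
	}
	return p.Switchover(master, candidate, scheduledAt)
}
```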

View File

@ -81,8 +81,6 @@ spec:
] ]
} }
# Example of settings to make snapshot view working in the ui when using AWS # Example of settings to make snapshot view working in the ui when using AWS
# - name: WALE_S3_ENDPOINT
# value: https+path://s3.us-east-1.amazonaws.com:443
# - name: SPILO_S3_BACKUP_PREFIX # - name: SPILO_S3_BACKUP_PREFIX
# value: spilo/ # value: spilo/
# - name: AWS_ACCESS_KEY_ID # - name: AWS_ACCESS_KEY_ID
@ -102,5 +100,3 @@ spec:
# key: AWS_DEFAULT_REGION # key: AWS_DEFAULT_REGION
# - name: SPILO_S3_BACKUP_BUCKET # - name: SPILO_S3_BACKUP_BUCKET
# value: <s3 bucket used by the operator> # value: <s3 bucket used by the operator>
# - name: "USE_AWS_INSTANCE_PROFILE"
# value: "true"

View File

@ -95,14 +95,6 @@ DEFAULT_MEMORY_LIMIT = getenv('DEFAULT_MEMORY_LIMIT', '300Mi')
DEFAULT_CPU = getenv('DEFAULT_CPU', '10m') DEFAULT_CPU = getenv('DEFAULT_CPU', '10m')
DEFAULT_CPU_LIMIT = getenv('DEFAULT_CPU_LIMIT', '300m') DEFAULT_CPU_LIMIT = getenv('DEFAULT_CPU_LIMIT', '300m')
WALE_S3_ENDPOINT = getenv(
'WALE_S3_ENDPOINT',
'https+path://s3.eu-central-1.amazonaws.com:443',
)
USE_AWS_INSTANCE_PROFILE = (
getenv('USE_AWS_INSTANCE_PROFILE', 'false').lower() != 'false'
)
AWS_ENDPOINT = getenv('AWS_ENDPOINT') AWS_ENDPOINT = getenv('AWS_ENDPOINT')
@ -784,8 +776,6 @@ def get_versions(pg_cluster: str):
bucket=SPILO_S3_BACKUP_BUCKET, bucket=SPILO_S3_BACKUP_BUCKET,
pg_cluster=pg_cluster, pg_cluster=pg_cluster,
prefix=SPILO_S3_BACKUP_PREFIX, prefix=SPILO_S3_BACKUP_PREFIX,
s3_endpoint=WALE_S3_ENDPOINT,
use_aws_instance_profile=USE_AWS_INSTANCE_PROFILE,
), ),
) )
@ -797,9 +787,8 @@ def get_basebackups(pg_cluster: str, uid: str):
bucket=SPILO_S3_BACKUP_BUCKET, bucket=SPILO_S3_BACKUP_BUCKET,
pg_cluster=pg_cluster, pg_cluster=pg_cluster,
prefix=SPILO_S3_BACKUP_PREFIX, prefix=SPILO_S3_BACKUP_PREFIX,
s3_endpoint=WALE_S3_ENDPOINT,
uid=uid, uid=uid,
use_aws_instance_profile=USE_AWS_INSTANCE_PROFILE, postgresql_versions=OPERATOR_UI_CONFIG.get('postgresql_versions', DEFAULT_UI_CONFIG['postgresql_versions']),
), ),
) )
@ -991,8 +980,6 @@ def main(port, debug, clusters: list):
logger.info(f'Superuser team: {SUPERUSER_TEAM}') logger.info(f'Superuser team: {SUPERUSER_TEAM}')
logger.info(f'Target namespace: {TARGET_NAMESPACE}') logger.info(f'Target namespace: {TARGET_NAMESPACE}')
logger.info(f'Teamservice URL: {TEAM_SERVICE_URL}') logger.info(f'Teamservice URL: {TEAM_SERVICE_URL}')
logger.info(f'Use AWS instance_profile: {USE_AWS_INSTANCE_PROFILE}')
logger.info(f'WAL-E S3 endpoint: {WALE_S3_ENDPOINT}')
logger.info(f'AWS S3 endpoint: {AWS_ENDPOINT}') logger.info(f'AWS S3 endpoint: {AWS_ENDPOINT}')
if TARGET_NAMESPACE is None: if TARGET_NAMESPACE is None:

View File

@ -6,9 +6,8 @@ from os import environ, getenv
from requests import Session from requests import Session
from urllib.parse import urljoin from urllib.parse import urljoin
from uuid import UUID from uuid import UUID
from wal_e.cmd import configure_backup_cxt
from .utils import Attrs, defaulting, these from .utils import defaulting, these
from operator_ui.adapters.logger import logger from operator_ui.adapters.logger import logger
session = Session() session = Session()
@ -284,10 +283,8 @@ def read_stored_clusters(bucket, prefix, delimiter='/'):
def read_versions( def read_versions(
pg_cluster, pg_cluster,
bucket, bucket,
s3_endpoint,
prefix, prefix,
delimiter='/', delimiter='/',
use_aws_instance_profile=False,
): ):
return [ return [
'base' if uid == 'wal' else uid 'base' if uid == 'wal' else uid
@ -305,35 +302,72 @@ def read_versions(
if uid == 'wal' or defaulting(lambda: UUID(uid)) if uid == 'wal' or defaulting(lambda: UUID(uid))
] ]
BACKUP_VERSION_PREFIXES = ['', '10/', '11/', '12/', '13/', '14/', '15/', '16/', '17/'] def lsn_to_wal_segment_stop(finish_lsn, start_segment, wal_segment_size=16 * 1024 * 1024):
timeline = int(start_segment[:8], 16)
log_id = finish_lsn >> 32
seg_id = (finish_lsn & 0xFFFFFFFF) // wal_segment_size
return f"{timeline:08X}{log_id:08X}{seg_id:08X}"
def lsn_to_offset_hex(lsn, wal_segment_size=16 * 1024 * 1024):
return f"{lsn % wal_segment_size:08X}"
def read_basebackups( def read_basebackups(
pg_cluster, pg_cluster,
uid, uid,
bucket, bucket,
s3_endpoint,
prefix, prefix,
delimiter='/', postgresql_versions,
use_aws_instance_profile=False,
): ):
environ['WALE_S3_ENDPOINT'] = s3_endpoint
suffix = '' if uid == 'base' else '/' + uid suffix = '' if uid == 'base' else '/' + uid
backups = [] backups = []
for vp in BACKUP_VERSION_PREFIXES: for vp in postgresql_versions:
backup_prefix = f'{prefix}{pg_cluster}{suffix}/wal/{vp}/basebackups_005/'
logger.info(f"{bucket}/{backup_prefix}")
backups = backups + [ paginator = client('s3').get_paginator('list_objects_v2')
{ pages = paginator.paginate(Bucket=bucket, Prefix=backup_prefix)
key: value
for key, value in basebackup.__dict__.items() for page in pages:
if isinstance(value, str) or isinstance(value, int) for obj in page.get("Contents", []):
} key = obj["Key"]
for basebackup in Attrs.call( if not key.endswith("backup_stop_sentinel.json"):
f=configure_backup_cxt, continue
aws_instance_profile=use_aws_instance_profile,
s3_prefix=f's3://{bucket}/{prefix}{pg_cluster}{suffix}/wal/{vp}', response = client('s3').get_object(Bucket=bucket, Key=key)
)._backup_list(detail=True) backup_info = loads(response["Body"].read().decode("utf-8"))
] last_modified = response["LastModified"].astimezone(timezone.utc).isoformat()
backup_name = key.split("/")[-1].replace("_backup_stop_sentinel.json", "")
start_seg, start_offset = backup_name.split("_")[1], backup_name.split("_")[-1] if "_" in backup_name else None
if "LSN" in backup_info and "FinishLSN" in backup_info:
# WAL-G
lsn = backup_info["LSN"]
finish_lsn = backup_info["FinishLSN"]
backups.append({
"expanded_size_bytes": backup_info.get("UncompressedSize"),
"last_modified": last_modified,
"name": backup_name,
"wal_segment_backup_start": start_seg,
"wal_segment_backup_stop": lsn_to_wal_segment_stop(finish_lsn, start_seg),
"wal_segment_offset_backup_start": lsn_to_offset_hex(lsn),
"wal_segment_offset_backup_stop": lsn_to_offset_hex(finish_lsn),
})
elif "wal_segment_backup_stop" in backup_info:
# WAL-E
stop_seg = backup_info["wal_segment_backup_stop"]
stop_offset = backup_info["wal_segment_offset_backup_stop"]
backups.append({
"expanded_size_bytes": backup_info.get("expanded_size_bytes"),
"last_modified": last_modified,
"name": backup_name,
"wal_segment_backup_start": start_seg,
"wal_segment_backup_stop": stop_seg,
"wal_segment_offset_backup_start": start_offset,
"wal_segment_offset_backup_stop": stop_offset,
})
return backups return backups
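With this change the UI stops importing `wal_e` and instead lists `*_backup_stop_sentinel.json` objects straight from S3, deriving the stop WAL segment and offsets from the reported LSNs for WAL-G sentinels (WAL-E sentinels already carry them). The same LSN arithmetic restated in Go, purely for illustration (this is not part of the UI code):

```go
// With the default 16 MiB WAL segment size, an LSN decomposes into a 32-bit
// log id (high word), a segment number within that log, and a byte offset
// within the segment; the timeline comes from the first 8 hex digits of the
// backup's starting segment name.
const walSegmentSize = 16 * 1024 * 1024

func lsnToWalSegmentStop(finishLSN uint64, startSegment string) string {
	timeline := startSegment[:8]
	logID := finishLSN >> 32
	segID := (finishLSN & 0xFFFFFFFF) / walSegmentSize
	return fmt.Sprintf("%s%08X%08X", timeline, logID, segID)
}

func lsnToOffsetHex(lsn uint64) string {
	return fmt.Sprintf("%08X", lsn%walSegmentSize)
}
```

For example, a finish LSN of 0x102C00060 with start segment `000000010000000000000001` yields stop segment `000000010000000100000002` and stop offset `00C00060`.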

View File

@ -11,5 +11,4 @@ kubernetes==11.0.0
python-json-logger==2.0.7 python-json-logger==2.0.7
requests==2.32.2 requests==2.32.2
stups-tokens>=1.1.19 stups-tokens>=1.1.19
wal_e==1.1.1
werkzeug==3.0.6 werkzeug==3.0.6