Merge branch 'master' into logical-backup-gcs-upload-metadata

Felix Kunde 2025-07-15 12:12:28 +02:00 committed by GitHub
commit 3ef87b69d3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 1149 additions and 321 deletions

View File

@ -62,8 +62,6 @@ podAnnotations:
extraEnvs:
[]
# Example of settings to make the snapshot view work in the UI when using AWS
# - name: WALE_S3_ENDPOINT
# value: https+path://s3.us-east-1.amazonaws.com:443
# - name: SPILO_S3_BACKUP_PREFIX
# value: spilo/
# - name: AWS_ACCESS_KEY_ID
@ -83,8 +81,6 @@ extraEnvs:
# key: AWS_DEFAULT_REGION
# - name: SPILO_S3_BACKUP_BUCKET
# value: <s3 bucket used by the operator>
# - name: "USE_AWS_INSTANCE_PROFILE"
# value: "true"
# configure UI service
service:


@ -384,7 +384,7 @@ exceptions:
The interval of days can be set with `password_rotation_interval` (default
`90` = 90 days, minimum 1). On each rotation the user name and password values
are replaced in the K8s secret. They belong to a newly created user named after
the original role plus rotation date in YYMMDD format. All priviliges are
the original role plus rotation date in YYMMDD format. All privileges are
inherited meaning that migration scripts should still grant and revoke rights
against the original role. The timestamp of the next rotation (in RFC 3339
format, UTC timezone) is written to the secret as well. Note, if the rotation
@ -564,7 +564,7 @@ manifest affinity.
```
If `node_readiness_label_merge` is set to `"OR"` (default) the readiness label
affinty will be appended with its own expressions block:
affinity will be appended with its own expressions block:
```yaml
affinity:
@ -620,22 +620,34 @@ By default the topology key for the pod anti affinity is set to
`kubernetes.io/hostname`, you can set another topology key e.g.
`failure-domain.beta.kubernetes.io/zone`. See [built-in node labels](https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#interlude-built-in-node-labels) for available topology keys.
## Pod Disruption Budget
## Pod Disruption Budgets
By default the operator uses a PodDisruptionBudget (PDB) to protect the cluster
from voluntarily disruptions and hence unwanted DB downtime. The `MinAvailable`
parameter of the PDB is set to `1` which prevents killing masters in single-node
clusters and/or the last remaining running instance in a multi-node cluster.
By default the operator creates two PodDisruptionBudgets (PDBs) to protect the cluster
from voluntary disruptions and hence unwanted DB downtime: a so-called primary PDB and
a PDB for critical operations.
### Primary PDB
The `MinAvailable` parameter of this PDB is set to `1` and, if `pdb_master_label_selector`
is enabled, the label selector includes the `spilo-role=master` condition, which prevents
killing masters in single-node clusters and/or the last remaining running instance in a
multi-node cluster.
### PDB for critical operations
The `MinAvailable` parameter of this PDB is equal to the `numberOfInstances` set in the
cluster manifest, while the label selector includes the `critical-operation=true` condition.
This allows protecting all pods of a cluster, provided they are labeled accordingly.
For example, the operator labels all Spilo pods with `critical-operation=true` during a
major version upgrade run. You may want to protect cluster pods during other critical
operations by assigning the label to pods yourself or through other means of automation.
The PDBs are only relaxed in two scenarios:
* If a cluster is scaled down to `0` instances (e.g. for draining nodes)
* If the PDB is disabled in the configuration (`enable_pod_disruption_budget`)
The PDB is still in place having `MinAvailable` set to `0`. If enabled it will
be automatically set to `1` on scale up. Disabling PDBs helps avoiding blocking
Kubernetes upgrades in managed K8s environments at the cost of prolonged DB
downtime. See PR [#384](https://github.com/zalando/postgres-operator/pull/384)
The PDBs are still in place having `MinAvailable` set to `0`. Disabling PDBs helps
avoid blocking Kubernetes upgrades in managed K8s environments at the cost of prolonged
DB downtime. See PR [#384](https://github.com/zalando/postgres-operator/pull/384)
for the use case.
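As a sketch, the two budgets described above could be rendered as plain manifests like the following; the object names and label keys used here (e.g. `postgres-{cluster}-critical-op-pdb`, `cluster-name`) are illustrative assumptions, not an exact dump of what the operator creates:

```python
# Hypothetical sketch of the two PDB manifests the operator maintains.
# Names and label keys are assumptions based on the defaults described above.
def build_pdbs(cluster, number_of_instances, pdb_master_label_selector=True):
    primary_selector = {"cluster-name": cluster}
    if pdb_master_label_selector:
        primary_selector["spilo-role"] = "master"
    primary = {
        "metadata": {"name": f"postgres-{cluster}-pdb"},
        "spec": {
            # relaxed to 0 when the cluster is scaled down to 0 instances
            "minAvailable": 1 if number_of_instances > 0 else 0,
            "selector": {"matchLabels": primary_selector},
        },
    }
    critical = {
        "metadata": {"name": f"postgres-{cluster}-critical-op-pdb"},
        "spec": {
            "minAvailable": number_of_instances,
            "selector": {"matchLabels": {"cluster-name": cluster,
                                         "critical-operation": "true"}},
        },
    }
    return primary, critical
```

The second budget only bites for pods that actually carry the `critical-operation=true` label, which is why labeling during major upgrades protects exactly that window.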
## Add cluster-specific labels
@ -1128,7 +1140,7 @@ metadata:
iam.gke.io/gcp-service-account: <GCP_SERVICE_ACCOUNT_NAME>@<GCP_PROJECT_ID>.iam.gserviceaccount.com
```
2. Specify the new custom service account in your [operator paramaters](./reference/operator_parameters.md)
2. Specify the new custom service account in your [operator parameters](./reference/operator_parameters.md)
If using manual deployment or kustomize, this is done by setting
`pod_service_account_name` in your configuration file specified in the


@ -230,7 +230,7 @@ kubectl delete postgresql acid-minimal-cluster
```
This should remove the associated StatefulSet, database Pods, Services and
Endpoints. The PersistentVolumes are released and the PodDisruptionBudget is
Endpoints. The PersistentVolumes are released and the PodDisruptionBudgets are
deleted. Secrets however are not deleted and backups will remain in place.
When deleting a cluster while it is still starting up or got stuck during that


@ -116,9 +116,9 @@ These parameters are grouped directly under the `spec` key in the manifest.
* **maintenanceWindows**
a list which defines specific time frames when certain maintenance operations
are allowed. So far, it is only implemented for automatic major version
upgrades. Accepted formats are "01:00-06:00" for daily maintenance windows or
"Sat:00:00-04:00" for specific days, with all times in UTC.
such as automatic major upgrades or master pod migration. Accepted formats
are "01:00-06:00" for daily maintenance windows or "Sat:00:00-04:00" for specific
days, with all times in UTC.
* **users**
a map of usernames to user flags for the users that should be created in the
@ -247,7 +247,7 @@ These parameters are grouped directly under the `spec` key in the manifest.
[kubernetes volumeSource](https://godoc.org/k8s.io/api/core/v1#VolumeSource).
It allows you to mount existing PersistentVolumeClaims, ConfigMaps and Secrets inside the StatefulSet.
Also an `emptyDir` volume can be shared between initContainer and statefulSet.
Additionaly, you can provide a `SubPath` for volume mount (a file in a configMap source volume, for example).
Additionally, you can provide a `SubPath` for volume mount (a file in a configMap source volume, for example).
Set `isSubPathExpr` to true if you want to include [API environment variables](https://kubernetes.io/docs/concepts/storage/volumes/#using-subpath-expanded-environment).
You can also specify in which container the additional Volumes will be mounted with the `targetContainers` array option.
If `targetContainers` is empty, additional volumes will be mounted only in the `postgres` container.
@ -257,7 +257,7 @@ These parameters are grouped directly under the `spec` key in the manifest.
## Prepared Databases
The operator can create databases with default owner, reader and writer roles
without the need to specifiy them under `users` or `databases` sections. Those
without the need to specify them under `users` or `databases` sections. Those
parameters are grouped under the `preparedDatabases` top-level key. For more
information, see [user docs](../user.md#prepared-databases-with-roles-and-default-privileges).
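As a small illustration of the default role scheme, a prepared database `foo` would come with roles like the following; the `_owner`/`_reader`/`_writer` suffix convention is an assumption taken from the linked user docs, not from this reference page itself:

```python
# Sketch of the default NOLOGIN roles created per prepared database.
# Suffixes are assumed from the user docs on prepared databases.
def default_roles(dbname):
    return [f"{dbname}_{suffix}" for suffix in ("owner", "reader", "writer")]
```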


@ -209,7 +209,7 @@ under the `users` key.
For all `LOGIN` roles that are not database owners the operator can rotate
credentials in the corresponding K8s secrets by replacing the username and
password. This means, new users will be added on each rotation inheriting
all priviliges from the original roles. The rotation date (in YYMMDD format)
all privileges from the original roles. The rotation date (in YYMMDD format)
is appended to the names of the new user. The timestamp of the next rotation
is written to the secret. The default is `false`.
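The naming scheme above (rotation date in YYMMDD format appended to the original role, next rotation written as an RFC 3339 timestamp) can be sketched as follows; the helper names are hypothetical:

```python
from datetime import date, timedelta

# Sketch: derive the rotated user name (original role + YYMMDD date)
# and the next rotation timestamp in RFC 3339 / UTC, per the text above.
def rotation_user(original_role, rotation_day):
    return f"{original_role}{rotation_day.strftime('%y%m%d')}"

def next_rotation(rotation_day, interval_days=90):
    nxt = rotation_day + timedelta(days=interval_days)
    return nxt.strftime("%Y-%m-%dT%H:%M:%SZ")
```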
@ -334,13 +334,13 @@ configuration they are grouped under the `kubernetes` key.
pod namespace).
* **pdb_name_format**
defines the template for PDB (Pod Disruption Budget) names created by the
defines the template for primary PDB (Pod Disruption Budget) name created by the
operator. The default is `postgres-{cluster}-pdb`, where `{cluster}` is
replaced by the cluster name. Only the `{cluster}` placeholder is allowed in
the template.
* **pdb_master_label_selector**
By default the PDB will match the master role hence preventing nodes to be
By default the primary PDB will match the master role hence preventing nodes to be
drained if the node_readiness_label is not used. If this option is set to
`false` the `spilo-role=master` selector will not be added to the PDB.
@ -552,7 +552,7 @@ configuration they are grouped under the `kubernetes` key.
pods with `InitialDelaySeconds: 6`, `PeriodSeconds: 10`, `TimeoutSeconds: 5`,
`SuccessThreshold: 1` and `FailureThreshold: 3`. When enabling readiness
probes it is recommended to switch the `pod_management_policy` to `parallel`
to avoid unneccesary waiting times in case of multiple instances failing.
to avoid unnecessary waiting times in case of multiple instances failing.
The default is `false`.
* **storage_resize_mode**
@ -701,7 +701,7 @@ In the CRD-based configuration they are grouped under the `load_balancer` key.
replaced by the cluster name, `{namespace}` is replaced with the namespace
and `{hostedzone}` is replaced with the hosted zone (the value of the
`db_hosted_zone` parameter). The `{team}` placeholder can still be used,
although it is not recommened because the team of a cluster can change.
although it is not recommended because the team of a cluster can change.
If the cluster name starts with the `teamId` it will also be part of the
DNS, anyway. No other placeholders are allowed!
@ -720,7 +720,7 @@ In the CRD-based configuration they are grouped under the `load_balancer` key.
is replaced by the cluster name, `{namespace}` is replaced with the
namespace and `{hostedzone}` is replaced with the hosted zone (the value of
the `db_hosted_zone` parameter). The `{team}` placeholder can still be used,
although it is not recommened because the team of a cluster can change.
although it is not recommended because the team of a cluster can change.
If the cluster name starts with the `teamId` it will also be part of the
DNS, anyway. No other placeholders are allowed!
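The placeholder substitution described for these DNS name templates can be sketched as below; the template string in the example is illustrative, not the operator's default:

```python
# Sketch of the {cluster}/{namespace}/{hostedzone}/{team} substitution
# described above; only these placeholders are supported.
def build_dns_name(template, cluster, namespace, hostedzone, team=""):
    return (template.replace("{cluster}", cluster)
                    .replace("{namespace}", namespace)
                    .replace("{hostedzone}", hostedzone)
                    .replace("{team}", team))
```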


@ -900,7 +900,7 @@ the PostgreSQL version between source and target cluster has to be the same.
To start a cluster as standby, add the following `standby` section in the YAML
file. You can stream changes from archived WAL files (AWS S3 or Google Cloud
Storage) or from a remote primary. Only one option can be specfied in the
Storage) or from a remote primary. Only one option can be specified in the
manifest:
```yaml
@ -911,7 +911,7 @@ spec:
For GCS, you have to define STANDBY_GOOGLE_APPLICATION_CREDENTIALS as a
[custom pod environment variable](administrator.md#custom-pod-environment-variables).
It is not set from the config to allow for overridding.
It is not set from the config to allow for overriding.
```yaml
spec:
@ -1282,7 +1282,7 @@ minutes if the certificates have changed and reloads postgres accordingly.
### TLS certificates for connection pooler
By default, the pgBouncer image generates its own TLS certificate like Spilo.
When the `tls` section is specfied in the manifest it will be used for the
When the `tls` section is specified in the manifest it will be used for the
connection pooler pod(s) as well. The security context options are hard coded
to `runAsUser: 100` and `runAsGroup: 101`. The `fsGroup` will be the same
like for Spilo.


@ -1187,7 +1187,7 @@ class EndToEndTestCase(unittest.TestCase):
Test major version upgrade: with full upgrade, maintenance window, and annotation
"""
def check_version():
p = k8s.patroni_rest("acid-upgrade-test-0", "")
p = k8s.patroni_rest("acid-upgrade-test-0", "") or {}
version = p.get("server_version", 0) // 10000
return version
@ -1237,7 +1237,7 @@ class EndToEndTestCase(unittest.TestCase):
# should not upgrade because current time is not in maintenanceWindow
current_time = datetime.now()
maintenance_window_future = f"{(current_time+timedelta(minutes=60)).strftime('%H:%M')}-{(current_time+timedelta(minutes=120)).strftime('%H:%M')}"
pg_patch_version_15 = {
pg_patch_version_15_outside_mw = {
"spec": {
"postgresql": {
"version": "15"
@ -1248,10 +1248,10 @@ class EndToEndTestCase(unittest.TestCase):
}
}
k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15)
"acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15_outside_mw)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
k8s.wait_for_pod_failover(master_nodes, 'spilo-role=master,' + cluster_label)
# no pod replacement outside of the maintenance window
k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
self.eventuallyEqual(check_version, 14, "Version should not be upgraded")
@ -1259,12 +1259,12 @@ class EndToEndTestCase(unittest.TestCase):
second_annotations = get_annotations()
self.assertIsNone(second_annotations.get("last-major-upgrade-failure"), "Annotation for last upgrade's failure should not be set")
# change the version again to trigger operator sync
# change maintenanceWindows to current
maintenance_window_current = f"{(current_time-timedelta(minutes=30)).strftime('%H:%M')}-{(current_time+timedelta(minutes=30)).strftime('%H:%M')}"
pg_patch_version_16 = {
pg_patch_version_15_in_mw = {
"spec": {
"postgresql": {
"version": "16"
"version": "15"
},
"maintenanceWindows": [
maintenance_window_current
@ -1273,13 +1273,13 @@ class EndToEndTestCase(unittest.TestCase):
}
k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_16)
"acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15_in_mw)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
k8s.wait_for_pod_failover(master_nodes, 'spilo-role=replica,' + cluster_label)
k8s.wait_for_pod_failover(master_nodes, 'spilo-role=master,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
self.eventuallyEqual(check_version, 16, "Version should be upgraded from 14 to 16")
self.eventuallyEqual(check_version, 15, "Version should be upgraded from 14 to 15")
# check if annotation for last upgrade's success is updated after second upgrade
third_annotations = get_annotations()
@ -1303,20 +1303,20 @@ class EndToEndTestCase(unittest.TestCase):
"acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_17)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
k8s.wait_for_pod_failover(master_nodes, 'spilo-role=master,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
self.eventuallyEqual(check_version, 16, "Version should not be upgraded because annotation for last upgrade's failure is set")
# change the version back to 15 and should remove failure annotation
k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
k8s.wait_for_pod_failover(master_nodes, 'spilo-role=replica,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
self.eventuallyEqual(check_version, 15, "Version should not be upgraded because annotation for last upgrade's failure is set")
# change the version back to 15 and should remove failure annotation
k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15_in_mw)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
self.eventuallyEqual(check_version, 15, "Version should not be upgraded from 15")
fourth_annotations = get_annotations()
self.assertIsNone(fourth_annotations.get("last-major-upgrade-failure"), "Annotation for last upgrade's failure is not removed")
@ -1752,9 +1752,13 @@ class EndToEndTestCase(unittest.TestCase):
Test password rotation and removal of users due to retention policy
'''
k8s = self.k8s
cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
leader = k8s.get_cluster_leader_pod()
today = date.today()
# remember number of secrets to make sure it stays the same
secret_count = k8s.count_secrets_with_label(cluster_label)
# enable password rotation for owner of foo database
pg_patch_rotation_single_users = {
"spec": {
@ -1810,6 +1814,7 @@ class EndToEndTestCase(unittest.TestCase):
enable_password_rotation = {
"data": {
"enable_password_rotation": "true",
"inherited_annotations": "environment",
"password_rotation_interval": "30",
"password_rotation_user_retention": "30", # should be set to 60
},
@ -1856,13 +1861,29 @@ class EndToEndTestCase(unittest.TestCase):
self.eventuallyEqual(lambda: len(self.query_database_with_user(leader.metadata.name, "postgres", "SELECT 1", "foo_user")), 1,
"Could not connect to the database with rotation user {}".format(rotation_user), 10, 5)
# add annotation which triggers syncSecrets call
pg_annotation_patch = {
"metadata": {
"annotations": {
"environment": "test",
}
}
}
k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_annotation_patch)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
time.sleep(10)
self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), secret_count, "Unexpected number of secrets")
# check if rotation has been ignored for user from test_cross_namespace_secrets test
db_user_secret = k8s.get_secret(username="test.db_user", namespace="test")
secret_username = str(base64.b64decode(db_user_secret.data["username"]), 'utf-8')
self.assertEqual("test.db_user", secret_username,
"Unexpected username in secret of test.db_user: expected {}, got {}".format("test.db_user", secret_username))
# check if annotation for secret has been updated
self.assertTrue("environment" in db_user_secret.metadata.annotations, "Added annotation was not propagated to secret")
# disable password rotation for all other users (foo_user)
# and pick smaller intervals to see if the third fake rotation user is dropped
enable_password_rotation = {
@ -2100,7 +2121,7 @@ class EndToEndTestCase(unittest.TestCase):
patch_sset_propagate_annotations = {
"data": {
"downscaler_annotations": "deployment-time,downscaler/*",
"inherited_annotations": "owned-by",
"inherited_annotations": "environment,owned-by",
}
}
k8s.update_config(patch_sset_propagate_annotations)
@ -2547,7 +2568,10 @@ class EndToEndTestCase(unittest.TestCase):
self.assertTrue(self.has_postgresql_owner_reference(config_ep.metadata.owner_references, inverse), "config endpoint owner reference check failed")
pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-pdb".format(cluster_name), cluster_namespace)
self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod disruption owner reference check failed")
self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "primary pod disruption budget owner reference check failed")
pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-critical-op-pdb".format(cluster_name), cluster_namespace)
self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod disruption budget for critical operations owner reference check failed")
pg_secret = k8s.api.core_v1.read_namespaced_secret("postgres.{}.credentials.postgresql.acid.zalan.do".format(cluster_name), cluster_namespace)
self.assertTrue(self.has_postgresql_owner_reference(pg_secret.metadata.owner_references, inverse), "postgres secret owner reference check failed")


@ -59,16 +59,17 @@ type Config struct {
}
type kubeResources struct {
Services map[PostgresRole]*v1.Service
Endpoints map[PostgresRole]*v1.Endpoints
PatroniEndpoints map[string]*v1.Endpoints
PatroniConfigMaps map[string]*v1.ConfigMap
Secrets map[types.UID]*v1.Secret
Statefulset *appsv1.StatefulSet
VolumeClaims map[types.UID]*v1.PersistentVolumeClaim
PodDisruptionBudget *policyv1.PodDisruptionBudget
LogicalBackupJob *batchv1.CronJob
Streams map[string]*zalandov1.FabricEventStream
Services map[PostgresRole]*v1.Service
Endpoints map[PostgresRole]*v1.Endpoints
PatroniEndpoints map[string]*v1.Endpoints
PatroniConfigMaps map[string]*v1.ConfigMap
Secrets map[types.UID]*v1.Secret
Statefulset *appsv1.StatefulSet
VolumeClaims map[types.UID]*v1.PersistentVolumeClaim
PrimaryPodDisruptionBudget *policyv1.PodDisruptionBudget
CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget
LogicalBackupJob *batchv1.CronJob
Streams map[string]*zalandov1.FabricEventStream
//Pods are treated separately
}
@ -105,10 +106,17 @@ type Cluster struct {
}
type compareStatefulsetResult struct {
match bool
replace bool
rollingUpdate bool
reasons []string
match bool
replace bool
rollingUpdate bool
reasons []string
deletedPodAnnotations []string
}
type compareLogicalBackupJobResult struct {
match bool
reasons []string
deletedPodAnnotations []string
}
// New creates a new cluster. This function should be called from a controller.
@ -336,14 +344,10 @@ func (c *Cluster) Create() (err error) {
c.logger.Infof("secrets have been successfully created")
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Secrets", "The secrets have been successfully created")
if c.PodDisruptionBudget != nil {
return fmt.Errorf("pod disruption budget already exists in the cluster")
if err = c.createPodDisruptionBudgets(); err != nil {
return fmt.Errorf("could not create pod disruption budgets: %v", err)
}
pdb, err := c.createPodDisruptionBudget()
if err != nil {
return fmt.Errorf("could not create pod disruption budget: %v", err)
}
c.logger.Infof("pod disruption budget %q has been successfully created", util.NameFromMeta(pdb.ObjectMeta))
c.logger.Info("pod disruption budgets have been successfully created")
if c.Statefulset != nil {
return fmt.Errorf("statefulset already exists in the cluster")
@ -431,6 +435,7 @@ func (c *Cluster) Create() (err error) {
}
func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compareStatefulsetResult {
deletedPodAnnotations := []string{}
reasons := make([]string, 0)
var match, needsRollUpdate, needsReplace bool
@ -445,7 +450,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
needsReplace = true
reasons = append(reasons, "new statefulset's ownerReferences do not match")
}
if changed, reason := c.compareAnnotations(c.Statefulset.Annotations, statefulSet.Annotations); changed {
if changed, reason := c.compareAnnotations(c.Statefulset.Annotations, statefulSet.Annotations, nil); changed {
match = false
needsReplace = true
reasons = append(reasons, "new statefulset's annotations do not match: "+reason)
@ -519,7 +524,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
}
}
if changed, reason := c.compareAnnotations(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations); changed {
if changed, reason := c.compareAnnotations(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations, &deletedPodAnnotations); changed {
match = false
needsReplace = true
reasons = append(reasons, "new statefulset's pod template metadata annotations does not match "+reason)
@ -541,7 +546,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
reasons = append(reasons, fmt.Sprintf("new statefulset's name for volume %d does not match the current one", i))
continue
}
if changed, reason := c.compareAnnotations(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations); changed {
if changed, reason := c.compareAnnotations(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations, nil); changed {
needsReplace = true
reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q do not match the current ones: %s", name, reason))
}
@ -579,7 +584,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
match = false
}
return &compareStatefulsetResult{match: match, reasons: reasons, rollingUpdate: needsRollUpdate, replace: needsReplace}
return &compareStatefulsetResult{match: match, reasons: reasons, rollingUpdate: needsRollUpdate, replace: needsReplace, deletedPodAnnotations: deletedPodAnnotations}
}
type containerCondition func(a, b v1.Container) bool
@ -781,7 +786,7 @@ func volumeMountExists(mount v1.VolumeMount, mounts []v1.VolumeMount) bool {
return false
}
func (c *Cluster) compareAnnotations(old, new map[string]string) (bool, string) {
func (c *Cluster) compareAnnotations(old, new map[string]string, removedList *[]string) (bool, string) {
reason := ""
ignoredAnnotations := make(map[string]bool)
for _, ignore := range c.OpConfig.IgnoredAnnotations {
@ -794,6 +799,9 @@ func (c *Cluster) compareAnnotations(old, new map[string]string) (bool, string)
}
if _, ok := new[key]; !ok {
reason += fmt.Sprintf(" Removed %q.", key)
if removedList != nil {
*removedList = append(*removedList, key)
}
}
}
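The extended `compareAnnotations` above now optionally collects the removed keys so callers can clean them up later. The same logic can be mirrored as a sketch (Python for brevity; names are illustrative):

```python
# Sketch of the annotation diff: report removed/changed/added keys and,
# when a list is supplied, collect removed keys for later cleanup.
def compare_annotations(old, new, ignored=(), removed_list=None):
    reason = ""
    for key, value in old.items():
        if key in ignored:
            continue
        if key not in new:
            reason += f" Removed {key!r}."
            if removed_list is not None:
                removed_list.append(key)
        elif new[key] != value:
            reason += f" {key!r} changed from {value!r} to {new[key]!r}."
    for key in new:
        if key not in ignored and key not in old:
            reason += f" Added {key!r}."
    return bool(reason), reason
```

Passing `None` for the list keeps the old behavior, which is why most call sites in the diff simply gained a trailing `nil` argument.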
@ -836,41 +844,46 @@ func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) {
return true, ""
}
func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool, reason string) {
func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) *compareLogicalBackupJobResult {
deletedPodAnnotations := []string{}
reasons := make([]string, 0)
match := true
if cur.Spec.Schedule != new.Spec.Schedule {
return false, fmt.Sprintf("new job's schedule %q does not match the current one %q",
new.Spec.Schedule, cur.Spec.Schedule)
match = false
reasons = append(reasons, fmt.Sprintf("new job's schedule %q does not match the current one %q", new.Spec.Schedule, cur.Spec.Schedule))
}
newImage := new.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image
curImage := cur.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image
if newImage != curImage {
return false, fmt.Sprintf("new job's image %q does not match the current one %q",
newImage, curImage)
match = false
reasons = append(reasons, fmt.Sprintf("new job's image %q does not match the current one %q", newImage, curImage))
}
newPodAnnotation := new.Spec.JobTemplate.Spec.Template.Annotations
curPodAnnotation := cur.Spec.JobTemplate.Spec.Template.Annotations
if changed, reason := c.compareAnnotations(curPodAnnotation, newPodAnnotation); changed {
return false, fmt.Sprintf("new job's pod template metadata annotations does not match " + reason)
if changed, reason := c.compareAnnotations(curPodAnnotation, newPodAnnotation, &deletedPodAnnotations); changed {
match = false
reasons = append(reasons, fmt.Sprint("new job's pod template metadata annotations do not match "+reason))
}
newPgVersion := getPgVersion(new)
curPgVersion := getPgVersion(cur)
if newPgVersion != curPgVersion {
return false, fmt.Sprintf("new job's env PG_VERSION %q does not match the current one %q",
newPgVersion, curPgVersion)
match = false
reasons = append(reasons, fmt.Sprintf("new job's env PG_VERSION %q does not match the current one %q", newPgVersion, curPgVersion))
}
needsReplace := false
reasons := make([]string, 0)
needsReplace, reasons = c.compareContainers("cronjob container", cur.Spec.JobTemplate.Spec.Template.Spec.Containers, new.Spec.JobTemplate.Spec.Template.Spec.Containers, needsReplace, reasons)
contReasons := make([]string, 0)
needsReplace, contReasons = c.compareContainers("cronjob container", cur.Spec.JobTemplate.Spec.Template.Spec.Containers, new.Spec.JobTemplate.Spec.Template.Spec.Containers, needsReplace, contReasons)
if needsReplace {
return false, fmt.Sprintf("logical backup container specs do not match: %v", strings.Join(reasons, `', '`))
match = false
reasons = append(reasons, fmt.Sprintf("logical backup container specs do not match: %v", strings.Join(contReasons, `', '`)))
}
return true, ""
return &compareLogicalBackupJobResult{match: match, reasons: reasons, deletedPodAnnotations: deletedPodAnnotations}
}
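The refactor above replaces early returns with an aggregated result struct, so a single sync log line can report every mismatch at once instead of only the first. A minimal sketch of that pattern (field names are illustrative):

```python
# Sketch: accumulate all mismatch reasons rather than returning on the
# first difference, mirroring compareLogicalBackupJobResult above.
def compare_backup_jobs(cur, new):
    reasons = []
    if cur["schedule"] != new["schedule"]:
        reasons.append(
            f"new job's schedule {new['schedule']!r} does not match {cur['schedule']!r}")
    if cur["image"] != new["image"]:
        reasons.append(
            f"new job's image {new['image']!r} does not match {cur['image']!r}")
    return {"match": not reasons, "reasons": reasons}
```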
func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBudget) (bool, string) {
@ -881,7 +894,7 @@ func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBud
if !reflect.DeepEqual(new.ObjectMeta.OwnerReferences, cur.ObjectMeta.OwnerReferences) {
return false, "new PDB's owner references do not match the current ones"
}
if changed, reason := c.compareAnnotations(cur.Annotations, new.Annotations); changed {
if changed, reason := c.compareAnnotations(cur.Annotations, new.Annotations, nil); changed {
return false, "new PDB's annotations do not match the current ones:" + reason
}
return true, ""
@ -957,6 +970,11 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
defer c.mu.Unlock()
c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdating)
if !isInMaintenanceWindow(newSpec.Spec.MaintenanceWindows) {
// do not apply any major version related changes yet
newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
}
c.setSpec(newSpec)
defer func() {
@ -1016,10 +1034,18 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
// only when streams were not specified in oldSpec but in newSpec
needStreamUser := len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0
annotationsChanged, _ := c.compareAnnotations(oldSpec.Annotations, newSpec.Annotations)
initUsers := !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser
if initUsers {
// if inherited annotations differ secrets have to be synced on update
newAnnotations := c.annotationsSet(nil)
oldAnnotations := make(map[string]string)
for _, secret := range c.Secrets {
oldAnnotations = secret.ObjectMeta.Annotations
break
}
annotationsChanged, _ := c.compareAnnotations(oldAnnotations, newAnnotations, nil)
if initUsers || annotationsChanged {
c.logger.Debug("initialize users")
if err := c.initUsers(); err != nil {
c.logger.Errorf("could not init users - skipping sync of secrets and databases: %v", err)
@ -1027,8 +1053,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
updateFailed = true
return
}
}
if initUsers || annotationsChanged {
c.logger.Debug("syncing secrets")
//TODO: mind the secrets of the deleted/new users
if err := c.syncSecrets(); err != nil {
@ -1060,9 +1085,9 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
}
}
// pod disruption budget
if err := c.syncPodDisruptionBudget(true); err != nil {
c.logger.Errorf("could not sync pod disruption budget: %v", err)
// pod disruption budgets
if err := c.syncPodDisruptionBudgets(true); err != nil {
c.logger.Errorf("could not sync pod disruption budgets: %v", err)
updateFailed = true
}
@ -1135,6 +1160,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
// streams
if len(newSpec.Spec.Streams) > 0 || len(oldSpec.Spec.Streams) != len(newSpec.Spec.Streams) {
c.logger.Debug("syncing streams")
if err := c.syncStreams(); err != nil {
c.logger.Errorf("could not sync streams: %v", err)
updateFailed = true
@ -1207,10 +1233,10 @@ func (c *Cluster) Delete() error {
c.logger.Info("not deleting secrets because disabled in configuration")
}
if err := c.deletePodDisruptionBudget(); err != nil {
if err := c.deletePodDisruptionBudgets(); err != nil {
anyErrors = true
c.logger.Warningf("could not delete pod disruption budget: %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete pod disruption budget: %v", err)
c.logger.Warningf("could not delete pod disruption budgets: %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete pod disruption budgets: %v", err)
}
for _, role := range []PostgresRole{Master, Replica} {
@ -1709,16 +1735,17 @@ func (c *Cluster) GetCurrentProcess() Process {
// GetStatus provides status of the cluster
func (c *Cluster) GetStatus() *ClusterStatus {
status := &ClusterStatus{
Cluster: c.Name,
Namespace: c.Namespace,
Team: c.Spec.TeamID,
Status: c.Status,
Spec: c.Spec,
MasterService: c.GetServiceMaster(),
ReplicaService: c.GetServiceReplica(),
StatefulSet: c.GetStatefulSet(),
PodDisruptionBudget: c.GetPodDisruptionBudget(),
CurrentProcess: c.GetCurrentProcess(),
Cluster: c.Name,
Namespace: c.Namespace,
Team: c.Spec.TeamID,
Status: c.Status,
Spec: c.Spec,
MasterService: c.GetServiceMaster(),
ReplicaService: c.GetServiceReplica(),
StatefulSet: c.GetStatefulSet(),
PrimaryPodDisruptionBudget: c.GetPrimaryPodDisruptionBudget(),
CriticalOpPodDisruptionBudget: c.GetCriticalOpPodDisruptionBudget(),
CurrentProcess: c.GetCurrentProcess(),
Error: fmt.Errorf("error: %s", c.Error),
}
@ -1731,18 +1758,58 @@ func (c *Cluster) GetStatus() *ClusterStatus {
return status
}
// Switchover does a switchover (via Patroni) to a candidate pod
func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) error {
func (c *Cluster) GetSwitchoverSchedule() string {
var possibleSwitchover, schedule time.Time
now := time.Now().UTC()
for _, window := range c.Spec.MaintenanceWindows {
// in the best case it is possible today
possibleSwitchover = time.Date(now.Year(), now.Month(), now.Day(), window.StartTime.Hour(), window.StartTime.Minute(), 0, 0, time.UTC)
if window.Everyday {
if now.After(possibleSwitchover) {
// we are already past the time for today, try tomorrow
possibleSwitchover = possibleSwitchover.AddDate(0, 0, 1)
}
} else {
if now.Weekday() != window.Weekday {
// get closest possible time for this window
possibleSwitchover = possibleSwitchover.AddDate(0, 0, int((7+window.Weekday-now.Weekday())%7))
} else if now.After(possibleSwitchover) {
// we are already past the time for today, try next week
possibleSwitchover = possibleSwitchover.AddDate(0, 0, 7)
}
}
if (schedule == time.Time{}) || possibleSwitchover.Before(schedule) {
schedule = possibleSwitchover
}
}
return schedule.Format("2006-01-02T15:04+00")
}
// Switchover does a switchover (via Patroni) to a candidate pod
func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName, scheduled bool) error {
var err error
c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate)
stopCh := make(chan struct{})
ch := c.registerPodSubscriber(candidate)
defer c.unregisterPodSubscriber(candidate)
defer close(stopCh)
if err = c.patroni.Switchover(curMaster, candidate.Name); err == nil {
var scheduledAt string
if scheduled {
scheduledAt = c.GetSwitchoverSchedule()
} else {
c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate)
scheduledAt = ""
}
if err = c.patroni.Switchover(curMaster, candidate.Name, scheduledAt); err == nil {
if scheduled {
c.logger.Infof("switchover from %q to %q is scheduled at %s", curMaster.Name, candidate, scheduledAt)
return nil
}
c.logger.Debugf("successfully switched over from %q to %q", curMaster.Name, candidate)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Successfully switched over from %q to %q", curMaster.Name, candidate)
_, err = c.waitForPodLabel(ch, stopCh, nil)
@ -1750,6 +1817,9 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
err = fmt.Errorf("could not get master pod label: %v", err)
}
} else {
if scheduled {
return fmt.Errorf("could not schedule switchover: %v", err)
}
err = fmt.Errorf("could not switch over from %q to %q: %v", curMaster.Name, candidate, err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switchover from %q to %q FAILED: %v", curMaster.Name, candidate, err)
}

View File

@ -1680,12 +1680,20 @@ func TestCompareLogicalBackupJob(t *testing.T) {
}
}
match, reason := cluster.compareLogicalBackupJob(currentCronJob, desiredCronJob)
if match != tt.match {
t.Errorf("%s - unexpected match result %t when comparing cronjobs %#v and %#v", t.Name(), match, currentCronJob, desiredCronJob)
} else {
if !strings.HasPrefix(reason, tt.reason) {
t.Errorf("%s - expected reason prefix %s, found %s", t.Name(), tt.reason, reason)
cmp := cluster.compareLogicalBackupJob(currentCronJob, desiredCronJob)
if cmp.match != tt.match {
t.Errorf("%s - unexpected match result %t when comparing cronjobs %#v and %#v", t.Name(), cmp.match, currentCronJob, desiredCronJob)
} else if !cmp.match {
found := false
for _, reason := range cmp.reasons {
if strings.HasPrefix(reason, tt.reason) {
found = true
break
}
}
if !found {
t.Errorf("%s - expected reason prefix %s, not found in %#v", t.Name(), tt.reason, cmp.reasons)
}
}
})
@ -2057,3 +2065,91 @@ func TestCompareVolumeMounts(t *testing.T) {
})
}
}
func TestGetSwitchoverSchedule(t *testing.T) {
now := time.Now()
futureTimeStart := now.Add(1 * time.Hour)
futureWindowTimeStart := futureTimeStart.Format("15:04")
futureWindowTimeEnd := now.Add(2 * time.Hour).Format("15:04")
pastTimeStart := now.Add(-2 * time.Hour)
pastWindowTimeStart := pastTimeStart.Format("15:04")
pastWindowTimeEnd := now.Add(-1 * time.Hour).Format("15:04")
tests := []struct {
name string
windows []acidv1.MaintenanceWindow
expected string
}{
{
name: "everyday maintenance window is later today",
windows: []acidv1.MaintenanceWindow{
{
Everyday: true,
StartTime: mustParseTime(futureWindowTimeStart),
EndTime: mustParseTime(futureWindowTimeEnd),
},
},
expected: futureTimeStart.Format("2006-01-02T15:04+00"),
},
{
name: "everyday maintenance window is tomorrow",
windows: []acidv1.MaintenanceWindow{
{
Everyday: true,
StartTime: mustParseTime(pastWindowTimeStart),
EndTime: mustParseTime(pastWindowTimeEnd),
},
},
expected: pastTimeStart.AddDate(0, 0, 1).Format("2006-01-02T15:04+00"),
},
{
name: "weekday maintenance window is later today",
windows: []acidv1.MaintenanceWindow{
{
Weekday: now.Weekday(),
StartTime: mustParseTime(futureWindowTimeStart),
EndTime: mustParseTime(futureWindowTimeEnd),
},
},
expected: futureTimeStart.Format("2006-01-02T15:04+00"),
},
{
name: "weekday maintenance window has passed for today",
windows: []acidv1.MaintenanceWindow{
{
Weekday: now.Weekday(),
StartTime: mustParseTime(pastWindowTimeStart),
EndTime: mustParseTime(pastWindowTimeEnd),
},
},
expected: pastTimeStart.AddDate(0, 0, 7).Format("2006-01-02T15:04+00"),
},
{
name: "choose the earliest window",
windows: []acidv1.MaintenanceWindow{
{
Weekday: now.AddDate(0, 0, 2).Weekday(),
StartTime: mustParseTime(futureWindowTimeStart),
EndTime: mustParseTime(futureWindowTimeEnd),
},
{
Everyday: true,
StartTime: mustParseTime(pastWindowTimeStart),
EndTime: mustParseTime(pastWindowTimeEnd),
},
},
expected: pastTimeStart.AddDate(0, 0, 1).Format("2006-01-02T15:04+00"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cluster.Spec.MaintenanceWindows = tt.windows
schedule := cluster.GetSwitchoverSchedule()
if schedule != tt.expected {
t.Errorf("Expected GetSwitchoverSchedule to return %s, returned: %s", tt.expected, schedule)
}
})
}
}

View File

@ -2,6 +2,7 @@ package cluster
import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
@ -977,6 +978,7 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
err error
)
updatedPodAnnotations := map[string]*string{}
syncReason := make([]string, 0)
deployment, err = c.KubeClient.
Deployments(c.Namespace).
@ -1038,9 +1040,27 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
}
newPodAnnotations := c.annotationsSet(c.generatePodAnnotations(&c.Spec))
if changed, reason := c.compareAnnotations(deployment.Spec.Template.Annotations, newPodAnnotations); changed {
deletedPodAnnotations := []string{}
if changed, reason := c.compareAnnotations(deployment.Spec.Template.Annotations, newPodAnnotations, &deletedPodAnnotations); changed {
specSync = true
syncReason = append(syncReason, "new connection pooler's pod template annotations do not match the current ones: "+reason)
for _, anno := range deletedPodAnnotations {
updatedPodAnnotations[anno] = nil
}
templateMetadataReq := map[string]map[string]map[string]map[string]map[string]*string{
"spec": {"template": {"metadata": {"annotations": updatedPodAnnotations}}}}
patch, err := json.Marshal(templateMetadataReq)
if err != nil {
return nil, fmt.Errorf("could not marshal ObjectMeta for %s connection pooler's pod template: %v", role, err)
}
deployment, err = c.KubeClient.Deployments(c.Namespace).Patch(context.TODO(),
deployment.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "")
if err != nil {
c.logger.Errorf("failed to patch %s connection pooler's pod template: %v", role, err)
return nil, err
}
deployment.Spec.Template.Annotations = newPodAnnotations
}
@ -1064,7 +1084,7 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
}
newAnnotations := c.AnnotationsToPropagate(c.annotationsSet(nil)) // including the downscaling annotations
if changed, _ := c.compareAnnotations(deployment.Annotations, newAnnotations); changed {
if changed, _ := c.compareAnnotations(deployment.Annotations, newAnnotations, nil); changed {
deployment, err = patchConnectionPoolerAnnotations(c.KubeClient, deployment, newAnnotations)
if err != nil {
return nil, err
@ -1098,14 +1118,20 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
if err != nil {
return nil, fmt.Errorf("could not delete pooler pod: %v", err)
}
} else if changed, _ := c.compareAnnotations(pod.Annotations, deployment.Spec.Template.Annotations); changed {
patchData, err := metaAnnotationsPatch(deployment.Spec.Template.Annotations)
if err != nil {
return nil, fmt.Errorf("could not form patch for pooler's pod annotations: %v", err)
} else if changed, _ := c.compareAnnotations(pod.Annotations, deployment.Spec.Template.Annotations, nil); changed {
metadataReq := map[string]map[string]map[string]*string{"metadata": {}}
for anno, val := range deployment.Spec.Template.Annotations {
val := val // explicit copy keeps one pointer per map entry on Go versions before 1.22
updatedPodAnnotations[anno] = &val
}
_, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
metadataReq["metadata"]["annotations"] = updatedPodAnnotations
patch, err := json.Marshal(metadataReq)
if err != nil {
return nil, fmt.Errorf("could not patch annotations for pooler's pod %q: %v", pod.Name, err)
return nil, fmt.Errorf("could not marshal ObjectMeta for %s connection pooler's pods: %v", role, err)
}
_, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
if err != nil {
return nil, fmt.Errorf("could not patch annotations for %s connection pooler's pod %q: %v", role, pod.Name, err)
}
}
}
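The five-level nested map used for the pooler deployment's pod-template patch marshals into a strategic merge patch that touches only `spec.template.metadata.annotations`. The sketch below (`buildPodTemplatePatch` is an illustrative name, not operator code) shows the resulting JSON, where a nil `*string` becomes `null` and removes that annotation:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// buildPodTemplatePatch builds the nested-map shape used to patch only
// spec.template.metadata.annotations on a Deployment. A nil *string marshals
// to JSON null, which a strategic merge patch treats as "remove this key";
// a non-nil pointer sets the annotation.
func buildPodTemplatePatch(annotations map[string]*string) ([]byte, error) {
	req := map[string]map[string]map[string]map[string]map[string]*string{
		"spec": {"template": {"metadata": {"annotations": annotations}}},
	}
	return json.Marshal(req)
}

func main() {
	keep := "v1"
	patch, err := buildPodTemplatePatch(map[string]*string{"keep-me": &keep, "delete-me": nil})
	if err != nil {
		panic(err)
	}
	// json.Marshal sorts map keys alphabetically.
	fmt.Println(string(patch))
	// {"spec":{"template":{"metadata":{"annotations":{"delete-me":null,"keep-me":"v1"}}}}}
}
```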

View File

@ -109,10 +109,15 @@ func (c *Cluster) servicePort(role PostgresRole) int32 {
return pgPort
}
func (c *Cluster) podDisruptionBudgetName() string {
func (c *Cluster) PrimaryPodDisruptionBudgetName() string {
return c.OpConfig.PDBNameFormat.Format("cluster", c.Name)
}
func (c *Cluster) criticalOpPodDisruptionBudgetName() string {
pdbTemplate := config.StringTemplate("postgres-{cluster}-critical-op-pdb")
return pdbTemplate.Format("cluster", c.Name)
}
func makeDefaultResources(config *config.Config) acidv1.Resources {
defaultRequests := acidv1.ResourceDescription{
@ -1005,6 +1010,9 @@ func (c *Cluster) generateSpiloPodEnvVars(
if c.patroniUsesKubernetes() {
envVars = append(envVars, v1.EnvVar{Name: "DCS_ENABLE_KUBERNETES_API", Value: "true"})
if c.OpConfig.EnablePodDisruptionBudget != nil && *c.OpConfig.EnablePodDisruptionBudget {
envVars = append(envVars, v1.EnvVar{Name: "KUBERNETES_BOOTSTRAP_LABELS", Value: "{\"critical-operation\":\"true\"}"})
}
} else {
envVars = append(envVars, v1.EnvVar{Name: "ETCD_HOST", Value: c.OpConfig.EtcdHost})
}
@ -2207,7 +2215,7 @@ func (c *Cluster) generateStandbyEnvironment(description *acidv1.StandbyDescript
return result
}
func (c *Cluster) generatePodDisruptionBudget() *policyv1.PodDisruptionBudget {
func (c *Cluster) generatePrimaryPodDisruptionBudget() *policyv1.PodDisruptionBudget {
minAvailable := intstr.FromInt(1)
pdbEnabled := c.OpConfig.EnablePodDisruptionBudget
pdbMasterLabelSelector := c.OpConfig.PDBMasterLabelSelector
@ -2225,7 +2233,36 @@ func (c *Cluster) generatePodDisruptionBudget() *policyv1.PodDisruptionBudget {
return &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: c.podDisruptionBudgetName(),
Name: c.PrimaryPodDisruptionBudgetName(),
Namespace: c.Namespace,
Labels: c.labelsSet(true),
Annotations: c.annotationsSet(nil),
OwnerReferences: c.ownerReferences(),
},
Spec: policyv1.PodDisruptionBudgetSpec{
MinAvailable: &minAvailable,
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
},
}
}
func (c *Cluster) generateCriticalOpPodDisruptionBudget() *policyv1.PodDisruptionBudget {
minAvailable := intstr.FromInt32(c.Spec.NumberOfInstances)
pdbEnabled := c.OpConfig.EnablePodDisruptionBudget
// if PodDisruptionBudget is disabled or if there are no DB pods, set the budget to 0.
if (pdbEnabled != nil && !(*pdbEnabled)) || c.Spec.NumberOfInstances <= 0 {
minAvailable = intstr.FromInt(0)
}
labels := c.labelsSet(false)
labels["critical-operation"] = "true"
return &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: c.criticalOpPodDisruptionBudgetName(),
Namespace: c.Namespace,
Labels: c.labelsSet(true),
Annotations: c.annotationsSet(nil),

View File

@ -2349,22 +2349,34 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
}
}
testLabelsAndSelectors := func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error {
masterLabelSelectorDisabled := cluster.OpConfig.PDBMasterLabelSelector != nil && !*cluster.OpConfig.PDBMasterLabelSelector
if podDisruptionBudget.ObjectMeta.Namespace != "myapp" {
return fmt.Errorf("Object Namespace incorrect.")
}
if !reflect.DeepEqual(podDisruptionBudget.Labels, map[string]string{"team": "myapp", "cluster-name": "myapp-database"}) {
return fmt.Errorf("Labels incorrect.")
}
if !masterLabelSelectorDisabled &&
!reflect.DeepEqual(podDisruptionBudget.Spec.Selector, &metav1.LabelSelector{
MatchLabels: map[string]string{"spilo-role": "master", "cluster-name": "myapp-database"}}) {
testLabelsAndSelectors := func(isPrimary bool) func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error {
return func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error {
masterLabelSelectorDisabled := cluster.OpConfig.PDBMasterLabelSelector != nil && !*cluster.OpConfig.PDBMasterLabelSelector
if podDisruptionBudget.ObjectMeta.Namespace != "myapp" {
return fmt.Errorf("Object Namespace incorrect.")
}
expectedLabels := map[string]string{"team": "myapp", "cluster-name": "myapp-database"}
if !reflect.DeepEqual(podDisruptionBudget.Labels, expectedLabels) {
return fmt.Errorf("Labels incorrect, got %#v, expected %#v", podDisruptionBudget.Labels, expectedLabels)
}
if !masterLabelSelectorDisabled {
if isPrimary {
expectedLabels := &metav1.LabelSelector{
MatchLabels: map[string]string{"spilo-role": "master", "cluster-name": "myapp-database"}}
if !reflect.DeepEqual(podDisruptionBudget.Spec.Selector, expectedLabels) {
return fmt.Errorf("MatchLabels incorrect, got %#v, expected %#v", podDisruptionBudget.Spec.Selector, expectedLabels)
}
} else {
expectedLabels := &metav1.LabelSelector{
MatchLabels: map[string]string{"cluster-name": "myapp-database", "critical-operation": "true"}}
if !reflect.DeepEqual(podDisruptionBudget.Spec.Selector, expectedLabels) {
return fmt.Errorf("MatchLabels incorrect, got %#v, expected %#v", podDisruptionBudget.Spec.Selector, expectedLabels)
}
}
}
return fmt.Errorf("MatchLabels incorrect.")
return nil
}
return nil
}
testPodDisruptionBudgetOwnerReference := func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error {
@ -2400,7 +2412,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-pdb"),
hasMinAvailable(1),
testLabelsAndSelectors,
testLabelsAndSelectors(true),
},
},
{
@ -2417,7 +2429,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-pdb"),
hasMinAvailable(0),
testLabelsAndSelectors,
testLabelsAndSelectors(true),
},
},
{
@ -2434,7 +2446,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-pdb"),
hasMinAvailable(0),
testLabelsAndSelectors,
testLabelsAndSelectors(true),
},
},
{
@ -2451,7 +2463,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-databass-budget"),
hasMinAvailable(1),
testLabelsAndSelectors,
testLabelsAndSelectors(true),
},
},
{
@ -2468,7 +2480,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-pdb"),
hasMinAvailable(1),
testLabelsAndSelectors,
testLabelsAndSelectors(true),
},
},
{
@ -2485,13 +2497,99 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-pdb"),
hasMinAvailable(1),
testLabelsAndSelectors,
testLabelsAndSelectors(true),
},
},
}
for _, tt := range tests {
result := tt.spec.generatePodDisruptionBudget()
result := tt.spec.generatePrimaryPodDisruptionBudget()
for _, check := range tt.check {
err := check(tt.spec, result)
if err != nil {
t.Errorf("%s [%s]: PodDisruptionBudget spec is incorrect, %+v",
testName, tt.scenario, err)
}
}
}
testCriticalOp := []struct {
scenario string
spec *Cluster
check []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error
}{
{
scenario: "With multiple instances",
spec: New(
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb"}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
logger,
eventRecorder),
check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-critical-op-pdb"),
hasMinAvailable(3),
testLabelsAndSelectors(false),
},
},
{
scenario: "With zero instances",
spec: New(
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb"}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 0}},
logger,
eventRecorder),
check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-critical-op-pdb"),
hasMinAvailable(0),
testLabelsAndSelectors(false),
},
},
{
scenario: "With PodDisruptionBudget disabled",
spec: New(
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.False()}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
logger,
eventRecorder),
check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-critical-op-pdb"),
hasMinAvailable(0),
testLabelsAndSelectors(false),
},
},
{
scenario: "With OwnerReference enabled",
spec: New(
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role", EnableOwnerReferences: util.True()}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.True()}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
logger,
eventRecorder),
check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-critical-op-pdb"),
hasMinAvailable(3),
testLabelsAndSelectors(false),
},
},
}
for _, tt := range testCriticalOp {
result := tt.spec.generateCriticalOpPodDisruptionBudget()
for _, check := range tt.check {
err := check(tt.spec, result)
if err != nil {

View File

@ -106,6 +106,22 @@ func (c *Cluster) removeFailuresAnnotation() error {
return nil
}
func (c *Cluster) criticalOperationLabel(pods []v1.Pod, value *string) error {
metadataReq := map[string]map[string]map[string]*string{"metadata": {"labels": {"critical-operation": value}}}
patchReq, err := json.Marshal(metadataReq)
if err != nil {
return fmt.Errorf("could not marshal ObjectMeta: %v", err)
}
for _, pod := range pods {
_, err = c.KubeClient.Pods(c.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patchReq, metav1.PatchOptions{})
if err != nil {
return err
}
}
return nil
}
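The nil-pointer trick in `criticalOperationLabel` can be seen in isolation. `buildLabelPatch` is a hypothetical helper mirroring the map shape above: passing a value sets the label, passing nil produces JSON `null`, which a strategic merge patch interprets as label deletion:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// buildLabelPatch mirrors the metadata patch in criticalOperationLabel:
// a *string value marshals to the label's new value, while a nil pointer
// marshals to JSON null, deleting the label in a strategic merge patch.
func buildLabelPatch(value *string) ([]byte, error) {
	req := map[string]map[string]map[string]*string{
		"metadata": {"labels": {"critical-operation": value}},
	}
	return json.Marshal(req)
}

func main() {
	val := "true"
	set, _ := buildLabelPatch(&val)
	fmt.Println(string(set)) // {"metadata":{"labels":{"critical-operation":"true"}}}

	del, _ := buildLabelPatch(nil)
	fmt.Println(string(del)) // {"metadata":{"labels":{"critical-operation":null}}}
}
```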
/*
Execute upgrade when mode is set to manual or full or when the owning team is allowed for upgrade (and mode is "off").
@ -129,17 +145,13 @@ func (c *Cluster) majorVersionUpgrade() error {
return nil
}
if !isInMainternanceWindow(c.Spec.MaintenanceWindows) {
c.logger.Infof("skipping major version upgrade, not in maintenance window")
return nil
}
pods, err := c.listPods()
if err != nil {
return err
}
allRunning := true
isStandbyCluster := false
var masterPod *v1.Pod
@ -147,8 +159,9 @@ func (c *Cluster) majorVersionUpgrade() error {
ps, _ := c.patroni.GetMemberData(&pod)
if ps.Role == "standby_leader" {
c.logger.Errorf("skipping major version upgrade for %s/%s standby cluster. Re-deploy standby cluster with the required Postgres version specified", c.Namespace, c.Name)
return nil
isStandbyCluster = true
c.currentMajorVersion = ps.ServerVersion
break
}
if ps.State != "running" {
@ -175,6 +188,9 @@ func (c *Cluster) majorVersionUpgrade() error {
}
c.logger.Infof("recheck cluster version is already up to date. current: %d, min desired: %d", c.currentMajorVersion, desiredVersion)
return nil
} else if isStandbyCluster {
c.logger.Warnf("skipping major version upgrade for %s/%s standby cluster. Re-deploy standby cluster with the required Postgres version specified", c.Namespace, c.Name)
return nil
}
if _, exists := c.ObjectMeta.Annotations[majorVersionUpgradeFailureAnnotation]; exists {
@ -182,6 +198,11 @@ func (c *Cluster) majorVersionUpgrade() error {
return nil
}
if !isInMaintenanceWindow(c.Spec.MaintenanceWindows) {
c.logger.Infof("skipping major version upgrade, not in maintenance window")
return nil
}
members, err := c.patroni.GetClusterMembers(masterPod)
if err != nil {
c.logger.Error("could not get cluster members data from Patroni API, skipping major version upgrade")
@ -219,6 +240,17 @@ func (c *Cluster) majorVersionUpgrade() error {
if allRunning && masterPod != nil {
c.logger.Infof("healthy cluster ready to upgrade, current: %d desired: %d", c.currentMajorVersion, desiredVersion)
if c.currentMajorVersion < desiredVersion {
defer func() {
// a deferred closure cannot surface an error to the caller, so log it instead
if err := c.criticalOperationLabel(pods, nil); err != nil {
c.logger.Warningf("failed to remove critical-operation label: %v", err)
}
}()
val := "true"
if err = c.criticalOperationLabel(pods, &val); err != nil {
return fmt.Errorf("failed to assign critical-operation label: %s", err)
}
podName := &spec.NamespacedName{Namespace: masterPod.Namespace, Name: masterPod.Name}
c.logger.Infof("triggering major version upgrade on pod %s of %d pods", masterPod.Name, numberOfPods)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Major Version Upgrade", "starting major version upgrade on pod %s of %d pods", masterPod.Name, numberOfPods)

View File

@ -280,11 +280,16 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error {
return fmt.Errorf("could not move pod: %v", err)
}
scheduleSwitchover := false
if !isInMaintenanceWindow(c.Spec.MaintenanceWindows) {
c.logger.Infof("postponing switchover, not in maintenance window")
scheduleSwitchover = true
}
err = retryutil.Retry(1*time.Minute, 5*time.Minute,
func() (bool, error) {
err := c.Switchover(oldMaster, masterCandidateName)
err := c.Switchover(oldMaster, masterCandidateName, scheduleSwitchover)
if err != nil {
c.logger.Errorf("could not failover to pod %q: %v", masterCandidateName, err)
c.logger.Errorf("could not switch over to pod %q: %v", masterCandidateName, err)
return false, nil
}
return true, nil
@ -445,7 +450,7 @@ func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.Namesp
// do not recreate master now so it will keep the update flag and switchover will be retried on next sync
return fmt.Errorf("skipping switchover: %v", err)
}
if err := c.Switchover(masterPod, masterCandidate); err != nil {
if err := c.Switchover(masterPod, masterCandidate, false); err != nil {
return fmt.Errorf("could not perform switch over: %v", err)
}
} else if newMasterPod == nil && len(replicas) == 0 {

View File

@ -23,8 +23,13 @@ const (
)
func (c *Cluster) listResources() error {
if c.PodDisruptionBudget != nil {
c.logger.Infof("found pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta), c.PodDisruptionBudget.UID)
if c.PrimaryPodDisruptionBudget != nil {
c.logger.Infof("found primary pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta), c.PrimaryPodDisruptionBudget.UID)
}
if c.CriticalOpPodDisruptionBudget != nil {
c.logger.Infof("found pod disruption budget for critical operations: %q (uid: %q)", util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta), c.CriticalOpPodDisruptionBudget.UID)
}
if c.Statefulset != nil {
@ -162,8 +167,8 @@ func (c *Cluster) preScaleDown(newStatefulSet *appsv1.StatefulSet) error {
return fmt.Errorf("pod %q does not belong to cluster", podName)
}
if err := c.patroni.Switchover(&masterPod[0], masterCandidatePod.Name); err != nil {
return fmt.Errorf("could not failover: %v", err)
if err := c.patroni.Switchover(&masterPod[0], masterCandidatePod.Name, ""); err != nil {
return fmt.Errorf("could not switch over: %v", err)
}
return nil
@ -329,7 +334,7 @@ func (c *Cluster) updateService(role PostgresRole, oldService *v1.Service, newSe
}
}
if changed, _ := c.compareAnnotations(oldService.Annotations, newService.Annotations); changed {
if changed, _ := c.compareAnnotations(oldService.Annotations, newService.Annotations, nil); changed {
patchData, err := metaAnnotationsPatch(newService.Annotations)
if err != nil {
return nil, fmt.Errorf("could not form patch for service %q annotations: %v", oldService.Name, err)
@ -417,59 +422,128 @@ func (c *Cluster) generateEndpointSubsets(role PostgresRole) []v1.EndpointSubset
return result
}
func (c *Cluster) createPodDisruptionBudget() (*policyv1.PodDisruptionBudget, error) {
podDisruptionBudgetSpec := c.generatePodDisruptionBudget()
func (c *Cluster) createPrimaryPodDisruptionBudget() error {
c.logger.Debug("creating primary pod disruption budget")
if c.PrimaryPodDisruptionBudget != nil {
c.logger.Warning("primary pod disruption budget already exists in the cluster")
return nil
}
podDisruptionBudgetSpec := c.generatePrimaryPodDisruptionBudget()
podDisruptionBudget, err := c.KubeClient.
PodDisruptionBudgets(podDisruptionBudgetSpec.Namespace).
Create(context.TODO(), podDisruptionBudgetSpec, metav1.CreateOptions{})
if err != nil {
return nil, err
return err
}
c.PodDisruptionBudget = podDisruptionBudget
c.logger.Infof("primary pod disruption budget %q has been successfully created", util.NameFromMeta(podDisruptionBudget.ObjectMeta))
c.PrimaryPodDisruptionBudget = podDisruptionBudget
return podDisruptionBudget, nil
return nil
}
func (c *Cluster) updatePodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error {
if c.PodDisruptionBudget == nil {
return fmt.Errorf("there is no pod disruption budget in the cluster")
func (c *Cluster) createCriticalOpPodDisruptionBudget() error {
c.logger.Debug("creating pod disruption budget for critical operations")
if c.CriticalOpPodDisruptionBudget != nil {
c.logger.Warning("pod disruption budget for critical operations already exists in the cluster")
return nil
}
if err := c.deletePodDisruptionBudget(); err != nil {
return fmt.Errorf("could not delete pod disruption budget: %v", err)
podDisruptionBudgetSpec := c.generateCriticalOpPodDisruptionBudget()
podDisruptionBudget, err := c.KubeClient.
PodDisruptionBudgets(podDisruptionBudgetSpec.Namespace).
Create(context.TODO(), podDisruptionBudgetSpec, metav1.CreateOptions{})
if err != nil {
return err
}
c.logger.Infof("pod disruption budget for critical operations %q has been successfully created", util.NameFromMeta(podDisruptionBudget.ObjectMeta))
c.CriticalOpPodDisruptionBudget = podDisruptionBudget
return nil
}
func (c *Cluster) createPodDisruptionBudgets() error {
errors := make([]string, 0)
err := c.createPrimaryPodDisruptionBudget()
if err != nil {
errors = append(errors, fmt.Sprintf("could not create primary pod disruption budget: %v", err))
}
err = c.createCriticalOpPodDisruptionBudget()
if err != nil {
errors = append(errors, fmt.Sprintf("could not create pod disruption budget for critical operations: %v", err))
}
if len(errors) > 0 {
return fmt.Errorf("%v", strings.Join(errors, `', '`))
}
return nil
}
func (c *Cluster) updatePrimaryPodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error {
c.logger.Debug("updating primary pod disruption budget")
if c.PrimaryPodDisruptionBudget == nil {
return fmt.Errorf("there is no primary pod disruption budget in the cluster")
}
if err := c.deletePrimaryPodDisruptionBudget(); err != nil {
return fmt.Errorf("could not delete primary pod disruption budget: %v", err)
}
newPdb, err := c.KubeClient.
PodDisruptionBudgets(pdb.Namespace).
Create(context.TODO(), pdb, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("could not create pod disruption budget: %v", err)
return fmt.Errorf("could not create primary pod disruption budget: %v", err)
}
c.PodDisruptionBudget = newPdb
c.PrimaryPodDisruptionBudget = newPdb
return nil
}
func (c *Cluster) deletePodDisruptionBudget() error {
c.logger.Debug("deleting pod disruption budget")
if c.PodDisruptionBudget == nil {
c.logger.Debug("there is no pod disruption budget in the cluster")
func (c *Cluster) updateCriticalOpPodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error {
c.logger.Debug("updating pod disruption budget for critical operations")
if c.CriticalOpPodDisruptionBudget == nil {
return fmt.Errorf("there is no pod disruption budget for critical operations in the cluster")
}
if err := c.deleteCriticalOpPodDisruptionBudget(); err != nil {
return fmt.Errorf("could not delete pod disruption budget for critical operations: %v", err)
}
newPdb, err := c.KubeClient.
PodDisruptionBudgets(pdb.Namespace).
Create(context.TODO(), pdb, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("could not create pod disruption budget for critical operations: %v", err)
}
c.CriticalOpPodDisruptionBudget = newPdb
return nil
}
func (c *Cluster) deletePrimaryPodDisruptionBudget() error {
c.logger.Debug("deleting primary pod disruption budget")
if c.PrimaryPodDisruptionBudget == nil {
c.logger.Debug("there is no primary pod disruption budget in the cluster")
return nil
}
pdbName := util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta)
pdbName := util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta)
err := c.KubeClient.
PodDisruptionBudgets(c.PodDisruptionBudget.Namespace).
Delete(context.TODO(), c.PodDisruptionBudget.Name, c.deleteOptions)
PodDisruptionBudgets(c.PrimaryPodDisruptionBudget.Namespace).
Delete(context.TODO(), c.PrimaryPodDisruptionBudget.Name, c.deleteOptions)
if k8sutil.ResourceNotFound(err) {
c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta))
c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta))
} else if err != nil {
return fmt.Errorf("could not delete PodDisruptionBudget: %v", err)
return fmt.Errorf("could not delete primary pod disruption budget: %v", err)
}
c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta))
c.PodDisruptionBudget = nil
c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta))
c.PrimaryPodDisruptionBudget = nil
err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
func() (bool, error) {
@ -483,12 +557,67 @@ func (c *Cluster) deletePodDisruptionBudget() error {
return false, err2
})
if err != nil {
return fmt.Errorf("could not delete pod disruption budget: %v", err)
return fmt.Errorf("could not delete primary pod disruption budget: %v", err)
}
return nil
}
func (c *Cluster) deleteCriticalOpPodDisruptionBudget() error {
c.logger.Debug("deleting pod disruption budget for critical operations")
if c.CriticalOpPodDisruptionBudget == nil {
c.logger.Debug("there is no pod disruption budget for critical operations in the cluster")
return nil
}
pdbName := util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta)
err := c.KubeClient.
PodDisruptionBudgets(c.CriticalOpPodDisruptionBudget.Namespace).
Delete(context.TODO(), c.CriticalOpPodDisruptionBudget.Name, c.deleteOptions)
if k8sutil.ResourceNotFound(err) {
c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta))
} else if err != nil {
return fmt.Errorf("could not delete pod disruption budget for critical operations: %v", err)
}
c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta))
c.CriticalOpPodDisruptionBudget = nil
err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
func() (bool, error) {
_, err2 := c.KubeClient.PodDisruptionBudgets(pdbName.Namespace).Get(context.TODO(), pdbName.Name, metav1.GetOptions{})
if err2 == nil {
return false, nil
}
if k8sutil.ResourceNotFound(err2) {
return true, nil
}
return false, err2
})
if err != nil {
return fmt.Errorf("could not delete pod disruption budget for critical operations: %v", err)
}
return nil
}
func (c *Cluster) deletePodDisruptionBudgets() error {
errors := make([]string, 0)
if err := c.deletePrimaryPodDisruptionBudget(); err != nil {
errors = append(errors, fmt.Sprintf("%v", err))
}
if err := c.deleteCriticalOpPodDisruptionBudget(); err != nil {
errors = append(errors, fmt.Sprintf("%v", err))
}
if len(errors) > 0 {
return fmt.Errorf("%v", strings.Join(errors, `', '`))
}
return nil
}
func (c *Cluster) deleteEndpoint(role PostgresRole) error {
c.setProcessName("deleting endpoint")
c.logger.Debugf("deleting %s endpoint", role)
@ -705,7 +834,12 @@ func (c *Cluster) GetStatefulSet() *appsv1.StatefulSet {
return c.Statefulset
}
// GetPodDisruptionBudget returns cluster's kubernetes PodDisruptionBudget
func (c *Cluster) GetPodDisruptionBudget() *policyv1.PodDisruptionBudget {
return c.PodDisruptionBudget
// GetPrimaryPodDisruptionBudget returns cluster's primary kubernetes PodDisruptionBudget
func (c *Cluster) GetPrimaryPodDisruptionBudget() *policyv1.PodDisruptionBudget {
return c.PrimaryPodDisruptionBudget
}
// GetCriticalOpPodDisruptionBudget returns cluster's kubernetes PodDisruptionBudget for critical operations
func (c *Cluster) GetCriticalOpPodDisruptionBudget() *policyv1.PodDisruptionBudget {
return c.CriticalOpPodDisruptionBudget
}

View File

@ -114,10 +114,10 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za
}
for slotName, slotAndPublication := range databaseSlotsList {
tables := slotAndPublication.Publication
tableNames := make([]string, len(tables))
newTables := slotAndPublication.Publication
tableNames := make([]string, len(newTables))
i := 0
for t := range tables {
for t := range newTables {
tableName, schemaName := getTableSchema(t)
tableNames[i] = fmt.Sprintf("%s.%s", schemaName, tableName)
i++
@ -126,6 +126,12 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za
tableList := strings.Join(tableNames, ", ")
currentTables, exists := currentPublications[slotName]
// if newTables is empty, its definition was removed from the streams section
// but when the slot is still defined in the manifest we should sync publications, too
// by reusing the current tables we make sure the publication is not emptied
if len(newTables) == 0 {
tableList = currentTables
}
if !exists {
createPublications[slotName] = tableList
} else if currentTables != tableList {
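`syncPublication` renders each slot's table set as the comma-separated list Postgres expects and compares it with the list currently stored in the database. A simplified sketch, with an assumed `public`-schema default (the operator's `getTableSchema` handling may differ); sorting keeps the string deterministic, so Go's randomized map iteration order cannot produce spurious diffs:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// buildTableList renders a publication's table set as a stable,
// comma-separated "schema.table" list.
func buildTableList(tables map[string]struct{}) string {
	names := make([]string, 0, len(tables))
	for t := range tables {
		// Assume the public schema when none is given (sketch-level
		// stand-in for the operator's schema/table split).
		if !strings.Contains(t, ".") {
			t = "public." + t
		}
		names = append(names, t)
	}
	sort.Strings(names) // deterministic order for string comparison
	return strings.Join(names, ", ")
}

func main() {
	tables := map[string]struct{}{"orders": {}, "audit.events": {}}
	fmt.Println(buildTableList(tables)) // prints: audit.events, public.orders
}
```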
@ -350,16 +356,8 @@ func (c *Cluster) syncStreams() error {
return nil
}
databaseSlots := make(map[string]map[string]zalandov1.Slot)
slotsToSync := make(map[string]map[string]string)
requiredPatroniConfig := c.Spec.Patroni
if len(requiredPatroniConfig.Slots) > 0 {
for slotName, slotConfig := range requiredPatroniConfig.Slots {
slotsToSync[slotName] = slotConfig
}
}
// create map with every database and empty slot definition
// we need it to detect removal of streams from databases
if err := c.initDbConn(); err != nil {
return fmt.Errorf("could not init database connection")
}
@ -372,13 +370,28 @@ func (c *Cluster) syncStreams() error {
if err != nil {
return fmt.Errorf("could not get list of databases: %v", err)
}
// get every database name with an empty list of slots, except template0 and template1
databaseSlots := make(map[string]map[string]zalandov1.Slot)
for dbName := range listDatabases {
if dbName != "template0" && dbName != "template1" {
databaseSlots[dbName] = map[string]zalandov1.Slot{}
}
}
// need to take explicitly defined slots into account when syncing Patroni config
slotsToSync := make(map[string]map[string]string)
requiredPatroniConfig := c.Spec.Patroni
if len(requiredPatroniConfig.Slots) > 0 {
for slotName, slotConfig := range requiredPatroniConfig.Slots {
slotsToSync[slotName] = slotConfig
if _, exists := databaseSlots[slotConfig["database"]]; exists {
databaseSlots[slotConfig["database"]][slotName] = zalandov1.Slot{
Slot: slotConfig,
Publication: make(map[string]acidv1.StreamTable),
}
}
}
}
// get list of required slots and publications, group by database
for _, stream := range c.Spec.Streams {
if _, exists := databaseSlots[stream.Database]; !exists {
@ -391,13 +404,13 @@ func (c *Cluster) syncStreams() error {
"type": "logical",
}
slotName := getSlotName(stream.Database, stream.ApplicationId)
if _, exists := databaseSlots[stream.Database][slotName]; !exists {
slotAndPublication, exists := databaseSlots[stream.Database][slotName]
if !exists {
databaseSlots[stream.Database][slotName] = zalandov1.Slot{
Slot: slot,
Publication: stream.Tables,
}
} else {
slotAndPublication := databaseSlots[stream.Database][slotName]
streamTables := slotAndPublication.Publication
for tableName, table := range stream.Tables {
if _, exists := streamTables[tableName]; !exists {
@ -492,16 +505,17 @@ func (c *Cluster) syncStream(appId string) error {
continue
}
streamExists = true
c.Streams[appId] = &stream
desiredStreams := c.generateFabricEventStream(appId)
if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) {
c.logger.Infof("owner references of event streams with applicationId %s do not match the current ones", appId)
stream.ObjectMeta.OwnerReferences = desiredStreams.ObjectMeta.OwnerReferences
c.setProcessName("updating event streams with applicationId %s", appId)
stream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), &stream, metav1.UpdateOptions{})
updatedStream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), &stream, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("could not update event streams with applicationId %s: %v", appId, err)
}
c.Streams[appId] = stream
c.Streams[appId] = updatedStream
}
if match, reason := c.compareStreams(&stream, desiredStreams); !match {
c.logger.Infof("updating event streams with applicationId %s: %s", appId, reason)
@ -545,7 +559,7 @@ func (c *Cluster) compareStreams(curEventStreams, newEventStreams *zalandov1.Fab
for newKey, newValue := range newEventStreams.Annotations {
desiredAnnotations[newKey] = newValue
}
if changed, reason := c.compareAnnotations(curEventStreams.ObjectMeta.Annotations, desiredAnnotations); changed {
if changed, reason := c.compareAnnotations(curEventStreams.ObjectMeta.Annotations, desiredAnnotations, nil); changed {
match = false
reasons = append(reasons, fmt.Sprintf("new streams annotations do not match: %s", reason))
}
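The hunks above add a third argument to every `compareAnnotations` call, and elsewhere the comparison result exposes `deletedPodAnnotations`. A sketch of what such a comparison can look like: the `onDeleted` hook receives keys present now but absent from the desired set, which is one plausible way the deleted-annotation list gets collected; the operator's actual signature and semantics may differ:

```go
package main

import "fmt"

// changedAnnotations compares current against desired annotations and
// reports a reason for the last difference found. onDeleted is invoked
// for keys that exist now but are absent from the desired set.
func changedAnnotations(current, desired map[string]string, onDeleted func(string)) (bool, string) {
	changed, reason := false, ""
	for k, v := range desired {
		if cur, ok := current[k]; !ok || cur != v {
			changed = true
			reason = fmt.Sprintf("annotation %q needs to be set to %q", k, v)
		}
	}
	for k := range current {
		if _, ok := desired[k]; !ok {
			changed = true
			reason = fmt.Sprintf("annotation %q needs to be removed", k)
			if onDeleted != nil {
				onDeleted(k)
			}
		}
	}
	return changed, reason
}

func main() {
	var deleted []string
	changed, _ := changedAnnotations(
		map[string]string{"keep": "1", "obsolete": "x"},
		map[string]string{"keep": "1"},
		func(k string) { deleted = append(deleted, k) },
	)
	fmt.Println(changed, deleted) // prints: true [obsolete]
}
```

Passing `nil` for the hook, as most call sites in this diff do, keeps the comparison but skips deletion tracking.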

View File

@ -97,6 +97,11 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
}
}
if !isInMaintenanceWindow(newSpec.Spec.MaintenanceWindows) {
// do not apply any major version related changes yet
newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
}
if err = c.syncStatefulSet(); err != nil {
if !k8sutil.ResourceAlreadyExists(err) {
err = fmt.Errorf("could not sync statefulsets: %v", err)
@ -112,8 +117,8 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
}
c.logger.Debug("syncing pod disruption budgets")
if err = c.syncPodDisruptionBudget(false); err != nil {
err = fmt.Errorf("could not sync pod disruption budget: %v", err)
if err = c.syncPodDisruptionBudgets(false); err != nil {
err = fmt.Errorf("could not sync pod disruption budgets: %v", err)
return err
}
@ -148,7 +153,10 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
return fmt.Errorf("could not sync connection pooler: %v", err)
}
if len(c.Spec.Streams) > 0 {
// sync only when the manifest stream count differs from the stream CR count
// the counts can permanently differ due to grouping of manifest streams,
// but this check still catches missed removals on update
if len(c.Spec.Streams) != len(c.Streams) {
c.logger.Debug("syncing streams")
if err = c.syncStreams(); err != nil {
err = fmt.Errorf("could not sync streams: %v", err)
@ -230,7 +238,7 @@ func (c *Cluster) syncPatroniConfigMap(suffix string) error {
maps.Copy(annotations, cm.Annotations)
// Patroni can add extra annotations so incl. current annotations in desired annotations
desiredAnnotations := c.annotationsSet(cm.Annotations)
if changed, _ := c.compareAnnotations(annotations, desiredAnnotations); changed {
if changed, _ := c.compareAnnotations(annotations, desiredAnnotations, nil); changed {
patchData, err := metaAnnotationsPatch(desiredAnnotations)
if err != nil {
return fmt.Errorf("could not form patch for %s config map: %v", configMapName, err)
@ -275,7 +283,7 @@ func (c *Cluster) syncPatroniEndpoint(suffix string) error {
maps.Copy(annotations, ep.Annotations)
// Patroni can add extra annotations so incl. current annotations in desired annotations
desiredAnnotations := c.annotationsSet(ep.Annotations)
if changed, _ := c.compareAnnotations(annotations, desiredAnnotations); changed {
if changed, _ := c.compareAnnotations(annotations, desiredAnnotations, nil); changed {
patchData, err := metaAnnotationsPatch(desiredAnnotations)
if err != nil {
return fmt.Errorf("could not form patch for %s endpoint: %v", endpointName, err)
@ -320,7 +328,7 @@ func (c *Cluster) syncPatroniService() error {
maps.Copy(annotations, svc.Annotations)
// Patroni can add extra annotations so incl. current annotations in desired annotations
desiredAnnotations := c.annotationsSet(svc.Annotations)
if changed, _ := c.compareAnnotations(annotations, desiredAnnotations); changed {
if changed, _ := c.compareAnnotations(annotations, desiredAnnotations, nil); changed {
patchData, err := metaAnnotationsPatch(desiredAnnotations)
if err != nil {
return fmt.Errorf("could not form patch for %s service: %v", serviceName, err)
@ -412,7 +420,7 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error {
return fmt.Errorf("could not update %s endpoint: %v", role, err)
}
} else {
if changed, _ := c.compareAnnotations(ep.Annotations, desiredEp.Annotations); changed {
if changed, _ := c.compareAnnotations(ep.Annotations, desiredEp.Annotations, nil); changed {
patchData, err := metaAnnotationsPatch(desiredEp.Annotations)
if err != nil {
return fmt.Errorf("could not form patch for %s endpoint: %v", role, err)
@ -447,22 +455,22 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error {
return nil
}
func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error {
func (c *Cluster) syncPrimaryPodDisruptionBudget(isUpdate bool) error {
var (
pdb *policyv1.PodDisruptionBudget
err error
)
if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.podDisruptionBudgetName(), metav1.GetOptions{}); err == nil {
c.PodDisruptionBudget = pdb
newPDB := c.generatePodDisruptionBudget()
if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.PrimaryPodDisruptionBudgetName(), metav1.GetOptions{}); err == nil {
c.PrimaryPodDisruptionBudget = pdb
newPDB := c.generatePrimaryPodDisruptionBudget()
match, reason := c.comparePodDisruptionBudget(pdb, newPDB)
if !match {
c.logPDBChanges(pdb, newPDB, isUpdate, reason)
if err = c.updatePodDisruptionBudget(newPDB); err != nil {
if err = c.updatePrimaryPodDisruptionBudget(newPDB); err != nil {
return err
}
} else {
c.PodDisruptionBudget = pdb
c.PrimaryPodDisruptionBudget = pdb
}
return nil
@ -471,21 +479,74 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error {
return fmt.Errorf("could not get pod disruption budget: %v", err)
}
// no existing pod disruption budget, create new one
c.logger.Infof("could not find the cluster's pod disruption budget")
c.logger.Infof("could not find the primary pod disruption budget")
if pdb, err = c.createPodDisruptionBudget(); err != nil {
if err = c.createPrimaryPodDisruptionBudget(); err != nil {
if !k8sutil.ResourceAlreadyExists(err) {
return fmt.Errorf("could not create pod disruption budget: %v", err)
return fmt.Errorf("could not create primary pod disruption budget: %v", err)
}
c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta))
if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.podDisruptionBudgetName(), metav1.GetOptions{}); err != nil {
if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.PrimaryPodDisruptionBudgetName(), metav1.GetOptions{}); err != nil {
return fmt.Errorf("could not fetch existing %q pod disruption budget", util.NameFromMeta(pdb.ObjectMeta))
}
}
c.logger.Infof("created missing pod disruption budget %q", util.NameFromMeta(pdb.ObjectMeta))
c.PodDisruptionBudget = pdb
return nil
}
func (c *Cluster) syncCriticalOpPodDisruptionBudget(isUpdate bool) error {
var (
pdb *policyv1.PodDisruptionBudget
err error
)
if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.criticalOpPodDisruptionBudgetName(), metav1.GetOptions{}); err == nil {
c.CriticalOpPodDisruptionBudget = pdb
newPDB := c.generateCriticalOpPodDisruptionBudget()
match, reason := c.comparePodDisruptionBudget(pdb, newPDB)
if !match {
c.logPDBChanges(pdb, newPDB, isUpdate, reason)
if err = c.updateCriticalOpPodDisruptionBudget(newPDB); err != nil {
return err
}
} else {
c.CriticalOpPodDisruptionBudget = pdb
}
return nil
}
if !k8sutil.ResourceNotFound(err) {
return fmt.Errorf("could not get pod disruption budget: %v", err)
}
// no existing pod disruption budget, create new one
c.logger.Infof("could not find pod disruption budget for critical operations")
if err = c.createCriticalOpPodDisruptionBudget(); err != nil {
if !k8sutil.ResourceAlreadyExists(err) {
return fmt.Errorf("could not create pod disruption budget for critical operations: %v", err)
}
c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta))
if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.criticalOpPodDisruptionBudgetName(), metav1.GetOptions{}); err != nil {
return fmt.Errorf("could not fetch existing %q pod disruption budget", util.NameFromMeta(pdb.ObjectMeta))
}
}
return nil
}
func (c *Cluster) syncPodDisruptionBudgets(isUpdate bool) error {
errors := make([]string, 0)
if err := c.syncPrimaryPodDisruptionBudget(isUpdate); err != nil {
errors = append(errors, fmt.Sprintf("%v", err))
}
if err := c.syncCriticalOpPodDisruptionBudget(isUpdate); err != nil {
errors = append(errors, fmt.Sprintf("%v", err))
}
if len(errors) > 0 {
return fmt.Errorf("%v", strings.Join(errors, `', '`))
}
return nil
}
@ -497,6 +558,7 @@ func (c *Cluster) syncStatefulSet() error {
)
podsToRecreate := make([]v1.Pod, 0)
isSafeToRecreatePods := true
postponeReasons := make([]string, 0)
switchoverCandidates := make([]spec.NamespacedName, 0)
pods, err := c.listPods()
@ -561,13 +623,22 @@ func (c *Cluster) syncStatefulSet() error {
cmp := c.compareStatefulSetWith(desiredSts)
if !cmp.rollingUpdate {
updatedPodAnnotations := map[string]*string{}
for _, anno := range cmp.deletedPodAnnotations {
updatedPodAnnotations[anno] = nil
}
for anno, val := range desiredSts.Spec.Template.Annotations {
updatedPodAnnotations[anno] = &val
}
metadataReq := map[string]map[string]map[string]*string{"metadata": {"annotations": updatedPodAnnotations}}
patch, err := json.Marshal(metadataReq)
if err != nil {
return fmt.Errorf("could not form patch for pod annotations: %v", err)
}
for _, pod := range pods {
if changed, _ := c.compareAnnotations(pod.Annotations, desiredSts.Spec.Template.Annotations); changed {
patchData, err := metaAnnotationsPatch(desiredSts.Spec.Template.Annotations)
if err != nil {
return fmt.Errorf("could not form patch for pod %q annotations: %v", pod.Name, err)
}
_, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
if changed, _ := c.compareAnnotations(pod.Annotations, desiredSts.Spec.Template.Annotations, nil); changed {
_, err = c.KubeClient.Pods(c.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
if err != nil {
return fmt.Errorf("could not patch annotations for pod %q: %v", pod.Name, err)
}
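The non-rolling-update branch above now builds one patch body covering both deleted annotations (mapped to `nil`, which serializes as JSON `null` and tells the API server to remove the key) and desired values. A self-contained sketch of that patch construction:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// buildAnnotationPatch builds a merge-patch body for pod annotations:
// keys mapped to nil marshal as JSON null (removal), non-nil pointers
// set or update the value.
func buildAnnotationPatch(deleted []string, desired map[string]string) ([]byte, error) {
	annotations := map[string]*string{}
	for _, k := range deleted {
		annotations[k] = nil
	}
	for k := range desired {
		v := desired[k] // copy before taking the address
		annotations[k] = &v
	}
	req := map[string]map[string]map[string]*string{
		"metadata": {"annotations": annotations},
	}
	return json.Marshal(req)
}

func main() {
	patch, _ := buildAnnotationPatch([]string{"obsolete"}, map[string]string{"keep": "true"})
	fmt.Println(string(patch))
}
```

Using `map[string]*string` rather than `map[string]string` is the whole trick: only a pointer type can distinguish "set to empty" from "delete this key".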
@ -646,12 +717,14 @@ func (c *Cluster) syncStatefulSet() error {
c.logger.Debug("syncing Patroni config")
if configPatched, restartPrimaryFirst, restartWait, err = c.syncPatroniConfig(pods, c.Spec.Patroni, requiredPgParameters); err != nil {
c.logger.Warningf("Patroni config updated? %v - errors during config sync: %v", configPatched, err)
postponeReasons = append(postponeReasons, "errors during Patroni config sync")
isSafeToRecreatePods = false
}
// restart Postgres where it is still pending
if err = c.restartInstances(pods, restartWait, restartPrimaryFirst); err != nil {
c.logger.Errorf("errors while restarting Postgres in pods via Patroni API: %v", err)
postponeReasons = append(postponeReasons, "errors while restarting Postgres via Patroni API")
isSafeToRecreatePods = false
}
@ -666,7 +739,7 @@ func (c *Cluster) syncStatefulSet() error {
}
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
} else {
c.logger.Warningf("postpone pod recreation until next sync because of errors during config sync")
c.logger.Warningf("postpone pod recreation until next sync - reason: %s", strings.Join(postponeReasons, `', '`))
}
}
@ -1142,7 +1215,7 @@ func (c *Cluster) updateSecret(
c.Secrets[secret.UID] = secret
}
if changed, _ := c.compareAnnotations(secret.Annotations, generatedSecret.Annotations); changed {
if changed, _ := c.compareAnnotations(secret.Annotations, generatedSecret.Annotations, nil); changed {
patchData, err := metaAnnotationsPatch(generatedSecret.Annotations)
if err != nil {
return fmt.Errorf("could not form patch for secret %q annotations: %v", secret.Name, err)
@ -1587,19 +1660,38 @@ func (c *Cluster) syncLogicalBackupJob() error {
}
c.logger.Infof("logical backup job %s updated", c.getLogicalBackupJobName())
}
if match, reason := c.compareLogicalBackupJob(job, desiredJob); !match {
if cmp := c.compareLogicalBackupJob(job, desiredJob); !cmp.match {
c.logger.Infof("logical job %s is not in the desired state and needs to be updated",
c.getLogicalBackupJobName(),
)
if reason != "" {
c.logger.Infof("reason: %s", reason)
if len(cmp.reasons) != 0 {
for _, reason := range cmp.reasons {
c.logger.Infof("reason: %s", reason)
}
}
if len(cmp.deletedPodAnnotations) != 0 {
templateMetadataReq := map[string]map[string]map[string]map[string]map[string]map[string]map[string]*string{
"spec": {"jobTemplate": {"spec": {"template": {"metadata": {"annotations": {}}}}}}}
for _, anno := range cmp.deletedPodAnnotations {
templateMetadataReq["spec"]["jobTemplate"]["spec"]["template"]["metadata"]["annotations"][anno] = nil
}
patch, err := json.Marshal(templateMetadataReq)
if err != nil {
return fmt.Errorf("could not marshal ObjectMeta for logical backup job %q pod template: %v", jobName, err)
}
job, err = c.KubeClient.CronJobs(c.Namespace).Patch(context.TODO(), jobName, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "")
if err != nil {
c.logger.Errorf("failed to remove annotations from the logical backup job %q pod template: %v", jobName, err)
return err
}
}
if err = c.patchLogicalBackupJob(desiredJob); err != nil {
return fmt.Errorf("could not update logical backup job to match desired state: %v", err)
}
c.logger.Info("the logical backup job is synced")
}
if changed, _ := c.compareAnnotations(job.Annotations, desiredJob.Annotations); changed {
if changed, _ := c.compareAnnotations(job.Annotations, desiredJob.Annotations, nil); changed {
patchData, err := metaAnnotationsPatch(desiredJob.Annotations)
if err != nil {
return fmt.Errorf("could not form patch for the logical backup job %q: %v", jobName, err)

View File

@ -142,6 +142,181 @@ func TestSyncStatefulSetsAnnotations(t *testing.T) {
}
}
func TestPodAnnotationsSync(t *testing.T) {
clusterName := "acid-test-cluster-2"
namespace := "default"
podAnnotation := "no-scale-down"
podAnnotations := map[string]string{podAnnotation: "true"}
customPodAnnotation := "foo"
customPodAnnotations := map[string]string{customPodAnnotation: "true"}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockClient := mocks.NewMockHTTPClient(ctrl)
client, _ := newFakeK8sAnnotationsClient()
pg := acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName,
Namespace: namespace,
},
Spec: acidv1.PostgresSpec{
Volume: acidv1.Volume{
Size: "1Gi",
},
EnableConnectionPooler: boolToPointer(true),
EnableLogicalBackup: true,
EnableReplicaConnectionPooler: boolToPointer(true),
PodAnnotations: podAnnotations,
NumberOfInstances: 2,
},
}
var cluster = New(
Config{
OpConfig: config.Config{
PatroniAPICheckInterval: time.Duration(1),
PatroniAPICheckTimeout: time.Duration(5),
PodManagementPolicy: "ordered_ready",
CustomPodAnnotations: customPodAnnotations,
ConnectionPooler: config.ConnectionPooler{
ConnectionPoolerDefaultCPURequest: "100m",
ConnectionPoolerDefaultCPULimit: "100m",
ConnectionPoolerDefaultMemoryRequest: "100Mi",
ConnectionPoolerDefaultMemoryLimit: "100Mi",
NumberOfInstances: k8sutil.Int32ToPointer(1),
},
Resources: config.Resources{
ClusterLabels: map[string]string{"application": "spilo"},
ClusterNameLabel: "cluster-name",
DefaultCPURequest: "300m",
DefaultCPULimit: "300m",
DefaultMemoryRequest: "300Mi",
DefaultMemoryLimit: "300Mi",
MaxInstances: -1,
PodRoleLabel: "spilo-role",
ResourceCheckInterval: time.Duration(3),
ResourceCheckTimeout: time.Duration(10),
},
},
}, client, pg, logger, eventRecorder)
configJson := `{"postgresql": {"parameters": {"log_min_duration_statement": 200, "max_connections": 50}}, "ttl": 20}`
response := http.Response{
StatusCode: 200,
Body: io.NopCloser(bytes.NewReader([]byte(configJson))),
}
mockClient.EXPECT().Do(gomock.Any()).Return(&response, nil).AnyTimes()
cluster.patroni = patroni.New(patroniLogger, mockClient)
cluster.Name = clusterName
cluster.Namespace = namespace
clusterOptions := clusterLabelsOptions(cluster)
// create a statefulset
_, err := cluster.createStatefulSet()
assert.NoError(t, err)
// create pods
podsList := createPods(cluster)
for _, pod := range podsList {
_, err = cluster.KubeClient.Pods(namespace).Create(context.TODO(), &pod, metav1.CreateOptions{})
assert.NoError(t, err)
}
// create connection pooler
_, err = cluster.createConnectionPooler(mockInstallLookupFunction)
assert.NoError(t, err)
// create cron job
err = cluster.createLogicalBackupJob()
assert.NoError(t, err)
annotateResources(cluster)
err = cluster.Sync(&cluster.Postgresql)
assert.NoError(t, err)
// 1. PodAnnotations set
stsList, err := cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions)
assert.NoError(t, err)
for _, sts := range stsList.Items {
for _, annotation := range []string{podAnnotation, customPodAnnotation} {
assert.Contains(t, sts.Spec.Template.Annotations, annotation)
}
}
for _, role := range []PostgresRole{Master, Replica} {
deploy, err := cluster.KubeClient.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{})
assert.NoError(t, err)
for _, annotation := range []string{podAnnotation, customPodAnnotation} {
assert.Contains(t, deploy.Spec.Template.Annotations, annotation,
fmt.Sprintf("pooler deployment pod template %s should contain annotation %s, found %#v",
deploy.Name, annotation, deploy.Spec.Template.Annotations))
}
}
podList, err := cluster.KubeClient.Pods(namespace).List(context.TODO(), clusterOptions)
assert.NoError(t, err)
for _, pod := range podList.Items {
for _, annotation := range []string{podAnnotation, customPodAnnotation} {
assert.Contains(t, pod.Annotations, annotation,
fmt.Sprintf("pod %s should contain annotation %s, found %#v", pod.Name, annotation, pod.Annotations))
}
}
cronJobList, err := cluster.KubeClient.CronJobs(namespace).List(context.TODO(), clusterOptions)
assert.NoError(t, err)
for _, cronJob := range cronJobList.Items {
for _, annotation := range []string{podAnnotation, customPodAnnotation} {
assert.Contains(t, cronJob.Spec.JobTemplate.Spec.Template.Annotations, annotation,
fmt.Sprintf("logical backup cron job's pod template should contain annotation %s, found %#v",
annotation, cronJob.Spec.JobTemplate.Spec.Template.Annotations))
}
}
// 2. PodAnnotations removed
newSpec := cluster.Postgresql.DeepCopy()
newSpec.Spec.PodAnnotations = nil
cluster.OpConfig.CustomPodAnnotations = nil
err = cluster.Sync(newSpec)
assert.NoError(t, err)
stsList, err = cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions)
assert.NoError(t, err)
for _, sts := range stsList.Items {
for _, annotation := range []string{podAnnotation, customPodAnnotation} {
assert.NotContains(t, sts.Spec.Template.Annotations, annotation)
}
}
for _, role := range []PostgresRole{Master, Replica} {
deploy, err := cluster.KubeClient.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{})
assert.NoError(t, err)
for _, annotation := range []string{podAnnotation, customPodAnnotation} {
assert.NotContains(t, deploy.Spec.Template.Annotations, annotation,
fmt.Sprintf("pooler deployment pod template %s should not contain annotation %s, found %#v",
deploy.Name, annotation, deploy.Spec.Template.Annotations))
}
}
podList, err = cluster.KubeClient.Pods(namespace).List(context.TODO(), clusterOptions)
assert.NoError(t, err)
for _, pod := range podList.Items {
for _, annotation := range []string{podAnnotation, customPodAnnotation} {
assert.NotContains(t, pod.Annotations, annotation,
fmt.Sprintf("pod %s should not contain annotation %s, found %#v", pod.Name, annotation, pod.Annotations))
}
}
cronJobList, err = cluster.KubeClient.CronJobs(namespace).List(context.TODO(), clusterOptions)
assert.NoError(t, err)
for _, cronJob := range cronJobList.Items {
for _, annotation := range []string{podAnnotation, customPodAnnotation} {
assert.NotContains(t, cronJob.Spec.JobTemplate.Spec.Template.Annotations, annotation,
fmt.Sprintf("logical backup cron job's pod template should not contain annotation %s, found %#v",
annotation, cronJob.Spec.JobTemplate.Spec.Template.Annotations))
}
}
}
func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {
testName := "test config comparison"
client, _ := newFakeK8sSyncClient()

View File

@ -58,15 +58,16 @@ type WorkerStatus struct {
// ClusterStatus describes status of the cluster
type ClusterStatus struct {
Team string
Cluster string
Namespace string
MasterService *v1.Service
ReplicaService *v1.Service
MasterEndpoint *v1.Endpoints
ReplicaEndpoint *v1.Endpoints
StatefulSet *appsv1.StatefulSet
PodDisruptionBudget *policyv1.PodDisruptionBudget
Team string
Cluster string
Namespace string
MasterService *v1.Service
ReplicaService *v1.Service
MasterEndpoint *v1.Endpoints
ReplicaEndpoint *v1.Endpoints
StatefulSet *appsv1.StatefulSet
PrimaryPodDisruptionBudget *policyv1.PodDisruptionBudget
CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget
CurrentProcess Process
Worker uint32

View File

@ -663,7 +663,7 @@ func parseResourceRequirements(resourcesRequirement v1.ResourceRequirements) (ac
return resources, nil
}
func isInMainternanceWindow(specMaintenanceWindows []acidv1.MaintenanceWindow) bool {
func isInMaintenanceWindow(specMaintenanceWindows []acidv1.MaintenanceWindow) bool {
if len(specMaintenanceWindows) == 0 {
return true
}

View File

@ -247,18 +247,18 @@ func createPods(cluster *Cluster) []v1.Pod {
for i, role := range []PostgresRole{Master, Replica} {
podsList = append(podsList, v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%d", clusterName, i),
Name: fmt.Sprintf("%s-%d", cluster.Name, i),
Namespace: namespace,
Labels: map[string]string{
"application": "spilo",
"cluster-name": clusterName,
"cluster-name": cluster.Name,
"spilo-role": string(role),
},
},
})
podsList = append(podsList, v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-pooler-%s", clusterName, role),
Name: fmt.Sprintf("%s-pooler-%s", cluster.Name, role),
Namespace: namespace,
Labels: cluster.connectionPoolerLabels(role, true).MatchLabels,
},
@ -329,7 +329,7 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster,
if err != nil {
return nil, err
}
_, err = cluster.createPodDisruptionBudget()
err = cluster.createPodDisruptionBudgets()
if err != nil {
return nil, err
}
@ -705,8 +705,8 @@ func TestIsInMaintenanceWindow(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cluster.Spec.MaintenanceWindows = tt.windows
if isInMainternanceWindow(cluster.Spec.MaintenanceWindows) != tt.expected {
t.Errorf("Expected isInMainternanceWindow to return %t", tt.expected)
if isInMaintenanceWindow(cluster.Spec.MaintenanceWindows) != tt.expected {
t.Errorf("Expected isInMaintenanceWindow to return %t", tt.expected)
}
})
}

View File

@@ -225,7 +225,7 @@ func (c *Cluster) syncVolumeClaims() error {
}
newAnnotations := c.annotationsSet(nil)
if changed, _ := c.compareAnnotations(pvc.Annotations, newAnnotations); changed {
if changed, _ := c.compareAnnotations(pvc.Annotations, newAnnotations, nil); changed {
patchData, err := metaAnnotationsPatch(newAnnotations)
if err != nil {
return fmt.Errorf("could not form patch for the persistent volume claim for volume %q: %v", pvc.Name, err)

View File

@@ -20,19 +20,19 @@ import (
)
const (
failoverPath = "/failover"
configPath = "/config"
clusterPath = "/cluster"
statusPath = "/patroni"
restartPath = "/restart"
ApiPort = 8008
timeout = 30 * time.Second
switchoverPath = "/switchover"
configPath = "/config"
clusterPath = "/cluster"
statusPath = "/patroni"
restartPath = "/restart"
ApiPort = 8008
timeout = 30 * time.Second
)
// Interface describe patroni methods
type Interface interface {
GetClusterMembers(master *v1.Pod) ([]ClusterMember, error)
Switchover(master *v1.Pod, candidate string) error
Switchover(master *v1.Pod, candidate string, scheduled_at string) error
SetPostgresParameters(server *v1.Pod, options map[string]string) error
SetStandbyClusterParameters(server *v1.Pod, options map[string]interface{}) error
GetMemberData(server *v1.Pod) (MemberData, error)
@@ -103,7 +103,7 @@ func (p *Patroni) httpPostOrPatch(method string, url string, body *bytes.Buffer)
}
}()
if resp.StatusCode != http.StatusOK {
if resp.StatusCode < http.StatusOK || resp.StatusCode >= 300 {
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("could not read response: %v", err)
@@ -128,7 +128,7 @@ func (p *Patroni) httpGet(url string) (string, error) {
return "", fmt.Errorf("could not read response: %v", err)
}
if response.StatusCode != http.StatusOK {
if response.StatusCode < http.StatusOK || response.StatusCode >= 300 {
return string(bodyBytes), fmt.Errorf("patroni returned '%d'", response.StatusCode)
}
@@ -136,9 +136,9 @@ func (p *Patroni) httpGet(url string) (string, error) {
}
// Switchover by calling Patroni REST API
func (p *Patroni) Switchover(master *v1.Pod, candidate string) error {
func (p *Patroni) Switchover(master *v1.Pod, candidate string, scheduled_at string) error {
buf := &bytes.Buffer{}
err := json.NewEncoder(buf).Encode(map[string]string{"leader": master.Name, "member": candidate})
err := json.NewEncoder(buf).Encode(map[string]string{"leader": master.Name, "member": candidate, "scheduled_at": scheduled_at})
if err != nil {
return fmt.Errorf("could not encode json: %v", err)
}
@@ -146,7 +146,7 @@ func (p *Patroni) Switchover(master *v1.Pod, candidate string) error {
if err != nil {
return err
}
return p.httpPostOrPatch(http.MethodPost, apiURLString+failoverPath, buf)
return p.httpPostOrPatch(http.MethodPost, apiURLString+switchoverPath, buf)
}
//TODO: add an option call /patroni to check if it is necessary to restart the server
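The hunks above route the call to Patroni's `/switchover` endpoint and thread a new `scheduled_at` string through the interface. For orientation, the request body can be sketched as follows — a minimal illustration only, with hypothetical pod names; Patroni expects `scheduled_at` as an ISO 8601 timestamp:

```python
import json
from datetime import datetime, timedelta, timezone

# Schedule the switchover ten minutes from now (hypothetical cluster/member names).
scheduled_at = (datetime.now(timezone.utc) + timedelta(minutes=10)).isoformat()

payload = json.dumps({
    "leader": "acid-minimal-cluster-0",   # current primary pod
    "member": "acid-minimal-cluster-1",   # switchover candidate
    "scheduled_at": scheduled_at,
})
print(payload)
```

This mirrors the `map[string]string` the Go code JSON-encodes before POSTing to `apiURLString+switchoverPath`.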

View File

@@ -81,8 +81,6 @@ spec:
]
}
# Example of settings to make the snapshot view work in the UI when using AWS
# - name: WALE_S3_ENDPOINT
# value: https+path://s3.us-east-1.amazonaws.com:443
# - name: SPILO_S3_BACKUP_PREFIX
# value: spilo/
# - name: AWS_ACCESS_KEY_ID
@@ -102,5 +100,3 @@ spec:
# key: AWS_DEFAULT_REGION
# - name: SPILO_S3_BACKUP_BUCKET
# value: <s3 bucket used by the operator>
# - name: "USE_AWS_INSTANCE_PROFILE"
# value: "true"

View File

@@ -95,14 +95,6 @@ DEFAULT_MEMORY_LIMIT = getenv('DEFAULT_MEMORY_LIMIT', '300Mi')
DEFAULT_CPU = getenv('DEFAULT_CPU', '10m')
DEFAULT_CPU_LIMIT = getenv('DEFAULT_CPU_LIMIT', '300m')
WALE_S3_ENDPOINT = getenv(
'WALE_S3_ENDPOINT',
'https+path://s3.eu-central-1.amazonaws.com:443',
)
USE_AWS_INSTANCE_PROFILE = (
getenv('USE_AWS_INSTANCE_PROFILE', 'false').lower() != 'false'
)
AWS_ENDPOINT = getenv('AWS_ENDPOINT')
@@ -784,8 +776,6 @@ def get_versions(pg_cluster: str):
bucket=SPILO_S3_BACKUP_BUCKET,
pg_cluster=pg_cluster,
prefix=SPILO_S3_BACKUP_PREFIX,
s3_endpoint=WALE_S3_ENDPOINT,
use_aws_instance_profile=USE_AWS_INSTANCE_PROFILE,
),
)
@@ -797,9 +787,8 @@ def get_basebackups(pg_cluster: str, uid: str):
bucket=SPILO_S3_BACKUP_BUCKET,
pg_cluster=pg_cluster,
prefix=SPILO_S3_BACKUP_PREFIX,
s3_endpoint=WALE_S3_ENDPOINT,
uid=uid,
use_aws_instance_profile=USE_AWS_INSTANCE_PROFILE,
postgresql_versions=OPERATOR_UI_CONFIG.get('postgresql_versions', DEFAULT_UI_CONFIG['postgresql_versions']),
),
)
@@ -991,8 +980,6 @@ def main(port, debug, clusters: list):
logger.info(f'Superuser team: {SUPERUSER_TEAM}')
logger.info(f'Target namespace: {TARGET_NAMESPACE}')
logger.info(f'Teamservice URL: {TEAM_SERVICE_URL}')
logger.info(f'Use AWS instance_profile: {USE_AWS_INSTANCE_PROFILE}')
logger.info(f'WAL-E S3 endpoint: {WALE_S3_ENDPOINT}')
logger.info(f'AWS S3 endpoint: {AWS_ENDPOINT}')
if TARGET_NAMESPACE is None:

View File

@@ -6,9 +6,8 @@ from os import environ, getenv
from requests import Session
from urllib.parse import urljoin
from uuid import UUID
from wal_e.cmd import configure_backup_cxt
from .utils import Attrs, defaulting, these
from .utils import defaulting, these
from operator_ui.adapters.logger import logger
session = Session()
@@ -284,10 +283,8 @@ def read_stored_clusters(bucket, prefix, delimiter='/'):
def read_versions(
pg_cluster,
bucket,
s3_endpoint,
prefix,
delimiter='/',
use_aws_instance_profile=False,
):
return [
'base' if uid == 'wal' else uid
@@ -305,35 +302,72 @@
if uid == 'wal' or defaulting(lambda: UUID(uid))
]
BACKUP_VERSION_PREFIXES = ['', '10/', '11/', '12/', '13/', '14/', '15/', '16/', '17/']
def lsn_to_wal_segment_stop(finish_lsn, start_segment, wal_segment_size=16 * 1024 * 1024):
timeline = int(start_segment[:8], 16)
log_id = finish_lsn >> 32
seg_id = (finish_lsn & 0xFFFFFFFF) // wal_segment_size
return f"{timeline:08X}{log_id:08X}{seg_id:08X}"
def lsn_to_offset_hex(lsn, wal_segment_size=16 * 1024 * 1024):
return f"{lsn % wal_segment_size:08X}"
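The two new helpers convert WAL-G's numeric LSNs into the WAL segment/offset notation the UI already renders for WAL-E backups. A quick sanity check of the arithmetic, assuming the default 16 MiB segment size (the block restates the helpers so it runs standalone):

```python
# Restated from the diff above; 16 MiB is the PostgreSQL default WAL segment size.
def lsn_to_wal_segment_stop(finish_lsn, start_segment, wal_segment_size=16 * 1024 * 1024):
    timeline = int(start_segment[:8], 16)            # first 8 hex digits of a segment name = timeline
    log_id = finish_lsn >> 32                        # high 32 bits of the LSN
    seg_id = (finish_lsn & 0xFFFFFFFF) // wal_segment_size  # low 32 bits / segment size
    return f"{timeline:08X}{log_id:08X}{seg_id:08X}"

def lsn_to_offset_hex(lsn, wal_segment_size=16 * 1024 * 1024):
    return f"{lsn % wal_segment_size:08X}"

# LSN 0/5000028 (i.e. 0x5000028) on timeline 1 falls into segment 5:
finish = 0x5000028
print(lsn_to_wal_segment_stop(finish, "000000010000000000000001"))  # 000000010000000000000005
print(lsn_to_offset_hex(finish))                                    # 00000028
```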
def read_basebackups(
pg_cluster,
uid,
bucket,
s3_endpoint,
prefix,
delimiter='/',
use_aws_instance_profile=False,
postgresql_versions,
):
environ['WALE_S3_ENDPOINT'] = s3_endpoint
suffix = '' if uid == 'base' else '/' + uid
backups = []
for vp in BACKUP_VERSION_PREFIXES:
for vp in postgresql_versions:
backup_prefix = f'{prefix}{pg_cluster}{suffix}/wal/{vp}/basebackups_005/'
logger.info(f"{bucket}/{backup_prefix}")
backups = backups + [
{
key: value
for key, value in basebackup.__dict__.items()
if isinstance(value, str) or isinstance(value, int)
}
for basebackup in Attrs.call(
f=configure_backup_cxt,
aws_instance_profile=use_aws_instance_profile,
s3_prefix=f's3://{bucket}/{prefix}{pg_cluster}{suffix}/wal/{vp}',
)._backup_list(detail=True)
]
paginator = client('s3').get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=bucket, Prefix=backup_prefix)
for page in pages:
for obj in page.get("Contents", []):
key = obj["Key"]
if not key.endswith("backup_stop_sentinel.json"):
continue
response = client('s3').get_object(Bucket=bucket, Key=key)
backup_info = loads(response["Body"].read().decode("utf-8"))
last_modified = response["LastModified"].astimezone(timezone.utc).isoformat()
backup_name = key.split("/")[-1].replace("_backup_stop_sentinel.json", "")
start_seg, start_offset = backup_name.split("_")[1], backup_name.split("_")[-1] if "_" in backup_name else None
if "LSN" in backup_info and "FinishLSN" in backup_info:
# WAL-G
lsn = backup_info["LSN"]
finish_lsn = backup_info["FinishLSN"]
backups.append({
"expanded_size_bytes": backup_info.get("UncompressedSize"),
"last_modified": last_modified,
"name": backup_name,
"wal_segment_backup_start": start_seg,
"wal_segment_backup_stop": lsn_to_wal_segment_stop(finish_lsn, start_seg),
"wal_segment_offset_backup_start": lsn_to_offset_hex(lsn),
"wal_segment_offset_backup_stop": lsn_to_offset_hex(finish_lsn),
})
elif "wal_segment_backup_stop" in backup_info:
# WAL-E
stop_seg = backup_info["wal_segment_backup_stop"]
stop_offset = backup_info["wal_segment_offset_backup_stop"]
backups.append({
"expanded_size_bytes": backup_info.get("expanded_size_bytes"),
"last_modified": last_modified,
"name": backup_name,
"wal_segment_backup_start": start_seg,
"wal_segment_backup_stop": stop_seg,
"wal_segment_offset_backup_start": start_offset,
"wal_segment_offset_backup_stop": stop_offset,
})
return backups
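The loop above tells the two sentinel dialects apart purely by their keys: WAL-G writes `LSN`/`FinishLSN`, WAL-E writes the `wal_segment_*` fields directly. A minimal sketch of that dispatch — key names are taken from the diff, the values are invented for illustration:

```python
def classify_sentinel(backup_info):
    """Distinguish WAL-G from WAL-E backup_stop_sentinel.json payloads by their keys."""
    if "LSN" in backup_info and "FinishLSN" in backup_info:
        return "wal-g"
    if "wal_segment_backup_stop" in backup_info:
        return "wal-e"
    return "unknown"

# Hypothetical payloads (field names as parsed above, values made up):
walg = {"LSN": 83886080, "FinishLSN": 83886120, "UncompressedSize": 12345}
wale = {"wal_segment_backup_stop": "000000010000000000000005",
        "wal_segment_offset_backup_stop": "00000028",
        "expanded_size_bytes": 12345}

print(classify_sentinel(walg))  # wal-g
print(classify_sentinel(wale))  # wal-e
```

Only the WAL-G branch needs the LSN-to-segment conversion; WAL-E sentinels already carry segment names and offsets as strings.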

View File

@@ -11,5 +11,4 @@ kubernetes==11.0.0
python-json-logger==2.0.7
requests==2.32.2
stups-tokens>=1.1.19
wal_e==1.1.1
werkzeug==3.0.6