Merge branch 'master' into master

Allen Conlon 2025-02-18 14:32:30 -05:00, committed by GitHub
commit 7cdf692f37
12 changed files with 518 additions and 130 deletions

View File

@ -620,22 +620,34 @@ By default the topology key for the pod anti affinity is set to
`kubernetes.io/hostname`. You can set another topology key, e.g.
`failure-domain.beta.kubernetes.io/zone`. See [built-in node labels](https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#interlude-built-in-node-labels) for available topology keys.
## Pod Disruption Budget
## Pod Disruption Budgets
By default the operator uses a PodDisruptionBudget (PDB) to protect the cluster
from voluntary disruptions and hence unwanted DB downtime. The `MinAvailable`
parameter of the PDB is set to `1`, which prevents killing masters in single-node
clusters and/or the last remaining running instance in a multi-node cluster.
By default the operator creates two PodDisruptionBudgets (PDBs) to protect the cluster
from voluntary disruptions and hence unwanted DB downtime: the so-called primary PDB
and a PDB for critical operations.
### Primary PDB
The `MinAvailable` parameter of this PDB is set to `1`. If `pdb_master_label_selector`
is enabled, the label selector also includes the `spilo-role=master` condition. This
prevents killing the master in single-node clusters and/or the last remaining running
instance in a multi-node cluster.
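For reference, a sketch of what this produces, condensed from `generatePrimaryPodDisruptionBudget` further down in this diff; the cluster name `acid-minimal-cluster`, the namespace, and the default label keys are illustrative assumptions:

```go
package main

import (
	policyv1 "k8s.io/api/policy/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

// primaryPDB sketches the PDB the operator generates with default settings.
func primaryPDB() *policyv1.PodDisruptionBudget {
	minAvailable := intstr.FromInt(1) // relaxed to 0 when disabled or scaled to zero
	return &policyv1.PodDisruptionBudget{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "postgres-acid-minimal-cluster-pdb", // default pdb_name_format
			Namespace: "default",
		},
		Spec: policyv1.PodDisruptionBudgetSpec{
			MinAvailable: &minAvailable,
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					"cluster-name": "acid-minimal-cluster",
					"spilo-role":   "master", // only if pdb_master_label_selector is enabled
				},
			},
		},
	}
}

func main() { _ = primaryPDB() }
```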
### PDB for critical operations
The `MinAvailable` parameter of this PDB is equal to the `numberOfInstances` set in the
cluster manifest, while the label selector includes the `critical-operation=true` condition.
This protects all pods of a cluster, given they are labeled accordingly.
For example, the operator labels all Spilo pods with `critical-operation=true` during a
major version upgrade run. You may want to protect cluster pods during other critical
operations by assigning the label to pods yourself or via other means of automation, as
sketched below.
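An illustrative sketch of assigning the label yourself with client-go; the pod name, namespace, and kubeconfig path are hypothetical, and inside the operator this is handled by `criticalOperationLabel`, shown later in this diff:

```go
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumption: running outside the cluster with the default kubeconfig.
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// Strategic merge patch adding the label the critical-op PDB selects on.
	patch := []byte(`{"metadata":{"labels":{"critical-operation":"true"}}}`)

	// Hypothetical pod of an example cluster.
	_, err = client.CoreV1().Pods("default").Patch(context.TODO(),
		"acid-minimal-cluster-0", types.StrategicMergePatchType, patch, metav1.PatchOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println("labeled acid-minimal-cluster-0 with critical-operation=true")
}
```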
The PDBs are only relaxed in two scenarios:
* If a cluster is scaled down to `0` instances (e.g. for draining nodes)
* If the PDB is disabled in the configuration (`enable_pod_disruption_budget`)
The PDB is still in place, having `MinAvailable` set to `0`. If enabled, it will
automatically be set to `1` on scale-up. Disabling PDBs helps avoid blocking
Kubernetes upgrades in managed K8s environments at the cost of prolonged DB
downtime. See PR [#384](https://github.com/zalando/postgres-operator/pull/384)
The PDBs are still in place, having `MinAvailable` set to `0`. Disabling PDBs
helps avoid blocking Kubernetes upgrades in managed K8s environments at the
cost of prolonged DB downtime. See PR [#384](https://github.com/zalando/postgres-operator/pull/384)
for the use case.
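Condensed from the generator code later in this diff, the relaxation rule amounts to the following sketch:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

// relaxedMinAvailable mirrors the rule above: the budget drops to 0 when PDBs
// are disabled in the configuration or the cluster is scaled down to zero.
func relaxedMinAvailable(pdbEnabled *bool, numberOfInstances int32) intstr.IntOrString {
	if (pdbEnabled != nil && !*pdbEnabled) || numberOfInstances <= 0 {
		return intstr.FromInt(0)
	}
	return intstr.FromInt32(numberOfInstances)
}

func main() {
	enabled := true
	disabled := false
	mins := []intstr.IntOrString{
		relaxedMinAvailable(&enabled, 3),  // 3
		relaxedMinAvailable(&enabled, 0),  // 0: cluster scaled down
		relaxedMinAvailable(&disabled, 3), // 0: PDBs disabled
	}
	for _, m := range mins {
		fmt.Println(m.IntValue())
	}
}
```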
## Add cluster-specific labels

View File

@ -230,7 +230,7 @@ kubectl delete postgresql acid-minimal-cluster
```
This should remove the associated StatefulSet, database Pods, Services and
Endpoints. The PersistentVolumes are released and the PodDisruptionBudget is
Endpoints. The PersistentVolumes are released and the PodDisruptionBudgets are
deleted. Secrets, however, are not deleted, and backups will remain in place.
When deleting a cluster while it is still starting up or got stuck during that

View File

@ -334,13 +334,13 @@ configuration they are grouped under the `kubernetes` key.
pod namespace).
* **pdb_name_format**
defines the template for PDB (Pod Disruption Budget) names created by the
defines the template for the primary PDB (Pod Disruption Budget) name created by the
operator. The default is `postgres-{cluster}-pdb`, where `{cluster}` is
replaced by the cluster name. Only the `{cluster}` placeholder is allowed in
the template.
* **pdb_master_label_selector**
By default the PDB will match the master role, hence preventing nodes from being
By default the primary PDB will match the master role, hence preventing nodes from being
drained if the node_readiness_label is not used. If this option is set to
`false`, the `spilo-role=master` selector will not be added to the PDB.
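To illustrate the expansion, a standalone mimic of the operator's `config.StringTemplate` helper; the mimic itself is an assumption, and only the `Format` call pattern appears in the generator code below:

```go
package main

import (
	"fmt"
	"strings"
)

// StringTemplate mimics config.StringTemplate: Format replaces "{key}"
// placeholders with the supplied key/value pairs.
type StringTemplate string

func (t StringTemplate) Format(pairs ...string) string {
	s := string(t)
	for i := 0; i+1 < len(pairs); i += 2 {
		s = strings.ReplaceAll(s, "{"+pairs[i]+"}", pairs[i+1])
	}
	return s
}

func main() {
	fmt.Println(StringTemplate("postgres-{cluster}-pdb").Format("cluster", "acid-minimal-cluster"))
	// Output: postgres-acid-minimal-cluster-pdb
}
```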

View File

@ -1752,9 +1752,13 @@ class EndToEndTestCase(unittest.TestCase):
Test password rotation and removal of users due to retention policy
'''
k8s = self.k8s
cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
leader = k8s.get_cluster_leader_pod()
today = date.today()
# remember number of secrets to make sure it stays the same
secret_count = k8s.count_secrets_with_label(cluster_label)
# enable password rotation for owner of foo database
pg_patch_rotation_single_users = {
"spec": {
@ -1810,6 +1814,7 @@ class EndToEndTestCase(unittest.TestCase):
enable_password_rotation = {
"data": {
"enable_password_rotation": "true",
"inherited_annotations": "environment",
"password_rotation_interval": "30",
"password_rotation_user_retention": "30", # should be set to 60
},
@ -1856,13 +1861,29 @@ class EndToEndTestCase(unittest.TestCase):
self.eventuallyEqual(lambda: len(self.query_database_with_user(leader.metadata.name, "postgres", "SELECT 1", "foo_user")), 1,
"Could not connect to the database with rotation user {}".format(rotation_user), 10, 5)
# add annotation which triggers syncSecrets call
pg_annotation_patch = {
"metadata": {
"annotations": {
"environment": "test",
}
}
}
k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_annotation_patch)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
time.sleep(10)
self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), secret_count, "Unexpected number of secrets")
# check if rotation has been ignored for user from test_cross_namespace_secrets test
db_user_secret = k8s.get_secret(username="test.db_user", namespace="test")
secret_username = str(base64.b64decode(db_user_secret.data["username"]), 'utf-8')
self.assertEqual("test.db_user", secret_username,
"Unexpected username in secret of test.db_user: expected {}, got {}".format("test.db_user", secret_username))
# check if annotation for secret has been updated
self.assertTrue("environment" in db_user_secret.metadata.annotations, "Added annotation was not propagated to secret")
# disable password rotation for all other users (foo_user)
# and pick smaller intervals to see if the third fake rotation user is dropped
enable_password_rotation = {
@ -2100,7 +2121,7 @@ class EndToEndTestCase(unittest.TestCase):
patch_sset_propagate_annotations = {
"data": {
"downscaler_annotations": "deployment-time,downscaler/*",
"inherited_annotations": "owned-by",
"inherited_annotations": "environment,owned-by",
}
}
k8s.update_config(patch_sset_propagate_annotations)
@ -2547,7 +2568,10 @@ class EndToEndTestCase(unittest.TestCase):
self.assertTrue(self.has_postgresql_owner_reference(config_ep.metadata.owner_references, inverse), "config endpoint owner reference check failed")
pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-pdb".format(cluster_name), cluster_namespace)
self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod disruption owner reference check failed")
self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "primary pod disruption budget owner reference check failed")
pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-critical-op-pdb".format(cluster_name), cluster_namespace)
self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod disruption budget for critical operations owner reference check failed")
pg_secret = k8s.api.core_v1.read_namespaced_secret("postgres.{}.credentials.postgresql.acid.zalan.do".format(cluster_name), cluster_namespace)
self.assertTrue(self.has_postgresql_owner_reference(pg_secret.metadata.owner_references, inverse), "postgres secret owner reference check failed")

View File

@ -59,16 +59,17 @@ type Config struct {
}
type kubeResources struct {
Services map[PostgresRole]*v1.Service
Endpoints map[PostgresRole]*v1.Endpoints
PatroniEndpoints map[string]*v1.Endpoints
PatroniConfigMaps map[string]*v1.ConfigMap
Secrets map[types.UID]*v1.Secret
Statefulset *appsv1.StatefulSet
VolumeClaims map[types.UID]*v1.PersistentVolumeClaim
PodDisruptionBudget *policyv1.PodDisruptionBudget
LogicalBackupJob *batchv1.CronJob
Streams map[string]*zalandov1.FabricEventStream
Services map[PostgresRole]*v1.Service
Endpoints map[PostgresRole]*v1.Endpoints
PatroniEndpoints map[string]*v1.Endpoints
PatroniConfigMaps map[string]*v1.ConfigMap
Secrets map[types.UID]*v1.Secret
Statefulset *appsv1.StatefulSet
VolumeClaims map[types.UID]*v1.PersistentVolumeClaim
PrimaryPodDisruptionBudget *policyv1.PodDisruptionBudget
CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget
LogicalBackupJob *batchv1.CronJob
Streams map[string]*zalandov1.FabricEventStream
//Pods are treated separately
}
@ -343,14 +344,10 @@ func (c *Cluster) Create() (err error) {
c.logger.Infof("secrets have been successfully created")
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Secrets", "The secrets have been successfully created")
if c.PodDisruptionBudget != nil {
return fmt.Errorf("pod disruption budget already exists in the cluster")
if err = c.createPodDisruptionBudgets(); err != nil {
return fmt.Errorf("could not create pod disruption budgets: %v", err)
}
pdb, err := c.createPodDisruptionBudget()
if err != nil {
return fmt.Errorf("could not create pod disruption budget: %v", err)
}
c.logger.Infof("pod disruption budget %q has been successfully created", util.NameFromMeta(pdb.ObjectMeta))
c.logger.Info("pod disruption budgets have been successfully created")
if c.Statefulset != nil {
return fmt.Errorf("statefulset already exists in the cluster")
@ -1037,10 +1034,18 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
// only when streams were not specified in oldSpec but in newSpec
needStreamUser := len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0
annotationsChanged, _ := c.compareAnnotations(oldSpec.Annotations, newSpec.Annotations, nil)
initUsers := !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser
if initUsers {
// if inherited annotations differ secrets have to be synced on update
newAnnotations := c.annotationsSet(nil)
oldAnnotations := make(map[string]string)
for _, secret := range c.Secrets {
// inherited annotations are applied to all secrets, so any one is representative
oldAnnotations = secret.ObjectMeta.Annotations
break
}
annotationsChanged, _ := c.compareAnnotations(oldAnnotations, newAnnotations, nil)
if initUsers || annotationsChanged {
c.logger.Debug("initialize users")
if err := c.initUsers(); err != nil {
c.logger.Errorf("could not init users - skipping sync of secrets and databases: %v", err)
@ -1048,8 +1053,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
updateFailed = true
return
}
}
if initUsers || annotationsChanged {
c.logger.Debug("syncing secrets")
//TODO: mind the secrets of the deleted/new users
if err := c.syncSecrets(); err != nil {
@ -1081,9 +1085,9 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
}
}
// pod disruption budget
if err := c.syncPodDisruptionBudget(true); err != nil {
c.logger.Errorf("could not sync pod disruption budget: %v", err)
// pod disruption budgets
if err := c.syncPodDisruptionBudgets(true); err != nil {
c.logger.Errorf("could not sync pod disruption budgets: %v", err)
updateFailed = true
}
@ -1228,10 +1232,10 @@ func (c *Cluster) Delete() error {
c.logger.Info("not deleting secrets because disabled in configuration")
}
if err := c.deletePodDisruptionBudget(); err != nil {
if err := c.deletePodDisruptionBudgets(); err != nil {
anyErrors = true
c.logger.Warningf("could not delete pod disruption budget: %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete pod disruption budget: %v", err)
c.logger.Warningf("could not delete pod disruption budgets: %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete pod disruption budgets: %v", err)
}
for _, role := range []PostgresRole{Master, Replica} {
@ -1730,16 +1734,17 @@ func (c *Cluster) GetCurrentProcess() Process {
// GetStatus provides status of the cluster
func (c *Cluster) GetStatus() *ClusterStatus {
status := &ClusterStatus{
Cluster: c.Name,
Namespace: c.Namespace,
Team: c.Spec.TeamID,
Status: c.Status,
Spec: c.Spec,
MasterService: c.GetServiceMaster(),
ReplicaService: c.GetServiceReplica(),
StatefulSet: c.GetStatefulSet(),
PodDisruptionBudget: c.GetPodDisruptionBudget(),
CurrentProcess: c.GetCurrentProcess(),
Cluster: c.Name,
Namespace: c.Namespace,
Team: c.Spec.TeamID,
Status: c.Status,
Spec: c.Spec,
MasterService: c.GetServiceMaster(),
ReplicaService: c.GetServiceReplica(),
StatefulSet: c.GetStatefulSet(),
PrimaryPodDisruptionBudget: c.GetPrimaryPodDisruptionBudget(),
CriticalOpPodDisruptionBudget: c.GetCriticalOpPodDisruptionBudget(),
CurrentProcess: c.GetCurrentProcess(),
Error: fmt.Errorf("error: %s", c.Error),
}

View File

@ -109,10 +109,15 @@ func (c *Cluster) servicePort(role PostgresRole) int32 {
return pgPort
}
func (c *Cluster) podDisruptionBudgetName() string {
func (c *Cluster) PrimaryPodDisruptionBudgetName() string {
return c.OpConfig.PDBNameFormat.Format("cluster", c.Name)
}
func (c *Cluster) criticalOpPodDisruptionBudgetName() string {
pdbTemplate := config.StringTemplate("postgres-{cluster}-critical-op-pdb")
return pdbTemplate.Format("cluster", c.Name)
}
func makeDefaultResources(config *config.Config) acidv1.Resources {
defaultRequests := acidv1.ResourceDescription{
@ -2207,7 +2212,7 @@ func (c *Cluster) generateStandbyEnvironment(description *acidv1.StandbyDescript
return result
}
func (c *Cluster) generatePodDisruptionBudget() *policyv1.PodDisruptionBudget {
func (c *Cluster) generatePrimaryPodDisruptionBudget() *policyv1.PodDisruptionBudget {
minAvailable := intstr.FromInt(1)
pdbEnabled := c.OpConfig.EnablePodDisruptionBudget
pdbMasterLabelSelector := c.OpConfig.PDBMasterLabelSelector
@ -2225,7 +2230,36 @@ func (c *Cluster) generatePodDisruptionBudget() *policyv1.PodDisruptionBudget {
return &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: c.podDisruptionBudgetName(),
Name: c.PrimaryPodDisruptionBudgetName(),
Namespace: c.Namespace,
Labels: c.labelsSet(true),
Annotations: c.annotationsSet(nil),
OwnerReferences: c.ownerReferences(),
},
Spec: policyv1.PodDisruptionBudgetSpec{
MinAvailable: &minAvailable,
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
},
}
}
func (c *Cluster) generateCriticalOpPodDisruptionBudget() *policyv1.PodDisruptionBudget {
minAvailable := intstr.FromInt32(c.Spec.NumberOfInstances)
pdbEnabled := c.OpConfig.EnablePodDisruptionBudget
// if PodDisruptionBudget is disabled or if there are no DB pods, set the budget to 0.
if (pdbEnabled != nil && !(*pdbEnabled)) || c.Spec.NumberOfInstances <= 0 {
minAvailable = intstr.FromInt(0)
}
labels := c.labelsSet(false)
labels["critical-operation"] = "true"
return &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: c.criticalOpPodDisruptionBudgetName(),
Namespace: c.Namespace,
Labels: c.labelsSet(true),
Annotations: c.annotationsSet(nil),

View File

@ -2349,22 +2349,34 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
}
}
testLabelsAndSelectors := func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error {
masterLabelSelectorDisabled := cluster.OpConfig.PDBMasterLabelSelector != nil && !*cluster.OpConfig.PDBMasterLabelSelector
if podDisruptionBudget.ObjectMeta.Namespace != "myapp" {
return fmt.Errorf("Object Namespace incorrect.")
}
if !reflect.DeepEqual(podDisruptionBudget.Labels, map[string]string{"team": "myapp", "cluster-name": "myapp-database"}) {
return fmt.Errorf("Labels incorrect.")
}
if !masterLabelSelectorDisabled &&
!reflect.DeepEqual(podDisruptionBudget.Spec.Selector, &metav1.LabelSelector{
MatchLabels: map[string]string{"spilo-role": "master", "cluster-name": "myapp-database"}}) {
testLabelsAndSelectors := func(isPrimary bool) func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error {
return func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error {
masterLabelSelectorDisabled := cluster.OpConfig.PDBMasterLabelSelector != nil && !*cluster.OpConfig.PDBMasterLabelSelector
if podDisruptionBudget.ObjectMeta.Namespace != "myapp" {
return fmt.Errorf("Object Namespace incorrect.")
}
expectedLabels := map[string]string{"team": "myapp", "cluster-name": "myapp-database"}
if !reflect.DeepEqual(podDisruptionBudget.Labels, expectedLabels) {
return fmt.Errorf("Labels incorrect, got %#v, expected %#v", podDisruptionBudget.Labels, expectedLabels)
}
if !masterLabelSelectorDisabled {
if isPrimary {
expectedLabels := &metav1.LabelSelector{
MatchLabels: map[string]string{"spilo-role": "master", "cluster-name": "myapp-database"}}
if !reflect.DeepEqual(podDisruptionBudget.Spec.Selector, expectedLabels) {
return fmt.Errorf("MatchLabels incorrect, got %#v, expected %#v", podDisruptionBudget.Spec.Selector, expectedLabels)
}
} else {
expectedLabels := &metav1.LabelSelector{
MatchLabels: map[string]string{"cluster-name": "myapp-database", "critical-operation": "true"}}
if !reflect.DeepEqual(podDisruptionBudget.Spec.Selector, expectedLabels) {
return fmt.Errorf("MatchLabels incorrect, got %#v, expected %#v", podDisruptionBudget.Spec.Selector, expectedLabels)
}
}
}
return fmt.Errorf("MatchLabels incorrect.")
return nil
}
return nil
}
testPodDisruptionBudgetOwnerReference := func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error {
@ -2400,7 +2412,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-pdb"),
hasMinAvailable(1),
testLabelsAndSelectors,
testLabelsAndSelectors(true),
},
},
{
@ -2417,7 +2429,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-pdb"),
hasMinAvailable(0),
testLabelsAndSelectors,
testLabelsAndSelectors(true),
},
},
{
@ -2434,7 +2446,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-pdb"),
hasMinAvailable(0),
testLabelsAndSelectors,
testLabelsAndSelectors(true),
},
},
{
@ -2451,7 +2463,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-databass-budget"),
hasMinAvailable(1),
testLabelsAndSelectors,
testLabelsAndSelectors(true),
},
},
{
@ -2468,7 +2480,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-pdb"),
hasMinAvailable(1),
testLabelsAndSelectors,
testLabelsAndSelectors(true),
},
},
{
@ -2485,13 +2497,99 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-pdb"),
hasMinAvailable(1),
testLabelsAndSelectors,
testLabelsAndSelectors(true),
},
},
}
for _, tt := range tests {
result := tt.spec.generatePodDisruptionBudget()
result := tt.spec.generatePrimaryPodDisruptionBudget()
for _, check := range tt.check {
err := check(tt.spec, result)
if err != nil {
t.Errorf("%s [%s]: PodDisruptionBudget spec is incorrect, %+v",
testName, tt.scenario, err)
}
}
}
testCriticalOp := []struct {
scenario string
spec *Cluster
check []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error
}{
{
scenario: "With multiple instances",
spec: New(
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb"}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
logger,
eventRecorder),
check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-critical-op-pdb"),
hasMinAvailable(3),
testLabelsAndSelectors(false),
},
},
{
scenario: "With zero instances",
spec: New(
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb"}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 0}},
logger,
eventRecorder),
check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-critical-op-pdb"),
hasMinAvailable(0),
testLabelsAndSelectors(false),
},
},
{
scenario: "With PodDisruptionBudget disabled",
spec: New(
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.False()}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
logger,
eventRecorder),
check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-critical-op-pdb"),
hasMinAvailable(0),
testLabelsAndSelectors(false),
},
},
{
scenario: "With OwnerReference enabled",
spec: New(
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role", EnableOwnerReferences: util.True()}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.True()}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
logger,
eventRecorder),
check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-critical-op-pdb"),
hasMinAvailable(3),
testLabelsAndSelectors(false),
},
},
}
for _, tt := range testCriticalOp {
result := tt.spec.generateCriticalOpPodDisruptionBudget()
for _, check := range tt.check {
err := check(tt.spec, result)
if err != nil {

View File

@ -106,6 +106,22 @@ func (c *Cluster) removeFailuresAnnotation() error {
return nil
}
// criticalOperationLabel patches the critical-operation label on the given pods;
// a nil value removes the label, as strategic merge patch treats null as delete.
func (c *Cluster) criticalOperationLabel(pods []v1.Pod, value *string) error {
metadataReq := map[string]map[string]map[string]*string{"metadata": {"labels": {"critical-operation": value}}}
patchReq, err := json.Marshal(metadataReq)
if err != nil {
return fmt.Errorf("could not marshal ObjectMeta: %v", err)
}
for _, pod := range pods {
_, err = c.KubeClient.Pods(c.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patchReq, metav1.PatchOptions{})
if err != nil {
return err
}
}
return nil
}
/*
Execute upgrade when mode is set to manual or full or when the owning team is allowed for upgrade (and mode is "off").
@ -224,6 +240,17 @@ func (c *Cluster) majorVersionUpgrade() error {
if allRunning && masterPod != nil {
c.logger.Infof("healthy cluster ready to upgrade, current: %d desired: %d", c.currentMajorVersion, desiredVersion)
if c.currentMajorVersion < desiredVersion {
// deferred cleanup: remove the label again even if the upgrade fails; an error
// returned from a deferred closure would be silently discarded, so log it instead
defer func() {
if err := c.criticalOperationLabel(pods, nil); err != nil {
c.logger.Warningf("failed to remove critical-operation label: %v", err)
}
}()
val := "true"
if err = c.criticalOperationLabel(pods, &val); err != nil {
return fmt.Errorf("failed to assign critical-operation label: %s", err)
}
podName := &spec.NamespacedName{Namespace: masterPod.Namespace, Name: masterPod.Name}
c.logger.Infof("triggering major version upgrade on pod %s of %d pods", masterPod.Name, numberOfPods)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Major Version Upgrade", "starting major version upgrade on pod %s of %d pods", masterPod.Name, numberOfPods)

View File

@ -23,8 +23,13 @@ const (
)
func (c *Cluster) listResources() error {
if c.PodDisruptionBudget != nil {
c.logger.Infof("found pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta), c.PodDisruptionBudget.UID)
if c.PrimaryPodDisruptionBudget != nil {
c.logger.Infof("found primary pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta), c.PrimaryPodDisruptionBudget.UID)
}
if c.CriticalOpPodDisruptionBudget != nil {
c.logger.Infof("found pod disruption budget for critical operations: %q (uid: %q)", util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta), c.CriticalOpPodDisruptionBudget.UID)
}
if c.Statefulset != nil {
@ -417,59 +422,128 @@ func (c *Cluster) generateEndpointSubsets(role PostgresRole) []v1.EndpointSubset
return result
}
func (c *Cluster) createPodDisruptionBudget() (*policyv1.PodDisruptionBudget, error) {
podDisruptionBudgetSpec := c.generatePodDisruptionBudget()
func (c *Cluster) createPrimaryPodDisruptionBudget() error {
c.logger.Debug("creating primary pod disruption budget")
if c.PrimaryPodDisruptionBudget != nil {
c.logger.Warning("primary pod disruption budget already exists in the cluster")
return nil
}
podDisruptionBudgetSpec := c.generatePrimaryPodDisruptionBudget()
podDisruptionBudget, err := c.KubeClient.
PodDisruptionBudgets(podDisruptionBudgetSpec.Namespace).
Create(context.TODO(), podDisruptionBudgetSpec, metav1.CreateOptions{})
if err != nil {
return nil, err
return err
}
c.PodDisruptionBudget = podDisruptionBudget
c.logger.Infof("primary pod disruption budget %q has been successfully created", util.NameFromMeta(podDisruptionBudget.ObjectMeta))
c.PrimaryPodDisruptionBudget = podDisruptionBudget
return podDisruptionBudget, nil
return nil
}
func (c *Cluster) updatePodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error {
if c.PodDisruptionBudget == nil {
return fmt.Errorf("there is no pod disruption budget in the cluster")
func (c *Cluster) createCriticalOpPodDisruptionBudget() error {
c.logger.Debug("creating pod disruption budget for critical operations")
if c.CriticalOpPodDisruptionBudget != nil {
c.logger.Warning("pod disruption budget for critical operations already exists in the cluster")
return nil
}
if err := c.deletePodDisruptionBudget(); err != nil {
return fmt.Errorf("could not delete pod disruption budget: %v", err)
podDisruptionBudgetSpec := c.generateCriticalOpPodDisruptionBudget()
podDisruptionBudget, err := c.KubeClient.
PodDisruptionBudgets(podDisruptionBudgetSpec.Namespace).
Create(context.TODO(), podDisruptionBudgetSpec, metav1.CreateOptions{})
if err != nil {
return err
}
c.logger.Infof("pod disruption budget for critical operations %q has been successfully created", util.NameFromMeta(podDisruptionBudget.ObjectMeta))
c.CriticalOpPodDisruptionBudget = podDisruptionBudget
return nil
}
func (c *Cluster) createPodDisruptionBudgets() error {
errors := make([]string, 0)
err := c.createPrimaryPodDisruptionBudget()
if err != nil {
errors = append(errors, fmt.Sprintf("could not create primary pod disruption budget: %v", err))
}
err = c.createCriticalOpPodDisruptionBudget()
if err != nil {
errors = append(errors, fmt.Sprintf("could not create pod disruption budget for critical operations: %v", err))
}
if len(errors) > 0 {
return fmt.Errorf("%v", strings.Join(errors, `', '`))
}
return nil
}
func (c *Cluster) updatePrimaryPodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error {
c.logger.Debug("updating primary pod disruption budget")
if c.PrimaryPodDisruptionBudget == nil {
return fmt.Errorf("there is no primary pod disruption budget in the cluster")
}
if err := c.deletePrimaryPodDisruptionBudget(); err != nil {
return fmt.Errorf("could not delete primary pod disruption budget: %v", err)
}
newPdb, err := c.KubeClient.
PodDisruptionBudgets(pdb.Namespace).
Create(context.TODO(), pdb, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("could not create pod disruption budget: %v", err)
return fmt.Errorf("could not create primary pod disruption budget: %v", err)
}
c.PodDisruptionBudget = newPdb
c.PrimaryPodDisruptionBudget = newPdb
return nil
}
func (c *Cluster) deletePodDisruptionBudget() error {
c.logger.Debug("deleting pod disruption budget")
if c.PodDisruptionBudget == nil {
c.logger.Debug("there is no pod disruption budget in the cluster")
func (c *Cluster) updateCriticalOpPodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error {
c.logger.Debug("updating pod disruption budget for critical operations")
if c.CriticalOpPodDisruptionBudget == nil {
return fmt.Errorf("there is no pod disruption budget for critical operations in the cluster")
}
if err := c.deleteCriticalOpPodDisruptionBudget(); err != nil {
return fmt.Errorf("could not delete pod disruption budget for critical operations: %v", err)
}
newPdb, err := c.KubeClient.
PodDisruptionBudgets(pdb.Namespace).
Create(context.TODO(), pdb, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("could not create pod disruption budget for critical operations: %v", err)
}
c.CriticalOpPodDisruptionBudget = newPdb
return nil
}
func (c *Cluster) deletePrimaryPodDisruptionBudget() error {
c.logger.Debug("deleting primary pod disruption budget")
if c.PrimaryPodDisruptionBudget == nil {
c.logger.Debug("there is no primary pod disruption budget in the cluster")
return nil
}
pdbName := util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta)
pdbName := util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta)
err := c.KubeClient.
PodDisruptionBudgets(c.PodDisruptionBudget.Namespace).
Delete(context.TODO(), c.PodDisruptionBudget.Name, c.deleteOptions)
PodDisruptionBudgets(c.PrimaryPodDisruptionBudget.Namespace).
Delete(context.TODO(), c.PrimaryPodDisruptionBudget.Name, c.deleteOptions)
if k8sutil.ResourceNotFound(err) {
c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta))
c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta))
} else if err != nil {
return fmt.Errorf("could not delete PodDisruptionBudget: %v", err)
return fmt.Errorf("could not delete primary pod disruption budget: %v", err)
}
c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta))
c.PodDisruptionBudget = nil
c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta))
c.PrimaryPodDisruptionBudget = nil
err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
func() (bool, error) {
@ -483,12 +557,67 @@ func (c *Cluster) deletePodDisruptionBudget() error {
return false, err2
})
if err != nil {
return fmt.Errorf("could not delete pod disruption budget: %v", err)
return fmt.Errorf("could not delete primary pod disruption budget: %v", err)
}
return nil
}
func (c *Cluster) deleteCriticalOpPodDisruptionBudget() error {
c.logger.Debug("deleting pod disruption budget for critical operations")
if c.CriticalOpPodDisruptionBudget == nil {
c.logger.Debug("there is no pod disruption budget for critical operations in the cluster")
return nil
}
pdbName := util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta)
err := c.KubeClient.
PodDisruptionBudgets(c.CriticalOpPodDisruptionBudget.Namespace).
Delete(context.TODO(), c.CriticalOpPodDisruptionBudget.Name, c.deleteOptions)
if k8sutil.ResourceNotFound(err) {
c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta))
} else if err != nil {
return fmt.Errorf("could not delete pod disruption budget for critical operations: %v", err)
}
c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta))
c.CriticalOpPodDisruptionBudget = nil
err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
func() (bool, error) {
_, err2 := c.KubeClient.PodDisruptionBudgets(pdbName.Namespace).Get(context.TODO(), pdbName.Name, metav1.GetOptions{})
if err2 == nil {
return false, nil
}
if k8sutil.ResourceNotFound(err2) {
return true, nil
}
return false, err2
})
if err != nil {
return fmt.Errorf("could not delete pod disruption budget for critical operations: %v", err)
}
return nil
}
func (c *Cluster) deletePodDisruptionBudgets() error {
errors := make([]string, 0)
if err := c.deletePrimaryPodDisruptionBudget(); err != nil {
errors = append(errors, fmt.Sprintf("%v", err))
}
if err := c.deleteCriticalOpPodDisruptionBudget(); err != nil {
errors = append(errors, fmt.Sprintf("%v", err))
}
if len(errors) > 0 {
return fmt.Errorf("%v", strings.Join(errors, `', '`))
}
return nil
}
func (c *Cluster) deleteEndpoint(role PostgresRole) error {
c.setProcessName("deleting endpoint")
c.logger.Debugf("deleting %s endpoint", role)
@ -705,7 +834,12 @@ func (c *Cluster) GetStatefulSet() *appsv1.StatefulSet {
return c.Statefulset
}
// GetPodDisruptionBudget returns cluster's kubernetes PodDisruptionBudget
func (c *Cluster) GetPodDisruptionBudget() *policyv1.PodDisruptionBudget {
return c.PodDisruptionBudget
// GetPrimaryPodDisruptionBudget returns cluster's primary kubernetes PodDisruptionBudget
func (c *Cluster) GetPrimaryPodDisruptionBudget() *policyv1.PodDisruptionBudget {
return c.PrimaryPodDisruptionBudget
}
// GetCriticalOpPodDisruptionBudget returns cluster's kubernetes PodDisruptionBudget for critical operations
func (c *Cluster) GetCriticalOpPodDisruptionBudget() *policyv1.PodDisruptionBudget {
return c.CriticalOpPodDisruptionBudget
}

View File

@ -117,8 +117,8 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
}
c.logger.Debug("syncing pod disruption budgets")
if err = c.syncPodDisruptionBudget(false); err != nil {
err = fmt.Errorf("could not sync pod disruption budget: %v", err)
if err = c.syncPodDisruptionBudgets(false); err != nil {
err = fmt.Errorf("could not sync pod disruption budgets: %v", err)
return err
}
@ -452,22 +452,22 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error {
return nil
}
func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error {
func (c *Cluster) syncPrimaryPodDisruptionBudget(isUpdate bool) error {
var (
pdb *policyv1.PodDisruptionBudget
err error
)
if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.podDisruptionBudgetName(), metav1.GetOptions{}); err == nil {
c.PodDisruptionBudget = pdb
newPDB := c.generatePodDisruptionBudget()
if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.PrimaryPodDisruptionBudgetName(), metav1.GetOptions{}); err == nil {
c.PrimaryPodDisruptionBudget = pdb
newPDB := c.generatePrimaryPodDisruptionBudget()
match, reason := c.comparePodDisruptionBudget(pdb, newPDB)
if !match {
c.logPDBChanges(pdb, newPDB, isUpdate, reason)
if err = c.updatePodDisruptionBudget(newPDB); err != nil {
if err = c.updatePrimaryPodDisruptionBudget(newPDB); err != nil {
return err
}
} else {
c.PodDisruptionBudget = pdb
c.PrimaryPodDisruptionBudget = pdb
}
return nil
@ -476,21 +476,74 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error {
return fmt.Errorf("could not get pod disruption budget: %v", err)
}
// no existing pod disruption budget, create new one
c.logger.Infof("could not find the cluster's pod disruption budget")
c.logger.Infof("could not find the primary pod disruption budget")
if pdb, err = c.createPodDisruptionBudget(); err != nil {
if err = c.createPrimaryPodDisruptionBudget(); err != nil {
if !k8sutil.ResourceAlreadyExists(err) {
return fmt.Errorf("could not create pod disruption budget: %v", err)
return fmt.Errorf("could not create primary pod disruption budget: %v", err)
}
c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta))
if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.podDisruptionBudgetName(), metav1.GetOptions{}); err != nil {
if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.PrimaryPodDisruptionBudgetName(), metav1.GetOptions{}); err != nil {
return fmt.Errorf("could not fetch existing %q pod disruption budget", util.NameFromMeta(pdb.ObjectMeta))
}
}
c.logger.Infof("created missing pod disruption budget %q", util.NameFromMeta(pdb.ObjectMeta))
c.PodDisruptionBudget = pdb
return nil
}
func (c *Cluster) syncCriticalOpPodDisruptionBudget(isUpdate bool) error {
var (
pdb *policyv1.PodDisruptionBudget
err error
)
if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.criticalOpPodDisruptionBudgetName(), metav1.GetOptions{}); err == nil {
c.CriticalOpPodDisruptionBudget = pdb
newPDB := c.generateCriticalOpPodDisruptionBudget()
match, reason := c.comparePodDisruptionBudget(pdb, newPDB)
if !match {
c.logPDBChanges(pdb, newPDB, isUpdate, reason)
if err = c.updateCriticalOpPodDisruptionBudget(newPDB); err != nil {
return err
}
} else {
c.CriticalOpPodDisruptionBudget = pdb
}
return nil
}
if !k8sutil.ResourceNotFound(err) {
return fmt.Errorf("could not get pod disruption budget: %v", err)
}
// no existing pod disruption budget, create new one
c.logger.Infof("could not find pod disruption budget for critical operations")
if err = c.createCriticalOpPodDisruptionBudget(); err != nil {
if !k8sutil.ResourceAlreadyExists(err) {
return fmt.Errorf("could not create pod disruption budget for critical operations: %v", err)
}
c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta))
if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.criticalOpPodDisruptionBudgetName(), metav1.GetOptions{}); err != nil {
return fmt.Errorf("could not fetch existing %q pod disruption budget", util.NameFromMeta(pdb.ObjectMeta))
}
}
return nil
}
func (c *Cluster) syncPodDisruptionBudgets(isUpdate bool) error {
errors := make([]string, 0)
if err := c.syncPrimaryPodDisruptionBudget(isUpdate); err != nil {
errors = append(errors, fmt.Sprintf("%v", err))
}
if err := c.syncCriticalOpPodDisruptionBudget(isUpdate); err != nil {
errors = append(errors, fmt.Sprintf("%v", err))
}
if len(errors) > 0 {
return fmt.Errorf("%v", strings.Join(errors, `', '`))
}
return nil
}

View File

@ -58,15 +58,16 @@ type WorkerStatus struct {
// ClusterStatus describes status of the cluster
type ClusterStatus struct {
Team string
Cluster string
Namespace string
MasterService *v1.Service
ReplicaService *v1.Service
MasterEndpoint *v1.Endpoints
ReplicaEndpoint *v1.Endpoints
StatefulSet *appsv1.StatefulSet
PodDisruptionBudget *policyv1.PodDisruptionBudget
Team string
Cluster string
Namespace string
MasterService *v1.Service
ReplicaService *v1.Service
MasterEndpoint *v1.Endpoints
ReplicaEndpoint *v1.Endpoints
StatefulSet *appsv1.StatefulSet
PrimaryPodDisruptionBudget *policyv1.PodDisruptionBudget
CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget
CurrentProcess Process
Worker uint32

View File

@ -329,7 +329,7 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster,
if err != nil {
return nil, err
}
_, err = cluster.createPodDisruptionBudget()
err = cluster.createPodDisruptionBudgets()
if err != nil {
return nil, err
}