Critical operation PDB (#2830)

Create the second PDB to cover Pods with a special "critical operation" label set.

This label is going to be assigned to all pg cluster's Pods by the Operator during a PG major version upgrade, by Patroni during a cluster/replica bootstrap. It can also be set manually or by any other automation tool.
This commit is contained in:
Polina Bungina 2025-01-29 14:41:08 +03:00 committed by GitHub
parent f49b4f1e97
commit a56ecaace7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 456 additions and 123 deletions

View File

@ -620,22 +620,34 @@ By default the topology key for the pod anti affinity is set to
`kubernetes.io/hostname`, you can set another topology key e.g. `kubernetes.io/hostname`, you can set another topology key e.g.
`failure-domain.beta.kubernetes.io/zone`. See [built-in node labels](https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#interlude-built-in-node-labels) for available topology keys. `failure-domain.beta.kubernetes.io/zone`. See [built-in node labels](https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#interlude-built-in-node-labels) for available topology keys.
## Pod Disruption Budget ## Pod Disruption Budgets
By default the operator uses a PodDisruptionBudget (PDB) to protect the cluster By default the operator creates two PodDisruptionBudgets (PDB) to protect the cluster
from voluntarily disruptions and hence unwanted DB downtime. The `MinAvailable` from voluntarily disruptions and hence unwanted DB downtime: so-called primary PDB and
parameter of the PDB is set to `1` which prevents killing masters in single-node and PDB for critical operations.
clusters and/or the last remaining running instance in a multi-node cluster.
### Primary PDB
The `MinAvailable` parameter of this PDB is set to `1` and, if `pdb_master_label_selector`
is enabled, label selector includes `spilo-role=master` condition, which prevents killing
masters in single-node clusters and/or the last remaining running instance in a multi-node
cluster.
## PDB for critical operations
The `MinAvailable` parameter of this PDB is equal to the `numberOfInstances` set in the
cluster manifest, while label selector includes `critical-operation=true` condition. This
allows to protect all pods of a cluster, given they are labeled accordingly.
For example, Operator labels all Spilo pods with `critical-operation=true` during the major
version upgrade run. You may want to protect cluster pods during other critical operations
by assigning the label to pods yourself or using other means of automation.
The PDB is only relaxed in two scenarios: The PDB is only relaxed in two scenarios:
* If a cluster is scaled down to `0` instances (e.g. for draining nodes) * If a cluster is scaled down to `0` instances (e.g. for draining nodes)
* If the PDB is disabled in the configuration (`enable_pod_disruption_budget`) * If the PDB is disabled in the configuration (`enable_pod_disruption_budget`)
The PDB is still in place having `MinAvailable` set to `0`. If enabled it will The PDBs are still in place having `MinAvailable` set to `0`. Disabling PDBs
be automatically set to `1` on scale up. Disabling PDBs helps avoiding blocking helps avoiding blocking Kubernetes upgrades in managed K8s environments at the
Kubernetes upgrades in managed K8s environments at the cost of prolonged DB cost of prolonged DB downtime. See PR [#384](https://github.com/zalando/postgres-operator/pull/384)
downtime. See PR [#384](https://github.com/zalando/postgres-operator/pull/384)
for the use case. for the use case.
## Add cluster-specific labels ## Add cluster-specific labels

View File

@ -230,7 +230,7 @@ kubectl delete postgresql acid-minimal-cluster
``` ```
This should remove the associated StatefulSet, database Pods, Services and This should remove the associated StatefulSet, database Pods, Services and
Endpoints. The PersistentVolumes are released and the PodDisruptionBudget is Endpoints. The PersistentVolumes are released and the PodDisruptionBudgets are
deleted. Secrets however are not deleted and backups will remain in place. deleted. Secrets however are not deleted and backups will remain in place.
When deleting a cluster while it is still starting up or got stuck during that When deleting a cluster while it is still starting up or got stuck during that

View File

@ -334,13 +334,13 @@ configuration they are grouped under the `kubernetes` key.
pod namespace). pod namespace).
* **pdb_name_format** * **pdb_name_format**
defines the template for PDB (Pod Disruption Budget) names created by the defines the template for primary PDB (Pod Disruption Budget) name created by the
operator. The default is `postgres-{cluster}-pdb`, where `{cluster}` is operator. The default is `postgres-{cluster}-pdb`, where `{cluster}` is
replaced by the cluster name. Only the `{cluster}` placeholders is allowed in replaced by the cluster name. Only the `{cluster}` placeholders is allowed in
the template. the template.
* **pdb_master_label_selector** * **pdb_master_label_selector**
By default the PDB will match the master role hence preventing nodes to be By default the primary PDB will match the master role hence preventing nodes to be
drained if the node_readiness_label is not used. If this option if set to drained if the node_readiness_label is not used. If this option if set to
`false` the `spilo-role=master` selector will not be added to the PDB. `false` the `spilo-role=master` selector will not be added to the PDB.

View File

@ -2547,7 +2547,10 @@ class EndToEndTestCase(unittest.TestCase):
self.assertTrue(self.has_postgresql_owner_reference(config_ep.metadata.owner_references, inverse), "config endpoint owner reference check failed") self.assertTrue(self.has_postgresql_owner_reference(config_ep.metadata.owner_references, inverse), "config endpoint owner reference check failed")
pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-pdb".format(cluster_name), cluster_namespace) pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-pdb".format(cluster_name), cluster_namespace)
self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod disruption owner reference check failed") self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "primary pod disruption budget owner reference check failed")
pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-critical-op-pdb".format(cluster_name), cluster_namespace)
self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod disruption budget for critical operations owner reference check failed")
pg_secret = k8s.api.core_v1.read_namespaced_secret("postgres.{}.credentials.postgresql.acid.zalan.do".format(cluster_name), cluster_namespace) pg_secret = k8s.api.core_v1.read_namespaced_secret("postgres.{}.credentials.postgresql.acid.zalan.do".format(cluster_name), cluster_namespace)
self.assertTrue(self.has_postgresql_owner_reference(pg_secret.metadata.owner_references, inverse), "postgres secret owner reference check failed") self.assertTrue(self.has_postgresql_owner_reference(pg_secret.metadata.owner_references, inverse), "postgres secret owner reference check failed")

View File

@ -59,16 +59,17 @@ type Config struct {
} }
type kubeResources struct { type kubeResources struct {
Services map[PostgresRole]*v1.Service Services map[PostgresRole]*v1.Service
Endpoints map[PostgresRole]*v1.Endpoints Endpoints map[PostgresRole]*v1.Endpoints
PatroniEndpoints map[string]*v1.Endpoints PatroniEndpoints map[string]*v1.Endpoints
PatroniConfigMaps map[string]*v1.ConfigMap PatroniConfigMaps map[string]*v1.ConfigMap
Secrets map[types.UID]*v1.Secret Secrets map[types.UID]*v1.Secret
Statefulset *appsv1.StatefulSet Statefulset *appsv1.StatefulSet
VolumeClaims map[types.UID]*v1.PersistentVolumeClaim VolumeClaims map[types.UID]*v1.PersistentVolumeClaim
PodDisruptionBudget *policyv1.PodDisruptionBudget PrimaryPodDisruptionBudget *policyv1.PodDisruptionBudget
LogicalBackupJob *batchv1.CronJob CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget
Streams map[string]*zalandov1.FabricEventStream LogicalBackupJob *batchv1.CronJob
Streams map[string]*zalandov1.FabricEventStream
//Pods are treated separately //Pods are treated separately
} }
@ -343,14 +344,10 @@ func (c *Cluster) Create() (err error) {
c.logger.Infof("secrets have been successfully created") c.logger.Infof("secrets have been successfully created")
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Secrets", "The secrets have been successfully created") c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Secrets", "The secrets have been successfully created")
if c.PodDisruptionBudget != nil { if err = c.createPodDisruptionBudgets(); err != nil {
return fmt.Errorf("pod disruption budget already exists in the cluster") return fmt.Errorf("could not create pod disruption budgets: %v", err)
} }
pdb, err := c.createPodDisruptionBudget() c.logger.Info("pod disruption budgets have been successfully created")
if err != nil {
return fmt.Errorf("could not create pod disruption budget: %v", err)
}
c.logger.Infof("pod disruption budget %q has been successfully created", util.NameFromMeta(pdb.ObjectMeta))
if c.Statefulset != nil { if c.Statefulset != nil {
return fmt.Errorf("statefulset already exists in the cluster") return fmt.Errorf("statefulset already exists in the cluster")
@ -1081,9 +1078,9 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
} }
} }
// pod disruption budget // pod disruption budgets
if err := c.syncPodDisruptionBudget(true); err != nil { if err := c.syncPodDisruptionBudgets(true); err != nil {
c.logger.Errorf("could not sync pod disruption budget: %v", err) c.logger.Errorf("could not sync pod disruption budgets: %v", err)
updateFailed = true updateFailed = true
} }
@ -1228,10 +1225,10 @@ func (c *Cluster) Delete() error {
c.logger.Info("not deleting secrets because disabled in configuration") c.logger.Info("not deleting secrets because disabled in configuration")
} }
if err := c.deletePodDisruptionBudget(); err != nil { if err := c.deletePodDisruptionBudgets(); err != nil {
anyErrors = true anyErrors = true
c.logger.Warningf("could not delete pod disruption budget: %v", err) c.logger.Warningf("could not delete pod disruption budgets: %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete pod disruption budget: %v", err) c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete pod disruption budgets: %v", err)
} }
for _, role := range []PostgresRole{Master, Replica} { for _, role := range []PostgresRole{Master, Replica} {
@ -1730,16 +1727,17 @@ func (c *Cluster) GetCurrentProcess() Process {
// GetStatus provides status of the cluster // GetStatus provides status of the cluster
func (c *Cluster) GetStatus() *ClusterStatus { func (c *Cluster) GetStatus() *ClusterStatus {
status := &ClusterStatus{ status := &ClusterStatus{
Cluster: c.Name, Cluster: c.Name,
Namespace: c.Namespace, Namespace: c.Namespace,
Team: c.Spec.TeamID, Team: c.Spec.TeamID,
Status: c.Status, Status: c.Status,
Spec: c.Spec, Spec: c.Spec,
MasterService: c.GetServiceMaster(), MasterService: c.GetServiceMaster(),
ReplicaService: c.GetServiceReplica(), ReplicaService: c.GetServiceReplica(),
StatefulSet: c.GetStatefulSet(), StatefulSet: c.GetStatefulSet(),
PodDisruptionBudget: c.GetPodDisruptionBudget(), PrimaryPodDisruptionBudget: c.GetPrimaryPodDisruptionBudget(),
CurrentProcess: c.GetCurrentProcess(), CriticalOpPodDisruptionBudget: c.GetCriticalOpPodDisruptionBudget(),
CurrentProcess: c.GetCurrentProcess(),
Error: fmt.Errorf("error: %s", c.Error), Error: fmt.Errorf("error: %s", c.Error),
} }

View File

@ -109,10 +109,15 @@ func (c *Cluster) servicePort(role PostgresRole) int32 {
return pgPort return pgPort
} }
func (c *Cluster) podDisruptionBudgetName() string { func (c *Cluster) PrimaryPodDisruptionBudgetName() string {
return c.OpConfig.PDBNameFormat.Format("cluster", c.Name) return c.OpConfig.PDBNameFormat.Format("cluster", c.Name)
} }
func (c *Cluster) criticalOpPodDisruptionBudgetName() string {
pdbTemplate := config.StringTemplate("postgres-{cluster}-critical-op-pdb")
return pdbTemplate.Format("cluster", c.Name)
}
func makeDefaultResources(config *config.Config) acidv1.Resources { func makeDefaultResources(config *config.Config) acidv1.Resources {
defaultRequests := acidv1.ResourceDescription{ defaultRequests := acidv1.ResourceDescription{
@ -2207,7 +2212,7 @@ func (c *Cluster) generateStandbyEnvironment(description *acidv1.StandbyDescript
return result return result
} }
func (c *Cluster) generatePodDisruptionBudget() *policyv1.PodDisruptionBudget { func (c *Cluster) generatePrimaryPodDisruptionBudget() *policyv1.PodDisruptionBudget {
minAvailable := intstr.FromInt(1) minAvailable := intstr.FromInt(1)
pdbEnabled := c.OpConfig.EnablePodDisruptionBudget pdbEnabled := c.OpConfig.EnablePodDisruptionBudget
pdbMasterLabelSelector := c.OpConfig.PDBMasterLabelSelector pdbMasterLabelSelector := c.OpConfig.PDBMasterLabelSelector
@ -2225,7 +2230,36 @@ func (c *Cluster) generatePodDisruptionBudget() *policyv1.PodDisruptionBudget {
return &policyv1.PodDisruptionBudget{ return &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: c.podDisruptionBudgetName(), Name: c.PrimaryPodDisruptionBudgetName(),
Namespace: c.Namespace,
Labels: c.labelsSet(true),
Annotations: c.annotationsSet(nil),
OwnerReferences: c.ownerReferences(),
},
Spec: policyv1.PodDisruptionBudgetSpec{
MinAvailable: &minAvailable,
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
},
}
}
func (c *Cluster) generateCriticalOpPodDisruptionBudget() *policyv1.PodDisruptionBudget {
minAvailable := intstr.FromInt32(c.Spec.NumberOfInstances)
pdbEnabled := c.OpConfig.EnablePodDisruptionBudget
// if PodDisruptionBudget is disabled or if there are no DB pods, set the budget to 0.
if (pdbEnabled != nil && !(*pdbEnabled)) || c.Spec.NumberOfInstances <= 0 {
minAvailable = intstr.FromInt(0)
}
labels := c.labelsSet(false)
labels["critical-operation"] = "true"
return &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: c.criticalOpPodDisruptionBudgetName(),
Namespace: c.Namespace, Namespace: c.Namespace,
Labels: c.labelsSet(true), Labels: c.labelsSet(true),
Annotations: c.annotationsSet(nil), Annotations: c.annotationsSet(nil),

View File

@ -2349,22 +2349,34 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
} }
} }
testLabelsAndSelectors := func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error { testLabelsAndSelectors := func(isPrimary bool) func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error {
masterLabelSelectorDisabled := cluster.OpConfig.PDBMasterLabelSelector != nil && !*cluster.OpConfig.PDBMasterLabelSelector return func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error {
if podDisruptionBudget.ObjectMeta.Namespace != "myapp" { masterLabelSelectorDisabled := cluster.OpConfig.PDBMasterLabelSelector != nil && !*cluster.OpConfig.PDBMasterLabelSelector
return fmt.Errorf("Object Namespace incorrect.") if podDisruptionBudget.ObjectMeta.Namespace != "myapp" {
} return fmt.Errorf("Object Namespace incorrect.")
if !reflect.DeepEqual(podDisruptionBudget.Labels, map[string]string{"team": "myapp", "cluster-name": "myapp-database"}) { }
return fmt.Errorf("Labels incorrect.") expectedLabels := map[string]string{"team": "myapp", "cluster-name": "myapp-database"}
} if !reflect.DeepEqual(podDisruptionBudget.Labels, expectedLabels) {
if !masterLabelSelectorDisabled && return fmt.Errorf("Labels incorrect, got %#v, expected %#v", podDisruptionBudget.Labels, expectedLabels)
!reflect.DeepEqual(podDisruptionBudget.Spec.Selector, &metav1.LabelSelector{ }
MatchLabels: map[string]string{"spilo-role": "master", "cluster-name": "myapp-database"}}) { if !masterLabelSelectorDisabled {
if isPrimary {
expectedLabels := &metav1.LabelSelector{
MatchLabels: map[string]string{"spilo-role": "master", "cluster-name": "myapp-database"}}
if !reflect.DeepEqual(podDisruptionBudget.Spec.Selector, expectedLabels) {
return fmt.Errorf("MatchLabels incorrect, got %#v, expected %#v", podDisruptionBudget.Spec.Selector, expectedLabels)
}
} else {
expectedLabels := &metav1.LabelSelector{
MatchLabels: map[string]string{"cluster-name": "myapp-database", "critical-operation": "true"}}
if !reflect.DeepEqual(podDisruptionBudget.Spec.Selector, expectedLabels) {
return fmt.Errorf("MatchLabels incorrect, got %#v, expected %#v", podDisruptionBudget.Spec.Selector, expectedLabels)
}
}
}
return fmt.Errorf("MatchLabels incorrect.") return nil
} }
return nil
} }
testPodDisruptionBudgetOwnerReference := func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error { testPodDisruptionBudgetOwnerReference := func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error {
@ -2400,7 +2412,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
testPodDisruptionBudgetOwnerReference, testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-pdb"), hasName("postgres-myapp-database-pdb"),
hasMinAvailable(1), hasMinAvailable(1),
testLabelsAndSelectors, testLabelsAndSelectors(true),
}, },
}, },
{ {
@ -2417,7 +2429,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
testPodDisruptionBudgetOwnerReference, testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-pdb"), hasName("postgres-myapp-database-pdb"),
hasMinAvailable(0), hasMinAvailable(0),
testLabelsAndSelectors, testLabelsAndSelectors(true),
}, },
}, },
{ {
@ -2434,7 +2446,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
testPodDisruptionBudgetOwnerReference, testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-pdb"), hasName("postgres-myapp-database-pdb"),
hasMinAvailable(0), hasMinAvailable(0),
testLabelsAndSelectors, testLabelsAndSelectors(true),
}, },
}, },
{ {
@ -2451,7 +2463,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
testPodDisruptionBudgetOwnerReference, testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-databass-budget"), hasName("postgres-myapp-database-databass-budget"),
hasMinAvailable(1), hasMinAvailable(1),
testLabelsAndSelectors, testLabelsAndSelectors(true),
}, },
}, },
{ {
@ -2468,7 +2480,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
testPodDisruptionBudgetOwnerReference, testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-pdb"), hasName("postgres-myapp-database-pdb"),
hasMinAvailable(1), hasMinAvailable(1),
testLabelsAndSelectors, testLabelsAndSelectors(true),
}, },
}, },
{ {
@ -2485,13 +2497,99 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
testPodDisruptionBudgetOwnerReference, testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-pdb"), hasName("postgres-myapp-database-pdb"),
hasMinAvailable(1), hasMinAvailable(1),
testLabelsAndSelectors, testLabelsAndSelectors(true),
}, },
}, },
} }
for _, tt := range tests { for _, tt := range tests {
result := tt.spec.generatePodDisruptionBudget() result := tt.spec.generatePrimaryPodDisruptionBudget()
for _, check := range tt.check {
err := check(tt.spec, result)
if err != nil {
t.Errorf("%s [%s]: PodDisruptionBudget spec is incorrect, %+v",
testName, tt.scenario, err)
}
}
}
testCriticalOp := []struct {
scenario string
spec *Cluster
check []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error
}{
{
scenario: "With multiple instances",
spec: New(
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb"}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
logger,
eventRecorder),
check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-critical-op-pdb"),
hasMinAvailable(3),
testLabelsAndSelectors(false),
},
},
{
scenario: "With zero instances",
spec: New(
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb"}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 0}},
logger,
eventRecorder),
check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-critical-op-pdb"),
hasMinAvailable(0),
testLabelsAndSelectors(false),
},
},
{
scenario: "With PodDisruptionBudget disabled",
spec: New(
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.False()}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
logger,
eventRecorder),
check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-critical-op-pdb"),
hasMinAvailable(0),
testLabelsAndSelectors(false),
},
},
{
scenario: "With OwnerReference enabled",
spec: New(
Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role", EnableOwnerReferences: util.True()}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.True()}},
k8sutil.KubernetesClient{},
acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
logger,
eventRecorder),
check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{
testPodDisruptionBudgetOwnerReference,
hasName("postgres-myapp-database-critical-op-pdb"),
hasMinAvailable(3),
testLabelsAndSelectors(false),
},
},
}
for _, tt := range testCriticalOp {
result := tt.spec.generateCriticalOpPodDisruptionBudget()
for _, check := range tt.check { for _, check := range tt.check {
err := check(tt.spec, result) err := check(tt.spec, result)
if err != nil { if err != nil {

View File

@ -23,8 +23,13 @@ const (
) )
func (c *Cluster) listResources() error { func (c *Cluster) listResources() error {
if c.PodDisruptionBudget != nil { if c.PrimaryPodDisruptionBudget != nil {
c.logger.Infof("found pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta), c.PodDisruptionBudget.UID) c.logger.Infof("found primary pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta), c.PrimaryPodDisruptionBudget.UID)
}
if c.CriticalOpPodDisruptionBudget != nil {
c.logger.Infof("found pod disruption budget for critical operations: %q (uid: %q)", util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta), c.CriticalOpPodDisruptionBudget.UID)
} }
if c.Statefulset != nil { if c.Statefulset != nil {
@ -417,59 +422,128 @@ func (c *Cluster) generateEndpointSubsets(role PostgresRole) []v1.EndpointSubset
return result return result
} }
func (c *Cluster) createPodDisruptionBudget() (*policyv1.PodDisruptionBudget, error) { func (c *Cluster) createPrimaryPodDisruptionBudget() error {
podDisruptionBudgetSpec := c.generatePodDisruptionBudget() c.logger.Debug("creating primary pod disruption budget")
if c.PrimaryPodDisruptionBudget != nil {
c.logger.Warning("primary pod disruption budget already exists in the cluster")
return nil
}
podDisruptionBudgetSpec := c.generatePrimaryPodDisruptionBudget()
podDisruptionBudget, err := c.KubeClient. podDisruptionBudget, err := c.KubeClient.
PodDisruptionBudgets(podDisruptionBudgetSpec.Namespace). PodDisruptionBudgets(podDisruptionBudgetSpec.Namespace).
Create(context.TODO(), podDisruptionBudgetSpec, metav1.CreateOptions{}) Create(context.TODO(), podDisruptionBudgetSpec, metav1.CreateOptions{})
if err != nil { if err != nil {
return nil, err return err
} }
c.PodDisruptionBudget = podDisruptionBudget c.logger.Infof("primary pod disruption budget %q has been successfully created", util.NameFromMeta(podDisruptionBudget.ObjectMeta))
c.PrimaryPodDisruptionBudget = podDisruptionBudget
return podDisruptionBudget, nil return nil
} }
func (c *Cluster) updatePodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error { func (c *Cluster) createCriticalOpPodDisruptionBudget() error {
if c.PodDisruptionBudget == nil { c.logger.Debug("creating pod disruption budget for critical operations")
return fmt.Errorf("there is no pod disruption budget in the cluster") if c.CriticalOpPodDisruptionBudget != nil {
c.logger.Warning("pod disruption budget for critical operations already exists in the cluster")
return nil
} }
if err := c.deletePodDisruptionBudget(); err != nil { podDisruptionBudgetSpec := c.generateCriticalOpPodDisruptionBudget()
return fmt.Errorf("could not delete pod disruption budget: %v", err) podDisruptionBudget, err := c.KubeClient.
PodDisruptionBudgets(podDisruptionBudgetSpec.Namespace).
Create(context.TODO(), podDisruptionBudgetSpec, metav1.CreateOptions{})
if err != nil {
return err
}
c.logger.Infof("pod disruption budget for critical operations %q has been successfully created", util.NameFromMeta(podDisruptionBudget.ObjectMeta))
c.CriticalOpPodDisruptionBudget = podDisruptionBudget
return nil
}
func (c *Cluster) createPodDisruptionBudgets() error {
errors := make([]string, 0)
err := c.createPrimaryPodDisruptionBudget()
if err != nil {
errors = append(errors, fmt.Sprintf("could not create primary pod disruption budget: %v", err))
}
err = c.createCriticalOpPodDisruptionBudget()
if err != nil {
errors = append(errors, fmt.Sprintf("could not create pod disruption budget for critical operations: %v", err))
}
if len(errors) > 0 {
return fmt.Errorf("%v", strings.Join(errors, `', '`))
}
return nil
}
func (c *Cluster) updatePrimaryPodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error {
c.logger.Debug("updating primary pod disruption budget")
if c.PrimaryPodDisruptionBudget == nil {
return fmt.Errorf("there is no primary pod disruption budget in the cluster")
}
if err := c.deletePrimaryPodDisruptionBudget(); err != nil {
return fmt.Errorf("could not delete primary pod disruption budget: %v", err)
} }
newPdb, err := c.KubeClient. newPdb, err := c.KubeClient.
PodDisruptionBudgets(pdb.Namespace). PodDisruptionBudgets(pdb.Namespace).
Create(context.TODO(), pdb, metav1.CreateOptions{}) Create(context.TODO(), pdb, metav1.CreateOptions{})
if err != nil { if err != nil {
return fmt.Errorf("could not create pod disruption budget: %v", err) return fmt.Errorf("could not create primary pod disruption budget: %v", err)
} }
c.PodDisruptionBudget = newPdb c.PrimaryPodDisruptionBudget = newPdb
return nil return nil
} }
func (c *Cluster) deletePodDisruptionBudget() error { func (c *Cluster) updateCriticalOpPodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error {
c.logger.Debug("deleting pod disruption budget") c.logger.Debug("updating pod disruption budget for critical operations")
if c.PodDisruptionBudget == nil { if c.CriticalOpPodDisruptionBudget == nil {
c.logger.Debug("there is no pod disruption budget in the cluster") return fmt.Errorf("there is no pod disruption budget for critical operations in the cluster")
}
if err := c.deleteCriticalOpPodDisruptionBudget(); err != nil {
return fmt.Errorf("could not delete pod disruption budget for critical operations: %v", err)
}
newPdb, err := c.KubeClient.
PodDisruptionBudgets(pdb.Namespace).
Create(context.TODO(), pdb, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("could not create pod disruption budget for critical operations: %v", err)
}
c.CriticalOpPodDisruptionBudget = newPdb
return nil
}
func (c *Cluster) deletePrimaryPodDisruptionBudget() error {
c.logger.Debug("deleting primary pod disruption budget")
if c.PrimaryPodDisruptionBudget == nil {
c.logger.Debug("there is no primary pod disruption budget in the cluster")
return nil return nil
} }
pdbName := util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta) pdbName := util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta)
err := c.KubeClient. err := c.KubeClient.
PodDisruptionBudgets(c.PodDisruptionBudget.Namespace). PodDisruptionBudgets(c.PrimaryPodDisruptionBudget.Namespace).
Delete(context.TODO(), c.PodDisruptionBudget.Name, c.deleteOptions) Delete(context.TODO(), c.PrimaryPodDisruptionBudget.Name, c.deleteOptions)
if k8sutil.ResourceNotFound(err) { if k8sutil.ResourceNotFound(err) {
c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta)) c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta))
} else if err != nil { } else if err != nil {
return fmt.Errorf("could not delete PodDisruptionBudget: %v", err) return fmt.Errorf("could not delete primary pod disruption budget: %v", err)
} }
c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta)) c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta))
c.PodDisruptionBudget = nil c.PrimaryPodDisruptionBudget = nil
err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
func() (bool, error) { func() (bool, error) {
@ -483,12 +557,67 @@ func (c *Cluster) deletePodDisruptionBudget() error {
return false, err2 return false, err2
}) })
if err != nil { if err != nil {
return fmt.Errorf("could not delete pod disruption budget: %v", err) return fmt.Errorf("could not delete primary pod disruption budget: %v", err)
} }
return nil return nil
} }
func (c *Cluster) deleteCriticalOpPodDisruptionBudget() error {
c.logger.Debug("deleting pod disruption budget for critical operations")
if c.CriticalOpPodDisruptionBudget == nil {
c.logger.Debug("there is no pod disruption budget for critical operations in the cluster")
return nil
}
pdbName := util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta)
err := c.KubeClient.
PodDisruptionBudgets(c.CriticalOpPodDisruptionBudget.Namespace).
Delete(context.TODO(), c.CriticalOpPodDisruptionBudget.Name, c.deleteOptions)
if k8sutil.ResourceNotFound(err) {
c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta))
} else if err != nil {
return fmt.Errorf("could not delete pod disruption budget for critical operations: %v", err)
}
c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta))
c.CriticalOpPodDisruptionBudget = nil
err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
func() (bool, error) {
_, err2 := c.KubeClient.PodDisruptionBudgets(pdbName.Namespace).Get(context.TODO(), pdbName.Name, metav1.GetOptions{})
if err2 == nil {
return false, nil
}
if k8sutil.ResourceNotFound(err2) {
return true, nil
}
return false, err2
})
if err != nil {
return fmt.Errorf("could not delete pod disruption budget for critical operations: %v", err)
}
return nil
}
func (c *Cluster) deletePodDisruptionBudgets() error {
errors := make([]string, 0)
if err := c.deletePrimaryPodDisruptionBudget(); err != nil {
errors = append(errors, fmt.Sprintf("%v", err))
}
if err := c.deleteCriticalOpPodDisruptionBudget(); err != nil {
errors = append(errors, fmt.Sprintf("%v", err))
}
if len(errors) > 0 {
return fmt.Errorf("%v", strings.Join(errors, `', '`))
}
return nil
}
func (c *Cluster) deleteEndpoint(role PostgresRole) error { func (c *Cluster) deleteEndpoint(role PostgresRole) error {
c.setProcessName("deleting endpoint") c.setProcessName("deleting endpoint")
c.logger.Debugf("deleting %s endpoint", role) c.logger.Debugf("deleting %s endpoint", role)
@ -705,7 +834,12 @@ func (c *Cluster) GetStatefulSet() *appsv1.StatefulSet {
return c.Statefulset return c.Statefulset
} }
// GetPodDisruptionBudget returns cluster's kubernetes PodDisruptionBudget // GetPrimaryPodDisruptionBudget returns cluster's primary kubernetes PodDisruptionBudget
func (c *Cluster) GetPodDisruptionBudget() *policyv1.PodDisruptionBudget { func (c *Cluster) GetPrimaryPodDisruptionBudget() *policyv1.PodDisruptionBudget {
return c.PodDisruptionBudget return c.PrimaryPodDisruptionBudget
}
// GetCriticalOpPodDisruptionBudget returns cluster's kubernetes PodDisruptionBudget for critical operations
func (c *Cluster) GetCriticalOpPodDisruptionBudget() *policyv1.PodDisruptionBudget {
return c.CriticalOpPodDisruptionBudget
} }

View File

@ -117,8 +117,8 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
} }
c.logger.Debug("syncing pod disruption budgets") c.logger.Debug("syncing pod disruption budgets")
if err = c.syncPodDisruptionBudget(false); err != nil { if err = c.syncPodDisruptionBudgets(false); err != nil {
err = fmt.Errorf("could not sync pod disruption budget: %v", err) err = fmt.Errorf("could not sync pod disruption budgets: %v", err)
return err return err
} }
@ -452,22 +452,22 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error {
return nil return nil
} }
func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { func (c *Cluster) syncPrimaryPodDisruptionBudget(isUpdate bool) error {
var ( var (
pdb *policyv1.PodDisruptionBudget pdb *policyv1.PodDisruptionBudget
err error err error
) )
if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.podDisruptionBudgetName(), metav1.GetOptions{}); err == nil { if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.PrimaryPodDisruptionBudgetName(), metav1.GetOptions{}); err == nil {
c.PodDisruptionBudget = pdb c.PrimaryPodDisruptionBudget = pdb
newPDB := c.generatePodDisruptionBudget() newPDB := c.generatePrimaryPodDisruptionBudget()
match, reason := c.comparePodDisruptionBudget(pdb, newPDB) match, reason := c.comparePodDisruptionBudget(pdb, newPDB)
if !match { if !match {
c.logPDBChanges(pdb, newPDB, isUpdate, reason) c.logPDBChanges(pdb, newPDB, isUpdate, reason)
if err = c.updatePodDisruptionBudget(newPDB); err != nil { if err = c.updatePrimaryPodDisruptionBudget(newPDB); err != nil {
return err return err
} }
} else { } else {
c.PodDisruptionBudget = pdb c.PrimaryPodDisruptionBudget = pdb
} }
return nil return nil
@ -476,21 +476,74 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error {
return fmt.Errorf("could not get pod disruption budget: %v", err) return fmt.Errorf("could not get pod disruption budget: %v", err)
} }
// no existing pod disruption budget, create new one // no existing pod disruption budget, create new one
c.logger.Infof("could not find the cluster's pod disruption budget") c.logger.Infof("could not find the primary pod disruption budget")
if pdb, err = c.createPodDisruptionBudget(); err != nil { if err = c.createPrimaryPodDisruptionBudget(); err != nil {
if !k8sutil.ResourceAlreadyExists(err) { if !k8sutil.ResourceAlreadyExists(err) {
return fmt.Errorf("could not create pod disruption budget: %v", err) return fmt.Errorf("could not create primary pod disruption budget: %v", err)
} }
c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta)) c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta))
if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.podDisruptionBudgetName(), metav1.GetOptions{}); err != nil { if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.PrimaryPodDisruptionBudgetName(), metav1.GetOptions{}); err != nil {
return fmt.Errorf("could not fetch existing %q pod disruption budget", util.NameFromMeta(pdb.ObjectMeta)) return fmt.Errorf("could not fetch existing %q pod disruption budget", util.NameFromMeta(pdb.ObjectMeta))
} }
} }
c.logger.Infof("created missing pod disruption budget %q", util.NameFromMeta(pdb.ObjectMeta)) return nil
c.PodDisruptionBudget = pdb }
func (c *Cluster) syncCriticalOpPodDisruptionBudget(isUpdate bool) error {
var (
pdb *policyv1.PodDisruptionBudget
err error
)
if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.criticalOpPodDisruptionBudgetName(), metav1.GetOptions{}); err == nil {
c.CriticalOpPodDisruptionBudget = pdb
newPDB := c.generateCriticalOpPodDisruptionBudget()
match, reason := c.comparePodDisruptionBudget(pdb, newPDB)
if !match {
c.logPDBChanges(pdb, newPDB, isUpdate, reason)
if err = c.updateCriticalOpPodDisruptionBudget(newPDB); err != nil {
return err
}
} else {
c.CriticalOpPodDisruptionBudget = pdb
}
return nil
}
if !k8sutil.ResourceNotFound(err) {
return fmt.Errorf("could not get pod disruption budget: %v", err)
}
// no existing pod disruption budget, create new one
c.logger.Infof("could not find pod disruption budget for critical operations")
if err = c.createCriticalOpPodDisruptionBudget(); err != nil {
if !k8sutil.ResourceAlreadyExists(err) {
return fmt.Errorf("could not create pod disruption budget for critical operations: %v", err)
}
c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta))
if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.criticalOpPodDisruptionBudgetName(), metav1.GetOptions{}); err != nil {
return fmt.Errorf("could not fetch existing %q pod disruption budget", util.NameFromMeta(pdb.ObjectMeta))
}
}
return nil
}
func (c *Cluster) syncPodDisruptionBudgets(isUpdate bool) error {
errors := make([]string, 0)
if err := c.syncPrimaryPodDisruptionBudget(isUpdate); err != nil {
errors = append(errors, fmt.Sprintf("%v", err))
}
if err := c.syncCriticalOpPodDisruptionBudget(isUpdate); err != nil {
errors = append(errors, fmt.Sprintf("%v", err))
}
if len(errors) > 0 {
return fmt.Errorf("%v", strings.Join(errors, `', '`))
}
return nil return nil
} }

View File

@ -58,15 +58,16 @@ type WorkerStatus struct {
// ClusterStatus describes status of the cluster // ClusterStatus describes status of the cluster
type ClusterStatus struct { type ClusterStatus struct {
Team string Team string
Cluster string Cluster string
Namespace string Namespace string
MasterService *v1.Service MasterService *v1.Service
ReplicaService *v1.Service ReplicaService *v1.Service
MasterEndpoint *v1.Endpoints MasterEndpoint *v1.Endpoints
ReplicaEndpoint *v1.Endpoints ReplicaEndpoint *v1.Endpoints
StatefulSet *appsv1.StatefulSet StatefulSet *appsv1.StatefulSet
PodDisruptionBudget *policyv1.PodDisruptionBudget PrimaryPodDisruptionBudget *policyv1.PodDisruptionBudget
CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget
CurrentProcess Process CurrentProcess Process
Worker uint32 Worker uint32

View File

@ -329,7 +329,7 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster,
if err != nil { if err != nil {
return nil, err return nil, err
} }
_, err = cluster.createPodDisruptionBudget() err = cluster.createPodDisruptionBudgets()
if err != nil { if err != nil {
return nil, err return nil, err
} }