add cluster field for PVCs (#2785)
* add cluster field for PVCs
* sync volumes on cluster creation
* fully spell pvc in log messages
parent 45e9227f55
commit 8231797efa
@@ -366,7 +366,7 @@ configuration they are grouped under the `kubernetes` key.
   manifest. To keep secrets, set this option to `false`. The default is `true`.
 
 * **enable_persistent_volume_claim_deletion**
-  By default, the operator deletes PersistentVolumeClaims when removing the
+  By default, the operator deletes persistent volume claims when removing the
   Postgres cluster manifest, no matter if `persistent_volume_claim_retention_policy`
   on the statefulset is set to `retain`. To keep PVCs set this option to `false`.
   The default is `true`.
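For orientation, the deleteStatefulSet hunk further down in this commit shows how the operator consults this option before touching any claims. A minimal sketch of that guard, using only names that appear in this diff; note that the code also treats an unset pointer as disabled, while the shipped configuration default is true:

    // Inside the operator's cluster package; mirrors the deleteStatefulSet hunk below.
    if c.OpConfig.EnablePersistentVolumeClaimDeletion != nil && *c.OpConfig.EnablePersistentVolumeClaimDeletion {
        // option enabled (the default): remove the claims together with the statefulset
        if err := c.deletePersistentVolumeClaims(); err != nil {
            return fmt.Errorf("could not delete persistent volume claims: %v", err)
        }
    } else {
        // option disabled or unset: keep the claims and only log the decision
        c.logger.Info("not deleting persistent volume claims because disabled in configuration")
    }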
@@ -65,11 +65,11 @@ type kubeResources struct {
     PatroniConfigMaps map[string]*v1.ConfigMap
     Secrets map[types.UID]*v1.Secret
     Statefulset *appsv1.StatefulSet
+    VolumeClaims map[types.UID]*v1.PersistentVolumeClaim
     PodDisruptionBudget *policyv1.PodDisruptionBudget
     LogicalBackupJob *batchv1.CronJob
     Streams map[string]*zalandov1.FabricEventStream
     //Pods are treated separately
-    //PVCs are treated separately
 }
 
 // Cluster describes postgresql cluster
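The new VolumeClaims field mirrors how Secrets is already tracked: claims are cached under their Kubernetes UID, so later code can look one up or drop it without re-listing from the API server. A sketch of that lifecycle inside the operator's cluster package; the two helper names are illustrative only, the real commit does this inline in syncVolumeClaims and deletePersistentVolumeClaim:

    // Illustrative helpers, not part of this commit.
    func (c *Cluster) rememberVolumeClaim(pvc *v1.PersistentVolumeClaim) {
        // key by UID: a re-created claim with the same name gets a new UID,
        // so stale and current instances cannot be confused
        c.VolumeClaims[pvc.UID] = pvc
    }

    func (c *Cluster) forgetVolumeClaim(uid types.UID) {
        delete(c.VolumeClaims, uid)
    }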
@@ -140,6 +140,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
             Endpoints: make(map[PostgresRole]*v1.Endpoints),
             PatroniEndpoints: make(map[string]*v1.Endpoints),
             PatroniConfigMaps: make(map[string]*v1.ConfigMap),
+            VolumeClaims: make(map[types.UID]*v1.PersistentVolumeClaim),
             Streams: make(map[string]*zalandov1.FabricEventStream)},
         userSyncStrategy: users.DefaultUserSyncStrategy{
             PasswordEncryption: passwordEncryption,
@@ -363,6 +364,11 @@ func (c *Cluster) Create() (err error) {
     c.logger.Infof("pods are ready")
     c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Pods are ready")
 
+    // sync volume may already transition volumes to gp3, if iops/throughput or type is specified
+    if err = c.syncVolumes(); err != nil {
+        return err
+    }
+
     // sync resources created by Patroni
     if err = c.syncPatroniResources(); err != nil {
         c.logger.Warnf("Patroni resources not yet synced: %v", err)
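The new comment refers to the operator's gp3 handling: when the cluster manifest asks for specific IOPS, throughput, or a volume type, running the volume sync right after the pods are ready can already migrate the EBS volumes instead of leaving that to a later sync cycle. A hypothetical sketch of that trigger; the struct and field names below are assumptions for illustration, not taken from this diff:

    // Hypothetical illustration only; field names are assumed, not the operator's types.
    type volumeSettings struct {
        Iops       *int64 // provisioned IOPS requested in the manifest
        Throughput *int64 // requested throughput, if any
        VolumeType string // e.g. "gp3" when set explicitly
    }

    // requestsGp3Settings mirrors the condition named in the Create() comment above.
    func requestsGp3Settings(v volumeSettings) bool {
        return v.Iops != nil || v.Throughput != nil || v.VolumeType != ""
    }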
@@ -39,8 +39,8 @@ func (c *Cluster) listResources() error {
         c.logger.Infof("found logical backup job: %q (uid: %q)", util.NameFromMeta(c.LogicalBackupJob.ObjectMeta), c.LogicalBackupJob.UID)
     }
 
-    for _, secret := range c.Secrets {
-        c.logger.Infof("found secret: %q (uid: %q) namespace: %s", util.NameFromMeta(secret.ObjectMeta), secret.UID, secret.ObjectMeta.Namespace)
+    for uid, secret := range c.Secrets {
+        c.logger.Infof("found secret: %q (uid: %q) namespace: %s", util.NameFromMeta(secret.ObjectMeta), uid, secret.ObjectMeta.Namespace)
     }
 
     for role, service := range c.Services {
@@ -70,13 +70,8 @@ func (c *Cluster) listResources() error {
         c.logger.Infof("found pod: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID)
     }
 
-    pvcs, err := c.listPersistentVolumeClaims()
-    if err != nil {
-        return fmt.Errorf("could not get the list of PVCs: %v", err)
-    }
-
-    for _, obj := range pvcs {
-        c.logger.Infof("found PVC: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID)
+    for uid, pvc := range c.VolumeClaims {
+        c.logger.Infof("found persistent volume claim: %q (uid: %q)", util.NameFromMeta(pvc.ObjectMeta), uid)
     }
 
     for role, poolerObjs := range c.ConnectionPooler {
@@ -288,10 +283,10 @@ func (c *Cluster) deleteStatefulSet() error {
 
     if c.OpConfig.EnablePersistentVolumeClaimDeletion != nil && *c.OpConfig.EnablePersistentVolumeClaimDeletion {
         if err := c.deletePersistentVolumeClaims(); err != nil {
-            return fmt.Errorf("could not delete PersistentVolumeClaims: %v", err)
+            return fmt.Errorf("could not delete persistent volume claims: %v", err)
         }
     } else {
-        c.logger.Info("not deleting PersistentVolumeClaims because disabled in configuration")
+        c.logger.Info("not deleting persistent volume claims because disabled in configuration")
     }
 
     return nil
@@ -13,9 +13,9 @@ import (
 
     "github.com/aws/aws-sdk-go/aws"
-    "github.com/zalando/postgres-operator/pkg/spec"
     "github.com/zalando/postgres-operator/pkg/util"
     "github.com/zalando/postgres-operator/pkg/util/constants"
     "github.com/zalando/postgres-operator/pkg/util/filesystems"
+    "github.com/zalando/postgres-operator/pkg/util/k8sutil"
     "github.com/zalando/postgres-operator/pkg/util/volumes"
 )
 
@@ -185,7 +185,7 @@ func (c *Cluster) syncVolumeClaims() error {
 
     if c.OpConfig.StorageResizeMode == "off" || c.OpConfig.StorageResizeMode == "ebs" {
         ignoreResize = true
-        c.logger.Debugf("Storage resize mode is set to %q. Skipping volume size sync of PVCs.", c.OpConfig.StorageResizeMode)
+        c.logger.Debugf("Storage resize mode is set to %q. Skipping volume size sync of persistent volume claims.", c.OpConfig.StorageResizeMode)
     }
 
     newSize, err := resource.ParseQuantity(c.Spec.Volume.Size)
@@ -196,9 +196,10 @@ func (c *Cluster) syncVolumeClaims() error {
 
     pvcs, err := c.listPersistentVolumeClaims()
     if err != nil {
-        return fmt.Errorf("could not receive persistent volume claims: %v", err)
+        return fmt.Errorf("could not list persistent volume claims: %v", err)
     }
     for _, pvc := range pvcs {
+        c.VolumeClaims[pvc.UID] = &pvc
         needsUpdate := false
         currentSize := quantityToGigabyte(pvc.Spec.Resources.Requests[v1.ResourceStorage])
         if !ignoreResize && currentSize != manifestSize {
@@ -213,9 +214,11 @@ func (c *Cluster) syncVolumeClaims() error {
 
         if needsUpdate {
             c.logger.Infof("updating persistent volume claim definition for volume %q", pvc.Name)
-            if _, err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Update(context.TODO(), &pvc, metav1.UpdateOptions{}); err != nil {
+            updatedPvc, err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Update(context.TODO(), &pvc, metav1.UpdateOptions{})
+            if err != nil {
                 return fmt.Errorf("could not update persistent volume claim: %q", err)
             }
+            c.VolumeClaims[pvc.UID] = updatedPvc
             c.logger.Infof("successfully updated persistent volume claim %q", pvc.Name)
         } else {
             c.logger.Debugf("volume claim for volume %q do not require updates", pvc.Name)
@@ -227,10 +230,11 @@ func (c *Cluster) syncVolumeClaims() error {
             if err != nil {
                 return fmt.Errorf("could not form patch for the persistent volume claim for volume %q: %v", pvc.Name, err)
             }
-            _, err = c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Patch(context.TODO(), pvc.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
+            patchedPvc, err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Patch(context.TODO(), pvc.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
             if err != nil {
                 return fmt.Errorf("could not patch annotations of the persistent volume claim for volume %q: %v", pvc.Name, err)
             }
+            c.VolumeClaims[pvc.UID] = patchedPvc
         }
     }
 
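Both the Update and the Patch call now feed their result back into the new cache, so the stored claim is the API server's view (including its current resourceVersion) rather than the locally modified copy that was sent. A condensed sketch of that write-then-cache pattern inside the cluster package; the wrapper name is illustrative, the commit does this inline:

    // updateCachedClaim is an illustrative wrapper, not part of this commit.
    func (c *Cluster) updateCachedClaim(pvc *v1.PersistentVolumeClaim) error {
        updatedPvc, err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Update(
            context.TODO(), pvc, metav1.UpdateOptions{})
        if err != nil {
            return fmt.Errorf("could not update persistent volume claim: %v", err)
        }
        // cache what the server returned, not the local copy that was sent
        c.VolumeClaims[updatedPvc.UID] = updatedPvc
        return nil
    }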
@@ -268,38 +272,50 @@ func (c *Cluster) listPersistentVolumeClaims() ([]v1.PersistentVolumeClaim, erro
 
     pvcs, err := c.KubeClient.PersistentVolumeClaims(ns).List(context.TODO(), listOptions)
     if err != nil {
-        return nil, fmt.Errorf("could not list of PersistentVolumeClaims: %v", err)
+        return nil, fmt.Errorf("could not list of persistent volume claims: %v", err)
     }
     return pvcs.Items, nil
 }
 
 func (c *Cluster) deletePersistentVolumeClaims() error {
-    c.logger.Debug("deleting PVCs")
-    pvcs, err := c.listPersistentVolumeClaims()
-    if err != nil {
-        return err
-    }
-    for _, pvc := range pvcs {
-        c.logger.Debugf("deleting PVC %q", util.NameFromMeta(pvc.ObjectMeta))
-        if err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(context.TODO(), pvc.Name, c.deleteOptions); err != nil {
-            c.logger.Warningf("could not delete PersistentVolumeClaim: %v", err)
+    c.setProcessName("deleting persistent volume claims")
+    errors := make([]string, 0)
+    for uid := range c.VolumeClaims {
+        err := c.deletePersistentVolumeClaim(uid)
+        if err != nil {
+            errors = append(errors, fmt.Sprintf("%v", err))
         }
     }
-    if len(pvcs) > 0 {
-        c.logger.Debug("PVCs have been deleted")
-    } else {
-        c.logger.Debug("no PVCs to delete")
+
+    if len(errors) > 0 {
+        c.logger.Warningf("could not delete all persistent volume claims: %v", strings.Join(errors, `', '`))
     }
 
     return nil
 }
 
+func (c *Cluster) deletePersistentVolumeClaim(uid types.UID) error {
+    c.setProcessName("deleting persistent volume claim")
+    pvc := c.VolumeClaims[uid]
+    c.logger.Debugf("deleting persistent volume claim %q", pvc.Name)
+    err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(context.TODO(), pvc.Name, c.deleteOptions)
+    if k8sutil.ResourceNotFound(err) {
+        c.logger.Debugf("persistent volume claim %q has already been deleted", pvc.Name)
+    } else if err != nil {
+        return fmt.Errorf("could not delete persistent volume claim %q: %v", pvc.Name, err)
+    }
+    c.logger.Infof("persistent volume claim %q has been deleted", pvc.Name)
+    delete(c.VolumeClaims, uid)
+
+    return nil
+}
+
 func (c *Cluster) listPersistentVolumes() ([]*v1.PersistentVolume, error) {
     result := make([]*v1.PersistentVolume, 0)
 
     pvcs, err := c.listPersistentVolumeClaims()
     if err != nil {
-        return nil, fmt.Errorf("could not list cluster's PersistentVolumeClaims: %v", err)
+        return nil, fmt.Errorf("could not list cluster's persistent volume claims: %v", err)
     }
 
     pods, err := c.listPods()
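deletePersistentVolumeClaims now keeps going after a failed deletion and reports the accumulated failures once at the end, instead of logging each failure inside the loop. The same collect-then-report shape in isolation; this is a generic sketch (using fmt and strings), not code from the commit, and unlike the commit it returns the joined failures rather than logging a warning:

    // deleteAll tries every item, remembers what failed, and reports once at the end.
    func deleteAll(items []string, deleteFn func(string) error) error {
        errs := make([]string, 0)
        for _, item := range items {
            if err := deleteFn(item); err != nil {
                errs = append(errs, fmt.Sprintf("%s: %v", item, err))
            }
        }
        if len(errs) > 0 {
            return fmt.Errorf("could not delete all items: %s", strings.Join(errs, ", "))
        }
        return nil
    }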
@@ -93,7 +93,7 @@ func TestResizeVolumeClaim(t *testing.T) {
 
     // check if listPersistentVolumeClaims returns only the PVCs matching the filter
     if len(pvcs) != len(pvcList.Items)-1 {
-        t.Errorf("%s: could not find all PVCs, got %v, expected %v", testName, len(pvcs), len(pvcList.Items)-1)
+        t.Errorf("%s: could not find all persistent volume claims, got %v, expected %v", testName, len(pvcs), len(pvcList.Items)-1)
     }
 
     // check if PVCs were correctly resized