From 9e095e49d8cdfc4146341b91e7ce17883b009b93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Mu=C3=9Fler?= Date: Mon, 7 Dec 2020 17:52:02 +0100 Subject: [PATCH] Default volume migration done. --- pkg/cluster/cluster.go | 10 +------ pkg/cluster/sync.go | 5 ++++ pkg/cluster/volumes.go | 28 ++++++++++++++++++++ pkg/util/config/config.go | 2 +- pkg/util/volumes/ebs.go | 53 ++++++++++++++++++++++++++----------- pkg/util/volumes/volumes.go | 10 +++++++ 6 files changed, 83 insertions(+), 25 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 2e59320e4..ac3ef923f 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -65,14 +65,6 @@ type kubeResources struct { //PVCs are treated separately } -type EBSVolume struct { - volumeId string - volumeType string - size int64 - iops int32 - throughput int32 -} - // Cluster describes postgresql cluster type Cluster struct { kubeResources @@ -98,7 +90,7 @@ type Cluster struct { processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex ConnectionPooler map[PostgresRole]*ConnectionPoolerObjects - EBSVolumes map[string]EBSVolume + EBSVolumes map[string]volumes.VolumeProperties VolumeResizer volumes.VolumeResizer } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index ce5a480d8..093454381 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -55,7 +55,12 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { c.logger.Debugf("syncing volumes using %q storage resize mode", c.OpConfig.StorageResizeMode) + if c.OpConfig.EnableEBSGp3Migration { + c.executeEBSMigration() + } + if c.OpConfig.StorageResizeMode == "mixed" { + // mixed op uses AWS API to adjust size,throughput,iops and calls pvc chance for file system resize // resize pvc to adjust filesystem size until better K8s support if err = c.syncVolumeClaims(); err != nil { diff --git a/pkg/cluster/volumes.go b/pkg/cluster/volumes.go index e35fcb38c..1752af691 100644 --- a/pkg/cluster/volumes.go +++ b/pkg/cluster/volumes.go @@ -251,3 +251,31 @@ func getPodNameFromPersistentVolume(pv *v1.PersistentVolume) *spec.NamespacedNam func quantityToGigabyte(q resource.Quantity) int64 { return q.ScaledValue(0) / (1 * constants.Gigabyte) } + +func (c *Cluster) executeEBSMigration() error { + if !c.OpConfig.EnableEBSGp3Migration { + return nil + } + + pvs, _, err := c.listVolumesWithManifestSize(c.Spec.Volume) + if err != nil { + return fmt.Errorf("could not list persistent volumes: %v", err) + } + + volumeIds := []string{} + for _, pv := range pvs { + volumeIds = append(volumeIds, pv.Spec.AWSElasticBlockStore.VolumeID) + } + + awsVolumes, err := c.VolumeResizer.DescribeVolumes(volumeIds) + + for _, volume := range awsVolumes { + if volume.VolumeType == "gp2" && volume.Size < c.OpConfig.EnableEBSGp3MaxSize { + c.logger.Info("Modifying EBS volume %s to type gp3 (%s)", volume.VolumeID, volume.Size) + c.VolumeResizer.ModifyVolume(volume.VolumeID, "gp3", volume.Size, 3000, 125) + } + c.EBSVolumes[volume.VolumeID] = volume + } + + return nil +} diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index 19fe0ed67..8e79d0abe 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -199,7 +199,7 @@ type Config struct { EnableLazySpiloUpgrade bool `name:"enable_lazy_spilo_upgrade" default:"false"` EnablePgVersionEnvVar bool `name:"enable_pgversion_env_var" default:"false"` EnableEBSGp3Migration bool `name:"enable_ebs_gp3_migration" default:"false"` - EnableEBSGp3MaxSize int32 `name:"enable_ebs_gp3_max_size" default:"1000"` + EnableEBSGp3MaxSize int64 `name:"enable_ebs_gp3_max_size" default:"1000"` } // MustMarshal marshals the config or panics diff --git a/pkg/util/volumes/ebs.go b/pkg/util/volumes/ebs.go index 899f609db..b6b185311 100644 --- a/pkg/util/volumes/ebs.go +++ b/pkg/util/volumes/ebs.go @@ -20,27 +20,27 @@ type EBSVolumeResizer struct { } // ConnectToProvider connects to AWS. -func (c *EBSVolumeResizer) ConnectToProvider() error { - sess, err := session.NewSession(&aws.Config{Region: aws.String(c.AWSRegion)}) +func (r *EBSVolumeResizer) ConnectToProvider() error { + sess, err := session.NewSession(&aws.Config{Region: aws.String(r.AWSRegion)}) if err != nil { return fmt.Errorf("could not establish AWS session: %v", err) } - c.connection = ec2.New(sess) + r.connection = ec2.New(sess) return nil } // IsConnectedToProvider checks if AWS connection is established. -func (c *EBSVolumeResizer) IsConnectedToProvider() bool { - return c.connection != nil +func (r *EBSVolumeResizer) IsConnectedToProvider() bool { + return r.connection != nil } // VolumeBelongsToProvider checks if the given persistent volume is backed by EBS. -func (c *EBSVolumeResizer) VolumeBelongsToProvider(pv *v1.PersistentVolume) bool { +func (r *EBSVolumeResizer) VolumeBelongsToProvider(pv *v1.PersistentVolume) bool { return pv.Spec.AWSElasticBlockStore != nil && pv.Annotations[constants.VolumeStorateProvisionerAnnotation] == constants.EBSProvisioner } // GetProviderVolumeID converts aws://eu-central-1b/vol-00f93d4827217c629 to vol-00f93d4827217c629 for EBS volumes -func (c *EBSVolumeResizer) GetProviderVolumeID(pv *v1.PersistentVolume) (string, error) { +func (r *EBSVolumeResizer) GetProviderVolumeID(pv *v1.PersistentVolume) (string, error) { volumeID := pv.Spec.AWSElasticBlockStore.VolumeID if volumeID == "" { return "", fmt.Errorf("volume id is empty for volume %q", pv.Name) @@ -52,10 +52,33 @@ func (c *EBSVolumeResizer) GetProviderVolumeID(pv *v1.PersistentVolume) (string, return volumeID[idx:], nil } +// DescribeVolumes ... +func (r *EBSVolumeResizer) DescribeVolumes(volumeIds []string) ([]VolumeProperties, error) { + if !r.IsConnectedToProvider() { + err := r.ConnectToProvider() + if err != nil { + return nil, err + } + } + + volumeOutput, err := r.connection.DescribeVolumes(&ec2.DescribeVolumesInput{VolumeIds: aws.StringSlice((volumeIds))}) + + if err != nil { + return nil, err + } + + p := []VolumeProperties{} + + for _, v := range volumeOutput.Volumes { + p = append(p, VolumeProperties{VolumeID: *v.VolumeId, Size: *v.Size, VolumeType: *v.VolumeType, Iops: *v.Iops, Throughput: *v.Throughput}) + } + return p, nil +} + // ResizeVolume actually calls AWS API to resize the EBS volume if necessary. -func (c *EBSVolumeResizer) ResizeVolume(volumeID string, newSize int64) error { +func (r *EBSVolumeResizer) ResizeVolume(volumeID string, newSize int64) error { /* first check if the volume is already of a requested size */ - volumeOutput, err := c.connection.DescribeVolumes(&ec2.DescribeVolumesInput{VolumeIds: []*string{&volumeID}}) + volumeOutput, err := r.connection.DescribeVolumes(&ec2.DescribeVolumesInput{VolumeIds: []*string{&volumeID}}) if err != nil { return fmt.Errorf("could not get information about the volume: %v", err) } @@ -68,7 +91,7 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeID string, newSize int64) error { return nil } input := ec2.ModifyVolumeInput{Size: &newSize, VolumeId: &volumeID} - output, err := c.connection.ModifyVolume(&input) + output, err := r.connection.ModifyVolume(&input) if err != nil { return fmt.Errorf("could not modify persistent volume: %v", err) } @@ -87,7 +110,7 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeID string, newSize int64) error { in := ec2.DescribeVolumesModificationsInput{VolumeIds: []*string{&volumeID}} return retryutil.Retry(constants.EBSVolumeResizeWaitInterval, constants.EBSVolumeResizeWaitTimeout, func() (bool, error) { - out, err := c.connection.DescribeVolumesModifications(&in) + out, err := r.connection.DescribeVolumesModifications(&in) if err != nil { return false, fmt.Errorf("could not describe volume modification: %v", err) } @@ -103,9 +126,9 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeID string, newSize int64) error { } // ModifyVolume Modify EBS volume -func (c *EBSVolumeResizer) ModifyVolume(volumeID string, newType string, newSize int64, iops int64, throughput int64) error { +func (r *EBSVolumeResizer) ModifyVolume(volumeID string, newType string, newSize int64, iops int64, throughput int64) error { /* first check if the volume is already of a requested size */ - volumeOutput, err := c.connection.DescribeVolumes(&ec2.DescribeVolumesInput{VolumeIds: []*string{&volumeID}}) + volumeOutput, err := r.connection.DescribeVolumes(&ec2.DescribeVolumesInput{VolumeIds: []*string{&volumeID}}) if err != nil { return fmt.Errorf("could not get information about the volume: %v", err) } @@ -119,7 +142,7 @@ func (c *EBSVolumeResizer) ModifyVolume(volumeID string, newType string, newSize } input := ec2.ModifyVolumeInput{Size: &newSize, VolumeId: &volumeID, VolumeType: &newType, Iops: &iops, Throughput: &throughput} - output, err := c.connection.ModifyVolume(&input) + output, err := r.connection.ModifyVolume(&input) if err != nil { return fmt.Errorf("could not modify persistent volume: %v", err) } @@ -138,7 +161,7 @@ func (c *EBSVolumeResizer) ModifyVolume(volumeID string, newType string, newSize in := ec2.DescribeVolumesModificationsInput{VolumeIds: []*string{&volumeID}} return retryutil.Retry(constants.EBSVolumeResizeWaitInterval, constants.EBSVolumeResizeWaitTimeout, func() (bool, error) { - out, err := c.connection.DescribeVolumesModifications(&in) + out, err := r.connection.DescribeVolumesModifications(&in) if err != nil { return false, fmt.Errorf("could not describe volume modification: %v", err) } diff --git a/pkg/util/volumes/volumes.go b/pkg/util/volumes/volumes.go index 60383e5b6..fa60697a2 100644 --- a/pkg/util/volumes/volumes.go +++ b/pkg/util/volumes/volumes.go @@ -2,6 +2,15 @@ package volumes import v1 "k8s.io/api/core/v1" +// VolumeProperties ... +type VolumeProperties struct { + VolumeID string + VolumeType string + Size int64 + Iops int64 + Throughput int64 +} + // VolumeResizer defines the set of methods used to implememnt provider-specific resizing of persistent volumes. type VolumeResizer interface { ConnectToProvider() error @@ -11,4 +20,5 @@ type VolumeResizer interface { ResizeVolume(providerVolumeID string, newSize int64) error ModifyVolume(providerVolumeID string, newType string, newSize int64, iops int64, throughput int64) error DisconnectFromProvider() error + DescribeVolumes(providerVolumesID []string) ([]VolumeProperties, error) }