Default volume migration done.

This commit is contained in:
Jan Mußler 2020-12-07 17:52:02 +01:00
parent 6bb78c8de2
commit 9e095e49d8
6 changed files with 83 additions and 25 deletions

View File

@ -65,14 +65,6 @@ type kubeResources struct {
//PVCs are treated separately //PVCs are treated separately
} }
type EBSVolume struct {
volumeId string
volumeType string
size int64
iops int32
throughput int32
}
// Cluster describes postgresql cluster // Cluster describes postgresql cluster
type Cluster struct { type Cluster struct {
kubeResources kubeResources
@ -98,7 +90,7 @@ type Cluster struct {
processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
ConnectionPooler map[PostgresRole]*ConnectionPoolerObjects ConnectionPooler map[PostgresRole]*ConnectionPoolerObjects
EBSVolumes map[string]EBSVolume EBSVolumes map[string]volumes.VolumeProperties
VolumeResizer volumes.VolumeResizer VolumeResizer volumes.VolumeResizer
} }

View File

@ -55,7 +55,12 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
c.logger.Debugf("syncing volumes using %q storage resize mode", c.OpConfig.StorageResizeMode) c.logger.Debugf("syncing volumes using %q storage resize mode", c.OpConfig.StorageResizeMode)
if c.OpConfig.EnableEBSGp3Migration {
c.executeEBSMigration()
}
if c.OpConfig.StorageResizeMode == "mixed" { if c.OpConfig.StorageResizeMode == "mixed" {
// mixed op uses AWS API to adjust size,throughput,iops and calls pvc chance for file system resize
// resize pvc to adjust filesystem size until better K8s support // resize pvc to adjust filesystem size until better K8s support
if err = c.syncVolumeClaims(); err != nil { if err = c.syncVolumeClaims(); err != nil {

View File

@ -251,3 +251,31 @@ func getPodNameFromPersistentVolume(pv *v1.PersistentVolume) *spec.NamespacedNam
func quantityToGigabyte(q resource.Quantity) int64 { func quantityToGigabyte(q resource.Quantity) int64 {
return q.ScaledValue(0) / (1 * constants.Gigabyte) return q.ScaledValue(0) / (1 * constants.Gigabyte)
} }
func (c *Cluster) executeEBSMigration() error {
if !c.OpConfig.EnableEBSGp3Migration {
return nil
}
pvs, _, err := c.listVolumesWithManifestSize(c.Spec.Volume)
if err != nil {
return fmt.Errorf("could not list persistent volumes: %v", err)
}
volumeIds := []string{}
for _, pv := range pvs {
volumeIds = append(volumeIds, pv.Spec.AWSElasticBlockStore.VolumeID)
}
awsVolumes, err := c.VolumeResizer.DescribeVolumes(volumeIds)
for _, volume := range awsVolumes {
if volume.VolumeType == "gp2" && volume.Size < c.OpConfig.EnableEBSGp3MaxSize {
c.logger.Info("Modifying EBS volume %s to type gp3 (%s)", volume.VolumeID, volume.Size)
c.VolumeResizer.ModifyVolume(volume.VolumeID, "gp3", volume.Size, 3000, 125)
}
c.EBSVolumes[volume.VolumeID] = volume
}
return nil
}

View File

@ -199,7 +199,7 @@ type Config struct {
EnableLazySpiloUpgrade bool `name:"enable_lazy_spilo_upgrade" default:"false"` EnableLazySpiloUpgrade bool `name:"enable_lazy_spilo_upgrade" default:"false"`
EnablePgVersionEnvVar bool `name:"enable_pgversion_env_var" default:"false"` EnablePgVersionEnvVar bool `name:"enable_pgversion_env_var" default:"false"`
EnableEBSGp3Migration bool `name:"enable_ebs_gp3_migration" default:"false"` EnableEBSGp3Migration bool `name:"enable_ebs_gp3_migration" default:"false"`
EnableEBSGp3MaxSize int32 `name:"enable_ebs_gp3_max_size" default:"1000"` EnableEBSGp3MaxSize int64 `name:"enable_ebs_gp3_max_size" default:"1000"`
} }
// MustMarshal marshals the config or panics // MustMarshal marshals the config or panics

View File

@ -20,27 +20,27 @@ type EBSVolumeResizer struct {
} }
// ConnectToProvider connects to AWS. // ConnectToProvider connects to AWS.
func (c *EBSVolumeResizer) ConnectToProvider() error { func (r *EBSVolumeResizer) ConnectToProvider() error {
sess, err := session.NewSession(&aws.Config{Region: aws.String(c.AWSRegion)}) sess, err := session.NewSession(&aws.Config{Region: aws.String(r.AWSRegion)})
if err != nil { if err != nil {
return fmt.Errorf("could not establish AWS session: %v", err) return fmt.Errorf("could not establish AWS session: %v", err)
} }
c.connection = ec2.New(sess) r.connection = ec2.New(sess)
return nil return nil
} }
// IsConnectedToProvider checks if AWS connection is established. // IsConnectedToProvider checks if AWS connection is established.
func (c *EBSVolumeResizer) IsConnectedToProvider() bool { func (r *EBSVolumeResizer) IsConnectedToProvider() bool {
return c.connection != nil return r.connection != nil
} }
// VolumeBelongsToProvider checks if the given persistent volume is backed by EBS. // VolumeBelongsToProvider checks if the given persistent volume is backed by EBS.
func (c *EBSVolumeResizer) VolumeBelongsToProvider(pv *v1.PersistentVolume) bool { func (r *EBSVolumeResizer) VolumeBelongsToProvider(pv *v1.PersistentVolume) bool {
return pv.Spec.AWSElasticBlockStore != nil && pv.Annotations[constants.VolumeStorateProvisionerAnnotation] == constants.EBSProvisioner return pv.Spec.AWSElasticBlockStore != nil && pv.Annotations[constants.VolumeStorateProvisionerAnnotation] == constants.EBSProvisioner
} }
// GetProviderVolumeID converts aws://eu-central-1b/vol-00f93d4827217c629 to vol-00f93d4827217c629 for EBS volumes // GetProviderVolumeID converts aws://eu-central-1b/vol-00f93d4827217c629 to vol-00f93d4827217c629 for EBS volumes
func (c *EBSVolumeResizer) GetProviderVolumeID(pv *v1.PersistentVolume) (string, error) { func (r *EBSVolumeResizer) GetProviderVolumeID(pv *v1.PersistentVolume) (string, error) {
volumeID := pv.Spec.AWSElasticBlockStore.VolumeID volumeID := pv.Spec.AWSElasticBlockStore.VolumeID
if volumeID == "" { if volumeID == "" {
return "", fmt.Errorf("volume id is empty for volume %q", pv.Name) return "", fmt.Errorf("volume id is empty for volume %q", pv.Name)
@ -52,10 +52,33 @@ func (c *EBSVolumeResizer) GetProviderVolumeID(pv *v1.PersistentVolume) (string,
return volumeID[idx:], nil return volumeID[idx:], nil
} }
// DescribeVolumes ...
func (r *EBSVolumeResizer) DescribeVolumes(volumeIds []string) ([]VolumeProperties, error) {
if !r.IsConnectedToProvider() {
err := r.ConnectToProvider()
if err != nil {
return nil, err
}
}
volumeOutput, err := r.connection.DescribeVolumes(&ec2.DescribeVolumesInput{VolumeIds: aws.StringSlice((volumeIds))})
if err != nil {
return nil, err
}
p := []VolumeProperties{}
for _, v := range volumeOutput.Volumes {
p = append(p, VolumeProperties{VolumeID: *v.VolumeId, Size: *v.Size, VolumeType: *v.VolumeType, Iops: *v.Iops, Throughput: *v.Throughput})
}
return p, nil
}
// ResizeVolume actually calls AWS API to resize the EBS volume if necessary. // ResizeVolume actually calls AWS API to resize the EBS volume if necessary.
func (c *EBSVolumeResizer) ResizeVolume(volumeID string, newSize int64) error { func (r *EBSVolumeResizer) ResizeVolume(volumeID string, newSize int64) error {
/* first check if the volume is already of a requested size */ /* first check if the volume is already of a requested size */
volumeOutput, err := c.connection.DescribeVolumes(&ec2.DescribeVolumesInput{VolumeIds: []*string{&volumeID}}) volumeOutput, err := r.connection.DescribeVolumes(&ec2.DescribeVolumesInput{VolumeIds: []*string{&volumeID}})
if err != nil { if err != nil {
return fmt.Errorf("could not get information about the volume: %v", err) return fmt.Errorf("could not get information about the volume: %v", err)
} }
@ -68,7 +91,7 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeID string, newSize int64) error {
return nil return nil
} }
input := ec2.ModifyVolumeInput{Size: &newSize, VolumeId: &volumeID} input := ec2.ModifyVolumeInput{Size: &newSize, VolumeId: &volumeID}
output, err := c.connection.ModifyVolume(&input) output, err := r.connection.ModifyVolume(&input)
if err != nil { if err != nil {
return fmt.Errorf("could not modify persistent volume: %v", err) return fmt.Errorf("could not modify persistent volume: %v", err)
} }
@ -87,7 +110,7 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeID string, newSize int64) error {
in := ec2.DescribeVolumesModificationsInput{VolumeIds: []*string{&volumeID}} in := ec2.DescribeVolumesModificationsInput{VolumeIds: []*string{&volumeID}}
return retryutil.Retry(constants.EBSVolumeResizeWaitInterval, constants.EBSVolumeResizeWaitTimeout, return retryutil.Retry(constants.EBSVolumeResizeWaitInterval, constants.EBSVolumeResizeWaitTimeout,
func() (bool, error) { func() (bool, error) {
out, err := c.connection.DescribeVolumesModifications(&in) out, err := r.connection.DescribeVolumesModifications(&in)
if err != nil { if err != nil {
return false, fmt.Errorf("could not describe volume modification: %v", err) return false, fmt.Errorf("could not describe volume modification: %v", err)
} }
@ -103,9 +126,9 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeID string, newSize int64) error {
} }
// ModifyVolume Modify EBS volume // ModifyVolume Modify EBS volume
func (c *EBSVolumeResizer) ModifyVolume(volumeID string, newType string, newSize int64, iops int64, throughput int64) error { func (r *EBSVolumeResizer) ModifyVolume(volumeID string, newType string, newSize int64, iops int64, throughput int64) error {
/* first check if the volume is already of a requested size */ /* first check if the volume is already of a requested size */
volumeOutput, err := c.connection.DescribeVolumes(&ec2.DescribeVolumesInput{VolumeIds: []*string{&volumeID}}) volumeOutput, err := r.connection.DescribeVolumes(&ec2.DescribeVolumesInput{VolumeIds: []*string{&volumeID}})
if err != nil { if err != nil {
return fmt.Errorf("could not get information about the volume: %v", err) return fmt.Errorf("could not get information about the volume: %v", err)
} }
@ -119,7 +142,7 @@ func (c *EBSVolumeResizer) ModifyVolume(volumeID string, newType string, newSize
} }
input := ec2.ModifyVolumeInput{Size: &newSize, VolumeId: &volumeID, VolumeType: &newType, Iops: &iops, Throughput: &throughput} input := ec2.ModifyVolumeInput{Size: &newSize, VolumeId: &volumeID, VolumeType: &newType, Iops: &iops, Throughput: &throughput}
output, err := c.connection.ModifyVolume(&input) output, err := r.connection.ModifyVolume(&input)
if err != nil { if err != nil {
return fmt.Errorf("could not modify persistent volume: %v", err) return fmt.Errorf("could not modify persistent volume: %v", err)
} }
@ -138,7 +161,7 @@ func (c *EBSVolumeResizer) ModifyVolume(volumeID string, newType string, newSize
in := ec2.DescribeVolumesModificationsInput{VolumeIds: []*string{&volumeID}} in := ec2.DescribeVolumesModificationsInput{VolumeIds: []*string{&volumeID}}
return retryutil.Retry(constants.EBSVolumeResizeWaitInterval, constants.EBSVolumeResizeWaitTimeout, return retryutil.Retry(constants.EBSVolumeResizeWaitInterval, constants.EBSVolumeResizeWaitTimeout,
func() (bool, error) { func() (bool, error) {
out, err := c.connection.DescribeVolumesModifications(&in) out, err := r.connection.DescribeVolumesModifications(&in)
if err != nil { if err != nil {
return false, fmt.Errorf("could not describe volume modification: %v", err) return false, fmt.Errorf("could not describe volume modification: %v", err)
} }

View File

@ -2,6 +2,15 @@ package volumes
import v1 "k8s.io/api/core/v1" import v1 "k8s.io/api/core/v1"
// VolumeProperties ...
type VolumeProperties struct {
VolumeID string
VolumeType string
Size int64
Iops int64
Throughput int64
}
// VolumeResizer defines the set of methods used to implememnt provider-specific resizing of persistent volumes. // VolumeResizer defines the set of methods used to implememnt provider-specific resizing of persistent volumes.
type VolumeResizer interface { type VolumeResizer interface {
ConnectToProvider() error ConnectToProvider() error
@ -11,4 +20,5 @@ type VolumeResizer interface {
ResizeVolume(providerVolumeID string, newSize int64) error ResizeVolume(providerVolumeID string, newSize int64) error
ModifyVolume(providerVolumeID string, newType string, newSize int64, iops int64, throughput int64) error ModifyVolume(providerVolumeID string, newType string, newSize int64, iops int64, throughput int64) error
DisconnectFromProvider() error DisconnectFromProvider() error
DescribeVolumes(providerVolumesID []string) ([]VolumeProperties, error)
} }