From 7b0ca31bfb00cf9a02ae1feee746501a6e1c47a7 Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Tue, 6 Jun 2017 13:53:27 +0200 Subject: [PATCH] Implements EBS volume resizing #35. In order to support volumes different from EBS and filesystems other than EXT2/3/4 the respective code parts were implemented as interfaces. Adding a new resizer for the volume or the filesystem will require implementing the interface, but no other changes in the cluster code itself. Volume resizing first changes the EBS and the filesystem, and only afterwards is reflected in the Kubernetes "PersistentVolume" object. This is done deliberately to be able to check if the volume needs resizing by peeking at the Size of the PersistentVolume structure. We recheck, nevertheless, in the EBSVolumeResizer, whether the actual EBS volume size doesn't match the spec, since a call to the AWS ModifyVolume is counted against the resize limit of once every 6 hours, even for those calls that shouldn't result in an actual resize (i.e. when the size matches the one for the running volume). As a side effect, split the constants into multiple files, move the volume code into a separate file and fix minor issues related to the error reporting. 
--- glide.lock | 2 +- glide.yaml | 2 + pkg/cluster/cluster.go | 77 +++++----- pkg/{controller => cluster}/exec.go | 4 +- pkg/cluster/filesystems.go | 46 ++++++ pkg/cluster/k8sres.go | 4 +- pkg/cluster/pod.go | 35 ----- pkg/cluster/sync.go | 57 +++++--- pkg/cluster/util.go | 8 +- pkg/cluster/volumes.go | 178 +++++++++++++++++++++++ pkg/controller/postgresql.go | 4 +- pkg/controller/util.go | 1 + pkg/util/constants/annotations.go | 9 ++ pkg/util/constants/aws.go | 16 ++ pkg/util/constants/constants.go | 35 ----- pkg/util/constants/kubernetes.go | 12 ++ pkg/util/constants/postgresql.go | 9 ++ pkg/util/constants/roles.go | 14 ++ pkg/util/constants/thirdpartyresource.go | 9 ++ pkg/util/constants/units.go | 5 + pkg/util/filesystems/ext234.go | 38 +++++ pkg/util/filesystems/filesystems.go | 6 + pkg/util/teams/teams_test.go | 2 +- pkg/util/volumes/ebs.go | 101 +++++++++++++ pkg/util/volumes/volumes.go | 14 ++ 25 files changed, 554 insertions(+), 134 deletions(-) rename pkg/{controller => cluster}/exec.go (92%) create mode 100644 pkg/cluster/filesystems.go create mode 100644 pkg/cluster/volumes.go create mode 100644 pkg/util/constants/annotations.go create mode 100644 pkg/util/constants/aws.go delete mode 100644 pkg/util/constants/constants.go create mode 100644 pkg/util/constants/kubernetes.go create mode 100644 pkg/util/constants/postgresql.go create mode 100644 pkg/util/constants/roles.go create mode 100644 pkg/util/constants/thirdpartyresource.go create mode 100644 pkg/util/constants/units.go create mode 100644 pkg/util/filesystems/ext234.go create mode 100644 pkg/util/filesystems/filesystems.go create mode 100644 pkg/util/volumes/ebs.go create mode 100644 pkg/util/volumes/volumes.go diff --git a/glide.lock b/glide.lock index 3df271947..24db2b0d8 100644 --- a/glide.lock +++ b/glide.lock @@ -7,7 +7,7 @@ imports: - compute/metadata - internal - name: github.com/aws/aws-sdk-go - version: 63ce630574a5ec05ecd8e8de5cea16332a5a684d + version: 
e766cfe96ef7320817087fa4cd92c09abdb87310 subpackages: - aws - aws/awserr diff --git a/glide.yaml b/glide.yaml index b16cc5f03..12ad3c548 100644 --- a/glide.yaml +++ b/glide.yaml @@ -22,3 +22,5 @@ import: version: ee39d359dd0896c4c0eccf23f033f158ad3d3bd7 subpackages: - pkg/client/unversioned/remotecommand +- package: github.com/aws/aws-sdk-go + version: ^1.8.24 diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index af2c92857..854abfed4 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -26,6 +26,7 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" "github.com/zalando-incubator/postgres-operator/pkg/util/teams" "github.com/zalando-incubator/postgres-operator/pkg/util/users" + "github.com/zalando-incubator/postgres-operator/pkg/util/volumes" ) var ( @@ -37,6 +38,7 @@ var ( type Config struct { KubeClient *kubernetes.Clientset //TODO: move clients to the better place? RestClient *rest.RESTClient + RestConfig *rest.Config TeamsAPIClient *teams.API OpConfig config.Config InfrastructureRoles map[string]spec.PgUser // inherited from the controller @@ -68,6 +70,13 @@ type Cluster struct { podEventsQueue *cache.FIFO } +type compareStatefulsetResult struct { + match bool + replace bool + rollingUpdate bool + reasons []string +} + func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Metadata.Name) kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret)} @@ -244,20 +253,24 @@ func (c *Cluster) sameVolumeWith(volume spec.Volume) (match bool, reason string) return } -func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) (match, needsReplace, needsRollUpdate bool, reason string) { +func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *compareStatefulsetResult { + reasons := make([]string, 0) + var match, needsRollUpdate, needsReplace bool + match = true //TODO: improve me 
if *c.Statefulset.Spec.Replicas != *statefulSet.Spec.Replicas { match = false - reason = "new statefulset's number of replicas doesn't match the current one" + reasons = append(reasons, "new statefulset's number of replicas doesn't match the current one") } if len(c.Statefulset.Spec.Template.Spec.Containers) != len(statefulSet.Spec.Template.Spec.Containers) { needsRollUpdate = true - reason = "new statefulset's container specification doesn't match the current one" + reasons = append(reasons, "new statefulset's container specification doesn't match the current one") } if len(c.Statefulset.Spec.Template.Spec.Containers) == 0 { + c.logger.Warnf("statefulset '%s' has no container", util.NameFromMeta(c.Statefulset.ObjectMeta)) - return + return &compareStatefulsetResult{} } // In the comparisons below, the needsReplace and needsRollUpdate flags are never reset, since checks fall through // and the combined effect of all the changes should be applied. @@ -267,48 +280,44 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) (matc if c.Statefulset.Spec.Template.Spec.ServiceAccountName != statefulSet.Spec.Template.Spec.ServiceAccountName { needsReplace = true needsRollUpdate = true - reason = "new statefulset's serviceAccountName service asccount name doesn't match the current one" + reasons = append(reasons, "new statefulset's serviceAccountName service asccount name doesn't match the current one") } if *c.Statefulset.Spec.Template.Spec.TerminationGracePeriodSeconds != *statefulSet.Spec.Template.Spec.TerminationGracePeriodSeconds { needsReplace = true needsRollUpdate = true - reason = "new statefulset's terminationGracePeriodSeconds doesn't match the current one" + reasons = append(reasons, "new statefulset's terminationGracePeriodSeconds doesn't match the current one") } // Some generated fields like creationTimestamp make it not possible to use DeepCompare on Spec.Template.ObjectMeta if !reflect.DeepEqual(c.Statefulset.Spec.Template.Labels, 
statefulSet.Spec.Template.Labels) { needsReplace = true needsRollUpdate = true - reason = "new statefulset's metadata labels doesn't match the current one" + reasons = append(reasons, "new statefulset's metadata labels doesn't match the current one") } if !reflect.DeepEqual(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations) { needsRollUpdate = true needsReplace = true - reason = "new statefulset's metadata annotations doesn't match the current one" + reasons = append(reasons, "new statefulset's metadata annotations doesn't match the current one") } if len(c.Statefulset.Spec.VolumeClaimTemplates) != len(statefulSet.Spec.VolumeClaimTemplates) { needsReplace = true - needsRollUpdate = true - reason = "new statefulset's volumeClaimTemplates contains different number of volumes to the old one" + reasons = append(reasons, "new statefulset's volumeClaimTemplates contains different number of volumes to the old one") } for i := 0; i < len(c.Statefulset.Spec.VolumeClaimTemplates); i++ { name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name // Some generated fields like creationTimestamp make it not possible to use DeepCompare on ObjectMeta if name != statefulSet.Spec.VolumeClaimTemplates[i].Name { needsReplace = true - needsRollUpdate = true - reason = fmt.Sprintf("new statefulset's name for volume %d doesn't match the current one", i) + reasons = append(reasons, fmt.Sprintf("new statefulset's name for volume %d doesn't match the current one", i)) continue } if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations) { needsReplace = true - needsRollUpdate = true - reason = fmt.Sprintf("new statefulset's annotations for volume %s doesn't match the current one", name) + reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %s doesn't match the current one", name)) } if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Spec, 
statefulSet.Spec.VolumeClaimTemplates[i].Spec) { name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name needsReplace = true - needsRollUpdate = true - reason = fmt.Sprintf("new statefulset's volumeClaimTemplates specification for volume %s doesn't match the current one", name) + reasons = append(reasons, fmt.Sprintf("new statefulset's volumeClaimTemplates specification for volume %s doesn't match the current one", name)) } } @@ -316,28 +325,27 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) (matc container2 := statefulSet.Spec.Template.Spec.Containers[0] if container1.Image != container2.Image { needsRollUpdate = true - reason = "new statefulset's container image doesn't match the current one" + reasons = append(reasons, "new statefulset's container image doesn't match the current one") } if !reflect.DeepEqual(container1.Ports, container2.Ports) { needsRollUpdate = true - reason = "new statefulset's container ports don't match the current one" + reasons = append(reasons, "new statefulset's container ports don't match the current one") } if !compareResources(&container1.Resources, &container2.Resources) { needsRollUpdate = true - reason = "new statefulset's container resources don't match the current ones" + reasons = append(reasons, "new statefulset's container resources don't match the current ones") } if !reflect.DeepEqual(container1.Env, container2.Env) { needsRollUpdate = true - reason = "new statefulset's container environment doesn't match the current one" + reasons = append(reasons, "new statefulset's container environment doesn't match the current one") } if needsRollUpdate || needsReplace { match = false } - - return + return &compareStatefulsetResult{match: match, reasons: reasons, rollingUpdate: needsRollUpdate, replace: needsReplace} } func compareResources(a *v1.ResourceRequirements, b *v1.ResourceRequirements) (equal bool) { @@ -387,22 +395,16 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { 
c.logger.Infof("service '%s' has been updated", util.NameFromMeta(c.Service.ObjectMeta)) } - if match, reason := c.sameVolumeWith(newSpec.Spec.Volume); !match { - c.logVolumeChanges(c.Spec.Volume, newSpec.Spec.Volume, reason) - //TODO: update PVC - } - newStatefulSet, err := c.genStatefulSet(newSpec.Spec) if err != nil { return fmt.Errorf("could not generate statefulset: %v", err) } + cmp := c.compareStatefulSetWith(newStatefulSet) - sameSS, needsReplace, rollingUpdate, reason := c.compareStatefulSetWith(newStatefulSet) - - if !sameSS { - c.logStatefulSetChanges(c.Statefulset, newStatefulSet, true, reason) + if !cmp.match { + c.logStatefulSetChanges(c.Statefulset, newStatefulSet, true, cmp.reasons) //TODO: mind the case of updating allowedSourceRanges - if !needsReplace { + if !cmp.replace { if err := c.updateStatefulSet(newStatefulSet); err != nil { c.setStatus(spec.ClusterStatusUpdateFailed) return fmt.Errorf("could not upate statefulset: %v", err) @@ -423,7 +425,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { //TODO: rewrite pg version in tpr spec } - if rollingUpdate { + if cmp.rollingUpdate { c.logger.Infof("Rolling update is needed") // TODO: wait for actual streaming to the replica if err := c.recreatePods(); err != nil { @@ -432,6 +434,15 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { } c.logger.Infof("Rolling update has been finished") } + + if match, reason := c.sameVolumeWith(newSpec.Spec.Volume); !match { + c.logVolumeChanges(c.Spec.Volume, newSpec.Spec.Volume, reason) + if err := c.resizeVolumes(newSpec.Spec.Volume, []volumes.VolumeResizer{&volumes.EBSVolumeResizer{}}); err != nil { + return fmt.Errorf("Could not update volumes: %v", err) + } + c.logger.Infof("volumes have been updated successfully") + } + c.setStatus(spec.ClusterStatusRunning) return nil diff --git a/pkg/controller/exec.go b/pkg/cluster/exec.go similarity index 92% rename from pkg/controller/exec.go rename to pkg/cluster/exec.go index 
2d0af6818..fbd913c21 100644 --- a/pkg/controller/exec.go +++ b/pkg/cluster/exec.go @@ -1,4 +1,4 @@ -package controller +package cluster import ( "bytes" @@ -11,7 +11,7 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/spec" ) -func (c *Controller) ExecCommand(podName spec.NamespacedName, command []string) (string, error) { +func (c *Cluster) ExecCommand(podName *spec.NamespacedName, command ...string) (string, error) { var ( execOut bytes.Buffer execErr bytes.Buffer diff --git a/pkg/cluster/filesystems.go b/pkg/cluster/filesystems.go new file mode 100644 index 000000000..fe636ed95 --- /dev/null +++ b/pkg/cluster/filesystems.go @@ -0,0 +1,46 @@ +package cluster + +import ( + "fmt" + "strings" + + "github.com/zalando-incubator/postgres-operator/pkg/spec" + "github.com/zalando-incubator/postgres-operator/pkg/util/constants" + "github.com/zalando-incubator/postgres-operator/pkg/util/filesystems" +) + +func (c *Cluster) getPostgresFilesystemInfo(podName *spec.NamespacedName) (device, fstype string, err error) { + out, err := c.ExecCommand(podName, "bash", "-c", fmt.Sprintf("df -T %s|tail -1", constants.PostgresDataMount)) + if err != nil { + return "", "", err + } + fields := strings.Fields(out) + if len(fields) < 2 { + return "", "", fmt.Errorf("too few fields in the df output") + } + + return fields[0], fields[1], nil +} + +func (c *Cluster) resizePostgresFilesystem(podName *spec.NamespacedName, resizers []filesystems.FilesystemResizer) error { + // resize2fs always writes to stderr, and ExecCommand considers a non-empty stderr an error + // first, determine the device and the filesystem + deviceName, fsType, err := c.getPostgresFilesystemInfo(podName) + if err != nil { + return fmt.Errorf("could not get device and type for the postgres filesystem: %v", err) + } + for _, resizer := range resizers { + if !resizer.CanResizeFilesystem(fsType) { + continue + } + err := resizer.ResizeFilesystem(deviceName, func(cmd string) (out string, err error) { + return 
c.ExecCommand(podName, "bash", "-c", cmd) + }) + + if err != nil { + return err + } + return nil + } + return fmt.Errorf("could not resize filesystem: no compatible resizers for the filesystem of type %s", fsType) +} diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 035d9cef6..203afb26e 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -213,7 +213,7 @@ func (c *Cluster) genPodTemplate(resourceRequirements *v1.ResourceRequirements, }, { Name: "PGROOT", - Value: "/home/postgres/pgdata/pgroot", + Value: constants.PostgresDataPath, }, { Name: "ETCD_HOST", @@ -293,7 +293,7 @@ func (c *Cluster) genPodTemplate(resourceRequirements *v1.ResourceRequirements, VolumeMounts: []v1.VolumeMount{ { Name: constants.DataVolumeName, - MountPath: "/home/postgres/pgdata", //TODO: fetch from manifesto + MountPath: constants.PostgresDataMount, //TODO: fetch from manifesto }, }, Env: envVars, diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index 363074d73..49497d929 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -24,19 +24,6 @@ func (c *Cluster) listPods() ([]v1.Pod, error) { return pods.Items, nil } -func (c *Cluster) listPersistentVolumeClaims() ([]v1.PersistentVolumeClaim, error) { - ns := c.Metadata.Namespace - listOptions := v1.ListOptions{ - LabelSelector: c.labelsSet().String(), - } - - pvcs, err := c.KubeClient.PersistentVolumeClaims(ns).List(listOptions) - if err != nil { - return nil, fmt.Errorf("could not get list of PersistentVolumeClaims: %v", err) - } - return pvcs.Items, nil -} - func (c *Cluster) deletePods() error { c.logger.Debugln("Deleting pods") pods, err := c.listPods() @@ -63,28 +50,6 @@ func (c *Cluster) deletePods() error { return nil } -func (c *Cluster) deletePersistenVolumeClaims() error { - c.logger.Debugln("Deleting PVCs") - ns := c.Metadata.Namespace - pvcs, err := c.listPersistentVolumeClaims() - if err != nil { - return err - } - for _, pvc := range pvcs { - c.logger.Debugf("Deleting PVC '%s'", 
util.NameFromMeta(pvc.ObjectMeta)) - if err := c.KubeClient.PersistentVolumeClaims(ns).Delete(pvc.Name, c.deleteOptions); err != nil { - c.logger.Warningf("could not delete PersistentVolumeClaim: %v", err) - } - } - if len(pvcs) > 0 { - c.logger.Debugln("PVCs have been deleted") - } else { - c.logger.Debugln("No PVCs to delete") - } - - return nil -} - func (c *Cluster) deletePod(podName spec.NamespacedName) error { ch := c.registerPodSubscriber(podName) defer c.unregisterPodSubscriber(podName) diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 3ac35a274..bdc108a68 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -5,6 +5,7 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/util" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" + "github.com/zalando-incubator/postgres-operator/pkg/util/volumes" ) func (c *Cluster) Sync() error { @@ -44,18 +45,22 @@ func (c *Cluster) Sync() error { } } - if c.databaseAccessDisabled() { - return nil - } - if err := c.initDbConn(); err != nil { - return fmt.Errorf("could not init db connection: %v", err) - } else { - c.logger.Debugf("Syncing roles") - if err := c.SyncRoles(); err != nil { - return fmt.Errorf("could not sync roles: %v", err) + if !c.databaseAccessDisabled() { + if err := c.initDbConn(); err != nil { + return fmt.Errorf("could not init db connection: %v", err) + } else { + c.logger.Debugf("Syncing roles") + if err := c.SyncRoles(); err != nil { + return fmt.Errorf("could not sync roles: %v", err) + } } } + c.logger.Debugf("Syncing persistent volumes") + if err := c.SyncVolumes(); err != nil { + return fmt.Errorf("could not sync persistent volumes: %v", err) + } + return nil } @@ -114,7 +119,7 @@ func (c *Cluster) syncEndpoint() error { func (c *Cluster) syncStatefulSet() error { cSpec := c.Spec - var rollUpdate, needsReplace bool + var rollUpdate bool if c.Statefulset == nil { c.logger.Infof("could not find the cluster's statefulset") pods, err := c.listPods() @@ 
-139,24 +144,20 @@ func (c *Cluster) syncStatefulSet() error { return nil } } + /* TODO: should check that we need to replace the statefulset */ if !rollUpdate { - var ( - match bool - reason string - ) - desiredSS, err := c.genStatefulSet(cSpec) if err != nil { return fmt.Errorf("could not generate statefulset: %v", err) } - match, needsReplace, rollUpdate, reason = c.compareStatefulSetWith(desiredSS) - if match { + cmp := c.compareStatefulSetWith(desiredSS) + if cmp.match { return nil } - c.logStatefulSetChanges(c.Statefulset, desiredSS, false, reason) + c.logStatefulSetChanges(c.Statefulset, desiredSS, false, cmp.reasons) - if !needsReplace { + if !cmp.replace { if err := c.updateStatefulSet(desiredSS); err != nil { return fmt.Errorf("could not update statefulset: %v", err) } @@ -166,7 +167,7 @@ func (c *Cluster) syncStatefulSet() error { } } - if !rollUpdate { + if !cmp.rollingUpdate { c.logger.Debugln("No rolling update is needed") return nil } @@ -199,3 +200,19 @@ func (c *Cluster) SyncRoles() error { } return nil } + +/* SyncVolume reads all persistent volumes and checks that their size matches the one declared in the statefulset */ +func (c *Cluster) SyncVolumes() error { + act, err := c.VolumesNeedResizing(c.Spec.Volume) + if err != nil { + return fmt.Errorf("could not compare size of the volumes: %v", err) + } + if !act { + return nil + } + if err := c.resizeVolumes(c.Spec.Volume, []volumes.VolumeResizer{&volumes.EBSVolumeResizer{}}); err != nil { + return fmt.Errorf("Could not sync volumes: %v", err) + } + c.logger.Infof("volumes have been synced successfully") + return nil +} diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 2302e1d47..0212c103f 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -63,7 +63,7 @@ func specPatch(spec interface{}) ([]byte, error) { }{spec}) } -func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate bool, reason string) { +func (c *Cluster) logStatefulSetChanges(old, new 
*v1beta1.StatefulSet, isUpdate bool, reasons []string) { if isUpdate { c.logger.Infof("statefulset '%s' has been changed", util.NameFromMeta(old.ObjectMeta), @@ -75,8 +75,10 @@ func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate } c.logger.Debugf("diff\n%s\n", util.PrettyDiff(old.Spec, new.Spec)) - if reason != "" { - c.logger.Infof("Reason: %s", reason) + if len(reasons) > 0 { + for _, reason := range reasons { + c.logger.Infof("Reason: %s", reason) + } } } diff --git a/pkg/cluster/volumes.go b/pkg/cluster/volumes.go new file mode 100644 index 000000000..c54859665 --- /dev/null +++ b/pkg/cluster/volumes.go @@ -0,0 +1,178 @@ +package cluster + +import ( + "fmt" + "strconv" + "strings" + + "k8s.io/client-go/pkg/api/resource" + "k8s.io/client-go/pkg/api/v1" + + "github.com/zalando-incubator/postgres-operator/pkg/spec" + "github.com/zalando-incubator/postgres-operator/pkg/util" + "github.com/zalando-incubator/postgres-operator/pkg/util/constants" + "github.com/zalando-incubator/postgres-operator/pkg/util/filesystems" + "github.com/zalando-incubator/postgres-operator/pkg/util/volumes" +) + +func (c *Cluster) listPersistentVolumeClaims() ([]v1.PersistentVolumeClaim, error) { + ns := c.Metadata.Namespace + listOptions := v1.ListOptions{ + LabelSelector: c.labelsSet().String(), + } + + pvcs, err := c.KubeClient.PersistentVolumeClaims(ns).List(listOptions) + if err != nil { + return nil, fmt.Errorf("could not list of PersistentVolumeClaims: %v", err) + } + return pvcs.Items, nil +} + +func (c *Cluster) deletePersistenVolumeClaims() error { + c.logger.Debugln("Deleting PVCs") + pvcs, err := c.listPersistentVolumeClaims() + if err != nil { + return err + } + for _, pvc := range pvcs { + c.logger.Debugf("Deleting PVC '%s'", util.NameFromMeta(pvc.ObjectMeta)) + if err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(pvc.Name, c.deleteOptions); err != nil { + c.logger.Warningf("could not delete PersistentVolumeClaim: %v", err) + } + } + 
if len(pvcs) > 0 { + c.logger.Debugln("PVCs have been deleted") + } else { + c.logger.Debugln("No PVCs to delete") + } + + return nil +} + +func (c *Cluster) listPersistentVolumes() ([]*v1.PersistentVolume, error) { + result := make([]*v1.PersistentVolume, 0) + + pvcs, err := c.listPersistentVolumeClaims() + if err != nil { + return nil, fmt.Errorf("could not list cluster's PersistentVolumeClaims: %v", err) + } + lastPodIndex := *c.Statefulset.Spec.Replicas - 1 + for _, pvc := range pvcs { + lastDash := strings.LastIndex(pvc.Name, "-") + if lastDash > 0 && lastDash < len(pvc.Name)-1 { + if pvcNumber, err := strconv.Atoi(pvc.Name[lastDash+1:]); err != nil { + return nil, fmt.Errorf("could not convert last part of the persistent volume claim name %s to a number", pvc.Name) + } else { + if int32(pvcNumber) > lastPodIndex { + c.logger.Debugf("Skipping persistent volume %s corresponding to a non-running pods", pvc.Name) + continue + } + } + + } + pv, err := c.KubeClient.PersistentVolumes().Get(pvc.Spec.VolumeName) + if err != nil { + return nil, fmt.Errorf("could not get PersistentVolume: %v", err) + } + result = append(result, pv) + } + + return result, nil +} + +// resizeVolumes resize persistent volumes compatible with the given resizer interface +func (c *Cluster) resizeVolumes(newVolume spec.Volume, resizers []volumes.VolumeResizer) error { + totalCompatible := 0 + newQuantity, err := resource.ParseQuantity(newVolume.Size) + if err != nil { + return fmt.Errorf("could not parse volume size: %v", err) + } + pvs, newSize, err := c.listVolumesWithManifestSize(newVolume) + if err != nil { + return fmt.Errorf("could not list persistent volumes: %v", err) + } + for _, pv := range pvs { + volumeSize := quantityToGigabyte(pv.Spec.Capacity[v1.ResourceStorage]) + if volumeSize > newSize { + return fmt.Errorf("cannot shrink persistent volume") + } + if volumeSize == newSize { + continue + } + for _, resizer := range resizers { + if !resizer.VolumeBelongsToProvider(pv) { + 
continue + } + totalCompatible += 1 + if !resizer.IsConnectedToProvider() { + err := resizer.ConnectToProvider() + if err != nil { + return fmt.Errorf("could not connect to the volume provider: %v", err) + } + defer resizer.DisconnectFromProvider() + } + awsVolumeId, err := resizer.GetProviderVolumeID(pv) + if err != nil { + return err + } + c.logger.Debugf("updating persistent volume %s to %d", pv.Name, newSize) + if err := resizer.ResizeVolume(awsVolumeId, newSize); err != nil { + return fmt.Errorf("could not resize EBS volume %s: %v", awsVolumeId, err) + } + c.logger.Debugf("resizing the filesystem on the volume %s", pv.Name) + podName := getPodNameFromPersistentVolume(pv) + if err := c.resizePostgresFilesystem(podName, []filesystems.FilesystemResizer{&filesystems.Ext234Resize{}}); err != nil { + return fmt.Errorf("could not resize the filesystem on pod '%s': %v", podName, err) + } + c.logger.Debugf("filesystem resize successfull on volume %s", pv.Name) + pv.Spec.Capacity[v1.ResourceStorage] = newQuantity + c.logger.Debugf("updating persistent volume definition for volume %s", pv.Name) + if _, err := c.KubeClient.PersistentVolumes().Update(pv); err != nil { + return fmt.Errorf("could not update persistent volume: %s", err) + } + c.logger.Debugf("successfully updated persistent volume %s", pv.Name) + } + } + if len(pvs) > 0 && totalCompatible == 0 { + return fmt.Errorf("could not resize EBS volumes: persistent volumes are not compatible with existing resizing providers") + } + return nil +} + +func (c *Cluster) VolumesNeedResizing(newVolume spec.Volume) (bool, error) { + volumes, manifestSize, err := c.listVolumesWithManifestSize(newVolume) + if err != nil { + return false, err + } + for _, pv := range volumes { + currentSize := quantityToGigabyte(pv.Spec.Capacity[v1.ResourceStorage]) + if currentSize != manifestSize { + return true, nil + } + } + return false, nil +} + +func (c *Cluster) listVolumesWithManifestSize(newVolume spec.Volume) ([]*v1.PersistentVolume, 
int64, error) { + newSize, err := resource.ParseQuantity(newVolume.Size) + if err != nil { + return nil, 0, fmt.Errorf("could not parse volume size from the manifest: %v", err) + } + manifestSize := quantityToGigabyte(newSize) + volumes, err := c.listPersistentVolumes() + if err != nil { + return nil, 0, fmt.Errorf("could not list persistent volumes: %v", err) + } + return volumes, manifestSize, nil +} + +// getPodNameFromPersistentVolume returns a pod name that it extracts from the volume claim ref. +func getPodNameFromPersistentVolume(pv *v1.PersistentVolume) *spec.NamespacedName { + namespace := pv.Spec.ClaimRef.Namespace + name := pv.Spec.ClaimRef.Name[len(constants.DataVolumeName)+1:] + return &spec.NamespacedName{namespace, name} +} + +func quantityToGigabyte(q resource.Quantity) int64 { + return q.ScaledValue(0) / (1 * constants.Gigabyte) +} diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 5e1919ecb..d8e0e9892 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -169,8 +169,8 @@ func (c *Controller) processEvent(obj interface{}) error { } if err := cl.Sync(); err != nil { - cl.Error = fmt.Errorf("could not sync cluster '%s': %s", clusterName, err) - logger.Errorf("%v", cl) + cl.Error = fmt.Errorf("could not sync cluster '%s': %v", clusterName, err) + logger.Errorf("%v", cl.Error) return nil } cl.Error = nil diff --git a/pkg/controller/util.go b/pkg/controller/util.go index fcf15ec06..714128d51 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -23,6 +23,7 @@ func (c *Controller) makeClusterConfig() cluster.Config { return cluster.Config{ KubeClient: c.KubeClient, RestClient: c.RestClient, + RestConfig: c.RestConfig, TeamsAPIClient: c.TeamsAPIClient, OpConfig: config.Copy(c.opConfig), InfrastructureRoles: infrastructureRoles, diff --git a/pkg/util/constants/annotations.go b/pkg/util/constants/annotations.go new file mode 100644 index 000000000..5a2e625d1 --- /dev/null +++ 
b/pkg/util/constants/annotations.go @@ -0,0 +1,9 @@ +package constants + +const ( + ZalandoDNSNameAnnotation = "external-dns.alpha.kubernetes.io/hostname" + ElbTimeoutAnnotationName = "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout" + ElbTimeoutAnnotationValue = "3600" + KubeIAmAnnotation = "iam.amazonaws.com/role" + VolumeStorateProvisionerAnnotation = "pv.kubernetes.io/provisioned-by" +) diff --git a/pkg/util/constants/aws.go b/pkg/util/constants/aws.go new file mode 100644 index 000000000..5140a1729 --- /dev/null +++ b/pkg/util/constants/aws.go @@ -0,0 +1,16 @@ +package constants + +import "time" + +const ( + AWS_REGION = "eu-central-1" + EBSVolumeIDStart = "/vol-" + EBSProvisioner = "kubernetes.io/aws-ebs" + //https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_VolumeModification.html + EBSVolumeStateModifying = "modifying" + EBSVolumeStateOptimizing = "optimizing" + EBSVolumeStateFailed = "failed" + EBSVolumeStateCompleted = "completed" + EBSVolumeResizeWaitInterval = 2 * time.Second + EBSVolumeResizeWaitTimeout = 30 * time.Second +) diff --git a/pkg/util/constants/constants.go b/pkg/util/constants/constants.go deleted file mode 100644 index df8c5bc98..000000000 --- a/pkg/util/constants/constants.go +++ /dev/null @@ -1,35 +0,0 @@ -package constants - -import "time" - -const ( - TPRName = "postgresql" - TPRVendor = "acid.zalan.do" - TPRDescription = "Managed PostgreSQL clusters" - TPRApiVersion = "v1" - ListClustersURITemplate = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/namespaces/%s/" + ResourceName // Namespace - WatchClustersURITemplate = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/watch/namespaces/%s/" + ResourceName // Namespace - K8sVersion = "v1" - K8sAPIPath = "/api" - DataVolumeName = "pgdata" - PasswordLength = 64 - UserSecretTemplate = "%s.%s.credentials." + TPRName + "." 
+ TPRVendor // Username, ClusterName - ZalandoDNSNameAnnotation = "external-dns.alpha.kubernetes.io/hostname" - ElbTimeoutAnnotationName = "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout" - ElbTimeoutAnnotationValue = "3600" - KubeIAmAnnotation = "iam.amazonaws.com/role" - ResourceName = TPRName + "s" - PodRoleMaster = "master" - PodRoleReplica = "replica" - SuperuserKeyName = "superuser" - ReplicationUserKeyName = "replication" - StatefulsetDeletionInterval = 1 * time.Second - StatefulsetDeletionTimeout = 30 * time.Second - - RoleFlagSuperuser = "SUPERUSER" - RoleFlagInherit = "INHERIT" - RoleFlagLogin = "LOGIN" - RoleFlagNoLogin = "NOLOGIN" - RoleFlagCreateRole = "CREATEROLE" - RoleFlagCreateDB = "CREATEDB" -) diff --git a/pkg/util/constants/kubernetes.go b/pkg/util/constants/kubernetes.go new file mode 100644 index 000000000..0341d3734 --- /dev/null +++ b/pkg/util/constants/kubernetes.go @@ -0,0 +1,12 @@ +package constants + +import "time" + +const ( + ListClustersURITemplate = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/namespaces/%s/" + ResourceName // Namespace + WatchClustersURITemplate = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/watch/namespaces/%s/" + ResourceName // Namespace + K8sVersion = "v1" + K8sAPIPath = "/api" + StatefulsetDeletionInterval = 1 * time.Second + StatefulsetDeletionTimeout = 30 * time.Second +) diff --git a/pkg/util/constants/postgresql.go b/pkg/util/constants/postgresql.go new file mode 100644 index 000000000..c27802946 --- /dev/null +++ b/pkg/util/constants/postgresql.go @@ -0,0 +1,9 @@ +package constants + +const ( + DataVolumeName = "pgdata" + PodRoleMaster = "master" + PodRoleReplica = "replica" + PostgresDataMount = "/home/postgres/pgdata" + PostgresDataPath = PostgresDataMount + "/pgroot" +) diff --git a/pkg/util/constants/roles.go b/pkg/util/constants/roles.go new file mode 100644 index 000000000..85fb42b1b --- /dev/null +++ b/pkg/util/constants/roles.go @@ -0,0 +1,14 @@ +package constants + +const 
( + PasswordLength = 64 + UserSecretTemplate = "%s.%s.credentials." + TPRName + "." + TPRVendor // Username, ClusterName + SuperuserKeyName = "superuser" + ReplicationUserKeyName = "replication" + RoleFlagSuperuser = "SUPERUSER" + RoleFlagInherit = "INHERIT" + RoleFlagLogin = "LOGIN" + RoleFlagNoLogin = "NOLOGIN" + RoleFlagCreateRole = "CREATEROLE" + RoleFlagCreateDB = "CREATEDB" +) diff --git a/pkg/util/constants/thirdpartyresource.go b/pkg/util/constants/thirdpartyresource.go new file mode 100644 index 000000000..64d702bc6 --- /dev/null +++ b/pkg/util/constants/thirdpartyresource.go @@ -0,0 +1,9 @@ +package constants + +const ( + TPRName = "postgresql" + TPRVendor = "acid.zalan.do" + TPRDescription = "Managed PostgreSQL clusters" + TPRApiVersion = "v1" + ResourceName = TPRName + "s" +) diff --git a/pkg/util/constants/units.go b/pkg/util/constants/units.go new file mode 100644 index 000000000..4ec3b2511 --- /dev/null +++ b/pkg/util/constants/units.go @@ -0,0 +1,5 @@ +package constants + +const ( + Gigabyte = 1073741824 +) diff --git a/pkg/util/filesystems/ext234.go b/pkg/util/filesystems/ext234.go new file mode 100644 index 000000000..7d9215453 --- /dev/null +++ b/pkg/util/filesystems/ext234.go @@ -0,0 +1,38 @@ +package filesystems + +import ( + "fmt" + "regexp" + "strings" +) + +var ( + ext2fsSuccessRegexp = regexp.MustCompile(`The filesystem on [/a-z0-9]+ is now \d+ \(\d+\w+\) blocks long.`) +) + +const ( + EXT2 = "ext2" + EXT3 = "ext3" + EXT4 = "ext4" + resize2fs = "resize2fs" +) + +type Ext234Resize struct { +} + +func (c *Ext234Resize) CanResizeFilesystem(fstype string) bool { + return fstype == EXT2 || fstype == EXT3 || fstype == EXT4 +} + +func (c *Ext234Resize) ResizeFilesystem(deviceName string, commandExecutor func(cmd string) (out string, err error)) error { + command := fmt.Sprintf("%s %s 2>&1", resize2fs, deviceName) + out, err := commandExecutor(command) + if err != nil { + return err + } + if strings.Contains(out, "Nothing to do") || + 
(strings.Contains(out, "on-line resizing required") && ext2fsSuccessRegexp.MatchString(out)) { + return nil + } + return fmt.Errorf("unrecognized output: %s, assuming error", out) +} diff --git a/pkg/util/filesystems/filesystems.go b/pkg/util/filesystems/filesystems.go new file mode 100644 index 000000000..dba33ae40 --- /dev/null +++ b/pkg/util/filesystems/filesystems.go @@ -0,0 +1,6 @@ +package filesystems + +type FilesystemResizer interface { + CanResizeFilesystem(fstype string) bool + ResizeFilesystem(deviceName string, commandExecutor func(string) (out string, err error)) error +} diff --git a/pkg/util/teams/teams_test.go b/pkg/util/teams/teams_test.go index 0b95524fb..b17927362 100644 --- a/pkg/util/teams/teams_test.go +++ b/pkg/util/teams/teams_test.go @@ -144,7 +144,7 @@ func TestInfo(t *testing.T) { for _, tc := range teamsAPItc { func() { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Header.Get("Authorization") != "Bearer " + token { + if r.Header.Get("Authorization") != "Bearer "+token { t.Errorf("Authorization token is wrong or not provided") } w.WriteHeader(tc.inCode) diff --git a/pkg/util/volumes/ebs.go b/pkg/util/volumes/ebs.go new file mode 100644 index 000000000..4142820f1 --- /dev/null +++ b/pkg/util/volumes/ebs.go @@ -0,0 +1,101 @@ +package volumes + +import ( + "fmt" + "strings" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/ec2" + + "github.com/zalando-incubator/postgres-operator/pkg/util/constants" + "github.com/zalando-incubator/postgres-operator/pkg/util/retryutil" + "k8s.io/client-go/pkg/api/v1" +) + +type EBSVolumeResizer struct { + connection *ec2.EC2 +} + +func (c *EBSVolumeResizer) ConnectToProvider() error { + sess, err := session.NewSession(&aws.Config{Region: aws.String(constants.AWS_REGION)}) + if err != nil { + return fmt.Errorf("could not establish AWS session: %v", err) + } + c.connection = ec2.New(sess) + return 
nil +} + +func (c *EBSVolumeResizer) IsConnectedToProvider() bool { + return c.connection != nil +} + +func (c *EBSVolumeResizer) VolumeBelongsToProvider(pv *v1.PersistentVolume) bool { + return pv.Spec.AWSElasticBlockStore != nil && pv.Annotations[constants.VolumeStorateProvisionerAnnotation] == constants.EBSProvisioner +} + +// GetProviderVolumeID converts aws://eu-central-1b/vol-00f93d4827217c629 to vol-00f93d4827217c629 for EBS volumes +func (c *EBSVolumeResizer) GetProviderVolumeID(pv *v1.PersistentVolume) (string, error) { + volumeID := pv.Spec.AWSElasticBlockStore.VolumeID + if volumeID == "" { + return "", fmt.Errorf("volume id is empty for volume %s", pv.Name) + } + idx := strings.LastIndex(volumeID, constants.EBSVolumeIDStart) + 1 + if idx == 0 { + return "", fmt.Errorf("malfored EBS volume id %s", volumeID) + } + return volumeID[idx:], nil +} + +func (c *EBSVolumeResizer) ResizeVolume(volumeId string, newSize int64) error { + /* first check if the volume is already of a requested size */ + volumeOutput, err := c.connection.DescribeVolumes(&ec2.DescribeVolumesInput{VolumeIds: []*string{&volumeId}}) + if err != nil { + return fmt.Errorf("could not get information about the volume: %v", err) + } + vol := volumeOutput.Volumes[0] + if *vol.VolumeId != volumeId { + return fmt.Errorf("describe volume %s returned information about a non-matching volume %s", volumeId, *vol.VolumeId) + } + if *vol.Size == newSize { + // nothing to do + return nil + } + input := ec2.ModifyVolumeInput{Size: &newSize, VolumeId: &volumeId} + output, err := c.connection.ModifyVolume(&input) + if err != nil { + return fmt.Errorf("could not modify persistent volume: %v", err) + } + + state := *output.VolumeModification.ModificationState + if state == constants.EBSVolumeStateFailed { + return fmt.Errorf("could not modify persistent volume %s: modification state failed", volumeId) + } + if state == "" { + return fmt.Errorf("received empty modification status") + } + if state == 
constants.EBSVolumeStateOptimizing || state == constants.EBSVolumeStateCompleted { + return nil + } + // wait until the volume reaches the "optimizing" or "completed" state + in := ec2.DescribeVolumesModificationsInput{VolumeIds: []*string{&volumeId}} + return retryutil.Retry(constants.EBSVolumeResizeWaitInterval, constants.EBSVolumeResizeWaitTimeout, + func() (bool, error) { + out, err := c.connection.DescribeVolumesModifications(&in) + if err != nil { + return false, fmt.Errorf("could not describe volume modification: %v", err) + } + if len(out.VolumesModifications) != 1 { + return false, fmt.Errorf("describe volume modification didn't return one record for volume \"%s\"", volumeId) + } + if *out.VolumesModifications[0].VolumeId != volumeId { + return false, fmt.Errorf("non-matching volume id when describing modifications: \"%s\" is different from \"%s\"") + } + return *out.VolumesModifications[0].ModificationState != constants.EBSVolumeStateModifying, nil + }) +} + +func (c *EBSVolumeResizer) DisconnectFromProvider() error { + c.connection = nil + return nil +} diff --git a/pkg/util/volumes/volumes.go b/pkg/util/volumes/volumes.go new file mode 100644 index 000000000..f35d0b5d3 --- /dev/null +++ b/pkg/util/volumes/volumes.go @@ -0,0 +1,14 @@ +package volumes + +import ( + "k8s.io/client-go/pkg/api/v1" +) + +type VolumeResizer interface { + ConnectToProvider() error + IsConnectedToProvider() bool + VolumeBelongsToProvider(pv *v1.PersistentVolume) bool + GetProviderVolumeID(pv *v1.PersistentVolume) (string, error) + ResizeVolume(providerVolumeId string, newSize int64) error + DisconnectFromProvider() error +}