diff --git a/glide.lock b/glide.lock
index 3df271947..24db2b0d8 100644
--- a/glide.lock
+++ b/glide.lock
@@ -7,7 +7,7 @@ imports:
   - compute/metadata
   - internal
 - name: github.com/aws/aws-sdk-go
-  version: 63ce630574a5ec05ecd8e8de5cea16332a5a684d
+  version: e766cfe96ef7320817087fa4cd92c09abdb87310
   subpackages:
   - aws
   - aws/awserr
diff --git a/glide.yaml b/glide.yaml
index b16cc5f03..12ad3c548 100644
--- a/glide.yaml
+++ b/glide.yaml
@@ -22,3 +22,5 @@ import:
   version: ee39d359dd0896c4c0eccf23f033f158ad3d3bd7
   subpackages:
   - pkg/client/unversioned/remotecommand
+- package: github.com/aws/aws-sdk-go
+  version: ^1.8.24
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index af2c92857..854abfed4 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -26,6 +26,7 @@ import (
     "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
     "github.com/zalando-incubator/postgres-operator/pkg/util/teams"
     "github.com/zalando-incubator/postgres-operator/pkg/util/users"
+    "github.com/zalando-incubator/postgres-operator/pkg/util/volumes"
 )
 
 var (
@@ -37,6 +38,7 @@ var (
 type Config struct {
     KubeClient          *kubernetes.Clientset //TODO: move clients to the better place?
     RestClient          *rest.RESTClient
+    RestConfig          *rest.Config
     TeamsAPIClient      *teams.API
     OpConfig            config.Config
     InfrastructureRoles map[string]spec.PgUser // inherited from the controller
@@ -68,6 +70,13 @@ type Cluster struct {
     podEventsQueue *cache.FIFO
 }
 
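+// compareStatefulsetResult describes the outcome of comparing the running
+// statefulset with the desired one: whether the two match, whether the
+// statefulset object itself has to be replaced, whether its pods need a
+// rolling update, and the human-readable reasons for the differences.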
+type compareStatefulsetResult struct {
+    match         bool
+    replace       bool
+    rollingUpdate bool
+    reasons       []string
+}
+
 func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster {
     lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Metadata.Name)
     kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret)}
@@ -244,20 +253,24 @@ func (c *Cluster) sameVolumeWith(volume spec.Volume) (match bool, reason string)
     return
 }
 
-func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) (match, needsReplace, needsRollUpdate bool, reason string) {
+func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *compareStatefulsetResult {
+    reasons := make([]string, 0)
+    var match, needsRollUpdate, needsReplace bool
+    match = true
     //TODO: improve me
     if *c.Statefulset.Spec.Replicas != *statefulSet.Spec.Replicas {
         match = false
-        reason = "new statefulset's number of replicas doesn't match the current one"
+        reasons = append(reasons, "new statefulset's number of replicas doesn't match the current one")
     }
     if len(c.Statefulset.Spec.Template.Spec.Containers) != len(statefulSet.Spec.Template.Spec.Containers) {
         needsRollUpdate = true
-        reason = "new statefulset's container specification doesn't match the current one"
+        reasons = append(reasons, "new statefulset's container specification doesn't match the current one")
     }
     if len(c.Statefulset.Spec.Template.Spec.Containers) == 0 {
+        c.logger.Warnf("statefulset '%s' has no container", util.NameFromMeta(c.Statefulset.ObjectMeta))
-        return
+        return &compareStatefulsetResult{}
     }
     // In the comparisons below, the needsReplace and needsRollUpdate flags are never reset, since checks fall through
     // and the combined effect of all the changes should be applied.
@@ -267,48 +280,44 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) (matc
     if c.Statefulset.Spec.Template.Spec.ServiceAccountName != statefulSet.Spec.Template.Spec.ServiceAccountName {
         needsReplace = true
         needsRollUpdate = true
-        reason = "new statefulset's serviceAccountName service asccount name doesn't match the current one"
+        reasons = append(reasons, "new statefulset's serviceAccountName doesn't match the current one")
     }
     if *c.Statefulset.Spec.Template.Spec.TerminationGracePeriodSeconds != *statefulSet.Spec.Template.Spec.TerminationGracePeriodSeconds {
         needsReplace = true
         needsRollUpdate = true
-        reason = "new statefulset's terminationGracePeriodSeconds doesn't match the current one"
+        reasons = append(reasons, "new statefulset's terminationGracePeriodSeconds doesn't match the current one")
     }
     // Some generated fields like creationTimestamp make it not possible to use DeepCompare on Spec.Template.ObjectMeta
     if !reflect.DeepEqual(c.Statefulset.Spec.Template.Labels, statefulSet.Spec.Template.Labels) {
         needsReplace = true
         needsRollUpdate = true
-        reason = "new statefulset's metadata labels doesn't match the current one"
+        reasons = append(reasons, "new statefulset's metadata labels don't match the current ones")
     }
     if !reflect.DeepEqual(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations) {
         needsRollUpdate = true
         needsReplace = true
-        reason = "new statefulset's metadata annotations doesn't match the current one"
+        reasons = append(reasons, "new statefulset's metadata annotations don't match the current ones")
     }
     if len(c.Statefulset.Spec.VolumeClaimTemplates) != len(statefulSet.Spec.VolumeClaimTemplates) {
         needsReplace = true
-        needsRollUpdate = true
-        reason = "new statefulset's volumeClaimTemplates contains different number of volumes to the old one"
+        reasons = append(reasons, "new statefulset's volumeClaimTemplates contain a different number of volumes than the old one")
     }
     for i := 0; i < len(c.Statefulset.Spec.VolumeClaimTemplates); i++ {
         name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name
         // Some generated fields like creationTimestamp make it not possible to use DeepCompare on ObjectMeta
         if name != statefulSet.Spec.VolumeClaimTemplates[i].Name {
             needsReplace = true
-            needsRollUpdate = true
-            reason = fmt.Sprintf("new statefulset's name for volume %d doesn't match the current one", i)
+            reasons = append(reasons, fmt.Sprintf("new statefulset's name for volume %d doesn't match the current one", i))
             continue
         }
         if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations) {
             needsReplace = true
-            needsRollUpdate = true
-            reason = fmt.Sprintf("new statefulset's annotations for volume %s doesn't match the current one", name)
+            reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %s don't match the current ones", name))
         }
         if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Spec, statefulSet.Spec.VolumeClaimTemplates[i].Spec) {
             name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name
             needsReplace = true
-            needsRollUpdate = true
-            reason = fmt.Sprintf("new statefulset's volumeClaimTemplates specification for volume %s doesn't match the current one", name)
+            reasons = append(reasons, fmt.Sprintf("new statefulset's volumeClaimTemplates specification for volume %s doesn't match the current one", name))
         }
     }
 
@@ -316,28 +325,27 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) (matc
     container1 := c.Statefulset.Spec.Template.Spec.Containers[0]
     container2 := statefulSet.Spec.Template.Spec.Containers[0]
     if container1.Image != container2.Image {
         needsRollUpdate = true
-        reason = "new statefulset's container image doesn't match the current one"
+        reasons = append(reasons, "new statefulset's container image doesn't match the current one")
     }
     if !reflect.DeepEqual(container1.Ports, container2.Ports) {
         needsRollUpdate = true
-        reason = "new statefulset's container ports don't match the current one"
+        reasons = append(reasons, "new statefulset's container ports don't match the current ones")
     }
     if !compareResources(&container1.Resources, &container2.Resources) {
         needsRollUpdate = true
-        reason = "new statefulset's container resources don't match the current ones"
+        reasons = append(reasons, "new statefulset's container resources don't match the current ones")
     }
     if !reflect.DeepEqual(container1.Env, container2.Env) {
         needsRollUpdate = true
-        reason = "new statefulset's container environment doesn't match the current one"
+        reasons = append(reasons, "new statefulset's container environment doesn't match the current one")
     }
 
     if needsRollUpdate || needsReplace {
         match = false
     }
-
-    return
+    return &compareStatefulsetResult{match: match, reasons: reasons, rollingUpdate: needsRollUpdate, replace: needsReplace}
 }
 
 func compareResources(a *v1.ResourceRequirements, b *v1.ResourceRequirements) (equal bool) {
@@ -387,22 +395,16 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
         c.logger.Infof("service '%s' has been updated", util.NameFromMeta(c.Service.ObjectMeta))
     }
 
-    if match, reason := c.sameVolumeWith(newSpec.Spec.Volume); !match {
-        c.logVolumeChanges(c.Spec.Volume, newSpec.Spec.Volume, reason)
-        //TODO: update PVC
-    }
-
     newStatefulSet, err := c.genStatefulSet(newSpec.Spec)
     if err != nil {
         return fmt.Errorf("could not generate statefulset: %v", err)
     }
+    cmp := c.compareStatefulSetWith(newStatefulSet)
 
-    sameSS, needsReplace, rollingUpdate, reason := c.compareStatefulSetWith(newStatefulSet)
-
-    if !sameSS {
-        c.logStatefulSetChanges(c.Statefulset, newStatefulSet, true, reason)
+    if !cmp.match {
+        c.logStatefulSetChanges(c.Statefulset, newStatefulSet, true, cmp.reasons)
         //TODO: mind the case of updating allowedSourceRanges
-        if !needsReplace {
+        if !cmp.replace {
             if err := c.updateStatefulSet(newStatefulSet); err != nil {
                 c.setStatus(spec.ClusterStatusUpdateFailed)
                 return fmt.Errorf("could not update statefulset: %v", err)
@@ -423,7 +425,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
         //TODO: rewrite pg version in tpr spec
     }
 
-    if rollingUpdate {
+    if cmp.rollingUpdate {
         c.logger.Infof("Rolling update is needed")
         // TODO: wait for actual streaming to the replica
         if err := c.recreatePods(); err != nil {
@@ -432,6 +434,15 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
         }
         c.logger.Infof("Rolling update has been finished")
     }
+
+    if match, reason := c.sameVolumeWith(newSpec.Spec.Volume); !match {
+        c.logVolumeChanges(c.Spec.Volume, newSpec.Spec.Volume, reason)
+        if err := c.resizeVolumes(newSpec.Spec.Volume, []volumes.VolumeResizer{&volumes.EBSVolumeResizer{}}); err != nil {
+            return fmt.Errorf("could not update volumes: %v", err)
+        }
+        c.logger.Infof("volumes have been updated successfully")
+    }
+
     c.setStatus(spec.ClusterStatusRunning)
 
     return nil
diff --git a/pkg/controller/exec.go b/pkg/cluster/exec.go
similarity index 92%
rename from pkg/controller/exec.go
rename to pkg/cluster/exec.go
index 2d0af6818..fbd913c21 100644
--- a/pkg/controller/exec.go
+++ b/pkg/cluster/exec.go
@@ -1,4 +1,4 @@
-package controller
+package cluster
 
 import (
     "bytes"
@@ -11,7 +11,7 @@ import (
     "github.com/zalando-incubator/postgres-operator/pkg/spec"
 )
 
-func (c *Controller) ExecCommand(podName spec.NamespacedName, command []string) (string, error) {
+func (c *Cluster) ExecCommand(podName *spec.NamespacedName, command ...string) (string, error) {
     var (
         execOut bytes.Buffer
         execErr bytes.Buffer
diff --git a/pkg/cluster/filesystems.go b/pkg/cluster/filesystems.go
new file mode 100644
index 000000000..fe636ed95
--- /dev/null
+++ b/pkg/cluster/filesystems.go
@@ -0,0 +1,46 @@
+package cluster
+
+import (
+    "fmt"
+    "strings"
+
+    "github.com/zalando-incubator/postgres-operator/pkg/spec"
+    "github.com/zalando-incubator/postgres-operator/pkg/util/constants"
+    "github.com/zalando-incubator/postgres-operator/pkg/util/filesystems"
+)
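+
+// getPostgresFilesystemInfo parses the output of `df -T` run inside the pod.
+// With the data volume mounted at constants.PostgresDataMount, the
+// `df -T ...|tail -1` line looks roughly like this (illustrative values):
+//
+//   /dev/xvdb ext4 10190100 36888 9612456 1% /home/postgres/pgdata
+//
+// so fields[0] is the device and fields[1] the filesystem type.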
+func (c *Cluster) getPostgresFilesystemInfo(podName *spec.NamespacedName) (device, fstype string, err error) {
+    out, err := c.ExecCommand(podName, "bash", "-c", fmt.Sprintf("df -T %s|tail -1", constants.PostgresDataMount))
+    if err != nil {
+        return "", "", err
+    }
+    fields := strings.Fields(out)
+    if len(fields) < 2 {
+        return "", "", fmt.Errorf("too few fields in the df output")
+    }
+
+    return fields[0], fields[1], nil
+}
+
+func (c *Cluster) resizePostgresFilesystem(podName *spec.NamespacedName, resizers []filesystems.FilesystemResizer) error {
+    // resize2fs always writes to stderr, and ExecCommand considers a non-empty stderr an error
+    // first, determine the device and the filesystem
+    deviceName, fsType, err := c.getPostgresFilesystemInfo(podName)
+    if err != nil {
+        return fmt.Errorf("could not get device and type for the postgres filesystem: %v", err)
+    }
+    for _, resizer := range resizers {
+        if !resizer.CanResizeFilesystem(fsType) {
+            continue
+        }
+        err := resizer.ResizeFilesystem(deviceName, func(cmd string) (out string, err error) {
+            return c.ExecCommand(podName, "bash", "-c", cmd)
+        })
+
+        if err != nil {
+            return err
+        }
+        return nil
+    }
+    return fmt.Errorf("could not resize filesystem: no compatible resizers for the filesystem of type %s", fsType)
+}
diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go
index 035d9cef6..203afb26e 100644
--- a/pkg/cluster/k8sres.go
+++ b/pkg/cluster/k8sres.go
@@ -213,7 +213,7 @@ func (c *Cluster) genPodTemplate(resourceRequirements *v1.ResourceRequirements,
         },
         {
             Name:  "PGROOT",
-            Value: "/home/postgres/pgdata/pgroot",
+            Value: constants.PostgresDataPath,
         },
         {
             Name:  "ETCD_HOST",
@@ -293,7 +293,7 @@ func (c *Cluster) genPodTemplate(resourceRequirements *v1.ResourceRequirements,
         VolumeMounts: []v1.VolumeMount{
             {
                 Name:      constants.DataVolumeName,
-                MountPath: "/home/postgres/pgdata", //TODO: fetch from manifesto
+                MountPath: constants.PostgresDataMount, //TODO: fetch from manifest
             },
         },
         Env: envVars,
diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go
index 363074d73..49497d929 100644
--- a/pkg/cluster/pod.go
+++ b/pkg/cluster/pod.go
@@ -24,19 +24,6 @@ func (c *Cluster) listPods() ([]v1.Pod, error) {
     return pods.Items, nil
 }
 
-func (c *Cluster) listPersistentVolumeClaims() ([]v1.PersistentVolumeClaim, error) {
-    ns := c.Metadata.Namespace
-    listOptions := v1.ListOptions{
-        LabelSelector: c.labelsSet().String(),
-    }
-
-    pvcs, err := c.KubeClient.PersistentVolumeClaims(ns).List(listOptions)
-    if err != nil {
-        return nil, fmt.Errorf("could not get list of PersistentVolumeClaims: %v", err)
-    }
-    return pvcs.Items, nil
-}
-
 func (c *Cluster) deletePods() error {
     c.logger.Debugln("Deleting pods")
     pods, err := c.listPods()
@@ -63,28 +50,6 @@ func (c *Cluster) deletePods() error {
     return nil
 }
 
-func (c *Cluster) deletePersistenVolumeClaims() error {
-    c.logger.Debugln("Deleting PVCs")
-    ns := c.Metadata.Namespace
-    pvcs, err := c.listPersistentVolumeClaims()
-    if err != nil {
-        return err
-    }
-    for _, pvc := range pvcs {
-        c.logger.Debugf("Deleting PVC '%s'", util.NameFromMeta(pvc.ObjectMeta))
-        if err := c.KubeClient.PersistentVolumeClaims(ns).Delete(pvc.Name, c.deleteOptions); err != nil {
-            c.logger.Warningf("could not delete PersistentVolumeClaim: %v", err)
-        }
-    }
-    if len(pvcs) > 0 {
-        c.logger.Debugln("PVCs have been deleted")
-    } else {
-        c.logger.Debugln("No PVCs to delete")
-    }
-
-    return nil
-}
-
 func (c *Cluster) deletePod(podName spec.NamespacedName) error {
     ch := c.registerPodSubscriber(podName)
     defer c.unregisterPodSubscriber(podName)
diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go
index 3ac35a274..bdc108a68 100644
--- a/pkg/cluster/sync.go
+++ b/pkg/cluster/sync.go
@@ -5,6 +5,7 @@ import (
 
     "github.com/zalando-incubator/postgres-operator/pkg/util"
     "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
+    "github.com/zalando-incubator/postgres-operator/pkg/util/volumes"
 )
 
 func (c *Cluster) Sync() error {
@@ -44,18 +45,22 @@ func (c *Cluster) Sync() error {
         }
     }
 
-    if c.databaseAccessDisabled() {
-        return nil
-    }
-    if err := c.initDbConn(); err != nil {
-        return fmt.Errorf("could not init db connection: %v", err)
-    } else {
-        c.logger.Debugf("Syncing roles")
-        if err := c.SyncRoles(); err != nil {
-            return fmt.Errorf("could not sync roles: %v", err)
+    if !c.databaseAccessDisabled() {
+        if err := c.initDbConn(); err != nil {
+            return fmt.Errorf("could not init db connection: %v", err)
+        } else {
+            c.logger.Debugf("Syncing roles")
+            if err := c.SyncRoles(); err != nil {
+                return fmt.Errorf("could not sync roles: %v", err)
+            }
         }
     }
 
+    c.logger.Debugf("Syncing persistent volumes")
+    if err := c.SyncVolumes(); err != nil {
+        return fmt.Errorf("could not sync persistent volumes: %v", err)
+    }
+
     return nil
 }
@@ -114,7 +119,7 @@ func (c *Cluster) syncEndpoint() error {
 
 func (c *Cluster) syncStatefulSet() error {
     cSpec := c.Spec
-    var rollUpdate, needsReplace bool
+    var rollUpdate bool
     if c.Statefulset == nil {
         c.logger.Infof("could not find the cluster's statefulset")
         pods, err := c.listPods()
@@ -139,24 +144,20 @@ func (c *Cluster) syncStatefulSet() error {
             return nil
         }
     }
+    /* TODO: should check that we need to replace the statefulset */
     if !rollUpdate {
-        var (
-            match  bool
-            reason string
-        )
-
         desiredSS, err := c.genStatefulSet(cSpec)
         if err != nil {
             return fmt.Errorf("could not generate statefulset: %v", err)
         }
 
-        match, needsReplace, rollUpdate, reason = c.compareStatefulSetWith(desiredSS)
-        if match {
+        cmp := c.compareStatefulSetWith(desiredSS)
+        if cmp.match {
             return nil
         }
-        c.logStatefulSetChanges(c.Statefulset, desiredSS, false, reason)
+        c.logStatefulSetChanges(c.Statefulset, desiredSS, false, cmp.reasons)
 
-        if !needsReplace {
+        if !cmp.replace {
             if err := c.updateStatefulSet(desiredSS); err != nil {
                 return fmt.Errorf("could not update statefulset: %v", err)
             }
@@ -166,7 +167,7 @@ func (c *Cluster) syncStatefulSet() error {
         }
     }
 
-    if !rollUpdate {
+    if !cmp.rollingUpdate {
         c.logger.Debugln("No rolling update is needed")
         return nil
     }
@@ -199,3 +200,19 @@ func (c *Cluster) SyncRoles() error {
     }
     return nil
 }
+
+// SyncVolumes reads all persistent volumes and checks that their size matches the one declared in the manifest
+func (c *Cluster) SyncVolumes() error {
+    act, err := c.VolumesNeedResizing(c.Spec.Volume)
+    if err != nil {
+        return fmt.Errorf("could not compare size of the volumes: %v", err)
+    }
+    if !act {
+        return nil
+    }
+    if err := c.resizeVolumes(c.Spec.Volume, []volumes.VolumeResizer{&volumes.EBSVolumeResizer{}}); err != nil {
+        return fmt.Errorf("could not sync volumes: %v", err)
+    }
+    c.logger.Infof("volumes have been synced successfully")
+    return nil
+}
diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go
index 2302e1d47..0212c103f 100644
--- a/pkg/cluster/util.go
+++ b/pkg/cluster/util.go
@@ -63,7 +63,7 @@ func specPatch(spec interface{}) ([]byte, error) {
     }{spec})
 }
 
-func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate bool, reason string) {
+func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate bool, reasons []string) {
     if isUpdate {
         c.logger.Infof("statefulset '%s' has been changed",
             util.NameFromMeta(old.ObjectMeta),
@@ -75,8 +75,10 @@ func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate
     }
     c.logger.Debugf("diff\n%s\n", util.PrettyDiff(old.Spec, new.Spec))
 
-    if reason != "" {
-        c.logger.Infof("Reason: %s", reason)
+    if len(reasons) > 0 {
+        for _, reason := range reasons {
+            c.logger.Infof("Reason: %s", reason)
+        }
     }
 }
diff --git a/pkg/cluster/volumes.go b/pkg/cluster/volumes.go
new file mode 100644
index 000000000..c54859665
--- /dev/null
+++ b/pkg/cluster/volumes.go
@@ -0,0 +1,178 @@
+package cluster
+
+import (
+    "fmt"
+    "strconv"
+    "strings"
+
+    "k8s.io/client-go/pkg/api/resource"
+    "k8s.io/client-go/pkg/api/v1"
+
+    "github.com/zalando-incubator/postgres-operator/pkg/spec"
+    "github.com/zalando-incubator/postgres-operator/pkg/util"
+    "github.com/zalando-incubator/postgres-operator/pkg/util/constants"
+    "github.com/zalando-incubator/postgres-operator/pkg/util/filesystems"
+    "github.com/zalando-incubator/postgres-operator/pkg/util/volumes"
+)
+
+func (c *Cluster) listPersistentVolumeClaims() ([]v1.PersistentVolumeClaim, error) {
+    ns := c.Metadata.Namespace
+    listOptions := v1.ListOptions{
+        LabelSelector: c.labelsSet().String(),
+    }
+
+    pvcs, err := c.KubeClient.PersistentVolumeClaims(ns).List(listOptions)
+    if err != nil {
+        return nil, fmt.Errorf("could not list PersistentVolumeClaims: %v", err)
+    }
+    return pvcs.Items, nil
+}
+
+func (c *Cluster) deletePersistenVolumeClaims() error {
+    c.logger.Debugln("Deleting PVCs")
+    pvcs, err := c.listPersistentVolumeClaims()
+    if err != nil {
+        return err
+    }
+    for _, pvc := range pvcs {
+        c.logger.Debugf("Deleting PVC '%s'", util.NameFromMeta(pvc.ObjectMeta))
+        if err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(pvc.Name, c.deleteOptions); err != nil {
+            c.logger.Warningf("could not delete PersistentVolumeClaim: %v", err)
+        }
+    }
+    if len(pvcs) > 0 {
+        c.logger.Debugln("PVCs have been deleted")
+    } else {
+        c.logger.Debugln("No PVCs to delete")
+    }
+
+    return nil
+}
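+
+// listPersistentVolumes resolves the cluster's PVCs to the persistent volumes
+// backing them. Statefulset-created PVC names end with the pod ordinal (e.g.
+// pgdata-mycluster-0 for a hypothetical cluster "mycluster"), which is what
+// the parsing below relies on to skip volumes of pods beyond the current
+// replica count.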
+func (c *Cluster) listPersistentVolumes() ([]*v1.PersistentVolume, error) {
+    result := make([]*v1.PersistentVolume, 0)
+
+    pvcs, err := c.listPersistentVolumeClaims()
+    if err != nil {
+        return nil, fmt.Errorf("could not list cluster's PersistentVolumeClaims: %v", err)
+    }
+    lastPodIndex := *c.Statefulset.Spec.Replicas - 1
+    for _, pvc := range pvcs {
+        lastDash := strings.LastIndex(pvc.Name, "-")
+        if lastDash > 0 && lastDash < len(pvc.Name)-1 {
+            if pvcNumber, err := strconv.Atoi(pvc.Name[lastDash+1:]); err != nil {
+                return nil, fmt.Errorf("could not convert last part of the persistent volume claim name %s to a number", pvc.Name)
+            } else {
+                if int32(pvcNumber) > lastPodIndex {
+                    c.logger.Debugf("Skipping persistent volume %s corresponding to a non-running pod", pvc.Name)
+                    continue
+                }
+            }
+
+        }
+        pv, err := c.KubeClient.PersistentVolumes().Get(pvc.Spec.VolumeName)
+        if err != nil {
+            return nil, fmt.Errorf("could not get PersistentVolume: %v", err)
+        }
+        result = append(result, pv)
+    }
+
+    return result, nil
+}
+
+// resizeVolumes resizes persistent volumes compatible with the given resizer interface
+func (c *Cluster) resizeVolumes(newVolume spec.Volume, resizers []volumes.VolumeResizer) error {
+    totalCompatible := 0
+    newQuantity, err := resource.ParseQuantity(newVolume.Size)
+    if err != nil {
+        return fmt.Errorf("could not parse volume size: %v", err)
+    }
+    pvs, newSize, err := c.listVolumesWithManifestSize(newVolume)
+    if err != nil {
+        return fmt.Errorf("could not list persistent volumes: %v", err)
+    }
+    for _, pv := range pvs {
+        volumeSize := quantityToGigabyte(pv.Spec.Capacity[v1.ResourceStorage])
+        if volumeSize > newSize {
+            return fmt.Errorf("cannot shrink persistent volume")
+        }
+        if volumeSize == newSize {
+            continue
+        }
+        for _, resizer := range resizers {
+            if !resizer.VolumeBelongsToProvider(pv) {
+                continue
+            }
+            totalCompatible++
+            if !resizer.IsConnectedToProvider() {
+                err := resizer.ConnectToProvider()
+                if err != nil {
+                    return fmt.Errorf("could not connect to the volume provider: %v", err)
+                }
+                defer resizer.DisconnectFromProvider()
+            }
+            awsVolumeId, err := resizer.GetProviderVolumeID(pv)
+            if err != nil {
+                return err
+            }
+            c.logger.Debugf("updating persistent volume %s to %d", pv.Name, newSize)
+            if err := resizer.ResizeVolume(awsVolumeId, newSize); err != nil {
+                return fmt.Errorf("could not resize EBS volume %s: %v", awsVolumeId, err)
+            }
+            c.logger.Debugf("resizing the filesystem on the volume %s", pv.Name)
+            podName := getPodNameFromPersistentVolume(pv)
+            if err := c.resizePostgresFilesystem(podName, []filesystems.FilesystemResizer{&filesystems.Ext234Resize{}}); err != nil {
+                return fmt.Errorf("could not resize the filesystem on pod '%s': %v", podName, err)
+            }
+            c.logger.Debugf("filesystem resize successful on volume %s", pv.Name)
+            pv.Spec.Capacity[v1.ResourceStorage] = newQuantity
+            c.logger.Debugf("updating persistent volume definition for volume %s", pv.Name)
+            if _, err := c.KubeClient.PersistentVolumes().Update(pv); err != nil {
+                return fmt.Errorf("could not update persistent volume: %v", err)
+            }
+            c.logger.Debugf("successfully updated persistent volume %s", pv.Name)
+        }
+    }
+    if len(pvs) > 0 && totalCompatible == 0 {
+        return fmt.Errorf("could not resize EBS volumes: persistent volumes are not compatible with existing resizing providers")
+    }
+    return nil
+}
+
+func (c *Cluster) VolumesNeedResizing(newVolume spec.Volume) (bool, error) {
+    volumes, manifestSize, err := c.listVolumesWithManifestSize(newVolume)
+    if err != nil {
+        return false, err
+    }
+    for _, pv := range volumes {
+        currentSize := quantityToGigabyte(pv.Spec.Capacity[v1.ResourceStorage])
+        if currentSize != manifestSize {
+            return true, nil
+        }
+    }
+    return false, nil
+}
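+
+// listVolumesWithManifestSize is shared by VolumesNeedResizing and
+// resizeVolumes, so that both compare the persistent volumes against the same
+// manifest value. A sketch of the sync-time flow (this mirrors what
+// Cluster.SyncVolumes in sync.go does):
+//
+//    if needed, _ := c.VolumesNeedResizing(c.Spec.Volume); needed {
+//        err = c.resizeVolumes(c.Spec.Volume, []volumes.VolumeResizer{&volumes.EBSVolumeResizer{}})
+//    }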
+func (c *Cluster) listVolumesWithManifestSize(newVolume spec.Volume) ([]*v1.PersistentVolume, int64, error) {
+    newSize, err := resource.ParseQuantity(newVolume.Size)
+    if err != nil {
+        return nil, 0, fmt.Errorf("could not parse volume size from the manifest: %v", err)
+    }
+    manifestSize := quantityToGigabyte(newSize)
+    volumes, err := c.listPersistentVolumes()
+    if err != nil {
+        return nil, 0, fmt.Errorf("could not list persistent volumes: %v", err)
+    }
+    return volumes, manifestSize, nil
+}
+
+// getPodNameFromPersistentVolume returns a pod name that it extracts from the volume claim ref.
+func getPodNameFromPersistentVolume(pv *v1.PersistentVolume) *spec.NamespacedName {
+    namespace := pv.Spec.ClaimRef.Namespace
+    name := pv.Spec.ClaimRef.Name[len(constants.DataVolumeName)+1:]
+    return &spec.NamespacedName{Namespace: namespace, Name: name}
+}
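+
+// quantityToGigabyte truncates to whole gibibytes: a manifest value of
+// "100Gi" yields 100, and any sub-gigabyte remainder is dropped.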
+func quantityToGigabyte(q resource.Quantity) int64 {
+    return q.ScaledValue(0) / (1 * constants.Gigabyte)
+}
diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go
index 5e1919ecb..d8e0e9892 100644
--- a/pkg/controller/postgresql.go
+++ b/pkg/controller/postgresql.go
@@ -169,8 +169,8 @@ func (c *Controller) processEvent(obj interface{}) error {
         }
 
         if err := cl.Sync(); err != nil {
-            cl.Error = fmt.Errorf("could not sync cluster '%s': %s", clusterName, err)
-            logger.Errorf("%v", cl)
+            cl.Error = fmt.Errorf("could not sync cluster '%s': %v", clusterName, err)
+            logger.Errorf("%v", cl.Error)
             return nil
         }
         cl.Error = nil
diff --git a/pkg/controller/util.go b/pkg/controller/util.go
index fcf15ec06..714128d51 100644
--- a/pkg/controller/util.go
+++ b/pkg/controller/util.go
@@ -23,6 +23,7 @@ func (c *Controller) makeClusterConfig() cluster.Config {
     return cluster.Config{
         KubeClient:          c.KubeClient,
         RestClient:          c.RestClient,
+        RestConfig:          c.RestConfig,
         TeamsAPIClient:      c.TeamsAPIClient,
         OpConfig:            config.Copy(c.opConfig),
         InfrastructureRoles: infrastructureRoles,
diff --git a/pkg/util/constants/annotations.go b/pkg/util/constants/annotations.go
new file mode 100644
index 000000000..5a2e625d1
--- /dev/null
+++ b/pkg/util/constants/annotations.go
@@ -0,0 +1,9 @@
+package constants
+
+const (
+    ZalandoDNSNameAnnotation           = "external-dns.alpha.kubernetes.io/hostname"
+    ElbTimeoutAnnotationName           = "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout"
+    ElbTimeoutAnnotationValue          = "3600"
+    KubeIAmAnnotation                  = "iam.amazonaws.com/role"
+    VolumeStorateProvisionerAnnotation = "pv.kubernetes.io/provisioned-by"
+)
diff --git a/pkg/util/constants/aws.go b/pkg/util/constants/aws.go
new file mode 100644
index 000000000..5140a1729
--- /dev/null
+++ b/pkg/util/constants/aws.go
@@ -0,0 +1,16 @@
+package constants
+
+import "time"
+
+const (
+    AWS_REGION       = "eu-central-1"
+    EBSVolumeIDStart = "/vol-"
+    EBSProvisioner   = "kubernetes.io/aws-ebs"
+    //https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_VolumeModification.html
+    EBSVolumeStateModifying  = "modifying"
+    EBSVolumeStateOptimizing = "optimizing"
+    EBSVolumeStateFailed     = "failed"
+    EBSVolumeStateCompleted  = "completed"
+    EBSVolumeResizeWaitInterval = 2 * time.Second
+    EBSVolumeResizeWaitTimeout  = 30 * time.Second
+)
diff --git a/pkg/util/constants/constants.go b/pkg/util/constants/constants.go
deleted file mode 100644
index df8c5bc98..000000000
--- a/pkg/util/constants/constants.go
+++ /dev/null
@@ -1,35 +0,0 @@
-package constants
-
-import "time"
-
-const (
-    TPRName        = "postgresql"
-    TPRVendor      = "acid.zalan.do"
-    TPRDescription = "Managed PostgreSQL clusters"
-    TPRApiVersion  = "v1"
-    ListClustersURITemplate  = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/namespaces/%s/" + ResourceName       // Namespace
-    WatchClustersURITemplate = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/watch/namespaces/%s/" + ResourceName // Namespace
-    K8sVersion = "v1"
-    K8sAPIPath = "/api"
-    DataVolumeName = "pgdata"
-    PasswordLength     = 64
-    UserSecretTemplate = "%s.%s.credentials." + TPRName + "." + TPRVendor // Username, ClusterName
-    ZalandoDNSNameAnnotation  = "external-dns.alpha.kubernetes.io/hostname"
-    ElbTimeoutAnnotationName  = "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout"
-    ElbTimeoutAnnotationValue = "3600"
-    KubeIAmAnnotation = "iam.amazonaws.com/role"
-    ResourceName = TPRName + "s"
-    PodRoleMaster  = "master"
-    PodRoleReplica = "replica"
-    SuperuserKeyName       = "superuser"
-    ReplicationUserKeyName = "replication"
-    StatefulsetDeletionInterval = 1 * time.Second
-    StatefulsetDeletionTimeout  = 30 * time.Second
-
-    RoleFlagSuperuser  = "SUPERUSER"
-    RoleFlagInherit    = "INHERIT"
-    RoleFlagLogin      = "LOGIN"
-    RoleFlagNoLogin    = "NOLOGIN"
-    RoleFlagCreateRole = "CREATEROLE"
-    RoleFlagCreateDB   = "CREATEDB"
-)
diff --git a/pkg/util/constants/kubernetes.go b/pkg/util/constants/kubernetes.go
new file mode 100644
index 000000000..0341d3734
--- /dev/null
+++ b/pkg/util/constants/kubernetes.go
@@ -0,0 +1,12 @@
+package constants
+
+import "time"
+
+const (
+    ListClustersURITemplate  = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/namespaces/%s/" + ResourceName       // Namespace
+    WatchClustersURITemplate = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/watch/namespaces/%s/" + ResourceName // Namespace
+    K8sVersion = "v1"
+    K8sAPIPath = "/api"
+    StatefulsetDeletionInterval = 1 * time.Second
+    StatefulsetDeletionTimeout  = 30 * time.Second
+)
diff --git a/pkg/util/constants/postgresql.go b/pkg/util/constants/postgresql.go
new file mode 100644
index 000000000..c27802946
--- /dev/null
+++ b/pkg/util/constants/postgresql.go
@@ -0,0 +1,9 @@
+package constants
+
+const (
+    DataVolumeName    = "pgdata"
+    PodRoleMaster     = "master"
+    PodRoleReplica    = "replica"
+    PostgresDataMount = "/home/postgres/pgdata"
+    PostgresDataPath  = PostgresDataMount + "/pgroot"
+)
diff --git a/pkg/util/constants/roles.go b/pkg/util/constants/roles.go
new file mode 100644
index 000000000..85fb42b1b
--- /dev/null
+++ b/pkg/util/constants/roles.go
@@ -0,0 +1,14 @@
+package constants
+
+const (
+    PasswordLength         = 64
+    UserSecretTemplate     = "%s.%s.credentials." + TPRName + "." + TPRVendor // Username, ClusterName
+    SuperuserKeyName       = "superuser"
+    ReplicationUserKeyName = "replication"
+    RoleFlagSuperuser      = "SUPERUSER"
+    RoleFlagInherit        = "INHERIT"
+    RoleFlagLogin          = "LOGIN"
+    RoleFlagNoLogin        = "NOLOGIN"
+    RoleFlagCreateRole     = "CREATEROLE"
+    RoleFlagCreateDB       = "CREATEDB"
+)
diff --git a/pkg/util/constants/thirdpartyresource.go b/pkg/util/constants/thirdpartyresource.go
new file mode 100644
index 000000000..64d702bc6
--- /dev/null
+++ b/pkg/util/constants/thirdpartyresource.go
@@ -0,0 +1,9 @@
+package constants
+
+const (
+    TPRName        = "postgresql"
+    TPRVendor      = "acid.zalan.do"
+    TPRDescription = "Managed PostgreSQL clusters"
+    TPRApiVersion  = "v1"
+    ResourceName   = TPRName + "s"
+)
diff --git a/pkg/util/constants/units.go b/pkg/util/constants/units.go
new file mode 100644
index 000000000..4ec3b2511
--- /dev/null
+++ b/pkg/util/constants/units.go
@@ -0,0 +1,5 @@
+package constants
+
+const (
+    Gigabyte = 1073741824
+)
diff --git a/pkg/util/filesystems/ext234.go b/pkg/util/filesystems/ext234.go
new file mode 100644
index 000000000..7d9215453
--- /dev/null
+++ b/pkg/util/filesystems/ext234.go
@@ -0,0 +1,38 @@
+package filesystems
+
+import (
+    "fmt"
+    "regexp"
+    "strings"
+)
+
+var (
+    ext2fsSuccessRegexp = regexp.MustCompile(`The filesystem on [/a-z0-9]+ is now \d+ \(\d+\w+\) blocks long.`)
+)
+
+const (
+    EXT2      = "ext2"
+    EXT3      = "ext3"
+    EXT4      = "ext4"
+    resize2fs = "resize2fs"
+)
+
+type Ext234Resize struct {
+}
+
+func (c *Ext234Resize) CanResizeFilesystem(fstype string) bool {
+    return fstype == EXT2 || fstype == EXT3 || fstype == EXT4
+}
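+
+// ResizeFilesystem infers success from the command's output, since resize2fs
+// reports its progress on stderr even when it succeeds. The outputs it
+// accepts look like the following (illustrative, not captured from a run):
+//
+//    The filesystem is already 2621440 (4k) blocks long.  Nothing to do!
+//
+//    Filesystem at /dev/xvdb is mounted on /home/postgres/pgdata; on-line resizing required
+//    The filesystem on /dev/xvdb is now 5242880 (4k) blocks long.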
"k8s.io/client-go/pkg/api/v1" +) + +type EBSVolumeResizer struct { + connection *ec2.EC2 +} + +func (c *EBSVolumeResizer) ConnectToProvider() error { + sess, err := session.NewSession(&aws.Config{Region: aws.String(constants.AWS_REGION)}) + if err != nil { + return fmt.Errorf("could not establish AWS session: %v", err) + } + c.connection = ec2.New(sess) + return nil +} + +func (c *EBSVolumeResizer) IsConnectedToProvider() bool { + return c.connection != nil +} + +func (c *EBSVolumeResizer) VolumeBelongsToProvider(pv *v1.PersistentVolume) bool { + return pv.Spec.AWSElasticBlockStore != nil && pv.Annotations[constants.VolumeStorateProvisionerAnnotation] == constants.EBSProvisioner +} + +// GetProviderVolumeID converts aws://eu-central-1b/vol-00f93d4827217c629 to vol-00f93d4827217c629 for EBS volumes +func (c *EBSVolumeResizer) GetProviderVolumeID(pv *v1.PersistentVolume) (string, error) { + volumeID := pv.Spec.AWSElasticBlockStore.VolumeID + if volumeID == "" { + return "", fmt.Errorf("volume id is empty for volume %s", pv.Name) + } + idx := strings.LastIndex(volumeID, constants.EBSVolumeIDStart) + 1 + if idx == 0 { + return "", fmt.Errorf("malfored EBS volume id %s", volumeID) + } + return volumeID[idx:], nil +} + +func (c *EBSVolumeResizer) ResizeVolume(volumeId string, newSize int64) error { + /* first check if the volume is already of a requested size */ + volumeOutput, err := c.connection.DescribeVolumes(&ec2.DescribeVolumesInput{VolumeIds: []*string{&volumeId}}) + if err != nil { + return fmt.Errorf("could not get information about the volume: %v", err) + } + vol := volumeOutput.Volumes[0] + if *vol.VolumeId != volumeId { + return fmt.Errorf("describe volume %s returned information about a non-matching volume %s", volumeId, *vol.VolumeId) + } + if *vol.Size == newSize { + // nothing to do + return nil + } + input := ec2.ModifyVolumeInput{Size: &newSize, VolumeId: &volumeId} + output, err := c.connection.ModifyVolume(&input) + if err != nil { + return fmt.Errorf("could not modify persistent volume: %v", err) + } + + state := *output.VolumeModification.ModificationState + if state == constants.EBSVolumeStateFailed { + return fmt.Errorf("could not modify persistent volume %s: modification state failed", volumeId) + } + if state == "" { + return fmt.Errorf("received empty modification status") + } + if state == constants.EBSVolumeStateOptimizing || state == constants.EBSVolumeStateCompleted { + return nil + } + // wait until the volume reaches the "optimizing" or "completed" state + in := ec2.DescribeVolumesModificationsInput{VolumeIds: []*string{&volumeId}} + return retryutil.Retry(constants.EBSVolumeResizeWaitInterval, constants.EBSVolumeResizeWaitTimeout, + func() (bool, error) { + out, err := c.connection.DescribeVolumesModifications(&in) + if err != nil { + return false, fmt.Errorf("could not describe volume modification: %v", err) + } + if len(out.VolumesModifications) != 1 { + return false, fmt.Errorf("describe volume modification didn't return one record for volume \"%s\"", volumeId) + } + if *out.VolumesModifications[0].VolumeId != volumeId { + return false, fmt.Errorf("non-matching volume id when describing modifications: \"%s\" is different from \"%s\"") + } + return *out.VolumesModifications[0].ModificationState != constants.EBSVolumeStateModifying, nil + }) +} + +func (c *EBSVolumeResizer) DisconnectFromProvider() error { + c.connection = nil + return nil +} diff --git a/pkg/util/volumes/volumes.go b/pkg/util/volumes/volumes.go new file mode 100644 index 000000000..f35d0b5d3 
diff --git a/pkg/util/volumes/volumes.go b/pkg/util/volumes/volumes.go
new file mode 100644
index 000000000..f35d0b5d3
--- /dev/null
+++ b/pkg/util/volumes/volumes.go
@@ -0,0 +1,14 @@
+package volumes
+
+import (
+    "k8s.io/client-go/pkg/api/v1"
+)
+
+type VolumeResizer interface {
+    ConnectToProvider() error
+    IsConnectedToProvider() bool
+    VolumeBelongsToProvider(pv *v1.PersistentVolume) bool
+    GetProviderVolumeID(pv *v1.PersistentVolume) (string, error)
+    ResizeVolume(providerVolumeId string, newSize int64) error
+    DisconnectFromProvider() error
+}