Merge branch 'master' into feature/tests
This commit is contained in:
		
						commit
						c8d80273c3
					
				|  | @ -7,7 +7,7 @@ imports: | ||||||
|   - compute/metadata |   - compute/metadata | ||||||
|   - internal |   - internal | ||||||
| - name: github.com/aws/aws-sdk-go | - name: github.com/aws/aws-sdk-go | ||||||
|   version: 63ce630574a5ec05ecd8e8de5cea16332a5a684d |   version: e766cfe96ef7320817087fa4cd92c09abdb87310 | ||||||
|   subpackages: |   subpackages: | ||||||
|   - aws |   - aws | ||||||
|   - aws/awserr |   - aws/awserr | ||||||
|  |  | ||||||
|  | @ -22,3 +22,5 @@ import: | ||||||
|   version: ee39d359dd0896c4c0eccf23f033f158ad3d3bd7 |   version: ee39d359dd0896c4c0eccf23f033f158ad3d3bd7 | ||||||
|   subpackages: |   subpackages: | ||||||
|   - pkg/client/unversioned/remotecommand |   - pkg/client/unversioned/remotecommand | ||||||
|  | - package: github.com/aws/aws-sdk-go | ||||||
|  |   version: ^1.8.24 | ||||||
|  |  | ||||||
|  | @ -26,6 +26,7 @@ import ( | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" | 	"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/teams" | 	"github.com/zalando-incubator/postgres-operator/pkg/util/teams" | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/users" | 	"github.com/zalando-incubator/postgres-operator/pkg/util/users" | ||||||
|  | 	"github.com/zalando-incubator/postgres-operator/pkg/util/volumes" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| var ( | var ( | ||||||
|  | @ -37,6 +38,7 @@ var ( | ||||||
| type Config struct { | type Config struct { | ||||||
| 	KubeClient          *kubernetes.Clientset //TODO: move clients to the better place?
 | 	KubeClient          *kubernetes.Clientset //TODO: move clients to the better place?
 | ||||||
| 	RestClient          *rest.RESTClient | 	RestClient          *rest.RESTClient | ||||||
|  | 	RestConfig          *rest.Config | ||||||
| 	TeamsAPIClient      *teams.API | 	TeamsAPIClient      *teams.API | ||||||
| 	OpConfig            config.Config | 	OpConfig            config.Config | ||||||
| 	InfrastructureRoles map[string]spec.PgUser // inherited from the controller
 | 	InfrastructureRoles map[string]spec.PgUser // inherited from the controller
 | ||||||
|  | @ -68,6 +70,13 @@ type Cluster struct { | ||||||
| 	podEventsQueue   *cache.FIFO | 	podEventsQueue   *cache.FIFO | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | type compareStatefulsetResult struct { | ||||||
|  | 	match         bool | ||||||
|  | 	replace       bool | ||||||
|  | 	rollingUpdate bool | ||||||
|  | 	reasons       []string | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { | func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { | ||||||
| 	lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Metadata.Name) | 	lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Metadata.Name) | ||||||
| 	kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret)} | 	kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret)} | ||||||
|  | @ -244,20 +253,24 @@ func (c *Cluster) sameVolumeWith(volume spec.Volume) (match bool, reason string) | ||||||
| 	return | 	return | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) (match, needsReplace, needsRollUpdate bool, reason string) { | func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *compareStatefulsetResult { | ||||||
|  | 	reasons := make([]string, 0) | ||||||
|  | 	var match, needsRollUpdate, needsReplace bool | ||||||
|  | 
 | ||||||
| 	match = true | 	match = true | ||||||
| 	//TODO: improve me
 | 	//TODO: improve me
 | ||||||
| 	if *c.Statefulset.Spec.Replicas != *statefulSet.Spec.Replicas { | 	if *c.Statefulset.Spec.Replicas != *statefulSet.Spec.Replicas { | ||||||
| 		match = false | 		match = false | ||||||
| 		reason = "new statefulset's number of replicas doesn't match the current one" | 		reasons = append(reasons, "new statefulset's number of replicas doesn't match the current one") | ||||||
| 	} | 	} | ||||||
| 	if len(c.Statefulset.Spec.Template.Spec.Containers) != len(statefulSet.Spec.Template.Spec.Containers) { | 	if len(c.Statefulset.Spec.Template.Spec.Containers) != len(statefulSet.Spec.Template.Spec.Containers) { | ||||||
| 		needsRollUpdate = true | 		needsRollUpdate = true | ||||||
| 		reason = "new statefulset's container specification doesn't match the current one" | 		reasons = append(reasons, "new statefulset's container specification doesn't match the current one") | ||||||
| 	} | 	} | ||||||
| 	if len(c.Statefulset.Spec.Template.Spec.Containers) == 0 { | 	if len(c.Statefulset.Spec.Template.Spec.Containers) == 0 { | ||||||
|  | 
 | ||||||
| 		c.logger.Warnf("statefulset '%s' has no container", util.NameFromMeta(c.Statefulset.ObjectMeta)) | 		c.logger.Warnf("statefulset '%s' has no container", util.NameFromMeta(c.Statefulset.ObjectMeta)) | ||||||
| 		return | 		return &compareStatefulsetResult{} | ||||||
| 	} | 	} | ||||||
| 	// In the comparisons below, the needsReplace and needsRollUpdate flags are never reset, since checks fall through
 | 	// In the comparisons below, the needsReplace and needsRollUpdate flags are never reset, since checks fall through
 | ||||||
| 	// and the combined effect of all the changes should be applied.
 | 	// and the combined effect of all the changes should be applied.
 | ||||||
|  | @ -267,48 +280,44 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) (matc | ||||||
| 	if c.Statefulset.Spec.Template.Spec.ServiceAccountName != statefulSet.Spec.Template.Spec.ServiceAccountName { | 	if c.Statefulset.Spec.Template.Spec.ServiceAccountName != statefulSet.Spec.Template.Spec.ServiceAccountName { | ||||||
| 		needsReplace = true | 		needsReplace = true | ||||||
| 		needsRollUpdate = true | 		needsRollUpdate = true | ||||||
| 		reason = "new statefulset's serviceAccountName service asccount name doesn't match the current one" | 		reasons = append(reasons, "new statefulset's serviceAccountName service asccount name doesn't match the current one") | ||||||
| 	} | 	} | ||||||
| 	if *c.Statefulset.Spec.Template.Spec.TerminationGracePeriodSeconds != *statefulSet.Spec.Template.Spec.TerminationGracePeriodSeconds { | 	if *c.Statefulset.Spec.Template.Spec.TerminationGracePeriodSeconds != *statefulSet.Spec.Template.Spec.TerminationGracePeriodSeconds { | ||||||
| 		needsReplace = true | 		needsReplace = true | ||||||
| 		needsRollUpdate = true | 		needsRollUpdate = true | ||||||
| 		reason = "new statefulset's terminationGracePeriodSeconds  doesn't match the current one" | 		reasons = append(reasons, "new statefulset's terminationGracePeriodSeconds  doesn't match the current one") | ||||||
| 	} | 	} | ||||||
| 	// Some generated fields like creationTimestamp make it not possible to use DeepCompare on Spec.Template.ObjectMeta
 | 	// Some generated fields like creationTimestamp make it not possible to use DeepCompare on Spec.Template.ObjectMeta
 | ||||||
| 	if !reflect.DeepEqual(c.Statefulset.Spec.Template.Labels, statefulSet.Spec.Template.Labels) { | 	if !reflect.DeepEqual(c.Statefulset.Spec.Template.Labels, statefulSet.Spec.Template.Labels) { | ||||||
| 		needsReplace = true | 		needsReplace = true | ||||||
| 		needsRollUpdate = true | 		needsRollUpdate = true | ||||||
| 		reason = "new statefulset's metadata labels doesn't match the current one" | 		reasons = append(reasons, "new statefulset's metadata labels doesn't match the current one") | ||||||
| 	} | 	} | ||||||
| 	if !reflect.DeepEqual(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations) { | 	if !reflect.DeepEqual(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations) { | ||||||
| 		needsRollUpdate = true | 		needsRollUpdate = true | ||||||
| 		needsReplace = true | 		needsReplace = true | ||||||
| 		reason = "new statefulset's metadata annotations doesn't match the current one" | 		reasons = append(reasons, "new statefulset's metadata annotations doesn't match the current one") | ||||||
| 	} | 	} | ||||||
| 	if len(c.Statefulset.Spec.VolumeClaimTemplates) != len(statefulSet.Spec.VolumeClaimTemplates) { | 	if len(c.Statefulset.Spec.VolumeClaimTemplates) != len(statefulSet.Spec.VolumeClaimTemplates) { | ||||||
| 		needsReplace = true | 		needsReplace = true | ||||||
| 		needsRollUpdate = true | 		reasons = append(reasons, "new statefulset's volumeClaimTemplates contains different number of volumes to the old one") | ||||||
| 		reason = "new statefulset's volumeClaimTemplates contains different number of volumes to the old one" |  | ||||||
| 	} | 	} | ||||||
| 	for i := 0; i < len(c.Statefulset.Spec.VolumeClaimTemplates); i++ { | 	for i := 0; i < len(c.Statefulset.Spec.VolumeClaimTemplates); i++ { | ||||||
| 		name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name | 		name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name | ||||||
| 		// Some generated fields like creationTimestamp make it not possible to use DeepCompare on ObjectMeta
 | 		// Some generated fields like creationTimestamp make it not possible to use DeepCompare on ObjectMeta
 | ||||||
| 		if name != statefulSet.Spec.VolumeClaimTemplates[i].Name { | 		if name != statefulSet.Spec.VolumeClaimTemplates[i].Name { | ||||||
| 			needsReplace = true | 			needsReplace = true | ||||||
| 			needsRollUpdate = true | 			reasons = append(reasons, fmt.Sprintf("new statefulset's name for volume %d doesn't match the current one", i)) | ||||||
| 			reason = fmt.Sprintf("new statefulset's name for volume %d doesn't match the current one", i) |  | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 		if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations) { | 		if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations) { | ||||||
| 			needsReplace = true | 			needsReplace = true | ||||||
| 			needsRollUpdate = true | 			reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %s doesn't match the current one", name)) | ||||||
| 			reason = fmt.Sprintf("new statefulset's annotations for volume %s doesn't match the current one", name) |  | ||||||
| 		} | 		} | ||||||
| 		if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Spec, statefulSet.Spec.VolumeClaimTemplates[i].Spec) { | 		if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Spec, statefulSet.Spec.VolumeClaimTemplates[i].Spec) { | ||||||
| 			name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name | 			name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name | ||||||
| 			needsReplace = true | 			needsReplace = true | ||||||
| 			needsRollUpdate = true | 			reasons = append(reasons, fmt.Sprintf("new statefulset's volumeClaimTemplates specification for volume %s doesn't match the current one", name)) | ||||||
| 			reason = fmt.Sprintf("new statefulset's volumeClaimTemplates specification for volume %s doesn't match the current one", name) |  | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | @ -316,28 +325,27 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) (matc | ||||||
| 	container2 := statefulSet.Spec.Template.Spec.Containers[0] | 	container2 := statefulSet.Spec.Template.Spec.Containers[0] | ||||||
| 	if container1.Image != container2.Image { | 	if container1.Image != container2.Image { | ||||||
| 		needsRollUpdate = true | 		needsRollUpdate = true | ||||||
| 		reason = "new statefulset's container image doesn't match the current one" | 		reasons = append(reasons, "new statefulset's container image doesn't match the current one") | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if !reflect.DeepEqual(container1.Ports, container2.Ports) { | 	if !reflect.DeepEqual(container1.Ports, container2.Ports) { | ||||||
| 		needsRollUpdate = true | 		needsRollUpdate = true | ||||||
| 		reason = "new statefulset's container ports don't match the current one" | 		reasons = append(reasons, "new statefulset's container ports don't match the current one") | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if !compareResources(&container1.Resources, &container2.Resources) { | 	if !compareResources(&container1.Resources, &container2.Resources) { | ||||||
| 		needsRollUpdate = true | 		needsRollUpdate = true | ||||||
| 		reason = "new statefulset's container resources don't match the current ones" | 		reasons = append(reasons, "new statefulset's container resources don't match the current ones") | ||||||
| 	} | 	} | ||||||
| 	if !reflect.DeepEqual(container1.Env, container2.Env) { | 	if !reflect.DeepEqual(container1.Env, container2.Env) { | ||||||
| 		needsRollUpdate = true | 		needsRollUpdate = true | ||||||
| 		reason = "new statefulset's container environment doesn't match the current one" | 		reasons = append(reasons, "new statefulset's container environment doesn't match the current one") | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if needsRollUpdate || needsReplace { | 	if needsRollUpdate || needsReplace { | ||||||
| 		match = false | 		match = false | ||||||
| 	} | 	} | ||||||
| 
 | 	return &compareStatefulsetResult{match: match, reasons: reasons, rollingUpdate: needsRollUpdate, replace: needsReplace} | ||||||
| 	return |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func compareResources(a *v1.ResourceRequirements, b *v1.ResourceRequirements) (equal bool) { | func compareResources(a *v1.ResourceRequirements, b *v1.ResourceRequirements) (equal bool) { | ||||||
|  | @ -387,22 +395,16 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { | ||||||
| 		c.logger.Infof("service '%s' has been updated", util.NameFromMeta(c.Service.ObjectMeta)) | 		c.logger.Infof("service '%s' has been updated", util.NameFromMeta(c.Service.ObjectMeta)) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if match, reason := c.sameVolumeWith(newSpec.Spec.Volume); !match { |  | ||||||
| 		c.logVolumeChanges(c.Spec.Volume, newSpec.Spec.Volume, reason) |  | ||||||
| 		//TODO: update PVC
 |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	newStatefulSet, err := c.genStatefulSet(newSpec.Spec) | 	newStatefulSet, err := c.genStatefulSet(newSpec.Spec) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return fmt.Errorf("could not generate statefulset: %v", err) | 		return fmt.Errorf("could not generate statefulset: %v", err) | ||||||
| 	} | 	} | ||||||
|  | 	cmp := c.compareStatefulSetWith(newStatefulSet) | ||||||
| 
 | 
 | ||||||
| 	sameSS, needsReplace, rollingUpdate, reason := c.compareStatefulSetWith(newStatefulSet) | 	if !cmp.match { | ||||||
| 
 | 		c.logStatefulSetChanges(c.Statefulset, newStatefulSet, true, cmp.reasons) | ||||||
| 	if !sameSS { |  | ||||||
| 		c.logStatefulSetChanges(c.Statefulset, newStatefulSet, true, reason) |  | ||||||
| 		//TODO: mind the case of updating allowedSourceRanges
 | 		//TODO: mind the case of updating allowedSourceRanges
 | ||||||
| 		if !needsReplace { | 		if !cmp.replace { | ||||||
| 			if err := c.updateStatefulSet(newStatefulSet); err != nil { | 			if err := c.updateStatefulSet(newStatefulSet); err != nil { | ||||||
| 				c.setStatus(spec.ClusterStatusUpdateFailed) | 				c.setStatus(spec.ClusterStatusUpdateFailed) | ||||||
| 				return fmt.Errorf("could not upate statefulset: %v", err) | 				return fmt.Errorf("could not upate statefulset: %v", err) | ||||||
|  | @ -423,7 +425,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { | ||||||
| 		//TODO: rewrite pg version in tpr spec
 | 		//TODO: rewrite pg version in tpr spec
 | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if rollingUpdate { | 	if cmp.rollingUpdate { | ||||||
| 		c.logger.Infof("Rolling update is needed") | 		c.logger.Infof("Rolling update is needed") | ||||||
| 		// TODO: wait for actual streaming to the replica
 | 		// TODO: wait for actual streaming to the replica
 | ||||||
| 		if err := c.recreatePods(); err != nil { | 		if err := c.recreatePods(); err != nil { | ||||||
|  | @ -432,6 +434,15 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { | ||||||
| 		} | 		} | ||||||
| 		c.logger.Infof("Rolling update has been finished") | 		c.logger.Infof("Rolling update has been finished") | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
|  | 	if match, reason := c.sameVolumeWith(newSpec.Spec.Volume); !match { | ||||||
|  | 		c.logVolumeChanges(c.Spec.Volume, newSpec.Spec.Volume, reason) | ||||||
|  | 		if err := c.resizeVolumes(newSpec.Spec.Volume, []volumes.VolumeResizer{&volumes.EBSVolumeResizer{}}); err != nil { | ||||||
|  | 			return fmt.Errorf("Could not update volumes: %v", err) | ||||||
|  | 		} | ||||||
|  | 		c.logger.Infof("volumes have been updated successfully") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	c.setStatus(spec.ClusterStatusRunning) | 	c.setStatus(spec.ClusterStatusRunning) | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
|  |  | ||||||
|  | @ -1,4 +1,4 @@ | ||||||
| package controller | package cluster | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"bytes" | 	"bytes" | ||||||
|  | @ -11,7 +11,7 @@ import ( | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/spec" | 	"github.com/zalando-incubator/postgres-operator/pkg/spec" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| func (c *Controller) ExecCommand(podName spec.NamespacedName, command []string) (string, error) { | func (c *Cluster) ExecCommand(podName *spec.NamespacedName, command ...string) (string, error) { | ||||||
| 	var ( | 	var ( | ||||||
| 		execOut bytes.Buffer | 		execOut bytes.Buffer | ||||||
| 		execErr bytes.Buffer | 		execErr bytes.Buffer | ||||||
|  | @ -0,0 +1,46 @@ | ||||||
|  | package cluster | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	"strings" | ||||||
|  | 
 | ||||||
|  | 	"github.com/zalando-incubator/postgres-operator/pkg/spec" | ||||||
|  | 	"github.com/zalando-incubator/postgres-operator/pkg/util/constants" | ||||||
|  | 	"github.com/zalando-incubator/postgres-operator/pkg/util/filesystems" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | func (c *Cluster) getPostgresFilesystemInfo(podName *spec.NamespacedName) (device, fstype string, err error) { | ||||||
|  | 	out, err := c.ExecCommand(podName, "bash", "-c", fmt.Sprintf("df -T %s|tail -1", constants.PostgresDataMount)) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return "", "", err | ||||||
|  | 	} | ||||||
|  | 	fields := strings.Fields(out) | ||||||
|  | 	if len(fields) < 2 { | ||||||
|  | 		return "", "", fmt.Errorf("too few fields in the df output") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return fields[0], fields[1], nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *Cluster) resizePostgresFilesystem(podName *spec.NamespacedName, resizers []filesystems.FilesystemResizer) error { | ||||||
|  | 	// resize2fs always writes to stderr, and ExecCommand considers a non-empty stderr an error
 | ||||||
|  | 	// first, determine the device and the filesystem
 | ||||||
|  | 	deviceName, fsType, err := c.getPostgresFilesystemInfo(podName) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return fmt.Errorf("could not get device and type for the postgres filesystem: %v", err) | ||||||
|  | 	} | ||||||
|  | 	for _, resizer := range resizers { | ||||||
|  | 		if !resizer.CanResizeFilesystem(fsType) { | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		err := resizer.ResizeFilesystem(deviceName, func(cmd string) (out string, err error) { | ||||||
|  | 			return c.ExecCommand(podName, "bash", "-c", cmd) | ||||||
|  | 		}) | ||||||
|  | 
 | ||||||
|  | 		if err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 	return fmt.Errorf("could not resize filesystem: no compatible resizers for the filesystem of type %s", fsType) | ||||||
|  | } | ||||||
|  | @ -213,7 +213,7 @@ func (c *Cluster) genPodTemplate(resourceRequirements *v1.ResourceRequirements, | ||||||
| 		}, | 		}, | ||||||
| 		{ | 		{ | ||||||
| 			Name:  "PGROOT", | 			Name:  "PGROOT", | ||||||
| 			Value: "/home/postgres/pgdata/pgroot", | 			Value: constants.PostgresDataPath, | ||||||
| 		}, | 		}, | ||||||
| 		{ | 		{ | ||||||
| 			Name:  "ETCD_HOST", | 			Name:  "ETCD_HOST", | ||||||
|  | @ -293,7 +293,7 @@ func (c *Cluster) genPodTemplate(resourceRequirements *v1.ResourceRequirements, | ||||||
| 		VolumeMounts: []v1.VolumeMount{ | 		VolumeMounts: []v1.VolumeMount{ | ||||||
| 			{ | 			{ | ||||||
| 				Name:      constants.DataVolumeName, | 				Name:      constants.DataVolumeName, | ||||||
| 				MountPath: "/home/postgres/pgdata", //TODO: fetch from manifesto
 | 				MountPath: constants.PostgresDataMount, //TODO: fetch from manifesto
 | ||||||
| 			}, | 			}, | ||||||
| 		}, | 		}, | ||||||
| 		Env: envVars, | 		Env: envVars, | ||||||
|  |  | ||||||
|  | @ -24,19 +24,6 @@ func (c *Cluster) listPods() ([]v1.Pod, error) { | ||||||
| 	return pods.Items, nil | 	return pods.Items, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) listPersistentVolumeClaims() ([]v1.PersistentVolumeClaim, error) { |  | ||||||
| 	ns := c.Metadata.Namespace |  | ||||||
| 	listOptions := v1.ListOptions{ |  | ||||||
| 		LabelSelector: c.labelsSet().String(), |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	pvcs, err := c.KubeClient.PersistentVolumeClaims(ns).List(listOptions) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return nil, fmt.Errorf("could not get list of PersistentVolumeClaims: %v", err) |  | ||||||
| 	} |  | ||||||
| 	return pvcs.Items, nil |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *Cluster) deletePods() error { | func (c *Cluster) deletePods() error { | ||||||
| 	c.logger.Debugln("Deleting pods") | 	c.logger.Debugln("Deleting pods") | ||||||
| 	pods, err := c.listPods() | 	pods, err := c.listPods() | ||||||
|  | @ -63,28 +50,6 @@ func (c *Cluster) deletePods() error { | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) deletePersistenVolumeClaims() error { |  | ||||||
| 	c.logger.Debugln("Deleting PVCs") |  | ||||||
| 	ns := c.Metadata.Namespace |  | ||||||
| 	pvcs, err := c.listPersistentVolumeClaims() |  | ||||||
| 	if err != nil { |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
| 	for _, pvc := range pvcs { |  | ||||||
| 		c.logger.Debugf("Deleting PVC '%s'", util.NameFromMeta(pvc.ObjectMeta)) |  | ||||||
| 		if err := c.KubeClient.PersistentVolumeClaims(ns).Delete(pvc.Name, c.deleteOptions); err != nil { |  | ||||||
| 			c.logger.Warningf("could not delete PersistentVolumeClaim: %v", err) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 	if len(pvcs) > 0 { |  | ||||||
| 		c.logger.Debugln("PVCs have been deleted") |  | ||||||
| 	} else { |  | ||||||
| 		c.logger.Debugln("No PVCs to delete") |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return nil |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *Cluster) deletePod(podName spec.NamespacedName) error { | func (c *Cluster) deletePod(podName spec.NamespacedName) error { | ||||||
| 	ch := c.registerPodSubscriber(podName) | 	ch := c.registerPodSubscriber(podName) | ||||||
| 	defer c.unregisterPodSubscriber(podName) | 	defer c.unregisterPodSubscriber(podName) | ||||||
|  |  | ||||||
|  | @ -5,6 +5,7 @@ import ( | ||||||
| 
 | 
 | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util" | 	"github.com/zalando-incubator/postgres-operator/pkg/util" | ||||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" | 	"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" | ||||||
|  | 	"github.com/zalando-incubator/postgres-operator/pkg/util/volumes" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) Sync() error { | func (c *Cluster) Sync() error { | ||||||
|  | @ -44,18 +45,22 @@ func (c *Cluster) Sync() error { | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if c.databaseAccessDisabled() { | 	if !c.databaseAccessDisabled() { | ||||||
| 		return nil | 		if err := c.initDbConn(); err != nil { | ||||||
| 	} | 			return fmt.Errorf("could not init db connection: %v", err) | ||||||
| 	if err := c.initDbConn(); err != nil { | 		} else { | ||||||
| 		return fmt.Errorf("could not init db connection: %v", err) | 			c.logger.Debugf("Syncing roles") | ||||||
| 	} else { | 			if err := c.SyncRoles(); err != nil { | ||||||
| 		c.logger.Debugf("Syncing roles") | 				return fmt.Errorf("could not sync roles: %v", err) | ||||||
| 		if err := c.SyncRoles(); err != nil { | 			} | ||||||
| 			return fmt.Errorf("could not sync roles: %v", err) |  | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	c.logger.Debugf("Syncing persistent volumes") | ||||||
|  | 	if err := c.SyncVolumes(); err != nil { | ||||||
|  | 		return fmt.Errorf("could not sync persistent volumes: %v", err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -114,7 +119,7 @@ func (c *Cluster) syncEndpoint() error { | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) syncStatefulSet() error { | func (c *Cluster) syncStatefulSet() error { | ||||||
| 	cSpec := c.Spec | 	cSpec := c.Spec | ||||||
| 	var rollUpdate, needsReplace bool | 	var rollUpdate bool | ||||||
| 	if c.Statefulset == nil { | 	if c.Statefulset == nil { | ||||||
| 		c.logger.Infof("could not find the cluster's statefulset") | 		c.logger.Infof("could not find the cluster's statefulset") | ||||||
| 		pods, err := c.listPods() | 		pods, err := c.listPods() | ||||||
|  | @ -139,24 +144,20 @@ func (c *Cluster) syncStatefulSet() error { | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 	/* TODO: should check that we need to replace the statefulset */ | ||||||
| 	if !rollUpdate { | 	if !rollUpdate { | ||||||
| 		var ( |  | ||||||
| 			match  bool |  | ||||||
| 			reason string |  | ||||||
| 		) |  | ||||||
| 
 |  | ||||||
| 		desiredSS, err := c.genStatefulSet(cSpec) | 		desiredSS, err := c.genStatefulSet(cSpec) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return fmt.Errorf("could not generate statefulset: %v", err) | 			return fmt.Errorf("could not generate statefulset: %v", err) | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		match, needsReplace, rollUpdate, reason = c.compareStatefulSetWith(desiredSS) | 		cmp := c.compareStatefulSetWith(desiredSS) | ||||||
| 		if match { | 		if cmp.match { | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 		c.logStatefulSetChanges(c.Statefulset, desiredSS, false, reason) | 		c.logStatefulSetChanges(c.Statefulset, desiredSS, false, cmp.reasons) | ||||||
| 
 | 
 | ||||||
| 		if !needsReplace { | 		if !cmp.replace { | ||||||
| 			if err := c.updateStatefulSet(desiredSS); err != nil { | 			if err := c.updateStatefulSet(desiredSS); err != nil { | ||||||
| 				return fmt.Errorf("could not update statefulset: %v", err) | 				return fmt.Errorf("could not update statefulset: %v", err) | ||||||
| 			} | 			} | ||||||
|  | @ -166,7 +167,7 @@ func (c *Cluster) syncStatefulSet() error { | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if !rollUpdate { | 		if !cmp.rollingUpdate { | ||||||
| 			c.logger.Debugln("No rolling update is needed") | 			c.logger.Debugln("No rolling update is needed") | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
|  | @ -199,3 +200,19 @@ func (c *Cluster) SyncRoles() error { | ||||||
| 	} | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | /* SyncVolume reads all persistent volumes and checks that their size matches the one declared in the statefulset */ | ||||||
|  | func (c *Cluster) SyncVolumes() error { | ||||||
|  | 	act, err := c.VolumesNeedResizing(c.Spec.Volume) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return fmt.Errorf("could not compare size of the volumes: %v", err) | ||||||
|  | 	} | ||||||
|  | 	if !act { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 	if err := c.resizeVolumes(c.Spec.Volume, []volumes.VolumeResizer{&volumes.EBSVolumeResizer{}}); err != nil { | ||||||
|  | 		return fmt.Errorf("Could not sync volumes: %v", err) | ||||||
|  | 	} | ||||||
|  | 	c.logger.Infof("volumes have been synced successfully") | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | @ -63,7 +63,7 @@ func specPatch(spec interface{}) ([]byte, error) { | ||||||
| 	}{spec}) | 	}{spec}) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate bool, reason string) { | func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate bool, reasons []string) { | ||||||
| 	if isUpdate { | 	if isUpdate { | ||||||
| 		c.logger.Infof("statefulset '%s' has been changed", | 		c.logger.Infof("statefulset '%s' has been changed", | ||||||
| 			util.NameFromMeta(old.ObjectMeta), | 			util.NameFromMeta(old.ObjectMeta), | ||||||
|  | @ -75,8 +75,10 @@ func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate | ||||||
| 	} | 	} | ||||||
| 	c.logger.Debugf("diff\n%s\n", util.PrettyDiff(old.Spec, new.Spec)) | 	c.logger.Debugf("diff\n%s\n", util.PrettyDiff(old.Spec, new.Spec)) | ||||||
| 
 | 
 | ||||||
| 	if reason != "" { | 	if len(reasons) > 0 { | ||||||
| 		c.logger.Infof("Reason: %s", reason) | 		for _, reason := range reasons { | ||||||
|  | 			c.logger.Infof("Reason: %s", reason) | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -0,0 +1,178 @@ | ||||||
|  | package cluster | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	"strconv" | ||||||
|  | 	"strings" | ||||||
|  | 
 | ||||||
|  | 	"k8s.io/client-go/pkg/api/resource" | ||||||
|  | 	"k8s.io/client-go/pkg/api/v1" | ||||||
|  | 
 | ||||||
|  | 	"github.com/zalando-incubator/postgres-operator/pkg/spec" | ||||||
|  | 	"github.com/zalando-incubator/postgres-operator/pkg/util" | ||||||
|  | 	"github.com/zalando-incubator/postgres-operator/pkg/util/constants" | ||||||
|  | 	"github.com/zalando-incubator/postgres-operator/pkg/util/filesystems" | ||||||
|  | 	"github.com/zalando-incubator/postgres-operator/pkg/util/volumes" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | func (c *Cluster) listPersistentVolumeClaims() ([]v1.PersistentVolumeClaim, error) { | ||||||
|  | 	ns := c.Metadata.Namespace | ||||||
|  | 	listOptions := v1.ListOptions{ | ||||||
|  | 		LabelSelector: c.labelsSet().String(), | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	pvcs, err := c.KubeClient.PersistentVolumeClaims(ns).List(listOptions) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, fmt.Errorf("could not list of PersistentVolumeClaims: %v", err) | ||||||
|  | 	} | ||||||
|  | 	return pvcs.Items, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *Cluster) deletePersistenVolumeClaims() error { | ||||||
|  | 	c.logger.Debugln("Deleting PVCs") | ||||||
|  | 	pvcs, err := c.listPersistentVolumeClaims() | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	for _, pvc := range pvcs { | ||||||
|  | 		c.logger.Debugf("Deleting PVC '%s'", util.NameFromMeta(pvc.ObjectMeta)) | ||||||
|  | 		if err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(pvc.Name, c.deleteOptions); err != nil { | ||||||
|  | 			c.logger.Warningf("could not delete PersistentVolumeClaim: %v", err) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	if len(pvcs) > 0 { | ||||||
|  | 		c.logger.Debugln("PVCs have been deleted") | ||||||
|  | 	} else { | ||||||
|  | 		c.logger.Debugln("No PVCs to delete") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *Cluster) listPersistentVolumes() ([]*v1.PersistentVolume, error) { | ||||||
|  | 	result := make([]*v1.PersistentVolume, 0) | ||||||
|  | 
 | ||||||
|  | 	pvcs, err := c.listPersistentVolumeClaims() | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, fmt.Errorf("could not list cluster's PersistentVolumeClaims: %v", err) | ||||||
|  | 	} | ||||||
|  | 	lastPodIndex := *c.Statefulset.Spec.Replicas - 1 | ||||||
|  | 	for _, pvc := range pvcs { | ||||||
|  | 		lastDash := strings.LastIndex(pvc.Name, "-") | ||||||
|  | 		if lastDash > 0 && lastDash < len(pvc.Name)-1 { | ||||||
|  | 			if pvcNumber, err := strconv.Atoi(pvc.Name[lastDash+1:]); err != nil { | ||||||
|  | 				return nil, fmt.Errorf("could not convert last part of the persistent volume claim name %s to a number", pvc.Name) | ||||||
|  | 			} else { | ||||||
|  | 				if int32(pvcNumber) > lastPodIndex { | ||||||
|  | 					c.logger.Debugf("Skipping persistent volume %s corresponding to a non-running pods", pvc.Name) | ||||||
|  | 					continue | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 		} | ||||||
|  | 		pv, err := c.KubeClient.PersistentVolumes().Get(pvc.Spec.VolumeName) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return nil, fmt.Errorf("could not get PersistentVolume: %v", err) | ||||||
|  | 		} | ||||||
|  | 		result = append(result, pv) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return result, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // resizeVolumes resize persistent volumes compatible with the given resizer interface
 | ||||||
|  | func (c *Cluster) resizeVolumes(newVolume spec.Volume, resizers []volumes.VolumeResizer) error { | ||||||
|  | 	totalCompatible := 0 | ||||||
|  | 	newQuantity, err := resource.ParseQuantity(newVolume.Size) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return fmt.Errorf("could not parse volume size: %v", err) | ||||||
|  | 	} | ||||||
|  | 	pvs, newSize, err := c.listVolumesWithManifestSize(newVolume) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return fmt.Errorf("could not list persistent volumes: %v", err) | ||||||
|  | 	} | ||||||
|  | 	for _, pv := range pvs { | ||||||
|  | 		volumeSize := quantityToGigabyte(pv.Spec.Capacity[v1.ResourceStorage]) | ||||||
|  | 		if volumeSize > newSize { | ||||||
|  | 			return fmt.Errorf("cannot shrink persistent volume") | ||||||
|  | 		} | ||||||
|  | 		if volumeSize == newSize { | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		for _, resizer := range resizers { | ||||||
|  | 			if !resizer.VolumeBelongsToProvider(pv) { | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 			totalCompatible += 1 | ||||||
|  | 			if !resizer.IsConnectedToProvider() { | ||||||
|  | 				err := resizer.ConnectToProvider() | ||||||
|  | 				if err != nil { | ||||||
|  | 					return fmt.Errorf("could not connect to the volume provider: %v", err) | ||||||
|  | 				} | ||||||
|  | 				defer resizer.DisconnectFromProvider() | ||||||
|  | 			} | ||||||
|  | 			awsVolumeId, err := resizer.GetProviderVolumeID(pv) | ||||||
|  | 			if err != nil { | ||||||
|  | 				return err | ||||||
|  | 			} | ||||||
|  | 			c.logger.Debugf("updating persistent volume %s to %d", pv.Name, newSize) | ||||||
|  | 			if err := resizer.ResizeVolume(awsVolumeId, newSize); err != nil { | ||||||
|  | 				return fmt.Errorf("could not resize EBS volume %s: %v", awsVolumeId, err) | ||||||
|  | 			} | ||||||
|  | 			c.logger.Debugf("resizing the filesystem on the volume %s", pv.Name) | ||||||
|  | 			podName := getPodNameFromPersistentVolume(pv) | ||||||
|  | 			if err := c.resizePostgresFilesystem(podName, []filesystems.FilesystemResizer{&filesystems.Ext234Resize{}}); err != nil { | ||||||
|  | 				return fmt.Errorf("could not resize the filesystem on pod '%s': %v", podName, err) | ||||||
|  | 			} | ||||||
|  | 			c.logger.Debugf("filesystem resize successfull on volume %s", pv.Name) | ||||||
|  | 			pv.Spec.Capacity[v1.ResourceStorage] = newQuantity | ||||||
|  | 			c.logger.Debugf("updating persistent volume definition for volume %s", pv.Name) | ||||||
|  | 			if _, err := c.KubeClient.PersistentVolumes().Update(pv); err != nil { | ||||||
|  | 				return fmt.Errorf("could not update persistent volume: %s", err) | ||||||
|  | 			} | ||||||
|  | 			c.logger.Debugf("successfully updated persistent volume %s", pv.Name) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	if len(pvs) > 0 && totalCompatible == 0 { | ||||||
|  | 		return fmt.Errorf("could not resize EBS volumes: persistent volumes are not compatible with existing resizing providers") | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *Cluster) VolumesNeedResizing(newVolume spec.Volume) (bool, error) { | ||||||
|  | 	volumes, manifestSize, err := c.listVolumesWithManifestSize(newVolume) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return false, err | ||||||
|  | 	} | ||||||
|  | 	for _, pv := range volumes { | ||||||
|  | 		currentSize := quantityToGigabyte(pv.Spec.Capacity[v1.ResourceStorage]) | ||||||
|  | 		if currentSize != manifestSize { | ||||||
|  | 			return true, nil | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return false, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *Cluster) listVolumesWithManifestSize(newVolume spec.Volume) ([]*v1.PersistentVolume, int64, error) { | ||||||
|  | 	newSize, err := resource.ParseQuantity(newVolume.Size) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, 0, fmt.Errorf("could not parse volume size from the manifest: %v", err) | ||||||
|  | 	} | ||||||
|  | 	manifestSize := quantityToGigabyte(newSize) | ||||||
|  | 	volumes, err := c.listPersistentVolumes() | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, 0, fmt.Errorf("could not list persistent volumes: %v", err) | ||||||
|  | 	} | ||||||
|  | 	return volumes, manifestSize, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // getPodNameFromPersistentVolume returns a pod name that it extracts from the volume claim ref.
 | ||||||
|  | func getPodNameFromPersistentVolume(pv *v1.PersistentVolume) *spec.NamespacedName { | ||||||
|  | 	namespace := pv.Spec.ClaimRef.Namespace | ||||||
|  | 	name := pv.Spec.ClaimRef.Name[len(constants.DataVolumeName)+1:] | ||||||
|  | 	return &spec.NamespacedName{namespace, name} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func quantityToGigabyte(q resource.Quantity) int64 { | ||||||
|  | 	return q.ScaledValue(0) / (1 * constants.Gigabyte) | ||||||
|  | } | ||||||
|  | @ -169,8 +169,8 @@ func (c *Controller) processEvent(obj interface{}) error { | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if err := cl.Sync(); err != nil { | 		if err := cl.Sync(); err != nil { | ||||||
| 			cl.Error = fmt.Errorf("could not sync cluster '%s': %s", clusterName, err) | 			cl.Error = fmt.Errorf("could not sync cluster '%s': %v", clusterName, err) | ||||||
| 			logger.Errorf("%v", cl) | 			logger.Errorf("%v", cl.Error) | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 		cl.Error = nil | 		cl.Error = nil | ||||||
|  |  | ||||||
|  | @ -23,6 +23,7 @@ func (c *Controller) makeClusterConfig() cluster.Config { | ||||||
| 	return cluster.Config{ | 	return cluster.Config{ | ||||||
| 		KubeClient:          c.KubeClient, | 		KubeClient:          c.KubeClient, | ||||||
| 		RestClient:          c.RestClient, | 		RestClient:          c.RestClient, | ||||||
|  | 		RestConfig:          c.RestConfig, | ||||||
| 		TeamsAPIClient:      c.TeamsAPIClient, | 		TeamsAPIClient:      c.TeamsAPIClient, | ||||||
| 		OpConfig:            config.Copy(c.opConfig), | 		OpConfig:            config.Copy(c.opConfig), | ||||||
| 		InfrastructureRoles: infrastructureRoles, | 		InfrastructureRoles: infrastructureRoles, | ||||||
|  |  | ||||||
|  | @ -0,0 +1,9 @@ | ||||||
|  | package constants | ||||||
|  | 
 | ||||||
|  | const ( | ||||||
|  | 	ZalandoDNSNameAnnotation           = "external-dns.alpha.kubernetes.io/hostname" | ||||||
|  | 	ElbTimeoutAnnotationName           = "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout" | ||||||
|  | 	ElbTimeoutAnnotationValue          = "3600" | ||||||
|  | 	KubeIAmAnnotation                  = "iam.amazonaws.com/role" | ||||||
|  | 	VolumeStorateProvisionerAnnotation = "pv.kubernetes.io/provisioned-by" | ||||||
|  | ) | ||||||
|  | @ -0,0 +1,16 @@ | ||||||
|  | package constants | ||||||
|  | 
 | ||||||
|  | import "time" | ||||||
|  | 
 | ||||||
|  | const ( | ||||||
|  | 	AWS_REGION       = "eu-central-1" | ||||||
|  | 	EBSVolumeIDStart = "/vol-" | ||||||
|  | 	EBSProvisioner   = "kubernetes.io/aws-ebs" | ||||||
|  | 	//https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_VolumeModification.html
 | ||||||
|  | 	EBSVolumeStateModifying     = "modifying" | ||||||
|  | 	EBSVolumeStateOptimizing    = "optimizing" | ||||||
|  | 	EBSVolumeStateFailed        = "failed" | ||||||
|  | 	EBSVolumeStateCompleted     = "completed" | ||||||
|  | 	EBSVolumeResizeWaitInterval = 2 * time.Second | ||||||
|  | 	EBSVolumeResizeWaitTimeout  = 30 * time.Second | ||||||
|  | ) | ||||||
|  | @ -1,35 +0,0 @@ | ||||||
| package constants |  | ||||||
| 
 |  | ||||||
| import "time" |  | ||||||
| 
 |  | ||||||
| const ( |  | ||||||
| 	TPRName                     = "postgresql" |  | ||||||
| 	TPRVendor                   = "acid.zalan.do" |  | ||||||
| 	TPRDescription              = "Managed PostgreSQL clusters" |  | ||||||
| 	TPRApiVersion               = "v1" |  | ||||||
| 	ListClustersURITemplate     = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/namespaces/%s/" + ResourceName       // Namespace
 |  | ||||||
| 	WatchClustersURITemplate    = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/watch/namespaces/%s/" + ResourceName // Namespace
 |  | ||||||
| 	K8sVersion                  = "v1" |  | ||||||
| 	K8sAPIPath                  = "/api" |  | ||||||
| 	DataVolumeName              = "pgdata" |  | ||||||
| 	PasswordLength              = 64 |  | ||||||
| 	UserSecretTemplate          = "%s.%s.credentials." + TPRName + "." + TPRVendor // Username, ClusterName
 |  | ||||||
| 	ZalandoDNSNameAnnotation    = "external-dns.alpha.kubernetes.io/hostname" |  | ||||||
| 	ElbTimeoutAnnotationName    = "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout" |  | ||||||
| 	ElbTimeoutAnnotationValue   = "3600" |  | ||||||
| 	KubeIAmAnnotation           = "iam.amazonaws.com/role" |  | ||||||
| 	ResourceName                = TPRName + "s" |  | ||||||
| 	PodRoleMaster               = "master" |  | ||||||
| 	PodRoleReplica              = "replica" |  | ||||||
| 	SuperuserKeyName            = "superuser" |  | ||||||
| 	ReplicationUserKeyName      = "replication" |  | ||||||
| 	StatefulsetDeletionInterval = 1 * time.Second |  | ||||||
| 	StatefulsetDeletionTimeout  = 30 * time.Second |  | ||||||
| 
 |  | ||||||
| 	RoleFlagSuperuser  = "SUPERUSER" |  | ||||||
| 	RoleFlagInherit    = "INHERIT" |  | ||||||
| 	RoleFlagLogin      = "LOGIN" |  | ||||||
| 	RoleFlagNoLogin    = "NOLOGIN" |  | ||||||
| 	RoleFlagCreateRole = "CREATEROLE" |  | ||||||
| 	RoleFlagCreateDB   = "CREATEDB" |  | ||||||
| ) |  | ||||||
|  | @ -0,0 +1,12 @@ | ||||||
|  | package constants | ||||||
|  | 
 | ||||||
|  | import "time" | ||||||
|  | 
 | ||||||
|  | const ( | ||||||
|  | 	ListClustersURITemplate     = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/namespaces/%s/" + ResourceName       // Namespace
 | ||||||
|  | 	WatchClustersURITemplate    = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/watch/namespaces/%s/" + ResourceName // Namespace
 | ||||||
|  | 	K8sVersion                  = "v1" | ||||||
|  | 	K8sAPIPath                  = "/api" | ||||||
|  | 	StatefulsetDeletionInterval = 1 * time.Second | ||||||
|  | 	StatefulsetDeletionTimeout  = 30 * time.Second | ||||||
|  | ) | ||||||
|  | @ -0,0 +1,9 @@ | ||||||
|  | package constants | ||||||
|  | 
 | ||||||
|  | const ( | ||||||
|  | 	DataVolumeName    = "pgdata" | ||||||
|  | 	PodRoleMaster     = "master" | ||||||
|  | 	PodRoleReplica    = "replica" | ||||||
|  | 	PostgresDataMount = "/home/postgres/pgdata" | ||||||
|  | 	PostgresDataPath  = PostgresDataMount + "/pgroot" | ||||||
|  | ) | ||||||
|  | @ -0,0 +1,14 @@ | ||||||
|  | package constants | ||||||
|  | 
 | ||||||
|  | const ( | ||||||
|  | 	PasswordLength         = 64 | ||||||
|  | 	UserSecretTemplate     = "%s.%s.credentials." + TPRName + "." + TPRVendor // Username, ClusterName
 | ||||||
|  | 	SuperuserKeyName       = "superuser" | ||||||
|  | 	ReplicationUserKeyName = "replication" | ||||||
|  | 	RoleFlagSuperuser      = "SUPERUSER" | ||||||
|  | 	RoleFlagInherit        = "INHERIT" | ||||||
|  | 	RoleFlagLogin          = "LOGIN" | ||||||
|  | 	RoleFlagNoLogin        = "NOLOGIN" | ||||||
|  | 	RoleFlagCreateRole     = "CREATEROLE" | ||||||
|  | 	RoleFlagCreateDB       = "CREATEDB" | ||||||
|  | ) | ||||||
|  | @ -0,0 +1,9 @@ | ||||||
|  | package constants | ||||||
|  | 
 | ||||||
|  | const ( | ||||||
|  | 	TPRName        = "postgresql" | ||||||
|  | 	TPRVendor      = "acid.zalan.do" | ||||||
|  | 	TPRDescription = "Managed PostgreSQL clusters" | ||||||
|  | 	TPRApiVersion  = "v1" | ||||||
|  | 	ResourceName   = TPRName + "s" | ||||||
|  | ) | ||||||
|  | @ -0,0 +1,5 @@ | ||||||
|  | package constants | ||||||
|  | 
 | ||||||
|  | const ( | ||||||
|  | 	Gigabyte = 1073741824 | ||||||
|  | ) | ||||||
|  | @ -0,0 +1,38 @@ | ||||||
|  | package filesystems | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	"regexp" | ||||||
|  | 	"strings" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | var ( | ||||||
|  | 	ext2fsSuccessRegexp = regexp.MustCompile(`The filesystem on [/a-z0-9]+ is now \d+ \(\d+\w+\) blocks long.`) | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | const ( | ||||||
|  | 	EXT2      = "ext2" | ||||||
|  | 	EXT3      = "ext3" | ||||||
|  | 	EXT4      = "ext4" | ||||||
|  | 	resize2fs = "resize2fs" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | type Ext234Resize struct { | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *Ext234Resize) CanResizeFilesystem(fstype string) bool { | ||||||
|  | 	return fstype == EXT2 || fstype == EXT3 || fstype == EXT4 | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *Ext234Resize) ResizeFilesystem(deviceName string, commandExecutor func(cmd string) (out string, err error)) error { | ||||||
|  | 	command := fmt.Sprintf("%s %s 2>&1", resize2fs, deviceName) | ||||||
|  | 	out, err := commandExecutor(command) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	if strings.Contains(out, "Nothing to do") || | ||||||
|  | 		(strings.Contains(out, "on-line resizing required") && ext2fsSuccessRegexp.MatchString(out)) { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 	return fmt.Errorf("unrecognized output: %s, assuming error", out) | ||||||
|  | } | ||||||
|  | @ -0,0 +1,6 @@ | ||||||
|  | package filesystems | ||||||
|  | 
 | ||||||
|  | type FilesystemResizer interface { | ||||||
|  | 	CanResizeFilesystem(fstype string) bool | ||||||
|  | 	ResizeFilesystem(deviceName string, commandExecutor func(string) (out string, err error)) error | ||||||
|  | } | ||||||
|  | @ -144,7 +144,7 @@ func TestInfo(t *testing.T) { | ||||||
| 	for _, tc := range teamsAPItc { | 	for _, tc := range teamsAPItc { | ||||||
| 		func() { | 		func() { | ||||||
| 			ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | 			ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||||
| 				if r.Header.Get("Authorization") != "Bearer " + token { | 				if r.Header.Get("Authorization") != "Bearer "+token { | ||||||
| 					t.Errorf("Authorization token is wrong or not provided") | 					t.Errorf("Authorization token is wrong or not provided") | ||||||
| 				} | 				} | ||||||
| 				w.WriteHeader(tc.inCode) | 				w.WriteHeader(tc.inCode) | ||||||
|  |  | ||||||
|  | @ -0,0 +1,101 @@ | ||||||
|  | package volumes | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	"strings" | ||||||
|  | 
 | ||||||
|  | 	"github.com/aws/aws-sdk-go/aws" | ||||||
|  | 	"github.com/aws/aws-sdk-go/aws/session" | ||||||
|  | 	"github.com/aws/aws-sdk-go/service/ec2" | ||||||
|  | 
 | ||||||
|  | 	"github.com/zalando-incubator/postgres-operator/pkg/util/constants" | ||||||
|  | 	"github.com/zalando-incubator/postgres-operator/pkg/util/retryutil" | ||||||
|  | 	"k8s.io/client-go/pkg/api/v1" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | type EBSVolumeResizer struct { | ||||||
|  | 	connection *ec2.EC2 | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *EBSVolumeResizer) ConnectToProvider() error { | ||||||
|  | 	sess, err := session.NewSession(&aws.Config{Region: aws.String(constants.AWS_REGION)}) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return fmt.Errorf("could not establish AWS session: %v", err) | ||||||
|  | 	} | ||||||
|  | 	c.connection = ec2.New(sess) | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *EBSVolumeResizer) IsConnectedToProvider() bool { | ||||||
|  | 	return c.connection != nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *EBSVolumeResizer) VolumeBelongsToProvider(pv *v1.PersistentVolume) bool { | ||||||
|  | 	return pv.Spec.AWSElasticBlockStore != nil && pv.Annotations[constants.VolumeStorateProvisionerAnnotation] == constants.EBSProvisioner | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // GetProviderVolumeID converts aws://eu-central-1b/vol-00f93d4827217c629 to vol-00f93d4827217c629 for EBS volumes
 | ||||||
|  | func (c *EBSVolumeResizer) GetProviderVolumeID(pv *v1.PersistentVolume) (string, error) { | ||||||
|  | 	volumeID := pv.Spec.AWSElasticBlockStore.VolumeID | ||||||
|  | 	if volumeID == "" { | ||||||
|  | 		return "", fmt.Errorf("volume id is empty for volume %s", pv.Name) | ||||||
|  | 	} | ||||||
|  | 	idx := strings.LastIndex(volumeID, constants.EBSVolumeIDStart) + 1 | ||||||
|  | 	if idx == 0 { | ||||||
|  | 		return "", fmt.Errorf("malfored EBS volume id %s", volumeID) | ||||||
|  | 	} | ||||||
|  | 	return volumeID[idx:], nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *EBSVolumeResizer) ResizeVolume(volumeId string, newSize int64) error { | ||||||
|  | 	/* first check if the volume is already of a requested size */ | ||||||
|  | 	volumeOutput, err := c.connection.DescribeVolumes(&ec2.DescribeVolumesInput{VolumeIds: []*string{&volumeId}}) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return fmt.Errorf("could not get information about the volume: %v", err) | ||||||
|  | 	} | ||||||
|  | 	vol := volumeOutput.Volumes[0] | ||||||
|  | 	if *vol.VolumeId != volumeId { | ||||||
|  | 		return fmt.Errorf("describe volume %s returned information about a non-matching volume %s", volumeId, *vol.VolumeId) | ||||||
|  | 	} | ||||||
|  | 	if *vol.Size == newSize { | ||||||
|  | 		// nothing to do
 | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 	input := ec2.ModifyVolumeInput{Size: &newSize, VolumeId: &volumeId} | ||||||
|  | 	output, err := c.connection.ModifyVolume(&input) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return fmt.Errorf("could not modify persistent volume: %v", err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	state := *output.VolumeModification.ModificationState | ||||||
|  | 	if state == constants.EBSVolumeStateFailed { | ||||||
|  | 		return fmt.Errorf("could not modify persistent volume %s: modification state failed", volumeId) | ||||||
|  | 	} | ||||||
|  | 	if state == "" { | ||||||
|  | 		return fmt.Errorf("received empty modification status") | ||||||
|  | 	} | ||||||
|  | 	if state == constants.EBSVolumeStateOptimizing || state == constants.EBSVolumeStateCompleted { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 	// wait until the volume reaches the "optimizing" or "completed" state
 | ||||||
|  | 	in := ec2.DescribeVolumesModificationsInput{VolumeIds: []*string{&volumeId}} | ||||||
|  | 	return retryutil.Retry(constants.EBSVolumeResizeWaitInterval, constants.EBSVolumeResizeWaitTimeout, | ||||||
|  | 		func() (bool, error) { | ||||||
|  | 			out, err := c.connection.DescribeVolumesModifications(&in) | ||||||
|  | 			if err != nil { | ||||||
|  | 				return false, fmt.Errorf("could not describe volume modification: %v", err) | ||||||
|  | 			} | ||||||
|  | 			if len(out.VolumesModifications) != 1 { | ||||||
|  | 				return false, fmt.Errorf("describe volume modification didn't return one record for volume \"%s\"", volumeId) | ||||||
|  | 			} | ||||||
|  | 			if *out.VolumesModifications[0].VolumeId != volumeId { | ||||||
|  | 				return false, fmt.Errorf("non-matching volume id when describing modifications: \"%s\" is different from \"%s\"") | ||||||
|  | 			} | ||||||
|  | 			return *out.VolumesModifications[0].ModificationState != constants.EBSVolumeStateModifying, nil | ||||||
|  | 		}) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *EBSVolumeResizer) DisconnectFromProvider() error { | ||||||
|  | 	c.connection = nil | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | @ -0,0 +1,14 @@ | ||||||
|  | package volumes | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"k8s.io/client-go/pkg/api/v1" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | type VolumeResizer interface { | ||||||
|  | 	ConnectToProvider() error | ||||||
|  | 	IsConnectedToProvider() bool | ||||||
|  | 	VolumeBelongsToProvider(pv *v1.PersistentVolume) bool | ||||||
|  | 	GetProviderVolumeID(pv *v1.PersistentVolume) (string, error) | ||||||
|  | 	ResizeVolume(providerVolumeId string, newSize int64) error | ||||||
|  | 	DisconnectFromProvider() error | ||||||
|  | } | ||||||
		Loading…
	
		Reference in New Issue