498 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			498 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
| package cluster
 | |
| 
 | |
| import (
 | |
| 	"encoding/json"
 | |
| 	"fmt"
 | |
| 	"sort"
 | |
| 
 | |
| 	"k8s.io/apimachinery/pkg/api/resource"
 | |
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | |
| 	"k8s.io/apimachinery/pkg/util/intstr"
 | |
| 	"k8s.io/client-go/pkg/api/v1"
 | |
| 	"k8s.io/client-go/pkg/apis/apps/v1beta1"
 | |
| 
 | |
| 	"github.com/zalando-incubator/postgres-operator/pkg/spec"
 | |
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/constants"
 | |
| )
 | |
| 
 | |
const (
	// localHost is the CIDR that generateService uses as the restrictive
	// default for the load balancer source ranges.
	localHost = "127.0.0.1/32"

	// patroniPGBinariesParameterName is the key, inside the "postgresql"
	// section of the Spilo configuration, under which the binaries directory
	// is stored.
	patroniPGBinariesParameterName = "bin_dir"

	// patroniPGParametersParameterName is the key, inside the "postgresql"
	// section of the Spilo configuration, under which the user-supplied
	// PostgreSQL parameters are stored.
	patroniPGParametersParameterName = "parameters"

	// pgBinariesLocationTemplate is a fmt template for the binaries
	// directory; its single %s verb receives the PostgreSQL version.
	pgBinariesLocationTemplate = "/usr/lib/postgresql/%s/bin"
)
 | |
| 
 | |
// The types below describe the JSON document produced by
// generateSpiloJSONConfiguration. Field order is significant: it determines
// the order of the keys in the marshalled output.
type (
	// pgUser is a single entry of the bootstrap "users" map.
	pgUser struct {
		Password string   `json:"password"`
		Options  []string `json:"options"`
	}

	// patroniDCS is the "dcs" section of the bootstrap configuration.
	// Zero-valued fields are left out of the JSON output.
	patroniDCS struct {
		TTL                  uint32  `json:"ttl,omitempty"`
		LoopWait             uint32  `json:"loop_wait,omitempty"`
		RetryTimeout         uint32  `json:"retry_timeout,omitempty"`
		MaximumLagOnFailover float32 `json:"maximum_lag_on_failover,omitempty"`
	}

	// pgBootstrap is the "bootstrap" section of the Spilo configuration.
	// Initdb items are either plain strings (flag-style options) or
	// single-entry map[string]string values (options with an argument).
	pgBootstrap struct {
		Initdb []interface{}     `json:"initdb"`
		Users  map[string]pgUser `json:"users"`
		PgHBA  []string          `json:"pg_hba"`
		DCS    patroniDCS        `json:"dcs,omitempty"`
	}

	// spiloConfiguration is the top-level document passed to the container
	// through the SPILO_CONFIGURATION environment variable.
	spiloConfiguration struct {
		PgLocalConfiguration map[string]interface{} `json:"postgresql"`
		Bootstrap            pgBootstrap            `json:"bootstrap"`
	}
)
 | |
| 
 | |
| func (c *Cluster) resourceRequirements(resources spec.Resources) (*v1.ResourceRequirements, error) {
 | |
| 	var err error
 | |
| 
 | |
| 	specRequests := resources.ResourceRequest
 | |
| 	specLimits := resources.ResourceLimits
 | |
| 
 | |
| 	config := c.OpConfig
 | |
| 
 | |
| 	defaultRequests := spec.ResourceDescription{CPU: config.DefaultCPURequest, Memory: config.DefaultMemoryRequest}
 | |
| 	defaultLimits := spec.ResourceDescription{CPU: config.DefaultCPULimit, Memory: config.DefaultMemoryLimit}
 | |
| 
 | |
| 	result := v1.ResourceRequirements{}
 | |
| 
 | |
| 	result.Requests, err = fillResourceList(specRequests, defaultRequests)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("could not fill resource requests: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	result.Limits, err = fillResourceList(specLimits, defaultLimits)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("could not fill resource limits: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	return &result, nil
 | |
| }
 | |
| 
 | |
| func fillResourceList(spec spec.ResourceDescription, defaults spec.ResourceDescription) (v1.ResourceList, error) {
 | |
| 	var err error
 | |
| 	requests := v1.ResourceList{}
 | |
| 
 | |
| 	if spec.CPU != "" {
 | |
| 		requests[v1.ResourceCPU], err = resource.ParseQuantity(spec.CPU)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("could not parse CPU quantity: %v", err)
 | |
| 		}
 | |
| 	} else {
 | |
| 		requests[v1.ResourceCPU], err = resource.ParseQuantity(defaults.CPU)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("could not parse default CPU quantity: %v", err)
 | |
| 		}
 | |
| 	}
 | |
| 	if spec.Memory != "" {
 | |
| 		requests[v1.ResourceMemory], err = resource.ParseQuantity(spec.Memory)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("could not parse memory quantity: %v", err)
 | |
| 		}
 | |
| 	} else {
 | |
| 		requests[v1.ResourceMemory], err = resource.ParseQuantity(defaults.Memory)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("could not parse default memory quantity: %v", err)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return requests, nil
 | |
| }
 | |
| 
 | |
| func (c *Cluster) generateSpiloJSONConfiguration(pg *spec.PostgresqlParam, patroni *spec.Patroni) string {
 | |
| 	config := spiloConfiguration{}
 | |
| 
 | |
| 	config.Bootstrap = pgBootstrap{}
 | |
| 
 | |
| 	config.Bootstrap.Initdb = []interface{}{map[string]string{"auth-host": "md5"},
 | |
| 		map[string]string{"auth-local": "trust"}}
 | |
| 
 | |
| 	initdbOptionNames := []string{}
 | |
| 
 | |
| 	for k := range patroni.InitDB {
 | |
| 		initdbOptionNames = append(initdbOptionNames, k)
 | |
| 	}
 | |
| 	/* We need to sort the user-defined options to more easily compare the resulting specs */
 | |
| 	sort.Strings(initdbOptionNames)
 | |
| 
 | |
| 	// Initdb parameters in the manifest take priority over the default ones
 | |
| 	// The whole type switch dance is caused by the ability to specify both
 | |
| 	// maps and normal string items in the array of initdb options. We need
 | |
| 	// both to convert the initial key-value to strings when necessary, and
 | |
| 	// to de-duplicate the options supplied.
 | |
| PATRONI_INITDB_PARAMS:
 | |
| 	for _, k := range initdbOptionNames {
 | |
| 		v := patroni.InitDB[k]
 | |
| 		for i, defaultParam := range config.Bootstrap.Initdb {
 | |
| 			switch defaultParam.(type) {
 | |
| 			case map[string]string:
 | |
| 				{
 | |
| 					for k1 := range defaultParam.(map[string]string) {
 | |
| 						if k1 == k {
 | |
| 							(config.Bootstrap.Initdb[i]).(map[string]string)[k] = v
 | |
| 							continue PATRONI_INITDB_PARAMS
 | |
| 						}
 | |
| 					}
 | |
| 				}
 | |
| 			case string:
 | |
| 				{
 | |
| 					/* if the option already occurs in the list */
 | |
| 					if defaultParam.(string) == v {
 | |
| 						continue PATRONI_INITDB_PARAMS
 | |
| 					}
 | |
| 				}
 | |
| 			default:
 | |
| 				c.logger.Warningf("unsupported type for initdb configuration item %s: %T", defaultParam, defaultParam)
 | |
| 				continue PATRONI_INITDB_PARAMS
 | |
| 			}
 | |
| 		}
 | |
| 		// The following options are known to have no parameters
 | |
| 		if v == "true" {
 | |
| 			switch k {
 | |
| 			case "data-checksums", "debug", "no-locale", "noclean", "nosync", "sync-only":
 | |
| 				config.Bootstrap.Initdb = append(config.Bootstrap.Initdb, k)
 | |
| 				continue
 | |
| 			}
 | |
| 		}
 | |
| 		config.Bootstrap.Initdb = append(config.Bootstrap.Initdb, map[string]string{k: v})
 | |
| 	}
 | |
| 
 | |
| 	// pg_hba parameters in the manifest replace the default ones. We cannot
 | |
| 	// reasonably merge them automatically, because pg_hba parsing stops on
 | |
| 	// a first successfully matched rule.
 | |
| 	if len(patroni.PgHba) > 0 {
 | |
| 		config.Bootstrap.PgHBA = patroni.PgHba
 | |
| 	} else {
 | |
| 		config.Bootstrap.PgHBA = []string{
 | |
| 			"hostnossl all all all reject",
 | |
| 			fmt.Sprintf("hostssl   all +%s all pam", c.OpConfig.PamRoleName),
 | |
| 			"hostssl   all all all md5",
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if patroni.MaximumLagOnFailover >= 0 {
 | |
| 		config.Bootstrap.DCS.MaximumLagOnFailover = patroni.MaximumLagOnFailover
 | |
| 	}
 | |
| 	if patroni.LoopWait != 0 {
 | |
| 		config.Bootstrap.DCS.LoopWait = patroni.LoopWait
 | |
| 	}
 | |
| 	if patroni.RetryTimeout != 0 {
 | |
| 		config.Bootstrap.DCS.RetryTimeout = patroni.RetryTimeout
 | |
| 	}
 | |
| 	if patroni.TTL != 0 {
 | |
| 		config.Bootstrap.DCS.TTL = patroni.TTL
 | |
| 	}
 | |
| 
 | |
| 	config.PgLocalConfiguration = make(map[string]interface{})
 | |
| 	config.PgLocalConfiguration[patroniPGBinariesParameterName] = fmt.Sprintf(pgBinariesLocationTemplate, pg.PgVersion)
 | |
| 	if len(pg.Parameters) > 0 {
 | |
| 		config.PgLocalConfiguration[patroniPGParametersParameterName] = pg.Parameters
 | |
| 	}
 | |
| 	config.Bootstrap.Users = map[string]pgUser{
 | |
| 		c.OpConfig.PamRoleName: {
 | |
| 			Password: "",
 | |
| 			Options:  []string{constants.RoleFlagCreateDB, constants.RoleFlagNoLogin},
 | |
| 		},
 | |
| 	}
 | |
| 	result, err := json.Marshal(config)
 | |
| 	if err != nil {
 | |
| 		c.logger.Errorf("cannot convert spilo configuration into JSON: %v", err)
 | |
| 		return ""
 | |
| 	}
 | |
| 	return string(result)
 | |
| }
 | |
| 
 | |
| func (c *Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequirements, pgParameters *spec.PostgresqlParam, patroniParameters *spec.Patroni) *v1.PodTemplateSpec {
 | |
| 	spiloConfiguration := c.generateSpiloJSONConfiguration(pgParameters, patroniParameters)
 | |
| 
 | |
| 	envVars := []v1.EnvVar{
 | |
| 		{
 | |
| 			Name:  "SCOPE",
 | |
| 			Value: c.Name,
 | |
| 		},
 | |
| 		{
 | |
| 			Name:  "PGROOT",
 | |
| 			Value: constants.PostgresDataPath,
 | |
| 		},
 | |
| 		{
 | |
| 			Name:  "ETCD_HOST",
 | |
| 			Value: c.OpConfig.EtcdHost,
 | |
| 		},
 | |
| 		{
 | |
| 			Name: "POD_IP",
 | |
| 			ValueFrom: &v1.EnvVarSource{
 | |
| 				FieldRef: &v1.ObjectFieldSelector{
 | |
| 					APIVersion: "v1",
 | |
| 					FieldPath:  "status.podIP",
 | |
| 				},
 | |
| 			},
 | |
| 		},
 | |
| 		{
 | |
| 			Name: "POD_NAMESPACE",
 | |
| 			ValueFrom: &v1.EnvVarSource{
 | |
| 				FieldRef: &v1.ObjectFieldSelector{
 | |
| 					APIVersion: "v1",
 | |
| 					FieldPath:  "metadata.namespace",
 | |
| 				},
 | |
| 			},
 | |
| 		},
 | |
| 		{
 | |
| 			Name: "PGPASSWORD_SUPERUSER",
 | |
| 			ValueFrom: &v1.EnvVarSource{
 | |
| 				SecretKeyRef: &v1.SecretKeySelector{
 | |
| 					LocalObjectReference: v1.LocalObjectReference{
 | |
| 						Name: c.credentialSecretName(c.OpConfig.SuperUsername),
 | |
| 					},
 | |
| 					Key: "password",
 | |
| 				},
 | |
| 			},
 | |
| 		},
 | |
| 		{
 | |
| 			Name: "PGPASSWORD_STANDBY",
 | |
| 			ValueFrom: &v1.EnvVarSource{
 | |
| 				SecretKeyRef: &v1.SecretKeySelector{
 | |
| 					LocalObjectReference: v1.LocalObjectReference{
 | |
| 						Name: c.credentialSecretName(c.OpConfig.ReplicationUsername),
 | |
| 					},
 | |
| 					Key: "password",
 | |
| 				},
 | |
| 			},
 | |
| 		},
 | |
| 		{
 | |
| 			Name:  "PAM_OAUTH2",
 | |
| 			Value: c.OpConfig.PamConfiguration,
 | |
| 		},
 | |
| 	}
 | |
| 	if spiloConfiguration != "" {
 | |
| 		envVars = append(envVars, v1.EnvVar{Name: "SPILO_CONFIGURATION", Value: spiloConfiguration})
 | |
| 	}
 | |
| 	if c.OpConfig.WALES3Bucket != "" {
 | |
| 		envVars = append(envVars, v1.EnvVar{Name: "WAL_S3_BUCKET", Value: c.OpConfig.WALES3Bucket})
 | |
| 	}
 | |
| 	privilegedMode := bool(true)
 | |
| 	container := v1.Container{
 | |
| 		Name:            c.Name,
 | |
| 		Image:           c.OpConfig.DockerImage,
 | |
| 		ImagePullPolicy: v1.PullAlways,
 | |
| 		Resources:       *resourceRequirements,
 | |
| 		Ports: []v1.ContainerPort{
 | |
| 			{
 | |
| 				ContainerPort: 8008,
 | |
| 				Protocol:      v1.ProtocolTCP,
 | |
| 			},
 | |
| 			{
 | |
| 				ContainerPort: 5432,
 | |
| 				Protocol:      v1.ProtocolTCP,
 | |
| 			},
 | |
| 			{
 | |
| 				ContainerPort: 8080,
 | |
| 				Protocol:      v1.ProtocolTCP,
 | |
| 			},
 | |
| 		},
 | |
| 		VolumeMounts: []v1.VolumeMount{
 | |
| 			{
 | |
| 				Name:      constants.DataVolumeName,
 | |
| 				MountPath: constants.PostgresDataMount, //TODO: fetch from manifesto
 | |
| 			},
 | |
| 		},
 | |
| 		Env: envVars,
 | |
| 		SecurityContext: &v1.SecurityContext{
 | |
| 			Privileged: &privilegedMode,
 | |
| 		},
 | |
| 	}
 | |
| 	terminateGracePeriodSeconds := int64(c.OpConfig.PodTerminateGracePeriod.Seconds())
 | |
| 
 | |
| 	podSpec := v1.PodSpec{
 | |
| 		ServiceAccountName:            c.OpConfig.ServiceAccountName,
 | |
| 		TerminationGracePeriodSeconds: &terminateGracePeriodSeconds,
 | |
| 		Containers:                    []v1.Container{container},
 | |
| 	}
 | |
| 
 | |
| 	template := v1.PodTemplateSpec{
 | |
| 		ObjectMeta: metav1.ObjectMeta{
 | |
| 			Labels:    c.labelsSet(),
 | |
| 			Namespace: c.Name,
 | |
| 		},
 | |
| 		Spec: podSpec,
 | |
| 	}
 | |
| 	if c.OpConfig.KubeIAMRole != "" {
 | |
| 		template.Annotations = map[string]string{constants.KubeIAmAnnotation: c.OpConfig.KubeIAMRole}
 | |
| 	}
 | |
| 
 | |
| 	return &template
 | |
| }
 | |
| 
 | |
| func (c *Cluster) generateStatefulSet(spec spec.PostgresSpec) (*v1beta1.StatefulSet, error) {
 | |
| 	resourceRequirements, err := c.resourceRequirements(spec.Resources)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	podTemplate := c.generatePodTemplate(resourceRequirements, &spec.PostgresqlParam, &spec.Patroni)
 | |
| 	volumeClaimTemplate, err := generatePersistentVolumeClaimTemplate(spec.Volume.Size, spec.Volume.StorageClass)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	statefulSet := &v1beta1.StatefulSet{
 | |
| 		ObjectMeta: metav1.ObjectMeta{
 | |
| 			Name:      c.Name,
 | |
| 			Namespace: c.Namespace,
 | |
| 			Labels:    c.labelsSet(),
 | |
| 		},
 | |
| 		Spec: v1beta1.StatefulSetSpec{
 | |
| 			Replicas:             &spec.NumberOfInstances,
 | |
| 			ServiceName:          c.Name,
 | |
| 			Template:             *podTemplate,
 | |
| 			VolumeClaimTemplates: []v1.PersistentVolumeClaim{*volumeClaimTemplate},
 | |
| 		},
 | |
| 	}
 | |
| 
 | |
| 	return statefulSet, nil
 | |
| }
 | |
| 
 | |
| func generatePersistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) (*v1.PersistentVolumeClaim, error) {
 | |
| 	metadata := metav1.ObjectMeta{
 | |
| 		Name: constants.DataVolumeName,
 | |
| 	}
 | |
| 	if volumeStorageClass != "" {
 | |
| 		// TODO: check if storage class exists
 | |
| 		metadata.Annotations = map[string]string{"volume.beta.kubernetes.io/storage-class": volumeStorageClass}
 | |
| 	} else {
 | |
| 		metadata.Annotations = map[string]string{"volume.alpha.kubernetes.io/storage-class": "default"}
 | |
| 	}
 | |
| 
 | |
| 	quantity, err := resource.ParseQuantity(volumeSize)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("could not parse volume size: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	volumeClaim := &v1.PersistentVolumeClaim{
 | |
| 		ObjectMeta: metadata,
 | |
| 		Spec: v1.PersistentVolumeClaimSpec{
 | |
| 			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
 | |
| 			Resources: v1.ResourceRequirements{
 | |
| 				Requests: v1.ResourceList{
 | |
| 					v1.ResourceStorage: quantity,
 | |
| 				},
 | |
| 			},
 | |
| 		},
 | |
| 	}
 | |
| 
 | |
| 	return volumeClaim, nil
 | |
| }
 | |
| 
 | |
| func (c *Cluster) generateUserSecrets() (secrets map[string]*v1.Secret) {
 | |
| 	secrets = make(map[string]*v1.Secret, len(c.pgUsers))
 | |
| 	namespace := c.Namespace
 | |
| 	for username, pgUser := range c.pgUsers {
 | |
| 		//Skip users with no password i.e. human users (they'll be authenticated using pam)
 | |
| 		secret := c.generateSingleUserSecret(namespace, pgUser)
 | |
| 		if secret != nil {
 | |
| 			secrets[username] = secret
 | |
| 		}
 | |
| 	}
 | |
| 	/* special case for the system user */
 | |
| 	for _, systemUser := range c.systemUsers {
 | |
| 		secret := c.generateSingleUserSecret(namespace, systemUser)
 | |
| 		if secret != nil {
 | |
| 			secrets[systemUser.Name] = secret
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (c *Cluster) generateSingleUserSecret(namespace string, pgUser spec.PgUser) *v1.Secret {
 | |
| 	//Skip users with no password i.e. human users (they'll be authenticated using pam)
 | |
| 	if pgUser.Password == "" {
 | |
| 		return nil
 | |
| 	}
 | |
| 	username := pgUser.Name
 | |
| 	secret := v1.Secret{
 | |
| 		ObjectMeta: metav1.ObjectMeta{
 | |
| 			Name:      c.credentialSecretName(username),
 | |
| 			Namespace: namespace,
 | |
| 			Labels:    c.labelsSet(),
 | |
| 		},
 | |
| 		Type: v1.SecretTypeOpaque,
 | |
| 		Data: map[string][]byte{
 | |
| 			"username": []byte(pgUser.Name),
 | |
| 			"password": []byte(pgUser.Password),
 | |
| 		},
 | |
| 	}
 | |
| 	return &secret
 | |
| }
 | |
| 
 | |
| func (c *Cluster) generateService(role postgresRole, newSpec *spec.PostgresSpec) *v1.Service {
 | |
| 
 | |
| 	dnsNameFunction := c.masterDNSName
 | |
| 	name := c.Name
 | |
| 	if role == replica {
 | |
| 		dnsNameFunction = c.replicaDNSName
 | |
| 		name = name + "-repl"
 | |
| 	}
 | |
| 
 | |
| 	serviceSpec := v1.ServiceSpec{
 | |
| 		Ports: []v1.ServicePort{{Name: "postgresql", Port: 5432, TargetPort: intstr.IntOrString{IntVal: 5432}}},
 | |
| 		Type:  v1.ServiceTypeClusterIP,
 | |
| 	}
 | |
| 
 | |
| 	if role == replica {
 | |
| 		serviceSpec.Selector = c.roleLabelsSet(role)
 | |
| 	}
 | |
| 
 | |
| 	var annotations map[string]string
 | |
| 
 | |
| 	// Examine the per-cluster load balancer setting, if it is not defined - check the operator configuration.
 | |
| 	if (newSpec.UseLoadBalancer != nil && *newSpec.UseLoadBalancer) ||
 | |
| 		(newSpec.UseLoadBalancer == nil && c.OpConfig.EnableLoadBalancer) {
 | |
| 
 | |
| 		// safe default value: lock load balancer to only local address unless overridden explicitly.
 | |
| 		sourceRanges := []string{localHost}
 | |
| 		allowedSourceRanges := newSpec.AllowedSourceRanges
 | |
| 		if len(allowedSourceRanges) >= 0 {
 | |
| 			sourceRanges = allowedSourceRanges
 | |
| 		}
 | |
| 
 | |
| 		serviceSpec.Type = v1.ServiceTypeLoadBalancer
 | |
| 		serviceSpec.LoadBalancerSourceRanges = sourceRanges
 | |
| 
 | |
| 		annotations = map[string]string{
 | |
| 			constants.ZalandoDNSNameAnnotation: dnsNameFunction(),
 | |
| 			constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
 | |
| 		}
 | |
| 
 | |
| 	}
 | |
| 
 | |
| 	service := &v1.Service{
 | |
| 		ObjectMeta: metav1.ObjectMeta{
 | |
| 			Name:        name,
 | |
| 			Namespace:   c.Namespace,
 | |
| 			Labels:      c.roleLabelsSet(role),
 | |
| 			Annotations: annotations,
 | |
| 		},
 | |
| 		Spec: serviceSpec,
 | |
| 	}
 | |
| 
 | |
| 	return service
 | |
| }
 | |
| 
 | |
| func (c *Cluster) generateMasterEndpoints(subsets []v1.EndpointSubset) *v1.Endpoints {
 | |
| 	endpoints := &v1.Endpoints{
 | |
| 		ObjectMeta: metav1.ObjectMeta{
 | |
| 			Name:      c.Name,
 | |
| 			Namespace: c.Namespace,
 | |
| 			Labels:    c.roleLabelsSet(master),
 | |
| 		},
 | |
| 	}
 | |
| 	if len(subsets) > 0 {
 | |
| 		endpoints.Subsets = subsets
 | |
| 	}
 | |
| 
 | |
| 	return endpoints
 | |
| }
 |