diff --git a/manifests/configmap.yaml b/manifests/configmap.yaml index f5f1920f7..270df6b0e 100644 --- a/manifests/configmap.yaml +++ b/manifests/configmap.yaml @@ -28,3 +28,4 @@ data: super_username: postgres teams_api_url: http://fake-teams-api.default.svc.cluster.local workers: "4" + enable_load_balancer: "true" diff --git a/manifests/testpostgresql.yaml b/manifests/testpostgresql.yaml index 1728f46a7..e80d667ec 100644 --- a/manifests/testpostgresql.yaml +++ b/manifests/testpostgresql.yaml @@ -41,6 +41,7 @@ spec: loop_wait: &loop_wait 10 retry_timeout: 10 maximum_lag_on_failover: 33554432 + useLoadBalancer: true maintenanceWindows: - 01:00-06:00 #UTC - Sat:00:00-04:00 diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index eaa56ec46..e62f0b16e 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -242,6 +242,10 @@ func (c *Cluster) Create() error { func (c *Cluster) sameServiceWith(role PostgresRole, service *v1.Service) (match bool, reason string) { //TODO: improve comparison match = true + if c.Service[role].Spec.Type != service.Spec.Type { + return false, fmt.Sprintf("new %s service's type %s doesn't match the current one %s", + role, service.Spec.Type, c.Service[role].Spec.Type) + } oldSourceRanges := c.Service[role].Spec.LoadBalancerSourceRanges newSourceRanges := service.Spec.LoadBalancerSourceRanges /* work around Kubernetes 1.6 serializing [] as nil. See https://github.com/kubernetes/kubernetes/issues/43203 */ @@ -292,7 +296,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp // In the comparisons below, the needsReplace and needsRollUpdate flags are never reset, since checks fall through // and the combined effect of all the changes should be applied. // TODO: log all reasons for changing the statefulset, not just the last one. - // TODO: make sure this is in sync with genPodTemplate, ideally by using the same list of fields to generate + // TODO: make sure this is in sync with generatePodTemplate, ideally by using the same list of fields to generate // the template and the diff if c.Statefulset.Spec.Template.Spec.ServiceAccountName != statefulSet.Spec.Template.Spec.ServiceAccountName { needsReplace = true @@ -435,7 +439,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { continue } } - newService := c.genService(role, newSpec.Spec.AllowedSourceRanges) + newService := c.generateService(role, &newSpec.Spec) if match, reason := c.sameServiceWith(role, newService); !match { c.logServiceChanges(role, c.Service[role], newService, true, reason) if err := c.updateService(role, newService); err != nil { @@ -446,7 +450,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { } } - newStatefulSet, err := c.genStatefulSet(newSpec.Spec) + newStatefulSet, err := c.generateStatefulSet(newSpec.Spec) if err != nil { return fmt.Errorf("could not generate statefulset: %v", err) } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index fd71661ad..0cba32837 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -18,6 +18,7 @@ const ( pgBinariesLocationTemplate = "/usr/lib/postgresql/%s/bin" patroniPGBinariesParameterName = "bin_dir" patroniPGParametersParameterName = "parameters" + localHost = "127.0.0.1/32" ) type pgUser struct { @@ -203,7 +204,7 @@ PATRONI_INITDB_PARAMS: return string(result) } -func (c *Cluster) genPodTemplate(resourceRequirements *v1.ResourceRequirements, pgParameters *spec.PostgresqlParam, patroniParameters *spec.Patroni) *v1.PodTemplateSpec { +func (c *Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequirements, pgParameters *spec.PostgresqlParam, patroniParameters *spec.Patroni) *v1.PodTemplateSpec { spiloConfiguration := c.generateSpiloJSONConfiguration(pgParameters, patroniParameters) envVars := []v1.EnvVar{ @@ -323,14 +324,14 @@ func (c *Cluster) genPodTemplate(resourceRequirements *v1.ResourceRequirements, return &template } -func (c *Cluster) genStatefulSet(spec spec.PostgresSpec) (*v1beta1.StatefulSet, error) { +func (c *Cluster) generateStatefulSet(spec spec.PostgresSpec) (*v1beta1.StatefulSet, error) { resourceRequirements, err := c.resourceRequirements(spec.Resources) if err != nil { return nil, err } - podTemplate := c.genPodTemplate(resourceRequirements, &spec.PostgresqlParam, &spec.Patroni) - volumeClaimTemplate, err := persistentVolumeClaimTemplate(spec.Volume.Size, spec.Volume.StorageClass) + podTemplate := c.generatePodTemplate(resourceRequirements, &spec.PostgresqlParam, &spec.Patroni) + volumeClaimTemplate, err := generatePersistentVolumeClaimTemplate(spec.Volume.Size, spec.Volume.StorageClass) if err != nil { return nil, err } @@ -352,7 +353,7 @@ func (c *Cluster) genStatefulSet(spec spec.PostgresSpec) (*v1beta1.StatefulSet, return statefulSet, nil } -func persistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) (*v1.PersistentVolumeClaim, error) { +func generatePersistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) (*v1.PersistentVolumeClaim, error) { metadata := v1.ObjectMeta{ Name: constants.DataVolumeName, } @@ -383,19 +384,19 @@ func persistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) (*v1.P return volumeClaim, nil } -func (c *Cluster) genUserSecrets() (secrets map[string]*v1.Secret) { +func (c *Cluster) generateUserSecrets() (secrets map[string]*v1.Secret) { secrets = make(map[string]*v1.Secret, len(c.pgUsers)) namespace := c.Metadata.Namespace for username, pgUser := range c.pgUsers { //Skip users with no password i.e. human users (they'll be authenticated using pam) - secret := c.genSingleUserSecret(namespace, pgUser) + secret := c.generateSingleUserSecret(namespace, pgUser) if secret != nil { secrets[username] = secret } } /* special case for the system user */ for _, systemUser := range c.systemUsers { - secret := c.genSingleUserSecret(namespace, systemUser) + secret := c.generateSingleUserSecret(namespace, systemUser) if secret != nil { secrets[systemUser.Name] = secret } @@ -404,7 +405,7 @@ func (c *Cluster) genUserSecrets() (secrets map[string]*v1.Secret) { return } -func (c *Cluster) genSingleUserSecret(namespace string, pgUser spec.PgUser) *v1.Secret { +func (c *Cluster) generateSingleUserSecret(namespace string, pgUser spec.PgUser) *v1.Secret { //Skip users with no password i.e. human users (they'll be authenticated using pam) if pgUser.Password == "" { return nil @@ -425,7 +426,7 @@ func (c *Cluster) genSingleUserSecret(namespace string, pgUser spec.PgUser) *v1. return &secret } -func (c *Cluster) genService(role PostgresRole, allowedSourceRanges []string) *v1.Service { +func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec) *v1.Service { dnsNameFunction := c.masterDnsName name := c.Metadata.Name @@ -434,30 +435,52 @@ func (c *Cluster) genService(role PostgresRole, allowedSourceRanges []string) *v name = name + "-repl" } + serviceSpec := v1.ServiceSpec{ + Ports: []v1.ServicePort{{Name: "postgresql", Port: 5432, TargetPort: intstr.IntOrString{IntVal: 5432}}}, + Type: v1.ServiceTypeClusterIP, + } + + if role == Replica { + serviceSpec.Selector = map[string]string{c.OpConfig.PodRoleLabel: string(Replica)} + } + + var annotations map[string]string + + // Examine the per-cluster load balancer setting, if it is not defined - check the operator configuration. + if (newSpec.UseLoadBalancer != nil && *newSpec.UseLoadBalancer) || + (newSpec.UseLoadBalancer == nil && c.OpConfig.EnableLoadBalancer) { + + // safe default value: lock load balancer to only local address unless overriden explicitely. + sourceRanges := []string{localHost} + allowedSourceRanges := newSpec.AllowedSourceRanges + if len(allowedSourceRanges) >= 0 { + sourceRanges = allowedSourceRanges + } + + serviceSpec.Type = v1.ServiceTypeLoadBalancer + serviceSpec.LoadBalancerSourceRanges = sourceRanges + + annotations = map[string]string{ + constants.ZalandoDNSNameAnnotation: dnsNameFunction(), + constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, + } + + } + service := &v1.Service{ ObjectMeta: v1.ObjectMeta{ - Name: name, - Namespace: c.Metadata.Namespace, - Labels: c.roleLabelsSet(role), - Annotations: map[string]string{ - constants.ZalandoDNSNameAnnotation: dnsNameFunction(), - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - }, + Name: name, + Namespace: c.Metadata.Namespace, + Labels: c.roleLabelsSet(role), + Annotations: annotations, }, - Spec: v1.ServiceSpec{ - Type: v1.ServiceTypeLoadBalancer, - Ports: []v1.ServicePort{{Name: "postgresql", Port: 5432, TargetPort: intstr.IntOrString{IntVal: 5432}}}, - LoadBalancerSourceRanges: allowedSourceRanges, - }, - } - if role == Replica { - service.Spec.Selector = map[string]string{c.OpConfig.PodRoleLabel: string(Replica)} + Spec: serviceSpec, } return service } -func (c *Cluster) genMasterEndpoints() *v1.Endpoints { +func (c *Cluster) generateMasterEndpoints(subsets []v1.EndpointSubset) *v1.Endpoints { endpoints := &v1.Endpoints{ ObjectMeta: v1.ObjectMeta{ Name: c.Metadata.Name, @@ -465,6 +488,9 @@ func (c *Cluster) genMasterEndpoints() *v1.Endpoints { Labels: c.roleLabelsSet(Master), }, } + if len(subsets) > 0 { + endpoints.Subsets = subsets + } return endpoints } diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 7fad9f3a1..b71f8355f 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -119,7 +119,7 @@ func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) { if c.Statefulset != nil { return nil, fmt.Errorf("statefulset already exists in the cluster") } - statefulSetSpec, err := c.genStatefulSet(c.Spec) + statefulSetSpec, err := c.generateStatefulSet(c.Spec) if err != nil { return nil, fmt.Errorf("could not generate statefulset: %v", err) } @@ -233,7 +233,7 @@ func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) { if c.Service[role] != nil { return nil, fmt.Errorf("service already exists in the cluster") } - serviceSpec := c.genService(role, c.Spec.AllowedSourceRanges) + serviceSpec := c.generateService(role, &c.Spec) service, err := c.KubeClient.Services(serviceSpec.Namespace).Create(serviceSpec) if err != nil { @@ -249,9 +249,47 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error return fmt.Errorf("there is no service in the cluster") } serviceName := util.NameFromMeta(c.Service[role].ObjectMeta) + endpointName := util.NameFromMeta(c.Endpoint.ObjectMeta) + // TODO: check if it possible to change the service type with a patch in future versions of Kubernetes + if newService.Spec.Type != c.Service[role].Spec.Type { + // service type has changed, need to replace the service completely. + // we cannot use just pach the current service, since it may contain attributes incompatible with the new type. + var ( + currentEndpoint *v1.Endpoints + err error + ) + + if role == Master { + // for the master service we need to re-create the endpoint as well. Get the up-to-date version of + // the addresses stored in it before the service is deleted (deletion of the service removes the endpooint) + currentEndpoint, err = c.KubeClient.Endpoints(c.Service[role].Namespace).Get(c.Service[role].Name) + if err != nil { + return fmt.Errorf("could not get current cluster endpoints: %v", err) + } + } + err = c.KubeClient.Services(c.Service[role].Namespace).Delete(c.Service[role].Name, c.deleteOptions) + if err != nil { + return fmt.Errorf("could not delete service '%s': '%v'", serviceName, err) + } + c.Endpoint = nil + svc, err := c.KubeClient.Services(newService.Namespace).Create(newService) + if err != nil { + return fmt.Errorf("could not create service '%s': '%v'", serviceName, err) + } + c.Service[role] = svc + if role == Master { + // create the new endpoint using the addresses obtained from the previous one + endpointSpec := c.generateMasterEndpoints(currentEndpoint.Subsets) + ep, err := c.KubeClient.Endpoints(c.Service[role].Namespace).Create(endpointSpec) + if err != nil { + return fmt.Errorf("could not create endpoint '%s': '%v'", endpointName, err) + } + c.Endpoint = ep + } + return nil + } if len(newService.ObjectMeta.Annotations) > 0 { - annotationsPatchData := metadataAnnotationsPatch(newService.ObjectMeta.Annotations) _, err := c.KubeClient.Services(c.Service[role].Namespace).Patch( @@ -300,7 +338,7 @@ func (c *Cluster) createEndpoint() (*v1.Endpoints, error) { if c.Endpoint != nil { return nil, fmt.Errorf("endpoint already exists in the cluster") } - endpointsSpec := c.genMasterEndpoints() + endpointsSpec := c.generateMasterEndpoints(nil) endpoints, err := c.KubeClient.Endpoints(endpointsSpec.Namespace).Create(endpointsSpec) if err != nil { @@ -327,7 +365,7 @@ func (c *Cluster) deleteEndpoint() error { } func (c *Cluster) applySecrets() error { - secrets := c.genUserSecrets() + secrets := c.generateUserSecrets() for secretUsername, secretSpec := range secrets { secret, err := c.KubeClient.Secrets(secretSpec.Namespace).Create(secretSpec) diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 7dd2b9ee4..625eb34f5 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -100,7 +100,7 @@ func (c *Cluster) syncService(role PostgresRole) error { return nil } - desiredSvc := c.genService(role, cSpec.AllowedSourceRanges) + desiredSvc := c.generateService(role, &cSpec) match, reason := c.sameServiceWith(role, desiredSvc) if match { return nil @@ -158,7 +158,7 @@ func (c *Cluster) syncStatefulSet() error { } /* TODO: should check that we need to replace the statefulset */ if !rollUpdate { - desiredSS, err := c.genStatefulSet(cSpec) + desiredSS, err := c.generateStatefulSet(cSpec) if err != nil { return fmt.Errorf("could not generate statefulset: %v", err) } diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 81b715801..31d46cc42 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -70,8 +70,8 @@ func metadataAnnotationsPatch(annotations map[string]string) string { annotationsList = append(annotationsList, fmt.Sprintf(`"%s":"%s"`, name, value)) } annotationsString := strings.Join(annotationsList, ",") - // TODO: perhaps use patchStrategy:"replace" json annotation instead of constructing the patch literally. - return fmt.Sprintf(constants.ServiceMetadataAnnotationFormat, annotationsString) + // TODO: perhaps use patchStrategy:action json annotation instead of constructing the patch literally. + return fmt.Sprintf(constants.ServiceMetadataAnnotationReplaceFormat, annotationsString) } func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate bool, reasons []string) { diff --git a/pkg/spec/postgresql.go b/pkg/spec/postgresql.go index 172fe06f8..3649eff50 100644 --- a/pkg/spec/postgresql.go +++ b/pkg/spec/postgresql.go @@ -86,8 +86,10 @@ type PostgresSpec struct { Patroni `json:"patroni,omitempty"` Resources `json:"resources,omitempty"` - TeamID string `json:"teamId"` - AllowedSourceRanges []string `json:"allowedSourceRanges"` + TeamID string `json:"teamId"` + AllowedSourceRanges []string `json:"allowedSourceRanges"` + // EnableLoadBalancer is a pointer, since it is importat to know if that parameters is omited from the manifest + UseLoadBalancer *bool `json:"useLoadBalancer,omitempty"` ReplicaLoadBalancer bool `json:"replicaLoadBalancer,omitempty"` NumberOfInstances int32 `json:"numberOfInstances"` Users map[string]userFlags `json:"users"` diff --git a/pkg/spec/types.go b/pkg/spec/types.go index 822395ce9..398003841 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -3,6 +3,7 @@ package spec import ( "fmt" "strings" + "database/sql" "k8s.io/client-go/pkg/api/v1" diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index 7676e3e6a..cd038e9a8 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -53,6 +53,7 @@ type Config struct { DebugLogging bool `name:"debug_logging" default:"true"` EnableDBAccess bool `name:"enable_database_access" default:"true"` EnableTeamsAPI bool `name:"enable_teams_api" default:"true"` + EnableLoadBalancer bool `name:"enable_load_balancer" default:"true"` MasterDNSNameFormat stringTemplate `name:"master_dns_name_format" default:"{cluster}.{team}.{hostedzone}"` ReplicaDNSNameFormat stringTemplate `name:"replica_dns_name_format" default:"{cluster}-repl.{team}.{hostedzone}"` Workers uint32 `name:"workers" default:"4"` diff --git a/pkg/util/constants/annotations.go b/pkg/util/constants/annotations.go index 3b276cc3d..48e41cb16 100644 --- a/pkg/util/constants/annotations.go +++ b/pkg/util/constants/annotations.go @@ -2,10 +2,10 @@ package constants // Names and values in Kubernetes annotation for services, statefulsets and volumes const ( - ZalandoDNSNameAnnotation = "external-dns.alpha.kubernetes.io/hostname" - ElbTimeoutAnnotationName = "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout" - ElbTimeoutAnnotationValue = "3600" - KubeIAmAnnotation = "iam.amazonaws.com/role" - VolumeStorateProvisionerAnnotation = "pv.kubernetes.io/provisioned-by" - ServiceMetadataAnnotationFormat = `{"metadata":{"annotations": {"$patch":"replace", %s}}}` + ZalandoDNSNameAnnotation = "external-dns.alpha.kubernetes.io/hostname" + ElbTimeoutAnnotationName = "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout" + ElbTimeoutAnnotationValue = "3600" + KubeIAmAnnotation = "iam.amazonaws.com/role" + VolumeStorateProvisionerAnnotation = "pv.kubernetes.io/provisioned-by" + ServiceMetadataAnnotationReplaceFormat = `{"metadata":{"annotations": {"$patch":"replace", %s}}}` )