Configure load balancer on a per-cluster and operator-wide level (#57)
* Deny all requests to the load balancer by default.
* Operator-wide toggle for the load balancer.
* Define the per-cluster useLoadBalancer option. If useLoadBalancer is not set, the operator-wide default takes effect. If it is true, the load balancer is created; otherwise a service of type ClusterIP is created. Internally, we have to completely replace the service if the service type changes. We cannot patch, since some fields from the old service that would remain after the patch are incompatible with the new one, and handling them explicitly when updating the service is ugly and error-prone. We cannot update the service because of its immutable fields; that leaves us the only option of deleting the old service and creating a new one. Unfortunately, there is still the issue of the unnecessary removal of the endpoints associated with the service; it will be addressed in future commits.
* Revert the unintended effect of go fmt.
* Recreate endpoints on service update. When the service type is changed, the service is deleted and then one with the new type is created. Unfortunately, the endpoints are deleted as well. Re-create them afterwards, preserving the original addresses stored in them.
* Improve error messages and comments. Use generate instead of gen in names.
This commit is contained in:
parent
ba6529bec9
commit
00150711e4
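The per-cluster useLoadBalancer option described above is deliberately a *bool, so three states are distinguishable: opted in, opted out, and omitted. A minimal standalone sketch of the resolution order; the helper name loadBalancerEnabled and the example values are illustrative, not part of the commit:

package main

import "fmt"

// loadBalancerEnabled resolves the effective setting: a per-cluster
// useLoadBalancer value (nil when the manifest omits it) takes
// precedence over the operator-wide enable_load_balancer default.
func loadBalancerEnabled(useLoadBalancer *bool, operatorDefault bool) bool {
	if useLoadBalancer != nil {
		return *useLoadBalancer
	}
	return operatorDefault
}

func main() {
	optIn, optOut := true, false
	fmt.Println(loadBalancerEnabled(nil, true))     // true: operator-wide default applies
	fmt.Println(loadBalancerEnabled(&optOut, true)) // false: the cluster opts out
	fmt.Println(loadBalancerEnabled(&optIn, false)) // true: the cluster opts in
}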
@@ -28,3 +28,4 @@ data:
   super_username: postgres
   teams_api_url: http://fake-teams-api.default.svc.cluster.local
   workers: "4"
+  enable_load_balancer: "true"
@@ -41,6 +41,7 @@ spec:
     loop_wait: &loop_wait 10
     retry_timeout: 10
     maximum_lag_on_failover: 33554432
+  useLoadBalancer: true
   maintenanceWindows:
   - 01:00-06:00 #UTC
   - Sat:00:00-04:00
@@ -242,6 +242,10 @@ func (c *Cluster) Create() error {
 func (c *Cluster) sameServiceWith(role PostgresRole, service *v1.Service) (match bool, reason string) {
 	//TODO: improve comparison
 	match = true
+	if c.Service[role].Spec.Type != service.Spec.Type {
+		return false, fmt.Sprintf("new %s service's type %s doesn't match the current one %s",
+			role, service.Spec.Type, c.Service[role].Spec.Type)
+	}
 	oldSourceRanges := c.Service[role].Spec.LoadBalancerSourceRanges
 	newSourceRanges := service.Spec.LoadBalancerSourceRanges
 	/* work around Kubernetes 1.6 serializing [] as nil. See https://github.com/kubernetes/kubernetes/issues/43203 */
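The workaround referenced in the comment above has to treat a nil slice and an empty slice as equal when comparing source ranges. A sketch of such a comparison; the helper name sourceRangesEqual is hypothetical, and the actual code in the commit may be shaped differently:

package main

import "fmt"

// sourceRangesEqual treats a nil slice and an empty slice as equal,
// since Kubernetes 1.6 may serialize [] as nil
// (https://github.com/kubernetes/kubernetes/issues/43203).
func sourceRangesEqual(old, new []string) bool {
	if len(old) != len(new) { // len(nil) == 0, so nil and [] compare equal
		return false
	}
	for i := range old {
		if old[i] != new[i] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(sourceRangesEqual(nil, []string{}))               // true: the workaround in action
	fmt.Println(sourceRangesEqual([]string{"127.0.0.1/32"}, nil)) // false
}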
@@ -292,7 +296,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp
 	// In the comparisons below, the needsReplace and needsRollUpdate flags are never reset, since checks fall through
 	// and the combined effect of all the changes should be applied.
 	// TODO: log all reasons for changing the statefulset, not just the last one.
-	// TODO: make sure this is in sync with genPodTemplate, ideally by using the same list of fields to generate
+	// TODO: make sure this is in sync with generatePodTemplate, ideally by using the same list of fields to generate
 	// the template and the diff
 	if c.Statefulset.Spec.Template.Spec.ServiceAccountName != statefulSet.Spec.Template.Spec.ServiceAccountName {
 		needsReplace = true
@@ -435,7 +439,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
 				continue
 			}
 		}
-		newService := c.genService(role, newSpec.Spec.AllowedSourceRanges)
+		newService := c.generateService(role, &newSpec.Spec)
 		if match, reason := c.sameServiceWith(role, newService); !match {
 			c.logServiceChanges(role, c.Service[role], newService, true, reason)
 			if err := c.updateService(role, newService); err != nil {
@@ -446,7 +450,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
 		}
 	}

-	newStatefulSet, err := c.genStatefulSet(newSpec.Spec)
+	newStatefulSet, err := c.generateStatefulSet(newSpec.Spec)
 	if err != nil {
 		return fmt.Errorf("could not generate statefulset: %v", err)
 	}
@@ -18,6 +18,7 @@ const (
 	pgBinariesLocationTemplate = "/usr/lib/postgresql/%s/bin"
 	patroniPGBinariesParameterName = "bin_dir"
 	patroniPGParametersParameterName = "parameters"
+	localHost = "127.0.0.1/32"
 )

 type pgUser struct {
@@ -203,7 +204,7 @@ PATRONI_INITDB_PARAMS:
 	return string(result)
 }

-func (c *Cluster) genPodTemplate(resourceRequirements *v1.ResourceRequirements, pgParameters *spec.PostgresqlParam, patroniParameters *spec.Patroni) *v1.PodTemplateSpec {
+func (c *Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequirements, pgParameters *spec.PostgresqlParam, patroniParameters *spec.Patroni) *v1.PodTemplateSpec {
 	spiloConfiguration := c.generateSpiloJSONConfiguration(pgParameters, patroniParameters)

 	envVars := []v1.EnvVar{
@@ -323,14 +324,14 @@ func (c *Cluster) genPodTemplate(resourceRequirements *v1.ResourceRequirements,
 	return &template
 }

-func (c *Cluster) genStatefulSet(spec spec.PostgresSpec) (*v1beta1.StatefulSet, error) {
+func (c *Cluster) generateStatefulSet(spec spec.PostgresSpec) (*v1beta1.StatefulSet, error) {
 	resourceRequirements, err := c.resourceRequirements(spec.Resources)
 	if err != nil {
 		return nil, err
 	}

-	podTemplate := c.genPodTemplate(resourceRequirements, &spec.PostgresqlParam, &spec.Patroni)
-	volumeClaimTemplate, err := persistentVolumeClaimTemplate(spec.Volume.Size, spec.Volume.StorageClass)
+	podTemplate := c.generatePodTemplate(resourceRequirements, &spec.PostgresqlParam, &spec.Patroni)
+	volumeClaimTemplate, err := generatePersistentVolumeClaimTemplate(spec.Volume.Size, spec.Volume.StorageClass)
 	if err != nil {
 		return nil, err
 	}
@@ -352,7 +353,7 @@ func (c *Cluster) genStatefulSet(spec spec.PostgresSpec) (*v1beta1.StatefulSet,
 	return statefulSet, nil
 }

-func persistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) (*v1.PersistentVolumeClaim, error) {
+func generatePersistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) (*v1.PersistentVolumeClaim, error) {
 	metadata := v1.ObjectMeta{
 		Name: constants.DataVolumeName,
 	}
@@ -383,19 +384,19 @@ func persistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) (*v1.P
 	return volumeClaim, nil
 }

-func (c *Cluster) genUserSecrets() (secrets map[string]*v1.Secret) {
+func (c *Cluster) generateUserSecrets() (secrets map[string]*v1.Secret) {
 	secrets = make(map[string]*v1.Secret, len(c.pgUsers))
 	namespace := c.Metadata.Namespace
 	for username, pgUser := range c.pgUsers {
 		//Skip users with no password i.e. human users (they'll be authenticated using pam)
-		secret := c.genSingleUserSecret(namespace, pgUser)
+		secret := c.generateSingleUserSecret(namespace, pgUser)
 		if secret != nil {
 			secrets[username] = secret
 		}
 	}
 	/* special case for the system user */
 	for _, systemUser := range c.systemUsers {
-		secret := c.genSingleUserSecret(namespace, systemUser)
+		secret := c.generateSingleUserSecret(namespace, systemUser)
 		if secret != nil {
 			secrets[systemUser.Name] = secret
 		}
@@ -404,7 +405,7 @@ func (c *Cluster) genUserSecrets() (secrets map[string]*v1.Secret) {
 	return
 }

-func (c *Cluster) genSingleUserSecret(namespace string, pgUser spec.PgUser) *v1.Secret {
+func (c *Cluster) generateSingleUserSecret(namespace string, pgUser spec.PgUser) *v1.Secret {
 	//Skip users with no password i.e. human users (they'll be authenticated using pam)
 	if pgUser.Password == "" {
 		return nil
@@ -425,7 +426,7 @@ func (c *Cluster) genSingleUserSecret(namespace string, pgUser spec.PgUser) *v1.
 	return &secret
 }

-func (c *Cluster) genService(role PostgresRole, allowedSourceRanges []string) *v1.Service {
+func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec) *v1.Service {

 	dnsNameFunction := c.masterDnsName
 	name := c.Metadata.Name
@@ -434,30 +435,52 @@ func (c *Cluster) genService(role PostgresRole, allowedSourceRanges []string) *v
 		name = name + "-repl"
 	}

+	serviceSpec := v1.ServiceSpec{
+		Ports: []v1.ServicePort{{Name: "postgresql", Port: 5432, TargetPort: intstr.IntOrString{IntVal: 5432}}},
+		Type:  v1.ServiceTypeClusterIP,
+	}
+
+	if role == Replica {
+		serviceSpec.Selector = map[string]string{c.OpConfig.PodRoleLabel: string(Replica)}
+	}
+
+	var annotations map[string]string
+
+	// Examine the per-cluster load balancer setting; if it is not defined, check the operator configuration.
+	if (newSpec.UseLoadBalancer != nil && *newSpec.UseLoadBalancer) ||
+		(newSpec.UseLoadBalancer == nil && c.OpConfig.EnableLoadBalancer) {
+
+		// safe default value: lock the load balancer to the local address only, unless overridden explicitly.
+		sourceRanges := []string{localHost}
+		allowedSourceRanges := newSpec.AllowedSourceRanges
+		if len(allowedSourceRanges) > 0 {
+			sourceRanges = allowedSourceRanges
+		}
+
+		serviceSpec.Type = v1.ServiceTypeLoadBalancer
+		serviceSpec.LoadBalancerSourceRanges = sourceRanges
+
+		annotations = map[string]string{
+			constants.ZalandoDNSNameAnnotation: dnsNameFunction(),
+			constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
+		}
+
+	}
+
 	service := &v1.Service{
 		ObjectMeta: v1.ObjectMeta{
-			Name:      name,
-			Namespace: c.Metadata.Namespace,
-			Labels:    c.roleLabelsSet(role),
-			Annotations: map[string]string{
-				constants.ZalandoDNSNameAnnotation: dnsNameFunction(),
-				constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
-			},
+			Name:        name,
+			Namespace:   c.Metadata.Namespace,
+			Labels:      c.roleLabelsSet(role),
+			Annotations: annotations,
 		},
-		Spec: v1.ServiceSpec{
-			Type:  v1.ServiceTypeLoadBalancer,
-			Ports: []v1.ServicePort{{Name: "postgresql", Port: 5432, TargetPort: intstr.IntOrString{IntVal: 5432}}},
-			LoadBalancerSourceRanges: allowedSourceRanges,
-		},
-	}
-	if role == Replica {
-		service.Spec.Selector = map[string]string{c.OpConfig.PodRoleLabel: string(Replica)}
+		Spec: serviceSpec,
 	}

 	return service
 }

-func (c *Cluster) genMasterEndpoints() *v1.Endpoints {
+func (c *Cluster) generateMasterEndpoints(subsets []v1.EndpointSubset) *v1.Endpoints {
 	endpoints := &v1.Endpoints{
 		ObjectMeta: v1.ObjectMeta{
 			Name: c.Metadata.Name,
@@ -465,6 +488,9 @@ func (c *Cluster) genMasterEndpoints() *v1.Endpoints {
 			Labels: c.roleLabelsSet(Master),
 		},
 	}
+	if len(subsets) > 0 {
+		endpoints.Subsets = subsets
+	}

 	return endpoints
 }
@@ -119,7 +119,7 @@ func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) {
 	if c.Statefulset != nil {
 		return nil, fmt.Errorf("statefulset already exists in the cluster")
 	}
-	statefulSetSpec, err := c.genStatefulSet(c.Spec)
+	statefulSetSpec, err := c.generateStatefulSet(c.Spec)
 	if err != nil {
 		return nil, fmt.Errorf("could not generate statefulset: %v", err)
 	}
@@ -233,7 +233,7 @@ func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) {
 	if c.Service[role] != nil {
 		return nil, fmt.Errorf("service already exists in the cluster")
 	}
-	serviceSpec := c.genService(role, c.Spec.AllowedSourceRanges)
+	serviceSpec := c.generateService(role, &c.Spec)

 	service, err := c.KubeClient.Services(serviceSpec.Namespace).Create(serviceSpec)
 	if err != nil {
@@ -249,9 +249,47 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
 		return fmt.Errorf("there is no service in the cluster")
 	}
 	serviceName := util.NameFromMeta(c.Service[role].ObjectMeta)
+	endpointName := util.NameFromMeta(c.Endpoint.ObjectMeta)
+	// TODO: check if it is possible to change the service type with a patch in future versions of Kubernetes
+	if newService.Spec.Type != c.Service[role].Spec.Type {
+		// service type has changed, need to replace the service completely.
+		// we cannot just patch the current service, since it may contain attributes incompatible with the new type.
+		var (
+			currentEndpoint *v1.Endpoints
+			err             error
+		)
+		if role == Master {
+			// for the master service we need to re-create the endpoint as well. Get the up-to-date version of
+			// the addresses stored in it before the service is deleted (deletion of the service removes the endpoint)
+			currentEndpoint, err = c.KubeClient.Endpoints(c.Service[role].Namespace).Get(c.Service[role].Name)
+			if err != nil {
+				return fmt.Errorf("could not get current cluster endpoints: %v", err)
+			}
+		}
+		err = c.KubeClient.Services(c.Service[role].Namespace).Delete(c.Service[role].Name, c.deleteOptions)
+		if err != nil {
+			return fmt.Errorf("could not delete service '%s': '%v'", serviceName, err)
+		}
+		c.Endpoint = nil
+		svc, err := c.KubeClient.Services(newService.Namespace).Create(newService)
+		if err != nil {
+			return fmt.Errorf("could not create service '%s': '%v'", serviceName, err)
+		}
+		c.Service[role] = svc
+		if role == Master {
+			// create the new endpoint using the addresses obtained from the previous one
+			endpointSpec := c.generateMasterEndpoints(currentEndpoint.Subsets)
+			ep, err := c.KubeClient.Endpoints(c.Service[role].Namespace).Create(endpointSpec)
+			if err != nil {
+				return fmt.Errorf("could not create endpoint '%s': '%v'", endpointName, err)
+			}
+			c.Endpoint = ep
+		}
+		return nil
+	}

 	if len(newService.ObjectMeta.Annotations) > 0 {

 		annotationsPatchData := metadataAnnotationsPatch(newService.ObjectMeta.Annotations)

 		_, err := c.KubeClient.Services(c.Service[role].Namespace).Patch(
@@ -300,7 +338,7 @@ func (c *Cluster) createEndpoint() (*v1.Endpoints, error) {
 	if c.Endpoint != nil {
 		return nil, fmt.Errorf("endpoint already exists in the cluster")
 	}
-	endpointsSpec := c.genMasterEndpoints()
+	endpointsSpec := c.generateMasterEndpoints(nil)

 	endpoints, err := c.KubeClient.Endpoints(endpointsSpec.Namespace).Create(endpointsSpec)
 	if err != nil {
@@ -327,7 +365,7 @@ func (c *Cluster) deleteEndpoint() error {
 }

 func (c *Cluster) applySecrets() error {
-	secrets := c.genUserSecrets()
+	secrets := c.generateUserSecrets()

 	for secretUsername, secretSpec := range secrets {
 		secret, err := c.KubeClient.Secrets(secretSpec.Namespace).Create(secretSpec)
@@ -100,7 +100,7 @@ func (c *Cluster) syncService(role PostgresRole) error {
 		return nil
 	}

-	desiredSvc := c.genService(role, cSpec.AllowedSourceRanges)
+	desiredSvc := c.generateService(role, &cSpec)
 	match, reason := c.sameServiceWith(role, desiredSvc)
 	if match {
 		return nil
@@ -158,7 +158,7 @@ func (c *Cluster) syncStatefulSet() error {
 	}
 	/* TODO: should check that we need to replace the statefulset */
 	if !rollUpdate {
-		desiredSS, err := c.genStatefulSet(cSpec)
+		desiredSS, err := c.generateStatefulSet(cSpec)
 		if err != nil {
 			return fmt.Errorf("could not generate statefulset: %v", err)
 		}
@@ -70,8 +70,8 @@ func metadataAnnotationsPatch(annotations map[string]string) string {
 		annotationsList = append(annotationsList, fmt.Sprintf(`"%s":"%s"`, name, value))
 	}
 	annotationsString := strings.Join(annotationsList, ",")
-	// TODO: perhaps use patchStrategy:"replace" json annotation instead of constructing the patch literally.
-	return fmt.Sprintf(constants.ServiceMetadataAnnotationFormat, annotationsString)
+	// TODO: perhaps use patchStrategy:action json annotation instead of constructing the patch literally.
+	return fmt.Sprintf(constants.ServiceMetadataAnnotationReplaceFormat, annotationsString)
 }

 func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate bool, reasons []string) {
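For context, the format string renamed in the constants hunk at the end produces a strategic merge patch whose "$patch":"replace" directive replaces the whole annotations map instead of merging individual keys. A runnable sketch of the payload this yields; the hostname value is a made-up example:

package main

import "fmt"

// Mirrors ServiceMetadataAnnotationReplaceFormat from the constants hunk below.
const serviceMetadataAnnotationReplaceFormat = `{"metadata":{"annotations": {"$patch":"replace", %s}}}`

func main() {
	// Annotation pairs joined the way metadataAnnotationsPatch builds them;
	// "acid-test.team.example.com" is an invented placeholder value.
	annotationsString := `"external-dns.alpha.kubernetes.io/hostname":"acid-test.team.example.com",` +
		`"service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout":"3600"`
	fmt.Printf(serviceMetadataAnnotationReplaceFormat+"\n", annotationsString)
}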
@@ -86,8 +86,10 @@ type PostgresSpec struct {
 	Patroni   `json:"patroni,omitempty"`
 	Resources `json:"resources,omitempty"`

-	TeamID              string   `json:"teamId"`
-	AllowedSourceRanges []string `json:"allowedSourceRanges"`
+	TeamID              string   `json:"teamId"`
+	AllowedSourceRanges []string `json:"allowedSourceRanges"`
+	// UseLoadBalancer is a pointer, since it is important to know whether the parameter was omitted from the manifest
+	UseLoadBalancer     *bool    `json:"useLoadBalancer,omitempty"`
 	ReplicaLoadBalancer bool     `json:"replicaLoadBalancer,omitempty"`
 	NumberOfInstances   int32    `json:"numberOfInstances"`
 	Users               map[string]userFlags `json:"users"`
@@ -3,6 +3,7 @@ package spec

 import (
 	"fmt"
+	"strings"

 	"database/sql"

 	"k8s.io/client-go/pkg/api/v1"
@@ -53,6 +53,7 @@ type Config struct {
 	DebugLogging         bool           `name:"debug_logging" default:"true"`
 	EnableDBAccess       bool           `name:"enable_database_access" default:"true"`
 	EnableTeamsAPI       bool           `name:"enable_teams_api" default:"true"`
+	EnableLoadBalancer   bool           `name:"enable_load_balancer" default:"true"`
 	MasterDNSNameFormat  stringTemplate `name:"master_dns_name_format" default:"{cluster}.{team}.{hostedzone}"`
 	ReplicaDNSNameFormat stringTemplate `name:"replica_dns_name_format" default:"{cluster}-repl.{team}.{hostedzone}"`
 	Workers              uint32         `name:"workers" default:"4"`
@@ -2,10 +2,10 @@ package constants

 // Names and values in Kubernetes annotation for services, statefulsets and volumes
 const (
-	ZalandoDNSNameAnnotation           = "external-dns.alpha.kubernetes.io/hostname"
-	ElbTimeoutAnnotationName           = "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout"
-	ElbTimeoutAnnotationValue          = "3600"
-	KubeIAmAnnotation                  = "iam.amazonaws.com/role"
-	VolumeStorateProvisionerAnnotation = "pv.kubernetes.io/provisioned-by"
-	ServiceMetadataAnnotationFormat    = `{"metadata":{"annotations": {"$patch":"replace", %s}}}`
+	ZalandoDNSNameAnnotation               = "external-dns.alpha.kubernetes.io/hostname"
+	ElbTimeoutAnnotationName               = "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout"
+	ElbTimeoutAnnotationValue              = "3600"
+	KubeIAmAnnotation                      = "iam.amazonaws.com/role"
+	VolumeStorateProvisionerAnnotation     = "pv.kubernetes.io/provisioned-by"
+	ServiceMetadataAnnotationReplaceFormat = `{"metadata":{"annotations": {"$patch":"replace", %s}}}`
 )