From 83ddd5c85bb592322014e041241dc4d172a0c736 Mon Sep 17 00:00:00 2001 From: Rafia Sabih Date: Fri, 28 Aug 2020 14:55:49 +0200 Subject: [PATCH] Add new pooler service for replica --- manifests/complete-postgres-manifest.yaml | 6 +- pkg/cluster/cluster.go | 5 +- pkg/cluster/k8sres.go | 84 +++++++++++++---------- pkg/cluster/resources.go | 22 +++++- 4 files changed, 72 insertions(+), 45 deletions(-) diff --git a/manifests/complete-postgres-manifest.yaml b/manifests/complete-postgres-manifest.yaml index be2250ff9..24c56f565 100644 --- a/manifests/complete-postgres-manifest.yaml +++ b/manifests/complete-postgres-manifest.yaml @@ -1,7 +1,7 @@ apiVersion: "acid.zalan.do/v1" kind: postgresql metadata: - name: acid-test-cluster2 + name: acid-test-cluster # labels: # environment: demo # annotations: @@ -18,8 +18,8 @@ spec: - createdb enableMasterLoadBalancer: false enableReplicaLoadBalancer: false - #enableConnectionPooler: true # not needed when connectionPooler section is present (see below) - enableReplicaConnectionPooler: true # set to enable connectionPooler for replica endpoints + enableConnectionPooler: true # not needed when connectionPooler section is present (see below) + #enableReplicaConnectionPooler: true # set to enable connectionPooler for replica endpoints allowedSourceRanges: # load balancers' source ranges for both master and replica services - 127.0.0.1/32 databases: diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 51c5d3809..ee305c1b0 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -55,8 +55,9 @@ type Config struct { // K8S objects that are belongs to a connection pooler type ConnectionPoolerObjects struct { - Deployment *appsv1.Deployment - Service *v1.Service + Deployment *appsv1.Deployment + Service *v1.Service + ReplService *v1.Service // It could happen that a connection pooler was enabled, but the operator // was not able to properly process a corresponding event or was restarted. diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index c6743d553..d13d1889f 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -2088,10 +2088,6 @@ func (c *Cluster) getConnectionPoolerEnvVars(spec *acidv1.PostgresSpec) []v1.Env Name: "CONNECTION_POOLER_MASTER_PORT", Value: fmt.Sprint(pgPort), }, - { - Name: "CONNECTION_POOLER_REPLICA_PORT", - Value: fmt.Sprint(5433), - }, { Name: "CONNECTION_POOLER_MODE", Value: effectiveMode, @@ -2308,41 +2304,19 @@ func (c *Cluster) generateConnectionPoolerService(spec *acidv1.PostgresSpec) *v1 if spec.ConnectionPooler == nil { spec.ConnectionPooler = &acidv1.ConnectionPooler{} } - var serviceSpec = v1.ServiceSpec{} - if *spec.EnableReplicaConnectionPooler == false { - serviceSpec = v1.ServiceSpec{ - Ports: []v1.ServicePort{ - { - Name: c.connectionPoolerName(), - Port: pgPort, - TargetPort: intstr.IntOrString{StrVal: c.servicePort(Master)}, - }, + serviceSpec := v1.ServiceSpec{ + Ports: []v1.ServicePort{ + { + Name: c.connectionPoolerName(), + Port: pgPort, + TargetPort: intstr.IntOrString{StrVal: c.servicePort(Master)}, }, - Type: v1.ServiceTypeClusterIP, - Selector: map[string]string{ - "connection-pooler": c.connectionPoolerName(), - }, - } - } else { - serviceSpec = v1.ServiceSpec{ - Ports: []v1.ServicePort{ - { - Name: c.connectionPoolerName(), - Port: pgPort, - TargetPort: intstr.IntOrString{StrVal: c.servicePort(Master)}, - }, - { - Name: c.connectionPoolerName() + "-repl", - Port: 5433, - TargetPort: intstr.IntOrString{StrVal: c.servicePort(Replica)}, - }, - }, - Type: v1.ServiceTypeClusterIP, - Selector: map[string]string{ - "connection-pooler": c.connectionPoolerName(), - }, - } + }, + Type: v1.ServiceTypeClusterIP, + Selector: map[string]string{ + "connection-pooler-repl": c.connectionPoolerName(), + }, } service := &v1.Service{ @@ -2365,6 +2339,42 @@ func (c *Cluster) generateConnectionPoolerService(spec *acidv1.PostgresSpec) *v1 return service } +func (c *Cluster) generateReplicaConnectionPoolerService(spec *acidv1.PostgresSpec) *v1.Service { + + replicaserviceSpec := v1.ServiceSpec{ + Ports: []v1.ServicePort{ + { + Name: c.connectionPoolerName() + "-repl", + Port: pgPort, + TargetPort: intstr.IntOrString{StrVal: c.servicePort(Replica)}, + }, + }, + Type: v1.ServiceTypeClusterIP, + Selector: map[string]string{ + "connection-pooler-repl": c.connectionPoolerName() + "-repl", + }, + } + + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: c.connectionPoolerName() + "-repl", + Namespace: c.Namespace, + Labels: c.connectionPoolerLabelsSelector().MatchLabels, + Annotations: map[string]string{}, + // make StatefulSet object its owner to represent the dependency. + // By itself StatefulSet is being deleted with "Orphaned" + // propagation policy, which means that it's deletion will not + // clean up this service, but there is a hope that this object will + // be garbage collected if something went wrong and operator didn't + // deleted it. + OwnerReferences: c.ownerReferences(), + }, + Spec: replicaserviceSpec, + } + + return service +} + func ensurePath(file string, defaultDir string, defaultFile string) string { if file == "" { return path.Join(defaultDir, defaultFile) diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index a9d13c124..160e42548 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -152,11 +152,27 @@ func (c *Cluster) createConnectionPooler(lookup InstallFunction) (*ConnectionPoo if err != nil { return nil, err } + if c.Spec.EnableReplicaConnectionPooler != nil && *c.Spec.EnableReplicaConnectionPooler == true { + replServiceSpec := c.generateReplicaConnectionPoolerService(&c.Spec) + replService, err := c.KubeClient. + Services(serviceSpec.Namespace). + Create(context.TODO(), replServiceSpec, metav1.CreateOptions{}) - c.ConnectionPooler = &ConnectionPoolerObjects{ - Deployment: deployment, - Service: service, + if err != nil { + return nil, err + } + c.ConnectionPooler = &ConnectionPoolerObjects{ + Deployment: deployment, + Service: service, + ReplService: replService, + } + } else { + c.ConnectionPooler = &ConnectionPoolerObjects{ + Deployment: deployment, + Service: service, + } } + c.logger.Debugf("created new connection pooler %q, uid: %q", util.NameFromMeta(deployment.ObjectMeta), deployment.UID)