Add new pooler service for replica

This commit is contained in:
Rafia Sabih 2020-08-28 14:55:49 +02:00
parent 3a906aba93
commit 83ddd5c85b
4 changed files with 72 additions and 45 deletions

View File

@ -1,7 +1,7 @@
apiVersion: "acid.zalan.do/v1"
kind: postgresql
metadata:
name: acid-test-cluster2
name: acid-test-cluster
# labels:
# environment: demo
# annotations:
@ -18,8 +18,8 @@ spec:
- createdb
enableMasterLoadBalancer: false
enableReplicaLoadBalancer: false
#enableConnectionPooler: true # not needed when connectionPooler section is present (see below)
enableReplicaConnectionPooler: true # set to enable connectionPooler for replica endpoints
enableConnectionPooler: true # not needed when connectionPooler section is present (see below)
#enableReplicaConnectionPooler: true # set to enable connectionPooler for replica endpoints
allowedSourceRanges: # load balancers' source ranges for both master and replica services
- 127.0.0.1/32
databases:

View File

@ -55,8 +55,9 @@ type Config struct {
// K8S objects that are belongs to a connection pooler
type ConnectionPoolerObjects struct {
Deployment *appsv1.Deployment
Service *v1.Service
Deployment *appsv1.Deployment
Service *v1.Service
ReplService *v1.Service
// It could happen that a connection pooler was enabled, but the operator
// was not able to properly process a corresponding event or was restarted.

View File

@ -2088,10 +2088,6 @@ func (c *Cluster) getConnectionPoolerEnvVars(spec *acidv1.PostgresSpec) []v1.Env
Name: "CONNECTION_POOLER_MASTER_PORT",
Value: fmt.Sprint(pgPort),
},
{
Name: "CONNECTION_POOLER_REPLICA_PORT",
Value: fmt.Sprint(5433),
},
{
Name: "CONNECTION_POOLER_MODE",
Value: effectiveMode,
@ -2308,41 +2304,19 @@ func (c *Cluster) generateConnectionPoolerService(spec *acidv1.PostgresSpec) *v1
if spec.ConnectionPooler == nil {
spec.ConnectionPooler = &acidv1.ConnectionPooler{}
}
var serviceSpec = v1.ServiceSpec{}
if *spec.EnableReplicaConnectionPooler == false {
serviceSpec = v1.ServiceSpec{
Ports: []v1.ServicePort{
{
Name: c.connectionPoolerName(),
Port: pgPort,
TargetPort: intstr.IntOrString{StrVal: c.servicePort(Master)},
},
serviceSpec := v1.ServiceSpec{
Ports: []v1.ServicePort{
{
Name: c.connectionPoolerName(),
Port: pgPort,
TargetPort: intstr.IntOrString{StrVal: c.servicePort(Master)},
},
Type: v1.ServiceTypeClusterIP,
Selector: map[string]string{
"connection-pooler": c.connectionPoolerName(),
},
}
} else {
serviceSpec = v1.ServiceSpec{
Ports: []v1.ServicePort{
{
Name: c.connectionPoolerName(),
Port: pgPort,
TargetPort: intstr.IntOrString{StrVal: c.servicePort(Master)},
},
{
Name: c.connectionPoolerName() + "-repl",
Port: 5433,
TargetPort: intstr.IntOrString{StrVal: c.servicePort(Replica)},
},
},
Type: v1.ServiceTypeClusterIP,
Selector: map[string]string{
"connection-pooler": c.connectionPoolerName(),
},
}
},
Type: v1.ServiceTypeClusterIP,
Selector: map[string]string{
"connection-pooler-repl": c.connectionPoolerName(),
},
}
service := &v1.Service{
@ -2365,6 +2339,42 @@ func (c *Cluster) generateConnectionPoolerService(spec *acidv1.PostgresSpec) *v1
return service
}
func (c *Cluster) generateReplicaConnectionPoolerService(spec *acidv1.PostgresSpec) *v1.Service {
replicaserviceSpec := v1.ServiceSpec{
Ports: []v1.ServicePort{
{
Name: c.connectionPoolerName() + "-repl",
Port: pgPort,
TargetPort: intstr.IntOrString{StrVal: c.servicePort(Replica)},
},
},
Type: v1.ServiceTypeClusterIP,
Selector: map[string]string{
"connection-pooler-repl": c.connectionPoolerName() + "-repl",
},
}
service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: c.connectionPoolerName() + "-repl",
Namespace: c.Namespace,
Labels: c.connectionPoolerLabelsSelector().MatchLabels,
Annotations: map[string]string{},
// make StatefulSet object its owner to represent the dependency.
// By itself StatefulSet is being deleted with "Orphaned"
// propagation policy, which means that it's deletion will not
// clean up this service, but there is a hope that this object will
// be garbage collected if something went wrong and operator didn't
// deleted it.
OwnerReferences: c.ownerReferences(),
},
Spec: replicaserviceSpec,
}
return service
}
func ensurePath(file string, defaultDir string, defaultFile string) string {
if file == "" {
return path.Join(defaultDir, defaultFile)

View File

@ -152,11 +152,27 @@ func (c *Cluster) createConnectionPooler(lookup InstallFunction) (*ConnectionPoo
if err != nil {
return nil, err
}
if c.Spec.EnableReplicaConnectionPooler != nil && *c.Spec.EnableReplicaConnectionPooler == true {
replServiceSpec := c.generateReplicaConnectionPoolerService(&c.Spec)
replService, err := c.KubeClient.
Services(serviceSpec.Namespace).
Create(context.TODO(), replServiceSpec, metav1.CreateOptions{})
c.ConnectionPooler = &ConnectionPoolerObjects{
Deployment: deployment,
Service: service,
if err != nil {
return nil, err
}
c.ConnectionPooler = &ConnectionPoolerObjects{
Deployment: deployment,
Service: service,
ReplService: replService,
}
} else {
c.ConnectionPooler = &ConnectionPoolerObjects{
Deployment: deployment,
Service: service,
}
}
c.logger.Debugf("created new connection pooler %q, uid: %q",
util.NameFromMeta(deployment.ObjectMeta), deployment.UID)