Enable connection pooler for replica

- Refactor code for connection pooler deployment and services
- Refactor sync code for connection pooler
- Rename EnableConnectionPooler to EnableMasterConnectionPooler
- Update yamls and tests
This commit is contained in:
Rafia Sabih 2020-09-02 13:46:36 +02:00
parent 83ddd5c85b
commit fb49376085
19 changed files with 296 additions and 161 deletions

View File

@ -185,8 +185,10 @@ spec:
# Note: usernames specified here as database owners must be declared in the users key of the spec key. # Note: usernames specified here as database owners must be declared in the users key of the spec key.
dockerImage: dockerImage:
type: string type: string
enableConnectionPooler: enableMasterConnectionPooler:
type: boolean type: boolean
enableReplicaConnectionPooler:
type: boolean
enableLogicalBackup: enableLogicalBackup:
type: boolean type: boolean
enableMasterLoadBalancer: enableMasterLoadBalancer:

View File

@ -140,7 +140,7 @@ These parameters are grouped directly under the `spec` key in the manifest.
is `false`, then no volume will be mounted no matter how operator was is `false`, then no volume will be mounted no matter how operator was
configured (so you can override the operator configuration). Optional. configured (so you can override the operator configuration). Optional.
* **enableConnectionPooler** * **enableMasterConnectionPooler**
Tells the operator to create a connection pooler with a database. If this Tells the operator to create a connection pooler with a database. If this
field is true, a connection pooler deployment will be created even if field is true, a connection pooler deployment will be created even if
`connectionPooler` section is empty. Optional, not set by default. `connectionPooler` section is empty. Optional, not set by default.
@ -387,7 +387,7 @@ CPU and memory limits for the sidecar container.
Parameters are grouped under the `connectionPooler` top-level key and specify Parameters are grouped under the `connectionPooler` top-level key and specify
configuration for connection pooler. If this section is not empty, a connection configuration for connection pooler. If this section is not empty, a connection
pooler will be created for a database even if `enableConnectionPooler` is not pooler will be created for a database even if `enableMasterConnectionPooler` is not
present. present.
* **numberOfInstances** * **numberOfInstances**

View File

@ -736,7 +736,7 @@ manifest:
```yaml ```yaml
spec: spec:
enableConnectionPooler: true enableMasterConnectionPooler: true
``` ```
This will tell the operator to create a connection pooler with default This will tell the operator to create a connection pooler with default
@ -772,7 +772,7 @@ spec:
memory: 100Mi memory: 100Mi
``` ```
The `enableConnectionPooler` flag is not required when the `connectionPooler` The `enableMasterConnectionPooler` flag is not required when the `connectionPooler`
section is present in the manifest. But, it can be used to disable/remove the section is present in the manifest. But, it can be used to disable/remove the
pooler while keeping its configuration. pooler while keeping its configuration.

View File

@ -99,7 +99,7 @@ class EndToEndTestCase(unittest.TestCase):
'postgresqls', 'acid-minimal-cluster', 'postgresqls', 'acid-minimal-cluster',
{ {
'spec': { 'spec': {
'enableConnectionPooler': True, 'enableMasterConnectionPooler': True,
} }
}) })
k8s.wait_for_pod_start(pod_selector) k8s.wait_for_pod_start(pod_selector)
@ -141,7 +141,7 @@ class EndToEndTestCase(unittest.TestCase):
'postgresqls', 'acid-minimal-cluster', 'postgresqls', 'acid-minimal-cluster',
{ {
'spec': { 'spec': {
'enableConnectionPooler': False, 'enableMasterConnectionPooler': False,
} }
}) })
k8s.wait_for_pods_to_stop(pod_selector) k8s.wait_for_pods_to_stop(pod_selector)

View File

@ -18,8 +18,8 @@ spec:
- createdb - createdb
enableMasterLoadBalancer: false enableMasterLoadBalancer: false
enableReplicaLoadBalancer: false enableReplicaLoadBalancer: false
enableConnectionPooler: true # not needed when connectionPooler section is present (see below) enableMasterConnectionPooler: true # not needed when connectionPooler section is present (see below)
#enableReplicaConnectionPooler: true # set to enable connectionPooler for replica endpoints enableReplicaConnectionPooler: true # set to enable connectionPooler for replica endpoints
allowedSourceRanges: # load balancers' source ranges for both master and replica services allowedSourceRanges: # load balancers' source ranges for both master and replica services
- 127.0.0.1/32 - 127.0.0.1/32
databases: databases:

View File

@ -181,8 +181,10 @@ spec:
# Note: usernames specified here as database owners must be declared in the users key of the spec key. # Note: usernames specified here as database owners must be declared in the users key of the spec key.
dockerImage: dockerImage:
type: string type: string
enableConnectionPooler: enableMasterConnectionPooler:
type: boolean type: boolean
enableReplicaConnectionPooler:
type: boolean
enableLogicalBackup: enableLogicalBackup:
type: boolean type: boolean
enableMasterLoadBalancer: enableMasterLoadBalancer:

View File

@ -259,7 +259,7 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{
"dockerImage": { "dockerImage": {
Type: "string", Type: "string",
}, },
"enableConnectionPooler": { "enableMasterConnectionPooler": {
Type: "boolean", Type: "boolean",
}, },
"enableReplicaConnectionPooler": { "enableReplicaConnectionPooler": {

View File

@ -29,7 +29,7 @@ type PostgresSpec struct {
Patroni `json:"patroni,omitempty"` Patroni `json:"patroni,omitempty"`
Resources `json:"resources,omitempty"` Resources `json:"resources,omitempty"`
EnableConnectionPooler *bool `json:"enableConnectionPooler,omitempty"` EnableMasterConnectionPooler *bool `json:"enableMasterConnectionPooler,omitempty"`
EnableReplicaConnectionPooler *bool `json:"enableReplicaConnectionPooler,omitempty"` EnableReplicaConnectionPooler *bool `json:"enableReplicaConnectionPooler,omitempty"`
ConnectionPooler *ConnectionPooler `json:"connectionPooler,omitempty"` ConnectionPooler *ConnectionPooler `json:"connectionPooler,omitempty"`

View File

@ -55,9 +55,10 @@ type Config struct {
// K8S objects that are belongs to a connection pooler // K8S objects that are belongs to a connection pooler
type ConnectionPoolerObjects struct { type ConnectionPoolerObjects struct {
Deployment *appsv1.Deployment Deployment *appsv1.Deployment
Service *v1.Service ReplDeployment *appsv1.Deployment
ReplService *v1.Service Service *v1.Service
ReplService *v1.Service
// It could happen that a connection pooler was enabled, but the operator // It could happen that a connection pooler was enabled, but the operator
// was not able to properly process a corresponding event or was restarted. // was not able to properly process a corresponding event or was restarted.

View File

@ -719,7 +719,7 @@ func TestInitSystemUsers(t *testing.T) {
} }
// cluster with connection pooler // cluster with connection pooler
cl.Spec.EnableConnectionPooler = boolToPointer(true) cl.Spec.EnableMasterConnectionPooler = boolToPointer(true)
cl.initSystemUsers() cl.initSystemUsers()
if _, exist := cl.systemUsers[constants.ConnectionPoolerUserKeyName]; !exist { if _, exist := cl.systemUsers[constants.ConnectionPoolerUserKeyName]; !exist {
t.Errorf("%s, connection pooler user is not present", testName) t.Errorf("%s, connection pooler user is not present", testName)

View File

@ -462,7 +462,7 @@ func (c *Cluster) execCreateOrAlterExtension(extName, schemaName, statement, doi
} }
// Creates a connection pool credentials lookup function in every database to // Creates a connection pool credentials lookup function in every database to
// perform remote authentification. // perform remote authentication.
func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error { func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error {
var stmtBytes bytes.Buffer var stmtBytes bytes.Buffer
c.logger.Info("Installing lookup function") c.logger.Info("Installing lookup function")

View File

@ -75,8 +75,12 @@ func (c *Cluster) statefulSetName() string {
return c.Name return c.Name
} }
func (c *Cluster) connectionPoolerName() string { func (c *Cluster) connectionPoolerName(role PostgresRole) string {
return c.Name + "-pooler" name := c.Name + "-pooler"
if role == Replica {
name = name + "-repl"
}
return name
} }
func (c *Cluster) endpointName(role PostgresRole) string { func (c *Cluster) endpointName(role PostgresRole) string {
@ -2085,7 +2089,7 @@ func (c *Cluster) getConnectionPoolerEnvVars(spec *acidv1.PostgresSpec) []v1.Env
return []v1.EnvVar{ return []v1.EnvVar{
{ {
Name: "CONNECTION_POOLER_MASTER_PORT", Name: "CONNECTION_POOLER_PORT",
Value: fmt.Sprint(pgPort), Value: fmt.Sprint(pgPort),
}, },
{ {
@ -2115,7 +2119,7 @@ func (c *Cluster) getConnectionPoolerEnvVars(spec *acidv1.PostgresSpec) []v1.Env
} }
} }
func (c *Cluster) generateConnectionPoolerPodTemplate(spec *acidv1.PostgresSpec) ( func (c *Cluster) generateConnectionPoolerPodTemplate(spec *acidv1.PostgresSpec, role PostgresRole) (
*v1.PodTemplateSpec, error) { *v1.PodTemplateSpec, error) {
gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds()) gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds())
@ -2151,11 +2155,11 @@ func (c *Cluster) generateConnectionPoolerPodTemplate(spec *acidv1.PostgresSpec)
envVars := []v1.EnvVar{ envVars := []v1.EnvVar{
{ {
Name: "PGHOST", Name: "PGHOST",
Value: c.serviceAddress(Master), Value: c.serviceAddress(role),
}, },
{ {
Name: "PGPORT", Name: "PGPORT",
Value: c.servicePort(Master), Value: c.servicePort(role),
}, },
{ {
Name: "PGUSER", Name: "PGUSER",
@ -2237,7 +2241,7 @@ func (c *Cluster) ownerReferences() []metav1.OwnerReference {
} }
} }
func (c *Cluster) generateConnectionPoolerDeployment(spec *acidv1.PostgresSpec) ( func (c *Cluster) generateConnectionPoolerDeployment(spec *acidv1.PostgresSpec, role PostgresRole) (
*appsv1.Deployment, error) { *appsv1.Deployment, error) {
// there are two ways to enable connection pooler, either to specify a // there are two ways to enable connection pooler, either to specify a
@ -2250,7 +2254,7 @@ func (c *Cluster) generateConnectionPoolerDeployment(spec *acidv1.PostgresSpec)
spec.ConnectionPooler = &acidv1.ConnectionPooler{} spec.ConnectionPooler = &acidv1.ConnectionPooler{}
} }
podTemplate, err := c.generateConnectionPoolerPodTemplate(spec) podTemplate, err := c.generateConnectionPoolerPodTemplate(spec, role)
numberOfInstances := spec.ConnectionPooler.NumberOfInstances numberOfInstances := spec.ConnectionPooler.NumberOfInstances
if numberOfInstances == nil { if numberOfInstances == nil {
numberOfInstances = util.CoalesceInt32( numberOfInstances = util.CoalesceInt32(
@ -2269,9 +2273,12 @@ func (c *Cluster) generateConnectionPoolerDeployment(spec *acidv1.PostgresSpec)
return nil, err return nil, err
} }
var name string
name = c.connectionPoolerName(role)
deployment := &appsv1.Deployment{ deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: c.connectionPoolerName(), Name: name,
Namespace: c.Namespace, Namespace: c.Namespace,
Labels: c.connectionPoolerLabelsSelector().MatchLabels, Labels: c.connectionPoolerLabelsSelector().MatchLabels,
Annotations: map[string]string{}, Annotations: map[string]string{},
@ -2293,7 +2300,53 @@ func (c *Cluster) generateConnectionPoolerDeployment(spec *acidv1.PostgresSpec)
return deployment, nil return deployment, nil
} }
func (c *Cluster) generateConnectionPoolerService(spec *acidv1.PostgresSpec) *v1.Service { /* func (c *Cluster) generateReplicaConnectionPoolerDeployment(spec *acidv1.PostgresSpec) (
*appsv1.Deployment, error) {
podTemplate, err := c.generateConnectionPoolerPodTemplate(spec, Replica)
numberOfInstances := spec.ConnectionPooler.NumberOfInstances
if numberOfInstances == nil {
numberOfInstances = util.CoalesceInt32(
c.OpConfig.ConnectionPooler.NumberOfInstances,
k8sutil.Int32ToPointer(1))
}
if *numberOfInstances < constants.ConnectionPoolerMinInstances {
msg := "Adjusted number of connection pooler instances from %d to %d"
c.logger.Warningf(msg, numberOfInstances, constants.ConnectionPoolerMinInstances)
*numberOfInstances = constants.ConnectionPoolerMinInstances
}
if err != nil {
return nil, err
}
deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: c.connectionPoolerName() + "-repl",
Namespace: c.Namespace,
Labels: c.connectionPoolerLabelsSelector().MatchLabels,
Annotations: map[string]string{},
// make StatefulSet object its owner to represent the dependency.
// By itself StatefulSet is being deleted with "Orphaned"
// propagation policy, which means that it's deletion will not
// clean up this deployment, but there is a hope that this object
// will be garbage collected if something went wrong and operator
// didn't deleted it.
OwnerReferences: c.ownerReferences(),
},
Spec: appsv1.DeploymentSpec{
Replicas: numberOfInstances,
Selector: c.connectionPoolerLabelsSelector(),
Template: *podTemplate,
},
}
return deployment, nil
} */
func (c *Cluster) generateConnectionPoolerService(spec *acidv1.PostgresSpec, role PostgresRole) *v1.Service {
// there are two ways to enable connection pooler, either to specify a // there are two ways to enable connection pooler, either to specify a
// connectionPooler section or enableConnectionPooler. In the second case // connectionPooler section or enableConnectionPooler. In the second case
@ -2304,24 +2357,26 @@ func (c *Cluster) generateConnectionPoolerService(spec *acidv1.PostgresSpec) *v1
if spec.ConnectionPooler == nil { if spec.ConnectionPooler == nil {
spec.ConnectionPooler = &acidv1.ConnectionPooler{} spec.ConnectionPooler = &acidv1.ConnectionPooler{}
} }
name := c.connectionPoolerName(role)
serviceSpec := v1.ServiceSpec{ serviceSpec := v1.ServiceSpec{
Ports: []v1.ServicePort{ Ports: []v1.ServicePort{
{ {
Name: c.connectionPoolerName(), Name: name,
Port: pgPort, Port: pgPort,
TargetPort: intstr.IntOrString{StrVal: c.servicePort(Master)}, TargetPort: intstr.IntOrString{StrVal: c.servicePort(role)},
}, },
}, },
Type: v1.ServiceTypeClusterIP, Type: v1.ServiceTypeClusterIP,
Selector: map[string]string{
"connection-pooler-repl": c.connectionPoolerName(),
},
} }
if role == Replica {
serviceSpec.Selector = c.roleLabelsSet(false, Replica)
} else {
serviceSpec.Selector = map[string]string{"connection-pooler": name}
}
service := &v1.Service{ service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: c.connectionPoolerName(), Name: name,
Namespace: c.Namespace, Namespace: c.Namespace,
Labels: c.connectionPoolerLabelsSelector().MatchLabels, Labels: c.connectionPoolerLabelsSelector().MatchLabels,
Annotations: map[string]string{}, Annotations: map[string]string{},
@ -2339,7 +2394,7 @@ func (c *Cluster) generateConnectionPoolerService(spec *acidv1.PostgresSpec) *v1
return service return service
} }
func (c *Cluster) generateReplicaConnectionPoolerService(spec *acidv1.PostgresSpec) *v1.Service { /* func (c *Cluster) generateReplicaConnectionPoolerService(spec *acidv1.PostgresSpec) *v1.Service {
replicaserviceSpec := v1.ServiceSpec{ replicaserviceSpec := v1.ServiceSpec{
Ports: []v1.ServicePort{ Ports: []v1.ServicePort{
@ -2349,10 +2404,8 @@ func (c *Cluster) generateReplicaConnectionPoolerService(spec *acidv1.PostgresSp
TargetPort: intstr.IntOrString{StrVal: c.servicePort(Replica)}, TargetPort: intstr.IntOrString{StrVal: c.servicePort(Replica)},
}, },
}, },
Type: v1.ServiceTypeClusterIP, Type: v1.ServiceTypeClusterIP,
Selector: map[string]string{ Selector: c.roleLabelsSet(false, Replica),
"connection-pooler-repl": c.connectionPoolerName() + "-repl",
},
} }
service := &v1.Service{ service := &v1.Service{
@ -2373,7 +2426,7 @@ func (c *Cluster) generateReplicaConnectionPoolerService(spec *acidv1.PostgresSp
} }
return service return service
} } */
func ensurePath(file string, defaultDir string, defaultFile string) string { func ensurePath(file string, defaultDir string, defaultFile string) string {
if file == "" { if file == "" {

View File

@ -1090,7 +1090,7 @@ func TestConnectionPoolerPodSpec(t *testing.T) {
}, },
} }
for _, tt := range tests { for _, tt := range tests {
podSpec, err := tt.cluster.generateConnectionPoolerPodTemplate(tt.spec) podSpec, err := tt.cluster.generateConnectionPoolerPodTemplate(tt.spec, Master)
if err != tt.expected && err.Error() != tt.expected.Error() { if err != tt.expected && err.Error() != tt.expected.Error() {
t.Errorf("%s [%s]: Could not generate pod template,\n %+v, expected\n %+v", t.Errorf("%s [%s]: Could not generate pod template,\n %+v, expected\n %+v",
@ -1192,7 +1192,7 @@ func TestConnectionPoolerDeploymentSpec(t *testing.T) {
}, },
} }
for _, tt := range tests { for _, tt := range tests {
deployment, err := tt.cluster.generateConnectionPoolerDeployment(tt.spec) deployment, err := tt.cluster.generateConnectionPoolerDeployment(tt.spec, Master)
if err != tt.expected && err.Error() != tt.expected.Error() { if err != tt.expected && err.Error() != tt.expected.Error() {
t.Errorf("%s [%s]: Could not generate deployment spec,\n %+v, expected\n %+v", t.Errorf("%s [%s]: Could not generate deployment spec,\n %+v, expected\n %+v",
@ -1221,9 +1221,9 @@ func testServiceOwnwerReference(cluster *Cluster, service *v1.Service) error {
func testServiceSelector(cluster *Cluster, service *v1.Service) error { func testServiceSelector(cluster *Cluster, service *v1.Service) error {
selector := service.Spec.Selector selector := service.Spec.Selector
if selector["connection-pooler"] != cluster.connectionPoolerName() { if selector["connection-pooler"] != cluster.connectionPoolerName(Master) {
return fmt.Errorf("Selector is incorrect, got %s, expected %s", return fmt.Errorf("Selector is incorrect, got %s, expected %s",
selector["connection-pooler"], cluster.connectionPoolerName()) selector["connection-pooler"], cluster.connectionPoolerName(Master))
} }
return nil return nil
@ -1289,7 +1289,7 @@ func TestConnectionPoolerServiceSpec(t *testing.T) {
}, },
} }
for _, tt := range tests { for _, tt := range tests {
service := tt.cluster.generateConnectionPoolerService(tt.spec) service := tt.cluster.generateConnectionPoolerService(tt.spec, Master)
if err := tt.check(cluster, service); err != nil { if err := tt.check(cluster, service); err != nil {
t.Errorf("%s [%s]: Service spec is incorrect, %+v", t.Errorf("%s [%s]: Service spec is incorrect, %+v",

View File

@ -126,56 +126,74 @@ func (c *Cluster) createConnectionPooler(lookup InstallFunction) (*ConnectionPoo
msg = "could not prepare database for connection pooler: %v" msg = "could not prepare database for connection pooler: %v"
return nil, fmt.Errorf(msg, err) return nil, fmt.Errorf(msg, err)
} }
if c.Spec.EnableMasterConnectionPooler != nil || c.ConnectionPooler != nil {
deploymentSpec, err := c.generateConnectionPoolerDeployment(&c.Spec, Master)
if err != nil {
msg = "could not generate deployment for connection pooler: %v"
return nil, fmt.Errorf(msg, err)
}
deploymentSpec, err := c.generateConnectionPoolerDeployment(&c.Spec) // client-go does retry 10 times (with NoBackoff by default) when the API
if err != nil { // believe a request can be retried and returns Retry-After header. This
msg = "could not generate deployment for connection pooler: %v" // should be good enough to not think about it here.
return nil, fmt.Errorf(msg, err) deployment, err := c.KubeClient.
} Deployments(deploymentSpec.Namespace).
Create(context.TODO(), deploymentSpec, metav1.CreateOptions{})
// client-go does retry 10 times (with NoBackoff by default) when the API if err != nil {
// believe a request can be retried and returns Retry-After header. This return nil, err
// should be good enough to not think about it here. }
deployment, err := c.KubeClient.
Deployments(deploymentSpec.Namespace).
Create(context.TODO(), deploymentSpec, metav1.CreateOptions{})
if err != nil { serviceSpec := c.generateConnectionPoolerService(&c.Spec, Master)
return nil, err service, err := c.KubeClient.
}
serviceSpec := c.generateConnectionPoolerService(&c.Spec)
service, err := c.KubeClient.
Services(serviceSpec.Namespace).
Create(context.TODO(), serviceSpec, metav1.CreateOptions{})
if err != nil {
return nil, err
}
if c.Spec.EnableReplicaConnectionPooler != nil && *c.Spec.EnableReplicaConnectionPooler == true {
replServiceSpec := c.generateReplicaConnectionPoolerService(&c.Spec)
replService, err := c.KubeClient.
Services(serviceSpec.Namespace). Services(serviceSpec.Namespace).
Create(context.TODO(), serviceSpec, metav1.CreateOptions{})
if err != nil {
return nil, err
}
c.ConnectionPooler = &ConnectionPoolerObjects{
Deployment: deployment,
Service: service,
}
c.logger.Debugf("created new connection pooler %q, uid: %q",
util.NameFromMeta(deployment.ObjectMeta), deployment.UID)
}
if c.Spec.EnableReplicaConnectionPooler != nil && *c.Spec.EnableReplicaConnectionPooler == true {
repldeploymentSpec, err := c.generateConnectionPoolerDeployment(&c.Spec, Replica)
if err != nil {
msg = "could not generate deployment for connection pooler: %v"
return nil, fmt.Errorf(msg, err)
}
// client-go does retry 10 times (with NoBackoff by default) when the API
// believe a request can be retried and returns Retry-After header. This
// should be good enough to not think about it here.
repldeployment, err := c.KubeClient.
Deployments(repldeploymentSpec.Namespace).
Create(context.TODO(), repldeploymentSpec, metav1.CreateOptions{})
if err != nil {
return nil, err
}
replServiceSpec := c.generateConnectionPoolerService(&c.Spec, Replica)
replService, err := c.KubeClient.
Services(replServiceSpec.Namespace).
Create(context.TODO(), replServiceSpec, metav1.CreateOptions{}) Create(context.TODO(), replServiceSpec, metav1.CreateOptions{})
if err != nil { if err != nil {
return nil, err return nil, err
} }
c.ConnectionPooler = &ConnectionPoolerObjects{ c.ConnectionPooler = &ConnectionPoolerObjects{
Deployment: deployment, ReplDeployment: repldeployment,
Service: service, ReplService: replService,
ReplService: replService,
}
} else {
c.ConnectionPooler = &ConnectionPoolerObjects{
Deployment: deployment,
Service: service,
} }
c.logger.Debugf("created new connection pooler for replica %q, uid: %q",
util.NameFromMeta(repldeployment.ObjectMeta), repldeployment.UID)
} }
c.logger.Debugf("created new connection pooler %q, uid: %q",
util.NameFromMeta(deployment.ObjectMeta), deployment.UID)
return c.ConnectionPooler, nil return c.ConnectionPooler, nil
} }
@ -192,7 +210,7 @@ func (c *Cluster) deleteConnectionPooler() (err error) {
// Clean up the deployment object. If deployment resource we've remembered // Clean up the deployment object. If deployment resource we've remembered
// is somehow empty, try to delete based on what would we generate // is somehow empty, try to delete based on what would we generate
deploymentName := c.connectionPoolerName() deploymentName := c.connectionPoolerName(Master)
deployment := c.ConnectionPooler.Deployment deployment := c.ConnectionPooler.Deployment
if deployment != nil { if deployment != nil {
@ -217,7 +235,7 @@ func (c *Cluster) deleteConnectionPooler() (err error) {
// Repeat the same for the service object // Repeat the same for the service object
service := c.ConnectionPooler.Service service := c.ConnectionPooler.Service
serviceName := c.connectionPoolerName() serviceName := c.connectionPoolerName(Master)
if service != nil { if service != nil {
serviceName = service.Name serviceName = service.Name

View File

@ -97,7 +97,7 @@ func TestNeedConnectionPooler(t *testing.T) {
} }
cluster.Spec = acidv1.PostgresSpec{ cluster.Spec = acidv1.PostgresSpec{
EnableConnectionPooler: boolToPointer(true), EnableMasterConnectionPooler: boolToPointer(true),
} }
if !cluster.needConnectionPooler() { if !cluster.needConnectionPooler() {
@ -106,8 +106,8 @@ func TestNeedConnectionPooler(t *testing.T) {
} }
cluster.Spec = acidv1.PostgresSpec{ cluster.Spec = acidv1.PostgresSpec{
EnableConnectionPooler: boolToPointer(false), EnableMasterConnectionPooler: boolToPointer(false),
ConnectionPooler: &acidv1.ConnectionPooler{}, ConnectionPooler: &acidv1.ConnectionPooler{},
} }
if cluster.needConnectionPooler() { if cluster.needConnectionPooler() {
@ -116,8 +116,8 @@ func TestNeedConnectionPooler(t *testing.T) {
} }
cluster.Spec = acidv1.PostgresSpec{ cluster.Spec = acidv1.PostgresSpec{
EnableConnectionPooler: boolToPointer(true), EnableMasterConnectionPooler: boolToPointer(true),
ConnectionPooler: &acidv1.ConnectionPooler{}, ConnectionPooler: &acidv1.ConnectionPooler{},
} }
if !cluster.needConnectionPooler() { if !cluster.needConnectionPooler() {

View File

@ -922,34 +922,18 @@ func (c *Cluster) syncConnectionPooler(oldSpec,
func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql) ( func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql) (
SyncReason, error) { SyncReason, error) {
deployment, err := c.KubeClient. masterdeployment, err := c.checkAndCreateConnectionPoolerDeployment(Master, newSpec)
Deployments(c.Namespace). if err != nil {
Get(context.TODO(), c.connectionPoolerName(), metav1.GetOptions{}) msg := "could not get connection pooler deployment to sync: %v"
return NoSync, fmt.Errorf(msg, err)
if err != nil && k8sutil.ResourceNotFound(err) { }
msg := "Deployment %s for connection pooler synchronization is not found, create it" replicadeployment, err := c.checkAndCreateConnectionPoolerDeployment(Replica, newSpec)
c.logger.Warningf(msg, c.connectionPoolerName()) if err != nil {
deploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec)
if err != nil {
msg = "could not generate deployment for connection pooler: %v"
return NoSync, fmt.Errorf(msg, err)
}
deployment, err := c.KubeClient.
Deployments(deploymentSpec.Namespace).
Create(context.TODO(), deploymentSpec, metav1.CreateOptions{})
if err != nil {
return NoSync, err
}
c.ConnectionPooler.Deployment = deployment
} else if err != nil {
msg := "could not get connection pooler deployment to sync: %v" msg := "could not get connection pooler deployment to sync: %v"
return NoSync, fmt.Errorf(msg, err) return NoSync, fmt.Errorf(msg, err)
} else { } else {
c.ConnectionPooler.Deployment = deployment c.ConnectionPooler.Deployment = masterdeployment
c.ConnectionPooler.ReplDeployment = replicadeployment
// actual synchronization // actual synchronization
oldConnectionPooler := oldSpec.Spec.ConnectionPooler oldConnectionPooler := oldSpec.Spec.ConnectionPooler
@ -968,32 +952,28 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
newConnectionPooler = &acidv1.ConnectionPooler{} newConnectionPooler = &acidv1.ConnectionPooler{}
} }
c.logger.Infof("Old: %+v, New %+v", oldConnectionPooler, newConnectionPooler) c.logger.Infof("Old: %+v, New: %+v", oldConnectionPooler, newConnectionPooler)
specSync, specReason := c.needSyncConnectionPoolerSpecs(oldConnectionPooler, newConnectionPooler) specSync, specReason := c.needSyncConnectionPoolerSpecs(oldConnectionPooler, newConnectionPooler)
defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(newConnectionPooler, deployment) defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(newConnectionPooler, masterdeployment)
reason := append(specReason, defaultsReason...) reason := append(specReason, defaultsReason...)
if specSync || defaultsSync { if specSync || defaultsSync {
c.logger.Infof("Update connection pooler deployment %s, reason: %+v", newmasterdeployment, err := c.UpdateConnectionPoolerDeploymentSub(Master, reason[:], newSpec)
c.connectionPoolerName(), reason)
newDeploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec)
if err != nil {
msg := "could not generate deployment for connection pooler: %v"
return reason, fmt.Errorf(msg, err)
}
oldDeploymentSpec := c.ConnectionPooler.Deployment
deployment, err := c.updateConnectionPoolerDeployment(
oldDeploymentSpec,
newDeploymentSpec)
if err != nil { if err != nil {
return reason, err return reason, err
} }
c.ConnectionPooler.Deployment = newmasterdeployment
return reason, nil
}
c.ConnectionPooler.Deployment = deployment defaultsSync, defaultsReason = c.needSyncConnectionPoolerDefaults(newConnectionPooler, replicadeployment)
reason = append(specReason, defaultsReason...)
if specSync || defaultsSync {
newreplicadeployment, err := c.UpdateConnectionPoolerDeploymentSub(Replica, reason[:], newSpec)
if err != nil {
return reason, err
}
c.ConnectionPooler.Deployment = newreplicadeployment
return reason, nil return reason, nil
} }
} }
@ -1002,32 +982,95 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
if newAnnotations != nil { if newAnnotations != nil {
c.updateConnectionPoolerAnnotations(newAnnotations) c.updateConnectionPoolerAnnotations(newAnnotations)
} }
masterservice, err := c.checkAndCreateConnectionPoolerService(Master, newSpec)
if err != nil {
msg := "could not get connection pooler service to sync: %v"
return NoSync, fmt.Errorf(msg, err)
}
replicaservice, err := c.checkAndCreateConnectionPoolerService(Replica, newSpec)
if err != nil {
msg := "could not get connection pooler service to sync: %v"
return NoSync, fmt.Errorf(msg, err)
} else {
// Service updates are not supported and probably not that useful anyway
c.ConnectionPooler.Service = masterservice
c.ConnectionPooler.ReplService = replicaservice
}
return NoSync, nil
}
func (c *Cluster) UpdateConnectionPoolerDeploymentSub(role PostgresRole, reason []string, newSpec *acidv1.Postgresql) (*appsv1.Deployment, error) {
c.logger.Infof("Update connection pooler deployment %s, reason: %+v",
c.connectionPoolerName(role), reason)
newDeploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec, role)
if err != nil {
msg := "could not generate deployment for connection pooler: %v"
return nil, fmt.Errorf(msg, err)
}
oldDeploymentSpec := c.ConnectionPooler.Deployment
deployment, err := c.updateConnectionPoolerDeployment(
oldDeploymentSpec,
newDeploymentSpec)
if err != nil {
return nil, err
}
return deployment, nil
}
func (c *Cluster) checkAndCreateConnectionPoolerDeployment(role PostgresRole, newSpec *acidv1.Postgresql) (*appsv1.Deployment, error) {
deployment, err := c.KubeClient.
Deployments(c.Namespace).
Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{})
if err != nil && k8sutil.ResourceNotFound(err) {
msg := "Deployment %s for connection pooler synchronization is not found, create it"
c.logger.Warningf(msg, c.connectionPoolerName(role))
deploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec, role)
if err != nil {
msg = "could not generate deployment for connection pooler: %v"
return nil, fmt.Errorf(msg, err)
}
deployment, err := c.KubeClient.
Deployments(deploymentSpec.Namespace).
Create(context.TODO(), deploymentSpec, metav1.CreateOptions{})
if err != nil {
return nil, err
}
return deployment, nil
}
return deployment, nil
}
func (c *Cluster) checkAndCreateConnectionPoolerService(role PostgresRole, newSpec *acidv1.Postgresql) (*v1.Service, error) {
service, err := c.KubeClient. service, err := c.KubeClient.
Services(c.Namespace). Services(c.Namespace).
Get(context.TODO(), c.connectionPoolerName(), metav1.GetOptions{}) Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{})
if err != nil && k8sutil.ResourceNotFound(err) { if err != nil && k8sutil.ResourceNotFound(err) {
msg := "Service %s for connection pooler synchronization is not found, create it" msg := "Service %s for connection pooler synchronization is not found, create it"
c.logger.Warningf(msg, c.connectionPoolerName()) c.logger.Warningf(msg, c.connectionPoolerName(role))
serviceSpec := c.generateConnectionPoolerService(&newSpec.Spec) serviceSpec := c.generateConnectionPoolerService(&newSpec.Spec, role)
service, err := c.KubeClient. service, err := c.KubeClient.
Services(serviceSpec.Namespace). Services(serviceSpec.Namespace).
Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) Create(context.TODO(), serviceSpec, metav1.CreateOptions{})
if err != nil { if err != nil {
return NoSync, err return nil, err
} }
c.ConnectionPooler.Service = service return service, nil
} else if err != nil {
msg := "could not get connection pooler service to sync: %v"
return NoSync, fmt.Errorf(msg, err)
} else {
// Service updates are not supported and probably not that useful anyway
c.ConnectionPooler.Service = service
} }
return service, nil
return NoSync, nil
} }

View File

@ -139,7 +139,7 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
}, },
newSpec: &acidv1.Postgresql{ newSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{ Spec: acidv1.PostgresSpec{
EnableConnectionPooler: boolToPointer(true), EnableMasterConnectionPooler: boolToPointer(true),
}, },
}, },
cluster: clusterMissingObjects, cluster: clusterMissingObjects,
@ -232,14 +232,14 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
subTest: "there is no sync from nil to an empty spec", subTest: "there is no sync from nil to an empty spec",
oldSpec: &acidv1.Postgresql{ oldSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{ Spec: acidv1.PostgresSpec{
EnableConnectionPooler: boolToPointer(true), EnableMasterConnectionPooler: boolToPointer(true),
ConnectionPooler: nil, ConnectionPooler: nil,
}, },
}, },
newSpec: &acidv1.Postgresql{ newSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{ Spec: acidv1.PostgresSpec{
EnableConnectionPooler: boolToPointer(true), EnableMasterConnectionPooler: boolToPointer(true),
ConnectionPooler: &acidv1.ConnectionPooler{}, ConnectionPooler: &acidv1.ConnectionPooler{},
}, },
}, },
cluster: clusterMock, cluster: clusterMock,

View File

@ -424,7 +424,7 @@ func (c *Cluster) connectionPoolerLabelsSelector() *metav1.LabelSelector {
connectionPoolerLabels := labels.Set(map[string]string{}) connectionPoolerLabels := labels.Set(map[string]string{})
extraLabels := labels.Set(map[string]string{ extraLabels := labels.Set(map[string]string{
"connection-pooler": c.connectionPoolerName(), "connection-pooler": c.connectionPoolerName(Master),
"application": "db-connection-pooler", "application": "db-connection-pooler",
}) })
@ -520,11 +520,16 @@ func (c *Cluster) patroniKubernetesUseConfigMaps() bool {
} }
func (c *Cluster) needConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool { func (c *Cluster) needConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool {
if spec.EnableConnectionPooler == nil { if spec.EnableMasterConnectionPooler != nil {
return *spec.EnableMasterConnectionPooler
} else if spec.EnableReplicaConnectionPooler != nil {
return *spec.EnableReplicaConnectionPooler
} else if spec.ConnectionPooler == nil {
return spec.ConnectionPooler != nil return spec.ConnectionPooler != nil
} else {
return *spec.EnableConnectionPooler
} }
// if the connectionPooler section is there, then we enable even though the
// flags are not there
return true
} }
func (c *Cluster) needConnectionPooler() bool { func (c *Cluster) needConnectionPooler() bool {

View File

@ -607,16 +607,27 @@ def update_postgresql(namespace: str, cluster: str):
spec['volume'] = {'size': size} spec['volume'] = {'size': size}
if 'enableConnectionPooler' in postgresql['spec']: if 'enableMasterConnectionPooler' in postgresql['spec']:
cp = postgresql['spec']['enableConnectionPooler'] cp = postgresql['spec']['enableMasterConnectionPooler']
if not cp: if not cp:
if 'enableConnectionPooler' in o['spec']: if 'enableMasterConnectionPooler' in o['spec']:
del o['spec']['enableConnectionPooler'] del o['spec']['enableMasterConnectionPooler']
else: else:
spec['enableConnectionPooler'] = True spec['enableMasterConnectionPooler'] = True
else: else:
if 'enableConnectionPooler' in o['spec']: if 'enableMasterConnectionPooler' in o['spec']:
del o['spec']['enableConnectionPooler'] del o['spec']['enableMasterConnectionPooler']
if 'enableReplicaConnectionPooler' in postgresql['spec']:
cp = postgresql['spec']['enableReplicaConnectionPooler']
if not cp:
if 'enableReplicaConnectionPooler' in o['spec']:
del o['spec']['enableReplicaConnectionPooler']
else:
spec['enableReplicaConnectionPooler'] = True
else:
if 'enableReplicaConnectionPooler' in o['spec']:
del o['spec']['enableReplicaConnectionPooler']
if 'enableReplicaLoadBalancer' in postgresql['spec']: if 'enableReplicaLoadBalancer' in postgresql['spec']:
rlb = postgresql['spec']['enableReplicaLoadBalancer'] rlb = postgresql['spec']['enableReplicaLoadBalancer']