Refactor needConnectionPooler

Have one unified function to tell if any connection pooler is required

Add a helper function to list the roles that require connection pooler,
helps in avoiding duplication of code
This commit is contained in:
Rafia Sabih 2020-09-21 16:40:40 +02:00
parent 7c9b459919
commit 2936ed0060
5 changed files with 96 additions and 156 deletions

View File

@ -392,8 +392,10 @@ CPU and memory limits for the sidecar container.
Parameters are grouped under the `connectionPooler` top-level key and specify Parameters are grouped under the `connectionPooler` top-level key and specify
configuration for connection pooler. If this section is not empty, a connection configuration for connection pooler. If this section is not empty, a connection
pooler will be created for a database even if `enableConnectionPooler` is not pooler will be created for master service only even if `enableConnectionPooler`
present. is not present. But if this section is present then it defines the configuration
for both master and replica pooler services (if `enableReplicaConnectionPooler`
is enabled).
* **numberOfInstances** * **numberOfInstances**
How many instances of connection pooler to create. How many instances of connection pooler to create.

View File

@ -55,10 +55,8 @@ type Config struct {
// K8S objects that are belongs to a connection pooler // K8S objects that are belongs to a connection pooler
type ConnectionPoolerObjects struct { type ConnectionPoolerObjects struct {
Deployment *appsv1.Deployment Deployment map[PostgresRole]*appsv1.Deployment
ReplDeployment *appsv1.Deployment Service map[PostgresRole]*v1.Service
Service *v1.Service
ReplService *v1.Service
// It could happen that a connection pooler was enabled, but the operator // It could happen that a connection pooler was enabled, but the operator
// was not able to properly process a corresponding event or was restarted. // was not able to properly process a corresponding event or was restarted.
@ -348,7 +346,8 @@ func (c *Cluster) Create() error {
// //
// Do not consider connection pooler as a strict requirement, and if // Do not consider connection pooler as a strict requirement, and if
// something fails, report warning // something fails, report warning
if c.needMasterConnectionPooler() || c.needReplicaConnectionPooler() { roles := c.RolesConnectionPooler()
for _, r := range roles {
if c.ConnectionPooler != nil { if c.ConnectionPooler != nil {
c.logger.Warning("Connection pooler already exists in the cluster") c.logger.Warning("Connection pooler already exists in the cluster")
return nil return nil
@ -359,7 +358,7 @@ func (c *Cluster) Create() error {
return nil return nil
} }
c.logger.Infof("connection pooler %q has been successfully created", c.logger.Infof("connection pooler %q has been successfully created",
util.NameFromMeta(connectionPooler.Deployment.ObjectMeta)) util.NameFromMeta(connectionPooler.Deployment[r].ObjectMeta))
} }
return nil return nil

View File

@ -126,9 +126,10 @@ func (c *Cluster) createConnectionPooler(lookup InstallFunction) (*ConnectionPoo
msg = "could not prepare database for connection pooler: %v" msg = "could not prepare database for connection pooler: %v"
return nil, fmt.Errorf(msg, err) return nil, fmt.Errorf(msg, err)
} }
if c.needMasterConnectionPooler() { if c.needConnectionPooler() {
roles := c.RolesConnectionPooler()
deploymentSpec, err := c.generateConnectionPoolerDeployment(&c.Spec, Master) for _, r := range roles {
deploymentSpec, err := c.generateConnectionPoolerDeployment(&c.Spec, r)
if err != nil { if err != nil {
msg = "could not generate deployment for connection pooler: %v" msg = "could not generate deployment for connection pooler: %v"
return nil, fmt.Errorf(msg, err) return nil, fmt.Errorf(msg, err)
@ -145,7 +146,7 @@ func (c *Cluster) createConnectionPooler(lookup InstallFunction) (*ConnectionPoo
return nil, err return nil, err
} }
serviceSpec := c.generateConnectionPoolerService(&c.Spec, Master) serviceSpec := c.generateConnectionPoolerService(&c.Spec, r)
service, err := c.KubeClient. service, err := c.KubeClient.
Services(serviceSpec.Namespace). Services(serviceSpec.Namespace).
Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) Create(context.TODO(), serviceSpec, metav1.CreateOptions{})
@ -153,54 +154,12 @@ func (c *Cluster) createConnectionPooler(lookup InstallFunction) (*ConnectionPoo
if err != nil { if err != nil {
return nil, err return nil, err
} }
c.ConnectionPooler = &ConnectionPoolerObjects{ c.ConnectionPooler.Deployment[r] = deployment
Deployment: deployment, c.ConnectionPooler.Service[r] = service
Service: service,
}
c.logger.Debugf("created new connection pooler %q, uid: %q", c.logger.Debugf("created new connection pooler %q, uid: %q",
util.NameFromMeta(deployment.ObjectMeta), deployment.UID) util.NameFromMeta(deployment.ObjectMeta), deployment.UID)
} }
if c.needReplicaConnectionPooler() {
repldeploymentSpec, err := c.generateConnectionPoolerDeployment(&c.Spec, Replica)
if err != nil {
msg = "could not generate deployment for connection pooler: %v"
return nil, fmt.Errorf(msg, err)
}
// client-go does retry 10 times (with NoBackoff by default) when the API
// believe a request can be retried and returns Retry-After header. This
// should be good enough to not think about it here.
repldeployment, err := c.KubeClient.
Deployments(repldeploymentSpec.Namespace).
Create(context.TODO(), repldeploymentSpec, metav1.CreateOptions{})
if err != nil {
return nil, err
}
replServiceSpec := c.generateConnectionPoolerService(&c.Spec, Replica)
replService, err := c.KubeClient.
Services(replServiceSpec.Namespace).
Create(context.TODO(), replServiceSpec, metav1.CreateOptions{})
if err != nil {
return nil, err
}
if c.needMasterConnectionPooler() {
c.ConnectionPooler.ReplDeployment = repldeployment
c.ConnectionPooler.ReplService = replService
} else {
c.ConnectionPooler = &ConnectionPoolerObjects{
ReplDeployment: repldeployment,
ReplService: replService,
}
}
c.logger.Debugf("created new connection pooler for replica %q, uid: %q",
util.NameFromMeta(repldeployment.ObjectMeta), repldeployment.UID)
} }
return c.ConnectionPooler, nil return c.ConnectionPooler, nil
@ -221,12 +180,8 @@ func (c *Cluster) deleteConnectionPooler(role PostgresRole) (err error) {
// is somehow empty, try to delete based on what would we generate // is somehow empty, try to delete based on what would we generate
var deploymentName string var deploymentName string
var deployment *appsv1.Deployment var deployment *appsv1.Deployment
deployment = c.ConnectionPooler.Deployment[role]
if role == Master {
deployment = c.ConnectionPooler.Deployment
} else {
deployment = c.ConnectionPooler.ReplDeployment
}
if deployment != nil { if deployment != nil {
deploymentName = deployment.Name deploymentName = deployment.Name
} }
@ -249,11 +204,8 @@ func (c *Cluster) deleteConnectionPooler(role PostgresRole) (err error) {
// Repeat the same for the service object // Repeat the same for the service object
var service *v1.Service var service *v1.Service
if role == Master { service = c.ConnectionPooler.Service[role]
service = c.ConnectionPooler.Service
} else {
service = c.ConnectionPooler.ReplService
}
serviceName := c.connectionPoolerName(role) serviceName := c.connectionPoolerName(role)
if service != nil { if service != nil {
@ -909,9 +861,9 @@ func (c *Cluster) GetPodDisruptionBudget() *policybeta1.PodDisruptionBudget {
// Perform actual patching of a connection pooler deployment, assuming that all // Perform actual patching of a connection pooler deployment, assuming that all
// the check were already done before. // the check were already done before.
func (c *Cluster) updateConnectionPoolerDeployment(oldDeploymentSpec, newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) { func (c *Cluster) updateConnectionPoolerDeployment(oldDeploymentSpec, newDeployment *appsv1.Deployment, role PostgresRole) (*appsv1.Deployment, error) {
c.setProcessName("updating connection pooler") c.setProcessName("updating connection pooler")
if c.ConnectionPooler == nil || c.ConnectionPooler.Deployment == nil { if c.ConnectionPooler == nil || c.ConnectionPooler.Deployment[role] == nil {
return nil, fmt.Errorf("there is no connection pooler in the cluster") return nil, fmt.Errorf("there is no connection pooler in the cluster")
} }
@ -924,9 +876,9 @@ func (c *Cluster) updateConnectionPoolerDeployment(oldDeploymentSpec, newDeploym
// worker at one time will try to update it chances of conflicts are // worker at one time will try to update it chances of conflicts are
// minimal. // minimal.
deployment, err := c.KubeClient. deployment, err := c.KubeClient.
Deployments(c.ConnectionPooler.Deployment.Namespace).Patch( Deployments(c.ConnectionPooler.Deployment[role].Namespace).Patch(
context.TODO(), context.TODO(),
c.ConnectionPooler.Deployment.Name, c.ConnectionPooler.Deployment[role].Name,
types.MergePatchType, types.MergePatchType,
patchData, patchData,
metav1.PatchOptions{}, metav1.PatchOptions{},
@ -935,21 +887,21 @@ func (c *Cluster) updateConnectionPoolerDeployment(oldDeploymentSpec, newDeploym
return nil, fmt.Errorf("could not patch deployment: %v", err) return nil, fmt.Errorf("could not patch deployment: %v", err)
} }
c.ConnectionPooler.Deployment = deployment c.ConnectionPooler.Deployment[role] = deployment
return deployment, nil return deployment, nil
} }
//updateConnectionPoolerAnnotations updates the annotations of connection pooler deployment //updateConnectionPoolerAnnotations updates the annotations of connection pooler deployment
func (c *Cluster) updateConnectionPoolerAnnotations(annotations map[string]string) (*appsv1.Deployment, error) { func (c *Cluster) updateConnectionPoolerAnnotations(annotations map[string]string, role PostgresRole) (*appsv1.Deployment, error) {
c.logger.Debugf("updating connection pooler annotations") c.logger.Debugf("updating connection pooler annotations")
patchData, err := metaAnnotationsPatch(annotations) patchData, err := metaAnnotationsPatch(annotations)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not form patch for the deployment metadata: %v", err) return nil, fmt.Errorf("could not form patch for the deployment metadata: %v", err)
} }
result, err := c.KubeClient.Deployments(c.ConnectionPooler.Deployment.Namespace).Patch( result, err := c.KubeClient.Deployments(c.ConnectionPooler.Deployment[role].Namespace).Patch(
context.TODO(), context.TODO(),
c.ConnectionPooler.Deployment.Name, c.ConnectionPooler.Deployment[role].Name,
types.MergePatchType, types.MergePatchType,
[]byte(patchData), []byte(patchData),
metav1.PatchOptions{}, metav1.PatchOptions{},

View File

@ -848,7 +848,7 @@ func (c *Cluster) syncConnectionPooler(oldSpec,
var newNeedConnectionPooler, oldNeedConnectionPooler bool var newNeedConnectionPooler, oldNeedConnectionPooler bool
// Check and perform the sync requirements for each of the roles. // Check and perform the sync requirements for each of the roles.
for _, role := range [2]PostgresRole{Master, Replica} { for _, role := range c.RolesConnectionPooler() {
if role == Master { if role == Master {
newNeedConnectionPooler = c.needMasterConnectionPoolerWorker(&newSpec.Spec) newNeedConnectionPooler = c.needMasterConnectionPoolerWorker(&newSpec.Spec)
oldNeedConnectionPooler = c.needMasterConnectionPoolerWorker(&oldSpec.Spec) oldNeedConnectionPooler = c.needMasterConnectionPoolerWorker(&oldSpec.Spec)
@ -858,6 +858,8 @@ func (c *Cluster) syncConnectionPooler(oldSpec,
} }
if c.ConnectionPooler == nil { if c.ConnectionPooler == nil {
c.ConnectionPooler = &ConnectionPoolerObjects{} c.ConnectionPooler = &ConnectionPoolerObjects{}
c.ConnectionPooler.Deployment = make(map[PostgresRole]*appsv1.Deployment)
c.ConnectionPooler.Service = make(map[PostgresRole]*v1.Service)
} }
if newNeedConnectionPooler { if newNeedConnectionPooler {
@ -909,29 +911,13 @@ func (c *Cluster) syncConnectionPooler(oldSpec,
if !oldNeedConnectionPooler && !newNeedConnectionPooler { if !oldNeedConnectionPooler && !newNeedConnectionPooler {
// delete and cleanup resources if not empty // delete and cleanup resources if not empty
if role == Master {
if c.ConnectionPooler != nil && if c.ConnectionPooler != nil &&
(c.ConnectionPooler.Deployment != nil || (c.ConnectionPooler.Deployment[role] != nil ||
c.ConnectionPooler.Service != nil) { c.ConnectionPooler.Service[role] != nil) {
if err = c.deleteConnectionPooler(role); err != nil { if err = c.deleteConnectionPooler(role); err != nil {
c.logger.Warningf("could not remove connection pooler: %v", err) c.logger.Warningf("could not remove connection pooler: %v", err)
} }
} else if c.ConnectionPooler.ReplDeployment == nil && c.ConnectionPooler.ReplService == nil {
c.ConnectionPooler = nil
}
} else {
if c.ConnectionPooler != nil &&
(c.ConnectionPooler.ReplDeployment != nil ||
c.ConnectionPooler.ReplService != nil) {
if err = c.deleteConnectionPooler(role); err != nil {
c.logger.Warningf("could not remove connection pooler: %v", err)
}
} else if c.ConnectionPooler.Deployment == nil && c.ConnectionPooler.Service == nil {
c.ConnectionPooler = nil
}
} }
} }
} }
@ -967,22 +953,13 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
if err != nil { if err != nil {
return NoSync, err return NoSync, err
} }
c.ConnectionPooler.Deployment[role] = deployment
if role == Master {
c.ConnectionPooler.Deployment = deployment
} else {
c.ConnectionPooler.ReplDeployment = deployment
}
} else if err != nil { } else if err != nil {
msg := "could not get connection pooler deployment to sync: %v" msg := "could not get connection pooler deployment to sync: %v"
return NoSync, fmt.Errorf(msg, err) return NoSync, fmt.Errorf(msg, err)
} else { } else {
if role == Master { c.ConnectionPooler.Deployment[role] = deployment
c.ConnectionPooler.Deployment = deployment
} else {
c.ConnectionPooler.ReplDeployment = deployment
}
// actual synchronization // actual synchronization
oldConnectionPooler := oldSpec.Spec.ConnectionPooler oldConnectionPooler := oldSpec.Spec.ConnectionPooler
newConnectionPooler := newSpec.Spec.ConnectionPooler newConnectionPooler := newSpec.Spec.ConnectionPooler
@ -1007,34 +984,33 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
reason := append(specReason, defaultsReason...) reason := append(specReason, defaultsReason...)
if specSync || defaultsSync { if specSync || defaultsSync {
c.logger.Infof("Update connection pooler deployment %s, reason: %+v",
c.connectionPoolerName(role), reason)
newDeploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec, role) newDeploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec, role)
if err != nil { if err != nil {
msg := "could not generate deployment for connection pooler: %v" msg := "could not generate deployment for connection pooler: %v"
return reason, fmt.Errorf(msg, err) return reason, fmt.Errorf(msg, err)
} }
oldDeploymentSpec := c.ConnectionPooler.Deployment oldDeploymentSpec := c.ConnectionPooler.Deployment[role]
deployment, err := c.updateConnectionPoolerDeployment( deployment, err := c.updateConnectionPoolerDeployment(
oldDeploymentSpec, oldDeploymentSpec,
newDeploymentSpec) newDeploymentSpec,
role)
if err != nil { if err != nil {
return reason, err return reason, err
} }
if role == Master { c.ConnectionPooler.Deployment[role] = deployment
c.ConnectionPooler.Deployment = deployment
} else {
c.ConnectionPooler.ReplDeployment = deployment
}
return reason, nil return reason, nil
} }
} }
newAnnotations := c.AnnotationsToPropagate(c.ConnectionPooler.Deployment.Annotations) newAnnotations := c.AnnotationsToPropagate(c.ConnectionPooler.Deployment[role].Annotations)
if newAnnotations != nil { if newAnnotations != nil {
c.updateConnectionPoolerAnnotations(newAnnotations) c.updateConnectionPoolerAnnotations(newAnnotations, role)
} }
service, err := c.KubeClient. service, err := c.KubeClient.
@ -1053,22 +1029,14 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
if err != nil { if err != nil {
return NoSync, err return NoSync, err
} }
if role == Master { c.ConnectionPooler.Service[role] = service
c.ConnectionPooler.Service = service
} else {
c.ConnectionPooler.ReplService = service
}
} else if err != nil { } else if err != nil {
msg := "could not get connection pooler service to sync: %v" msg := "could not get connection pooler service to sync: %v"
return NoSync, fmt.Errorf(msg, err) return NoSync, fmt.Errorf(msg, err)
} else { } else {
// Service updates are not supported and probably not that useful anyway // Service updates are not supported and probably not that useful anyway
if role == Master { c.ConnectionPooler.Service[role] = service
c.ConnectionPooler.Service = service
} else {
c.ConnectionPooler.ReplService = service
}
} }
return NoSync, nil return NoSync, nil

View File

@ -521,17 +521,36 @@ func (c *Cluster) patroniKubernetesUseConfigMaps() bool {
// isConnectionPoolerEnabled // isConnectionPoolerEnabled
func (c *Cluster) needMasterConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool { func (c *Cluster) needMasterConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool {
return (nil != spec.EnableConnectionPooler && *spec.EnableConnectionPooler) || (spec.ConnectionPooler != nil && spec.EnableConnectionPooler == nil) return (spec.EnableConnectionPooler != nil && *spec.EnableConnectionPooler) || (spec.ConnectionPooler != nil && spec.EnableConnectionPooler == nil)
} }
func (c *Cluster) needReplicaConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool { func (c *Cluster) needReplicaConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool {
return nil != spec.EnableReplicaConnectionPooler && *spec.EnableReplicaConnectionPooler return spec.EnableReplicaConnectionPooler != nil && *spec.EnableReplicaConnectionPooler
} }
func (c *Cluster) needMasterConnectionPooler() bool { func (c *Cluster) needMasterConnectionPooler() bool {
return c.needMasterConnectionPoolerWorker(&c.Spec) return c.needMasterConnectionPoolerWorker(&c.Spec)
} }
func (c *Cluster) needConnectionPooler() bool {
return c.needMasterConnectionPoolerWorker(&c.Spec) || c.needReplicaConnectionPoolerWorker(&c.Spec)
}
// RolesConnectionPooler gives the list of roles which need connection pooler
func (c *Cluster) RolesConnectionPooler() []PostgresRole {
roles := []PostgresRole{}
i := 0
if c.needMasterConnectionPoolerWorker(&c.Spec) {
roles[i] = Master
i = i + 1
}
if c.needMasterConnectionPoolerWorker(&c.Spec) {
roles[i] = Replica
}
return roles
}
func (c *Cluster) needReplicaConnectionPooler() bool { func (c *Cluster) needReplicaConnectionPooler() bool {
return c.needReplicaConnectionPoolerWorker(&c.Spec) return c.needReplicaConnectionPoolerWorker(&c.Spec)
} }