This commit is contained in:
Rafia Sabih 2020-09-04 08:02:27 +02:00
parent 1814342dc3
commit 374dd00538
1 changed files with 62 additions and 98 deletions

View File

@ -887,10 +887,17 @@ func (c *Cluster) syncConnectionPooler(oldSpec,
} }
} }
if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec); err != nil { if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec, Master); err != nil {
c.logger.Errorf("could not sync connection pooler: %v", err) c.logger.Errorf("could not sync connection pooler: %v", err)
return reason, err return reason, err
} }
if newSpec.Spec.EnableReplicaConnectionPooler != nil &&
*newSpec.Spec.EnableReplicaConnectionPooler == true {
if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec, Replica); err != nil {
c.logger.Errorf("could not sync connection pooler: %v", err)
return reason, err
}
}
} }
if oldNeedConnectionPooler && !newNeedConnectionPooler { if oldNeedConnectionPooler && !newNeedConnectionPooler {
@ -922,21 +929,37 @@ func (c *Cluster) syncConnectionPooler(oldSpec,
// synchronizing the corresponding deployment, but in case of deployment or // synchronizing the corresponding deployment, but in case of deployment or
// service is missing, create it. After checking, also remember an object for // service is missing, create it. After checking, also remember an object for
// the future references. // the future references.
func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql) ( func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql, role PostgresRole) (
SyncReason, error) { SyncReason, error) {
masterdeployment, err := c.checkAndCreateConnectionPoolerDeployment(Master, newSpec) deployment, err := c.KubeClient.
if err != nil { Deployments(c.Namespace).
msg := "could not get connection pooler deployment to sync: %v" Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{})
return NoSync, fmt.Errorf(msg, err)
} if err != nil && k8sutil.ResourceNotFound(err) {
replicadeployment, err := c.checkAndCreateConnectionPoolerDeployment(Replica, newSpec) msg := "Deployment %s for connection pooler synchronization is not found, create it"
if err != nil { c.logger.Warningf(msg, c.connectionPoolerName(role))
deploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec, role)
if err != nil {
msg = "could not generate deployment for connection pooler: %v"
return NoSync, fmt.Errorf(msg, err)
}
deployment, err := c.KubeClient.
Deployments(deploymentSpec.Namespace).
Create(context.TODO(), deploymentSpec, metav1.CreateOptions{})
if err != nil {
return NoSync, err
}
c.ConnectionPooler.Deployment = deployment
} else if err != nil {
msg := "could not get connection pooler deployment to sync: %v" msg := "could not get connection pooler deployment to sync: %v"
return NoSync, fmt.Errorf(msg, err) return NoSync, fmt.Errorf(msg, err)
} else { } else {
c.ConnectionPooler.Deployment = masterdeployment c.ConnectionPooler.Deployment = deployment
c.ConnectionPooler.ReplDeployment = replicadeployment
// actual synchronization // actual synchronization
oldConnectionPooler := oldSpec.Spec.ConnectionPooler oldConnectionPooler := oldSpec.Spec.ConnectionPooler
@ -955,28 +978,32 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
newConnectionPooler = &acidv1.ConnectionPooler{} newConnectionPooler = &acidv1.ConnectionPooler{}
} }
c.logger.Infof("Old: %+v, New: %+v", oldConnectionPooler, newConnectionPooler) c.logger.Infof("Old: %+v, New %+v", oldConnectionPooler, newConnectionPooler)
specSync, specReason := c.needSyncConnectionPoolerSpecs(oldConnectionPooler, newConnectionPooler) specSync, specReason := c.needSyncConnectionPoolerSpecs(oldConnectionPooler, newConnectionPooler)
defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(newConnectionPooler, masterdeployment) defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(newConnectionPooler, deployment)
reason := append(specReason, defaultsReason...) reason := append(specReason, defaultsReason...)
if specSync || defaultsSync { if specSync || defaultsSync {
newmasterdeployment, err := c.UpdateConnectionPoolerDeploymentSub(Master, reason[:], newSpec) c.logger.Infof("Update connection pooler deployment %s, reason: %+v",
if err != nil { c.connectionPoolerName(role), reason)
return reason, err
} newDeploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec, role)
c.ConnectionPooler.Deployment = newmasterdeployment if err != nil {
return reason, nil msg := "could not generate deployment for connection pooler: %v"
} return reason, fmt.Errorf(msg, err)
}
oldDeploymentSpec := c.ConnectionPooler.Deployment
deployment, err := c.updateConnectionPoolerDeployment(
oldDeploymentSpec,
newDeploymentSpec)
defaultsSync, defaultsReason = c.needSyncConnectionPoolerDefaults(newConnectionPooler, replicadeployment)
reason = append(specReason, defaultsReason...)
if specSync || defaultsSync {
newreplicadeployment, err := c.UpdateConnectionPoolerDeploymentSub(Replica, reason[:], newSpec)
if err != nil { if err != nil {
return reason, err return reason, err
} }
c.ConnectionPooler.Deployment = newreplicadeployment
c.ConnectionPooler.Deployment = deployment
return reason, nil return reason, nil
} }
} }
@ -985,76 +1012,6 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
if newAnnotations != nil { if newAnnotations != nil {
c.updateConnectionPoolerAnnotations(newAnnotations) c.updateConnectionPoolerAnnotations(newAnnotations)
} }
masterservice, err := c.checkAndCreateConnectionPoolerService(Master, newSpec)
if err != nil {
msg := "could not get connection pooler service to sync: %v"
return NoSync, fmt.Errorf(msg, err)
}
replicaservice, err := c.checkAndCreateConnectionPoolerService(Replica, newSpec)
if err != nil {
msg := "could not get connection pooler service to sync: %v"
return NoSync, fmt.Errorf(msg, err)
} else {
// Service updates are not supported and probably not that useful anyway
c.ConnectionPooler.Service = masterservice
c.ConnectionPooler.ReplService = replicaservice
}
return NoSync, nil
}
func (c *Cluster) UpdateConnectionPoolerDeploymentSub(role PostgresRole, reason []string, newSpec *acidv1.Postgresql) (*appsv1.Deployment, error) {
c.logger.Infof("Update connection pooler deployment %s, reason: %+v",
c.connectionPoolerName(role), reason)
newDeploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec, role)
if err != nil {
msg := "could not generate deployment for connection pooler: %v"
return nil, fmt.Errorf(msg, err)
}
oldDeploymentSpec := c.ConnectionPooler.Deployment
deployment, err := c.updateConnectionPoolerDeployment(
oldDeploymentSpec,
newDeploymentSpec)
if err != nil {
return nil, err
}
return deployment, nil
}
func (c *Cluster) checkAndCreateConnectionPoolerDeployment(role PostgresRole, newSpec *acidv1.Postgresql) (*appsv1.Deployment, error) {
deployment, err := c.KubeClient.
Deployments(c.Namespace).
Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{})
if err != nil && k8sutil.ResourceNotFound(err) {
msg := "Deployment %s for connection pooler synchronization is not found, create it"
c.logger.Warningf(msg, c.connectionPoolerName(role))
deploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec, role)
if err != nil {
msg = "could not generate deployment for connection pooler: %v"
return nil, fmt.Errorf(msg, err)
}
deployment, err := c.KubeClient.
Deployments(deploymentSpec.Namespace).
Create(context.TODO(), deploymentSpec, metav1.CreateOptions{})
if err != nil {
return nil, err
}
return deployment, nil
}
return deployment, nil
}
func (c *Cluster) checkAndCreateConnectionPoolerService(role PostgresRole, newSpec *acidv1.Postgresql) (*v1.Service, error) {
service, err := c.KubeClient. service, err := c.KubeClient.
Services(c.Namespace). Services(c.Namespace).
@ -1070,10 +1027,17 @@ func (c *Cluster) checkAndCreateConnectionPoolerService(role PostgresRole, newSp
Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) Create(context.TODO(), serviceSpec, metav1.CreateOptions{})
if err != nil { if err != nil {
return nil, err return NoSync, err
} }
return service, nil c.ConnectionPooler.Service = service
} else if err != nil {
msg := "could not get connection pooler service to sync: %v"
return NoSync, fmt.Errorf(msg, err)
} else {
// Service updates are not supported and probably not that useful anyway
c.ConnectionPooler.Service = service
} }
return service, nil
return NoSync, nil
} }