From 374dd0053855e9c5f1cd12f02c35268af0294b47 Mon Sep 17 00:00:00 2001 From: Rafia Sabih Date: Fri, 4 Sep 2020 08:02:27 +0200 Subject: [PATCH] Fix sync --- pkg/cluster/sync.go | 160 +++++++++++++++++--------------------------- 1 file changed, 62 insertions(+), 98 deletions(-) diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 3ddff4e6e..f2df8771d 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -887,10 +887,17 @@ func (c *Cluster) syncConnectionPooler(oldSpec, } } - if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec); err != nil { + if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec, Master); err != nil { c.logger.Errorf("could not sync connection pooler: %v", err) return reason, err } + if newSpec.Spec.EnableReplicaConnectionPooler != nil && + *newSpec.Spec.EnableReplicaConnectionPooler == true { + if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec, Replica); err != nil { + c.logger.Errorf("could not sync connection pooler: %v", err) + return reason, err + } + } } if oldNeedConnectionPooler && !newNeedConnectionPooler { @@ -922,21 +929,37 @@ func (c *Cluster) syncConnectionPooler(oldSpec, // synchronizing the corresponding deployment, but in case of deployment or // service is missing, create it. After checking, also remember an object for // the future references. -func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql) ( +func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql, role PostgresRole) ( SyncReason, error) { - masterdeployment, err := c.checkAndCreateConnectionPoolerDeployment(Master, newSpec) - if err != nil { - msg := "could not get connection pooler deployment to sync: %v" - return NoSync, fmt.Errorf(msg, err) - } - replicadeployment, err := c.checkAndCreateConnectionPoolerDeployment(Replica, newSpec) - if err != nil { + deployment, err := c.KubeClient. + Deployments(c.Namespace). + Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}) + + if err != nil && k8sutil.ResourceNotFound(err) { + msg := "Deployment %s for connection pooler synchronization is not found, create it" + c.logger.Warningf(msg, c.connectionPoolerName(role)) + + deploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec, role) + if err != nil { + msg = "could not generate deployment for connection pooler: %v" + return NoSync, fmt.Errorf(msg, err) + } + + deployment, err := c.KubeClient. + Deployments(deploymentSpec.Namespace). + Create(context.TODO(), deploymentSpec, metav1.CreateOptions{}) + + if err != nil { + return NoSync, err + } + + c.ConnectionPooler.Deployment = deployment + } else if err != nil { msg := "could not get connection pooler deployment to sync: %v" return NoSync, fmt.Errorf(msg, err) } else { - c.ConnectionPooler.Deployment = masterdeployment - c.ConnectionPooler.ReplDeployment = replicadeployment + c.ConnectionPooler.Deployment = deployment // actual synchronization oldConnectionPooler := oldSpec.Spec.ConnectionPooler @@ -955,28 +978,32 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql newConnectionPooler = &acidv1.ConnectionPooler{} } - c.logger.Infof("Old: %+v, New: %+v", oldConnectionPooler, newConnectionPooler) + c.logger.Infof("Old: %+v, New %+v", oldConnectionPooler, newConnectionPooler) specSync, specReason := c.needSyncConnectionPoolerSpecs(oldConnectionPooler, newConnectionPooler) - defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(newConnectionPooler, masterdeployment) + defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(newConnectionPooler, deployment) reason := append(specReason, defaultsReason...) if specSync || defaultsSync { - newmasterdeployment, err := c.UpdateConnectionPoolerDeploymentSub(Master, reason[:], newSpec) - if err != nil { - return reason, err - } - c.ConnectionPooler.Deployment = newmasterdeployment - return reason, nil - } + c.logger.Infof("Update connection pooler deployment %s, reason: %+v", + c.connectionPoolerName(role), reason) + + newDeploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec, role) + if err != nil { + msg := "could not generate deployment for connection pooler: %v" + return reason, fmt.Errorf(msg, err) + } + + oldDeploymentSpec := c.ConnectionPooler.Deployment + + deployment, err := c.updateConnectionPoolerDeployment( + oldDeploymentSpec, + newDeploymentSpec) - defaultsSync, defaultsReason = c.needSyncConnectionPoolerDefaults(newConnectionPooler, replicadeployment) - reason = append(specReason, defaultsReason...) - if specSync || defaultsSync { - newreplicadeployment, err := c.UpdateConnectionPoolerDeploymentSub(Replica, reason[:], newSpec) if err != nil { return reason, err } - c.ConnectionPooler.Deployment = newreplicadeployment + + c.ConnectionPooler.Deployment = deployment return reason, nil } } @@ -985,76 +1012,6 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql if newAnnotations != nil { c.updateConnectionPoolerAnnotations(newAnnotations) } - masterservice, err := c.checkAndCreateConnectionPoolerService(Master, newSpec) - if err != nil { - msg := "could not get connection pooler service to sync: %v" - return NoSync, fmt.Errorf(msg, err) - } - replicaservice, err := c.checkAndCreateConnectionPoolerService(Replica, newSpec) - if err != nil { - msg := "could not get connection pooler service to sync: %v" - return NoSync, fmt.Errorf(msg, err) - } else { - // Service updates are not supported and probably not that useful anyway - c.ConnectionPooler.Service = masterservice - c.ConnectionPooler.ReplService = replicaservice - } - - return NoSync, nil -} - -func (c *Cluster) UpdateConnectionPoolerDeploymentSub(role PostgresRole, reason []string, newSpec *acidv1.Postgresql) (*appsv1.Deployment, error) { - - c.logger.Infof("Update connection pooler deployment %s, reason: %+v", - c.connectionPoolerName(role), reason) - - newDeploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec, role) - if err != nil { - msg := "could not generate deployment for connection pooler: %v" - return nil, fmt.Errorf(msg, err) - } - - oldDeploymentSpec := c.ConnectionPooler.Deployment - - deployment, err := c.updateConnectionPoolerDeployment( - oldDeploymentSpec, - newDeploymentSpec) - - if err != nil { - return nil, err - } - return deployment, nil -} - -func (c *Cluster) checkAndCreateConnectionPoolerDeployment(role PostgresRole, newSpec *acidv1.Postgresql) (*appsv1.Deployment, error) { - - deployment, err := c.KubeClient. - Deployments(c.Namespace). - Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}) - - if err != nil && k8sutil.ResourceNotFound(err) { - msg := "Deployment %s for connection pooler synchronization is not found, create it" - c.logger.Warningf(msg, c.connectionPoolerName(role)) - - deploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec, role) - if err != nil { - msg = "could not generate deployment for connection pooler: %v" - return nil, fmt.Errorf(msg, err) - } - - deployment, err := c.KubeClient. - Deployments(deploymentSpec.Namespace). - Create(context.TODO(), deploymentSpec, metav1.CreateOptions{}) - - if err != nil { - return nil, err - } - return deployment, nil - } - return deployment, nil -} - -func (c *Cluster) checkAndCreateConnectionPoolerService(role PostgresRole, newSpec *acidv1.Postgresql) (*v1.Service, error) { service, err := c.KubeClient. Services(c.Namespace). @@ -1070,10 +1027,17 @@ func (c *Cluster) checkAndCreateConnectionPoolerService(role PostgresRole, newSp Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) if err != nil { - return nil, err + return NoSync, err } - return service, nil + c.ConnectionPooler.Service = service + } else if err != nil { + msg := "could not get connection pooler service to sync: %v" + return NoSync, fmt.Errorf(msg, err) + } else { + // Service updates are not supported and probably not that useful anyway + c.ConnectionPooler.Service = service } - return service, nil + + return NoSync, nil }