From 28a39f5c082537a336c69b7e722402a2f478ae45 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Mon, 14 Mar 2022 12:10:21 +0100 Subject: [PATCH] always sync pooler objects --- pkg/cluster/connection_pooler.go | 76 ++++++++++++-------------------- 1 file changed, 27 insertions(+), 49 deletions(-) diff --git a/pkg/cluster/connection_pooler.go b/pkg/cluster/connection_pooler.go index 12c0be6ae..da25bf535 100644 --- a/pkg/cluster/connection_pooler.go +++ b/pkg/cluster/connection_pooler.go @@ -3,7 +3,6 @@ package cluster import ( "context" "fmt" - "reflect" "strings" "github.com/r3labs/diff" @@ -722,31 +721,6 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look var err error var connectionPoolerNeeded bool - needSync := !reflect.DeepEqual(oldSpec.Spec.ConnectionPooler, newSpec.Spec.ConnectionPooler) - masterChanges, err := diff.Diff(oldSpec.Spec.EnableConnectionPooler, newSpec.Spec.EnableConnectionPooler) - if err != nil { - c.logger.Error("Error in getting diff of master connection pooler changes") - } - replicaChanges, err := diff.Diff(oldSpec.Spec.EnableReplicaConnectionPooler, newSpec.Spec.EnableReplicaConnectionPooler) - if err != nil { - c.logger.Error("Error in getting diff of replica connection pooler changes") - } - - // skip pooler sync when theres no diff or it's deactivated - // but, handling the case when connectionPooler is not there but it is required - // as per spec, hence do not skip syncing in that case, even though there - // is no diff in specs - if (!needSync && len(masterChanges) <= 0 && len(replicaChanges) <= 0) && - ((!needConnectionPooler(&newSpec.Spec) && (c.ConnectionPooler == nil || !needConnectionPooler(&oldSpec.Spec))) || - (c.ConnectionPooler != nil && needConnectionPooler(&newSpec.Spec) && - ((c.ConnectionPooler[Master] != nil && c.ConnectionPooler[Master].LookupFunction) || - (c.ConnectionPooler[Replica] != nil && c.ConnectionPooler[Replica].LookupFunction)))) { - c.logger.Debugln("syncing pooler is not required") - return nil, nil - } - - logPoolerEssentials(c.logger, oldSpec, newSpec) - // Check and perform the sync requirements for each of the roles. for _, role := range [2]PostgresRole{Master, Replica} { @@ -781,7 +755,8 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look // in between // in this case also do not forget to install lookup function - if !c.ConnectionPooler[role].LookupFunction { + // skip installation in standby clusters, since they are read-only + if !c.ConnectionPooler[role].LookupFunction && c.Spec.StandbyCluster == nil { connectionPooler := c.Spec.ConnectionPooler specSchema := "" specUser := "" @@ -838,32 +813,37 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql, role PostgresRole) ( SyncReason, error) { + var ( + deployment *appsv1.Deployment + newDeployment *appsv1.Deployment + svc *v1.Service + newService *v1.Service + err error + ) + syncReason := make([]string, 0) - deployment, err := c.KubeClient. + deployment, err = c.KubeClient. Deployments(c.Namespace). Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}) if err != nil && k8sutil.ResourceNotFound(err) { - msg := "deployment %s for connection pooler synchronization is not found, create it" - c.logger.Warningf(msg, c.connectionPoolerName(role)) + c.logger.Warningf("deployment %s for connection pooler synchronization is not found, create it", c.connectionPoolerName(role)) - deploymentSpec, err := c.generateConnectionPoolerDeployment(c.ConnectionPooler[role]) + newDeployment, err = c.generateConnectionPoolerDeployment(c.ConnectionPooler[role]) if err != nil { - msg = "could not generate deployment for connection pooler: %v" - return NoSync, fmt.Errorf(msg, err) + return NoSync, fmt.Errorf("could not generate deployment for connection pooler: %v", err) } - deployment, err := c.KubeClient. - Deployments(deploymentSpec.Namespace). - Create(context.TODO(), deploymentSpec, metav1.CreateOptions{}) + deployment, err = c.KubeClient. + Deployments(newDeployment.Namespace). + Create(context.TODO(), newDeployment, metav1.CreateOptions{}) if err != nil { return NoSync, err } c.ConnectionPooler[role].Deployment = deployment } else if err != nil { - msg := "could not get connection pooler deployment to sync: %v" - return NoSync, fmt.Errorf(msg, err) + return NoSync, fmt.Errorf("could not get connection pooler deployment to sync: %v", err) } else { c.ConnectionPooler[role].Deployment = deployment // actual synchronization @@ -900,16 +880,14 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql syncReason = append(syncReason, defaultsReason...) if specSync || defaultsSync { - c.logger.Infof("Update connection pooler deployment %s, reason: %+v", + c.logger.Infof("update connection pooler deployment %s, reason: %+v", c.connectionPoolerName(role), syncReason) - newDeploymentSpec, err := c.generateConnectionPoolerDeployment(c.ConnectionPooler[role]) + newDeployment, err = c.generateConnectionPoolerDeployment(c.ConnectionPooler[role]) if err != nil { - msg := "could not generate deployment for connection pooler: %v" - return syncReason, fmt.Errorf(msg, err) + return syncReason, fmt.Errorf("could not generate deployment for connection pooler: %v", err) } - deployment, err := updateConnectionPoolerDeployment(c.KubeClient, - newDeploymentSpec) + deployment, err = updateConnectionPoolerDeployment(c.KubeClient, newDeployment) if err != nil { return syncReason, err @@ -927,17 +905,17 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql c.ConnectionPooler[role].Deployment = deployment } - if svc, err := c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}); err == nil { + if svc, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}); err == nil { c.ConnectionPooler[role].Service = svc desiredSvc := c.generateConnectionPoolerService(c.ConnectionPooler[role]) if match, reason := k8sutil.SameService(svc, desiredSvc); !match { syncReason = append(syncReason, reason) c.logServiceChanges(role, svc, desiredSvc, false, reason) - updatedService, err := c.updateService(role, svc, desiredSvc) + newService, err = c.updateService(role, svc, desiredSvc) if err != nil { return syncReason, fmt.Errorf("could not update %s service to match desired state: %v", role, err) } - c.ConnectionPooler[role].Service = updatedService + c.ConnectionPooler[role].Service = newService c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta)) } return NoSync, nil @@ -953,14 +931,14 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql c.logger.Warningf(msg, c.connectionPoolerName(role)) serviceSpec := c.generateConnectionPoolerService(c.ConnectionPooler[role]) - service, err := c.KubeClient. + newService, err = c.KubeClient. Services(serviceSpec.Namespace). Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) if err != nil { return NoSync, err } - c.ConnectionPooler[role].Service = service + c.ConnectionPooler[role].Service = newService return NoSync, nil }