always sync pooler objects
This commit is contained in:
		
							parent
							
								
									d032e4783e
								
							
						
					
					
						commit
						28a39f5c08
					
				| 
						 | 
					@ -3,7 +3,6 @@ package cluster
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"reflect"
 | 
					 | 
				
			||||||
	"strings"
 | 
						"strings"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/r3labs/diff"
 | 
						"github.com/r3labs/diff"
 | 
				
			||||||
| 
						 | 
					@ -722,31 +721,6 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look
 | 
				
			||||||
	var err error
 | 
						var err error
 | 
				
			||||||
	var connectionPoolerNeeded bool
 | 
						var connectionPoolerNeeded bool
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	needSync := !reflect.DeepEqual(oldSpec.Spec.ConnectionPooler, newSpec.Spec.ConnectionPooler)
 | 
					 | 
				
			||||||
	masterChanges, err := diff.Diff(oldSpec.Spec.EnableConnectionPooler, newSpec.Spec.EnableConnectionPooler)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		c.logger.Error("Error in getting diff of master connection pooler changes")
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	replicaChanges, err := diff.Diff(oldSpec.Spec.EnableReplicaConnectionPooler, newSpec.Spec.EnableReplicaConnectionPooler)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		c.logger.Error("Error in getting diff of replica connection pooler changes")
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// skip pooler sync when theres no diff or it's deactivated
 | 
					 | 
				
			||||||
	// but, handling the case when connectionPooler is not there but it is required
 | 
					 | 
				
			||||||
	// as per spec, hence do not skip syncing in that case, even though there
 | 
					 | 
				
			||||||
	// is no diff in specs
 | 
					 | 
				
			||||||
	if (!needSync && len(masterChanges) <= 0 && len(replicaChanges) <= 0) &&
 | 
					 | 
				
			||||||
		((!needConnectionPooler(&newSpec.Spec) && (c.ConnectionPooler == nil || !needConnectionPooler(&oldSpec.Spec))) ||
 | 
					 | 
				
			||||||
			(c.ConnectionPooler != nil && needConnectionPooler(&newSpec.Spec) &&
 | 
					 | 
				
			||||||
				((c.ConnectionPooler[Master] != nil && c.ConnectionPooler[Master].LookupFunction) ||
 | 
					 | 
				
			||||||
					(c.ConnectionPooler[Replica] != nil && c.ConnectionPooler[Replica].LookupFunction)))) {
 | 
					 | 
				
			||||||
		c.logger.Debugln("syncing pooler is not required")
 | 
					 | 
				
			||||||
		return nil, nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	logPoolerEssentials(c.logger, oldSpec, newSpec)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Check and perform the sync requirements for each of the roles.
 | 
						// Check and perform the sync requirements for each of the roles.
 | 
				
			||||||
	for _, role := range [2]PostgresRole{Master, Replica} {
 | 
						for _, role := range [2]PostgresRole{Master, Replica} {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -781,7 +755,8 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look
 | 
				
			||||||
			// in between
 | 
								// in between
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			// in this case also do not forget to install lookup function
 | 
								// in this case also do not forget to install lookup function
 | 
				
			||||||
			if !c.ConnectionPooler[role].LookupFunction {
 | 
								// skip installation in standby clusters, since they are read-only
 | 
				
			||||||
 | 
								if !c.ConnectionPooler[role].LookupFunction && c.Spec.StandbyCluster == nil {
 | 
				
			||||||
				connectionPooler := c.Spec.ConnectionPooler
 | 
									connectionPooler := c.Spec.ConnectionPooler
 | 
				
			||||||
				specSchema := ""
 | 
									specSchema := ""
 | 
				
			||||||
				specUser := ""
 | 
									specUser := ""
 | 
				
			||||||
| 
						 | 
					@ -838,32 +813,37 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look
 | 
				
			||||||
func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql, role PostgresRole) (
 | 
					func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql, role PostgresRole) (
 | 
				
			||||||
	SyncReason, error) {
 | 
						SyncReason, error) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var (
 | 
				
			||||||
 | 
							deployment    *appsv1.Deployment
 | 
				
			||||||
 | 
							newDeployment *appsv1.Deployment
 | 
				
			||||||
 | 
							svc           *v1.Service
 | 
				
			||||||
 | 
							newService    *v1.Service
 | 
				
			||||||
 | 
							err           error
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	syncReason := make([]string, 0)
 | 
						syncReason := make([]string, 0)
 | 
				
			||||||
	deployment, err := c.KubeClient.
 | 
						deployment, err = c.KubeClient.
 | 
				
			||||||
		Deployments(c.Namespace).
 | 
							Deployments(c.Namespace).
 | 
				
			||||||
		Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{})
 | 
							Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if err != nil && k8sutil.ResourceNotFound(err) {
 | 
						if err != nil && k8sutil.ResourceNotFound(err) {
 | 
				
			||||||
		msg := "deployment %s for connection pooler synchronization is not found, create it"
 | 
							c.logger.Warningf("deployment %s for connection pooler synchronization is not found, create it", c.connectionPoolerName(role))
 | 
				
			||||||
		c.logger.Warningf(msg, c.connectionPoolerName(role))
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
		deploymentSpec, err := c.generateConnectionPoolerDeployment(c.ConnectionPooler[role])
 | 
							newDeployment, err = c.generateConnectionPoolerDeployment(c.ConnectionPooler[role])
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			msg = "could not generate deployment for connection pooler: %v"
 | 
								return NoSync, fmt.Errorf("could not generate deployment for connection pooler: %v", err)
 | 
				
			||||||
			return NoSync, fmt.Errorf(msg, err)
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		deployment, err := c.KubeClient.
 | 
							deployment, err = c.KubeClient.
 | 
				
			||||||
			Deployments(deploymentSpec.Namespace).
 | 
								Deployments(newDeployment.Namespace).
 | 
				
			||||||
			Create(context.TODO(), deploymentSpec, metav1.CreateOptions{})
 | 
								Create(context.TODO(), newDeployment, metav1.CreateOptions{})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			return NoSync, err
 | 
								return NoSync, err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		c.ConnectionPooler[role].Deployment = deployment
 | 
							c.ConnectionPooler[role].Deployment = deployment
 | 
				
			||||||
	} else if err != nil {
 | 
						} else if err != nil {
 | 
				
			||||||
		msg := "could not get connection pooler deployment to sync: %v"
 | 
							return NoSync, fmt.Errorf("could not get connection pooler deployment to sync: %v", err)
 | 
				
			||||||
		return NoSync, fmt.Errorf(msg, err)
 | 
					 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		c.ConnectionPooler[role].Deployment = deployment
 | 
							c.ConnectionPooler[role].Deployment = deployment
 | 
				
			||||||
		// actual synchronization
 | 
							// actual synchronization
 | 
				
			||||||
| 
						 | 
					@ -900,16 +880,14 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
 | 
				
			||||||
		syncReason = append(syncReason, defaultsReason...)
 | 
							syncReason = append(syncReason, defaultsReason...)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if specSync || defaultsSync {
 | 
							if specSync || defaultsSync {
 | 
				
			||||||
			c.logger.Infof("Update connection pooler deployment %s, reason: %+v",
 | 
								c.logger.Infof("update connection pooler deployment %s, reason: %+v",
 | 
				
			||||||
				c.connectionPoolerName(role), syncReason)
 | 
									c.connectionPoolerName(role), syncReason)
 | 
				
			||||||
			newDeploymentSpec, err := c.generateConnectionPoolerDeployment(c.ConnectionPooler[role])
 | 
								newDeployment, err = c.generateConnectionPoolerDeployment(c.ConnectionPooler[role])
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				msg := "could not generate deployment for connection pooler: %v"
 | 
									return syncReason, fmt.Errorf("could not generate deployment for connection pooler: %v", err)
 | 
				
			||||||
				return syncReason, fmt.Errorf(msg, err)
 | 
					 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			deployment, err := updateConnectionPoolerDeployment(c.KubeClient,
 | 
								deployment, err = updateConnectionPoolerDeployment(c.KubeClient, newDeployment)
 | 
				
			||||||
				newDeploymentSpec)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				return syncReason, err
 | 
									return syncReason, err
 | 
				
			||||||
| 
						 | 
					@ -927,17 +905,17 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
 | 
				
			||||||
		c.ConnectionPooler[role].Deployment = deployment
 | 
							c.ConnectionPooler[role].Deployment = deployment
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if svc, err := c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}); err == nil {
 | 
						if svc, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}); err == nil {
 | 
				
			||||||
		c.ConnectionPooler[role].Service = svc
 | 
							c.ConnectionPooler[role].Service = svc
 | 
				
			||||||
		desiredSvc := c.generateConnectionPoolerService(c.ConnectionPooler[role])
 | 
							desiredSvc := c.generateConnectionPoolerService(c.ConnectionPooler[role])
 | 
				
			||||||
		if match, reason := k8sutil.SameService(svc, desiredSvc); !match {
 | 
							if match, reason := k8sutil.SameService(svc, desiredSvc); !match {
 | 
				
			||||||
			syncReason = append(syncReason, reason)
 | 
								syncReason = append(syncReason, reason)
 | 
				
			||||||
			c.logServiceChanges(role, svc, desiredSvc, false, reason)
 | 
								c.logServiceChanges(role, svc, desiredSvc, false, reason)
 | 
				
			||||||
			updatedService, err := c.updateService(role, svc, desiredSvc)
 | 
								newService, err = c.updateService(role, svc, desiredSvc)
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				return syncReason, fmt.Errorf("could not update %s service to match desired state: %v", role, err)
 | 
									return syncReason, fmt.Errorf("could not update %s service to match desired state: %v", role, err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			c.ConnectionPooler[role].Service = updatedService
 | 
								c.ConnectionPooler[role].Service = newService
 | 
				
			||||||
			c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta))
 | 
								c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta))
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		return NoSync, nil
 | 
							return NoSync, nil
 | 
				
			||||||
| 
						 | 
					@ -953,14 +931,14 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
 | 
				
			||||||
	c.logger.Warningf(msg, c.connectionPoolerName(role))
 | 
						c.logger.Warningf(msg, c.connectionPoolerName(role))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	serviceSpec := c.generateConnectionPoolerService(c.ConnectionPooler[role])
 | 
						serviceSpec := c.generateConnectionPoolerService(c.ConnectionPooler[role])
 | 
				
			||||||
	service, err := c.KubeClient.
 | 
						newService, err = c.KubeClient.
 | 
				
			||||||
		Services(serviceSpec.Namespace).
 | 
							Services(serviceSpec.Namespace).
 | 
				
			||||||
		Create(context.TODO(), serviceSpec, metav1.CreateOptions{})
 | 
							Create(context.TODO(), serviceSpec, metav1.CreateOptions{})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return NoSync, err
 | 
							return NoSync, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	c.ConnectionPooler[role].Service = service
 | 
						c.ConnectionPooler[role].Service = newService
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return NoSync, nil
 | 
						return NoSync, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue