Adding test cases and other changes
- Refactor needConnectionPooler for master and replica separately - Improve sync function - Add test cases to create, delete and sync with repplica connection pooler Other changes
This commit is contained in:
		
							parent
							
								
									374dd00538
								
							
						
					
					
						commit
						b3dbac5b81
					
				| 
						 | 
					@ -348,7 +348,7 @@ func (c *Cluster) Create() error {
 | 
				
			||||||
	//
 | 
						//
 | 
				
			||||||
	// Do not consider connection pooler as a strict requirement, and if
 | 
						// Do not consider connection pooler as a strict requirement, and if
 | 
				
			||||||
	// something fails, report warning
 | 
						// something fails, report warning
 | 
				
			||||||
	if c.needConnectionPooler() {
 | 
						if c.needMasterConnectionPooler() || c.needReplicaConnectionPooler() {
 | 
				
			||||||
		if c.ConnectionPooler != nil {
 | 
							if c.ConnectionPooler != nil {
 | 
				
			||||||
			c.logger.Warning("Connection pooler already exists in the cluster")
 | 
								c.logger.Warning("Connection pooler already exists in the cluster")
 | 
				
			||||||
			return nil
 | 
								return nil
 | 
				
			||||||
| 
						 | 
					@ -641,7 +641,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 | 
				
			||||||
	// initUsers. Check if it needs to be called.
 | 
						// initUsers. Check if it needs to be called.
 | 
				
			||||||
	sameUsers := reflect.DeepEqual(oldSpec.Spec.Users, newSpec.Spec.Users) &&
 | 
						sameUsers := reflect.DeepEqual(oldSpec.Spec.Users, newSpec.Spec.Users) &&
 | 
				
			||||||
		reflect.DeepEqual(oldSpec.Spec.PreparedDatabases, newSpec.Spec.PreparedDatabases)
 | 
							reflect.DeepEqual(oldSpec.Spec.PreparedDatabases, newSpec.Spec.PreparedDatabases)
 | 
				
			||||||
	needConnectionPooler := c.needConnectionPoolerWorker(&newSpec.Spec)
 | 
						needConnectionPooler := c.needMasterConnectionPoolerWorker(&newSpec.Spec)
 | 
				
			||||||
	if !sameUsers || needConnectionPooler {
 | 
						if !sameUsers || needConnectionPooler {
 | 
				
			||||||
		c.logger.Debugf("syncing secrets")
 | 
							c.logger.Debugf("syncing secrets")
 | 
				
			||||||
		if err := c.initUsers(); err != nil {
 | 
							if err := c.initUsers(); err != nil {
 | 
				
			||||||
| 
						 | 
					@ -906,7 +906,7 @@ func (c *Cluster) initSystemUsers() {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Connection pooler user is an exception, if requested it's going to be
 | 
						// Connection pooler user is an exception, if requested it's going to be
 | 
				
			||||||
	// created by operator as a normal pgUser
 | 
						// created by operator as a normal pgUser
 | 
				
			||||||
	if c.needConnectionPooler() {
 | 
						if c.needMasterConnectionPooler() || c.needReplicaConnectionPooler() {
 | 
				
			||||||
		// initialize empty connection pooler if not done yet
 | 
							// initialize empty connection pooler if not done yet
 | 
				
			||||||
		if c.Spec.ConnectionPooler == nil {
 | 
							if c.Spec.ConnectionPooler == nil {
 | 
				
			||||||
			c.Spec.ConnectionPooler = &acidv1.ConnectionPooler{}
 | 
								c.Spec.ConnectionPooler = &acidv1.ConnectionPooler{}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -126,7 +126,8 @@ func (c *Cluster) createConnectionPooler(lookup InstallFunction) (*ConnectionPoo
 | 
				
			||||||
		msg = "could not prepare database for connection pooler: %v"
 | 
							msg = "could not prepare database for connection pooler: %v"
 | 
				
			||||||
		return nil, fmt.Errorf(msg, err)
 | 
							return nil, fmt.Errorf(msg, err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if c.Spec.EnableConnectionPooler != nil || c.ConnectionPooler != nil {
 | 
						if c.needMasterConnectionPooler() {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		deploymentSpec, err := c.generateConnectionPoolerDeployment(&c.Spec, Master)
 | 
							deploymentSpec, err := c.generateConnectionPoolerDeployment(&c.Spec, Master)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			msg = "could not generate deployment for connection pooler: %v"
 | 
								msg = "could not generate deployment for connection pooler: %v"
 | 
				
			||||||
| 
						 | 
					@ -158,9 +159,11 @@ func (c *Cluster) createConnectionPooler(lookup InstallFunction) (*ConnectionPoo
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		c.logger.Debugf("created new connection pooler %q, uid: %q",
 | 
							c.logger.Debugf("created new connection pooler %q, uid: %q",
 | 
				
			||||||
			util.NameFromMeta(deployment.ObjectMeta), deployment.UID)
 | 
								util.NameFromMeta(deployment.ObjectMeta), deployment.UID)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if c.Spec.EnableReplicaConnectionPooler != nil && *c.Spec.EnableReplicaConnectionPooler == true {
 | 
						if c.needReplicaConnectionPooler() {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		repldeploymentSpec, err := c.generateConnectionPoolerDeployment(&c.Spec, Replica)
 | 
							repldeploymentSpec, err := c.generateConnectionPoolerDeployment(&c.Spec, Replica)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			msg = "could not generate deployment for connection pooler: %v"
 | 
								msg = "could not generate deployment for connection pooler: %v"
 | 
				
			||||||
| 
						 | 
					@ -192,6 +195,7 @@ func (c *Cluster) createConnectionPooler(lookup InstallFunction) (*ConnectionPoo
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		c.logger.Debugf("created new connection pooler for replica %q, uid: %q",
 | 
							c.logger.Debugf("created new connection pooler for replica %q, uid: %q",
 | 
				
			||||||
			util.NameFromMeta(repldeployment.ObjectMeta), repldeployment.UID)
 | 
								util.NameFromMeta(repldeployment.ObjectMeta), repldeployment.UID)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return c.ConnectionPooler, nil
 | 
						return c.ConnectionPooler, nil
 | 
				
			||||||
| 
						 | 
					@ -211,8 +215,13 @@ func (c *Cluster) deleteConnectionPooler(role PostgresRole) (err error) {
 | 
				
			||||||
	// Clean up the deployment object. If deployment resource we've remembered
 | 
						// Clean up the deployment object. If deployment resource we've remembered
 | 
				
			||||||
	// is somehow empty, try to delete based on what would we generate
 | 
						// is somehow empty, try to delete based on what would we generate
 | 
				
			||||||
	deploymentName := c.connectionPoolerName(role)
 | 
						deploymentName := c.connectionPoolerName(role)
 | 
				
			||||||
	deployment := c.ConnectionPooler.Deployment
 | 
						var deployment *appsv1.Deployment
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if role == Master {
 | 
				
			||||||
 | 
							deployment = c.ConnectionPooler.Deployment
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							deployment = c.ConnectionPooler.ReplDeployment
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	if deployment != nil {
 | 
						if deployment != nil {
 | 
				
			||||||
		deploymentName = deployment.Name
 | 
							deploymentName = deployment.Name
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
| 
						 | 
					@ -234,7 +243,12 @@ func (c *Cluster) deleteConnectionPooler(role PostgresRole) (err error) {
 | 
				
			||||||
	c.logger.Infof("Connection pooler deployment %q has been deleted", deploymentName)
 | 
						c.logger.Infof("Connection pooler deployment %q has been deleted", deploymentName)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Repeat the same for the service object
 | 
						// Repeat the same for the service object
 | 
				
			||||||
	service := c.ConnectionPooler.Service
 | 
						var service *v1.Service
 | 
				
			||||||
 | 
						if role == Master {
 | 
				
			||||||
 | 
							service = c.ConnectionPooler.Service
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							service = c.ConnectionPooler.ReplService
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	serviceName := c.connectionPoolerName(role)
 | 
						serviceName := c.connectionPoolerName(role)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if service != nil {
 | 
						if service != nil {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -66,6 +66,31 @@ func TestConnectionPoolerCreationAndDeletion(t *testing.T) {
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		t.Errorf("%s: Cannot delete connection pooler, %s", testName, err)
 | 
							t.Errorf("%s: Cannot delete connection pooler, %s", testName, err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						//Check if Replica connection pooler can be create and deleted successfully
 | 
				
			||||||
 | 
						cluster.Spec = acidv1.PostgresSpec{
 | 
				
			||||||
 | 
							EnableReplicaConnectionPooler: boolToPointer(true),
 | 
				
			||||||
 | 
							ConnectionPooler:              &acidv1.ConnectionPooler{},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						replpoolerResources, err := cluster.createConnectionPooler(mockInstallLookupFunction)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Errorf("%s: Cannot create replica connection pooler, %s, %+v",
 | 
				
			||||||
 | 
								testName, err, replpoolerResources)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if replpoolerResources.ReplDeployment == nil {
 | 
				
			||||||
 | 
							t.Errorf("%s: Connection pooler replica deployment is empty", testName)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if replpoolerResources.ReplService == nil {
 | 
				
			||||||
 | 
							t.Errorf("%s: Connection pooler replica service is empty", testName)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						err = cluster.deleteConnectionPooler(Replica)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Errorf("%s: Cannot delete replica connection pooler, %s", testName, err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestNeedConnectionPooler(t *testing.T) {
 | 
					func TestNeedConnectionPooler(t *testing.T) {
 | 
				
			||||||
| 
						 | 
					@ -91,7 +116,7 @@ func TestNeedConnectionPooler(t *testing.T) {
 | 
				
			||||||
		ConnectionPooler: &acidv1.ConnectionPooler{},
 | 
							ConnectionPooler: &acidv1.ConnectionPooler{},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if !cluster.needConnectionPooler() {
 | 
						if !cluster.needMasterConnectionPooler() {
 | 
				
			||||||
		t.Errorf("%s: Connection pooler is not enabled with full definition",
 | 
							t.Errorf("%s: Connection pooler is not enabled with full definition",
 | 
				
			||||||
			testName)
 | 
								testName)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
| 
						 | 
					@ -100,7 +125,7 @@ func TestNeedConnectionPooler(t *testing.T) {
 | 
				
			||||||
		EnableConnectionPooler: boolToPointer(true),
 | 
							EnableConnectionPooler: boolToPointer(true),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if !cluster.needConnectionPooler() {
 | 
						if !cluster.needMasterConnectionPooler() {
 | 
				
			||||||
		t.Errorf("%s: Connection pooler is not enabled with flag",
 | 
							t.Errorf("%s: Connection pooler is not enabled with flag",
 | 
				
			||||||
			testName)
 | 
								testName)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
| 
						 | 
					@ -110,7 +135,7 @@ func TestNeedConnectionPooler(t *testing.T) {
 | 
				
			||||||
		ConnectionPooler:       &acidv1.ConnectionPooler{},
 | 
							ConnectionPooler:       &acidv1.ConnectionPooler{},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if cluster.needConnectionPooler() {
 | 
						if cluster.needMasterConnectionPooler() {
 | 
				
			||||||
		t.Errorf("%s: Connection pooler is still enabled with flag being false",
 | 
							t.Errorf("%s: Connection pooler is still enabled with flag being false",
 | 
				
			||||||
			testName)
 | 
								testName)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
| 
						 | 
					@ -120,8 +145,47 @@ func TestNeedConnectionPooler(t *testing.T) {
 | 
				
			||||||
		ConnectionPooler:       &acidv1.ConnectionPooler{},
 | 
							ConnectionPooler:       &acidv1.ConnectionPooler{},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if !cluster.needConnectionPooler() {
 | 
						if !cluster.needMasterConnectionPooler() {
 | 
				
			||||||
		t.Errorf("%s: Connection pooler is not enabled with flag and full",
 | 
							t.Errorf("%s: Connection pooler is not enabled with flag and full",
 | 
				
			||||||
			testName)
 | 
								testName)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Test for replica connection pooler
 | 
				
			||||||
 | 
						cluster.Spec = acidv1.PostgresSpec{
 | 
				
			||||||
 | 
							ConnectionPooler: &acidv1.ConnectionPooler{},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if cluster.needReplicaConnectionPooler() {
 | 
				
			||||||
 | 
							t.Errorf("%s: Replica Connection pooler is not enabled with full definition",
 | 
				
			||||||
 | 
								testName)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						cluster.Spec = acidv1.PostgresSpec{
 | 
				
			||||||
 | 
							EnableReplicaConnectionPooler: boolToPointer(true),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if !cluster.needReplicaConnectionPooler() {
 | 
				
			||||||
 | 
							t.Errorf("%s: Replica Connection pooler is not enabled with flag",
 | 
				
			||||||
 | 
								testName)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						cluster.Spec = acidv1.PostgresSpec{
 | 
				
			||||||
 | 
							EnableReplicaConnectionPooler: boolToPointer(false),
 | 
				
			||||||
 | 
							ConnectionPooler:              &acidv1.ConnectionPooler{},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if cluster.needReplicaConnectionPooler() {
 | 
				
			||||||
 | 
							t.Errorf("%s: Replica Connection pooler is still enabled with flag being false",
 | 
				
			||||||
 | 
								testName)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						cluster.Spec = acidv1.PostgresSpec{
 | 
				
			||||||
 | 
							EnableReplicaConnectionPooler: boolToPointer(true),
 | 
				
			||||||
 | 
							ConnectionPooler:              &acidv1.ConnectionPooler{},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if !cluster.needReplicaConnectionPooler() {
 | 
				
			||||||
 | 
							t.Errorf("%s: Replica Connection pooler is not enabled with flag and full",
 | 
				
			||||||
 | 
								testName)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -560,7 +560,7 @@ func (c *Cluster) syncRoles() (err error) {
 | 
				
			||||||
		userNames = append(userNames, u.Name)
 | 
							userNames = append(userNames, u.Name)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if c.needConnectionPooler() {
 | 
						if c.needMasterConnectionPooler() || c.needReplicaConnectionPooler() {
 | 
				
			||||||
		connectionPoolerUser := c.systemUsers[constants.ConnectionPoolerUserKeyName]
 | 
							connectionPoolerUser := c.systemUsers[constants.ConnectionPoolerUserKeyName]
 | 
				
			||||||
		userNames = append(userNames, connectionPoolerUser.Name)
 | 
							userNames = append(userNames, connectionPoolerUser.Name)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -845,13 +845,20 @@ func (c *Cluster) syncConnectionPooler(oldSpec,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	var reason SyncReason
 | 
						var reason SyncReason
 | 
				
			||||||
	var err error
 | 
						var err error
 | 
				
			||||||
 | 
						var newNeedConnectionPooler, oldNeedConnectionPooler bool
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if c.ConnectionPooler == nil {
 | 
						if c.ConnectionPooler == nil {
 | 
				
			||||||
		c.ConnectionPooler = &ConnectionPoolerObjects{}
 | 
							c.ConnectionPooler = &ConnectionPoolerObjects{}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	newNeedConnectionPooler := c.needConnectionPoolerWorker(&newSpec.Spec)
 | 
						for _, role := range [2]PostgresRole{Master, Replica} {
 | 
				
			||||||
	oldNeedConnectionPooler := c.needConnectionPoolerWorker(&oldSpec.Spec)
 | 
							if role == Master {
 | 
				
			||||||
 | 
								newNeedConnectionPooler = c.needMasterConnectionPoolerWorker(&newSpec.Spec)
 | 
				
			||||||
 | 
								oldNeedConnectionPooler = c.needMasterConnectionPoolerWorker(&oldSpec.Spec)
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								newNeedConnectionPooler = c.needReplicaConnectionPoolerWorker(&newSpec.Spec)
 | 
				
			||||||
 | 
								oldNeedConnectionPooler = c.needReplicaConnectionPoolerWorker(&oldSpec.Spec)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if newNeedConnectionPooler {
 | 
							if newNeedConnectionPooler {
 | 
				
			||||||
			// Try to sync in any case. If we didn't needed connection pooler before,
 | 
								// Try to sync in any case. If we didn't needed connection pooler before,
 | 
				
			||||||
| 
						 | 
					@ -887,37 +894,59 @@ func (c *Cluster) syncConnectionPooler(oldSpec,
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec, Master); err != nil {
 | 
								if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec, role); err != nil {
 | 
				
			||||||
				c.logger.Errorf("could not sync connection pooler: %v", err)
 | 
									c.logger.Errorf("could not sync connection pooler: %v", err)
 | 
				
			||||||
				return reason, err
 | 
									return reason, err
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		if newSpec.Spec.EnableReplicaConnectionPooler != nil &&
 | 
					 | 
				
			||||||
			*newSpec.Spec.EnableReplicaConnectionPooler == true {
 | 
					 | 
				
			||||||
			if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec, Replica); err != nil {
 | 
					 | 
				
			||||||
				c.logger.Errorf("could not sync connection pooler: %v", err)
 | 
					 | 
				
			||||||
				return reason, err
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if oldNeedConnectionPooler && !newNeedConnectionPooler {
 | 
							if oldNeedConnectionPooler && !newNeedConnectionPooler {
 | 
				
			||||||
			// delete and cleanup resources
 | 
								// delete and cleanup resources
 | 
				
			||||||
		for _, role := range [2]PostgresRole{Master, Replica} {
 | 
								if role == Master {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
									if c.ConnectionPooler != nil &&
 | 
				
			||||||
 | 
										(c.ConnectionPooler.Deployment != nil ||
 | 
				
			||||||
 | 
											c.ConnectionPooler.Service != nil) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
					if err = c.deleteConnectionPooler(role); err != nil {
 | 
										if err = c.deleteConnectionPooler(role); err != nil {
 | 
				
			||||||
						c.logger.Warningf("could not remove connection pooler: %v", err)
 | 
											c.logger.Warningf("could not remove connection pooler: %v", err)
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								} else {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
									if c.ConnectionPooler != nil &&
 | 
				
			||||||
 | 
										(c.ConnectionPooler.ReplDeployment != nil ||
 | 
				
			||||||
 | 
											c.ConnectionPooler.ReplService != nil) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
										if err = c.deleteConnectionPooler(role); err != nil {
 | 
				
			||||||
 | 
											c.logger.Warningf("could not remove connection pooler: %v", err)
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if !oldNeedConnectionPooler && !newNeedConnectionPooler {
 | 
							if !oldNeedConnectionPooler && !newNeedConnectionPooler {
 | 
				
			||||||
			// delete and cleanup resources if not empty
 | 
								// delete and cleanup resources if not empty
 | 
				
			||||||
 | 
								if role == Master {
 | 
				
			||||||
				if c.ConnectionPooler != nil &&
 | 
									if c.ConnectionPooler != nil &&
 | 
				
			||||||
					(c.ConnectionPooler.Deployment != nil ||
 | 
										(c.ConnectionPooler.Deployment != nil ||
 | 
				
			||||||
						c.ConnectionPooler.Service != nil) {
 | 
											c.ConnectionPooler.Service != nil) {
 | 
				
			||||||
			for _, role := range [2]PostgresRole{Master, Replica} {
 | 
					
 | 
				
			||||||
					if err = c.deleteConnectionPooler(role); err != nil {
 | 
										if err = c.deleteConnectionPooler(role); err != nil {
 | 
				
			||||||
						c.logger.Warningf("could not remove connection pooler: %v", err)
 | 
											c.logger.Warningf("could not remove connection pooler: %v", err)
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								} else {
 | 
				
			||||||
 | 
									if c.ConnectionPooler != nil &&
 | 
				
			||||||
 | 
										(c.ConnectionPooler.ReplDeployment != nil ||
 | 
				
			||||||
 | 
											c.ConnectionPooler.ReplService != nil) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
										if err = c.deleteConnectionPooler(role); err != nil {
 | 
				
			||||||
 | 
											c.logger.Warningf("could not remove connection pooler: %v", err)
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
| 
						 | 
					@ -954,13 +983,20 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
 | 
				
			||||||
			return NoSync, err
 | 
								return NoSync, err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if role == Master {
 | 
				
			||||||
			c.ConnectionPooler.Deployment = deployment
 | 
								c.ConnectionPooler.Deployment = deployment
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								c.ConnectionPooler.ReplDeployment = deployment
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
	} else if err != nil {
 | 
						} else if err != nil {
 | 
				
			||||||
		msg := "could not get connection pooler deployment to sync: %v"
 | 
							msg := "could not get connection pooler deployment to sync: %v"
 | 
				
			||||||
		return NoSync, fmt.Errorf(msg, err)
 | 
							return NoSync, fmt.Errorf(msg, err)
 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
 | 
							if role == Master {
 | 
				
			||||||
			c.ConnectionPooler.Deployment = deployment
 | 
								c.ConnectionPooler.Deployment = deployment
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								c.ConnectionPooler.ReplDeployment = deployment
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
		// actual synchronization
 | 
							// actual synchronization
 | 
				
			||||||
		oldConnectionPooler := oldSpec.Spec.ConnectionPooler
 | 
							oldConnectionPooler := oldSpec.Spec.ConnectionPooler
 | 
				
			||||||
		newConnectionPooler := newSpec.Spec.ConnectionPooler
 | 
							newConnectionPooler := newSpec.Spec.ConnectionPooler
 | 
				
			||||||
| 
						 | 
					@ -983,6 +1019,7 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
 | 
				
			||||||
		specSync, specReason := c.needSyncConnectionPoolerSpecs(oldConnectionPooler, newConnectionPooler)
 | 
							specSync, specReason := c.needSyncConnectionPoolerSpecs(oldConnectionPooler, newConnectionPooler)
 | 
				
			||||||
		defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(newConnectionPooler, deployment)
 | 
							defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(newConnectionPooler, deployment)
 | 
				
			||||||
		reason := append(specReason, defaultsReason...)
 | 
							reason := append(specReason, defaultsReason...)
 | 
				
			||||||
 | 
							c.logger.Warningf("role and reason %v, %v", role, reason)
 | 
				
			||||||
		if specSync || defaultsSync {
 | 
							if specSync || defaultsSync {
 | 
				
			||||||
			c.logger.Infof("Update connection pooler deployment %s, reason: %+v",
 | 
								c.logger.Infof("Update connection pooler deployment %s, reason: %+v",
 | 
				
			||||||
				c.connectionPoolerName(role), reason)
 | 
									c.connectionPoolerName(role), reason)
 | 
				
			||||||
| 
						 | 
					@ -1002,8 +1039,11 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				return reason, err
 | 
									return reason, err
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
								if role == Master {
 | 
				
			||||||
				c.ConnectionPooler.Deployment = deployment
 | 
									c.ConnectionPooler.Deployment = deployment
 | 
				
			||||||
 | 
								} else {
 | 
				
			||||||
 | 
									c.ConnectionPooler.ReplDeployment = deployment
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
			return reason, nil
 | 
								return reason, nil
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
| 
						 | 
					@ -1029,14 +1069,22 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			return NoSync, err
 | 
								return NoSync, err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							if role == Master {
 | 
				
			||||||
			c.ConnectionPooler.Service = service
 | 
								c.ConnectionPooler.Service = service
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								c.ConnectionPooler.ReplService = service
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	} else if err != nil {
 | 
						} else if err != nil {
 | 
				
			||||||
		msg := "could not get connection pooler service to sync: %v"
 | 
							msg := "could not get connection pooler service to sync: %v"
 | 
				
			||||||
		return NoSync, fmt.Errorf(msg, err)
 | 
							return NoSync, fmt.Errorf(msg, err)
 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		// Service updates are not supported and probably not that useful anyway
 | 
							// Service updates are not supported and probably not that useful anyway
 | 
				
			||||||
 | 
							if role == Master {
 | 
				
			||||||
			c.ConnectionPooler.Service = service
 | 
								c.ConnectionPooler.Service = service
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								c.ConnectionPooler.ReplService = service
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return NoSync, nil
 | 
						return NoSync, nil
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -32,11 +32,11 @@ func objectsAreSaved(cluster *Cluster, err error, reason SyncReason) error {
 | 
				
			||||||
		return fmt.Errorf("Connection pooler resources are empty")
 | 
							return fmt.Errorf("Connection pooler resources are empty")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if cluster.ConnectionPooler.Deployment == nil {
 | 
						if cluster.ConnectionPooler.Deployment == nil && cluster.ConnectionPooler.ReplDeployment == nil {
 | 
				
			||||||
		return fmt.Errorf("Deployment was not saved")
 | 
							return fmt.Errorf("Deployment was not saved")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if cluster.ConnectionPooler.Service == nil {
 | 
						if cluster.ConnectionPooler.Service == nil && cluster.ConnectionPooler.ReplService == nil {
 | 
				
			||||||
		return fmt.Errorf("Service was not saved")
 | 
							return fmt.Errorf("Service was not saved")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -51,6 +51,24 @@ func objectsAreDeleted(cluster *Cluster, err error, reason SyncReason) error {
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func OnlyMasterDeleted(cluster *Cluster, err error, reason SyncReason) error {
 | 
				
			||||||
 | 
						if cluster.ConnectionPooler != nil &&
 | 
				
			||||||
 | 
							(cluster.ConnectionPooler.Deployment != nil && cluster.ConnectionPooler.Service != nil) {
 | 
				
			||||||
 | 
							return fmt.Errorf("Connection pooler master was not deleted")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func OnlyReplicaDeleted(cluster *Cluster, err error, reason SyncReason) error {
 | 
				
			||||||
 | 
						if cluster.ConnectionPooler != nil &&
 | 
				
			||||||
 | 
							(cluster.ConnectionPooler.ReplDeployment != nil && cluster.ConnectionPooler.ReplService != nil) {
 | 
				
			||||||
 | 
							return fmt.Errorf("Connection pooler replica was not deleted")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func noEmptySync(cluster *Cluster, err error, reason SyncReason) error {
 | 
					func noEmptySync(cluster *Cluster, err error, reason SyncReason) error {
 | 
				
			||||||
	for _, msg := range reason {
 | 
						for _, msg := range reason {
 | 
				
			||||||
		if strings.HasPrefix(msg, "update [] from '<nil>' to '") {
 | 
							if strings.HasPrefix(msg, "update [] from '<nil>' to '") {
 | 
				
			||||||
| 
						 | 
					@ -102,6 +120,12 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
 | 
				
			||||||
		Deployment: &appsv1.Deployment{},
 | 
							Deployment: &appsv1.Deployment{},
 | 
				
			||||||
		Service:    &v1.Service{},
 | 
							Service:    &v1.Service{},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						clusterReplicaDirtyMock := newCluster()
 | 
				
			||||||
 | 
						clusterReplicaDirtyMock.KubeClient = k8sutil.NewMockKubernetesClient()
 | 
				
			||||||
 | 
						clusterReplicaDirtyMock.ConnectionPooler = &ConnectionPoolerObjects{
 | 
				
			||||||
 | 
							ReplDeployment: &appsv1.Deployment{},
 | 
				
			||||||
 | 
							ReplService:    &v1.Service{},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	clusterNewDefaultsMock := newCluster()
 | 
						clusterNewDefaultsMock := newCluster()
 | 
				
			||||||
	clusterNewDefaultsMock.KubeClient = k8sutil.NewMockKubernetesClient()
 | 
						clusterNewDefaultsMock.KubeClient = k8sutil.NewMockKubernetesClient()
 | 
				
			||||||
| 
						 | 
					@ -147,6 +171,21 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
 | 
				
			||||||
			defaultInstances: 1,
 | 
								defaultInstances: 1,
 | 
				
			||||||
			check:            objectsAreSaved,
 | 
								check:            objectsAreSaved,
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								subTest: "create replica if doesn't exist with a flag",
 | 
				
			||||||
 | 
								oldSpec: &acidv1.Postgresql{
 | 
				
			||||||
 | 
									Spec: acidv1.PostgresSpec{},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								newSpec: &acidv1.Postgresql{
 | 
				
			||||||
 | 
									Spec: acidv1.PostgresSpec{
 | 
				
			||||||
 | 
										EnableReplicaConnectionPooler: boolToPointer(true),
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								cluster:          clusterReplicaDirtyMock,
 | 
				
			||||||
 | 
								defaultImage:     "pooler:1.0",
 | 
				
			||||||
 | 
								defaultInstances: 1,
 | 
				
			||||||
 | 
								check:            objectsAreSaved,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			subTest: "create from scratch",
 | 
								subTest: "create from scratch",
 | 
				
			||||||
			oldSpec: &acidv1.Postgresql{
 | 
								oldSpec: &acidv1.Postgresql{
 | 
				
			||||||
| 
						 | 
					@ -177,6 +216,43 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
 | 
				
			||||||
			defaultInstances: 1,
 | 
								defaultInstances: 1,
 | 
				
			||||||
			check:            objectsAreDeleted,
 | 
								check:            objectsAreDeleted,
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								subTest: "delete only master if not needed",
 | 
				
			||||||
 | 
								oldSpec: &acidv1.Postgresql{
 | 
				
			||||||
 | 
									Spec: acidv1.PostgresSpec{
 | 
				
			||||||
 | 
										ConnectionPooler: &acidv1.ConnectionPooler{},
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								newSpec: &acidv1.Postgresql{
 | 
				
			||||||
 | 
									Spec: acidv1.PostgresSpec{
 | 
				
			||||||
 | 
										ConnectionPooler:              &acidv1.ConnectionPooler{},
 | 
				
			||||||
 | 
										EnableReplicaConnectionPooler: boolToPointer(true),
 | 
				
			||||||
 | 
										EnableConnectionPooler:        boolToPointer(false),
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								cluster:          clusterMock,
 | 
				
			||||||
 | 
								defaultImage:     "pooler:1.0",
 | 
				
			||||||
 | 
								defaultInstances: 1,
 | 
				
			||||||
 | 
								check:            OnlyMasterDeleted,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								subTest: "delete only replica if not needed",
 | 
				
			||||||
 | 
								oldSpec: &acidv1.Postgresql{
 | 
				
			||||||
 | 
									Spec: acidv1.PostgresSpec{
 | 
				
			||||||
 | 
										ConnectionPooler:              &acidv1.ConnectionPooler{},
 | 
				
			||||||
 | 
										EnableReplicaConnectionPooler: boolToPointer(true),
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								newSpec: &acidv1.Postgresql{
 | 
				
			||||||
 | 
									Spec: acidv1.PostgresSpec{
 | 
				
			||||||
 | 
										ConnectionPooler: &acidv1.ConnectionPooler{},
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								cluster:          clusterMock,
 | 
				
			||||||
 | 
								defaultImage:     "pooler:1.0",
 | 
				
			||||||
 | 
								defaultInstances: 1,
 | 
				
			||||||
 | 
								check:            OnlyReplicaDeleted,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			subTest: "cleanup if still there",
 | 
								subTest: "cleanup if still there",
 | 
				
			||||||
			oldSpec: &acidv1.Postgresql{
 | 
								oldSpec: &acidv1.Postgresql{
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -519,21 +519,21 @@ func (c *Cluster) patroniKubernetesUseConfigMaps() bool {
 | 
				
			||||||
	return c.OpConfig.KubernetesUseConfigMaps
 | 
						return c.OpConfig.KubernetesUseConfigMaps
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *Cluster) needConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool {
 | 
					// isConnectionPoolerEnabled
 | 
				
			||||||
	if spec.EnableConnectionPooler != nil {
 | 
					func (c *Cluster) needMasterConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool {
 | 
				
			||||||
		return *spec.EnableConnectionPooler
 | 
						return (nil != spec.EnableConnectionPooler && *spec.EnableConnectionPooler) || (spec.ConnectionPooler != nil && spec.EnableConnectionPooler == nil)
 | 
				
			||||||
	} else if spec.EnableReplicaConnectionPooler != nil {
 | 
					 | 
				
			||||||
		return *spec.EnableReplicaConnectionPooler
 | 
					 | 
				
			||||||
	} else if spec.ConnectionPooler == nil {
 | 
					 | 
				
			||||||
		return spec.ConnectionPooler != nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	// if the connectionPooler section is there, then we enable even though the
 | 
					 | 
				
			||||||
	// flags are not there
 | 
					 | 
				
			||||||
	return true
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *Cluster) needConnectionPooler() bool {
 | 
					func (c *Cluster) needReplicaConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool {
 | 
				
			||||||
	return c.needConnectionPoolerWorker(&c.Spec)
 | 
						return nil != spec.EnableReplicaConnectionPooler && *spec.EnableReplicaConnectionPooler
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (c *Cluster) needMasterConnectionPooler() bool {
 | 
				
			||||||
 | 
						return c.needMasterConnectionPoolerWorker(&c.Spec)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (c *Cluster) needReplicaConnectionPooler() bool {
 | 
				
			||||||
 | 
						return c.needReplicaConnectionPoolerWorker(&c.Spec)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Earlier arguments take priority
 | 
					// Earlier arguments take priority
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue