diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 8b7b63e31..bb4c8a7b4 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -107,6 +107,8 @@ func (c *Cluster) createConnectionPooler(lookup InstallFunction) (*ConnectionPoo if c.ConnectionPooler == nil { c.ConnectionPooler = &ConnectionPoolerObjects{} + c.ConnectionPooler.Deployment = make(map[PostgresRole]*appsv1.Deployment) + c.ConnectionPooler.Service = make(map[PostgresRole]*v1.Service) } schema := c.Spec.ConnectionPooler.Schema diff --git a/pkg/cluster/resources_test.go b/pkg/cluster/resources_test.go index 45d2f2fd6..60755b909 100644 --- a/pkg/cluster/resources_test.go +++ b/pkg/cluster/resources_test.go @@ -54,42 +54,19 @@ func TestConnectionPoolerCreationAndDeletion(t *testing.T) { testName, err, poolerResources) } - if poolerResources.Deployment == nil { - t.Errorf("%s: Connection pooler deployment is empty", testName) - } + for _, role := range cluster.RolesConnectionPooler() { + if poolerResources.Deployment[role] == nil { + t.Errorf("%s: Connection pooler deployment is empty for role %s", testName, role) + } - if poolerResources.Service == nil { - t.Errorf("%s: Connection pooler service is empty", testName) - } + if poolerResources.Service[role] == nil { + t.Errorf("%s: Connection pooler service is empty for role %s", testName, role) + } - err = cluster.deleteConnectionPooler(Master) - if err != nil { - t.Errorf("%s: Cannot delete connection pooler, %s", testName, err) - } - - //Check if Replica connection pooler can be create and deleted successfully - cluster.Spec = acidv1.PostgresSpec{ - EnableReplicaConnectionPooler: boolToPointer(true), - ConnectionPooler: &acidv1.ConnectionPooler{}, - } - replpoolerResources, err := cluster.createConnectionPooler(mockInstallLookupFunction) - - if err != nil { - t.Errorf("%s: Cannot create replica connection pooler, %s, %+v", - testName, err, replpoolerResources) - } - - if replpoolerResources.ReplDeployment == nil { - t.Errorf("%s: Connection pooler replica deployment is empty", testName) - } - - if replpoolerResources.ReplService == nil { - t.Errorf("%s: Connection pooler replica service is empty", testName) - } - - err = cluster.deleteConnectionPooler(Replica) - if err != nil { - t.Errorf("%s: Cannot delete replica connection pooler, %s", testName, err) + err = cluster.deleteConnectionPooler(role) + if err != nil { + t.Errorf("%s: Cannot delete connection pooler, %s", testName, err) + } } } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 90fd08dac..f8b189f4a 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -848,7 +848,7 @@ func (c *Cluster) syncConnectionPooler(oldSpec, var newNeedConnectionPooler, oldNeedConnectionPooler bool // Check and perform the sync requirements for each of the roles. - for _, role := range c.RolesConnectionPooler() { + for _, role := range [2]PostgresRole{Master, Replica} { if role == Master { newNeedConnectionPooler = c.needMasterConnectionPoolerWorker(&newSpec.Spec) oldNeedConnectionPooler = c.needMasterConnectionPoolerWorker(&oldSpec.Spec) @@ -904,13 +904,32 @@ func (c *Cluster) syncConnectionPooler(oldSpec, if oldNeedConnectionPooler && !newNeedConnectionPooler { // delete and cleanup resources + otherRole := role + if len(c.RolesConnectionPooler()) == 2 { + if role == Master { + otherRole = Replica + } else { + otherRole = Master + } + } if err = c.deleteConnectionPooler(role); err != nil { c.logger.Warningf("could not remove connection pooler: %v", err) } + if c.ConnectionPooler != nil && c.ConnectionPooler.Deployment[otherRole] == nil && c.ConnectionPooler.Service[otherRole] == nil { + c.ConnectionPooler = nil + } } if !oldNeedConnectionPooler && !newNeedConnectionPooler { // delete and cleanup resources if not empty + otherRole := role + if len(c.RolesConnectionPooler()) == 2 { + if role == Master { + otherRole = Replica + } else { + otherRole = Master + } + } if c.ConnectionPooler != nil && (c.ConnectionPooler.Deployment[role] != nil || c.ConnectionPooler.Service[role] != nil) { @@ -918,6 +937,8 @@ func (c *Cluster) syncConnectionPooler(oldSpec, if err = c.deleteConnectionPooler(role); err != nil { c.logger.Warningf("could not remove connection pooler: %v", err) } + } else if c.ConnectionPooler.Deployment[otherRole] == nil && c.ConnectionPooler.Service[otherRole] == nil { + c.ConnectionPooler = nil } } } diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go index 1a9132443..64aa03678 100644 --- a/pkg/cluster/sync_test.go +++ b/pkg/cluster/sync_test.go @@ -19,11 +19,13 @@ func int32ToPointer(value int32) *int32 { } func deploymentUpdated(cluster *Cluster, err error, reason SyncReason) error { - if cluster.ConnectionPooler.Deployment.Spec.Replicas == nil || - *cluster.ConnectionPooler.Deployment.Spec.Replicas != 2 { - return fmt.Errorf("Wrong nubmer of instances") + for _, role := range cluster.RolesConnectionPooler() { + if cluster.ConnectionPooler.Deployment[role] != nil && + (cluster.ConnectionPooler.Deployment[role].Spec.Replicas == nil || + *cluster.ConnectionPooler.Deployment[role].Spec.Replicas != 2) { + return fmt.Errorf("Wrong number of instances") + } } - return nil } @@ -32,12 +34,16 @@ func objectsAreSaved(cluster *Cluster, err error, reason SyncReason) error { return fmt.Errorf("Connection pooler resources are empty") } - if cluster.ConnectionPooler.Deployment == nil && cluster.ConnectionPooler.ReplDeployment == nil { - return fmt.Errorf("Deployment was not saved") - } + for _, role := range cluster.RolesConnectionPooler() { + if role != "" { + if cluster.ConnectionPooler.Deployment[role] == nil { + return fmt.Errorf("Deployment was not saved %s", role) + } - if cluster.ConnectionPooler.Service == nil && cluster.ConnectionPooler.ReplService == nil { - return fmt.Errorf("Service was not saved") + if cluster.ConnectionPooler.Service[role] == nil { + return fmt.Errorf("Service was not saved %s", role) + } + } } return nil @@ -52,20 +58,24 @@ func objectsAreDeleted(cluster *Cluster, err error, reason SyncReason) error { } func OnlyMasterDeleted(cluster *Cluster, err error, reason SyncReason) error { - if cluster.ConnectionPooler != nil && - (cluster.ConnectionPooler.Deployment != nil && cluster.ConnectionPooler.Service != nil) { - return fmt.Errorf("Connection pooler master was not deleted") - } + for _, role := range cluster.RolesConnectionPooler() { + if cluster.ConnectionPooler != nil && + (cluster.ConnectionPooler.Deployment[role] != nil && cluster.ConnectionPooler.Service[role] != nil) { + return fmt.Errorf("Connection pooler master was not deleted") + } + } return nil } func OnlyReplicaDeleted(cluster *Cluster, err error, reason SyncReason) error { - if cluster.ConnectionPooler != nil && - (cluster.ConnectionPooler.ReplDeployment != nil && cluster.ConnectionPooler.ReplService != nil) { - return fmt.Errorf("Connection pooler replica was not deleted") - } + for _, role := range cluster.RolesConnectionPooler() { + if cluster.ConnectionPooler != nil && + (cluster.ConnectionPooler.Deployment[role] != nil && cluster.ConnectionPooler.Service[role] != nil) { + return fmt.Errorf("Connection pooler replica was not deleted") + } + } return nil } @@ -117,16 +127,20 @@ func TestConnectionPoolerSynchronization(t *testing.T) { clusterDirtyMock := newCluster() clusterDirtyMock.KubeClient = k8sutil.NewMockKubernetesClient() clusterDirtyMock.ConnectionPooler = &ConnectionPoolerObjects{ - Deployment: &appsv1.Deployment{}, - Service: &v1.Service{}, + Deployment: make(map[PostgresRole]*appsv1.Deployment), + Service: make(map[PostgresRole]*v1.Service), } + clusterDirtyMock.ConnectionPooler.Deployment[Master] = &appsv1.Deployment{} + clusterDirtyMock.ConnectionPooler.Service[Master] = &v1.Service{} clusterReplicaDirtyMock := newCluster() clusterReplicaDirtyMock.KubeClient = k8sutil.NewMockKubernetesClient() clusterReplicaDirtyMock.ConnectionPooler = &ConnectionPoolerObjects{ - ReplDeployment: &appsv1.Deployment{}, - ReplService: &v1.Service{}, + Deployment: make(map[PostgresRole]*appsv1.Deployment), + Service: make(map[PostgresRole]*v1.Service), } + clusterDirtyMock.ConnectionPooler.Deployment[Replica] = &appsv1.Deployment{} + clusterDirtyMock.ConnectionPooler.Service[Replica] = &v1.Service{} clusterNewDefaultsMock := newCluster() clusterNewDefaultsMock.KubeClient = k8sutil.NewMockKubernetesClient() diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index da82f36af..b9ce6dbe9 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -521,7 +521,7 @@ func (c *Cluster) patroniKubernetesUseConfigMaps() bool { // isConnectionPoolerEnabled func (c *Cluster) needMasterConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool { - return (spec.EnableConnectionPooler != nil && *spec.EnableConnectionPooler) || (spec.ConnectionPooler != nil && spec.EnableConnectionPooler == nil) + return (nil != spec.EnableConnectionPooler && *spec.EnableConnectionPooler) || (spec.ConnectionPooler != nil && spec.EnableConnectionPooler == nil) } func (c *Cluster) needReplicaConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool { @@ -538,15 +538,13 @@ func (c *Cluster) needConnectionPooler() bool { // RolesConnectionPooler gives the list of roles which need connection pooler func (c *Cluster) RolesConnectionPooler() []PostgresRole { - roles := []PostgresRole{} - i := 0 + roles := make([]PostgresRole, 2) if c.needMasterConnectionPoolerWorker(&c.Spec) { - roles[i] = Master - i = i + 1 + roles = append(roles, Master) } if c.needMasterConnectionPoolerWorker(&c.Spec) { - roles[i] = Replica + roles = append(roles, Replica) } return roles }