From b3dbac5b8167e4aca144871a1afd9ff352eccb24 Mon Sep 17 00:00:00 2001 From: Rafia Sabih Date: Fri, 4 Sep 2020 17:26:54 +0200 Subject: [PATCH] Adding test cases and other changes - Refactor needConnectionPooler for master and replica separately - Improve sync function - Add test cases to create, delete and sync with repplica connection pooler Other changes --- pkg/cluster/cluster.go | 6 +- pkg/cluster/resources.go | 22 ++++- pkg/cluster/resources_test.go | 72 +++++++++++++- pkg/cluster/sync.go | 178 +++++++++++++++++++++------------- pkg/cluster/sync_test.go | 80 ++++++++++++++- pkg/cluster/util.go | 26 ++--- 6 files changed, 293 insertions(+), 91 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index d982c79e2..dfaf8a01c 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -348,7 +348,7 @@ func (c *Cluster) Create() error { // // Do not consider connection pooler as a strict requirement, and if // something fails, report warning - if c.needConnectionPooler() { + if c.needMasterConnectionPooler() || c.needReplicaConnectionPooler() { if c.ConnectionPooler != nil { c.logger.Warning("Connection pooler already exists in the cluster") return nil @@ -641,7 +641,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { // initUsers. Check if it needs to be called. sameUsers := reflect.DeepEqual(oldSpec.Spec.Users, newSpec.Spec.Users) && reflect.DeepEqual(oldSpec.Spec.PreparedDatabases, newSpec.Spec.PreparedDatabases) - needConnectionPooler := c.needConnectionPoolerWorker(&newSpec.Spec) + needConnectionPooler := c.needMasterConnectionPoolerWorker(&newSpec.Spec) if !sameUsers || needConnectionPooler { c.logger.Debugf("syncing secrets") if err := c.initUsers(); err != nil { @@ -906,7 +906,7 @@ func (c *Cluster) initSystemUsers() { // Connection pooler user is an exception, if requested it's going to be // created by operator as a normal pgUser - if c.needConnectionPooler() { + if c.needMasterConnectionPooler() || c.needReplicaConnectionPooler() { // initialize empty connection pooler if not done yet if c.Spec.ConnectionPooler == nil { c.Spec.ConnectionPooler = &acidv1.ConnectionPooler{} diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index fb0366bfe..207ed4570 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -126,7 +126,8 @@ func (c *Cluster) createConnectionPooler(lookup InstallFunction) (*ConnectionPoo msg = "could not prepare database for connection pooler: %v" return nil, fmt.Errorf(msg, err) } - if c.Spec.EnableConnectionPooler != nil || c.ConnectionPooler != nil { + if c.needMasterConnectionPooler() { + deploymentSpec, err := c.generateConnectionPoolerDeployment(&c.Spec, Master) if err != nil { msg = "could not generate deployment for connection pooler: %v" @@ -158,9 +159,11 @@ func (c *Cluster) createConnectionPooler(lookup InstallFunction) (*ConnectionPoo } c.logger.Debugf("created new connection pooler %q, uid: %q", util.NameFromMeta(deployment.ObjectMeta), deployment.UID) + } - if c.Spec.EnableReplicaConnectionPooler != nil && *c.Spec.EnableReplicaConnectionPooler == true { + if c.needReplicaConnectionPooler() { + repldeploymentSpec, err := c.generateConnectionPoolerDeployment(&c.Spec, Replica) if err != nil { msg = "could not generate deployment for connection pooler: %v" @@ -192,6 +195,7 @@ func (c *Cluster) createConnectionPooler(lookup InstallFunction) (*ConnectionPoo } c.logger.Debugf("created new connection pooler for replica %q, uid: %q", util.NameFromMeta(repldeployment.ObjectMeta), repldeployment.UID) + } return c.ConnectionPooler, nil @@ -211,8 +215,13 @@ func (c *Cluster) deleteConnectionPooler(role PostgresRole) (err error) { // Clean up the deployment object. If deployment resource we've remembered // is somehow empty, try to delete based on what would we generate deploymentName := c.connectionPoolerName(role) - deployment := c.ConnectionPooler.Deployment + var deployment *appsv1.Deployment + if role == Master { + deployment = c.ConnectionPooler.Deployment + } else { + deployment = c.ConnectionPooler.ReplDeployment + } if deployment != nil { deploymentName = deployment.Name } @@ -234,7 +243,12 @@ func (c *Cluster) deleteConnectionPooler(role PostgresRole) (err error) { c.logger.Infof("Connection pooler deployment %q has been deleted", deploymentName) // Repeat the same for the service object - service := c.ConnectionPooler.Service + var service *v1.Service + if role == Master { + service = c.ConnectionPooler.Service + } else { + service = c.ConnectionPooler.ReplService + } serviceName := c.connectionPoolerName(role) if service != nil { diff --git a/pkg/cluster/resources_test.go b/pkg/cluster/resources_test.go index 144d8a051..45d2f2fd6 100644 --- a/pkg/cluster/resources_test.go +++ b/pkg/cluster/resources_test.go @@ -66,6 +66,31 @@ func TestConnectionPoolerCreationAndDeletion(t *testing.T) { if err != nil { t.Errorf("%s: Cannot delete connection pooler, %s", testName, err) } + + //Check if Replica connection pooler can be create and deleted successfully + cluster.Spec = acidv1.PostgresSpec{ + EnableReplicaConnectionPooler: boolToPointer(true), + ConnectionPooler: &acidv1.ConnectionPooler{}, + } + replpoolerResources, err := cluster.createConnectionPooler(mockInstallLookupFunction) + + if err != nil { + t.Errorf("%s: Cannot create replica connection pooler, %s, %+v", + testName, err, replpoolerResources) + } + + if replpoolerResources.ReplDeployment == nil { + t.Errorf("%s: Connection pooler replica deployment is empty", testName) + } + + if replpoolerResources.ReplService == nil { + t.Errorf("%s: Connection pooler replica service is empty", testName) + } + + err = cluster.deleteConnectionPooler(Replica) + if err != nil { + t.Errorf("%s: Cannot delete replica connection pooler, %s", testName, err) + } } func TestNeedConnectionPooler(t *testing.T) { @@ -91,7 +116,7 @@ func TestNeedConnectionPooler(t *testing.T) { ConnectionPooler: &acidv1.ConnectionPooler{}, } - if !cluster.needConnectionPooler() { + if !cluster.needMasterConnectionPooler() { t.Errorf("%s: Connection pooler is not enabled with full definition", testName) } @@ -100,7 +125,7 @@ func TestNeedConnectionPooler(t *testing.T) { EnableConnectionPooler: boolToPointer(true), } - if !cluster.needConnectionPooler() { + if !cluster.needMasterConnectionPooler() { t.Errorf("%s: Connection pooler is not enabled with flag", testName) } @@ -110,7 +135,7 @@ func TestNeedConnectionPooler(t *testing.T) { ConnectionPooler: &acidv1.ConnectionPooler{}, } - if cluster.needConnectionPooler() { + if cluster.needMasterConnectionPooler() { t.Errorf("%s: Connection pooler is still enabled with flag being false", testName) } @@ -120,8 +145,47 @@ func TestNeedConnectionPooler(t *testing.T) { ConnectionPooler: &acidv1.ConnectionPooler{}, } - if !cluster.needConnectionPooler() { + if !cluster.needMasterConnectionPooler() { t.Errorf("%s: Connection pooler is not enabled with flag and full", testName) } + + // Test for replica connection pooler + cluster.Spec = acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + } + + if cluster.needReplicaConnectionPooler() { + t.Errorf("%s: Replica Connection pooler is not enabled with full definition", + testName) + } + + cluster.Spec = acidv1.PostgresSpec{ + EnableReplicaConnectionPooler: boolToPointer(true), + } + + if !cluster.needReplicaConnectionPooler() { + t.Errorf("%s: Replica Connection pooler is not enabled with flag", + testName) + } + + cluster.Spec = acidv1.PostgresSpec{ + EnableReplicaConnectionPooler: boolToPointer(false), + ConnectionPooler: &acidv1.ConnectionPooler{}, + } + + if cluster.needReplicaConnectionPooler() { + t.Errorf("%s: Replica Connection pooler is still enabled with flag being false", + testName) + } + + cluster.Spec = acidv1.PostgresSpec{ + EnableReplicaConnectionPooler: boolToPointer(true), + ConnectionPooler: &acidv1.ConnectionPooler{}, + } + + if !cluster.needReplicaConnectionPooler() { + t.Errorf("%s: Replica Connection pooler is not enabled with flag and full", + testName) + } } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index f2df8771d..de4655074 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -560,7 +560,7 @@ func (c *Cluster) syncRoles() (err error) { userNames = append(userNames, u.Name) } - if c.needConnectionPooler() { + if c.needMasterConnectionPooler() || c.needReplicaConnectionPooler() { connectionPoolerUser := c.systemUsers[constants.ConnectionPoolerUserKeyName] userNames = append(userNames, connectionPoolerUser.Name) @@ -845,78 +845,107 @@ func (c *Cluster) syncConnectionPooler(oldSpec, var reason SyncReason var err error + var newNeedConnectionPooler, oldNeedConnectionPooler bool if c.ConnectionPooler == nil { c.ConnectionPooler = &ConnectionPoolerObjects{} } - newNeedConnectionPooler := c.needConnectionPoolerWorker(&newSpec.Spec) - oldNeedConnectionPooler := c.needConnectionPoolerWorker(&oldSpec.Spec) - - if newNeedConnectionPooler { - // Try to sync in any case. If we didn't needed connection pooler before, - // it means we want to create it. If it was already present, still sync - // since it could happen that there is no difference in specs, and all - // the resources are remembered, but the deployment was manually deleted - // in between - c.logger.Debug("syncing connection pooler") - - // in this case also do not forget to install lookup function as for - // creating cluster - if !oldNeedConnectionPooler || !c.ConnectionPooler.LookupFunction { - newConnectionPooler := newSpec.Spec.ConnectionPooler - - specSchema := "" - specUser := "" - - if newConnectionPooler != nil { - specSchema = newConnectionPooler.Schema - specUser = newConnectionPooler.User - } - - schema := util.Coalesce( - specSchema, - c.OpConfig.ConnectionPooler.Schema) - - user := util.Coalesce( - specUser, - c.OpConfig.ConnectionPooler.User) - - if err = lookup(schema, user); err != nil { - return NoSync, err - } + for _, role := range [2]PostgresRole{Master, Replica} { + if role == Master { + newNeedConnectionPooler = c.needMasterConnectionPoolerWorker(&newSpec.Spec) + oldNeedConnectionPooler = c.needMasterConnectionPoolerWorker(&oldSpec.Spec) + } else { + newNeedConnectionPooler = c.needReplicaConnectionPoolerWorker(&newSpec.Spec) + oldNeedConnectionPooler = c.needReplicaConnectionPoolerWorker(&oldSpec.Spec) } - if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec, Master); err != nil { - c.logger.Errorf("could not sync connection pooler: %v", err) - return reason, err - } - if newSpec.Spec.EnableReplicaConnectionPooler != nil && - *newSpec.Spec.EnableReplicaConnectionPooler == true { - if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec, Replica); err != nil { + if newNeedConnectionPooler { + // Try to sync in any case. If we didn't needed connection pooler before, + // it means we want to create it. If it was already present, still sync + // since it could happen that there is no difference in specs, and all + // the resources are remembered, but the deployment was manually deleted + // in between + c.logger.Debug("syncing connection pooler") + + // in this case also do not forget to install lookup function as for + // creating cluster + if !oldNeedConnectionPooler || !c.ConnectionPooler.LookupFunction { + newConnectionPooler := newSpec.Spec.ConnectionPooler + + specSchema := "" + specUser := "" + + if newConnectionPooler != nil { + specSchema = newConnectionPooler.Schema + specUser = newConnectionPooler.User + } + + schema := util.Coalesce( + specSchema, + c.OpConfig.ConnectionPooler.Schema) + + user := util.Coalesce( + specUser, + c.OpConfig.ConnectionPooler.User) + + if err = lookup(schema, user); err != nil { + return NoSync, err + } + } + + if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec, role); err != nil { c.logger.Errorf("could not sync connection pooler: %v", err) return reason, err } } - } - if oldNeedConnectionPooler && !newNeedConnectionPooler { - // delete and cleanup resources - for _, role := range [2]PostgresRole{Master, Replica} { - if err = c.deleteConnectionPooler(role); err != nil { - c.logger.Warningf("could not remove connection pooler: %v", err) + if oldNeedConnectionPooler && !newNeedConnectionPooler { + // delete and cleanup resources + if role == Master { + + if c.ConnectionPooler != nil && + (c.ConnectionPooler.Deployment != nil || + c.ConnectionPooler.Service != nil) { + + if err = c.deleteConnectionPooler(role); err != nil { + c.logger.Warningf("could not remove connection pooler: %v", err) + } + } + + } else { + + if c.ConnectionPooler != nil && + (c.ConnectionPooler.ReplDeployment != nil || + c.ConnectionPooler.ReplService != nil) { + + if err = c.deleteConnectionPooler(role); err != nil { + c.logger.Warningf("could not remove connection pooler: %v", err) + } + } } } - } - if !oldNeedConnectionPooler && !newNeedConnectionPooler { - // delete and cleanup resources if not empty - if c.ConnectionPooler != nil && - (c.ConnectionPooler.Deployment != nil || - c.ConnectionPooler.Service != nil) { - for _, role := range [2]PostgresRole{Master, Replica} { - if err = c.deleteConnectionPooler(role); err != nil { - c.logger.Warningf("could not remove connection pooler: %v", err) + if !oldNeedConnectionPooler && !newNeedConnectionPooler { + // delete and cleanup resources if not empty + if role == Master { + if c.ConnectionPooler != nil && + (c.ConnectionPooler.Deployment != nil || + c.ConnectionPooler.Service != nil) { + + if err = c.deleteConnectionPooler(role); err != nil { + c.logger.Warningf("could not remove connection pooler: %v", err) + } + + } + } else { + if c.ConnectionPooler != nil && + (c.ConnectionPooler.ReplDeployment != nil || + c.ConnectionPooler.ReplService != nil) { + + if err = c.deleteConnectionPooler(role); err != nil { + c.logger.Warningf("could not remove connection pooler: %v", err) + } } } } @@ -954,13 +983,20 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql return NoSync, err } - c.ConnectionPooler.Deployment = deployment + if role == Master { + c.ConnectionPooler.Deployment = deployment + } else { + c.ConnectionPooler.ReplDeployment = deployment + } } else if err != nil { msg := "could not get connection pooler deployment to sync: %v" return NoSync, fmt.Errorf(msg, err) } else { - c.ConnectionPooler.Deployment = deployment - + if role == Master { + c.ConnectionPooler.Deployment = deployment + } else { + c.ConnectionPooler.ReplDeployment = deployment + } // actual synchronization oldConnectionPooler := oldSpec.Spec.ConnectionPooler newConnectionPooler := newSpec.Spec.ConnectionPooler @@ -983,6 +1019,7 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql specSync, specReason := c.needSyncConnectionPoolerSpecs(oldConnectionPooler, newConnectionPooler) defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(newConnectionPooler, deployment) reason := append(specReason, defaultsReason...) + c.logger.Warningf("role and reason %v, %v", role, reason) if specSync || defaultsSync { c.logger.Infof("Update connection pooler deployment %s, reason: %+v", c.connectionPoolerName(role), reason) @@ -1002,8 +1039,11 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql if err != nil { return reason, err } - - c.ConnectionPooler.Deployment = deployment + if role == Master { + c.ConnectionPooler.Deployment = deployment + } else { + c.ConnectionPooler.ReplDeployment = deployment + } return reason, nil } } @@ -1029,14 +1069,22 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql if err != nil { return NoSync, err } + if role == Master { + c.ConnectionPooler.Service = service + } else { + c.ConnectionPooler.ReplService = service + } - c.ConnectionPooler.Service = service } else if err != nil { msg := "could not get connection pooler service to sync: %v" return NoSync, fmt.Errorf(msg, err) } else { // Service updates are not supported and probably not that useful anyway - c.ConnectionPooler.Service = service + if role == Master { + c.ConnectionPooler.Service = service + } else { + c.ConnectionPooler.ReplService = service + } } return NoSync, nil diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go index d9248ae33..af8e928c5 100644 --- a/pkg/cluster/sync_test.go +++ b/pkg/cluster/sync_test.go @@ -32,11 +32,11 @@ func objectsAreSaved(cluster *Cluster, err error, reason SyncReason) error { return fmt.Errorf("Connection pooler resources are empty") } - if cluster.ConnectionPooler.Deployment == nil { + if cluster.ConnectionPooler.Deployment == nil && cluster.ConnectionPooler.ReplDeployment == nil { return fmt.Errorf("Deployment was not saved") } - if cluster.ConnectionPooler.Service == nil { + if cluster.ConnectionPooler.Service == nil && cluster.ConnectionPooler.ReplService == nil { return fmt.Errorf("Service was not saved") } @@ -51,6 +51,24 @@ func objectsAreDeleted(cluster *Cluster, err error, reason SyncReason) error { return nil } +func OnlyMasterDeleted(cluster *Cluster, err error, reason SyncReason) error { + if cluster.ConnectionPooler != nil && + (cluster.ConnectionPooler.Deployment != nil && cluster.ConnectionPooler.Service != nil) { + return fmt.Errorf("Connection pooler master was not deleted") + } + + return nil +} + +func OnlyReplicaDeleted(cluster *Cluster, err error, reason SyncReason) error { + if cluster.ConnectionPooler != nil && + (cluster.ConnectionPooler.ReplDeployment != nil && cluster.ConnectionPooler.ReplService != nil) { + return fmt.Errorf("Connection pooler replica was not deleted") + } + + return nil +} + func noEmptySync(cluster *Cluster, err error, reason SyncReason) error { for _, msg := range reason { if strings.HasPrefix(msg, "update [] from '' to '") { @@ -102,6 +120,12 @@ func TestConnectionPoolerSynchronization(t *testing.T) { Deployment: &appsv1.Deployment{}, Service: &v1.Service{}, } + clusterReplicaDirtyMock := newCluster() + clusterReplicaDirtyMock.KubeClient = k8sutil.NewMockKubernetesClient() + clusterReplicaDirtyMock.ConnectionPooler = &ConnectionPoolerObjects{ + ReplDeployment: &appsv1.Deployment{}, + ReplService: &v1.Service{}, + } clusterNewDefaultsMock := newCluster() clusterNewDefaultsMock.KubeClient = k8sutil.NewMockKubernetesClient() @@ -147,6 +171,21 @@ func TestConnectionPoolerSynchronization(t *testing.T) { defaultInstances: 1, check: objectsAreSaved, }, + { + subTest: "create replica if doesn't exist with a flag", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + EnableReplicaConnectionPooler: boolToPointer(true), + }, + }, + cluster: clusterReplicaDirtyMock, + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: objectsAreSaved, + }, { subTest: "create from scratch", oldSpec: &acidv1.Postgresql{ @@ -177,6 +216,43 @@ func TestConnectionPoolerSynchronization(t *testing.T) { defaultInstances: 1, check: objectsAreDeleted, }, + { + subTest: "delete only master if not needed", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableReplicaConnectionPooler: boolToPointer(true), + EnableConnectionPooler: boolToPointer(false), + }, + }, + cluster: clusterMock, + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: OnlyMasterDeleted, + }, + { + subTest: "delete only replica if not needed", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableReplicaConnectionPooler: boolToPointer(true), + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + }, + cluster: clusterMock, + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: OnlyReplicaDeleted, + }, { subTest: "cleanup if still there", oldSpec: &acidv1.Postgresql{ diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 6ffe9899b..332874fe1 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -519,21 +519,21 @@ func (c *Cluster) patroniKubernetesUseConfigMaps() bool { return c.OpConfig.KubernetesUseConfigMaps } -func (c *Cluster) needConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool { - if spec.EnableConnectionPooler != nil { - return *spec.EnableConnectionPooler - } else if spec.EnableReplicaConnectionPooler != nil { - return *spec.EnableReplicaConnectionPooler - } else if spec.ConnectionPooler == nil { - return spec.ConnectionPooler != nil - } - // if the connectionPooler section is there, then we enable even though the - // flags are not there - return true +// isConnectionPoolerEnabled +func (c *Cluster) needMasterConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool { + return (nil != spec.EnableConnectionPooler && *spec.EnableConnectionPooler) || (spec.ConnectionPooler != nil && spec.EnableConnectionPooler == nil) } -func (c *Cluster) needConnectionPooler() bool { - return c.needConnectionPoolerWorker(&c.Spec) +func (c *Cluster) needReplicaConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool { + return nil != spec.EnableReplicaConnectionPooler && *spec.EnableReplicaConnectionPooler +} + +func (c *Cluster) needMasterConnectionPooler() bool { + return c.needMasterConnectionPoolerWorker(&c.Spec) +} + +func (c *Cluster) needReplicaConnectionPooler() bool { + return c.needReplicaConnectionPoolerWorker(&c.Spec) } // Earlier arguments take priority