improve pooler sync (#1593)

* remove role from installLookupFunction and run it on database sync, too
* fix condition to decide on syncing pooler
* trigger lookup from database sync only if pooler is set
* use empty spec everywhere and do not sync if one lookupfunction was passed
* do not sync pooler after being disabled
This commit is contained in:
Felix Kunde 2021-08-27 12:41:37 +02:00 committed by GitHub
parent 7469efac88
commit 62ed7e470f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 68 additions and 76 deletions

View File

@ -1006,9 +1006,9 @@ func (c *Cluster) initSystemUsers() {
// Connection pooler user is an exception, if requested it's going to be // Connection pooler user is an exception, if requested it's going to be
// created by operator as a normal pgUser // created by operator as a normal pgUser
if needConnectionPooler(&c.Spec) { if needConnectionPooler(&c.Spec) {
// initialize empty connection pooler if not done yet connectionPoolerSpec := c.Spec.ConnectionPooler
if c.Spec.ConnectionPooler == nil { if connectionPoolerSpec == nil {
c.Spec.ConnectionPooler = &acidv1.ConnectionPooler{} connectionPoolerSpec = &acidv1.ConnectionPooler{}
} }
// Using superuser as pooler user is not a good idea. First of all it's // Using superuser as pooler user is not a good idea. First of all it's
@ -1016,13 +1016,13 @@ func (c *Cluster) initSystemUsers() {
// and second it's a bad practice. // and second it's a bad practice.
username := c.OpConfig.ConnectionPooler.User username := c.OpConfig.ConnectionPooler.User
isSuperUser := c.Spec.ConnectionPooler.User == c.OpConfig.SuperUsername isSuperUser := connectionPoolerSpec.User == c.OpConfig.SuperUsername
isProtectedUser := c.shouldAvoidProtectedOrSystemRole( isProtectedUser := c.shouldAvoidProtectedOrSystemRole(
c.Spec.ConnectionPooler.User, "connection pool role") connectionPoolerSpec.User, "connection pool role")
if !isSuperUser && !isProtectedUser { if !isSuperUser && !isProtectedUser {
username = util.Coalesce( username = util.Coalesce(
c.Spec.ConnectionPooler.User, connectionPoolerSpec.User,
c.OpConfig.ConnectionPooler.User) c.OpConfig.ConnectionPooler.User)
} }

View File

@ -3,6 +3,7 @@ package cluster
import ( import (
"context" "context"
"fmt" "fmt"
"reflect"
"strings" "strings"
"github.com/r3labs/diff" "github.com/r3labs/diff"
@ -60,7 +61,7 @@ func needMasterConnectionPooler(spec *acidv1.PostgresSpec) bool {
} }
func needMasterConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool { func needMasterConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool {
return (nil != spec.EnableConnectionPooler && *spec.EnableConnectionPooler) || return (spec.EnableConnectionPooler != nil && *spec.EnableConnectionPooler) ||
(spec.ConnectionPooler != nil && spec.EnableConnectionPooler == nil) (spec.ConnectionPooler != nil && spec.EnableConnectionPooler == nil)
} }
@ -114,7 +115,7 @@ func (c *Cluster) createConnectionPooler(LookupFunction InstallFunction) (SyncRe
c.setProcessName("creating connection pooler") c.setProcessName("creating connection pooler")
//this is essentially sync with nil as oldSpec //this is essentially sync with nil as oldSpec
if reason, err := c.syncConnectionPooler(nil, &c.Postgresql, LookupFunction); err != nil { if reason, err := c.syncConnectionPooler(&acidv1.Postgresql{}, &c.Postgresql, LookupFunction); err != nil {
return reason, err return reason, err
} }
return reason, nil return reason, nil
@ -140,11 +141,15 @@ func (c *Cluster) createConnectionPooler(LookupFunction InstallFunction) (SyncRe
// RESERVE_SIZE is how many additional connections to allow for a pooler. // RESERVE_SIZE is how many additional connections to allow for a pooler.
func (c *Cluster) getConnectionPoolerEnvVars() []v1.EnvVar { func (c *Cluster) getConnectionPoolerEnvVars() []v1.EnvVar {
spec := &c.Spec spec := &c.Spec
connectionPoolerSpec := spec.ConnectionPooler
if connectionPoolerSpec == nil {
connectionPoolerSpec = &acidv1.ConnectionPooler{}
}
effectiveMode := util.Coalesce( effectiveMode := util.Coalesce(
spec.ConnectionPooler.Mode, connectionPoolerSpec.Mode,
c.OpConfig.ConnectionPooler.Mode) c.OpConfig.ConnectionPooler.Mode)
numberOfInstances := spec.ConnectionPooler.NumberOfInstances numberOfInstances := connectionPoolerSpec.NumberOfInstances
if numberOfInstances == nil { if numberOfInstances == nil {
numberOfInstances = util.CoalesceInt32( numberOfInstances = util.CoalesceInt32(
c.OpConfig.ConnectionPooler.NumberOfInstances, c.OpConfig.ConnectionPooler.NumberOfInstances,
@ -152,7 +157,7 @@ func (c *Cluster) getConnectionPoolerEnvVars() []v1.EnvVar {
} }
effectiveMaxDBConn := util.CoalesceInt32( effectiveMaxDBConn := util.CoalesceInt32(
spec.ConnectionPooler.MaxDBConnections, connectionPoolerSpec.MaxDBConnections,
c.OpConfig.ConnectionPooler.MaxDBConnections) c.OpConfig.ConnectionPooler.MaxDBConnections)
if effectiveMaxDBConn == nil { if effectiveMaxDBConn == nil {
@ -201,17 +206,21 @@ func (c *Cluster) getConnectionPoolerEnvVars() []v1.EnvVar {
func (c *Cluster) generateConnectionPoolerPodTemplate(role PostgresRole) ( func (c *Cluster) generateConnectionPoolerPodTemplate(role PostgresRole) (
*v1.PodTemplateSpec, error) { *v1.PodTemplateSpec, error) {
spec := &c.Spec spec := &c.Spec
connectionPoolerSpec := spec.ConnectionPooler
if connectionPoolerSpec == nil {
connectionPoolerSpec = &acidv1.ConnectionPooler{}
}
gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds()) gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds())
resources, err := generateResourceRequirements( resources, err := generateResourceRequirements(
spec.ConnectionPooler.Resources, connectionPoolerSpec.Resources,
makeDefaultConnectionPoolerResources(&c.OpConfig)) makeDefaultConnectionPoolerResources(&c.OpConfig))
effectiveDockerImage := util.Coalesce( effectiveDockerImage := util.Coalesce(
spec.ConnectionPooler.DockerImage, connectionPoolerSpec.DockerImage,
c.OpConfig.ConnectionPooler.Image) c.OpConfig.ConnectionPooler.Image)
effectiveSchema := util.Coalesce( effectiveSchema := util.Coalesce(
spec.ConnectionPooler.Schema, connectionPoolerSpec.Schema,
c.OpConfig.ConnectionPooler.Schema) c.OpConfig.ConnectionPooler.Schema)
if err != nil { if err != nil {
@ -220,7 +229,7 @@ func (c *Cluster) generateConnectionPoolerPodTemplate(role PostgresRole) (
secretSelector := func(key string) *v1.SecretKeySelector { secretSelector := func(key string) *v1.SecretKeySelector {
effectiveUser := util.Coalesce( effectiveUser := util.Coalesce(
spec.ConnectionPooler.User, connectionPoolerSpec.User,
c.OpConfig.ConnectionPooler.User) c.OpConfig.ConnectionPooler.User)
return &v1.SecretKeySelector{ return &v1.SecretKeySelector{
@ -321,12 +330,13 @@ func (c *Cluster) generateConnectionPoolerDeployment(connectionPooler *Connectio
// default values, initialize it to an empty structure. It could be done // default values, initialize it to an empty structure. It could be done
// anywhere, but here is the earliest common entry point between sync and // anywhere, but here is the earliest common entry point between sync and
// create code, so init here. // create code, so init here.
if spec.ConnectionPooler == nil { connectionPoolerSpec := spec.ConnectionPooler
spec.ConnectionPooler = &acidv1.ConnectionPooler{} if connectionPoolerSpec == nil {
connectionPoolerSpec = &acidv1.ConnectionPooler{}
} }
podTemplate, err := c.generateConnectionPoolerPodTemplate(connectionPooler.Role) podTemplate, err := c.generateConnectionPoolerPodTemplate(connectionPooler.Role)
numberOfInstances := spec.ConnectionPooler.NumberOfInstances numberOfInstances := connectionPoolerSpec.NumberOfInstances
if numberOfInstances == nil { if numberOfInstances == nil {
numberOfInstances = util.CoalesceInt32( numberOfInstances = util.CoalesceInt32(
c.OpConfig.ConnectionPooler.NumberOfInstances, c.OpConfig.ConnectionPooler.NumberOfInstances,
@ -371,16 +381,6 @@ func (c *Cluster) generateConnectionPoolerDeployment(connectionPooler *Connectio
func (c *Cluster) generateConnectionPoolerService(connectionPooler *ConnectionPoolerObjects) *v1.Service { func (c *Cluster) generateConnectionPoolerService(connectionPooler *ConnectionPoolerObjects) *v1.Service {
spec := &c.Spec spec := &c.Spec
// there are two ways to enable connection pooler, either to specify a
// connectionPooler section or enableConnectionPooler. In the second case
// spec.connectionPooler will be nil, so to make it easier to calculate
// default values, initialize it to an empty structure. It could be done
// anywhere, but here is the earliest common entry point between sync and
// create code, so init here.
if spec.ConnectionPooler == nil {
spec.ConnectionPooler = &acidv1.ConnectionPooler{}
}
serviceSpec := v1.ServiceSpec{ serviceSpec := v1.ServiceSpec{
Ports: []v1.ServicePort{ Ports: []v1.ServicePort{
{ {
@ -668,12 +668,14 @@ func makeDefaultConnectionPoolerResources(config *config.Config) acidv1.Resource
func logPoolerEssentials(log *logrus.Entry, oldSpec, newSpec *acidv1.Postgresql) { func logPoolerEssentials(log *logrus.Entry, oldSpec, newSpec *acidv1.Postgresql) {
var v []string var v []string
var input []*bool var input []*bool
newMasterConnectionPoolerEnabled := needMasterConnectionPoolerWorker(&newSpec.Spec)
if oldSpec == nil { if oldSpec == nil {
input = []*bool{nil, nil, newSpec.Spec.EnableConnectionPooler, newSpec.Spec.EnableReplicaConnectionPooler} input = []*bool{nil, nil, &newMasterConnectionPoolerEnabled, newSpec.Spec.EnableReplicaConnectionPooler}
} else { } else {
input = []*bool{oldSpec.Spec.EnableConnectionPooler, oldSpec.Spec.EnableReplicaConnectionPooler, newSpec.Spec.EnableConnectionPooler, newSpec.Spec.EnableReplicaConnectionPooler} oldMasterConnectionPoolerEnabled := needMasterConnectionPoolerWorker(&oldSpec.Spec)
input = []*bool{&oldMasterConnectionPoolerEnabled, oldSpec.Spec.EnableReplicaConnectionPooler, &newMasterConnectionPoolerEnabled, newSpec.Spec.EnableReplicaConnectionPooler}
} }
for _, b := range input { for _, b := range input {
@ -684,25 +686,16 @@ func logPoolerEssentials(log *logrus.Entry, oldSpec, newSpec *acidv1.Postgresql)
} }
} }
log.Debugf("syncing connection pooler from (%v, %v) to (%v, %v)", v[0], v[1], v[2], v[3]) log.Debugf("syncing connection pooler (master, replica) from (%v, %v) to (%v, %v)", v[0], v[1], v[2], v[3])
} }
func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, LookupFunction InstallFunction) (SyncReason, error) { func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, LookupFunction InstallFunction) (SyncReason, error) {
var reason SyncReason var reason SyncReason
var err error var err error
var newNeedConnectionPooler, oldNeedConnectionPooler bool var connectionPoolerNeeded bool
oldNeedConnectionPooler = false
if oldSpec == nil { needSync := !reflect.DeepEqual(oldSpec.Spec.ConnectionPooler, newSpec.Spec.ConnectionPooler)
oldSpec = &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{
ConnectionPooler: &acidv1.ConnectionPooler{},
},
}
}
needSync, _ := needSyncConnectionPoolerSpecs(oldSpec.Spec.ConnectionPooler, newSpec.Spec.ConnectionPooler, c.logger)
masterChanges, err := diff.Diff(oldSpec.Spec.EnableConnectionPooler, newSpec.Spec.EnableConnectionPooler) masterChanges, err := diff.Diff(oldSpec.Spec.EnableConnectionPooler, newSpec.Spec.EnableConnectionPooler)
if err != nil { if err != nil {
c.logger.Error("Error in getting diff of master connection pooler changes") c.logger.Error("Error in getting diff of master connection pooler changes")
@ -712,15 +705,14 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look
c.logger.Error("Error in getting diff of replica connection pooler changes") c.logger.Error("Error in getting diff of replica connection pooler changes")
} }
// skip pooler sync only // skip pooler sync when theres no diff or it's deactivated
// 1. if there is no diff in spec, AND // but, handling the case when connectionPooler is not there but it is required
// 2. if connection pooler is already there and is also required as per newSpec
//
// Handling the case when connectionPooler is not there but it is required
// as per spec, hence do not skip syncing in that case, even though there // as per spec, hence do not skip syncing in that case, even though there
// is no diff in specs // is no diff in specs
if (!needSync && len(masterChanges) <= 0 && len(replicaChanges) <= 0) && if (!needSync && len(masterChanges) <= 0 && len(replicaChanges) <= 0) &&
(c.ConnectionPooler != nil && (needConnectionPooler(&newSpec.Spec))) { ((!needConnectionPooler(&newSpec.Spec) && (c.ConnectionPooler == nil || !needConnectionPooler(&oldSpec.Spec))) ||
(c.ConnectionPooler != nil && needConnectionPooler(&newSpec.Spec) &&
(c.ConnectionPooler[Master].LookupFunction || c.ConnectionPooler[Replica].LookupFunction))) {
c.logger.Debugln("syncing pooler is not required") c.logger.Debugln("syncing pooler is not required")
return nil, nil return nil, nil
} }
@ -731,15 +723,9 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look
for _, role := range [2]PostgresRole{Master, Replica} { for _, role := range [2]PostgresRole{Master, Replica} {
if role == Master { if role == Master {
newNeedConnectionPooler = needMasterConnectionPoolerWorker(&newSpec.Spec) connectionPoolerNeeded = needMasterConnectionPoolerWorker(&newSpec.Spec)
if oldSpec != nil {
oldNeedConnectionPooler = needMasterConnectionPoolerWorker(&oldSpec.Spec)
}
} else { } else {
newNeedConnectionPooler = needReplicaConnectionPoolerWorker(&newSpec.Spec) connectionPoolerNeeded = needReplicaConnectionPoolerWorker(&newSpec.Spec)
if oldSpec != nil {
oldNeedConnectionPooler = needReplicaConnectionPoolerWorker(&oldSpec.Spec)
}
} }
// if the call is via createConnectionPooler, then it is required to initialize // if the call is via createConnectionPooler, then it is required to initialize
@ -759,24 +745,22 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look
} }
} }
if newNeedConnectionPooler { if connectionPoolerNeeded {
// Try to sync in any case. If we didn't needed connection pooler before, // Try to sync in any case. If we didn't needed connection pooler before,
// it means we want to create it. If it was already present, still sync // it means we want to create it. If it was already present, still sync
// since it could happen that there is no difference in specs, and all // since it could happen that there is no difference in specs, and all
// the resources are remembered, but the deployment was manually deleted // the resources are remembered, but the deployment was manually deleted
// in between // in between
// in this case also do not forget to install lookup function as for // in this case also do not forget to install lookup function
// creating cluster if !c.ConnectionPooler[role].LookupFunction {
if !oldNeedConnectionPooler || !c.ConnectionPooler[role].LookupFunction { connectionPooler := c.Spec.ConnectionPooler
newConnectionPooler := newSpec.Spec.ConnectionPooler
specSchema := "" specSchema := ""
specUser := "" specUser := ""
if newConnectionPooler != nil { if connectionPooler != nil {
specSchema = newConnectionPooler.Schema specSchema = connectionPooler.Schema
specUser = newConnectionPooler.User specUser = connectionPooler.User
} }
schema := util.Coalesce( schema := util.Coalesce(
@ -787,9 +771,10 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look
specUser, specUser,
c.OpConfig.ConnectionPooler.User) c.OpConfig.ConnectionPooler.User)
if err = LookupFunction(schema, user, role); err != nil { if err = LookupFunction(schema, user); err != nil {
return NoSync, err return NoSync, err
} }
c.ConnectionPooler[role].LookupFunction = true
} }
if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec, role); err != nil { if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec, role); err != nil {
@ -808,8 +793,8 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look
} }
} }
} }
if !needMasterConnectionPoolerWorker(&newSpec.Spec) && if (needMasterConnectionPoolerWorker(&oldSpec.Spec) || needReplicaConnectionPoolerWorker(&oldSpec.Spec)) &&
!needReplicaConnectionPoolerWorker(&newSpec.Spec) { !needMasterConnectionPoolerWorker(&newSpec.Spec) && !needReplicaConnectionPoolerWorker(&newSpec.Spec) {
if err = c.deleteConnectionPoolerSecret(); err != nil { if err = c.deleteConnectionPoolerSecret(); err != nil {
c.logger.Warningf("could not remove connection pooler secret: %v", err) c.logger.Warningf("could not remove connection pooler secret: %v", err)
} }
@ -874,8 +859,6 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
newConnectionPooler = &acidv1.ConnectionPooler{} newConnectionPooler = &acidv1.ConnectionPooler{}
} }
c.logger.Infof("old: %+v, new %+v", oldConnectionPooler, newConnectionPooler)
var specSync bool var specSync bool
var specReason []string var specReason []string

View File

@ -19,7 +19,7 @@ import (
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
) )
func mockInstallLookupFunction(schema string, user string, role PostgresRole) error { func mockInstallLookupFunction(schema string, user string) error {
return nil return nil
} }

View File

@ -508,7 +508,7 @@ func (c *Cluster) execCreateOrAlterExtension(extName, schemaName, statement, doi
// Creates a connection pool credentials lookup function in every database to // Creates a connection pool credentials lookup function in every database to
// perform remote authentication. // perform remote authentication.
func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string, role PostgresRole) error { func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error {
var stmtBytes bytes.Buffer var stmtBytes bytes.Buffer
c.logger.Info("Installing lookup function") c.logger.Info("Installing lookup function")
@ -604,8 +604,8 @@ func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string, role Po
c.logger.Infof("pooler lookup function installed into %s", dbname) c.logger.Infof("pooler lookup function installed into %s", dbname)
} }
if len(failedDatabases) == 0 { if len(failedDatabases) > 0 {
c.ConnectionPooler[role].LookupFunction = true return fmt.Errorf("could not install pooler lookup function in every specified databases")
} }
return nil return nil

View File

@ -758,6 +758,15 @@ func (c *Cluster) syncDatabases() error {
} }
} }
if len(createDatabases) > 0 {
// trigger creation of pooler objects in new database in syncConnectionPooler
if c.ConnectionPooler != nil {
for _, role := range [2]PostgresRole{Master, Replica} {
c.ConnectionPooler[role].LookupFunction = false
}
}
}
// set default privileges for prepared database // set default privileges for prepared database
for _, preparedDatabase := range preparedDatabases { for _, preparedDatabase := range preparedDatabases {
if err := c.initDbConnWithName(preparedDatabase); err != nil { if err := c.initDbConnWithName(preparedDatabase); err != nil {

View File

@ -72,7 +72,7 @@ type ClusterStatus struct {
type TemplateParams map[string]interface{} type TemplateParams map[string]interface{}
type InstallFunction func(schema string, user string, role PostgresRole) error type InstallFunction func(schema string, user string) error
type SyncReason []string type SyncReason []string