Address feedback
Set default numberOfInstances to 2. Add verifications for config. Fix schema/user typos. Avoid closing an empty connections.
This commit is contained in:
parent
6ae3c3d752
commit
f839806616
|
|
@ -44,7 +44,7 @@ const (
|
|||
$$ LANGUAGE plpgsql SECURITY DEFINER;
|
||||
|
||||
REVOKE ALL ON FUNCTION {{.pool_schema}}.user_lookup(text)
|
||||
FROM public, {{.pool_schema}};
|
||||
FROM public, {{.pool_user}};
|
||||
GRANT EXECUTE ON FUNCTION {{.pool_schema}}.user_lookup(text)
|
||||
TO {{.pool_user}};
|
||||
GRANT USAGE ON SCHEMA {{.pool_schema}} TO {{.pool_user}};
|
||||
|
|
@ -124,6 +124,10 @@ func (c *Cluster) initDbConnWithName(dbname string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *Cluster) connectionIsClosed() bool {
|
||||
return c.pgDb == nil
|
||||
}
|
||||
|
||||
func (c *Cluster) closeDbConn() (err error) {
|
||||
c.setProcessName("closing db connection")
|
||||
if c.pgDb != nil {
|
||||
|
|
@ -284,8 +288,10 @@ func (c *Cluster) installLookupFunction(poolSchema, poolUser string) error {
|
|||
return fmt.Errorf("could not init database connection")
|
||||
}
|
||||
defer func() {
|
||||
// in case if everything went fine this can generate a warning about
|
||||
// trying to close an empty connection.
|
||||
if c.connectionIsClosed() {
|
||||
return
|
||||
}
|
||||
|
||||
if err := c.closeDbConn(); err != nil {
|
||||
c.logger.Errorf("could not close database connection: %v", err)
|
||||
}
|
||||
|
|
@ -300,8 +306,12 @@ func (c *Cluster) installLookupFunction(poolSchema, poolUser string) error {
|
|||
templater := template.Must(template.New("sql").Parse(connectionPoolLookup))
|
||||
|
||||
for dbname, _ := range currentDatabases {
|
||||
if dbname == "template0" || dbname == "template1" {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := c.initDbConnWithName(dbname); err != nil {
|
||||
return fmt.Errorf("could not init database connection")
|
||||
return fmt.Errorf("could not init database connection to %s", dbname)
|
||||
}
|
||||
|
||||
c.logger.Infof("Install pool lookup function into %s", dbname)
|
||||
|
|
@ -312,8 +322,10 @@ func (c *Cluster) installLookupFunction(poolSchema, poolUser string) error {
|
|||
}
|
||||
|
||||
if err := templater.Execute(&stmtBytes, params); err != nil {
|
||||
return fmt.Errorf("could not prepare sql statement %+v: %v",
|
||||
c.logger.Errorf("could not prepare sql statement %+v: %v",
|
||||
params, err)
|
||||
// process other databases
|
||||
continue
|
||||
}
|
||||
|
||||
// golang sql will do retries couple of times if pq driver reports
|
||||
|
|
@ -335,8 +347,10 @@ func (c *Cluster) installLookupFunction(poolSchema, poolUser string) error {
|
|||
})
|
||||
|
||||
if execErr != nil {
|
||||
return fmt.Errorf("could not execute after retries %s: %v",
|
||||
c.logger.Errorf("could not execute after retries %s: %v",
|
||||
stmtBytes.String(), err)
|
||||
// process other databases
|
||||
continue
|
||||
}
|
||||
|
||||
c.logger.Infof("Pool lookup function installed into %s", dbname)
|
||||
|
|
|
|||
|
|
@ -645,7 +645,7 @@ func (c *Cluster) syncConnectionPool(oldSpec, newSpec *acidv1.Postgresql, lookup
|
|||
|
||||
if newConnPool != nil {
|
||||
specSchema = newConnPool.Schema
|
||||
specUser = newConnPool.Schema
|
||||
specUser = newConnPool.User
|
||||
}
|
||||
|
||||
schema := util.Coalesce(
|
||||
|
|
@ -656,7 +656,9 @@ func (c *Cluster) syncConnectionPool(oldSpec, newSpec *acidv1.Postgresql, lookup
|
|||
specUser,
|
||||
c.OpConfig.ConnectionPool.User)
|
||||
|
||||
lookup(schema, user)
|
||||
if err := lookup(schema, user); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := c.syncConnectionPoolWorker(oldSpec, newSpec); err != nil {
|
||||
|
|
|
|||
|
|
@ -152,11 +152,11 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur
|
|||
// so ensure default values here.
|
||||
result.ConnectionPool.NumberOfInstances = util.CoalesceInt32(
|
||||
fromCRD.ConnectionPool.NumberOfInstances,
|
||||
int32ToPointer(1))
|
||||
int32ToPointer(2))
|
||||
|
||||
result.ConnectionPool.NumberOfInstances = util.MaxInt32(
|
||||
result.ConnectionPool.NumberOfInstances,
|
||||
int32ToPointer(1))
|
||||
int32ToPointer(2))
|
||||
|
||||
result.ConnectionPool.Schema = util.Coalesce(
|
||||
fromCRD.ConnectionPool.Schema,
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/zalando/postgres-operator/pkg/spec"
|
||||
"github.com/zalando/postgres-operator/pkg/util/constants"
|
||||
)
|
||||
|
||||
// CRD describes CustomResourceDefinition specific configuration parameters
|
||||
|
|
@ -211,5 +212,10 @@ func validate(cfg *Config) (err error) {
|
|||
if cfg.Workers == 0 {
|
||||
err = fmt.Errorf("number of workers should be higher than 0")
|
||||
}
|
||||
|
||||
if *cfg.ConnectionPool.NumberOfInstances < constants.ConnPoolMinInstances {
|
||||
msg := "number of connection pool instances should be higher than %d"
|
||||
err = fmt.Errorf(msg, constants.ConnPoolMinInstances)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue