Lookup function installation (#1171)
* Lookup function installation Due to reusing a previous database connection without closing it, lookup function installation process was skipping the first database in the list, installing twice into postgres db instead. To prevent that, make internal initDbConnWithName to overwrite a connection object, and return the same object only from initDbConn, which is sort of public interface. Another solution for this would be to modify initDbConnWithName to return a connection object and then generate one temporary connection for each db. It sound feasible but after one attempt it seems it requires a bit more changes around (init, close connections) and doesn't bring anything significantly better on the table. In case if some future changes will prove this wrong, do not hesitate to refactor. Change retry strategy to more insistive one, namely: * retry on the next sync even if we failed to process one database and install pooler appliance. * perform the whole installation unconditionally on update, since the list of target databases could be changed. And for the sake of making it even more robust, also log the case when operator decides to skip installation. Extend connection pooler e2e test with verification that all dbs have required schema installed.
This commit is contained in:
parent
d15f2d3392
commit
1f5d0995a5
|
|
@ -1,2 +1,2 @@
|
||||||
#!/usr/bin/env bash
|
#!/usr/bin/env bash
|
||||||
kubectl exec -it $1 -- sh -c "$2"
|
kubectl exec -i $1 -- sh -c "$2"
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,14 @@ def to_selector(labels):
|
||||||
return ",".join(["=".join(l) for l in labels.items()])
|
return ",".join(["=".join(l) for l in labels.items()])
|
||||||
|
|
||||||
|
|
||||||
|
def clean_list(values):
|
||||||
|
# value is not stripped bytes, strip and convert to a string
|
||||||
|
clean = lambda v: v.strip().decode()
|
||||||
|
notNone = lambda v: v
|
||||||
|
|
||||||
|
return list(filter(notNone, map(clean, values)))
|
||||||
|
|
||||||
|
|
||||||
class EndToEndTestCase(unittest.TestCase):
|
class EndToEndTestCase(unittest.TestCase):
|
||||||
'''
|
'''
|
||||||
Test interaction of the operator with multiple K8s components.
|
Test interaction of the operator with multiple K8s components.
|
||||||
|
|
@ -140,6 +148,58 @@ class EndToEndTestCase(unittest.TestCase):
|
||||||
|
|
||||||
k8s.wait_for_running_pods(pod_selector, 2)
|
k8s.wait_for_running_pods(pod_selector, 2)
|
||||||
|
|
||||||
|
# Verify that all the databases have pooler schema installed.
|
||||||
|
# Do this via psql, since otherwise we need to deal with
|
||||||
|
# credentials.
|
||||||
|
dbList = []
|
||||||
|
|
||||||
|
leader = k8s.get_cluster_leader_pod('acid-minimal-cluster')
|
||||||
|
dbListQuery = "select datname from pg_database"
|
||||||
|
schemasQuery = """
|
||||||
|
select schema_name
|
||||||
|
from information_schema.schemata
|
||||||
|
where schema_name = 'pooler'
|
||||||
|
"""
|
||||||
|
exec_query = r"psql -tAq -c \"{}\" -d {}"
|
||||||
|
|
||||||
|
if leader:
|
||||||
|
try:
|
||||||
|
q = exec_query.format(dbListQuery, "postgres")
|
||||||
|
q = "su postgres -c \"{}\"".format(q)
|
||||||
|
print('Get databases: {}'.format(q))
|
||||||
|
result = k8s.exec_with_kubectl(leader.metadata.name, q)
|
||||||
|
dbList = clean_list(result.stdout.split(b'\n'))
|
||||||
|
print('dbList: {}, stdout: {}, stderr {}'.format(
|
||||||
|
dbList, result.stdout, result.stderr
|
||||||
|
))
|
||||||
|
except Exception as ex:
|
||||||
|
print('Could not get databases: {}'.format(ex))
|
||||||
|
print('Stdout: {}'.format(result.stdout))
|
||||||
|
print('Stderr: {}'.format(result.stderr))
|
||||||
|
|
||||||
|
for db in dbList:
|
||||||
|
if db in ('template0', 'template1'):
|
||||||
|
continue
|
||||||
|
|
||||||
|
schemas = []
|
||||||
|
try:
|
||||||
|
q = exec_query.format(schemasQuery, db)
|
||||||
|
q = "su postgres -c \"{}\"".format(q)
|
||||||
|
print('Get schemas: {}'.format(q))
|
||||||
|
result = k8s.exec_with_kubectl(leader.metadata.name, q)
|
||||||
|
schemas = clean_list(result.stdout.split(b'\n'))
|
||||||
|
print('schemas: {}, stdout: {}, stderr {}'.format(
|
||||||
|
schemas, result.stdout, result.stderr
|
||||||
|
))
|
||||||
|
except Exception as ex:
|
||||||
|
print('Could not get databases: {}'.format(ex))
|
||||||
|
print('Stdout: {}'.format(result.stdout))
|
||||||
|
print('Stderr: {}'.format(result.stderr))
|
||||||
|
|
||||||
|
self.assertNotEqual(len(schemas), 0)
|
||||||
|
else:
|
||||||
|
print('Could not find leader pod')
|
||||||
|
|
||||||
# turn it off, keeping configuration section
|
# turn it off, keeping configuration section
|
||||||
k8s.api.custom_objects_api.patch_namespaced_custom_object(
|
k8s.api.custom_objects_api.patch_namespaced_custom_object(
|
||||||
'acid.zalan.do', 'v1', 'default',
|
'acid.zalan.do', 'v1', 'default',
|
||||||
|
|
@ -235,7 +295,10 @@ class EndToEndTestCase(unittest.TestCase):
|
||||||
# operator configuration via API
|
# operator configuration via API
|
||||||
operator_pod = k8s.get_operator_pod()
|
operator_pod = k8s.get_operator_pod()
|
||||||
get_config_cmd = "wget --quiet -O - localhost:8080/config"
|
get_config_cmd = "wget --quiet -O - localhost:8080/config"
|
||||||
result = k8s.exec_with_kubectl(operator_pod.metadata.name, get_config_cmd)
|
result = k8s.exec_with_kubectl(
|
||||||
|
operator_pod.metadata.name,
|
||||||
|
get_config_cmd,
|
||||||
|
)
|
||||||
roles_dict = (json.loads(result.stdout)
|
roles_dict = (json.loads(result.stdout)
|
||||||
.get("controller", {})
|
.get("controller", {})
|
||||||
.get("InfrastructureRoles"))
|
.get("InfrastructureRoles"))
|
||||||
|
|
@ -862,6 +925,19 @@ class K8s:
|
||||||
|
|
||||||
return master_pod_node, replica_pod_nodes
|
return master_pod_node, replica_pod_nodes
|
||||||
|
|
||||||
|
def get_cluster_leader_pod(self, pg_cluster_name, namespace='default'):
|
||||||
|
labels = {
|
||||||
|
'application': 'spilo',
|
||||||
|
'cluster-name': pg_cluster_name,
|
||||||
|
'spilo-role': 'master',
|
||||||
|
}
|
||||||
|
|
||||||
|
pods = self.api.core_v1.list_namespaced_pod(
|
||||||
|
namespace, label_selector=to_selector(labels)).items
|
||||||
|
|
||||||
|
if pods:
|
||||||
|
return pods[0]
|
||||||
|
|
||||||
def wait_for_operator_pod_start(self):
|
def wait_for_operator_pod_start(self):
|
||||||
self. wait_for_pod_start("name=postgres-operator")
|
self. wait_for_pod_start("name=postgres-operator")
|
||||||
# HACK operator must register CRD and/or Sync existing PG clusters after start up
|
# HACK operator must register CRD and/or Sync existing PG clusters after start up
|
||||||
|
|
|
||||||
|
|
@ -780,7 +780,13 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// sync connection pooler
|
// Sync connection pooler. Before actually doing sync reset lookup
|
||||||
|
// installation flag, since manifest updates could add another db which we
|
||||||
|
// need to process. In the future we may want to do this more careful and
|
||||||
|
// check which databases we need to process, but even repeating the whole
|
||||||
|
// installation process should be good enough.
|
||||||
|
c.ConnectionPooler.LookupFunction = false
|
||||||
|
|
||||||
if _, err := c.syncConnectionPooler(oldSpec, newSpec,
|
if _, err := c.syncConnectionPooler(oldSpec, newSpec,
|
||||||
c.installLookupFunction); err != nil {
|
c.installLookupFunction); err != nil {
|
||||||
c.logger.Errorf("could not sync connection pooler: %v", err)
|
c.logger.Errorf("could not sync connection pooler: %v", err)
|
||||||
|
|
|
||||||
|
|
@ -101,15 +101,20 @@ func (c *Cluster) databaseAccessDisabled() bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) initDbConn() error {
|
func (c *Cluster) initDbConn() error {
|
||||||
return c.initDbConnWithName("")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Cluster) initDbConnWithName(dbname string) error {
|
|
||||||
c.setProcessName("initializing db connection")
|
|
||||||
if c.pgDb != nil {
|
if c.pgDb != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return c.initDbConnWithName("")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Worker function for connection initialization. This function does not check
|
||||||
|
// if the connection is already open, if it is then it will be overwritten.
|
||||||
|
// Callers need to make sure no connection is open, otherwise we could leak
|
||||||
|
// connections
|
||||||
|
func (c *Cluster) initDbConnWithName(dbname string) error {
|
||||||
|
c.setProcessName("initializing db connection")
|
||||||
|
|
||||||
var conn *sql.DB
|
var conn *sql.DB
|
||||||
connstring := c.pgConnectionString(dbname)
|
connstring := c.pgConnectionString(dbname)
|
||||||
|
|
||||||
|
|
@ -145,6 +150,12 @@ func (c *Cluster) initDbConnWithName(dbname string) error {
|
||||||
conn.SetMaxOpenConns(1)
|
conn.SetMaxOpenConns(1)
|
||||||
conn.SetMaxIdleConns(-1)
|
conn.SetMaxIdleConns(-1)
|
||||||
|
|
||||||
|
if c.pgDb != nil {
|
||||||
|
msg := "Closing an existing connection before opening a new one to %s"
|
||||||
|
c.logger.Warningf(msg, dbname)
|
||||||
|
c.closeDbConn()
|
||||||
|
}
|
||||||
|
|
||||||
c.pgDb = conn
|
c.pgDb = conn
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -465,8 +476,11 @@ func (c *Cluster) execCreateOrAlterExtension(extName, schemaName, statement, doi
|
||||||
// perform remote authentification.
|
// perform remote authentification.
|
||||||
func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error {
|
func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error {
|
||||||
var stmtBytes bytes.Buffer
|
var stmtBytes bytes.Buffer
|
||||||
|
|
||||||
c.logger.Info("Installing lookup function")
|
c.logger.Info("Installing lookup function")
|
||||||
|
|
||||||
|
// Open a new connection if not yet done. This connection will be used only
|
||||||
|
// to get the list of databases, not for the actuall installation.
|
||||||
if err := c.initDbConn(); err != nil {
|
if err := c.initDbConn(); err != nil {
|
||||||
return fmt.Errorf("could not init database connection")
|
return fmt.Errorf("could not init database connection")
|
||||||
}
|
}
|
||||||
|
|
@ -480,37 +494,41 @@ func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// List of databases we failed to process. At the moment it function just
|
||||||
|
// like a flag to retry on the next sync, but in the future we may want to
|
||||||
|
// retry only necessary parts, so let's keep the list.
|
||||||
|
failedDatabases := []string{}
|
||||||
currentDatabases, err := c.getDatabases()
|
currentDatabases, err := c.getDatabases()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
msg := "could not get databases to install pooler lookup function: %v"
|
msg := "could not get databases to install pooler lookup function: %v"
|
||||||
return fmt.Errorf(msg, err)
|
return fmt.Errorf(msg, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We've got the list of target databases, now close this connection to
|
||||||
|
// open a new one to every each of them.
|
||||||
|
if err := c.closeDbConn(); err != nil {
|
||||||
|
c.logger.Errorf("could not close database connection: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
templater := template.Must(template.New("sql").Parse(connectionPoolerLookup))
|
templater := template.Must(template.New("sql").Parse(connectionPoolerLookup))
|
||||||
|
params := TemplateParams{
|
||||||
|
"pooler_schema": poolerSchema,
|
||||||
|
"pooler_user": poolerUser,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := templater.Execute(&stmtBytes, params); err != nil {
|
||||||
|
msg := "could not prepare sql statement %+v: %v"
|
||||||
|
return fmt.Errorf(msg, params, err)
|
||||||
|
}
|
||||||
|
|
||||||
for dbname := range currentDatabases {
|
for dbname := range currentDatabases {
|
||||||
|
|
||||||
if dbname == "template0" || dbname == "template1" {
|
if dbname == "template0" || dbname == "template1" {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.initDbConnWithName(dbname); err != nil {
|
|
||||||
return fmt.Errorf("could not init database connection to %s", dbname)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.logger.Infof("Install pooler lookup function into %s", dbname)
|
c.logger.Infof("Install pooler lookup function into %s", dbname)
|
||||||
|
|
||||||
params := TemplateParams{
|
|
||||||
"pooler_schema": poolerSchema,
|
|
||||||
"pooler_user": poolerUser,
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := templater.Execute(&stmtBytes, params); err != nil {
|
|
||||||
c.logger.Errorf("could not prepare sql statement %+v: %v",
|
|
||||||
params, err)
|
|
||||||
// process other databases
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// golang sql will do retries couple of times if pq driver reports
|
// golang sql will do retries couple of times if pq driver reports
|
||||||
// connections issues (driver.ErrBadConn), but since our query is
|
// connections issues (driver.ErrBadConn), but since our query is
|
||||||
// idempotent, we can retry in a view of other errors (e.g. due to
|
// idempotent, we can retry in a view of other errors (e.g. due to
|
||||||
|
|
@ -520,7 +538,20 @@ func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error {
|
||||||
constants.PostgresConnectTimeout,
|
constants.PostgresConnectTimeout,
|
||||||
constants.PostgresConnectRetryTimeout,
|
constants.PostgresConnectRetryTimeout,
|
||||||
func() (bool, error) {
|
func() (bool, error) {
|
||||||
if _, err := c.pgDb.Exec(stmtBytes.String()); err != nil {
|
|
||||||
|
// At this moment we are not connected to any database
|
||||||
|
if err := c.initDbConnWithName(dbname); err != nil {
|
||||||
|
msg := "could not init database connection to %s"
|
||||||
|
return false, fmt.Errorf(msg, dbname)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err := c.closeDbConn(); err != nil {
|
||||||
|
msg := "could not close database connection: %v"
|
||||||
|
c.logger.Errorf(msg, err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
if _, err = c.pgDb.Exec(stmtBytes.String()); err != nil {
|
||||||
msg := fmt.Errorf("could not execute sql statement %s: %v",
|
msg := fmt.Errorf("could not execute sql statement %s: %v",
|
||||||
stmtBytes.String(), err)
|
stmtBytes.String(), err)
|
||||||
return false, msg
|
return false, msg
|
||||||
|
|
@ -533,15 +564,16 @@ func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error {
|
||||||
c.logger.Errorf("could not execute after retries %s: %v",
|
c.logger.Errorf("could not execute after retries %s: %v",
|
||||||
stmtBytes.String(), err)
|
stmtBytes.String(), err)
|
||||||
// process other databases
|
// process other databases
|
||||||
|
failedDatabases = append(failedDatabases, dbname)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
c.logger.Infof("pooler lookup function installed into %s", dbname)
|
c.logger.Infof("pooler lookup function installed into %s", dbname)
|
||||||
if err := c.closeDbConn(); err != nil {
|
|
||||||
c.logger.Errorf("could not close database connection: %v", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
c.ConnectionPooler.LookupFunction = true
|
if len(failedDatabases) == 0 {
|
||||||
|
c.ConnectionPooler.LookupFunction = true
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -847,7 +847,9 @@ func (c *Cluster) syncConnectionPooler(oldSpec,
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
if c.ConnectionPooler == nil {
|
if c.ConnectionPooler == nil {
|
||||||
c.ConnectionPooler = &ConnectionPoolerObjects{}
|
c.ConnectionPooler = &ConnectionPoolerObjects{
|
||||||
|
LookupFunction: false,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
newNeedConnectionPooler := c.needConnectionPoolerWorker(&newSpec.Spec)
|
newNeedConnectionPooler := c.needConnectionPoolerWorker(&newSpec.Spec)
|
||||||
|
|
@ -885,6 +887,11 @@ func (c *Cluster) syncConnectionPooler(oldSpec,
|
||||||
if err = lookup(schema, user); err != nil {
|
if err = lookup(schema, user); err != nil {
|
||||||
return NoSync, err
|
return NoSync, err
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// Lookup function installation seems to be a fragile point, so
|
||||||
|
// let's log for debugging if we skip it
|
||||||
|
msg := "Skip lookup function installation, old: %d, already installed %d"
|
||||||
|
c.logger.Debug(msg, oldNeedConnectionPooler, c.ConnectionPooler.LookupFunction)
|
||||||
}
|
}
|
||||||
|
|
||||||
if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec); err != nil {
|
if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec); err != nil {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue