From 1f5d0995a58b8df0973deb3fbb90b162d6c982a2 Mon Sep 17 00:00:00 2001 From: Dmitry Dolgov <9erthalion6@gmail.com> Date: Mon, 19 Oct 2020 16:18:58 +0200 Subject: [PATCH] Lookup function installation (#1171) * Lookup function installation Due to reusing a previous database connection without closing it, lookup function installation process was skipping the first database in the list, installing twice into postgres db instead. To prevent that, make internal initDbConnWithName to overwrite a connection object, and return the same object only from initDbConn, which is sort of public interface. Another solution for this would be to modify initDbConnWithName to return a connection object and then generate one temporary connection for each db. It sound feasible but after one attempt it seems it requires a bit more changes around (init, close connections) and doesn't bring anything significantly better on the table. In case if some future changes will prove this wrong, do not hesitate to refactor. Change retry strategy to more insistive one, namely: * retry on the next sync even if we failed to process one database and install pooler appliance. * perform the whole installation unconditionally on update, since the list of target databases could be changed. And for the sake of making it even more robust, also log the case when operator decides to skip installation. Extend connection pooler e2e test with verification that all dbs have required schema installed. --- e2e/exec.sh | 2 +- e2e/tests/test_e2e.py | 78 +++++++++++++++++++++++++++++++++++++- pkg/cluster/cluster.go | 8 +++- pkg/cluster/database.go | 84 ++++++++++++++++++++++++++++------------- pkg/cluster/sync.go | 9 ++++- 5 files changed, 151 insertions(+), 30 deletions(-) diff --git a/e2e/exec.sh b/e2e/exec.sh index 56276bc3c..1ab666e5e 100755 --- a/e2e/exec.sh +++ b/e2e/exec.sh @@ -1,2 +1,2 @@ #!/usr/bin/env bash -kubectl exec -it $1 -- sh -c "$2" +kubectl exec -i $1 -- sh -c "$2" diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 550d3ced8..fc251c430 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -15,6 +15,14 @@ def to_selector(labels): return ",".join(["=".join(l) for l in labels.items()]) +def clean_list(values): + # value is not stripped bytes, strip and convert to a string + clean = lambda v: v.strip().decode() + notNone = lambda v: v + + return list(filter(notNone, map(clean, values))) + + class EndToEndTestCase(unittest.TestCase): ''' Test interaction of the operator with multiple K8s components. @@ -140,6 +148,58 @@ class EndToEndTestCase(unittest.TestCase): k8s.wait_for_running_pods(pod_selector, 2) + # Verify that all the databases have pooler schema installed. + # Do this via psql, since otherwise we need to deal with + # credentials. + dbList = [] + + leader = k8s.get_cluster_leader_pod('acid-minimal-cluster') + dbListQuery = "select datname from pg_database" + schemasQuery = """ + select schema_name + from information_schema.schemata + where schema_name = 'pooler' + """ + exec_query = r"psql -tAq -c \"{}\" -d {}" + + if leader: + try: + q = exec_query.format(dbListQuery, "postgres") + q = "su postgres -c \"{}\"".format(q) + print('Get databases: {}'.format(q)) + result = k8s.exec_with_kubectl(leader.metadata.name, q) + dbList = clean_list(result.stdout.split(b'\n')) + print('dbList: {}, stdout: {}, stderr {}'.format( + dbList, result.stdout, result.stderr + )) + except Exception as ex: + print('Could not get databases: {}'.format(ex)) + print('Stdout: {}'.format(result.stdout)) + print('Stderr: {}'.format(result.stderr)) + + for db in dbList: + if db in ('template0', 'template1'): + continue + + schemas = [] + try: + q = exec_query.format(schemasQuery, db) + q = "su postgres -c \"{}\"".format(q) + print('Get schemas: {}'.format(q)) + result = k8s.exec_with_kubectl(leader.metadata.name, q) + schemas = clean_list(result.stdout.split(b'\n')) + print('schemas: {}, stdout: {}, stderr {}'.format( + schemas, result.stdout, result.stderr + )) + except Exception as ex: + print('Could not get databases: {}'.format(ex)) + print('Stdout: {}'.format(result.stdout)) + print('Stderr: {}'.format(result.stderr)) + + self.assertNotEqual(len(schemas), 0) + else: + print('Could not find leader pod') + # turn it off, keeping configuration section k8s.api.custom_objects_api.patch_namespaced_custom_object( 'acid.zalan.do', 'v1', 'default', @@ -235,7 +295,10 @@ class EndToEndTestCase(unittest.TestCase): # operator configuration via API operator_pod = k8s.get_operator_pod() get_config_cmd = "wget --quiet -O - localhost:8080/config" - result = k8s.exec_with_kubectl(operator_pod.metadata.name, get_config_cmd) + result = k8s.exec_with_kubectl( + operator_pod.metadata.name, + get_config_cmd, + ) roles_dict = (json.loads(result.stdout) .get("controller", {}) .get("InfrastructureRoles")) @@ -862,6 +925,19 @@ class K8s: return master_pod_node, replica_pod_nodes + def get_cluster_leader_pod(self, pg_cluster_name, namespace='default'): + labels = { + 'application': 'spilo', + 'cluster-name': pg_cluster_name, + 'spilo-role': 'master', + } + + pods = self.api.core_v1.list_namespaced_pod( + namespace, label_selector=to_selector(labels)).items + + if pods: + return pods[0] + def wait_for_operator_pod_start(self): self. wait_for_pod_start("name=postgres-operator") # HACK operator must register CRD and/or Sync existing PG clusters after start up diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 9b8b51eb0..6aa1f6fa4 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -780,7 +780,13 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { } } - // sync connection pooler + // Sync connection pooler. Before actually doing sync reset lookup + // installation flag, since manifest updates could add another db which we + // need to process. In the future we may want to do this more careful and + // check which databases we need to process, but even repeating the whole + // installation process should be good enough. + c.ConnectionPooler.LookupFunction = false + if _, err := c.syncConnectionPooler(oldSpec, newSpec, c.installLookupFunction); err != nil { c.logger.Errorf("could not sync connection pooler: %v", err) diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go index 75e2d2097..f51b58a89 100644 --- a/pkg/cluster/database.go +++ b/pkg/cluster/database.go @@ -101,15 +101,20 @@ func (c *Cluster) databaseAccessDisabled() bool { } func (c *Cluster) initDbConn() error { - return c.initDbConnWithName("") -} - -func (c *Cluster) initDbConnWithName(dbname string) error { - c.setProcessName("initializing db connection") if c.pgDb != nil { return nil } + return c.initDbConnWithName("") +} + +// Worker function for connection initialization. This function does not check +// if the connection is already open, if it is then it will be overwritten. +// Callers need to make sure no connection is open, otherwise we could leak +// connections +func (c *Cluster) initDbConnWithName(dbname string) error { + c.setProcessName("initializing db connection") + var conn *sql.DB connstring := c.pgConnectionString(dbname) @@ -145,6 +150,12 @@ func (c *Cluster) initDbConnWithName(dbname string) error { conn.SetMaxOpenConns(1) conn.SetMaxIdleConns(-1) + if c.pgDb != nil { + msg := "Closing an existing connection before opening a new one to %s" + c.logger.Warningf(msg, dbname) + c.closeDbConn() + } + c.pgDb = conn return nil @@ -465,8 +476,11 @@ func (c *Cluster) execCreateOrAlterExtension(extName, schemaName, statement, doi // perform remote authentification. func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error { var stmtBytes bytes.Buffer + c.logger.Info("Installing lookup function") + // Open a new connection if not yet done. This connection will be used only + // to get the list of databases, not for the actuall installation. if err := c.initDbConn(); err != nil { return fmt.Errorf("could not init database connection") } @@ -480,37 +494,41 @@ func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error { } }() + // List of databases we failed to process. At the moment it function just + // like a flag to retry on the next sync, but in the future we may want to + // retry only necessary parts, so let's keep the list. + failedDatabases := []string{} currentDatabases, err := c.getDatabases() if err != nil { msg := "could not get databases to install pooler lookup function: %v" return fmt.Errorf(msg, err) } + // We've got the list of target databases, now close this connection to + // open a new one to every each of them. + if err := c.closeDbConn(); err != nil { + c.logger.Errorf("could not close database connection: %v", err) + } + templater := template.Must(template.New("sql").Parse(connectionPoolerLookup)) + params := TemplateParams{ + "pooler_schema": poolerSchema, + "pooler_user": poolerUser, + } + + if err := templater.Execute(&stmtBytes, params); err != nil { + msg := "could not prepare sql statement %+v: %v" + return fmt.Errorf(msg, params, err) + } for dbname := range currentDatabases { + if dbname == "template0" || dbname == "template1" { continue } - if err := c.initDbConnWithName(dbname); err != nil { - return fmt.Errorf("could not init database connection to %s", dbname) - } - c.logger.Infof("Install pooler lookup function into %s", dbname) - params := TemplateParams{ - "pooler_schema": poolerSchema, - "pooler_user": poolerUser, - } - - if err := templater.Execute(&stmtBytes, params); err != nil { - c.logger.Errorf("could not prepare sql statement %+v: %v", - params, err) - // process other databases - continue - } - // golang sql will do retries couple of times if pq driver reports // connections issues (driver.ErrBadConn), but since our query is // idempotent, we can retry in a view of other errors (e.g. due to @@ -520,7 +538,20 @@ func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error { constants.PostgresConnectTimeout, constants.PostgresConnectRetryTimeout, func() (bool, error) { - if _, err := c.pgDb.Exec(stmtBytes.String()); err != nil { + + // At this moment we are not connected to any database + if err := c.initDbConnWithName(dbname); err != nil { + msg := "could not init database connection to %s" + return false, fmt.Errorf(msg, dbname) + } + defer func() { + if err := c.closeDbConn(); err != nil { + msg := "could not close database connection: %v" + c.logger.Errorf(msg, err) + } + }() + + if _, err = c.pgDb.Exec(stmtBytes.String()); err != nil { msg := fmt.Errorf("could not execute sql statement %s: %v", stmtBytes.String(), err) return false, msg @@ -533,15 +564,16 @@ func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error { c.logger.Errorf("could not execute after retries %s: %v", stmtBytes.String(), err) // process other databases + failedDatabases = append(failedDatabases, dbname) continue } c.logger.Infof("pooler lookup function installed into %s", dbname) - if err := c.closeDbConn(); err != nil { - c.logger.Errorf("could not close database connection: %v", err) - } } - c.ConnectionPooler.LookupFunction = true + if len(failedDatabases) == 0 { + c.ConnectionPooler.LookupFunction = true + } + return nil } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index fef5b7b66..2a3959b1a 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -847,7 +847,9 @@ func (c *Cluster) syncConnectionPooler(oldSpec, var err error if c.ConnectionPooler == nil { - c.ConnectionPooler = &ConnectionPoolerObjects{} + c.ConnectionPooler = &ConnectionPoolerObjects{ + LookupFunction: false, + } } newNeedConnectionPooler := c.needConnectionPoolerWorker(&newSpec.Spec) @@ -885,6 +887,11 @@ func (c *Cluster) syncConnectionPooler(oldSpec, if err = lookup(schema, user); err != nil { return NoSync, err } + } else { + // Lookup function installation seems to be a fragile point, so + // let's log for debugging if we skip it + msg := "Skip lookup function installation, old: %d, already installed %d" + c.logger.Debug(msg, oldNeedConnectionPooler, c.ConnectionPooler.LookupFunction) } if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec); err != nil {