diff --git a/e2e/exec.sh b/e2e/exec.sh
index 56276bc3c..1ab666e5e 100755
--- a/e2e/exec.sh
+++ b/e2e/exec.sh
@@ -1,2 +1,2 @@
 #!/usr/bin/env bash
-kubectl exec -it $1 -- sh -c "$2"
+kubectl exec -i $1 -- sh -c "$2"
diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py
index 550d3ced8..fc251c430 100644
--- a/e2e/tests/test_e2e.py
+++ b/e2e/tests/test_e2e.py
@@ -15,6 +15,14 @@ def to_selector(labels):
     return ",".join(["=".join(l) for l in labels.items()])
 
 
+def clean_list(values):
+    # values are unstripped bytes: strip, decode and drop empty entries
+    clean = lambda v: v.strip().decode()
+    not_empty = lambda v: v
+
+    return list(filter(not_empty, map(clean, values)))
+
+
 class EndToEndTestCase(unittest.TestCase):
     '''
     Test interaction of the operator with multiple K8s components.
@@ -140,6 +148,58 @@ class EndToEndTestCase(unittest.TestCase):
 
         k8s.wait_for_running_pods(pod_selector, 2)
 
+        # Verify that every database has the pooler schema installed.
+        # Do this via psql, since otherwise we would need to deal with
+        # credentials.
+        db_list = []
+
+        leader = k8s.get_cluster_leader_pod('acid-minimal-cluster')
+        db_list_query = "select datname from pg_database"
+        schemas_query = """
+            select schema_name
+            from information_schema.schemata
+            where schema_name = 'pooler'
+        """
+        exec_query = r"psql -tAq -c \"{}\" -d {}"
+
+        if leader:
+            try:
+                q = exec_query.format(db_list_query, "postgres")
+                q = "su postgres -c \"{}\"".format(q)
+                print('Get databases: {}'.format(q))
+                result = k8s.exec_with_kubectl(leader.metadata.name, q)
+                db_list = clean_list(result.stdout.split(b'\n'))
+                print('db_list: {}, stdout: {}, stderr: {}'.format(
+                    db_list, result.stdout, result.stderr
+                ))
+            except Exception as ex:
+                # do not touch `result` here: it is unbound if kubectl failed
+                print('Could not get databases: {}'.format(ex))
+
+            for db in db_list:
+                if db in ('template0', 'template1'):
+                    continue
+
+                schemas = []
+                try:
+                    q = exec_query.format(schemas_query, db)
+                    q = "su postgres -c \"{}\"".format(q)
+                    print('Get schemas: {}'.format(q))
+                    result = k8s.exec_with_kubectl(leader.metadata.name, q)
+                    schemas = clean_list(result.stdout.split(b'\n'))
+                    print('schemas: {}, stdout: {}, stderr: {}'.format(
+                        schemas, result.stdout, result.stderr
+                    ))
+                except Exception as ex:
+                    print('Could not get schemas: {}'.format(ex))
+
+                self.assertNotEqual(len(schemas), 0)
+        else:
+            print('Could not find leader pod')
+
         # turn it off, keeping configuration section
         k8s.api.custom_objects_api.patch_namespaced_custom_object(
             'acid.zalan.do', 'v1', 'default',
@@ -235,7 +295,10 @@ class EndToEndTestCase(unittest.TestCase):
         # operator configuration via API
         operator_pod = k8s.get_operator_pod()
         get_config_cmd = "wget --quiet -O - localhost:8080/config"
-        result = k8s.exec_with_kubectl(operator_pod.metadata.name, get_config_cmd)
+        result = k8s.exec_with_kubectl(
+            operator_pod.metadata.name,
+            get_config_cmd,
+        )
         roles_dict = (json.loads(result.stdout)
                           .get("controller", {})
                           .get("InfrastructureRoles"))
@@ -862,6 +925,19 @@ class K8s:
 
         return master_pod_node, replica_pod_nodes
 
+    def get_cluster_leader_pod(self, pg_cluster_name, namespace='default'):
+        labels = {
+            'application': 'spilo',
+            'cluster-name': pg_cluster_name,
+            'spilo-role': 'master',
+        }
+
+        pods = self.api.core_v1.list_namespaced_pod(
+            namespace, label_selector=to_selector(labels)).items
+
+        if pods:
+            return pods[0]
+
     def wait_for_operator_pod_start(self):
         self.wait_for_pod_start("name=postgres-operator")
         # HACK operator must register CRD and/or Sync existing PG clusters after start up
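Note on the new `get_cluster_leader_pod` helper above: it finds the Spilo leader purely by label selection, with no Patroni API involved. For comparison, a minimal client-go sketch of the same lookup on the Go side; the clientset wiring is assumed to exist elsewhere, the ctx-taking `List` signature requires client-go v0.18+, and the package and function names are illustrative, not part of the operator:

```go
package e2eutil

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// getClusterLeaderPod returns the name of the Spilo leader pod of the given
// cluster, selected by the same labels the Python helper uses.
func getClusterLeaderPod(clientset kubernetes.Interface, clusterName, namespace string) (string, error) {
	selector := fmt.Sprintf("application=spilo,cluster-name=%s,spilo-role=master", clusterName)

	pods, err := clientset.CoreV1().Pods(namespace).List(
		context.TODO(), metav1.ListOptions{LabelSelector: selector})
	if err != nil {
		return "", fmt.Errorf("could not list pods: %v", err)
	}
	if len(pods.Items) == 0 {
		return "", fmt.Errorf("no leader pod found for cluster %q", clusterName)
	}
	return pods.Items[0].Name, nil
}
```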
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index 9b8b51eb0..6aa1f6fa4 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -780,7 +780,13 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 		}
 	}
 
-	// sync connection pooler
+	// Sync the connection pooler. Before actually doing the sync, reset the
+	// lookup function installation flag, since a manifest update could add
+	// another db that we need to process. In the future we may want to do
+	// this more carefully and check which databases need processing, but
+	// even repeating the whole installation process should be good enough.
+	c.ConnectionPooler.LookupFunction = false
+
 	if _, err := c.syncConnectionPooler(oldSpec, newSpec, c.installLookupFunction); err != nil {
 		c.logger.Errorf("could not sync connection pooler: %v", err)
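The `LookupFunction` reset above is the crux of this change: `syncConnectionPooler` skips installation while the flag is set, so any event that can change the database list has to clear it. Below is a simplified model of that handshake; the types and functions are invented for illustration and are not the operator's actual API:

```go
package main

import "fmt"

type pooler struct {
	lookupFunction bool // true once the lookup function is installed everywhere
}

// syncPooler installs the lookup function unless a previous run already
// succeeded for the current set of databases.
func (p *pooler) syncPooler(install func() error) error {
	if p.lookupFunction {
		fmt.Println("skip lookup function installation")
		return nil
	}
	if err := install(); err != nil {
		return err
	}
	p.lookupFunction = true
	return nil
}

func main() {
	p := &pooler{}
	install := func() error { fmt.Println("installing..."); return nil }

	p.syncPooler(install) // installs
	p.syncPooler(install) // skipped: flag is still set

	// A manifest update may have added a database, so reset the flag to
	// force a full reinstallation on the next sync.
	p.lookupFunction = false
	p.syncPooler(install) // installs again, covering the new database
}
```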
diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go
index 75e2d2097..f51b58a89 100644
--- a/pkg/cluster/database.go
+++ b/pkg/cluster/database.go
@@ -101,15 +101,20 @@ func (c *Cluster) databaseAccessDisabled() bool {
 }
 
 func (c *Cluster) initDbConn() error {
-	return c.initDbConnWithName("")
-}
-
-func (c *Cluster) initDbConnWithName(dbname string) error {
-	c.setProcessName("initializing db connection")
 	if c.pgDb != nil {
 		return nil
 	}
 
+	return c.initDbConnWithName("")
+}
+
+// Worker function for connection initialization. This function does not check
+// whether a connection is already open; if one is, it will be overwritten.
+// Callers need to make sure no connection is open, otherwise we could leak
+// connections.
+func (c *Cluster) initDbConnWithName(dbname string) error {
+	c.setProcessName("initializing db connection")
+
 	var conn *sql.DB
 	connstring := c.pgConnectionString(dbname)
 
@@ -145,6 +150,12 @@ func (c *Cluster) initDbConnWithName(dbname string) error {
 	conn.SetMaxOpenConns(1)
 	conn.SetMaxIdleConns(-1)
 
+	if c.pgDb != nil {
+		msg := "closing an existing connection before opening a new one to %s"
+		c.logger.Warningf(msg, dbname)
+		c.closeDbConn()
+	}
+
 	c.pgDb = conn
 
 	return nil
@@ -465,8 +476,11 @@ func (c *Cluster) execCreateOrAlterExtension(extName, schemaName, statement, doi
 // perform remote authentification.
 func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error {
 	var stmtBytes bytes.Buffer
+	c.logger.Info("Installing lookup function")
+
+	// Open a new connection if not yet done. This connection is used only
+	// to get the list of databases, not for the actual installation.
 	if err := c.initDbConn(); err != nil {
 		return fmt.Errorf("could not init database connection")
 	}
@@ -480,37 +494,41 @@ func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error {
 		}
 	}()
 
+	// List of databases we failed to process. At the moment it functions just
+	// like a flag to retry on the next sync, but in the future we may want to
+	// retry only the necessary parts, so let's keep the list.
+	failedDatabases := []string{}
+
 	currentDatabases, err := c.getDatabases()
 	if err != nil {
 		msg := "could not get databases to install pooler lookup function: %v"
 		return fmt.Errorf(msg, err)
 	}
 
+	// We've got the list of target databases; now close this connection to
+	// open a new one to each of them.
+	if err := c.closeDbConn(); err != nil {
+		c.logger.Errorf("could not close database connection: %v", err)
+	}
+
 	templater := template.Must(template.New("sql").Parse(connectionPoolerLookup))
+	params := TemplateParams{
+		"pooler_schema": poolerSchema,
+		"pooler_user":   poolerUser,
+	}
+
+	if err := templater.Execute(&stmtBytes, params); err != nil {
+		msg := "could not prepare sql statement %+v: %v"
+		return fmt.Errorf(msg, params, err)
+	}
 
 	for dbname := range currentDatabases {
+
 		if dbname == "template0" || dbname == "template1" {
 			continue
 		}
 
-		if err := c.initDbConnWithName(dbname); err != nil {
-			return fmt.Errorf("could not init database connection to %s", dbname)
-		}
-
 		c.logger.Infof("Install pooler lookup function into %s", dbname)
 
-		params := TemplateParams{
-			"pooler_schema": poolerSchema,
-			"pooler_user":   poolerUser,
-		}
-
-		if err := templater.Execute(&stmtBytes, params); err != nil {
-			c.logger.Errorf("could not prepare sql statement %+v: %v",
-				params, err)
-			// process other databases
-			continue
-		}
-
 		// golang sql will do retries couple of times if pq driver reports
 		// connections issues (driver.ErrBadConn), but since our query is
 		// idempotent, we can retry in a view of other errors (e.g. due to
@@ -520,7 +538,20 @@ func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error {
 			constants.PostgresConnectTimeout,
 			constants.PostgresConnectRetryTimeout,
 			func() (bool, error) {
-				if _, err := c.pgDb.Exec(stmtBytes.String()); err != nil {
+
+				// At this point we are not connected to any database
+				if err := c.initDbConnWithName(dbname); err != nil {
+					msg := "could not init database connection to %s"
+					return false, fmt.Errorf(msg, dbname)
+				}
+				defer func() {
+					if err := c.closeDbConn(); err != nil {
+						msg := "could not close database connection: %v"
+						c.logger.Errorf(msg, err)
+					}
+				}()
+
+				if _, err = c.pgDb.Exec(stmtBytes.String()); err != nil {
 					msg := fmt.Errorf("could not execute sql statement %s: %v",
 						stmtBytes.String(), err)
 					return false, msg
@@ -533,15 +564,16 @@ func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error {
 			c.logger.Errorf("could not execute after retries %s: %v",
 				stmtBytes.String(), err)
 			// process other databases
+			failedDatabases = append(failedDatabases, dbname)
 			continue
 		}
 		c.logger.Infof("pooler lookup function installed into %s", dbname)
-
-		if err := c.closeDbConn(); err != nil {
-			c.logger.Errorf("could not close database connection: %v", err)
-		}
 	}
 
-	c.ConnectionPooler.LookupFunction = true
+	if len(failedDatabases) == 0 {
+		c.ConnectionPooler.LookupFunction = true
+	}
+
 	return nil
 }
diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go
index fef5b7b66..2a3959b1a 100644
--- a/pkg/cluster/sync.go
+++ b/pkg/cluster/sync.go
@@ -847,7 +847,9 @@ func (c *Cluster) syncConnectionPooler(oldSpec,
 	var err error
 
 	if c.ConnectionPooler == nil {
-		c.ConnectionPooler = &ConnectionPoolerObjects{}
+		c.ConnectionPooler = &ConnectionPoolerObjects{
+			LookupFunction: false,
+		}
 	}
 
 	newNeedConnectionPooler := c.needConnectionPoolerWorker(&newSpec.Spec)
@@ -885,6 +887,11 @@ func (c *Cluster) syncConnectionPooler(oldSpec,
 		if err = lookup(schema, user); err != nil {
 			return NoSync, err
 		}
+	} else {
+		// Lookup function installation seems to be a fragile point, so
+		// let's log it for debugging whenever we skip it
+		msg := "skip lookup function installation, old: %t, already installed: %t"
+		c.logger.Debugf(msg, oldNeedConnectionPooler, c.ConnectionPooler.LookupFunction)
 	}
 
 	if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec); err != nil {
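The database.go changes above move the connect/close pair inside the retry closure, so every attempt opens a fresh connection instead of reusing a possibly broken handle, and the statement's idempotency makes blind retries safe. A self-contained sketch of that pattern: `retry` here is a stand-in for the operator's retryutil helpers, and the driver name, DSN and timeouts are illustrative, not the operator's real configuration:

```go
package poolersketch

import (
	"database/sql"
	"fmt"
	"time"

	_ "github.com/lib/pq"
)

// retry runs f until it reports success or the timeout elapses.
func retry(interval, timeout time.Duration, f func() (bool, error)) error {
	deadline := time.Now().Add(timeout)
	for {
		ok, err := f()
		if ok {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("timeout: %v", err)
		}
		time.Sleep(interval)
	}
}

// installInto executes an idempotent statement against one database,
// opening and closing a fresh connection on every attempt, mirroring the
// initDbConnWithName/closeDbConn pair inside the retry closure in the diff.
func installInto(dsn, stmt string) error {
	return retry(time.Second, 30*time.Second, func() (bool, error) {
		db, err := sql.Open("postgres", dsn)
		if err != nil {
			return false, err
		}
		// Close this attempt's connection no matter how it ends, so a
		// retry never inherits a broken handle.
		defer db.Close()

		// The statement is idempotent, so retrying on any error is safe.
		if _, err := db.Exec(stmt); err != nil {
			return false, err
		}
		return true, nil
	})
}
```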