package cluster import ( "bytes" "database/sql" "fmt" "net" "strings" "text/template" "time" "github.com/lib/pq" "github.com/zalando/postgres-operator/pkg/spec" "github.com/zalando/postgres-operator/pkg/util/constants" "github.com/zalando/postgres-operator/pkg/util/retryutil" ) const ( getUserSQL = `SELECT a.rolname, COALESCE(a.rolpassword, ''), a.rolsuper, a.rolinherit, a.rolcreaterole, a.rolcreatedb, a.rolcanlogin, s.setconfig, ARRAY(SELECT b.rolname FROM pg_catalog.pg_auth_members m JOIN pg_catalog.pg_authid b ON (m.roleid = b.oid) WHERE m.member = a.oid) as memberof FROM pg_catalog.pg_authid a LEFT JOIN pg_db_role_setting s ON (a.oid = s.setrole AND s.setdatabase = 0::oid) WHERE a.rolname = ANY($1) ORDER BY 1;` getDatabasesSQL = `SELECT datname, pg_get_userbyid(datdba) AS owner FROM pg_database;` createDatabaseSQL = `CREATE DATABASE "%s" OWNER "%s";` alterDatabaseOwnerSQL = `ALTER DATABASE "%s" OWNER TO "%s";` connectionPoolerLookup = ` CREATE SCHEMA IF NOT EXISTS {{.pooler_schema}}; CREATE OR REPLACE FUNCTION {{.pooler_schema}}.user_lookup( in i_username text, out uname text, out phash text) RETURNS record AS $$ BEGIN SELECT usename, passwd FROM pg_catalog.pg_shadow WHERE usename = i_username INTO uname, phash; RETURN; END; $$ LANGUAGE plpgsql SECURITY DEFINER; REVOKE ALL ON FUNCTION {{.pooler_schema}}.user_lookup(text) FROM public, {{.pooler_user}}; GRANT EXECUTE ON FUNCTION {{.pooler_schema}}.user_lookup(text) TO {{.pooler_user}}; GRANT USAGE ON SCHEMA {{.pooler_schema}} TO {{.pooler_user}}; ` ) func (c *Cluster) pgConnectionString(dbname string) string { password := c.systemUsers[constants.SuperuserKeyName].Password if dbname == "" { dbname = "postgres" } return fmt.Sprintf("host='%s' dbname='%s' sslmode=require user='%s' password='%s' connect_timeout='%d'", fmt.Sprintf("%s.%s.svc.%s", c.Name, c.Namespace, c.OpConfig.ClusterDomain), dbname, c.systemUsers[constants.SuperuserKeyName].Name, strings.Replace(password, "$", "\\$", -1), constants.PostgresConnectTimeout/time.Second) } func (c *Cluster) databaseAccessDisabled() bool { if !c.OpConfig.EnableDBAccess { c.logger.Debugf("database access is disabled") } return !c.OpConfig.EnableDBAccess } func (c *Cluster) initDbConn() error { return c.initDbConnWithName("") } func (c *Cluster) initDbConnWithName(dbname string) error { c.setProcessName("initializing db connection") if c.pgDb != nil { return nil } var conn *sql.DB connstring := c.pgConnectionString(dbname) finalerr := retryutil.Retry(constants.PostgresConnectTimeout, constants.PostgresConnectRetryTimeout, func() (bool, error) { var err error conn, err = sql.Open("postgres", connstring) if err == nil { err = conn.Ping() } if err == nil { return true, nil } if _, ok := err.(*net.OpError); ok { c.logger.Errorf("could not connect to PostgreSQL database: %v", err) return false, nil } if err2 := conn.Close(); err2 != nil { c.logger.Errorf("error when closing PostgreSQL connection after another error: %v", err) return false, err2 } return false, err }) if finalerr != nil { return fmt.Errorf("could not init db connection: %v", finalerr) } // Limit ourselves to a single connection and allow no idle connections. conn.SetMaxOpenConns(1) conn.SetMaxIdleConns(-1) c.pgDb = conn return nil } func (c *Cluster) connectionIsClosed() bool { return c.pgDb == nil } func (c *Cluster) closeDbConn() (err error) { c.setProcessName("closing db connection") if c.pgDb != nil { c.logger.Debug("closing database connection") if err = c.pgDb.Close(); err != nil { c.logger.Errorf("could not close database connection: %v", err) } c.pgDb = nil return nil } c.logger.Warning("attempted to close an empty db connection object") return nil } func (c *Cluster) readPgUsersFromDatabase(userNames []string) (users spec.PgUserMap, err error) { c.setProcessName("reading users from the db") var rows *sql.Rows users = make(spec.PgUserMap) if rows, err = c.pgDb.Query(getUserSQL, pq.Array(userNames)); err != nil { return nil, fmt.Errorf("error when querying users: %v", err) } defer func() { if err2 := rows.Close(); err2 != nil { err = fmt.Errorf("error when closing query cursor: %v", err2) } }() for rows.Next() { var ( rolname, rolpassword string rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin bool roloptions, memberof []string ) err := rows.Scan(&rolname, &rolpassword, &rolsuper, &rolinherit, &rolcreaterole, &rolcreatedb, &rolcanlogin, pq.Array(&roloptions), pq.Array(&memberof)) if err != nil { return nil, fmt.Errorf("error when processing user rows: %v", err) } flags := makeUserFlags(rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin) // XXX: the code assumes the password we get from pg_authid is always MD5 parameters := make(map[string]string) for _, option := range roloptions { fields := strings.Split(option, "=") if len(fields) != 2 { c.logger.Warningf("skipping malformed option: %q", option) continue } parameters[fields[0]] = fields[1] } users[rolname] = spec.PgUser{Name: rolname, Password: rolpassword, Flags: flags, MemberOf: memberof, Parameters: parameters} } return users, nil } // getDatabases returns the map of current databases with owners // The caller is responsible for opening and closing the database connection func (c *Cluster) getDatabases() (dbs map[string]string, err error) { var ( rows *sql.Rows ) if rows, err = c.pgDb.Query(getDatabasesSQL); err != nil { return nil, fmt.Errorf("could not query database: %v", err) } defer func() { if err2 := rows.Close(); err2 != nil { if err != nil { err = fmt.Errorf("error when closing query cursor: %v, previous error: %v", err2, err) } else { err = fmt.Errorf("error when closing query cursor: %v", err2) } } }() dbs = make(map[string]string) for rows.Next() { var datname, owner string if err = rows.Scan(&datname, &owner); err != nil { return nil, fmt.Errorf("error when processing row: %v", err) } dbs[datname] = owner } return dbs, err } // executeCreateDatabase creates new database with the given owner. // The caller is responsible for openinging and closing the database connection. func (c *Cluster) executeCreateDatabase(datname, owner string) error { return c.execCreateOrAlterDatabase(datname, owner, createDatabaseSQL, "creating database", "create database") } // executeCreateDatabase changes the owner of the given database. // The caller is responsible for openinging and closing the database connection. func (c *Cluster) executeAlterDatabaseOwner(datname string, owner string) error { return c.execCreateOrAlterDatabase(datname, owner, alterDatabaseOwnerSQL, "changing owner for database", "alter database owner") } func (c *Cluster) execCreateOrAlterDatabase(datname, owner, statement, doing, operation string) error { if !c.databaseNameOwnerValid(datname, owner) { return nil } c.logger.Infof("%s %q owner %q", doing, datname, owner) if _, err := c.pgDb.Exec(fmt.Sprintf(statement, datname, owner)); err != nil { return fmt.Errorf("could not execute %s: %v", operation, err) } return nil } func (c *Cluster) databaseNameOwnerValid(datname, owner string) bool { if _, ok := c.pgUsers[owner]; !ok { c.logger.Infof("skipping creation of the %q database, user %q does not exist", datname, owner) return false } if !databaseNameRegexp.MatchString(datname) { c.logger.Infof("database %q has invalid name", datname) return false } return true } func makeUserFlags(rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin bool) (result []string) { if rolsuper { result = append(result, constants.RoleFlagSuperuser) } if rolinherit { result = append(result, constants.RoleFlagInherit) } if rolcreaterole { result = append(result, constants.RoleFlagCreateRole) } if rolcreatedb { result = append(result, constants.RoleFlagCreateDB) } if rolcanlogin { result = append(result, constants.RoleFlagLogin) } return result } // Creates a connection pooler credentials lookup function in every database to // perform remote authentication. func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error { var stmtBytes bytes.Buffer c.logger.Info("Installing lookup function") if err := c.initDbConn(); err != nil { return fmt.Errorf("could not init database connection") } defer func() { if c.connectionIsClosed() { return } if err := c.closeDbConn(); err != nil { c.logger.Errorf("could not close database connection: %v", err) } }() currentDatabases, err := c.getDatabases() if err != nil { msg := "could not get databases to install pooler lookup function: %v" return fmt.Errorf(msg, err) } templater := template.Must(template.New("sql").Parse(connectionPoolerLookup)) for dbname, _ := range currentDatabases { if dbname == "template0" || dbname == "template1" { continue } if err := c.initDbConnWithName(dbname); err != nil { return fmt.Errorf("could not init database connection to %s", dbname) } c.logger.Infof("Install pooler lookup function into %s", dbname) params := TemplateParams{ "pooler_schema": poolerSchema, "pooler_user": poolerUser, } if err := templater.Execute(&stmtBytes, params); err != nil { c.logger.Errorf("could not prepare sql statement %+v: %v", params, err) // process other databases continue } // golang sql will do retries couple of times if pq driver reports // connections issues (driver.ErrBadConn), but since our query is // idempotent, we can retry in a view of other errors (e.g. due to // failover a db is temporary in a read-only mode or so) to make sure // it was applied. execErr := retryutil.Retry( constants.PostgresConnectTimeout, constants.PostgresConnectRetryTimeout, func() (bool, error) { if _, err := c.pgDb.Exec(stmtBytes.String()); err != nil { msg := fmt.Errorf("could not execute sql statement %s: %v", stmtBytes.String(), err) return false, msg } return true, nil }) if execErr != nil { c.logger.Errorf("could not execute after retries %s: %v", stmtBytes.String(), err) // process other databases continue } c.logger.Infof("pooler lookup function installed into %s", dbname) if err := c.closeDbConn(); err != nil { c.logger.Errorf("could not close database connection: %v", err) } } c.ConnectionPooler.LookupFunction = true return nil }