Fix the connection leak and user options sync.
- fix the lack of closing the cursor for the query that returned no rows. - fix syncing of the user options, as previously those were not fetched from the database.
This commit is contained in:
parent
68bb3cd52d
commit
1ffe98ba9f
|
|
@ -16,12 +16,12 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
getUserSQL = `SELECT a.rolname, COALESCE(a.rolpassword, ''), a.rolsuper, a.rolinherit,
|
getUserSQL = `SELECT a.rolname, COALESCE(a.rolpassword, ''), a.rolsuper, a.rolinherit,
|
||||||
a.rolcreaterole, a.rolcreatedb, a.rolcanlogin,
|
a.rolcreaterole, a.rolcreatedb, a.rolcanlogin, s.setconfig,
|
||||||
ARRAY(SELECT b.rolname
|
ARRAY(SELECT b.rolname
|
||||||
FROM pg_catalog.pg_auth_members m
|
FROM pg_catalog.pg_auth_members m
|
||||||
JOIN pg_catalog.pg_authid b ON (m.roleid = b.oid)
|
JOIN pg_catalog.pg_authid b ON (m.roleid = b.oid)
|
||||||
WHERE m.member = a.oid) as memberof
|
WHERE m.member = a.oid) as memberof
|
||||||
FROM pg_catalog.pg_authid a
|
FROM pg_catalog.pg_authid a LEFT JOIN pg_db_role_setting s ON (a.oid = s.setrole AND s.setdatabase = 0::oid)
|
||||||
WHERE a.rolname = ANY($1)
|
WHERE a.rolname = ANY($1)
|
||||||
ORDER BY 1;`
|
ORDER BY 1;`
|
||||||
|
|
||||||
|
|
@ -85,6 +85,9 @@ func (c *Cluster) initDbConn() error {
|
||||||
if finalerr != nil {
|
if finalerr != nil {
|
||||||
return fmt.Errorf("could not init db connection: %v", finalerr)
|
return fmt.Errorf("could not init db connection: %v", finalerr)
|
||||||
}
|
}
|
||||||
|
// Limit ourselves to a single connection and allow no idle connections.
|
||||||
|
conn.SetMaxOpenConns(1)
|
||||||
|
conn.SetMaxIdleConns(-1)
|
||||||
|
|
||||||
c.pgDb = conn
|
c.pgDb = conn
|
||||||
|
|
||||||
|
|
@ -123,16 +126,26 @@ func (c *Cluster) readPgUsersFromDatabase(userNames []string) (users spec.PgUser
|
||||||
var (
|
var (
|
||||||
rolname, rolpassword string
|
rolname, rolpassword string
|
||||||
rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin bool
|
rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin bool
|
||||||
memberof []string
|
roloptions, memberof []string
|
||||||
)
|
)
|
||||||
err := rows.Scan(&rolname, &rolpassword, &rolsuper, &rolinherit,
|
err := rows.Scan(&rolname, &rolpassword, &rolsuper, &rolinherit,
|
||||||
&rolcreaterole, &rolcreatedb, &rolcanlogin, pq.Array(&memberof))
|
&rolcreaterole, &rolcreatedb, &rolcanlogin, pq.Array(&roloptions), pq.Array(&memberof))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error when processing user rows: %v", err)
|
return nil, fmt.Errorf("error when processing user rows: %v", err)
|
||||||
}
|
}
|
||||||
flags := makeUserFlags(rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin)
|
flags := makeUserFlags(rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin)
|
||||||
// XXX: the code assumes the password we get from pg_authid is always MD5
|
// XXX: the code assumes the password we get from pg_authid is always MD5
|
||||||
users[rolname] = spec.PgUser{Name: rolname, Password: rolpassword, Flags: flags, MemberOf: memberof}
|
parameters := make(map[string]string)
|
||||||
|
for _, option := range roloptions {
|
||||||
|
fields := strings.Split(option, "=")
|
||||||
|
if len(fields) != 2 {
|
||||||
|
c.logger.Warningf("skipping malformed option: %q", option)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
parameters[fields[0]] = fields[1]
|
||||||
|
}
|
||||||
|
|
||||||
|
users[rolname] = spec.PgUser{Name: rolname, Password: rolpassword, Flags: flags, MemberOf: memberof, Parameters: parameters}
|
||||||
}
|
}
|
||||||
|
|
||||||
return users, nil
|
return users, nil
|
||||||
|
|
|
||||||
|
|
@ -71,7 +71,7 @@ type Config struct {
|
||||||
APIPort int `name:"api_port" default:"8080"`
|
APIPort int `name:"api_port" default:"8080"`
|
||||||
RingLogLines int `name:"ring_log_lines" default:"100"`
|
RingLogLines int `name:"ring_log_lines" default:"100"`
|
||||||
ClusterHistoryEntries int `name:"cluster_history_entries" default:"1000"`
|
ClusterHistoryEntries int `name:"cluster_history_entries" default:"1000"`
|
||||||
TeamAPIRoleConfiguration map[string]string `name:"team_api_role_configuration" default:"log_statement:'all'"`
|
TeamAPIRoleConfiguration map[string]string `name:"team_api_role_configuration" default:"log_statement:all"`
|
||||||
PodTerminateGracePeriod time.Duration `name:"pod_terminate_grace_period" default:"5m"`
|
PodTerminateGracePeriod time.Duration `name:"pod_terminate_grace_period" default:"5m"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -60,7 +60,7 @@ func (s DefaultUserSyncStrategy) ProduceSyncRequests(dbUsers spec.PgUserMap,
|
||||||
r.User.Name = newUser.Name
|
r.User.Name = newUser.Name
|
||||||
reqs = append(reqs, r)
|
reqs = append(reqs, r)
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(dbUser.Parameters, newUser.Parameters) {
|
if len(newUser.Parameters) > 0 && !reflect.DeepEqual(dbUser.Parameters, newUser.Parameters) {
|
||||||
reqs = append(reqs, spec.PgSyncUserRequest{Kind: spec.PGSyncAlterSet, User: newUser})
|
reqs = append(reqs, spec.PgSyncUserRequest{Kind: spec.PGSyncAlterSet, User: newUser})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -95,7 +95,7 @@ func (s DefaultUserSyncStrategy) ExecuteSyncRequests(reqs []spec.PgSyncUserReque
|
||||||
func (strategy DefaultUserSyncStrategy) alterPgUserSet(user spec.PgUser, db *sql.DB) (err error) {
|
func (strategy DefaultUserSyncStrategy) alterPgUserSet(user spec.PgUser, db *sql.DB) (err error) {
|
||||||
queries := produceAlterRoleSetStmts(user)
|
queries := produceAlterRoleSetStmts(user)
|
||||||
query := fmt.Sprintf(doBlockStmt, strings.Join(queries, ";"))
|
query := fmt.Sprintf(doBlockStmt, strings.Join(queries, ";"))
|
||||||
if _, err = db.Query(query); err != nil {
|
if err = runQueryDiscardResult(db, query); err != nil {
|
||||||
err = fmt.Errorf("dB error: %v, query: %s", err, query)
|
err = fmt.Errorf("dB error: %v, query: %s", err, query)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -120,7 +120,7 @@ func (s DefaultUserSyncStrategy) createPgUser(user spec.PgUser, db *sql.DB) (err
|
||||||
}
|
}
|
||||||
query := fmt.Sprintf(createUserSQL, user.Name, strings.Join(userFlags, " "), userPassword)
|
query := fmt.Sprintf(createUserSQL, user.Name, strings.Join(userFlags, " "), userPassword)
|
||||||
|
|
||||||
_, err = db.Query(query) // TODO: Try several times
|
err = runQueryDiscardResult(db, query) // TODO: Try several times
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("dB error: %v, query: %s", err, query)
|
err = fmt.Errorf("dB error: %v, query: %s", err, query)
|
||||||
return
|
return
|
||||||
|
|
@ -146,7 +146,7 @@ func (s DefaultUserSyncStrategy) alterPgUser(user spec.PgUser, db *sql.DB) (err
|
||||||
|
|
||||||
query := fmt.Sprintf(doBlockStmt, strings.Join(resultStmt, ";"))
|
query := fmt.Sprintf(doBlockStmt, strings.Join(resultStmt, ";"))
|
||||||
|
|
||||||
_, err = db.Query(query) // TODO: Try several times
|
err = runQueryDiscardResult(db, query) // TODO: Try several times
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("dB error: %v query %s", err, query)
|
err = fmt.Errorf("dB error: %v query %s", err, query)
|
||||||
return
|
return
|
||||||
|
|
@ -215,3 +215,11 @@ func quoteParameterValue(name, val string) string {
|
||||||
}
|
}
|
||||||
return fmt.Sprintf(`'%s'`, strings.Trim(val, " "))
|
return fmt.Sprintf(`'%s'`, strings.Trim(val, " "))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func runQueryDiscardResult(db *sql.DB, sql string) error {
|
||||||
|
rows, err := db.Query(sql)
|
||||||
|
if rows != nil {
|
||||||
|
rows.Close()
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue