diff --git a/pkg/cluster/pg.go b/pkg/cluster/pg.go index 642c388b2..450236449 100644 --- a/pkg/cluster/pg.go +++ b/pkg/cluster/pg.go @@ -16,12 +16,12 @@ import ( const ( getUserSQL = `SELECT a.rolname, COALESCE(a.rolpassword, ''), a.rolsuper, a.rolinherit, - a.rolcreaterole, a.rolcreatedb, a.rolcanlogin, + a.rolcreaterole, a.rolcreatedb, a.rolcanlogin, s.setconfig, ARRAY(SELECT b.rolname FROM pg_catalog.pg_auth_members m JOIN pg_catalog.pg_authid b ON (m.roleid = b.oid) WHERE m.member = a.oid) as memberof - FROM pg_catalog.pg_authid a + FROM pg_catalog.pg_authid a LEFT JOIN pg_db_role_setting s ON (a.oid = s.setrole AND s.setdatabase = 0::oid) WHERE a.rolname = ANY($1) ORDER BY 1;` @@ -85,6 +85,9 @@ func (c *Cluster) initDbConn() error { if finalerr != nil { return fmt.Errorf("could not init db connection: %v", finalerr) } + // Limit ourselves to a single connection and allow no idle connections. + conn.SetMaxOpenConns(1) + conn.SetMaxIdleConns(-1) c.pgDb = conn @@ -123,16 +126,26 @@ func (c *Cluster) readPgUsersFromDatabase(userNames []string) (users spec.PgUser var ( rolname, rolpassword string rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin bool - memberof []string + roloptions, memberof []string ) err := rows.Scan(&rolname, &rolpassword, &rolsuper, &rolinherit, - &rolcreaterole, &rolcreatedb, &rolcanlogin, pq.Array(&memberof)) + &rolcreaterole, &rolcreatedb, &rolcanlogin, pq.Array(&roloptions), pq.Array(&memberof)) if err != nil { return nil, fmt.Errorf("error when processing user rows: %v", err) } flags := makeUserFlags(rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin) // XXX: the code assumes the password we get from pg_authid is always MD5 - users[rolname] = spec.PgUser{Name: rolname, Password: rolpassword, Flags: flags, MemberOf: memberof} + parameters := make(map[string]string) + for _, option := range roloptions { + fields := strings.Split(option, "=") + if len(fields) != 2 { + c.logger.Warningf("skipping malformed option: %q", option) + continue + } + parameters[fields[0]] = fields[1] + } + + users[rolname] = spec.PgUser{Name: rolname, Password: rolpassword, Flags: flags, MemberOf: memberof, Parameters: parameters} } return users, nil diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index 7e9bfacfe..db98d5c2b 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -71,7 +71,7 @@ type Config struct { APIPort int `name:"api_port" default:"8080"` RingLogLines int `name:"ring_log_lines" default:"100"` ClusterHistoryEntries int `name:"cluster_history_entries" default:"1000"` - TeamAPIRoleConfiguration map[string]string `name:"team_api_role_configuration" default:"log_statement:'all'"` + TeamAPIRoleConfiguration map[string]string `name:"team_api_role_configuration" default:"log_statement:all"` PodTerminateGracePeriod time.Duration `name:"pod_terminate_grace_period" default:"5m"` } diff --git a/pkg/util/users/users.go b/pkg/util/users/users.go index 1f83706fe..9f622652a 100644 --- a/pkg/util/users/users.go +++ b/pkg/util/users/users.go @@ -60,7 +60,7 @@ func (s DefaultUserSyncStrategy) ProduceSyncRequests(dbUsers spec.PgUserMap, r.User.Name = newUser.Name reqs = append(reqs, r) } - if !reflect.DeepEqual(dbUser.Parameters, newUser.Parameters) { + if len(newUser.Parameters) > 0 && !reflect.DeepEqual(dbUser.Parameters, newUser.Parameters) { reqs = append(reqs, spec.PgSyncUserRequest{Kind: spec.PGSyncAlterSet, User: newUser}) } } @@ -95,7 +95,7 @@ func (s DefaultUserSyncStrategy) ExecuteSyncRequests(reqs []spec.PgSyncUserReque func (strategy DefaultUserSyncStrategy) alterPgUserSet(user spec.PgUser, db *sql.DB) (err error) { queries := produceAlterRoleSetStmts(user) query := fmt.Sprintf(doBlockStmt, strings.Join(queries, ";")) - if _, err = db.Query(query); err != nil { + if err = runQueryDiscardResult(db, query); err != nil { err = fmt.Errorf("dB error: %v, query: %s", err, query) return } @@ -120,7 +120,7 @@ func (s DefaultUserSyncStrategy) createPgUser(user spec.PgUser, db *sql.DB) (err } query := fmt.Sprintf(createUserSQL, user.Name, strings.Join(userFlags, " "), userPassword) - _, err = db.Query(query) // TODO: Try several times + err = runQueryDiscardResult(db, query) // TODO: Try several times if err != nil { err = fmt.Errorf("dB error: %v, query: %s", err, query) return @@ -146,7 +146,7 @@ func (s DefaultUserSyncStrategy) alterPgUser(user spec.PgUser, db *sql.DB) (err query := fmt.Sprintf(doBlockStmt, strings.Join(resultStmt, ";")) - _, err = db.Query(query) // TODO: Try several times + err = runQueryDiscardResult(db, query) // TODO: Try several times if err != nil { err = fmt.Errorf("dB error: %v query %s", err, query) return @@ -215,3 +215,11 @@ func quoteParameterValue(name, val string) string { } return fmt.Sprintf(`'%s'`, strings.Trim(val, " ")) } + +func runQueryDiscardResult(db *sql.DB, sql string) error { + rows, err := db.Query(sql) + if rows != nil { + rows.Close() + } + return err +}