Bugfix/close db connections (#78)

Open and close DB connections on-demand.

Previously, we used to leave the DB connection open while the
cluster was registered with the operator, potentially resutling
in dangled connections if the operator terminates abnormally.

Small refactoring around the role syncing code.
This commit is contained in:
Oleksii Kliukin 2017-08-10 10:10:00 +02:00 committed by GitHub
parent 8b58782a4a
commit f15f93f479
4 changed files with 45 additions and 38 deletions

View File

@ -143,6 +143,7 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) {
} }
} }
// initUsers populates c.systemUsers and c.pgUsers maps.
func (c *Cluster) initUsers() error { func (c *Cluster) initUsers() error {
c.initSystemUsers() c.initSystemUsers()
@ -204,7 +205,7 @@ func (c *Cluster) Create() error {
if err = c.initUsers(); err != nil { if err = c.initUsers(); err != nil {
return err return err
} }
c.logger.Infof("User secrets have been initialized") c.logger.Infof("Users have been initialized")
if err = c.applySecrets(); err != nil { if err = c.applySecrets(); err != nil {
return fmt.Errorf("could not create secrets: %v", err) return fmt.Errorf("could not create secrets: %v", err)
@ -226,11 +227,7 @@ func (c *Cluster) Create() error {
c.logger.Infof("pods are ready") c.logger.Infof("pods are ready")
if !(c.masterLess || c.databaseAccessDisabled()) { if !(c.masterLess || c.databaseAccessDisabled()) {
err = c.initDbConn() err = c.createRoles()
if err != nil {
return fmt.Errorf("could not init db connection: %v", err)
}
err = c.createUsers()
if err != nil { if err != nil {
return fmt.Errorf("could not create users: %v", err) return fmt.Errorf("could not create users: %v", err)
} }

View File

@ -46,6 +46,7 @@ func (c *Cluster) initDbConn() (err error) {
if err != nil { if err != nil {
return err return err
} }
c.logger.Debug("new database connection")
err = conn.Ping() err = conn.Ping()
if err != nil { if err != nil {
if err2 := conn.Close(); err2 != nil { if err2 := conn.Close(); err2 != nil {
@ -60,6 +61,15 @@ func (c *Cluster) initDbConn() (err error) {
return nil return nil
} }
func (c *Cluster) closeDbConn() (err error) {
if c.pgDb != nil {
c.logger.Debug("closing database connection")
return c.pgDb.Close()
}
c.logger.Warning("attempted to close an empty db connection object")
return nil
}
func (c *Cluster) readPgUsersFromDatabase(userNames []string) (users spec.PgUserMap, err error) { func (c *Cluster) readPgUsersFromDatabase(userNames []string) (users spec.PgUserMap, err error) {
var rows *sql.Rows var rows *sql.Rows
users = make(spec.PgUserMap) users = make(spec.PgUserMap)

View File

@ -415,10 +415,7 @@ func (c *Cluster) deleteSecret(secret *v1.Secret) error {
return err return err
} }
func (c *Cluster) createUsers() (err error) { func (c *Cluster) createRoles() (err error) {
// TODO: figure out what to do with duplicate names (humans and robots) among pgUsers // TODO: figure out what to do with duplicate names (humans and robots) among pgUsers
reqs := c.userSyncStrategy.ProduceSyncRequests(nil, c.pgUsers) return c.syncRoles(false)
err = c.userSyncStrategy.ExecuteSyncRequests(reqs, c.pgDb)
return err
} }

View File

@ -3,6 +3,7 @@ package cluster
import ( import (
"fmt" "fmt"
"github.com/zalando-incubator/postgres-operator/pkg/spec"
"github.com/zalando-incubator/postgres-operator/pkg/util" "github.com/zalando-incubator/postgres-operator/pkg/util"
"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
"github.com/zalando-incubator/postgres-operator/pkg/util/volumes" "github.com/zalando-incubator/postgres-operator/pkg/util/volumes"
@ -19,8 +20,14 @@ func (c *Cluster) Sync() error {
c.logger.Errorf("could not load resources: %v", err) c.logger.Errorf("could not load resources: %v", err)
} }
if err = c.initUsers(); err != nil {
return err
}
c.logger.Debugf("Syncing secrets") c.logger.Debugf("Syncing secrets")
if err := c.syncSecrets(); err != nil {
//TODO: mind the secrets of the deleted/new users
if err := c.applySecrets(); err != nil {
if !k8sutil.ResourceAlreadyExists(err) { if !k8sutil.ResourceAlreadyExists(err) {
return fmt.Errorf("could not sync secrets: %v", err) return fmt.Errorf("could not sync secrets: %v", err)
} }
@ -59,11 +66,8 @@ func (c *Cluster) Sync() error {
} }
if !c.databaseAccessDisabled() { if !c.databaseAccessDisabled() {
if err := c.initDbConn(); err != nil {
return fmt.Errorf("could not init db connection: %v", err)
}
c.logger.Debugf("Syncing roles") c.logger.Debugf("Syncing roles")
if err := c.syncRoles(); err != nil { if err := c.syncRoles(true); err != nil {
return fmt.Errorf("could not sync roles: %v", err) return fmt.Errorf("could not sync roles: %v", err)
} }
} }
@ -76,17 +80,6 @@ func (c *Cluster) Sync() error {
return nil return nil
} }
func (c *Cluster) syncSecrets() error {
//TODO: mind the secrets of the deleted/new users
if err := c.initUsers(); err != nil {
return err
}
err := c.applySecrets()
return err
}
func (c *Cluster) syncService(role postgresRole) error { func (c *Cluster) syncService(role postgresRole) error {
cSpec := c.Spec cSpec := c.Spec
if c.Service[role] == nil { if c.Service[role] == nil {
@ -193,21 +186,31 @@ func (c *Cluster) syncStatefulSet() error {
return nil return nil
} }
func (c *Cluster) syncRoles() error { func (c *Cluster) syncRoles(readFromDatabase bool) error {
var userNames []string var (
err error
dbUsers spec.PgUserMap
userNames []string
)
if err := c.initUsers(); err != nil { err = c.initDbConn()
return err
}
for _, u := range c.pgUsers {
userNames = append(userNames, u.Name)
}
dbUsers, err := c.readPgUsersFromDatabase(userNames)
if err != nil { if err != nil {
return fmt.Errorf("error getting users from the database: %v", err) return fmt.Errorf("could not init db connection: %v", err)
} }
defer c.closeDbConn()
if readFromDatabase {
for _, u := range c.pgUsers {
userNames = append(userNames, u.Name)
}
dbUsers, err = c.readPgUsersFromDatabase(userNames)
if err != nil {
return fmt.Errorf("error getting users from the database: %v", err)
}
}
pgSyncRequests := c.userSyncStrategy.ProduceSyncRequests(dbUsers, c.pgUsers) pgSyncRequests := c.userSyncStrategy.ProduceSyncRequests(dbUsers, c.pgUsers)
if err := c.userSyncStrategy.ExecuteSyncRequests(pgSyncRequests, c.pgDb); err != nil { if err = c.userSyncStrategy.ExecuteSyncRequests(pgSyncRequests, c.pgDb); err != nil {
return fmt.Errorf("error executing sync statements: %v", err) return fmt.Errorf("error executing sync statements: %v", err)
} }
return nil return nil