From ce960e892a97537a0dc4f08679766ff21d3054ef Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Thu, 2 Nov 2017 17:46:33 +0100 Subject: [PATCH] Create new databases and change owners of existing ones during sync. (#153) * Create new databases and change owners of existing ones during sync. --- pkg/cluster/cluster.go | 4 +- pkg/cluster/pg.go | 87 ++++++++++++++++++---------------------- pkg/cluster/resources.go | 24 +++++++++++ pkg/cluster/sync.go | 52 ++++++++++++++++++++++++ 4 files changed, 116 insertions(+), 51 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index d4af4318f..3f7b38007 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -255,12 +255,12 @@ func (c *Cluster) Create() error { if err = c.createRoles(); err != nil { return fmt.Errorf("could not create users: %v", err) } + c.logger.Infof("users have been successfully created") if err = c.createDatabases(); err != nil { return fmt.Errorf("could not create databases: %v", err) } - - c.logger.Infof("users have been successfully created") + c.logger.Infof("databases have been successfully created") } else { if c.masterLess { c.logger.Warnln("cluster is masterless") diff --git a/pkg/cluster/pg.go b/pkg/cluster/pg.go index 9fbd92f2a..642c388b2 100644 --- a/pkg/cluster/pg.go +++ b/pkg/cluster/pg.go @@ -25,8 +25,9 @@ const ( WHERE a.rolname = ANY($1) ORDER BY 1;` - getDatabasesSQL = `SELECT datname, a.rolname AS owner FROM pg_database d INNER JOIN pg_authid a ON a.oid = d.datdba;` - createDatabaseSQL = `CREATE DATABASE "%s" OWNER "%s";` + getDatabasesSQL = `SELECT datname, pg_get_userbyid(datdba) AS owner FROM pg_database;` + createDatabaseSQL = `CREATE DATABASE "%s" OWNER "%s";` + alterDatabaseOwnerSQL = `ALTER DATABASE "%s" OWNER TO "%s";` ) func (c *Cluster) pgConnectionString() string { @@ -137,6 +138,8 @@ func (c *Cluster) readPgUsersFromDatabase(userNames []string) (users spec.PgUser return users, nil } +// getDatabases returns the map of current databases with owners +// The caller is responsible for opening and closing the database connection func (c *Cluster) getDatabases() (map[string]string, error) { var ( rows *sql.Rows @@ -144,15 +147,6 @@ func (c *Cluster) getDatabases() (map[string]string, error) { ) dbs := make(map[string]string) - if err = c.initDbConn(); err != nil { - return nil, fmt.Errorf("could not init db connection") - } - defer func() { - if err = c.closeDbConn(); err != nil { - c.logger.Errorf("could not close db connection: %v", err) - } - }() - if rows, err = c.pgDb.Query(getDatabasesSQL); err != nil { return nil, fmt.Errorf("could not query database: %v", err) } @@ -176,51 +170,46 @@ func (c *Cluster) getDatabases() (map[string]string, error) { return dbs, nil } -func (c *Cluster) createDatabases() error { - c.setProcessName("creating databases") - - newDbs := c.Spec.Databases - curDbs, err := c.getDatabases() - if err != nil { - return fmt.Errorf("could not get current databases: %v", err) - } - for datname := range curDbs { - delete(newDbs, datname) - } - - if len(newDbs) == 0 { +// executeCreateDatabase creates new database with the given owner. +// The caller is responsible for openinging and closing the database connection. +func (c *Cluster) executeCreateDatabase(datname, owner string) error { + if !c.databaseNameOwnerValid(datname, owner) { return nil } + c.logger.Infof("creating database %q with owner %q", datname, owner) - if err = c.initDbConn(); err != nil { - return fmt.Errorf("could not init database connection") + if _, err := c.pgDb.Query(fmt.Sprintf(createDatabaseSQL, datname, owner)); err != nil { + return fmt.Errorf("could not execute create database: %v", err) } - defer func() { - if err = c.closeDbConn(); err != nil { - c.logger.Errorf("could not close database connection: %v", err) - } - }() - - for datname, owner := range newDbs { - if _, ok := c.pgUsers[owner]; !ok { - c.logger.Infof("skipping creation of the %q database, user %q does not exist", datname, owner) - continue - } - - if !databaseNameRegexp.MatchString(datname) { - c.logger.Infof("database %q has invalid name", datname) - continue - } - c.logger.Infof("creating database %q with owner %q", datname, owner) - - if _, err = c.pgDb.Query(fmt.Sprintf(createDatabaseSQL, datname, owner)); err != nil { - return fmt.Errorf("could not query database: %v", err) - } - } - return nil } +// executeCreateDatabase changes the owner of the given database. +// The caller is responsible for openinging and closing the database connection. +func (c *Cluster) executeAlterDatabaseOwner(datname string, owner string) error { + if !c.databaseNameOwnerValid(datname, owner) { + return nil + } + c.logger.Infof("changing database %q owner to %q", datname, owner) + if _, err := c.pgDb.Query(fmt.Sprintf(alterDatabaseOwnerSQL, datname, owner)); err != nil { + return fmt.Errorf("could not execute alter database owner: %v", err) + } + return nil +} + +func (c *Cluster) databaseNameOwnerValid(datname, owner string) bool { + if _, ok := c.pgUsers[owner]; !ok { + c.logger.Infof("skipping creation of the %q database, user %q does not exist", datname, owner) + return false + } + + if !databaseNameRegexp.MatchString(datname) { + c.logger.Infof("database %q has invalid name", datname) + return false + } + return true +} + func makeUserFlags(rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin bool) (result []string) { if rolsuper { result = append(result, constants.RoleFlagSuperuser) diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index ce8e80f56..273bd90d0 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -557,3 +557,27 @@ func (c *Cluster) GetStatefulSet() *v1beta1.StatefulSet { func (c *Cluster) GetPodDisruptionBudget() *policybeta1.PodDisruptionBudget { return c.PodDisruptionBudget } + +func (c *Cluster) createDatabases() error { + c.setProcessName("creating databases") + + if len(c.Spec.Databases) == 0 { + return nil + } + + if err := c.initDbConn(); err != nil { + return fmt.Errorf("could not init database connection") + } + defer func() { + if err := c.closeDbConn(); err != nil { + c.logger.Errorf("could not close database connection: %v", err) + } + }() + + for datname, owner := range c.Spec.Databases { + if err := c.executeCreateDatabase(datname, owner); err != nil { + return err + } + } + return nil +} diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 97a539e8f..6a9b681bc 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -90,6 +90,11 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) { err = fmt.Errorf("could not sync roles: %v", err) return } + c.logger.Debugf("syncing databases") + if err = c.syncDatabases(); err != nil { + err = fmt.Errorf("could not sync databases: %v", err) + return + } } c.logger.Debugf("syncing persistent volumes") @@ -292,3 +297,50 @@ func (c *Cluster) samePDBWith(pdb *policybeta1.PodDisruptionBudget) (match bool, return } + +func (c *Cluster) syncDatabases() error { + c.setProcessName("syncing databases") + + createDatabases := make(map[string]string) + alterOwnerDatabases := make(map[string]string) + + if err := c.initDbConn(); err != nil { + return fmt.Errorf("could not init database connection") + } + defer func() { + if err := c.closeDbConn(); err != nil { + c.logger.Errorf("could not close database connection: %v", err) + } + }() + + currentDatabases, err := c.getDatabases() + if err != nil { + return fmt.Errorf("could not get current databases: %v", err) + } + + for datname, newOwner := range c.Spec.Databases { + currentOwner, exists := currentDatabases[datname] + if !exists { + createDatabases[datname] = newOwner + } else if currentOwner != newOwner { + alterOwnerDatabases[datname] = newOwner + } + } + + if len(createDatabases)+len(alterOwnerDatabases) == 0 { + return nil + } + + for datname, owner := range createDatabases { + if err = c.executeCreateDatabase(datname, owner); err != nil { + return err + } + } + for datname, owner := range alterOwnerDatabases { + if err = c.executeAlterDatabaseOwner(datname, owner); err != nil { + return err + } + } + + return nil +}