diff --git a/manifests/complete-postgres-manifest.yaml b/manifests/complete-postgres-manifest.yaml index 98e993957..30ca3d253 100644 --- a/manifests/complete-postgres-manifest.yaml +++ b/manifests/complete-postgres-manifest.yaml @@ -25,6 +25,10 @@ spec: foo: zalando preparedDatabases: bar: + defaultUsers: true + extensions: + pg_partman: public + pgcrypto: public schemas: data: {} history: diff --git a/manifests/postgres-operator.yaml b/manifests/postgres-operator.yaml index 7b74b80a0..3d9b230ce 100644 --- a/manifests/postgres-operator.yaml +++ b/manifests/postgres-operator.yaml @@ -15,7 +15,7 @@ spec: serviceAccountName: zalando-postgres-operator containers: - name: postgres-operator - image: registry.opensource.zalan.do/acid/postgres-operator:v1.2.0 + image: registry.opensource.zalan.do/acid/postgres-operator:v1.2.0-21-g7d9cfe9f-dirty imagePullPolicy: IfNotPresent resources: requests: diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index 645a4cd4c..aa631cf5a 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -79,6 +79,8 @@ type PostgresqlList struct { // PreparedDatabase describes elements to be bootstrapped type PreparedDatabase struct { PreparedSchemas map[string]PreparedSchema `json:"schemas,omitempty"` + DefaultUsers bool `json:"defaultUsers,omitempty" defaults:"false"` + Extensions map[string]string `json:"extensions,omitempty"` } // PreparedSchema describes elements to be bootstrapped in the schema diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 7538579be..85a38fc51 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -794,8 +794,10 @@ func (c *Cluster) initPreparedDatabaseRoles() error { if err := c.initDefaultRoles(defaultRoles, "admin", preparedDbName); err != nil { return fmt.Errorf("could not initialize default roles for database %s: %v", preparedDbName, err) } - if err := c.initDefaultRoles(defaultUsers, "admin", preparedDbName); err != nil { - return fmt.Errorf("could not initialize default roles for database %s: %v", preparedDbName, err) + if preparedDB.DefaultUsers { + if err := c.initDefaultRoles(defaultUsers, "admin", preparedDbName); err != nil { + return fmt.Errorf("could not initialize default roles for database %s: %v", preparedDbName, err) + } } // default roles per database schema diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go index e8b053f3d..18dbf80ed 100644 --- a/pkg/cluster/database.go +++ b/pkg/cluster/database.go @@ -28,10 +28,14 @@ const ( getDatabasesSQL = `SELECT datname, pg_get_userbyid(datdba) AS owner FROM pg_database;` getSchemasSQL = `SELECT n.nspname AS dbschema FROM pg_catalog.pg_namespace n WHERE n.nspname !~ '^pg_' AND n.nspname <> 'information_schema' ORDER BY 1` + getExtensionsSQL = `SELECT e.extname, n.nspname FROM pg_catalog.pg_extension e + LEFT JOIN pg_catalog.pg_namespace n ON n.oid = e.extnamespace ORDER BY 1;` createDatabaseSQL = `CREATE DATABASE "%s" OWNER "%s";` createDatabaseSchemaSQL = `SET ROLE TO "%s"; CREATE SCHEMA "%s" AUTHORIZATION "%s"` alterDatabaseOwnerSQL = `ALTER DATABASE "%s" OWNER TO "%s";` + createExtensionSQL = `CREATE EXTENSION IF NOT EXISTS "%s" SCHEMA "%s"` + alterExtensionSQL = `ALTER EXTENSION "%s" SET SCHEMA "%s"` globalDefaultPrivilegesSQL = `SET ROLE TO "%s"; ALTER DEFAULT PRIVILEGES GRANT USAGE ON SCHEMAS TO "%s","%s"; @@ -363,3 +367,62 @@ func makeUserFlags(rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin return result } + +// getExtension returns the list of current database extensions +// The caller is responsible for opening and closing the database connection +func (c *Cluster) getExtensions() (dbExtensions map[string]string, err error) { + var ( + rows *sql.Rows + ) + + if rows, err = c.pgDb.Query(getExtensionsSQL); err != nil { + return nil, fmt.Errorf("could not query database extensions: %v", err) + } + + defer func() { + if err2 := rows.Close(); err2 != nil { + if err != nil { + err = fmt.Errorf("error when closing query cursor: %v, previous error: %v", err2, err) + } else { + err = fmt.Errorf("error when closing query cursor: %v", err2) + } + } + }() + + dbExtensions = make(map[string]string) + + for rows.Next() { + var extension, schema string + + if err = rows.Scan(&extension, &schema); err != nil { + return nil, fmt.Errorf("error when processing row: %v", err) + } + dbExtensions[extension] = schema + } + + return dbExtensions, err +} + +// executeCreateExtension creates new extension in the given schema. +// The caller is responsible for opening and closing the database connection. +func (c *Cluster) executeCreateExtension(extName, schemaName string) error { + return c.execCreateOrAlterExtension(extName, schemaName, createExtensionSQL, + "creating extension", "create extension") +} + +// executeAlterExtension changes the schema of the given extension. +// The caller is responsible for opening and closing the database connection. +func (c *Cluster) executeAlterExtension(extName, schemaName string) error { + return c.execCreateOrAlterExtension(extName, schemaName, alterExtensionSQL, + "changing schema for extension", "alter extension schema") +} + +func (c *Cluster) execCreateOrAlterExtension(extName, schemaName, statement, doing, operation string) error { + + c.logger.Infof("%s %q schema %q", doing, extName, schemaName) + if _, err := c.pgDb.Exec(fmt.Sprintf(statement, extName, schemaName)); err != nil { + return fmt.Errorf("could not execute %s: %v", operation, err) + } + + return nil +} diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index b08d5f5d3..deb77cc58 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -573,6 +573,11 @@ func (c *Cluster) syncPreparedDatabases() error { if err := c.syncPreparedSchemas(preparedDbName, preparedSchemas); err != nil { return err } + + // install extensions + if err := c.syncExtensions(preparedDB.Extensions); err != nil { + return err + } } return nil @@ -610,6 +615,44 @@ func (c *Cluster) syncPreparedSchemas(datname string, preparedSchemas map[string return nil } +func (c *Cluster) syncExtensions(extensions map[string]string) error { + c.setProcessName("syncing extensions") + + createExtensions := make(map[string]string) + alterExtensions := make(map[string]string) + + currentExtensions, err := c.getExtensions() + if err != nil { + return fmt.Errorf("could not get current extensions: %v", err) + } + + for extName, newSchema := range extensions { + currentSchema, exists := currentExtensions[extName] + if !exists { + createExtensions[extName] = newSchema + } else if currentSchema != newSchema { + alterExtensions[extName] = newSchema + } + } + + if len(createExtensions)+len(alterExtensions) == 0 { + return nil + } + + for extName, schema := range createExtensions { + if err = c.executeCreateExtension(extName, schema); err != nil { + return err + } + } + for extName, schema := range alterExtensions { + if err = c.executeAlterExtension(extName, schema); err != nil { + return err + } + } + + return nil +} + func (c *Cluster) syncLogicalBackupJob() error { var ( job *batchv1beta1.CronJob