support extensions and defaultUsers for preparedDatabases
This commit is contained in:
		
							parent
							
								
									7d9cfe9f54
								
							
						
					
					
						commit
						0fa16ee1dd
					
				| 
						 | 
				
			
			@ -25,6 +25,10 @@ spec:
 | 
			
		|||
    foo: zalando
 | 
			
		||||
  preparedDatabases:
 | 
			
		||||
    bar:
 | 
			
		||||
      defaultUsers: true
 | 
			
		||||
      extensions:
 | 
			
		||||
        pg_partman: public
 | 
			
		||||
        pgcrypto: public
 | 
			
		||||
      schemas:
 | 
			
		||||
        data: {}
 | 
			
		||||
        history:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -15,7 +15,7 @@ spec:
 | 
			
		|||
      serviceAccountName: zalando-postgres-operator
 | 
			
		||||
      containers:
 | 
			
		||||
      - name: postgres-operator
 | 
			
		||||
        image: registry.opensource.zalan.do/acid/postgres-operator:v1.2.0
 | 
			
		||||
        image: registry.opensource.zalan.do/acid/postgres-operator:v1.2.0-21-g7d9cfe9f-dirty
 | 
			
		||||
        imagePullPolicy: IfNotPresent
 | 
			
		||||
        resources:
 | 
			
		||||
          requests:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -79,6 +79,8 @@ type PostgresqlList struct {
 | 
			
		|||
// PreparedDatabase describes elements to be bootstrapped
 | 
			
		||||
type PreparedDatabase struct {
 | 
			
		||||
	PreparedSchemas map[string]PreparedSchema `json:"schemas,omitempty"`
 | 
			
		||||
	DefaultUsers    bool                      `json:"defaultUsers,omitempty" defaults:"false"`
 | 
			
		||||
	Extensions      map[string]string         `json:"extensions,omitempty"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// PreparedSchema describes elements to be bootstrapped in the schema
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -794,9 +794,11 @@ func (c *Cluster) initPreparedDatabaseRoles() error {
 | 
			
		|||
		if err := c.initDefaultRoles(defaultRoles, "admin", preparedDbName); err != nil {
 | 
			
		||||
			return fmt.Errorf("could not initialize default roles for database %s: %v", preparedDbName, err)
 | 
			
		||||
		}
 | 
			
		||||
		if preparedDB.DefaultUsers {
 | 
			
		||||
			if err := c.initDefaultRoles(defaultUsers, "admin", preparedDbName); err != nil {
 | 
			
		||||
				return fmt.Errorf("could not initialize default roles for database %s: %v", preparedDbName, err)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// default roles per database schema
 | 
			
		||||
		preparedSchemas := preparedDB.PreparedSchemas
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -28,10 +28,14 @@ const (
 | 
			
		|||
	getDatabasesSQL = `SELECT datname, pg_get_userbyid(datdba) AS owner FROM pg_database;`
 | 
			
		||||
	getSchemasSQL   = `SELECT n.nspname AS dbschema FROM pg_catalog.pg_namespace n
 | 
			
		||||
			WHERE n.nspname !~ '^pg_' AND n.nspname <> 'information_schema' ORDER BY 1`
 | 
			
		||||
	getExtensionsSQL = `SELECT e.extname, n.nspname FROM pg_catalog.pg_extension e
 | 
			
		||||
	        LEFT JOIN pg_catalog.pg_namespace n ON n.oid = e.extnamespace ORDER BY 1;`
 | 
			
		||||
 | 
			
		||||
	createDatabaseSQL       = `CREATE DATABASE "%s" OWNER "%s";`
 | 
			
		||||
	createDatabaseSchemaSQL = `SET ROLE TO "%s"; CREATE SCHEMA "%s" AUTHORIZATION "%s"`
 | 
			
		||||
	alterDatabaseOwnerSQL   = `ALTER DATABASE "%s" OWNER TO "%s";`
 | 
			
		||||
	createExtensionSQL      = `CREATE EXTENSION IF NOT EXISTS "%s" SCHEMA "%s"`
 | 
			
		||||
	alterExtensionSQL       = `ALTER EXTENSION "%s" SET SCHEMA "%s"`
 | 
			
		||||
 | 
			
		||||
	globalDefaultPrivilegesSQL = `SET ROLE TO "%s";
 | 
			
		||||
			ALTER DEFAULT PRIVILEGES GRANT USAGE ON SCHEMAS TO "%s","%s";
 | 
			
		||||
| 
						 | 
				
			
			@ -363,3 +367,62 @@ func makeUserFlags(rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin
 | 
			
		|||
 | 
			
		||||
	return result
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// getExtension returns the list of current database extensions
 | 
			
		||||
// The caller is responsible for opening and closing the database connection
 | 
			
		||||
func (c *Cluster) getExtensions() (dbExtensions map[string]string, err error) {
 | 
			
		||||
	var (
 | 
			
		||||
		rows *sql.Rows
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	if rows, err = c.pgDb.Query(getExtensionsSQL); err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("could not query database extensions: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	defer func() {
 | 
			
		||||
		if err2 := rows.Close(); err2 != nil {
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				err = fmt.Errorf("error when closing query cursor: %v, previous error: %v", err2, err)
 | 
			
		||||
			} else {
 | 
			
		||||
				err = fmt.Errorf("error when closing query cursor: %v", err2)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	dbExtensions = make(map[string]string)
 | 
			
		||||
 | 
			
		||||
	for rows.Next() {
 | 
			
		||||
		var extension, schema string
 | 
			
		||||
 | 
			
		||||
		if err = rows.Scan(&extension, &schema); err != nil {
 | 
			
		||||
			return nil, fmt.Errorf("error when processing row: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		dbExtensions[extension] = schema
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return dbExtensions, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// executeCreateExtension creates new extension in the given schema.
 | 
			
		||||
// The caller is responsible for opening and closing the database connection.
 | 
			
		||||
func (c *Cluster) executeCreateExtension(extName, schemaName string) error {
 | 
			
		||||
	return c.execCreateOrAlterExtension(extName, schemaName, createExtensionSQL,
 | 
			
		||||
		"creating extension", "create extension")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// executeAlterExtension changes the schema of the given extension.
 | 
			
		||||
// The caller is responsible for opening and closing the database connection.
 | 
			
		||||
func (c *Cluster) executeAlterExtension(extName, schemaName string) error {
 | 
			
		||||
	return c.execCreateOrAlterExtension(extName, schemaName, alterExtensionSQL,
 | 
			
		||||
		"changing schema for extension", "alter extension schema")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *Cluster) execCreateOrAlterExtension(extName, schemaName, statement, doing, operation string) error {
 | 
			
		||||
 | 
			
		||||
	c.logger.Infof("%s %q schema %q", doing, extName, schemaName)
 | 
			
		||||
	if _, err := c.pgDb.Exec(fmt.Sprintf(statement, extName, schemaName)); err != nil {
 | 
			
		||||
		return fmt.Errorf("could not execute %s: %v", operation, err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -573,6 +573,11 @@ func (c *Cluster) syncPreparedDatabases() error {
 | 
			
		|||
		if err := c.syncPreparedSchemas(preparedDbName, preparedSchemas); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// install extensions
 | 
			
		||||
		if err := c.syncExtensions(preparedDB.Extensions); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
| 
						 | 
				
			
			@ -610,6 +615,44 @@ func (c *Cluster) syncPreparedSchemas(datname string, preparedSchemas map[string
 | 
			
		|||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *Cluster) syncExtensions(extensions map[string]string) error {
 | 
			
		||||
	c.setProcessName("syncing extensions")
 | 
			
		||||
 | 
			
		||||
	createExtensions := make(map[string]string)
 | 
			
		||||
	alterExtensions := make(map[string]string)
 | 
			
		||||
 | 
			
		||||
	currentExtensions, err := c.getExtensions()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("could not get current extensions: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for extName, newSchema := range extensions {
 | 
			
		||||
		currentSchema, exists := currentExtensions[extName]
 | 
			
		||||
		if !exists {
 | 
			
		||||
			createExtensions[extName] = newSchema
 | 
			
		||||
		} else if currentSchema != newSchema {
 | 
			
		||||
			alterExtensions[extName] = newSchema
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if len(createExtensions)+len(alterExtensions) == 0 {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for extName, schema := range createExtensions {
 | 
			
		||||
		if err = c.executeCreateExtension(extName, schema); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for extName, schema := range alterExtensions {
 | 
			
		||||
		if err = c.executeAlterExtension(extName, schema); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *Cluster) syncLogicalBackupJob() error {
 | 
			
		||||
	var (
 | 
			
		||||
		job        *batchv1beta1.CronJob
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue