PreparedDatabases with default role setup
This commit is contained in:
		
							parent
							
								
									cf97ebb2b8
								
							
						
					
					
						commit
						1355fdcd8d
					
				| 
						 | 
				
			
			@ -3,7 +3,7 @@ kind: postgresql
 | 
			
		|||
metadata:
 | 
			
		||||
  name: acid-test-cluster
 | 
			
		||||
spec:
 | 
			
		||||
  dockerImage: registry.opensource.zalan.do/acid/spilo-11:1.6-p1
 | 
			
		||||
  dockerImage: registry.opensource.zalan.do/acid/spilo-11:1.5-p9
 | 
			
		||||
  initContainers:
 | 
			
		||||
  - name: date
 | 
			
		||||
    image: busybox
 | 
			
		||||
| 
						 | 
				
			
			@ -23,20 +23,26 @@ spec:
 | 
			
		|||
  - 127.0.0.1/32
 | 
			
		||||
  databases:
 | 
			
		||||
    foo: zalando
 | 
			
		||||
  preparedDatabases:
 | 
			
		||||
    ab_db:
 | 
			
		||||
      schemas:
 | 
			
		||||
        data:
 | 
			
		||||
        history:
 | 
			
		||||
          defaultRoles: false
 | 
			
		||||
 | 
			
		||||
# Expert section
 | 
			
		||||
 | 
			
		||||
  enableShmVolume: true
 | 
			
		||||
# spiloFSGroup: 103
 | 
			
		||||
  postgresql:
 | 
			
		||||
    version: "11"
 | 
			
		||||
    version: "10"
 | 
			
		||||
    parameters:
 | 
			
		||||
      shared_buffers: "32MB"
 | 
			
		||||
      max_connections: "10"
 | 
			
		||||
      log_statement: "all"
 | 
			
		||||
  resources:
 | 
			
		||||
    requests:
 | 
			
		||||
      cpu: 10m
 | 
			
		||||
      cpu: 100m
 | 
			
		||||
      memory: 100Mi
 | 
			
		||||
    limits:
 | 
			
		||||
      cpu: 300m
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -15,5 +15,7 @@ spec:
 | 
			
		|||
    foo_user: []  # role for application foo
 | 
			
		||||
  databases:
 | 
			
		||||
    foo: zalando  # dbname: owner
 | 
			
		||||
  preparedDatabases:
 | 
			
		||||
    bar:
 | 
			
		||||
  postgresql:
 | 
			
		||||
    version: "11"
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -51,6 +51,7 @@ type PostgresSpec struct {
 | 
			
		|||
	Clone                 CloneDescription            `json:"clone"`
 | 
			
		||||
	ClusterName           string                      `json:"-"`
 | 
			
		||||
	Databases             map[string]string           `json:"databases,omitempty"`
 | 
			
		||||
	PreparedDatabases     map[string]PreparedDatabase `json:"preparedDatabases,omitempty"`
 | 
			
		||||
	Tolerations           []v1.Toleration             `json:"tolerations,omitempty"`
 | 
			
		||||
	Sidecars              []Sidecar                   `json:"sidecars,omitempty"`
 | 
			
		||||
	InitContainers        []v1.Container              `json:"initContainers,omitempty"`
 | 
			
		||||
| 
						 | 
				
			
			@ -75,6 +76,17 @@ type PostgresqlList struct {
 | 
			
		|||
	Items []Postgresql `json:"items"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// PreparedDatabase describes elements to be bootstrapped (schemas, prod-prefix)
 | 
			
		||||
type PreparedDatabase struct {
 | 
			
		||||
	PreparedSchemas map[string]PreparedSchema `json:"schemas,omitempty"`
 | 
			
		||||
	Prod            bool                      `json:"prod,omitempty"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// PreparedSchema describes elements to be bootstrapped in the schema
 | 
			
		||||
type PreparedSchema struct {
 | 
			
		||||
	DefaultRoles *bool `json:"defaultRoles,omitempty" defaults:"true"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// MaintenanceWindow describes the time window when the operator is allowed to do maintenance on a cluster.
 | 
			
		||||
type MaintenanceWindow struct {
 | 
			
		||||
	Everyday  bool
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -482,6 +482,13 @@ func (in *PostgresSpec) DeepCopyInto(out *PostgresSpec) {
 | 
			
		|||
			(*out)[key] = val
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if in.PreparedDatabases != nil {
 | 
			
		||||
		in, out := &in.PreparedDatabases, &out.PreparedDatabases
 | 
			
		||||
		*out = make(map[string]PreparedDatabase, len(*in))
 | 
			
		||||
		for key, val := range *in {
 | 
			
		||||
			(*out)[key] = *val.DeepCopy()
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if in.Tolerations != nil {
 | 
			
		||||
		in, out := &in.Tolerations, &out.Tolerations
 | 
			
		||||
		*out = make([]corev1.Toleration, len(*in))
 | 
			
		||||
| 
						 | 
				
			
			@ -649,6 +656,50 @@ func (in *PostgresqlParam) DeepCopy() *PostgresqlParam {
 | 
			
		|||
	return out
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 | 
			
		||||
func (in *PreparedDatabase) DeepCopyInto(out *PreparedDatabase) {
 | 
			
		||||
	*out = *in
 | 
			
		||||
	if in.PreparedSchemas != nil {
 | 
			
		||||
		in, out := &in.PreparedSchemas, &out.PreparedSchemas
 | 
			
		||||
		*out = make(map[string]PreparedSchema, len(*in))
 | 
			
		||||
		for key, val := range *in {
 | 
			
		||||
			(*out)[key] = *val.DeepCopy()
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PreparedDatabase.
 | 
			
		||||
func (in *PreparedDatabase) DeepCopy() *PreparedDatabase {
 | 
			
		||||
	if in == nil {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	out := new(PreparedDatabase)
 | 
			
		||||
	in.DeepCopyInto(out)
 | 
			
		||||
	return out
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 | 
			
		||||
func (in *PreparedSchema) DeepCopyInto(out *PreparedSchema) {
 | 
			
		||||
	*out = *in
 | 
			
		||||
	if in.DefaultRoles != nil {
 | 
			
		||||
		in, out := &in.DefaultRoles, &out.DefaultRoles
 | 
			
		||||
		*out = new(bool)
 | 
			
		||||
		**out = **in
 | 
			
		||||
	}
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PreparedSchema.
 | 
			
		||||
func (in *PreparedSchema) DeepCopy() *PreparedSchema {
 | 
			
		||||
	if in == nil {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	out := new(PreparedSchema)
 | 
			
		||||
	in.DeepCopyInto(out)
 | 
			
		||||
	return out
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 | 
			
		||||
func (in *ResourceDescription) DeepCopyInto(out *ResourceDescription) {
 | 
			
		||||
	*out = *in
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -194,6 +194,10 @@ func (c *Cluster) initUsers() error {
 | 
			
		|||
		return fmt.Errorf("could not init infrastructure roles: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := c.initPreparedDatabaseRoles(); err != nil {
 | 
			
		||||
		return fmt.Errorf("could not init default users: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := c.initRobotUsers(); err != nil {
 | 
			
		||||
		return fmt.Errorf("could not init robot users: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -298,6 +302,9 @@ func (c *Cluster) Create() error {
 | 
			
		|||
		if err = c.syncDatabases(); err != nil {
 | 
			
		||||
			return fmt.Errorf("could not sync databases: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		if err = c.syncPreparedDatabases(); err != nil {
 | 
			
		||||
			return fmt.Errorf("could not sync prepared databases: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		c.logger.Infof("databases have been successfully created")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -641,6 +648,13 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 | 
			
		|||
				updateFailed = true
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if !reflect.DeepEqual(oldSpec.Spec.PreparedDatabases, newSpec.Spec.PreparedDatabases) {
 | 
			
		||||
			c.logger.Infof("syncing prepared databases")
 | 
			
		||||
			if err := c.syncPreparedDatabases(); err != nil {
 | 
			
		||||
				c.logger.Errorf("could not sync prepared databases: %v", err)
 | 
			
		||||
				updateFailed = true
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
| 
						 | 
				
			
			@ -762,6 +776,68 @@ func (c *Cluster) initSystemUsers() {
 | 
			
		|||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *Cluster) initPreparedDatabaseRoles() error {
 | 
			
		||||
 | 
			
		||||
	for preparedDbName, preparedDB := range c.Spec.PreparedDatabases {
 | 
			
		||||
		if err := c.initDefaultRoles("admin", preparedDbName); err != nil {
 | 
			
		||||
			return fmt.Errorf("could not initialize default roles for database %s: %v", preparedDbName, err)
 | 
			
		||||
		}
 | 
			
		||||
		preparedSchemas := preparedDB.PreparedSchemas
 | 
			
		||||
		if len(preparedDB.PreparedSchemas) == 0 {
 | 
			
		||||
			preparedSchemas = map[string]acidv1.PreparedSchema{"data": {DefaultRoles: util.True()}}
 | 
			
		||||
		}
 | 
			
		||||
		for preparedSchemaName, preparedSchema := range preparedSchemas {
 | 
			
		||||
			if preparedSchema.DefaultRoles == nil || *preparedSchema.DefaultRoles {
 | 
			
		||||
				if err := c.initDefaultRoles(preparedDbName+"_owner", preparedDbName+"_"+preparedSchemaName); err != nil {
 | 
			
		||||
					return fmt.Errorf("could not initialize default roles for database schema %s: %v", preparedSchemaName, err)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *Cluster) initDefaultRoles(admin, prefix string) error {
 | 
			
		||||
	defaultRoles := map[string]string{
 | 
			
		||||
		"_owner": "", "_reader": "", "_writer": "_reader",
 | 
			
		||||
		"_owner_user": "_owner", "_reader_user": "_reader", "_writer_user": "_writer"}
 | 
			
		||||
 | 
			
		||||
	for defaultRole, inherits := range defaultRoles {
 | 
			
		||||
 | 
			
		||||
		roleName := prefix + defaultRole
 | 
			
		||||
		flags := []string{constants.RoleFlagNoLogin}
 | 
			
		||||
		memberOf := make([]string, 0)
 | 
			
		||||
		adminRole := ""
 | 
			
		||||
		if defaultRole[len(defaultRole)-5:] == "_user" {
 | 
			
		||||
			flags = []string{constants.RoleFlagLogin}
 | 
			
		||||
		} else {
 | 
			
		||||
			if defaultRole == "_owner" {
 | 
			
		||||
				adminRole = admin
 | 
			
		||||
			} else {
 | 
			
		||||
				adminRole = prefix + "_owner"
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if inherits != "" {
 | 
			
		||||
			memberOf = append(memberOf, prefix+inherits)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		newRole := spec.PgUser{
 | 
			
		||||
			Origin:    spec.RoleOriginBootstrap,
 | 
			
		||||
			Name:      roleName,
 | 
			
		||||
			Password:  util.RandomPassword(constants.PasswordLength),
 | 
			
		||||
			Flags:     flags,
 | 
			
		||||
			MemberOf:  memberOf,
 | 
			
		||||
			AdminRole: adminRole,
 | 
			
		||||
		}
 | 
			
		||||
		if currentRole, present := c.pgUsers[roleName]; present {
 | 
			
		||||
			c.pgUsers[roleName] = c.resolveNameConflict(¤tRole, &newRole)
 | 
			
		||||
		} else {
 | 
			
		||||
			c.pgUsers[roleName] = newRole
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *Cluster) initRobotUsers() error {
 | 
			
		||||
	for username, userFlags := range c.Spec.Users {
 | 
			
		||||
		if !isValidUsername(username) {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -26,15 +26,21 @@ const (
 | 
			
		|||
	 ORDER BY 1;`
 | 
			
		||||
 | 
			
		||||
	getDatabasesSQL = `SELECT datname, pg_get_userbyid(datdba) AS owner FROM pg_database;`
 | 
			
		||||
	getSchemasSQL   = `SELECT n.nspname AS dbschema FROM pg_catalog.pg_namespace n
 | 
			
		||||
			WHERE n.nspname !~ '^pg_' AND n.nspname <> 'information_schema' ORDER BY 1`
 | 
			
		||||
 | 
			
		||||
	createDatabaseSQL       = `CREATE DATABASE "%s" OWNER "%s";`
 | 
			
		||||
	createDatabaseSchemaSQL = `SET ROLE TO "%s"; CREATE SCHEMA "%s" AUTHORIZATION "%s"`
 | 
			
		||||
	alterDatabaseOwnerSQL   = `ALTER DATABASE "%s" OWNER TO "%s";`
 | 
			
		||||
	defaultPrivilegesSQL    = `SET ROLE TO "%s"; ALTER DEFAULT PRIVILEGES IN SCHEMA "%s" GRANT INSERT, UPDATE, DELETE ON TABLES TO "%s"; ALTER DEFAULT PRIVILEGES IN SCHEMA "%s" GRANT SELECT ON TABLES TO "%s";`
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func (c *Cluster) pgConnectionString() string {
 | 
			
		||||
func (c *Cluster) pgConnectionString(dbname string) string {
 | 
			
		||||
	password := c.systemUsers[constants.SuperuserKeyName].Password
 | 
			
		||||
 | 
			
		||||
	return fmt.Sprintf("host='%s' dbname=postgres sslmode=require user='%s' password='%s' connect_timeout='%d'",
 | 
			
		||||
	return fmt.Sprintf("host='%s' dbname='%s' sslmode=require user='%s' password='%s' connect_timeout='%d'",
 | 
			
		||||
		fmt.Sprintf("%s.%s.svc.%s", c.Name, c.Namespace, c.OpConfig.ClusterDomain),
 | 
			
		||||
		dbname,
 | 
			
		||||
		c.systemUsers[constants.SuperuserKeyName].Name,
 | 
			
		||||
		strings.Replace(password, "$", "\\$", -1),
 | 
			
		||||
		constants.PostgresConnectTimeout/time.Second)
 | 
			
		||||
| 
						 | 
				
			
			@ -48,14 +54,14 @@ func (c *Cluster) databaseAccessDisabled() bool {
 | 
			
		|||
	return !c.OpConfig.EnableDBAccess
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *Cluster) initDbConn() error {
 | 
			
		||||
func (c *Cluster) initDbConn(dbname string) error {
 | 
			
		||||
	c.setProcessName("initializing db connection")
 | 
			
		||||
	if c.pgDb != nil {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var conn *sql.DB
 | 
			
		||||
	connstring := c.pgConnectionString()
 | 
			
		||||
	connstring := c.pgConnectionString(dbname)
 | 
			
		||||
 | 
			
		||||
	finalerr := retryutil.Retry(constants.PostgresConnectTimeout, constants.PostgresConnectRetryTimeout,
 | 
			
		||||
		func() (bool, error) {
 | 
			
		||||
| 
						 | 
				
			
			@ -100,9 +106,9 @@ func (c *Cluster) closeDbConn() (err error) {
 | 
			
		|||
		c.logger.Debug("closing database connection")
 | 
			
		||||
		if err = c.pgDb.Close(); err != nil {
 | 
			
		||||
			c.logger.Errorf("could not close database connection: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		} else {
 | 
			
		||||
			c.pgDb = nil
 | 
			
		||||
 | 
			
		||||
		}
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	c.logger.Warning("attempted to close an empty db connection object")
 | 
			
		||||
| 
						 | 
				
			
			@ -187,14 +193,14 @@ func (c *Cluster) getDatabases() (dbs map[string]string, err error) {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
// executeCreateDatabase creates new database with the given owner.
 | 
			
		||||
// The caller is responsible for openinging and closing the database connection.
 | 
			
		||||
// The caller is responsible for opening and closing the database connection.
 | 
			
		||||
func (c *Cluster) executeCreateDatabase(datname, owner string) error {
 | 
			
		||||
	return c.execCreateOrAlterDatabase(datname, owner, createDatabaseSQL,
 | 
			
		||||
		"creating database", "create database")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// executeCreateDatabase changes the owner of the given database.
 | 
			
		||||
// The caller is responsible for openinging and closing the database connection.
 | 
			
		||||
// executeAlterDatabaseOwner changes the owner of the given database.
 | 
			
		||||
// The caller is responsible for opening and closing the database connection.
 | 
			
		||||
func (c *Cluster) executeAlterDatabaseOwner(datname string, owner string) error {
 | 
			
		||||
	return c.execCreateOrAlterDatabase(datname, owner, alterDatabaseOwnerSQL,
 | 
			
		||||
		"changing owner for database", "alter database owner")
 | 
			
		||||
| 
						 | 
				
			
			@ -224,6 +230,77 @@ func (c *Cluster) databaseNameOwnerValid(datname, owner string) bool {
 | 
			
		|||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// getSchemas returns the list of current database schemas
 | 
			
		||||
// The caller is responsible for opening and closing the database connection
 | 
			
		||||
func (c *Cluster) getSchemas() (schemas []string, err error) {
 | 
			
		||||
	var (
 | 
			
		||||
		rows      *sql.Rows
 | 
			
		||||
		dbschemas []string
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	if rows, err = c.pgDb.Query(getSchemasSQL); err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("could not query database schemas: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	defer func() {
 | 
			
		||||
		if err2 := rows.Close(); err2 != nil {
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				err = fmt.Errorf("error when closing query cursor: %v, previous error: %v", err2, err)
 | 
			
		||||
			} else {
 | 
			
		||||
				err = fmt.Errorf("error when closing query cursor: %v", err2)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	for rows.Next() {
 | 
			
		||||
		var dbschema string
 | 
			
		||||
 | 
			
		||||
		if err = rows.Scan(&dbschema); err != nil {
 | 
			
		||||
			return nil, fmt.Errorf("error when processing row: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		dbschemas = append(dbschemas, dbschema)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return dbschemas, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// executeCreateDatabaseSchema creates new database schema with the given owner.
 | 
			
		||||
// The caller is responsible for opening and closing the database connection.
 | 
			
		||||
func (c *Cluster) executeCreateDatabaseSchema(datname, schemaName, dbOwner string, schemaOwner string) error {
 | 
			
		||||
	return c.execCreateDatabaseSchema(datname, schemaName, dbOwner, schemaOwner, createDatabaseSchemaSQL,
 | 
			
		||||
		"creating database schema", "create database schema")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *Cluster) execCreateDatabaseSchema(datname, schemaName, dbOwner, schemaOwner, statement, doing, operation string) error {
 | 
			
		||||
	if !c.databaseSchemaNameValid(schemaName) {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	c.logger.Infof("%s %q owner %q", doing, schemaName, schemaOwner)
 | 
			
		||||
	if _, err := c.pgDb.Exec(fmt.Sprintf(statement, dbOwner, schemaName, schemaOwner)); err != nil {
 | 
			
		||||
		return fmt.Errorf("could not execute %s: %v", operation, err)
 | 
			
		||||
	}
 | 
			
		||||
	c.execAlterDefaultPrivileges(schemaName, schemaOwner, datname)
 | 
			
		||||
	c.execAlterDefaultPrivileges(schemaName, schemaOwner, datname+"_"+schemaName)
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *Cluster) execAlterDefaultPrivileges(schemaName, owner, rolePrefix string) error {
 | 
			
		||||
	if _, err := c.pgDb.Exec(fmt.Sprintf(defaultPrivilegesSQL, owner, schemaName, rolePrefix+"_writer", schemaName, rolePrefix+"_reader")); err != nil {
 | 
			
		||||
		return fmt.Errorf("could not alter default privileges for database schema: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *Cluster) databaseSchemaNameValid(schemaName string) bool {
 | 
			
		||||
	if !databaseNameRegexp.MatchString(schemaName) {
 | 
			
		||||
		c.logger.Infof("database schema %q has invalid name", schemaName)
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func makeUserFlags(rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin bool) (result []string) {
 | 
			
		||||
	if rolsuper {
 | 
			
		||||
		result = append(result, constants.RoleFlagSuperuser)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1145,6 +1145,13 @@ func (c *Cluster) generateSingleUserSecret(namespace string, pgUser spec.PgUser)
 | 
			
		|||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	//skip NOLOGIN users
 | 
			
		||||
	for _, flag := range pgUser.Flags {
 | 
			
		||||
		if flag == constants.RoleFlagNoLogin {
 | 
			
		||||
			return nil
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	username := pgUser.Name
 | 
			
		||||
	secret := v1.Secret{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -101,6 +101,11 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
 | 
			
		|||
			err = fmt.Errorf("could not sync databases: %v", err)
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		c.logger.Debugf("syncing database schemas")
 | 
			
		||||
		if err = c.syncPreparedDatabases(); err != nil {
 | 
			
		||||
			err = fmt.Errorf("could not sync database schemas: %v", err)
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return err
 | 
			
		||||
| 
						 | 
				
			
			@ -433,7 +438,7 @@ func (c *Cluster) syncRoles() (err error) {
 | 
			
		|||
		userNames []string
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	err = c.initDbConn()
 | 
			
		||||
	err = c.initDbConn("postgres")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("could not init db connection: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -490,7 +495,7 @@ func (c *Cluster) syncDatabases() error {
 | 
			
		|||
	createDatabases := make(map[string]string)
 | 
			
		||||
	alterOwnerDatabases := make(map[string]string)
 | 
			
		||||
 | 
			
		||||
	if err := c.initDbConn(); err != nil {
 | 
			
		||||
	if err := c.initDbConn("postgres"); err != nil {
 | 
			
		||||
		return fmt.Errorf("could not init database connection")
 | 
			
		||||
	}
 | 
			
		||||
	defer func() {
 | 
			
		||||
| 
						 | 
				
			
			@ -504,6 +509,13 @@ func (c *Cluster) syncDatabases() error {
 | 
			
		|||
		return fmt.Errorf("could not get current databases: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for preparedDatname := range c.Spec.PreparedDatabases {
 | 
			
		||||
		_, exists := currentDatabases[preparedDatname]
 | 
			
		||||
		if !exists {
 | 
			
		||||
			createDatabases[preparedDatname] = preparedDatname + "_owner"
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for datname, newOwner := range c.Spec.Databases {
 | 
			
		||||
		currentOwner, exists := currentDatabases[datname]
 | 
			
		||||
		if !exists {
 | 
			
		||||
| 
						 | 
				
			
			@ -531,6 +543,62 @@ func (c *Cluster) syncDatabases() error {
 | 
			
		|||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *Cluster) syncPreparedDatabases() error {
 | 
			
		||||
	c.setProcessName("syncing prepared databases")
 | 
			
		||||
	for preparedDbName, preparedDB := range c.Spec.PreparedDatabases {
 | 
			
		||||
		preparedSchemas := preparedDB.PreparedSchemas
 | 
			
		||||
		if len(preparedDB.PreparedSchemas) == 0 {
 | 
			
		||||
			preparedSchemas = map[string]acidv1.PreparedSchema{"data": {DefaultRoles: util.True()}}
 | 
			
		||||
		}
 | 
			
		||||
		if err := c.syncPreparedSchemas(preparedDbName, preparedSchemas); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *Cluster) syncPreparedSchemas(datname string, preparedSchemas map[string]acidv1.PreparedSchema) error {
 | 
			
		||||
	c.setProcessName("syncing prepared schemas")
 | 
			
		||||
 | 
			
		||||
	if err := c.initDbConn(datname); err != nil {
 | 
			
		||||
		return fmt.Errorf("could not init connection to database %s: %v", datname, err)
 | 
			
		||||
	}
 | 
			
		||||
	defer func() {
 | 
			
		||||
		if err := c.closeDbConn(); err != nil {
 | 
			
		||||
			c.logger.Errorf("could not close database connection: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	currentSchemas, err := c.getSchemas()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("could not get current schemas: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var schemas []string
 | 
			
		||||
 | 
			
		||||
	for schema := range preparedSchemas {
 | 
			
		||||
		schemas = append(schemas, schema)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if createPreparedSchemas, equal := util.SubstractStringSlices(schemas, currentSchemas); !equal {
 | 
			
		||||
		for _, schemaName := range createPreparedSchemas {
 | 
			
		||||
			owner := "_owner"
 | 
			
		||||
			dbOwner := datname + owner
 | 
			
		||||
			if preparedSchemas[schemaName].DefaultRoles == nil || *preparedSchemas[schemaName].DefaultRoles {
 | 
			
		||||
				owner = datname + "_" + schemaName + owner
 | 
			
		||||
			} else {
 | 
			
		||||
				owner = dbOwner
 | 
			
		||||
			}
 | 
			
		||||
			if err = c.executeCreateDatabaseSchema(datname, schemaName, dbOwner, owner); err != nil {
 | 
			
		||||
				return err
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *Cluster) syncLogicalBackupJob() error {
 | 
			
		||||
	var (
 | 
			
		||||
		job        *batchv1beta1.CronJob
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,9 +1,10 @@
 | 
			
		|||
package controller
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"k8s.io/apimachinery/pkg/types"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/apimachinery/pkg/types"
 | 
			
		||||
 | 
			
		||||
	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -30,6 +30,7 @@ const (
 | 
			
		|||
	RoleOriginInfrastructure
 | 
			
		||||
	RoleOriginTeamsAPI
 | 
			
		||||
	RoleOriginSystem
 | 
			
		||||
	RoleOriginBootstrap
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type syncUserOperation int
 | 
			
		||||
| 
						 | 
				
			
			@ -178,6 +179,8 @@ func (r RoleOrigin) String() string {
 | 
			
		|||
		return "teams API role"
 | 
			
		||||
	case RoleOriginSystem:
 | 
			
		||||
		return "system role"
 | 
			
		||||
	case RoleOriginBootstrap:
 | 
			
		||||
		return "bootstrapped role"
 | 
			
		||||
	default:
 | 
			
		||||
		panic(fmt.Sprintf("bogus role origin value %d", r))
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -74,25 +74,43 @@ func (strategy DefaultUserSyncStrategy) ProduceSyncRequests(dbUsers spec.PgUserM
 | 
			
		|||
 | 
			
		||||
// ExecuteSyncRequests makes actual database changes from the requests passed in its arguments.
 | 
			
		||||
func (strategy DefaultUserSyncStrategy) ExecuteSyncRequests(reqs []spec.PgSyncUserRequest, db *sql.DB) error {
 | 
			
		||||
	var rr []spec.PgSyncUserRequest
 | 
			
		||||
	var errors []string
 | 
			
		||||
	for _, r := range reqs {
 | 
			
		||||
		switch r.Kind {
 | 
			
		||||
		case spec.PGSyncUserAdd:
 | 
			
		||||
			if err := strategy.createPgUser(r.User, db); err != nil {
 | 
			
		||||
				return fmt.Errorf("could not create user %q: %v", r.User.Name, err)
 | 
			
		||||
				rr = append(rr, r)
 | 
			
		||||
				errors = append(errors, fmt.Sprintf("could not create user %q: %v", r.User.Name, err))
 | 
			
		||||
			}
 | 
			
		||||
		case spec.PGsyncUserAlter:
 | 
			
		||||
			if err := strategy.alterPgUser(r.User, db); err != nil {
 | 
			
		||||
				return fmt.Errorf("could not alter user %q: %v", r.User.Name, err)
 | 
			
		||||
				rr = append(rr, r)
 | 
			
		||||
				errors = append(errors, fmt.Sprintf("could not alter user %q: %v", r.User.Name, err))
 | 
			
		||||
			}
 | 
			
		||||
		case spec.PGSyncAlterSet:
 | 
			
		||||
			if err := strategy.alterPgUserSet(r.User, db); err != nil {
 | 
			
		||||
				return fmt.Errorf("could not set custom user %q parameters: %v", r.User.Name, err)
 | 
			
		||||
				rr = append(rr, r)
 | 
			
		||||
				errors = append(errors, fmt.Sprintf("could not set custom user %q parameters: %v", r.User.Name, err))
 | 
			
		||||
			}
 | 
			
		||||
		default:
 | 
			
		||||
			return fmt.Errorf("unrecognized operation: %v", r.Kind)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// creating roles might fail if group role members are created before the parent role
 | 
			
		||||
	// retry adding roles as long as the number of failed attempts is shrinking
 | 
			
		||||
	if len(rr) > 0 {
 | 
			
		||||
		if len(rr) < len(reqs) {
 | 
			
		||||
			if err := strategy.ExecuteSyncRequests(rr, db); err != nil {
 | 
			
		||||
				return err
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			return fmt.Errorf("could not execute sync requests for users: %v", errors)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
func (strategy DefaultUserSyncStrategy) alterPgUserSet(user spec.PgUser, db *sql.DB) (err error) {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue