From 1355fdcd8d523c99b1a4b157ae0b4118496f858d Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Mon, 23 Sep 2019 18:32:01 +0200 Subject: [PATCH] PreparedDatabases with default role setup --- manifests/complete-postgres-manifest.yaml | 12 ++- manifests/minimal-postgres-manifest.yaml | 2 + pkg/apis/acid.zalan.do/v1/postgresql_type.go | 40 ++++--- .../acid.zalan.do/v1/zz_generated.deepcopy.go | 51 +++++++++ pkg/cluster/cluster.go | 76 +++++++++++++ pkg/cluster/database.go | 101 +++++++++++++++--- pkg/cluster/k8sres.go | 7 ++ pkg/cluster/sync.go | 72 ++++++++++++- pkg/controller/types.go | 3 +- pkg/spec/types.go | 3 + pkg/util/users/users.go | 24 ++++- 11 files changed, 356 insertions(+), 35 deletions(-) diff --git a/manifests/complete-postgres-manifest.yaml b/manifests/complete-postgres-manifest.yaml index 2c600fc3e..48e614801 100644 --- a/manifests/complete-postgres-manifest.yaml +++ b/manifests/complete-postgres-manifest.yaml @@ -3,7 +3,7 @@ kind: postgresql metadata: name: acid-test-cluster spec: - dockerImage: registry.opensource.zalan.do/acid/spilo-11:1.6-p1 + dockerImage: registry.opensource.zalan.do/acid/spilo-11:1.5-p9 initContainers: - name: date image: busybox @@ -23,20 +23,26 @@ spec: - 127.0.0.1/32 databases: foo: zalando + preparedDatabases: + ab_db: + schemas: + data: + history: + defaultRoles: false # Expert section enableShmVolume: true # spiloFSGroup: 103 postgresql: - version: "11" + version: "10" parameters: shared_buffers: "32MB" max_connections: "10" log_statement: "all" resources: requests: - cpu: 10m + cpu: 100m memory: 100Mi limits: cpu: 300m diff --git a/manifests/minimal-postgres-manifest.yaml b/manifests/minimal-postgres-manifest.yaml index 91d297cac..840bc69b4 100644 --- a/manifests/minimal-postgres-manifest.yaml +++ b/manifests/minimal-postgres-manifest.yaml @@ -15,5 +15,7 @@ spec: foo_user: [] # role for application foo databases: foo: zalando # dbname: owner + preparedDatabases: + bar: postgresql: version: "11" diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index 72e40e122..ec2318d92 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -45,20 +45,21 @@ type PostgresSpec struct { // load balancers' source ranges are the same for master and replica services AllowedSourceRanges []string `json:"allowedSourceRanges"` - NumberOfInstances int32 `json:"numberOfInstances"` - Users map[string]UserFlags `json:"users"` - MaintenanceWindows []MaintenanceWindow `json:"maintenanceWindows,omitempty"` - Clone CloneDescription `json:"clone"` - ClusterName string `json:"-"` - Databases map[string]string `json:"databases,omitempty"` - Tolerations []v1.Toleration `json:"tolerations,omitempty"` - Sidecars []Sidecar `json:"sidecars,omitempty"` - InitContainers []v1.Container `json:"initContainers,omitempty"` - PodPriorityClassName string `json:"podPriorityClassName,omitempty"` - ShmVolume *bool `json:"enableShmVolume,omitempty"` - EnableLogicalBackup bool `json:"enableLogicalBackup,omitempty"` - LogicalBackupSchedule string `json:"logicalBackupSchedule,omitempty"` - StandbyCluster *StandbyDescription `json:"standby"` + NumberOfInstances int32 `json:"numberOfInstances"` + Users map[string]UserFlags `json:"users"` + MaintenanceWindows []MaintenanceWindow `json:"maintenanceWindows,omitempty"` + Clone CloneDescription `json:"clone"` + ClusterName string `json:"-"` + Databases map[string]string `json:"databases,omitempty"` + PreparedDatabases map[string]PreparedDatabase `json:"preparedDatabases,omitempty"` + Tolerations []v1.Toleration `json:"tolerations,omitempty"` + Sidecars []Sidecar `json:"sidecars,omitempty"` + InitContainers []v1.Container `json:"initContainers,omitempty"` + PodPriorityClassName string `json:"podPriorityClassName,omitempty"` + ShmVolume *bool `json:"enableShmVolume,omitempty"` + EnableLogicalBackup bool `json:"enableLogicalBackup,omitempty"` + LogicalBackupSchedule string `json:"logicalBackupSchedule,omitempty"` + StandbyCluster *StandbyDescription `json:"standby"` // deprecated json tags InitContainersOld []v1.Container `json:"init_containers,omitempty"` @@ -75,6 +76,17 @@ type PostgresqlList struct { Items []Postgresql `json:"items"` } +// PreparedDatabase describes elements to be bootstrapped (schemas, prod-prefix) +type PreparedDatabase struct { + PreparedSchemas map[string]PreparedSchema `json:"schemas,omitempty"` + Prod bool `json:"prod,omitempty"` +} + +// PreparedSchema describes elements to be bootstrapped in the schema +type PreparedSchema struct { + DefaultRoles *bool `json:"defaultRoles,omitempty" defaults:"true"` +} + // MaintenanceWindow describes the time window when the operator is allowed to do maintenance on a cluster. type MaintenanceWindow struct { Everyday bool diff --git a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go index 92b2c8704..bc66c3a6b 100644 --- a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go +++ b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go @@ -482,6 +482,13 @@ func (in *PostgresSpec) DeepCopyInto(out *PostgresSpec) { (*out)[key] = val } } + if in.PreparedDatabases != nil { + in, out := &in.PreparedDatabases, &out.PreparedDatabases + *out = make(map[string]PreparedDatabase, len(*in)) + for key, val := range *in { + (*out)[key] = *val.DeepCopy() + } + } if in.Tolerations != nil { in, out := &in.Tolerations, &out.Tolerations *out = make([]corev1.Toleration, len(*in)) @@ -649,6 +656,50 @@ func (in *PostgresqlParam) DeepCopy() *PostgresqlParam { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PreparedDatabase) DeepCopyInto(out *PreparedDatabase) { + *out = *in + if in.PreparedSchemas != nil { + in, out := &in.PreparedSchemas, &out.PreparedSchemas + *out = make(map[string]PreparedSchema, len(*in)) + for key, val := range *in { + (*out)[key] = *val.DeepCopy() + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PreparedDatabase. +func (in *PreparedDatabase) DeepCopy() *PreparedDatabase { + if in == nil { + return nil + } + out := new(PreparedDatabase) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PreparedSchema) DeepCopyInto(out *PreparedSchema) { + *out = *in + if in.DefaultRoles != nil { + in, out := &in.DefaultRoles, &out.DefaultRoles + *out = new(bool) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PreparedSchema. +func (in *PreparedSchema) DeepCopy() *PreparedSchema { + if in == nil { + return nil + } + out := new(PreparedSchema) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ResourceDescription) DeepCopyInto(out *ResourceDescription) { *out = *in diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index bf9542a48..379efd692 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -194,6 +194,10 @@ func (c *Cluster) initUsers() error { return fmt.Errorf("could not init infrastructure roles: %v", err) } + if err := c.initPreparedDatabaseRoles(); err != nil { + return fmt.Errorf("could not init default users: %v", err) + } + if err := c.initRobotUsers(); err != nil { return fmt.Errorf("could not init robot users: %v", err) } @@ -298,6 +302,9 @@ func (c *Cluster) Create() error { if err = c.syncDatabases(); err != nil { return fmt.Errorf("could not sync databases: %v", err) } + if err = c.syncPreparedDatabases(); err != nil { + return fmt.Errorf("could not sync prepared databases: %v", err) + } c.logger.Infof("databases have been successfully created") } @@ -641,6 +648,13 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { updateFailed = true } } + if !reflect.DeepEqual(oldSpec.Spec.PreparedDatabases, newSpec.Spec.PreparedDatabases) { + c.logger.Infof("syncing prepared databases") + if err := c.syncPreparedDatabases(); err != nil { + c.logger.Errorf("could not sync prepared databases: %v", err) + updateFailed = true + } + } } return nil @@ -762,6 +776,68 @@ func (c *Cluster) initSystemUsers() { } } +func (c *Cluster) initPreparedDatabaseRoles() error { + + for preparedDbName, preparedDB := range c.Spec.PreparedDatabases { + if err := c.initDefaultRoles("admin", preparedDbName); err != nil { + return fmt.Errorf("could not initialize default roles for database %s: %v", preparedDbName, err) + } + preparedSchemas := preparedDB.PreparedSchemas + if len(preparedDB.PreparedSchemas) == 0 { + preparedSchemas = map[string]acidv1.PreparedSchema{"data": {DefaultRoles: util.True()}} + } + for preparedSchemaName, preparedSchema := range preparedSchemas { + if preparedSchema.DefaultRoles == nil || *preparedSchema.DefaultRoles { + if err := c.initDefaultRoles(preparedDbName+"_owner", preparedDbName+"_"+preparedSchemaName); err != nil { + return fmt.Errorf("could not initialize default roles for database schema %s: %v", preparedSchemaName, err) + } + } + } + } + return nil +} + +func (c *Cluster) initDefaultRoles(admin, prefix string) error { + defaultRoles := map[string]string{ + "_owner": "", "_reader": "", "_writer": "_reader", + "_owner_user": "_owner", "_reader_user": "_reader", "_writer_user": "_writer"} + + for defaultRole, inherits := range defaultRoles { + + roleName := prefix + defaultRole + flags := []string{constants.RoleFlagNoLogin} + memberOf := make([]string, 0) + adminRole := "" + if defaultRole[len(defaultRole)-5:] == "_user" { + flags = []string{constants.RoleFlagLogin} + } else { + if defaultRole == "_owner" { + adminRole = admin + } else { + adminRole = prefix + "_owner" + } + } + if inherits != "" { + memberOf = append(memberOf, prefix+inherits) + } + + newRole := spec.PgUser{ + Origin: spec.RoleOriginBootstrap, + Name: roleName, + Password: util.RandomPassword(constants.PasswordLength), + Flags: flags, + MemberOf: memberOf, + AdminRole: adminRole, + } + if currentRole, present := c.pgUsers[roleName]; present { + c.pgUsers[roleName] = c.resolveNameConflict(¤tRole, &newRole) + } else { + c.pgUsers[roleName] = newRole + } + } + return nil +} + func (c *Cluster) initRobotUsers() error { for username, userFlags := range c.Spec.Users { if !isValidUsername(username) { diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go index 07ea011a6..767989e0a 100644 --- a/pkg/cluster/database.go +++ b/pkg/cluster/database.go @@ -25,16 +25,22 @@ const ( WHERE a.rolname = ANY($1) ORDER BY 1;` - getDatabasesSQL = `SELECT datname, pg_get_userbyid(datdba) AS owner FROM pg_database;` - createDatabaseSQL = `CREATE DATABASE "%s" OWNER "%s";` - alterDatabaseOwnerSQL = `ALTER DATABASE "%s" OWNER TO "%s";` + getDatabasesSQL = `SELECT datname, pg_get_userbyid(datdba) AS owner FROM pg_database;` + getSchemasSQL = `SELECT n.nspname AS dbschema FROM pg_catalog.pg_namespace n + WHERE n.nspname !~ '^pg_' AND n.nspname <> 'information_schema' ORDER BY 1` + + createDatabaseSQL = `CREATE DATABASE "%s" OWNER "%s";` + createDatabaseSchemaSQL = `SET ROLE TO "%s"; CREATE SCHEMA "%s" AUTHORIZATION "%s"` + alterDatabaseOwnerSQL = `ALTER DATABASE "%s" OWNER TO "%s";` + defaultPrivilegesSQL = `SET ROLE TO "%s"; ALTER DEFAULT PRIVILEGES IN SCHEMA "%s" GRANT INSERT, UPDATE, DELETE ON TABLES TO "%s"; ALTER DEFAULT PRIVILEGES IN SCHEMA "%s" GRANT SELECT ON TABLES TO "%s";` ) -func (c *Cluster) pgConnectionString() string { +func (c *Cluster) pgConnectionString(dbname string) string { password := c.systemUsers[constants.SuperuserKeyName].Password - return fmt.Sprintf("host='%s' dbname=postgres sslmode=require user='%s' password='%s' connect_timeout='%d'", + return fmt.Sprintf("host='%s' dbname='%s' sslmode=require user='%s' password='%s' connect_timeout='%d'", fmt.Sprintf("%s.%s.svc.%s", c.Name, c.Namespace, c.OpConfig.ClusterDomain), + dbname, c.systemUsers[constants.SuperuserKeyName].Name, strings.Replace(password, "$", "\\$", -1), constants.PostgresConnectTimeout/time.Second) @@ -48,14 +54,14 @@ func (c *Cluster) databaseAccessDisabled() bool { return !c.OpConfig.EnableDBAccess } -func (c *Cluster) initDbConn() error { +func (c *Cluster) initDbConn(dbname string) error { c.setProcessName("initializing db connection") if c.pgDb != nil { return nil } var conn *sql.DB - connstring := c.pgConnectionString() + connstring := c.pgConnectionString(dbname) finalerr := retryutil.Retry(constants.PostgresConnectTimeout, constants.PostgresConnectRetryTimeout, func() (bool, error) { @@ -100,9 +106,9 @@ func (c *Cluster) closeDbConn() (err error) { c.logger.Debug("closing database connection") if err = c.pgDb.Close(); err != nil { c.logger.Errorf("could not close database connection: %v", err) + } else { + c.pgDb = nil } - c.pgDb = nil - return nil } c.logger.Warning("attempted to close an empty db connection object") @@ -187,14 +193,14 @@ func (c *Cluster) getDatabases() (dbs map[string]string, err error) { } // executeCreateDatabase creates new database with the given owner. -// The caller is responsible for openinging and closing the database connection. +// The caller is responsible for opening and closing the database connection. func (c *Cluster) executeCreateDatabase(datname, owner string) error { return c.execCreateOrAlterDatabase(datname, owner, createDatabaseSQL, "creating database", "create database") } -// executeCreateDatabase changes the owner of the given database. -// The caller is responsible for openinging and closing the database connection. +// executeAlterDatabaseOwner changes the owner of the given database. +// The caller is responsible for opening and closing the database connection. func (c *Cluster) executeAlterDatabaseOwner(datname string, owner string) error { return c.execCreateOrAlterDatabase(datname, owner, alterDatabaseOwnerSQL, "changing owner for database", "alter database owner") @@ -224,6 +230,77 @@ func (c *Cluster) databaseNameOwnerValid(datname, owner string) bool { return true } +// getSchemas returns the list of current database schemas +// The caller is responsible for opening and closing the database connection +func (c *Cluster) getSchemas() (schemas []string, err error) { + var ( + rows *sql.Rows + dbschemas []string + ) + + if rows, err = c.pgDb.Query(getSchemasSQL); err != nil { + return nil, fmt.Errorf("could not query database schemas: %v", err) + } + + defer func() { + if err2 := rows.Close(); err2 != nil { + if err != nil { + err = fmt.Errorf("error when closing query cursor: %v, previous error: %v", err2, err) + } else { + err = fmt.Errorf("error when closing query cursor: %v", err2) + } + } + }() + + for rows.Next() { + var dbschema string + + if err = rows.Scan(&dbschema); err != nil { + return nil, fmt.Errorf("error when processing row: %v", err) + } + dbschemas = append(dbschemas, dbschema) + } + + return dbschemas, err +} + +// executeCreateDatabaseSchema creates new database schema with the given owner. +// The caller is responsible for opening and closing the database connection. +func (c *Cluster) executeCreateDatabaseSchema(datname, schemaName, dbOwner string, schemaOwner string) error { + return c.execCreateDatabaseSchema(datname, schemaName, dbOwner, schemaOwner, createDatabaseSchemaSQL, + "creating database schema", "create database schema") +} + +func (c *Cluster) execCreateDatabaseSchema(datname, schemaName, dbOwner, schemaOwner, statement, doing, operation string) error { + if !c.databaseSchemaNameValid(schemaName) { + return nil + } + c.logger.Infof("%s %q owner %q", doing, schemaName, schemaOwner) + if _, err := c.pgDb.Exec(fmt.Sprintf(statement, dbOwner, schemaName, schemaOwner)); err != nil { + return fmt.Errorf("could not execute %s: %v", operation, err) + } + c.execAlterDefaultPrivileges(schemaName, schemaOwner, datname) + c.execAlterDefaultPrivileges(schemaName, schemaOwner, datname+"_"+schemaName) + + return nil +} + +func (c *Cluster) execAlterDefaultPrivileges(schemaName, owner, rolePrefix string) error { + if _, err := c.pgDb.Exec(fmt.Sprintf(defaultPrivilegesSQL, owner, schemaName, rolePrefix+"_writer", schemaName, rolePrefix+"_reader")); err != nil { + return fmt.Errorf("could not alter default privileges for database schema: %v", err) + } + + return nil +} + +func (c *Cluster) databaseSchemaNameValid(schemaName string) bool { + if !databaseNameRegexp.MatchString(schemaName) { + c.logger.Infof("database schema %q has invalid name", schemaName) + return false + } + return true +} + func makeUserFlags(rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin bool) (result []string) { if rolsuper { result = append(result, constants.RoleFlagSuperuser) diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 4074c61c6..acf6c73d3 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -1145,6 +1145,13 @@ func (c *Cluster) generateSingleUserSecret(namespace string, pgUser spec.PgUser) return nil } + //skip NOLOGIN users + for _, flag := range pgUser.Flags { + if flag == constants.RoleFlagNoLogin { + return nil + } + } + username := pgUser.Name secret := v1.Secret{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index dd55cd04c..019cce15d 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -101,6 +101,11 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { err = fmt.Errorf("could not sync databases: %v", err) return err } + c.logger.Debugf("syncing database schemas") + if err = c.syncPreparedDatabases(); err != nil { + err = fmt.Errorf("could not sync database schemas: %v", err) + return err + } } return err @@ -433,7 +438,7 @@ func (c *Cluster) syncRoles() (err error) { userNames []string ) - err = c.initDbConn() + err = c.initDbConn("postgres") if err != nil { return fmt.Errorf("could not init db connection: %v", err) } @@ -490,7 +495,7 @@ func (c *Cluster) syncDatabases() error { createDatabases := make(map[string]string) alterOwnerDatabases := make(map[string]string) - if err := c.initDbConn(); err != nil { + if err := c.initDbConn("postgres"); err != nil { return fmt.Errorf("could not init database connection") } defer func() { @@ -504,6 +509,13 @@ func (c *Cluster) syncDatabases() error { return fmt.Errorf("could not get current databases: %v", err) } + for preparedDatname := range c.Spec.PreparedDatabases { + _, exists := currentDatabases[preparedDatname] + if !exists { + createDatabases[preparedDatname] = preparedDatname + "_owner" + } + } + for datname, newOwner := range c.Spec.Databases { currentOwner, exists := currentDatabases[datname] if !exists { @@ -531,6 +543,62 @@ func (c *Cluster) syncDatabases() error { return nil } +func (c *Cluster) syncPreparedDatabases() error { + c.setProcessName("syncing prepared databases") + for preparedDbName, preparedDB := range c.Spec.PreparedDatabases { + preparedSchemas := preparedDB.PreparedSchemas + if len(preparedDB.PreparedSchemas) == 0 { + preparedSchemas = map[string]acidv1.PreparedSchema{"data": {DefaultRoles: util.True()}} + } + if err := c.syncPreparedSchemas(preparedDbName, preparedSchemas); err != nil { + return err + } + } + + return nil +} + +func (c *Cluster) syncPreparedSchemas(datname string, preparedSchemas map[string]acidv1.PreparedSchema) error { + c.setProcessName("syncing prepared schemas") + + if err := c.initDbConn(datname); err != nil { + return fmt.Errorf("could not init connection to database %s: %v", datname, err) + } + defer func() { + if err := c.closeDbConn(); err != nil { + c.logger.Errorf("could not close database connection: %v", err) + } + }() + + currentSchemas, err := c.getSchemas() + if err != nil { + return fmt.Errorf("could not get current schemas: %v", err) + } + + var schemas []string + + for schema := range preparedSchemas { + schemas = append(schemas, schema) + } + + if createPreparedSchemas, equal := util.SubstractStringSlices(schemas, currentSchemas); !equal { + for _, schemaName := range createPreparedSchemas { + owner := "_owner" + dbOwner := datname + owner + if preparedSchemas[schemaName].DefaultRoles == nil || *preparedSchemas[schemaName].DefaultRoles { + owner = datname + "_" + schemaName + owner + } else { + owner = dbOwner + } + if err = c.executeCreateDatabaseSchema(datname, schemaName, dbOwner, owner); err != nil { + return err + } + } + } + + return nil +} + func (c *Cluster) syncLogicalBackupJob() error { var ( job *batchv1beta1.CronJob diff --git a/pkg/controller/types.go b/pkg/controller/types.go index 0d86abec8..b598014c9 100644 --- a/pkg/controller/types.go +++ b/pkg/controller/types.go @@ -1,9 +1,10 @@ package controller import ( - "k8s.io/apimachinery/pkg/types" "time" + "k8s.io/apimachinery/pkg/types" + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" ) diff --git a/pkg/spec/types.go b/pkg/spec/types.go index 3e6bec8db..5369c288b 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -30,6 +30,7 @@ const ( RoleOriginInfrastructure RoleOriginTeamsAPI RoleOriginSystem + RoleOriginBootstrap ) type syncUserOperation int @@ -178,6 +179,8 @@ func (r RoleOrigin) String() string { return "teams API role" case RoleOriginSystem: return "system role" + case RoleOriginBootstrap: + return "bootstrapped role" default: panic(fmt.Sprintf("bogus role origin value %d", r)) } diff --git a/pkg/util/users/users.go b/pkg/util/users/users.go index 112f89b43..c6e1079f6 100644 --- a/pkg/util/users/users.go +++ b/pkg/util/users/users.go @@ -74,25 +74,43 @@ func (strategy DefaultUserSyncStrategy) ProduceSyncRequests(dbUsers spec.PgUserM // ExecuteSyncRequests makes actual database changes from the requests passed in its arguments. func (strategy DefaultUserSyncStrategy) ExecuteSyncRequests(reqs []spec.PgSyncUserRequest, db *sql.DB) error { + var rr []spec.PgSyncUserRequest + var errors []string for _, r := range reqs { switch r.Kind { case spec.PGSyncUserAdd: if err := strategy.createPgUser(r.User, db); err != nil { - return fmt.Errorf("could not create user %q: %v", r.User.Name, err) + rr = append(rr, r) + errors = append(errors, fmt.Sprintf("could not create user %q: %v", r.User.Name, err)) } case spec.PGsyncUserAlter: if err := strategy.alterPgUser(r.User, db); err != nil { - return fmt.Errorf("could not alter user %q: %v", r.User.Name, err) + rr = append(rr, r) + errors = append(errors, fmt.Sprintf("could not alter user %q: %v", r.User.Name, err)) } case spec.PGSyncAlterSet: if err := strategy.alterPgUserSet(r.User, db); err != nil { - return fmt.Errorf("could not set custom user %q parameters: %v", r.User.Name, err) + rr = append(rr, r) + errors = append(errors, fmt.Sprintf("could not set custom user %q parameters: %v", r.User.Name, err)) } default: return fmt.Errorf("unrecognized operation: %v", r.Kind) } } + + // creating roles might fail if group role members are created before the parent role + // retry adding roles as long as the number of failed attempts is shrinking + if len(rr) > 0 { + if len(rr) < len(reqs) { + if err := strategy.ExecuteSyncRequests(rr, db); err != nil { + return err + } + } else { + return fmt.Errorf("could not execute sync requests for users: %v", errors) + } + } + return nil } func (strategy DefaultUserSyncStrategy) alterPgUserSet(user spec.PgUser, db *sql.DB) (err error) {