reflect code review
This commit is contained in:
		
							parent
							
								
									353eb6994d
								
							
						
					
					
						commit
						e246ba7c3f
					
				|  | @ -85,7 +85,7 @@ type PreparedDatabase struct { | |||
| 	Extensions      map[string]string         `json:"extensions,omitempty"` | ||||
| } | ||||
| 
 | ||||
| // PreparedSchema describes elements to be bootstrapped in the schema
 | ||||
| // PreparedSchema describes elements to be bootstrapped per schema
 | ||||
| type PreparedSchema struct { | ||||
| 	DefaultRoles *bool `json:"defaultRoles,omitempty" defaults:"true"` | ||||
| 	DefaultUsers bool  `json:"defaultUsers,omitempty" defaults:"false"` | ||||
|  |  | |||
|  | @ -837,8 +837,17 @@ func (c *Cluster) initPreparedDatabaseRoles() error { | |||
| 		c.Spec.PreparedDatabases = preparedDatabases | ||||
| 	} | ||||
| 
 | ||||
| 	defaultRoles := map[string]string{"_owner": "", "_reader": "", "_writer": "_reader"} | ||||
| 	defaultUsers := map[string]string{"_owner_user": "_owner", "_reader_user": "_reader", "_writer_user": "_writer"} | ||||
| 	// create maps with default roles/users as keys and their membership as values
 | ||||
| 	defaultRoles := map[string]string{ | ||||
| 		constants.OwnerRoleNameSuffix:  "", | ||||
| 		constants.ReaderRoleNameSuffix: "", | ||||
| 		constants.WriterRoleNameSuffix: constants.ReaderRoleNameSuffix, | ||||
| 	} | ||||
| 	defaultUsers := map[string]string{ | ||||
| 		constants.OwnerRoleNameSuffix + constants.UserRoleNameSuffix:  constants.OwnerRoleNameSuffix, | ||||
| 		constants.ReaderRoleNameSuffix + constants.UserRoleNameSuffix: constants.ReaderRoleNameSuffix, | ||||
| 		constants.WriterRoleNameSuffix + constants.UserRoleNameSuffix: constants.WriterRoleNameSuffix, | ||||
| 	} | ||||
| 
 | ||||
| 	for preparedDbName, preparedDB := range preparedDatabases { | ||||
| 		// default roles per database
 | ||||
|  | @ -858,11 +867,15 @@ func (c *Cluster) initPreparedDatabaseRoles() error { | |||
| 		} | ||||
| 		for preparedSchemaName, preparedSchema := range preparedSchemas { | ||||
| 			if preparedSchema.DefaultRoles == nil || *preparedSchema.DefaultRoles { | ||||
| 				if err := c.initDefaultRoles(defaultRoles, preparedDbName+"_owner", preparedDbName+"_"+preparedSchemaName); err != nil { | ||||
| 				if err := c.initDefaultRoles(defaultRoles, | ||||
| 					preparedDbName+constants.OwnerRoleNameSuffix, | ||||
| 					preparedDbName+"_"+preparedSchemaName); err != nil { | ||||
| 					return fmt.Errorf("could not initialize default roles for database schema %s: %v", preparedSchemaName, err) | ||||
| 				} | ||||
| 				if preparedSchema.DefaultUsers { | ||||
| 					if err := c.initDefaultRoles(defaultUsers, preparedDbName+"_owner", preparedDbName+"_"+preparedSchemaName); err != nil { | ||||
| 					if err := c.initDefaultRoles(defaultUsers, | ||||
| 						preparedDbName+constants.OwnerRoleNameSuffix, | ||||
| 						preparedDbName+"_"+preparedSchemaName); err != nil { | ||||
| 						return fmt.Errorf("could not initialize default users for database schema %s: %v", preparedSchemaName, err) | ||||
| 					} | ||||
| 				} | ||||
|  | @ -879,7 +892,7 @@ func (c *Cluster) initDefaultRoles(defaultRoles map[string]string, admin, prefix | |||
| 		roleName := prefix + defaultRole | ||||
| 
 | ||||
| 		flags := []string{constants.RoleFlagNoLogin} | ||||
| 		if defaultRole[len(defaultRole)-5:] == "_user" { | ||||
| 		if defaultRole[len(defaultRole)-5:] == constants.UserRoleNameSuffix { | ||||
| 			flags = []string{constants.RoleFlagLogin} | ||||
| 		} | ||||
| 
 | ||||
|  | @ -889,10 +902,10 @@ func (c *Cluster) initDefaultRoles(defaultRoles map[string]string, admin, prefix | |||
| 		} | ||||
| 
 | ||||
| 		adminRole := "" | ||||
| 		if strings.Contains(defaultRole, "_owner") { | ||||
| 		if strings.Contains(defaultRole, constants.OwnerRoleNameSuffix) { | ||||
| 			adminRole = admin | ||||
| 		} else { | ||||
| 			adminRole = prefix + "_owner" | ||||
| 			adminRole = prefix + constants.OwnerRoleNameSuffix | ||||
| 		} | ||||
| 
 | ||||
| 		newRole := spec.PgUser{ | ||||
|  |  | |||
|  | @ -320,13 +320,13 @@ func (c *Cluster) databaseSchemaNameValid(schemaName string) bool { | |||
| 
 | ||||
| func (c *Cluster) execAlterSchemaDefaultPrivileges(schemaName, owner, rolePrefix string) error { | ||||
| 	if _, err := c.pgDb.Exec(fmt.Sprintf(schemaDefaultPrivilegesSQL, owner, | ||||
| 		schemaName, rolePrefix+"_writer", rolePrefix+"_reader", // schema
 | ||||
| 		schemaName, rolePrefix+"_reader", // tables
 | ||||
| 		schemaName, rolePrefix+"_reader", // sequences
 | ||||
| 		schemaName, rolePrefix+"_writer", // tables
 | ||||
| 		schemaName, rolePrefix+"_writer", // sequences
 | ||||
| 		schemaName, rolePrefix+"_reader", rolePrefix+"_writer", // types
 | ||||
| 		schemaName, rolePrefix+"_reader", rolePrefix+"_writer")); err != nil { // functions
 | ||||
| 		schemaName, rolePrefix+constants.ReaderRoleNameSuffix, rolePrefix+constants.WriterRoleNameSuffix, // schema
 | ||||
| 		schemaName, rolePrefix+constants.ReaderRoleNameSuffix, // tables
 | ||||
| 		schemaName, rolePrefix+constants.ReaderRoleNameSuffix, // sequences
 | ||||
| 		schemaName, rolePrefix+constants.WriterRoleNameSuffix, // tables
 | ||||
| 		schemaName, rolePrefix+constants.WriterRoleNameSuffix, // sequences
 | ||||
| 		schemaName, rolePrefix+constants.ReaderRoleNameSuffix, rolePrefix+constants.WriterRoleNameSuffix, // types
 | ||||
| 		schemaName, rolePrefix+constants.ReaderRoleNameSuffix, rolePrefix+constants.WriterRoleNameSuffix)); err != nil { // functions
 | ||||
| 		return fmt.Errorf("could not alter default privileges for database schema %s: %v", schemaName, err) | ||||
| 	} | ||||
| 
 | ||||
|  | @ -335,13 +335,13 @@ func (c *Cluster) execAlterSchemaDefaultPrivileges(schemaName, owner, rolePrefix | |||
| 
 | ||||
| func (c *Cluster) execAlterGlobalDefaultPrivileges(owner, rolePrefix string) error { | ||||
| 	if _, err := c.pgDb.Exec(fmt.Sprintf(globalDefaultPrivilegesSQL, owner, | ||||
| 		rolePrefix+"_writer", rolePrefix+"_reader", // schemas
 | ||||
| 		rolePrefix+"_reader",                       // tables
 | ||||
| 		rolePrefix+"_reader",                       // sequences
 | ||||
| 		rolePrefix+"_writer",                       // tables
 | ||||
| 		rolePrefix+"_writer",                       // sequences
 | ||||
| 		rolePrefix+"_reader", rolePrefix+"_writer", // types
 | ||||
| 		rolePrefix+"_reader", rolePrefix+"_writer")); err != nil { // functions
 | ||||
| 		rolePrefix+constants.WriterRoleNameSuffix, rolePrefix+constants.ReaderRoleNameSuffix, // schemas
 | ||||
| 		rolePrefix+constants.ReaderRoleNameSuffix,                                            // tables
 | ||||
| 		rolePrefix+constants.ReaderRoleNameSuffix,                                            // sequences
 | ||||
| 		rolePrefix+constants.WriterRoleNameSuffix,                                            // tables
 | ||||
| 		rolePrefix+constants.WriterRoleNameSuffix,                                            // sequences
 | ||||
| 		rolePrefix+constants.ReaderRoleNameSuffix, rolePrefix+constants.WriterRoleNameSuffix, // types
 | ||||
| 		rolePrefix+constants.ReaderRoleNameSuffix, rolePrefix+constants.WriterRoleNameSuffix)); err != nil { // functions
 | ||||
| 		return fmt.Errorf("could not alter default privileges for database %s: %v", rolePrefix, err) | ||||
| 	} | ||||
| 
 | ||||
|  |  | |||
|  | @ -524,7 +524,7 @@ func (c *Cluster) syncDatabases() error { | |||
| 	for preparedDatname := range c.Spec.PreparedDatabases { | ||||
| 		_, exists := currentDatabases[preparedDatname] | ||||
| 		if !exists { | ||||
| 			createDatabases[preparedDatname] = preparedDatname + "_owner" | ||||
| 			createDatabases[preparedDatname] = preparedDatname + constants.OwnerRoleNameSuffix | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
|  | @ -568,7 +568,7 @@ func (c *Cluster) syncPreparedDatabases() error { | |||
| 		}() | ||||
| 
 | ||||
| 		// first, set default privileges for prepared database
 | ||||
| 		c.execAlterGlobalDefaultPrivileges(preparedDbName+"_owner", preparedDbName) | ||||
| 		c.execAlterGlobalDefaultPrivileges(preparedDbName+constants.OwnerRoleNameSuffix, preparedDbName) | ||||
| 
 | ||||
| 		// now, prepare defined schemas
 | ||||
| 		preparedSchemas := preparedDB.PreparedSchemas | ||||
|  | @ -604,7 +604,7 @@ func (c *Cluster) syncPreparedSchemas(datname string, preparedSchemas map[string | |||
| 
 | ||||
| 	if createPreparedSchemas, equal := util.SubstractStringSlices(schemas, currentSchemas); !equal { | ||||
| 		for _, schemaName := range createPreparedSchemas { | ||||
| 			owner := "_owner" | ||||
| 			owner := constants.OwnerRoleNameSuffix | ||||
| 			dbOwner := datname + owner | ||||
| 			if preparedSchemas[schemaName].DefaultRoles == nil || *preparedSchemas[schemaName].DefaultRoles { | ||||
| 				owner = datname + "_" + schemaName + owner | ||||
|  | @ -621,14 +621,14 @@ func (c *Cluster) syncPreparedSchemas(datname string, preparedSchemas map[string | |||
| } | ||||
| 
 | ||||
| func (c *Cluster) syncExtensions(extensions map[string]string) error { | ||||
| 	c.setProcessName("syncing extensions") | ||||
| 	c.setProcessName("syncing database extensions") | ||||
| 
 | ||||
| 	createExtensions := make(map[string]string) | ||||
| 	alterExtensions := make(map[string]string) | ||||
| 
 | ||||
| 	currentExtensions, err := c.getExtensions() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("could not get current extensions: %v", err) | ||||
| 		return fmt.Errorf("could not get current database extensions: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	for extName, newSchema := range extensions { | ||||
|  | @ -640,10 +640,6 @@ func (c *Cluster) syncExtensions(extensions map[string]string) error { | |||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	if len(createExtensions)+len(alterExtensions) == 0 { | ||||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
| 	for extName, schema := range createExtensions { | ||||
| 		if err = c.executeCreateExtension(extName, schema); err != nil { | ||||
| 			return err | ||||
|  |  | |||
|  | @ -13,4 +13,8 @@ const ( | |||
| 	RoleFlagCreateDB       = "CREATEDB" | ||||
| 	RoleFlagReplication    = "REPLICATION" | ||||
| 	RoleFlagByPassRLS      = "BYPASSRLS" | ||||
| 	OwnerRoleNameSuffix    = "_owner" | ||||
| 	ReaderRoleNameSuffix   = "_reader" | ||||
| 	WriterRoleNameSuffix   = "_writer" | ||||
| 	UserRoleNameSuffix     = "_user" | ||||
| ) | ||||
|  |  | |||
|  | @ -73,37 +73,37 @@ func (strategy DefaultUserSyncStrategy) ProduceSyncRequests(dbUsers spec.PgUserM | |||
| } | ||||
| 
 | ||||
| // ExecuteSyncRequests makes actual database changes from the requests passed in its arguments.
 | ||||
| func (strategy DefaultUserSyncStrategy) ExecuteSyncRequests(reqs []spec.PgSyncUserRequest, db *sql.DB) error { | ||||
| 	var rr []spec.PgSyncUserRequest | ||||
| func (strategy DefaultUserSyncStrategy) ExecuteSyncRequests(requests []spec.PgSyncUserRequest, db *sql.DB) error { | ||||
| 	var reqretries []spec.PgSyncUserRequest | ||||
| 	var errors []string | ||||
| 	for _, r := range reqs { | ||||
| 		switch r.Kind { | ||||
| 	for _, request := range requests { | ||||
| 		switch request.Kind { | ||||
| 		case spec.PGSyncUserAdd: | ||||
| 			if err := strategy.createPgUser(r.User, db); err != nil { | ||||
| 				rr = append(rr, r) | ||||
| 				errors = append(errors, fmt.Sprintf("could not create user %q: %v", r.User.Name, err)) | ||||
| 			if err := strategy.createPgUser(request.User, db); err != nil { | ||||
| 				reqretries = append(reqretries, request) | ||||
| 				errors = append(errors, fmt.Sprintf("could not create user %q: %v", request.User.Name, err)) | ||||
| 			} | ||||
| 		case spec.PGsyncUserAlter: | ||||
| 			if err := strategy.alterPgUser(r.User, db); err != nil { | ||||
| 				rr = append(rr, r) | ||||
| 				errors = append(errors, fmt.Sprintf("could not alter user %q: %v", r.User.Name, err)) | ||||
| 			if err := strategy.alterPgUser(request.User, db); err != nil { | ||||
| 				reqretries = append(reqretries, request) | ||||
| 				errors = append(errors, fmt.Sprintf("could not alter user %q: %v", request.User.Name, err)) | ||||
| 			} | ||||
| 		case spec.PGSyncAlterSet: | ||||
| 			if err := strategy.alterPgUserSet(r.User, db); err != nil { | ||||
| 				rr = append(rr, r) | ||||
| 				errors = append(errors, fmt.Sprintf("could not set custom user %q parameters: %v", r.User.Name, err)) | ||||
| 			if err := strategy.alterPgUserSet(request.User, db); err != nil { | ||||
| 				reqretries = append(reqretries, request) | ||||
| 				errors = append(errors, fmt.Sprintf("could not set custom user %q parameters: %v", request.User.Name, err)) | ||||
| 			} | ||||
| 		default: | ||||
| 			return fmt.Errorf("unrecognized operation: %v", r.Kind) | ||||
| 			return fmt.Errorf("unrecognized operation: %v", request.Kind) | ||||
| 		} | ||||
| 
 | ||||
| 	} | ||||
| 
 | ||||
| 	// creating roles might fail if group role members are created before the parent role
 | ||||
| 	// retry adding roles as long as the number of failed attempts is shrinking
 | ||||
| 	if len(rr) > 0 { | ||||
| 		if len(rr) < len(reqs) { | ||||
| 			if err := strategy.ExecuteSyncRequests(rr, db); err != nil { | ||||
| 	if len(reqretries) > 0 { | ||||
| 		if len(reqretries) < len(requests) { | ||||
| 			if err := strategy.ExecuteSyncRequests(reqretries, db); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
| 		} else { | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue