let operator resolve reversed memberhip
This commit is contained in:
		
							parent
							
								
									a7210bf232
								
							
						
					
					
						commit
						c2f2c400e4
					
				| 
						 | 
					@ -164,6 +164,13 @@ class EndToEndTestCase(unittest.TestCase):
 | 
				
			||||||
           Test granting additional roles to existing database owners
 | 
					           Test granting additional roles to existing database owners
 | 
				
			||||||
        '''
 | 
					        '''
 | 
				
			||||||
        k8s = self.k8s
 | 
					        k8s = self.k8s
 | 
				
			||||||
 | 
					        leader = k8s.get_cluster_leader_pod()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # produce wrong membership from v1.8.0
 | 
				
			||||||
 | 
					        grant_dbowner = """
 | 
				
			||||||
 | 
					            GRANT bar_owner TO cron_admin;
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        self.query_database(leader.metadata.name, "postgres", grant_dbowner)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # enable PostgresTeam CRD and lower resync
 | 
					        # enable PostgresTeam CRD and lower resync
 | 
				
			||||||
        owner_roles = {
 | 
					        owner_roles = {
 | 
				
			||||||
| 
						 | 
					@ -175,7 +182,6 @@ class EndToEndTestCase(unittest.TestCase):
 | 
				
			||||||
        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"},
 | 
					        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"},
 | 
				
			||||||
                             "Operator does not get in sync")
 | 
					                             "Operator does not get in sync")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        leader = k8s.get_cluster_leader_pod()
 | 
					 | 
				
			||||||
        owner_query = """
 | 
					        owner_query = """
 | 
				
			||||||
            SELECT a2.rolname
 | 
					            SELECT a2.rolname
 | 
				
			||||||
              FROM pg_catalog.pg_authid a
 | 
					              FROM pg_catalog.pg_authid a
 | 
				
			||||||
| 
						 | 
					@ -189,6 +195,8 @@ class EndToEndTestCase(unittest.TestCase):
 | 
				
			||||||
        self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", owner_query)), 3,
 | 
					        self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", owner_query)), 3,
 | 
				
			||||||
            "Not all additional users found in database", 10, 5)
 | 
					            "Not all additional users found in database", 10, 5)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        print('Operator log: {}'.format(k8s.get_operator_log()))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
 | 
					    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
 | 
				
			||||||
    def test_additional_pod_capabilities(self):
 | 
					    def test_additional_pod_capabilities(self):
 | 
				
			||||||
        '''
 | 
					        '''
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -657,7 +657,7 @@ func (c *Cluster) syncSecrets() error {
 | 
				
			||||||
			return fmt.Errorf("could not init db connection: %v", err)
 | 
								return fmt.Errorf("could not init db connection: %v", err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		pgSyncRequests := c.userSyncStrategy.ProduceSyncRequests(spec.PgUserMap{}, rotationUsers)
 | 
							pgSyncRequests := c.userSyncStrategy.ProduceSyncRequests(spec.PgUserMap{}, rotationUsers)
 | 
				
			||||||
		if err = c.userSyncStrategy.ExecuteSyncRequests(pgSyncRequests, c.pgDb); err != nil {
 | 
							if err = c.userSyncStrategy.ExecuteSyncRequests(pgSyncRequests, c.pgDb, c.OpConfig.AdditionalOwnerRoles); err != nil {
 | 
				
			||||||
			return fmt.Errorf("error creating database roles for password rotation: %v", err)
 | 
								return fmt.Errorf("error creating database roles for password rotation: %v", err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		if err := c.closeDbConn(); err != nil {
 | 
							if err := c.closeDbConn(); err != nil {
 | 
				
			||||||
| 
						 | 
					@ -872,7 +872,7 @@ func (c *Cluster) syncRoles() (err error) {
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	pgSyncRequests := c.userSyncStrategy.ProduceSyncRequests(dbUsers, c.pgUsers)
 | 
						pgSyncRequests := c.userSyncStrategy.ProduceSyncRequests(dbUsers, c.pgUsers)
 | 
				
			||||||
	if err = c.userSyncStrategy.ExecuteSyncRequests(pgSyncRequests, c.pgDb); err != nil {
 | 
						if err = c.userSyncStrategy.ExecuteSyncRequests(pgSyncRequests, c.pgDb, c.OpConfig.AdditionalOwnerRoles); err != nil {
 | 
				
			||||||
		return fmt.Errorf("error executing sync statements: %v", err)
 | 
							return fmt.Errorf("error executing sync statements: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -75,7 +75,7 @@ type PgSyncUserRequest struct {
 | 
				
			||||||
// UserSyncer defines an interface for the implementations to sync users from the manifest to the DB.
 | 
					// UserSyncer defines an interface for the implementations to sync users from the manifest to the DB.
 | 
				
			||||||
type UserSyncer interface {
 | 
					type UserSyncer interface {
 | 
				
			||||||
	ProduceSyncRequests(dbUsers PgUserMap, newUsers PgUserMap) (req []PgSyncUserRequest)
 | 
						ProduceSyncRequests(dbUsers PgUserMap, newUsers PgUserMap) (req []PgSyncUserRequest)
 | 
				
			||||||
	ExecuteSyncRequests(req []PgSyncUserRequest, db *sql.DB) error
 | 
						ExecuteSyncRequests(req []PgSyncUserRequest, db *sql.DB, additionalOwners []string) error
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// LogEntry describes log entry in the RingLogger
 | 
					// LogEntry describes log entry in the RingLogger
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -20,6 +20,7 @@ const (
 | 
				
			||||||
	alterRoleSetSQL      = `ALTER ROLE "%s" SET %s TO %s`
 | 
						alterRoleSetSQL      = `ALTER ROLE "%s" SET %s TO %s`
 | 
				
			||||||
	dropUserSQL          = `SET LOCAL synchronous_commit = 'local'; DROP ROLE "%s";`
 | 
						dropUserSQL          = `SET LOCAL synchronous_commit = 'local'; DROP ROLE "%s";`
 | 
				
			||||||
	grantToUserSQL       = `GRANT %s TO "%s"`
 | 
						grantToUserSQL       = `GRANT %s TO "%s"`
 | 
				
			||||||
 | 
						revokeFromUserSQL    = `REVOKE %s FROM %s`
 | 
				
			||||||
	doBlockStmt          = `SET LOCAL synchronous_commit = 'local'; DO $$ BEGIN %s; END;$$;`
 | 
						doBlockStmt          = `SET LOCAL synchronous_commit = 'local'; DO $$ BEGIN %s; END;$$;`
 | 
				
			||||||
	passwordTemplate     = "ENCRYPTED PASSWORD '%s'"
 | 
						passwordTemplate     = "ENCRYPTED PASSWORD '%s'"
 | 
				
			||||||
	inRoleTemplate       = `IN ROLE %s`
 | 
						inRoleTemplate       = `IN ROLE %s`
 | 
				
			||||||
| 
						 | 
					@ -103,7 +104,7 @@ func (strategy DefaultUserSyncStrategy) ProduceSyncRequests(dbUsers spec.PgUserM
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// ExecuteSyncRequests makes actual database changes from the requests passed in its arguments.
 | 
					// ExecuteSyncRequests makes actual database changes from the requests passed in its arguments.
 | 
				
			||||||
func (strategy DefaultUserSyncStrategy) ExecuteSyncRequests(requests []spec.PgSyncUserRequest, db *sql.DB) error {
 | 
					func (strategy DefaultUserSyncStrategy) ExecuteSyncRequests(requests []spec.PgSyncUserRequest, db *sql.DB, additionalOwnerRoles []string) error {
 | 
				
			||||||
	var reqretries []spec.PgSyncUserRequest
 | 
						var reqretries []spec.PgSyncUserRequest
 | 
				
			||||||
	errors := make([]string, 0)
 | 
						errors := make([]string, 0)
 | 
				
			||||||
	for _, request := range requests {
 | 
						for _, request := range requests {
 | 
				
			||||||
| 
						 | 
					@ -117,6 +118,11 @@ func (strategy DefaultUserSyncStrategy) ExecuteSyncRequests(requests []spec.PgSy
 | 
				
			||||||
			if err := strategy.alterPgUser(request.User, db); err != nil {
 | 
								if err := strategy.alterPgUser(request.User, db); err != nil {
 | 
				
			||||||
				reqretries = append(reqretries, request)
 | 
									reqretries = append(reqretries, request)
 | 
				
			||||||
				errors = append(errors, fmt.Sprintf("could not alter user %q: %v", request.User.Name, err))
 | 
									errors = append(errors, fmt.Sprintf("could not alter user %q: %v", request.User.Name, err))
 | 
				
			||||||
 | 
									if request.User.IsDbOwner && len(additionalOwnerRoles) > 0 {
 | 
				
			||||||
 | 
										if err := resolveOwnerMembership(request.User, additionalOwnerRoles, db); err != nil {
 | 
				
			||||||
 | 
											errors = append(errors, fmt.Sprintf("could not resolve owner membership for %q: %v", request.User.Name, err))
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		case spec.PGSyncAlterSet:
 | 
							case spec.PGSyncAlterSet:
 | 
				
			||||||
			if err := strategy.alterPgUserSet(request.User, db); err != nil {
 | 
								if err := strategy.alterPgUserSet(request.User, db); err != nil {
 | 
				
			||||||
| 
						 | 
					@ -138,7 +144,7 @@ func (strategy DefaultUserSyncStrategy) ExecuteSyncRequests(requests []spec.PgSy
 | 
				
			||||||
	// retry adding roles as long as the number of failed attempts is shrinking
 | 
						// retry adding roles as long as the number of failed attempts is shrinking
 | 
				
			||||||
	if len(reqretries) > 0 {
 | 
						if len(reqretries) > 0 {
 | 
				
			||||||
		if len(reqretries) < len(requests) {
 | 
							if len(reqretries) < len(requests) {
 | 
				
			||||||
			if err := strategy.ExecuteSyncRequests(reqretries, db); err != nil {
 | 
								if err := strategy.ExecuteSyncRequests(reqretries, db, additionalOwnerRoles); err != nil {
 | 
				
			||||||
				return err
 | 
									return err
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		} else {
 | 
							} else {
 | 
				
			||||||
| 
						 | 
					@ -149,6 +155,25 @@ func (strategy DefaultUserSyncStrategy) ExecuteSyncRequests(requests []spec.PgSy
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func resolveOwnerMembership(dbOwner spec.PgUser, additionalOwners []string, db *sql.DB) error {
 | 
				
			||||||
 | 
						errors := make([]string, 0)
 | 
				
			||||||
 | 
						for _, groupRole := range dbOwner.MemberOf {
 | 
				
			||||||
 | 
							for _, additionalOwner := range additionalOwners {
 | 
				
			||||||
 | 
								if additionalOwner == groupRole {
 | 
				
			||||||
 | 
									if err := revokeRole(dbOwner.Name, additionalOwner, db); err != nil {
 | 
				
			||||||
 | 
										errors = append(errors, fmt.Sprintf("could not revoke %q from %q: %v", dbOwner.Name, additionalOwner, err))
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if len(errors) > 0 {
 | 
				
			||||||
 | 
							return fmt.Errorf("could not resolve membership between %q and additional owner roles: %v", dbOwner.Name, strings.Join(errors, `', '`))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (strategy DefaultUserSyncStrategy) alterPgUserSet(user spec.PgUser, db *sql.DB) error {
 | 
					func (strategy DefaultUserSyncStrategy) alterPgUserSet(user spec.PgUser, db *sql.DB) error {
 | 
				
			||||||
	queries := produceAlterRoleSetStmts(user)
 | 
						queries := produceAlterRoleSetStmts(user)
 | 
				
			||||||
	query := fmt.Sprintf(doBlockStmt, strings.Join(queries, ";"))
 | 
						query := fmt.Sprintf(doBlockStmt, strings.Join(queries, ";"))
 | 
				
			||||||
| 
						 | 
					@ -269,6 +294,16 @@ func quoteMemberList(user spec.PgUser) string {
 | 
				
			||||||
	return strings.Join(memberof, ",")
 | 
						return strings.Join(memberof, ",")
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func revokeRole(groupRole, role string, db *sql.DB) error {
 | 
				
			||||||
 | 
						revokeStmt := fmt.Sprintf(revokeFromUserSQL, groupRole, role)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if _, err := db.Exec(fmt.Sprintf(doBlockStmt, revokeStmt)); err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// quoteVal quotes values to be used at ALTER ROLE SET param = value if necessary
 | 
					// quoteVal quotes values to be used at ALTER ROLE SET param = value if necessary
 | 
				
			||||||
func quoteParameterValue(name, val string) string {
 | 
					func quoteParameterValue(name, val string) string {
 | 
				
			||||||
	start := val[0]
 | 
						start := val[0]
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue