fetch pooler and fes_user system user only when corresponding features are used (#2009)

* fetch pooler and fes_user system user only when corresponding features are used
* cover error case in unit test
* use string formatting instead of +
This commit is contained in:
Felix Kunde 2022-08-24 16:28:49 +02:00 committed by GitHub
parent e11edcdcde
commit ef324494a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 59 additions and 26 deletions

View File

@ -1127,7 +1127,7 @@ func (c *Cluster) initSystemUsers() {
// replication users for event streams are another exception
// the operator will create one replication user for all streams
if len(c.Spec.Streams) > 0 {
username := constants.EventStreamSourceSlotPrefix + constants.UserRoleNameSuffix
username := fmt.Sprintf("%s%s", constants.EventStreamSourceSlotPrefix, constants.UserRoleNameSuffix)
streamUser := spec.PgUser{
Origin: spec.RoleOriginStream,
Name: username,
@ -1136,8 +1136,8 @@ func (c *Cluster) initSystemUsers() {
Password: util.RandomPassword(constants.PasswordLength),
}
if _, exists := c.systemUsers[username]; !exists {
c.systemUsers[username] = streamUser
if _, exists := c.systemUsers[constants.EventStreamUserKeyName]; !exists {
c.systemUsers[constants.EventStreamUserKeyName] = streamUser
}
}
}
@ -1155,9 +1155,9 @@ func (c *Cluster) initPreparedDatabaseRoles() error {
constants.WriterRoleNameSuffix: constants.ReaderRoleNameSuffix,
}
defaultUsers := map[string]string{
constants.OwnerRoleNameSuffix + constants.UserRoleNameSuffix: constants.OwnerRoleNameSuffix,
constants.ReaderRoleNameSuffix + constants.UserRoleNameSuffix: constants.ReaderRoleNameSuffix,
constants.WriterRoleNameSuffix + constants.UserRoleNameSuffix: constants.WriterRoleNameSuffix,
fmt.Sprintf("%s%s", constants.OwnerRoleNameSuffix, constants.UserRoleNameSuffix): constants.OwnerRoleNameSuffix,
fmt.Sprintf("%s%s", constants.ReaderRoleNameSuffix, constants.UserRoleNameSuffix): constants.ReaderRoleNameSuffix,
fmt.Sprintf("%s%s", constants.WriterRoleNameSuffix, constants.UserRoleNameSuffix): constants.WriterRoleNameSuffix,
}
for preparedDbName, preparedDB := range c.Spec.PreparedDatabases {
@ -1218,7 +1218,7 @@ func (c *Cluster) initDefaultRoles(defaultRoles map[string]string, admin, prefix
c.logger.Warn("secretNamespace ignored because enable_cross_namespace_secret set to false. Creating secrets in cluster namespace.")
}
}
roleName := prefix + defaultRole
roleName := fmt.Sprintf("%s%s", prefix, defaultRole)
flags := []string{constants.RoleFlagNoLogin}
if defaultRole[len(defaultRole)-5:] == constants.UserRoleNameSuffix {
@ -1236,7 +1236,7 @@ func (c *Cluster) initDefaultRoles(defaultRoles map[string]string, admin, prefix
adminRole = admin
isOwner = true
} else {
adminRole = prefix + constants.OwnerRoleNameSuffix
adminRole = fmt.Sprintf("%s%s", prefix, constants.OwnerRoleNameSuffix)
}
newRole := spec.PgUser{

View File

@ -759,11 +759,14 @@ func TestServiceAnnotations(t *testing.T) {
func TestInitSystemUsers(t *testing.T) {
testName := "Test system users initialization"
// default cluster without connection pooler
// default cluster without connection pooler and event streams
cl.initSystemUsers()
if _, exist := cl.systemUsers[constants.ConnectionPoolerUserKeyName]; exist {
t.Errorf("%s, connection pooler user is present", testName)
}
if _, exist := cl.systemUsers[constants.EventStreamUserKeyName]; exist {
t.Errorf("%s, stream user is present", testName)
}
// cluster with connection pooler
cl.Spec.EnableConnectionPooler = boolToPointer(true)
@ -805,6 +808,31 @@ func TestInitSystemUsers(t *testing.T) {
if _, exist := cl.systemUsers["pooler"]; !exist {
t.Errorf("%s, System users are not allowed to be a connection pool user", testName)
}
// using stream user in manifest but no streams defined should be treated like normal robot user
streamUser := fmt.Sprintf("%s%s", constants.EventStreamSourceSlotPrefix, constants.UserRoleNameSuffix)
cl.Spec.Users = map[string]acidv1.UserFlags{streamUser: []string{}}
cl.initSystemUsers()
if _, exist := cl.systemUsers[constants.EventStreamUserKeyName]; exist {
t.Errorf("%s, stream user is present", testName)
}
// cluster with streams
cl.Spec.Streams = []acidv1.Stream{
{
ApplicationId: "test-app",
Database: "test_db",
Tables: map[string]acidv1.StreamTable{
"data.test_table": acidv1.StreamTable{
EventType: "test_event",
},
},
},
}
cl.initSystemUsers()
if _, exist := cl.systemUsers[constants.EventStreamUserKeyName]; !exist {
t.Errorf("%s, stream user is not present", testName)
}
}
func TestPreparedDatabases(t *testing.T) {

View File

@ -46,7 +46,7 @@ type ConnectionPoolerObjects struct {
func (c *Cluster) connectionPoolerName(role PostgresRole) string {
name := c.Name + "-pooler"
if role == Replica {
name = name + "-repl"
name = fmt.Sprintf("%s-%s", name, "repl")
}
return name
}

View File

@ -80,7 +80,7 @@ func (c *Cluster) statefulSetName() string {
func (c *Cluster) endpointName(role PostgresRole) string {
name := c.Name
if role == Replica {
name = name + "-repl"
name = fmt.Sprintf("%s-%s", name, "repl")
}
return name
@ -89,7 +89,7 @@ func (c *Cluster) endpointName(role PostgresRole) string {
func (c *Cluster) serviceName(role PostgresRole) string {
name := c.Name
if role == Replica {
name = name + "-repl"
name = fmt.Sprintf("%s-%s", name, "repl")
}
return name
@ -2238,7 +2238,7 @@ func (c *Cluster) generateLogicalBackupPodEnvVars() []v1.EnvVar {
// getLogicalBackupJobName returns the name; the job itself may not exists
func (c *Cluster) getLogicalBackupJobName() (jobName string) {
return trimCronjobName(c.OpConfig.LogicalBackupJobPrefix + c.clusterName().Name)
return trimCronjobName(fmt.Sprintf("%s%s", c.OpConfig.LogicalBackupJobPrefix, c.clusterName().Name))
}
// Return an array of ownerReferences to make an arbitraty object dependent on

View File

@ -40,7 +40,7 @@ var (
namespace string = "default"
appId string = "test-app"
dbName string = "foo"
fesUser string = constants.EventStreamSourceSlotPrefix + constants.UserRoleNameSuffix
fesUser string = fmt.Sprintf("%s%s", constants.EventStreamSourceSlotPrefix, constants.UserRoleNameSuffix)
fesName string = fmt.Sprintf("%s-%s", clusterName, appId)
slotName string = fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbName, strings.Replace(appId, "-", "_", -1))
@ -55,7 +55,7 @@ var (
},
Spec: acidv1.PostgresSpec{
Databases: map[string]string{
dbName: dbName + constants.UserRoleNameSuffix,
dbName: fmt.Sprintf("%s%s", dbName, constants.UserRoleNameSuffix),
},
Streams: []acidv1.Stream{
{

View File

@ -715,12 +715,16 @@ func (c *Cluster) updateSecret(
} else if secretUsername == c.systemUsers[constants.ReplicationUserKeyName].Name {
userKey = constants.ReplicationUserKeyName
userMap = c.systemUsers
} else if secretUsername == constants.ConnectionPoolerUserName {
userKey = constants.ConnectionPoolerUserName
userMap = c.systemUsers
} else if secretUsername == constants.EventStreamSourceSlotPrefix+constants.UserRoleNameSuffix {
userKey = constants.EventStreamSourceSlotPrefix + constants.UserRoleNameSuffix
userMap = c.systemUsers
} else if _, exists := c.systemUsers[constants.ConnectionPoolerUserKeyName]; exists {
if secretUsername == c.systemUsers[constants.ConnectionPoolerUserKeyName].Name {
userKey = constants.ConnectionPoolerUserName
userMap = c.systemUsers
}
} else if _, exists := c.systemUsers[constants.EventStreamUserKeyName]; exists {
if secretUsername == c.systemUsers[constants.EventStreamUserKeyName].Name {
userKey = fmt.Sprintf("%s%s", constants.EventStreamSourceSlotPrefix, constants.UserRoleNameSuffix)
userMap = c.systemUsers
}
} else {
userKey = secretUsername
userMap = c.pgUsers
@ -816,7 +820,7 @@ func (c *Cluster) rotatePasswordInSecret(
// create rotation user if role is not listed for in-place password update
if !util.SliceContains(c.Spec.UsersWithInPlaceSecretRotation, secretUsername) {
rotationUser := secretPgUser
newRotationUsername := secretUsername + currentTime.Format("060102")
newRotationUsername := fmt.Sprintf("%s%s", secretUsername, currentTime.Format("060102"))
rotationUser.Name = newRotationUsername
rotationUser.MemberOf = []string{secretUsername}
(*rotationUsers)[newRotationUsername] = rotationUser
@ -976,7 +980,7 @@ func (c *Cluster) syncDatabases() error {
for preparedDatabaseName := range c.Spec.PreparedDatabases {
_, exists := currentDatabases[preparedDatabaseName]
if !exists {
createDatabases[preparedDatabaseName] = preparedDatabaseName + constants.OwnerRoleNameSuffix
createDatabases[preparedDatabaseName] = fmt.Sprintf("%s%s", preparedDatabaseName, constants.OwnerRoleNameSuffix)
preparedDatabases = append(preparedDatabases, preparedDatabaseName)
}
}
@ -1077,9 +1081,9 @@ func (c *Cluster) syncPreparedSchemas(databaseName string, preparedSchemas map[s
if createPreparedSchemas, equal := util.SubstractStringSlices(schemas, currentSchemas); !equal {
for _, schemaName := range createPreparedSchemas {
owner := constants.OwnerRoleNameSuffix
dbOwner := databaseName + owner
dbOwner := fmt.Sprintf("%s%s", databaseName, owner)
if preparedSchemas[schemaName].DefaultRoles == nil || *preparedSchemas[schemaName].DefaultRoles {
owner = databaseName + "_" + schemaName + owner
owner = fmt.Sprintf("%s_%s%s", databaseName, schemaName, owner)
} else {
owner = dbOwner
}

View File

@ -4,8 +4,9 @@ package constants
const (
PasswordLength = 64
SuperuserKeyName = "superuser"
ConnectionPoolerUserKeyName = "pooler"
ReplicationUserKeyName = "replication"
ConnectionPoolerUserKeyName = "pooler"
EventStreamUserKeyName = "streamer"
RoleFlagSuperuser = "SUPERUSER"
RoleFlagInherit = "INHERIT"
RoleFlagLogin = "LOGIN"