skip db sync on failed initUsers during UPDATE (#2083)
* skip db sync on failed initUsers during UPDATE * provide unit test for teams API being unavailable * add test for 404 case
This commit is contained in:
parent
d55e74e1e7
commit
70f3ee8e36
|
|
@ -227,6 +227,10 @@ func (c *Cluster) initUsers() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.initHumanUsers(); err != nil {
|
if err := c.initHumanUsers(); err != nil {
|
||||||
|
// remember all cached users in c.pgUsers
|
||||||
|
for cachedUserName, cachedUser := range c.pgUsersCache {
|
||||||
|
c.pgUsers[cachedUserName] = cachedUser
|
||||||
|
}
|
||||||
return fmt.Errorf("could not init human users: %v", err)
|
return fmt.Errorf("could not init human users: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -748,6 +752,7 @@ func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) {
|
||||||
// for a cluster that had no such job before. In this case a missing job is not an error.
|
// for a cluster that had no such job before. In this case a missing job is not an error.
|
||||||
func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
|
func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
|
||||||
updateFailed := false
|
updateFailed := false
|
||||||
|
userInitFailed := false
|
||||||
syncStatefulSet := false
|
syncStatefulSet := false
|
||||||
|
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
|
|
@ -785,32 +790,39 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if users need to be synced
|
// Users
|
||||||
|
func() {
|
||||||
|
// check if users need to be synced during update
|
||||||
sameUsers := reflect.DeepEqual(oldSpec.Spec.Users, newSpec.Spec.Users) &&
|
sameUsers := reflect.DeepEqual(oldSpec.Spec.Users, newSpec.Spec.Users) &&
|
||||||
reflect.DeepEqual(oldSpec.Spec.PreparedDatabases, newSpec.Spec.PreparedDatabases)
|
reflect.DeepEqual(oldSpec.Spec.PreparedDatabases, newSpec.Spec.PreparedDatabases)
|
||||||
sameRotatedUsers := reflect.DeepEqual(oldSpec.Spec.UsersWithSecretRotation, newSpec.Spec.UsersWithSecretRotation) &&
|
sameRotatedUsers := reflect.DeepEqual(oldSpec.Spec.UsersWithSecretRotation, newSpec.Spec.UsersWithSecretRotation) &&
|
||||||
reflect.DeepEqual(oldSpec.Spec.UsersWithInPlaceSecretRotation, newSpec.Spec.UsersWithInPlaceSecretRotation)
|
reflect.DeepEqual(oldSpec.Spec.UsersWithInPlaceSecretRotation, newSpec.Spec.UsersWithInPlaceSecretRotation)
|
||||||
|
|
||||||
// connection pooler needs one system user created, which is done in
|
// connection pooler needs one system user created who is initialized in initUsers
|
||||||
// initUsers. Check if it needs to be called.
|
// only when disabled in oldSpec and enabled in newSpec
|
||||||
needConnectionPooler := needMasterConnectionPoolerWorker(&newSpec.Spec) ||
|
needPoolerUser := c.needConnectionPoolerUser(&oldSpec.Spec, &newSpec.Spec)
|
||||||
needReplicaConnectionPoolerWorker(&newSpec.Spec)
|
|
||||||
|
|
||||||
if !sameUsers || !sameRotatedUsers || needConnectionPooler {
|
// streams new replication user created who is initialized in initUsers
|
||||||
|
// only when streams were not specified in oldSpec but in newSpec
|
||||||
|
needStreamUser := len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0
|
||||||
|
|
||||||
|
if !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser {
|
||||||
c.logger.Debugf("initialize users")
|
c.logger.Debugf("initialize users")
|
||||||
if err := c.initUsers(); err != nil {
|
if err := c.initUsers(); err != nil {
|
||||||
c.logger.Errorf("could not init users: %v", err)
|
c.logger.Errorf("could not init users - skipping sync of secrets and databases: %v", err)
|
||||||
|
userInitFailed = true
|
||||||
updateFailed = true
|
updateFailed = true
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
c.logger.Debugf("syncing secrets")
|
c.logger.Debugf("syncing secrets")
|
||||||
|
|
||||||
//TODO: mind the secrets of the deleted/new users
|
//TODO: mind the secrets of the deleted/new users
|
||||||
if err := c.syncSecrets(); err != nil {
|
if err := c.syncSecrets(); err != nil {
|
||||||
c.logger.Errorf("could not sync secrets: %v", err)
|
c.logger.Errorf("could not sync secrets: %v", err)
|
||||||
updateFailed = true
|
updateFailed = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// Volume
|
// Volume
|
||||||
if c.OpConfig.StorageResizeMode != "off" {
|
if c.OpConfig.StorageResizeMode != "off" {
|
||||||
|
|
@ -892,7 +904,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Roles and Databases
|
// Roles and Databases
|
||||||
if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0 || c.Spec.StandbyCluster != nil) {
|
if !userInitFailed && !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0 || c.Spec.StandbyCluster != nil) {
|
||||||
c.logger.Debugf("syncing roles")
|
c.logger.Debugf("syncing roles")
|
||||||
if err := c.syncRoles(); err != nil {
|
if err := c.syncRoles(); err != nil {
|
||||||
c.logger.Errorf("could not sync roles: %v", err)
|
c.logger.Errorf("could not sync roles: %v", err)
|
||||||
|
|
@ -920,13 +932,12 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
|
||||||
// need to process. In the future we may want to do this more careful and
|
// need to process. In the future we may want to do this more careful and
|
||||||
// check which databases we need to process, but even repeating the whole
|
// check which databases we need to process, but even repeating the whole
|
||||||
// installation process should be good enough.
|
// installation process should be good enough.
|
||||||
|
|
||||||
if _, err := c.syncConnectionPooler(oldSpec, newSpec, c.installLookupFunction); err != nil {
|
if _, err := c.syncConnectionPooler(oldSpec, newSpec, c.installLookupFunction); err != nil {
|
||||||
c.logger.Errorf("could not sync connection pooler: %v", err)
|
c.logger.Errorf("could not sync connection pooler: %v", err)
|
||||||
updateFailed = true
|
updateFailed = true
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(c.Spec.Streams) > 0 {
|
if len(newSpec.Spec.Streams) > 0 {
|
||||||
if err := c.syncStreams(); err != nil {
|
if err := c.syncStreams(); err != nil {
|
||||||
c.logger.Errorf("could not sync streams: %v", err)
|
c.logger.Errorf("could not sync streams: %v", err)
|
||||||
updateFailed = true
|
updateFailed = true
|
||||||
|
|
@ -1094,28 +1105,10 @@ func (c *Cluster) initSystemUsers() {
|
||||||
Password: util.RandomPassword(constants.PasswordLength),
|
Password: util.RandomPassword(constants.PasswordLength),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Connection pooler user is an exception, if requested it's going to be
|
// Connection pooler user is an exception
|
||||||
// created by operator as a normal pgUser
|
// if requested it's going to be created by operator
|
||||||
if needConnectionPooler(&c.Spec) {
|
if needConnectionPooler(&c.Spec) {
|
||||||
connectionPoolerSpec := c.Spec.ConnectionPooler
|
username := c.poolerUser(&c.Spec)
|
||||||
if connectionPoolerSpec == nil {
|
|
||||||
connectionPoolerSpec = &acidv1.ConnectionPooler{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Using superuser as pooler user is not a good idea. First of all it's
|
|
||||||
// not going to be synced correctly with the current implementation,
|
|
||||||
// and second it's a bad practice.
|
|
||||||
username := c.OpConfig.ConnectionPooler.User
|
|
||||||
|
|
||||||
isSuperUser := connectionPoolerSpec.User == c.OpConfig.SuperUsername
|
|
||||||
isProtectedUser := c.shouldAvoidProtectedOrSystemRole(
|
|
||||||
connectionPoolerSpec.User, "connection pool role")
|
|
||||||
|
|
||||||
if !isSuperUser && !isProtectedUser {
|
|
||||||
username = util.Coalesce(
|
|
||||||
connectionPoolerSpec.User,
|
|
||||||
c.OpConfig.ConnectionPooler.User)
|
|
||||||
}
|
|
||||||
|
|
||||||
// connection pooler application should be able to login with this role
|
// connection pooler application should be able to login with this role
|
||||||
connectionPoolerUser := spec.PgUser{
|
connectionPoolerUser := spec.PgUser{
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ package cluster
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net/http"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
@ -222,7 +223,14 @@ type mockTeamsAPIClient struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockTeamsAPIClient) TeamInfo(teamID, token string) (tm *teams.Team, statusCode int, err error) {
|
func (m *mockTeamsAPIClient) TeamInfo(teamID, token string) (tm *teams.Team, statusCode int, err error) {
|
||||||
return &teams.Team{Members: m.members}, statusCode, nil
|
if len(m.members) > 0 {
|
||||||
|
return &teams.Team{Members: m.members}, http.StatusOK, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// when members are not set handle this as an error for this mock API
|
||||||
|
// makes it easier to test behavior when teams API is unavailable
|
||||||
|
return nil, http.StatusInternalServerError,
|
||||||
|
fmt.Errorf("mocked %d error of mock Teams API for team %q", http.StatusInternalServerError, teamID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockTeamsAPIClient) setMembers(members []string) {
|
func (m *mockTeamsAPIClient) setMembers(members []string) {
|
||||||
|
|
@ -237,32 +245,53 @@ func TestInitHumanUsers(t *testing.T) {
|
||||||
|
|
||||||
// members of a product team are granted superuser rights for DBs of their team
|
// members of a product team are granted superuser rights for DBs of their team
|
||||||
cl.OpConfig.EnableTeamSuperuser = true
|
cl.OpConfig.EnableTeamSuperuser = true
|
||||||
|
|
||||||
cl.OpConfig.EnableTeamsAPI = true
|
cl.OpConfig.EnableTeamsAPI = true
|
||||||
|
cl.OpConfig.EnableTeamMemberDeprecation = true
|
||||||
cl.OpConfig.PamRoleName = "zalandos"
|
cl.OpConfig.PamRoleName = "zalandos"
|
||||||
cl.Spec.TeamID = "test"
|
cl.Spec.TeamID = "test"
|
||||||
|
cl.Spec.Users = map[string]acidv1.UserFlags{"bar": []string{}}
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
existingRoles map[string]spec.PgUser
|
existingRoles map[string]spec.PgUser
|
||||||
teamRoles []string
|
teamRoles []string
|
||||||
result map[string]spec.PgUser
|
result map[string]spec.PgUser
|
||||||
|
err error
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
existingRoles: map[string]spec.PgUser{"foo": {Name: "foo", Origin: spec.RoleOriginTeamsAPI,
|
existingRoles: map[string]spec.PgUser{"foo": {Name: "foo", Origin: spec.RoleOriginTeamsAPI,
|
||||||
Flags: []string{"NOLOGIN"}}, "bar": {Name: "bar", Flags: []string{"NOLOGIN"}}},
|
Flags: []string{"LOGIN"}}, "bar": {Name: "bar", Flags: []string{"LOGIN"}}},
|
||||||
teamRoles: []string{"foo"},
|
teamRoles: []string{"foo"},
|
||||||
result: map[string]spec.PgUser{"foo": {Name: "foo", Origin: spec.RoleOriginTeamsAPI,
|
result: map[string]spec.PgUser{"foo": {Name: "foo", Origin: spec.RoleOriginTeamsAPI,
|
||||||
MemberOf: []string{cl.OpConfig.PamRoleName}, Flags: []string{"LOGIN", "SUPERUSER"}},
|
MemberOf: []string{cl.OpConfig.PamRoleName}, Flags: []string{"LOGIN", "SUPERUSER"}},
|
||||||
"bar": {Name: "bar", Flags: []string{"NOLOGIN"}}},
|
"bar": {Name: "bar", Flags: []string{"LOGIN"}}},
|
||||||
|
err: fmt.Errorf("could not init human users: cannot initialize members for team %q who owns the Postgres cluster: could not get list of team members for team %q: could not get team info for team %q: mocked %d error of mock Teams API for team %q",
|
||||||
|
cl.Spec.TeamID, cl.Spec.TeamID, cl.Spec.TeamID, http.StatusInternalServerError, cl.Spec.TeamID),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
existingRoles: map[string]spec.PgUser{},
|
existingRoles: map[string]spec.PgUser{},
|
||||||
teamRoles: []string{"admin", replicationUserName},
|
teamRoles: []string{"admin", replicationUserName},
|
||||||
result: map[string]spec.PgUser{},
|
result: map[string]spec.PgUser{},
|
||||||
|
err: nil,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
|
// set pgUsers so that initUsers sets up pgUsersCache with team roles
|
||||||
|
cl.pgUsers = tt.existingRoles
|
||||||
|
|
||||||
|
// initUsers calls initHumanUsers which should fail
|
||||||
|
// because no members are set for mocked teams API
|
||||||
|
if err := cl.initUsers(); err != nil {
|
||||||
|
// check that at least team roles are remembered in c.pgUsers
|
||||||
|
if len(cl.pgUsers) < len(tt.teamRoles) {
|
||||||
|
t.Errorf("%s unexpected size of pgUsers: expected at least %d, got %d", t.Name(), len(tt.teamRoles), len(cl.pgUsers))
|
||||||
|
}
|
||||||
|
if err.Error() != tt.err.Error() {
|
||||||
|
t.Errorf("%s expected error %v, got %v", t.Name(), err, tt.err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// set pgUsers again to test initHumanUsers with working teams API
|
||||||
cl.pgUsers = tt.existingRoles
|
cl.pgUsers = tt.existingRoles
|
||||||
mockTeamsAPI.setMembers(tt.teamRoles)
|
mockTeamsAPI.setMembers(tt.teamRoles)
|
||||||
if err := cl.initHumanUsers(); err != nil {
|
if err := cl.initHumanUsers(); err != nil {
|
||||||
|
|
@ -288,12 +317,14 @@ type mockTeamsAPIClientMultipleTeams struct {
|
||||||
func (m *mockTeamsAPIClientMultipleTeams) TeamInfo(teamID, token string) (tm *teams.Team, statusCode int, err error) {
|
func (m *mockTeamsAPIClientMultipleTeams) TeamInfo(teamID, token string) (tm *teams.Team, statusCode int, err error) {
|
||||||
for _, team := range m.teams {
|
for _, team := range m.teams {
|
||||||
if team.teamID == teamID {
|
if team.teamID == teamID {
|
||||||
return &teams.Team{Members: team.members}, statusCode, nil
|
return &teams.Team{Members: team.members}, http.StatusOK, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// should not be reached if a slice with teams is populated correctly
|
// when given teamId is not found in teams return StatusNotFound
|
||||||
return nil, statusCode, nil
|
// the operator should only return a warning in this case and not error out (#1842)
|
||||||
|
return nil, http.StatusNotFound,
|
||||||
|
fmt.Errorf("mocked %d error of mock Teams API for team %q", http.StatusNotFound, teamID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test adding members of maintenance teams that get superuser rights for all PG databases
|
// Test adding members of maintenance teams that get superuser rights for all PG databases
|
||||||
|
|
@ -392,6 +423,16 @@ func TestInitHumanUsersWithSuperuserTeams(t *testing.T) {
|
||||||
"postgres_superuser": userA,
|
"postgres_superuser": userA,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
// case 4: the team does not exist which should not return an error
|
||||||
|
{
|
||||||
|
ownerTeam: "acid",
|
||||||
|
existingRoles: map[string]spec.PgUser{},
|
||||||
|
superuserTeams: []string{"postgres_superusers"},
|
||||||
|
teams: []mockTeam{teamA, teamB, teamTest},
|
||||||
|
result: map[string]spec.PgUser{
|
||||||
|
"postgres_superuser": userA,
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
|
|
|
||||||
|
|
@ -75,6 +75,37 @@ func needReplicaConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool {
|
||||||
*spec.EnableReplicaConnectionPooler
|
*spec.EnableReplicaConnectionPooler
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Cluster) needConnectionPoolerUser(oldSpec, newSpec *acidv1.PostgresSpec) bool {
|
||||||
|
// return true if pooler is needed AND was not disabled before OR user name differs
|
||||||
|
return (needMasterConnectionPoolerWorker(newSpec) || needReplicaConnectionPoolerWorker(newSpec)) &&
|
||||||
|
((!needMasterConnectionPoolerWorker(oldSpec) &&
|
||||||
|
!needReplicaConnectionPoolerWorker(oldSpec)) ||
|
||||||
|
c.poolerUser(oldSpec) != c.poolerUser(newSpec))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cluster) poolerUser(spec *acidv1.PostgresSpec) string {
|
||||||
|
connectionPoolerSpec := spec.ConnectionPooler
|
||||||
|
if connectionPoolerSpec == nil {
|
||||||
|
connectionPoolerSpec = &acidv1.ConnectionPooler{}
|
||||||
|
}
|
||||||
|
// Using superuser as pooler user is not a good idea. First of all it's
|
||||||
|
// not going to be synced correctly with the current implementation,
|
||||||
|
// and second it's a bad practice.
|
||||||
|
username := c.OpConfig.ConnectionPooler.User
|
||||||
|
|
||||||
|
isSuperUser := connectionPoolerSpec.User == c.OpConfig.SuperUsername
|
||||||
|
isProtectedUser := c.shouldAvoidProtectedOrSystemRole(
|
||||||
|
connectionPoolerSpec.User, "connection pool role")
|
||||||
|
|
||||||
|
if !isSuperUser && !isProtectedUser {
|
||||||
|
username = util.Coalesce(
|
||||||
|
connectionPoolerSpec.User,
|
||||||
|
c.OpConfig.ConnectionPooler.User)
|
||||||
|
}
|
||||||
|
|
||||||
|
return username
|
||||||
|
}
|
||||||
|
|
||||||
// when listing pooler k8s objects
|
// when listing pooler k8s objects
|
||||||
func (c *Cluster) poolerLabelsSet(addExtraLabels bool) labels.Set {
|
func (c *Cluster) poolerLabelsSet(addExtraLabels bool) labels.Set {
|
||||||
poolerLabels := c.labelsSet(addExtraLabels)
|
poolerLabels := c.labelsSet(addExtraLabels)
|
||||||
|
|
|
||||||
|
|
@ -104,10 +104,6 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
|
||||||
if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&newSpec.Spec) <= 0 || c.Spec.StandbyCluster != nil) {
|
if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&newSpec.Spec) <= 0 || c.Spec.StandbyCluster != nil) {
|
||||||
c.logger.Debug("syncing roles")
|
c.logger.Debug("syncing roles")
|
||||||
if err = c.syncRoles(); err != nil {
|
if err = c.syncRoles(); err != nil {
|
||||||
// remember all cached users in c.pgUsers
|
|
||||||
for cachedUserName, cachedUser := range c.pgUsersCache {
|
|
||||||
c.pgUsers[cachedUserName] = cachedUser
|
|
||||||
}
|
|
||||||
c.logger.Errorf("could not sync roles: %v", err)
|
c.logger.Errorf("could not sync roles: %v", err)
|
||||||
}
|
}
|
||||||
c.logger.Debug("syncing databases")
|
c.logger.Debug("syncing databases")
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue