making pgTeamMap a pointer (#1349)

* making pgTeamMap a pointer
* init empty map
* add e2e test for additional teams and members
* update test_min_resource_limits
* add more waiting in node_affinity_test
* no need for pointers in map of postgresTeamMembership
* another minor update on node affinity test
* refactor and fix fetching additional members
Felix Kunde 2021-02-16 10:38:20 +01:00 committed by GitHub
parent 137fbbf41e
commit 41858a702c
7 changed files with 185 additions and 95 deletions
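
The headline change turns `Config.PgTeamMap` into a pointer (`*pgteams.PostgresTeamMap`), so every cluster shares the one team map the controller reloads on resync instead of a snapshot copied at cluster creation. A minimal sketch of the Go semantics behind that choice; the type and struct names here are illustrative, not the operator's:

```go
package main

import "fmt"

// TeamMap is a stand-in for pgteams.PostgresTeamMap.
type TeamMap map[string][]string

// configByValue copies the map header: in-place mutations are shared,
// but re-assigning the source variable is invisible to the copy.
type configByValue struct{ teams TeamMap }

// configByPointer shares the variable itself, so a wholesale reload
// (what loadPostgresTeams does on resync) is visible to every holder.
type configByPointer struct{ teams *TeamMap }

func main() {
	m := TeamMap{"teamA": {"alice"}}
	byValue := configByValue{teams: m}
	byPointer := configByPointer{teams: &m}

	// Simulate a resync replacing the whole map, as Load() does.
	m = TeamMap{"teamB": {"bob"}}

	fmt.Println(byValue.teams)    // map[teamA:[alice]] – stale snapshot
	fmt.Println(*byPointer.teams) // map[teamB:[bob]]   – sees the reload
}
```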

View File

@@ -126,6 +126,7 @@ class EndToEndTestCase(unittest.TestCase):
                          "api-service.yaml",
                          "infrastructure-roles.yaml",
                          "infrastructure-roles-new.yaml",
+                         "custom-team-membership.yaml",
                          "e2e-storage-class.yaml"]:
             result = k8s.create_with_kubectl("manifests/" + filename)
             print("stdout: {}, stderr: {}".format(result.stdout, result.stderr))
@@ -174,6 +175,63 @@ class EndToEndTestCase(unittest.TestCase):
         self.eventuallyEqual(lambda: self.k8s.count_pods_with_container_capabilities(capabilities, cluster_label),
                              2, "Container capabilities not updated")

+    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
+    def test_additional_teams_and_members(self):
+        '''
+        Test PostgresTeam CRD with extra teams and members
+        '''
+        # enable PostgresTeam CRD and lower resync
+        enable_postgres_team_crd = {
+            "data": {
+                "enable_postgres_team_crd": "true",
+                "resync_period": "15s",
+            },
+        }
+        self.k8s.update_config(enable_postgres_team_crd)
+        self.eventuallyEqual(lambda: self.k8s.get_operator_state(), {"0": "idle"},
+                             "Operator does not get in sync")
+
+        self.k8s.api.custom_objects_api.patch_namespaced_custom_object(
+            'acid.zalan.do', 'v1', 'default',
+            'postgresteams', 'custom-team-membership',
+            {
+                'spec': {
+                    'additionalTeams': {
+                        'acid': [
+                            'e2e'
+                        ]
+                    },
+                    'additionalMembers': {
+                        'e2e': [
+                            'kind'
+                        ]
+                    }
+                }
+            })
+
+        # wait for at least one sync period so the new users get added
+        time.sleep(15)
+
+        leader = self.k8s.get_cluster_leader_pod('acid-minimal-cluster')
+        user_query = """
+            SELECT usename
+              FROM pg_catalog.pg_user
+             WHERE usename IN ('elephant', 'kind');
+        """
+        users = self.query_database(leader.metadata.name, "postgres", user_query)
+        self.eventuallyEqual(lambda: len(users), 2,
+                             "Not all additional users found in database: {}".format(users))
+
+        # revert config change
+        revert_resync = {
+            "data": {
+                "resync_period": "30m",
+            },
+        }
+        self.k8s.update_config(revert_resync)
+        self.eventuallyEqual(lambda: self.k8s.get_operator_state(), {"0": "idle"},
+                             "Operator does not get in sync")
+
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     def test_overwrite_pooler_deployment(self):
         self.k8s.create_with_kubectl("manifests/minimal-fake-pooler-deployment.yaml")
@@ -332,54 +390,19 @@ class EndToEndTestCase(unittest.TestCase):
         # Verify that all the databases have pooler schema installed.
         # Do this via psql, since otherwise we need to deal with
         # credentials.
-        dbList = []
+        db_list = []
         leader = k8s.get_cluster_leader_pod('acid-minimal-cluster')
-        dbListQuery = "select datname from pg_database"
-        schemasQuery = """
+        schemas_query = """
             select schema_name
             from information_schema.schemata
             where schema_name = 'pooler'
         """
-        exec_query = r"psql -tAq -c \"{}\" -d {}"

-        if leader:
-            try:
-                q = exec_query.format(dbListQuery, "postgres")
-                q = "su postgres -c \"{}\"".format(q)
-                print('Get databases: {}'.format(q))
-                result = k8s.exec_with_kubectl(leader.metadata.name, q)
-                dbList = clean_list(result.stdout.split(b'\n'))
-                print('dbList: {}, stdout: {}, stderr {}'.format(
-                    dbList, result.stdout, result.stderr
-                ))
-            except Exception as ex:
-                print('Could not get databases: {}'.format(ex))
-                print('Stdout: {}'.format(result.stdout))
-                print('Stderr: {}'.format(result.stderr))
-
-            for db in dbList:
-                if db in ('template0', 'template1'):
-                    continue
-
-                schemas = []
-                try:
-                    q = exec_query.format(schemasQuery, db)
-                    q = "su postgres -c \"{}\"".format(q)
-                    print('Get schemas: {}'.format(q))
-                    result = k8s.exec_with_kubectl(leader.metadata.name, q)
-                    schemas = clean_list(result.stdout.split(b'\n'))
-                    print('schemas: {}, stdout: {}, stderr {}'.format(
-                        schemas, result.stdout, result.stderr
-                    ))
-                except Exception as ex:
-                    print('Could not get databases: {}'.format(ex))
-                    print('Stdout: {}'.format(result.stdout))
-                    print('Stderr: {}'.format(result.stderr))
-
-                self.assertNotEqual(len(schemas), 0)
-        else:
-            print('Could not find leader pod')
+        db_list = self.list_databases(leader.metadata.name)
+        for db in db_list:
+            self.eventuallyNotEqual(lambda: len(self.query_database(leader.metadata.name, db, schemas_query)), 0,
+                                    "Pooler schema not found in database {}".format(db))

         # remove config section to make test work next time
         k8s.api.custom_objects_api.patch_namespaced_custom_object(
@@ -690,6 +713,7 @@ class EndToEndTestCase(unittest.TestCase):
                 "min_memory_limit": minMemoryLimit
             }
         }
+        k8s.update_config(patch_min_resource_limits, "Minimum resource test")

         # lower resource limits below minimum
         pg_patch_resources = {
@@ -707,10 +731,8 @@ class EndToEndTestCase(unittest.TestCase):
             }
         }
         k8s.api.custom_objects_api.patch_namespaced_custom_object(
             "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_resources)
+        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
-
-        k8s.patch_statefulset({"metadata": {"annotations": {"zalando-postgres-operator-rolling-update-required": "False"}}})
-        k8s.update_config(patch_min_resource_limits, "Minimum resource test")

         self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No two pods running after lazy rolling upgrade")
         self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members()), 2, "Postgres status did not enter running")
@@ -967,7 +989,6 @@ class EndToEndTestCase(unittest.TestCase):
         # verify we are in good state from potential previous tests
         self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running")
         self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members("acid-minimal-cluster-0")), 2, "Postgres status did not enter running")
-        self.eventuallyEqual(lambda: self.k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

         # get nodes of master and replica(s)
         master_node, replica_nodes = k8s.get_pg_nodes(cluster_label)
@@ -1053,6 +1074,9 @@ class EndToEndTestCase(unittest.TestCase):
             body=patch_node_remove_affinity_config)
         self.eventuallyEqual(lambda: self.k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

+        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running")
+        self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members("acid-minimal-cluster-0")), 2, "Postgres status did not enter running")
+
         # remove node affinity to move replica away from master node
         nm, new_replica_nodes = k8s.get_cluster_nodes()
         new_master_node = nm[0]
@@ -1219,6 +1243,60 @@ class EndToEndTestCase(unittest.TestCase):
             k8s.wait_for_pod_start('spilo-role=replica')
             return True

+    def list_databases(self, pod_name):
+        '''
+        Get list of databases we might want to iterate over
+        '''
+        k8s = self.k8s
+        result_set = []
+        db_list = []
+        db_list_query = "select datname from pg_database"
+        exec_query = r"psql -tAq -c \"{}\" -d {}"
+
+        try:
+            q = exec_query.format(db_list_query, "postgres")
+            q = "su postgres -c \"{}\"".format(q)
+            print('Get databases: {}'.format(q))
+            result = k8s.exec_with_kubectl(pod_name, q)
+            db_list = clean_list(result.stdout.split(b'\n'))
+            print('db_list: {}, stdout: {}, stderr {}'.format(
+                db_list, result.stdout, result.stderr
+            ))
+        except Exception as ex:
+            print('Could not get databases: {}'.format(ex))
+            print('Stdout: {}'.format(result.stdout))
+            print('Stderr: {}'.format(result.stderr))
+
+        for db in db_list:
+            if db in ('template0', 'template1'):
+                continue
+            result_set.append(db)
+
+        return result_set
+
+    def query_database(self, pod_name, db_name, query):
+        '''
+        Query database and return result as a list
+        '''
+        k8s = self.k8s
+        result_set = []
+        exec_query = r"psql -tAq -c \"{}\" -d {}"
+
+        try:
+            q = exec_query.format(query, db_name)
+            q = "su postgres -c \"{}\"".format(q)
+            print('Send query: {}'.format(q))
+            result = k8s.exec_with_kubectl(pod_name, q)
+            result_set = clean_list(result.stdout.split(b'\n'))
+            print('result: {}, stdout: {}, stderr {}'.format(
+                result_set, result.stdout, result.stderr
+            ))
+        except Exception as ex:
+            print('Error on query execution: {}'.format(ex))
+            print('Stdout: {}'.format(result.stdout))
+            print('Stderr: {}'.format(result.stderr))
+
+        return result_set

 if __name__ == '__main__':
     unittest.main()

View File

@@ -49,7 +49,7 @@ var (
 type Config struct {
 	OpConfig                     config.Config
 	RestConfig                   *rest.Config
-	PgTeamMap                    pgteams.PostgresTeamMap
+	PgTeamMap                    *pgteams.PostgresTeamMap
 	InfrastructureRoles          map[string]spec.PgUser // inherited from the controller
 	PodServiceAccount            *v1.ServiceAccount
 	PodServiceAccountRoleBinding *rbacv1.RoleBinding
@@ -1143,8 +1143,8 @@ func (c *Cluster) initHumanUsers() error {
 	var clusterIsOwnedBySuperuserTeam bool
 	superuserTeams := []string{}

-	if c.OpConfig.EnablePostgresTeamCRDSuperusers {
-		superuserTeams = c.PgTeamMap.GetAdditionalSuperuserTeams(c.Spec.TeamID, true)
+	if c.OpConfig.EnablePostgresTeamCRD && c.OpConfig.EnablePostgresTeamCRDSuperusers && c.Config.PgTeamMap != nil {
+		superuserTeams = c.Config.PgTeamMap.GetAdditionalSuperuserTeams(c.Spec.TeamID, true)
 	}

 	for _, postgresSuperuserTeam := range c.OpConfig.PostgresSuperuserTeams {
@@ -1163,12 +1163,14 @@ func (c *Cluster) initHumanUsers() error {
 		}
 	}

-	additionalTeams := c.PgTeamMap.GetAdditionalTeams(c.Spec.TeamID, true)
-	for _, additionalTeam := range additionalTeams {
-		if !(util.SliceContains(superuserTeams, additionalTeam)) {
-			err := c.initTeamMembers(additionalTeam, false)
-			if err != nil {
-				return fmt.Errorf("Cannot initialize members for additional team %q for cluster owned by %q: %v", additionalTeam, c.Spec.TeamID, err)
+	if c.OpConfig.EnablePostgresTeamCRD && c.Config.PgTeamMap != nil {
+		additionalTeams := c.Config.PgTeamMap.GetAdditionalTeams(c.Spec.TeamID, true)
+		for _, additionalTeam := range additionalTeams {
+			if !(util.SliceContains(superuserTeams, additionalTeam)) {
+				err := c.initTeamMembers(additionalTeam, false)
+				if err != nil {
+					return fmt.Errorf("Cannot initialize members for additional team %q for cluster owned by %q: %v", additionalTeam, c.Spec.TeamID, err)
+				}
 			}
 		}
 	}
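
Both guarded blocks in `initHumanUsers` follow the same shape: dereference the shared map only when the CRD feature is on and the pointer is non-nil. A reduced sketch of that guard; `crdEnabled` and `teamMap` are hypothetical stand-ins for `c.OpConfig.EnablePostgresTeamCRD` and `c.Config.PgTeamMap`, and the value type is simplified:

```go
// additionalTeamsFor returns the extra teams configured for teamID, or
// nothing when PostgresTeam CRD support is off or the map was never set.
func additionalTeamsFor(teamID string, crdEnabled bool, teamMap *map[string][]string) []string {
	if !crdEnabled || teamMap == nil {
		return nil // mirrors skipping the whole block in initHumanUsers
	}
	return (*teamMap)[teamID]
}
```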

View File

@@ -238,15 +238,26 @@ func (c *Cluster) getTeamMembers(teamID string) ([]string, error) {
 		return nil, fmt.Errorf("no teamId specified")
 	}

-	c.logger.Debugf("fetching possible additional team members for team %q", teamID)
 	members := []string{}
-	additionalMembers := c.PgTeamMap[teamID].AdditionalMembers
-	for _, member := range additionalMembers {
-		members = append(members, member)
+
+	if c.OpConfig.EnablePostgresTeamCRD && c.Config.PgTeamMap != nil {
+		c.logger.Debugf("fetching possible additional team members for team %q", teamID)
+		additionalMembers := []string{}
+		for team, membership := range *c.Config.PgTeamMap {
+			if team == teamID {
+				additionalMembers = membership.AdditionalMembers
+				c.logger.Debugf("found %d additional members for team %q", len(additionalMembers), teamID)
+			}
+		}
+
+		for _, member := range additionalMembers {
+			members = append(members, member)
+		}
 	}

 	if !c.OpConfig.EnableTeamsAPI {
-		c.logger.Debugf("team API is disabled, only returning %d members for team %q", len(members), teamID)
+		c.logger.Debugf("team API is disabled")
 		return members, nil
 	}
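
Since `PostgresTeamMap` is an ordinary Go map, the range-and-compare above is equivalent to a direct index, which reports in one step whether the team exists; a possible simplification rather than what the commit ships:

```go
// Direct lookup instead of scanning every entry; membership and ok are
// local names, assuming the value type exposes AdditionalMembers as above.
if membership, ok := (*c.Config.PgTeamMap)[teamID]; ok {
	members = append(members, membership.AdditionalMembers...)
	c.logger.Debugf("found %d additional members for team %q",
		len(membership.AdditionalMembers), teamID)
}
```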

View File

@@ -329,10 +329,9 @@ func (c *Controller) initController() {
 	c.initSharedInformers()

+	c.pgTeamMap = teams.PostgresTeamMap{}
 	if c.opConfig.EnablePostgresTeamCRD {
 		c.loadPostgresTeams()
-	} else {
-		c.pgTeamMap = teams.PostgresTeamMap{}
 	}

 	if c.opConfig.DebugLogging {
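
Initializing `c.pgTeamMap` unconditionally (the "init empty map" bullet) is defensive: a nil Go map tolerates `len`, lookups, and `range`, but panics on writes, so starting from an empty map keeps every later writer safe whether or not the CRD feature is enabled. A standard-library-only sketch of those semantics:

```go
package main

import "fmt"

func main() {
	var nilMap map[string][]string // zero value of a map type is nil

	fmt.Println(len(nilMap)) // 0 – len on a nil map is safe
	_ = nilMap["teamA"]      // lookup is safe, returns the zero value

	for range nilMap {
		// ranging a nil map simply runs zero times
	}

	// nilMap["teamA"] = []string{} // would panic: assignment to entry in nil map

	emptyMap := map[string][]string{} // initialized empty map
	emptyMap["teamA"] = []string{}    // writes are fine
	fmt.Println(emptyMap)
}
```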

View File

@@ -15,7 +15,6 @@ import (
 	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
 	"github.com/zalando/postgres-operator/pkg/cluster"
 	"github.com/zalando/postgres-operator/pkg/spec"
-	"github.com/zalando/postgres-operator/pkg/teams"
 	"github.com/zalando/postgres-operator/pkg/util"
 	"github.com/zalando/postgres-operator/pkg/util/config"
 	"github.com/zalando/postgres-operator/pkg/util/k8sutil"
@@ -31,7 +30,7 @@ func (c *Controller) makeClusterConfig() cluster.Config {
 	return cluster.Config{
 		RestConfig:          c.config.RestConfig,
 		OpConfig:            config.Copy(c.opConfig),
-		PgTeamMap:           c.pgTeamMap,
+		PgTeamMap:           &c.pgTeamMap,
 		InfrastructureRoles: infrastructureRoles,
 		PodServiceAccount:   c.PodServiceAccount,
 	}
@@ -395,9 +394,6 @@ func (c *Controller) getInfrastructureRole(
 }

 func (c *Controller) loadPostgresTeams() {
-	// reset team map
-	c.pgTeamMap = teams.PostgresTeamMap{}
-
 	pgTeams, err := c.KubeClient.PostgresTeamsGetter.PostgresTeams(c.opConfig.WatchedNamespace).List(context.TODO(), metav1.ListOptions{})
 	if err != nil {
 		c.logger.Errorf("could not list postgres team objects: %v", err)

View File

@@ -94,6 +94,9 @@ func (ptm *PostgresTeamMap) GetAdditionalSuperuserTeams(team string, transitive

 // Load function to import data from PostgresTeam CRD
 func (ptm *PostgresTeamMap) Load(pgTeams *acidv1.PostgresTeamList) {
+	// reset the team map
+	*ptm = make(PostgresTeamMap, 0)
+
 	superuserTeamSet := teamHashSet{}
 	teamSet := teamHashSet{}
 	teamMemberSet := teamHashSet{}
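
Resetting through the pointer receiver is what made the explicit `// reset team map` in `loadPostgresTeams` redundant: assigning to `*ptm` swaps the contents for every holder of the same `*PostgresTeamMap`. A compact sketch with a simplified signature in place of the real `*acidv1.PostgresTeamList` parameter:

```go
// Load drops all entries from the previous sync before importing new ones,
// so stale teams cannot linger between resync periods.
func (ptm *PostgresTeamMap) Load(teamNames []string) {
	*ptm = make(PostgresTeamMap) // every holder of the pointer sees the fresh map
	for _, name := range teamNames {
		(*ptm)[name] = postgresTeamMembership{} // empty membership as placeholder
	}
}
```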

View File

@@ -46,9 +46,36 @@ var (
 			},
 		},
 	}

+	pgTeamMap = PostgresTeamMap{
+		"teamA": {
+			AdditionalSuperuserTeams: []string{"teamB", "team24x7"},
+			AdditionalTeams:          []string{"teamC"},
+			AdditionalMembers:        []string{},
+		},
+		"teamB": {
+			AdditionalSuperuserTeams: []string{"teamA", "teamC", "team24x7"},
+			AdditionalTeams:          []string{},
+			AdditionalMembers:        []string{"drno"},
+		},
+		"teamC": {
+			AdditionalSuperuserTeams: []string{"team24x7"},
+			AdditionalTeams:          []string{"teamA", "teamB", "acid"},
+			AdditionalMembers:        []string{},
+		},
+		"team24x7": {
+			AdditionalSuperuserTeams: []string{},
+			AdditionalTeams:          []string{},
+			AdditionalMembers:        []string{"optimusprime"},
+		},
+		"acid": {
+			AdditionalSuperuserTeams: []string{},
+			AdditionalTeams:          []string{},
+			AdditionalMembers:        []string{"batman"},
+		},
+	}
 )

-// PostgresTeamMap is the operator's internal representation of all PostgresTeam CRDs
+// TestLoadingPostgresTeamCRD checks that the PostgresTeam CRD is imported correctly into the operator's internal map
 func TestLoadingPostgresTeamCRD(t *testing.T) {
 	tests := []struct {
 		name string
@@ -59,33 +86,7 @@ func TestLoadingPostgresTeamCRD(t *testing.T) {
 		{
 			"Check that CRD is imported correctly into the internal format",
 			pgTeamList,
-			PostgresTeamMap{
-				"teamA": {
-					AdditionalSuperuserTeams: []string{"teamB", "team24x7"},
-					AdditionalTeams:          []string{"teamC"},
-					AdditionalMembers:        []string{},
-				},
-				"teamB": {
-					AdditionalSuperuserTeams: []string{"teamA", "teamC", "team24x7"},
-					AdditionalTeams:          []string{},
-					AdditionalMembers:        []string{"drno"},
-				},
-				"teamC": {
-					AdditionalSuperuserTeams: []string{"team24x7"},
-					AdditionalTeams:          []string{"teamA", "teamB", "acid"},
-					AdditionalMembers:        []string{},
-				},
-				"team24x7": {
-					AdditionalSuperuserTeams: []string{},
-					AdditionalTeams:          []string{},
-					AdditionalMembers:        []string{"optimusprime"},
-				},
-				"acid": {
-					AdditionalSuperuserTeams: []string{},
-					AdditionalTeams:          []string{},
-					AdditionalMembers:        []string{"batman"},
-				},
-			},
+			pgTeamMap,
 			"Mismatch between PostgresTeam CRD and internal map",
 		},
 	}