merge master

This commit is contained in:
Rafia Sabih 2020-10-29 13:36:07 +01:00
commit fd18e02f2a
61 changed files with 2879 additions and 657 deletions

View File

@ -24,12 +24,16 @@ PKG := `go list ./... | grep -v /vendor/`
ifeq ($(DEBUG),1)
DOCKERFILE = DebugDockerfile
DEBUG_POSTFIX := -debug
DEBUG_POSTFIX := -debug-$(shell date hhmmss)
BUILD_FLAGS += -gcflags "-N -l"
else
DOCKERFILE = Dockerfile
endif
ifeq ($(FRESH),1)
DEBUG_FRESH=$(shell date +"%H-%M-%S")
endif
ifdef CDP_PULL_REQUEST_NUMBER
CDP_TAG := -${CDP_BUILD_VERSION}
endif
@ -66,7 +70,7 @@ docker: ${DOCKERDIR}/${DOCKERFILE} docker-context
echo "Version ${VERSION}"
echo "CDP tag ${CDP_TAG}"
echo "git describe $(shell git describe --tags --always --dirty)"
cd "${DOCKERDIR}" && docker build --rm -t "$(IMAGE):$(TAG)$(CDP_TAG)$(DEBUG_POSTFIX)" -f "${DOCKERFILE}" .
cd "${DOCKERDIR}" && docker build --rm -t "$(IMAGE):$(TAG)$(CDP_TAG)$(DEBUG_FRESH)$(DEBUG_POSTFIX)" -f "${DOCKERFILE}" .
indocker-race:
docker run --rm -v "${GOPATH}":"${GOPATH}" -e GOPATH="${GOPATH}" -e RACE=1 -w ${PWD} golang:1.8.1 bash -c "make linux"
@ -97,4 +101,4 @@ test:
GO111MODULE=on go test ./...
e2e: docker # build operator image to be tested
cd e2e; make e2etest
cd e2e; make e2etest

View File

@ -319,6 +319,10 @@ spec:
properties:
enable_admin_role_for_users:
type: boolean
enable_postgres_team_crd:
type: boolean
enable_postgres_team_crd_superusers:
type: boolean
enable_team_superuser:
type: boolean
enable_teams_api:

View File

@ -0,0 +1,67 @@
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: postgresteams.acid.zalan.do
labels:
app.kubernetes.io/name: postgres-operator
annotations:
"helm.sh/hook": crd-install
spec:
group: acid.zalan.do
names:
kind: PostgresTeam
listKind: PostgresTeamList
plural: postgresteams
singular: postgresteam
shortNames:
- pgteam
scope: Namespaced
subresources:
status: {}
version: v1
validation:
openAPIV3Schema:
type: object
required:
- kind
- apiVersion
- spec
properties:
kind:
type: string
enum:
- PostgresTeam
apiVersion:
type: string
enum:
- acid.zalan.do/v1
spec:
type: object
properties:
additionalSuperuserTeams:
type: object
description: "Map for teamId and associated additional superuser teams"
additionalProperties:
type: array
nullable: true
description: "List of teams to become Postgres superusers"
items:
type: string
additionalTeams:
type: object
description: "Map for teamId and associated additional teams"
additionalProperties:
type: array
nullable: true
description: "List of teams whose members will also be added to the Postgres cluster"
items:
type: string
additionalMembers:
type: object
description: "Map for teamId and associated additional users"
additionalProperties:
type: array
nullable: true
description: "List of users who will also be added to the Postgres cluster"
items:
type: string

View File

@ -25,6 +25,15 @@ rules:
- patch
- update
- watch
# operator only reads PostgresTeams
- apiGroups:
- acid.zalan.do
resources:
- postgresteams
verbs:
- get
- list
- watch
# to create or get/update CRDs when starting up
- apiGroups:
- apiextensions.k8s.io

View File

@ -256,6 +256,11 @@ configTeamsApi:
# team_admin_role will have the rights to grant roles coming from PG manifests
# enable_admin_role_for_users: true
# operator watches for PostgresTeam CRs to assign additional teams and members to clusters
enable_postgres_team_crd: false
# toogle to create additional superuser teams from PostgresTeam CRs
# enable_postgres_team_crd_superusers: "false"
# toggle to grant superuser to team members created from the Teams API
enable_team_superuser: false
# toggles usage of the Teams API by the operator

View File

@ -248,6 +248,11 @@ configTeamsApi:
# team_admin_role will have the rights to grant roles coming from PG manifests
# enable_admin_role_for_users: "true"
# operator watches for PostgresTeam CRs to assign additional teams and members to clusters
enable_postgres_team_crd: "false"
# toogle to create additional superuser teams from PostgresTeam CRs
# enable_postgres_team_crd_superusers: "false"
# toggle to grant superuser to team members created from the Teams API
# enable_team_superuser: "false"

View File

@ -561,9 +561,12 @@ database.
* **Human users** originate from the [Teams API](user.md#teams-api-roles) that
returns a list of the team members given a team id. The operator differentiates
between (a) product teams that own a particular Postgres cluster and are granted
admin rights to maintain it, and (b) Postgres superuser teams that get the
superuser access to all Postgres databases running in a K8s cluster for the
purposes of maintaining and troubleshooting.
admin rights to maintain it, (b) Postgres superuser teams that get superuser
access to all Postgres databases running in a K8s cluster for the purposes of
maintaining and troubleshooting, and (c) additional teams, superuser teams or
members associated with the owning team. The latter is managed via the
[PostgresTeam CRD](user.md#additional-teams-and-members-per-cluster).
## Understanding rolling update of Spilo pods

View File

@ -598,8 +598,8 @@ key.
The default is `"log_statement:all"`
* **enable_team_superuser**
whether to grant superuser to team members created from the Teams API.
The default is `false`.
whether to grant superuser to members of the cluster's owning team created
from the Teams API. The default is `false`.
* **team_admin_role**
role name to grant to team members created from the Teams API. The default is
@ -632,6 +632,16 @@ key.
cluster to administer Postgres and maintain infrastructure built around it.
The default is empty.
* **enable_postgres_team_crd**
toggle to make the operator watch for created or updated `PostgresTeam` CRDs
and create roles for specified additional teams and members.
The default is `false`.
* **enable_postgres_team_crd_superusers**
in a `PostgresTeam` CRD additional superuser teams can assigned to teams that
own clusters. With this flag set to `false`, it will be ignored.
The default is `false`.
## Logging and REST API
Parameters affecting logging and REST API listener. In the CRD-based

View File

@ -269,6 +269,72 @@ to choose superusers, group roles, [PAM configuration](https://github.com/CyberD
etc. An OAuth2 token can be passed to the Teams API via a secret. The name for
this secret is configurable with the `oauth_token_secret_name` parameter.
### Additional teams and members per cluster
Postgres clusters are associated with one team by providing the `teamID` in
the manifest. Additional superuser teams can be configured as mentioned in
the previous paragraph. However, this is a global setting. To assign
additional teams, superuser teams and single users to clusters of a given
team, use the [PostgresTeam CRD](../manifests/postgresteam.yaml). It provides
a simple mapping structure.
```yaml
apiVersion: "acid.zalan.do/v1"
kind: PostgresTeam
metadata:
name: custom-team-membership
spec:
additionalSuperuserTeams:
acid:
- "postgres_superusers"
additionalTeams:
acid: []
additionalMembers:
acid:
- "elephant"
```
One `PostgresTeam` resource could contain mappings of multiple teams but you
can choose to create separate CRDs, alternatively. On each CRD creation or
update the operator will gather all mappings to create additional human users
in databases the next time they are synced. Additional teams are resolved
transitively, meaning you will also add users for their `additionalTeams`
or (not and) `additionalSuperuserTeams`.
For each additional team the Teams API would be queried. Additional members
will be added either way. There can be "virtual teams" that do not exists in
your Teams API but users of associated teams as well as members will get
created. With `PostgresTeams` it's also easy to cover team name changes. Just
add the mapping between old and new team name and the rest can stay the same.
```yaml
apiVersion: "acid.zalan.do/v1"
kind: PostgresTeam
metadata:
name: virtualteam-membership
spec:
additionalSuperuserTeams:
acid:
- "virtual_superusers"
virtual_superusers:
- "real_teamA"
- "real_teamB"
real_teamA:
- "real_teamA_renamed"
additionalTeams:
real_teamA:
- "real_teamA_renamed"
additionalMembers:
virtual_superusers:
- "foo"
```
Note, by default the `PostgresTeam` support is disabled in the configuration.
Switch `enable_postgres_team_crd` flag to `true` and the operator will start to
watch for this CRD. Make sure, the cluster role is up to date and contains a
section for [PostgresTeam](../manifests/operator-service-account-rbac.yaml#L30).
## Prepared databases with roles and default privileges
The `users` section in the manifests only allows for creating database roles

View File

@ -14,6 +14,7 @@ RUN apt-get update \
python3-setuptools \
python3-pip \
curl \
vim \
&& pip3 install --no-cache-dir -r requirements.txt \
&& curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.18.0/bin/linux/amd64/kubectl \
&& chmod +x ./kubectl \
@ -21,4 +22,7 @@ RUN apt-get update \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
ENTRYPOINT ["python3", "-m", "unittest", "discover", "--start-directory", ".", "-v"]
# working line
# python3 -m unittest discover -v --failfast -k test_e2e.EndToEndTestCase.test_lazy_spilo_upgrade --start-directory tests
ENTRYPOINT ["python3", "-m", "unittest"]
CMD ["discover","-v","--failfast","--start-directory","/tests"]

View File

@ -12,6 +12,10 @@ Docker.
Docker
Go
# Notice
The `manifest` folder in e2e tests folder is not commited to git, it comes from `/manifests`
## Build test runner
In the directory of the cloned Postgres Operator repository change to the e2e
@ -35,6 +39,46 @@ In the e2e folder you can invoke tests either with `make test` or with:
To run both the build and test step you can invoke `make e2e` from the parent
directory.
To run the end 2 end test and keep the kind state execute:
```bash
NOCLEANUP=True ./run.sh
```
## Run indidual test
After having executed a normal E2E run with `NOCLEANUP=True` Kind still continues to run, allowing you subsequent test runs.
To run an individual test, run the following command in the `e2e` directory
```bash
NOCLEANUP=True ./run.sh main tests.test_e2e.EndToEndTestCase.test_lazy_spilo_upgrade
```
## Inspecting Kind
If you want to inspect Kind/Kubernetes cluster, use the following script to exec into the K8s setup and then use `kubectl`
```bash
./exec_into_env.sh
# use kube ctl
kubectl get pods
# watch relevant objects
./scripts/watch_objects.sh
# get operator logs
./scripts/get_logs.sh
```
## Cleaning up Kind
To cleanup kind and start fresh
```bash
e2e/run.sh cleanup
```
## Covered use cases
The current tests are all bundled in [`test_e2e.py`](tests/test_e2e.py):

14
e2e/exec_into_env.sh Executable file
View File

@ -0,0 +1,14 @@
#!/bin/bash
export cluster_name="postgres-operator-e2e-tests"
export kubeconfig_path="/tmp/kind-config-${cluster_name}"
export operator_image="registry.opensource.zalan.do/acid/postgres-operator:latest"
export e2e_test_runner_image="registry.opensource.zalan.do/acid/postgres-operator-e2e-tests-runner:0.3"
docker run -it --entrypoint /bin/bash --network=host -e "TERM=xterm-256color" \
--mount type=bind,source="$(readlink -f ${kubeconfig_path})",target=/root/.kube/config \
--mount type=bind,source="$(readlink -f manifests)",target=/manifests \
--mount type=bind,source="$(readlink -f tests)",target=/tests \
--mount type=bind,source="$(readlink -f exec.sh)",target=/exec.sh \
--mount type=bind,source="$(readlink -f scripts)",target=/scripts \
-e OPERATOR_IMAGE="${operator_image}" "${e2e_test_runner_image}"

View File

@ -9,6 +9,10 @@ IFS=$'\n\t'
readonly cluster_name="postgres-operator-e2e-tests"
readonly kubeconfig_path="/tmp/kind-config-${cluster_name}"
readonly spilo_image="registry.opensource.zalan.do/acid/spilo-12:1.6-p5"
readonly e2e_test_runner_image="registry.opensource.zalan.do/acid/postgres-operator-e2e-tests-runner:0.3"
export GOPATH=${GOPATH-~/go}
export PATH=${GOPATH}/bin:$PATH
echo "Clustername: ${cluster_name}"
echo "Kubeconfig path: ${kubeconfig_path}"
@ -19,12 +23,7 @@ function pull_images(){
then
docker pull registry.opensource.zalan.do/acid/postgres-operator:latest
fi
operator_image=$(docker images --filter=reference="registry.opensource.zalan.do/acid/postgres-operator" --format "{{.Repository}}:{{.Tag}}" | head -1)
# this image does not contain the tests; a container mounts them from a local "./tests" dir at start time
e2e_test_runner_image="registry.opensource.zalan.do/acid/postgres-operator-e2e-tests-runner:latest"
docker pull ${e2e_test_runner_image}
}
function start_kind(){
@ -36,12 +35,17 @@ function start_kind(){
fi
export KUBECONFIG="${kubeconfig_path}"
kind create cluster --name ${cluster_name} --config kind-cluster-postgres-operator-e2e-tests.yaml
kind load docker-image "${operator_image}" --name ${cluster_name}
kind create cluster --name ${cluster_name} --config kind-cluster-postgres-operator-e2e-tests.yaml
docker pull "${spilo_image}"
kind load docker-image "${spilo_image}" --name ${cluster_name}
}
function load_operator_image() {
echo "Loading operator image"
export KUBECONFIG="${kubeconfig_path}"
kind load docker-image "${operator_image}" --name ${cluster_name}
}
function set_kind_api_server_ip(){
echo "Setting up kind API server ip"
# use the actual kubeconfig to connect to the 'kind' API server
@ -52,8 +56,7 @@ function set_kind_api_server_ip(){
}
function run_tests(){
echo "Running tests..."
echo "Running tests... image: ${e2e_test_runner_image}"
# tests modify files in ./manifests, so we mount a copy of this directory done by the e2e Makefile
docker run --rm --network=host -e "TERM=xterm-256color" \
@ -61,11 +64,11 @@ function run_tests(){
--mount type=bind,source="$(readlink -f manifests)",target=/manifests \
--mount type=bind,source="$(readlink -f tests)",target=/tests \
--mount type=bind,source="$(readlink -f exec.sh)",target=/exec.sh \
-e OPERATOR_IMAGE="${operator_image}" "${e2e_test_runner_image}"
--mount type=bind,source="$(readlink -f scripts)",target=/scripts \
-e OPERATOR_IMAGE="${operator_image}" "${e2e_test_runner_image}" ${E2E_TEST_CASE-} $@
}
function clean_up(){
function cleanup(){
echo "Executing cleanup"
unset KUBECONFIG
kind delete cluster --name ${cluster_name}
@ -73,14 +76,16 @@ function clean_up(){
}
function main(){
echo "Entering main function..."
[[ -z ${NOCLEANUP-} ]] && trap "cleanup" QUIT TERM EXIT
pull_images
[[ ! -f ${kubeconfig_path} ]] && start_kind
load_operator_image
set_kind_api_server_ip
trap "clean_up" QUIT TERM EXIT
time pull_images
time start_kind
time set_kind_api_server_ip
run_tests
shift
run_tests $@
exit 0
}
"$@"
"$1" $@

7
e2e/scripts/cleanup.sh Executable file
View File

@ -0,0 +1,7 @@
#!/bin/bash
kubectl delete postgresql acid-minimal-cluster
kubectl delete deployments -l application=db-connection-pooler,cluster-name=acid-minimal-cluster
kubectl delete statefulsets -l application=spilo,cluster-name=acid-minimal-cluster
kubectl delete services -l application=spilo,cluster-name=acid-minimal-cluster
kubectl delete configmap postgres-operator
kubectl delete deployment postgres-operator

2
e2e/scripts/get_logs.sh Executable file
View File

@ -0,0 +1,2 @@
#!/bin/bash
kubectl logs $(kubectl get pods -l name=postgres-operator --field-selector status.phase=Running -o jsonpath='{.items..metadata.name}')

19
e2e/scripts/watch_objects.sh Executable file
View File

@ -0,0 +1,19 @@
#!/bin/bash
watch -c "
kubectl get postgresql
echo
echo -n 'Rolling upgrade pending: '
kubectl get statefulset -o jsonpath='{.items..metadata.annotations.zalando-postgres-operator-rolling-update-required}'
echo
echo
kubectl get pods -o wide
echo
kubectl get statefulsets
echo
kubectl get deployments
echo
kubectl get pods -l name=postgres-operator -o jsonpath='{.items..metadata.annotations.step}'
echo
kubectl get pods -l application=spilo -o jsonpath='{.items..spec.containers..image}'
"

522
e2e/tests/k8s_api.py Normal file
View File

@ -0,0 +1,522 @@
import json
import unittest
import time
import timeout_decorator
import subprocess
import warnings
import os
import yaml
from datetime import datetime
from kubernetes import client, config
from kubernetes.client.rest import ApiException
def to_selector(labels):
return ",".join(["=".join(l) for l in labels.items()])
class K8sApi:
def __init__(self):
# https://github.com/kubernetes-client/python/issues/309
warnings.simplefilter("ignore", ResourceWarning)
self.config = config.load_kube_config()
self.k8s_client = client.ApiClient()
self.core_v1 = client.CoreV1Api()
self.apps_v1 = client.AppsV1Api()
self.batch_v1_beta1 = client.BatchV1beta1Api()
self.custom_objects_api = client.CustomObjectsApi()
self.policy_v1_beta1 = client.PolicyV1beta1Api()
self.storage_v1_api = client.StorageV1Api()
class K8s:
'''
Wraps around K8s api client and helper methods.
'''
RETRY_TIMEOUT_SEC = 1
def __init__(self, labels='x=y', namespace='default'):
self.api = K8sApi()
self.labels=labels
self.namespace=namespace
def get_pg_nodes(self, pg_cluster_name, namespace='default'):
master_pod_node = ''
replica_pod_nodes = []
podsList = self.api.core_v1.list_namespaced_pod(namespace, label_selector=pg_cluster_name)
for pod in podsList.items:
if pod.metadata.labels.get('spilo-role') == 'master':
master_pod_node = pod.spec.node_name
elif pod.metadata.labels.get('spilo-role') == 'replica':
replica_pod_nodes.append(pod.spec.node_name)
return master_pod_node, replica_pod_nodes
def get_cluster_nodes(self, cluster_labels='cluster-name=acid-minimal-cluster', namespace='default'):
m = []
r = []
podsList = self.api.core_v1.list_namespaced_pod(namespace, label_selector=cluster_labels)
for pod in podsList.items:
if pod.metadata.labels.get('spilo-role') == 'master' and pod.status.phase == 'Running':
m.append(pod.spec.node_name)
elif pod.metadata.labels.get('spilo-role') == 'replica' and pod.status.phase == 'Running':
r.append(pod.spec.node_name)
return m, r
def wait_for_operator_pod_start(self):
self.wait_for_pod_start("name=postgres-operator")
# give operator time to subscribe to objects
time.sleep(1)
return True
def get_operator_pod(self):
pods = self.api.core_v1.list_namespaced_pod(
'default', label_selector='name=postgres-operator'
).items
pods = list(filter(lambda x: x.status.phase=='Running', pods))
if len(pods):
return pods[0]
return None
def get_operator_log(self):
operator_pod = self.get_operator_pod()
pod_name = operator_pod.metadata.name
return self.api.core_v1.read_namespaced_pod_log(
name=pod_name,
namespace='default'
)
def pg_get_status(self, name="acid-minimal-cluster", namespace="default"):
pg = self.api.custom_objects_api.get_namespaced_custom_object(
"acid.zalan.do", "v1", namespace, "postgresqls", name)
return pg.get("status", {}).get("PostgresClusterStatus", None)
def wait_for_pod_start(self, pod_labels, namespace='default'):
pod_phase = 'No pod running'
while pod_phase != 'Running':
pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=pod_labels).items
if pods:
pod_phase = pods[0].status.phase
time.sleep(self.RETRY_TIMEOUT_SEC)
def get_service_type(self, svc_labels, namespace='default'):
svc_type = ''
svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items
for svc in svcs:
svc_type = svc.spec.type
return svc_type
def check_service_annotations(self, svc_labels, annotations, namespace='default'):
svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items
for svc in svcs:
for key, value in annotations.items():
if not svc.metadata.annotations or key not in svc.metadata.annotations or svc.metadata.annotations[key] != value:
print("Expected key {} not found in annotations {}".format(key, svc.metadata.annotations))
return False
return True
def check_statefulset_annotations(self, sset_labels, annotations, namespace='default'):
ssets = self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=sset_labels, limit=1).items
for sset in ssets:
for key, value in annotations.items():
if key not in sset.metadata.annotations or sset.metadata.annotations[key] != value:
print("Expected key {} not found in annotations {}".format(key, sset.metadata.annotations))
return False
return True
def scale_cluster(self, number_of_instances, name="acid-minimal-cluster", namespace="default"):
body = {
"spec": {
"numberOfInstances": number_of_instances
}
}
self.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", namespace, "postgresqls", name, body)
def wait_for_running_pods(self, labels, number, namespace=''):
while self.count_pods_with_label(labels) != number:
time.sleep(self.RETRY_TIMEOUT_SEC)
def wait_for_pods_to_stop(self, labels, namespace=''):
while self.count_pods_with_label(labels) != 0:
time.sleep(self.RETRY_TIMEOUT_SEC)
def wait_for_service(self, labels, namespace='default'):
def get_services():
return self.api.core_v1.list_namespaced_service(
namespace, label_selector=labels
).items
while not get_services():
time.sleep(self.RETRY_TIMEOUT_SEC)
def count_pods_with_label(self, labels, namespace='default'):
return len(self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items)
def count_services_with_label(self, labels, namespace='default'):
return len(self.api.core_v1.list_namespaced_service(namespace, label_selector=labels).items)
def count_endpoints_with_label(self, labels, namespace='default'):
return len(self.api.core_v1.list_namespaced_endpoints(namespace, label_selector=labels).items)
def count_secrets_with_label(self, labels, namespace='default'):
return len(self.api.core_v1.list_namespaced_secret(namespace, label_selector=labels).items)
def count_statefulsets_with_label(self, labels, namespace='default'):
return len(self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=labels).items)
def count_deployments_with_label(self, labels, namespace='default'):
return len(self.api.apps_v1.list_namespaced_deployment(namespace, label_selector=labels).items)
def count_pdbs_with_label(self, labels, namespace='default'):
return len(self.api.policy_v1_beta1.list_namespaced_pod_disruption_budget(
namespace, label_selector=labels).items)
def count_running_pods(self, labels='application=spilo,cluster-name=acid-minimal-cluster', namespace='default'):
pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items
return len(list(filter(lambda x: x.status.phase=='Running', pods)))
def wait_for_pod_failover(self, failover_targets, labels, namespace='default'):
pod_phase = 'Failing over'
new_pod_node = ''
while (pod_phase != 'Running') or (new_pod_node not in failover_targets):
pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items
if pods:
new_pod_node = pods[0].spec.node_name
pod_phase = pods[0].status.phase
time.sleep(self.RETRY_TIMEOUT_SEC)
def get_logical_backup_job(self, namespace='default'):
return self.api.batch_v1_beta1.list_namespaced_cron_job(namespace, label_selector="application=spilo")
def wait_for_logical_backup_job(self, expected_num_of_jobs):
while (len(self.get_logical_backup_job().items) != expected_num_of_jobs):
time.sleep(self.RETRY_TIMEOUT_SEC)
def wait_for_logical_backup_job_deletion(self):
self.wait_for_logical_backup_job(expected_num_of_jobs=0)
def wait_for_logical_backup_job_creation(self):
self.wait_for_logical_backup_job(expected_num_of_jobs=1)
def delete_operator_pod(self, step="Delete operator deplyment"):
operator_pod = self.api.core_v1.list_namespaced_pod('default', label_selector="name=postgres-operator").items[0].metadata.name
self.api.apps_v1.patch_namespaced_deployment("postgres-operator","default", {"spec":{"template":{"metadata":{"annotations":{"step":"{}-{}".format(step, time.time())}}}}})
self.wait_for_operator_pod_start()
def update_config(self, config_map_patch, step="Updating operator deployment"):
self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch)
self.delete_operator_pod(step=step)
def patch_statefulset(self, data, name="acid-minimal-cluster", namespace="default"):
self.api.apps_v1.patch_namespaced_stateful_set(name, namespace, data)
def create_with_kubectl(self, path):
return subprocess.run(
["kubectl", "apply", "-f", path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def exec_with_kubectl(self, pod, cmd):
return subprocess.run(["./exec.sh", pod, cmd],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def get_patroni_state(self, pod):
r = self.exec_with_kubectl(pod, "patronictl list -f json")
if not r.returncode == 0 or not r.stdout.decode()[0:1]=="[":
return []
return json.loads(r.stdout.decode())
def get_patroni_running_members(self, pod="acid-minimal-cluster-0"):
result = self.get_patroni_state(pod)
return list(filter(lambda x: "State" in x and x["State"] == "running", result))
def get_deployment_replica_count(self, name="acid-minimal-cluster-pooler", namespace="default"):
try:
deployment = self.api.apps_v1.read_namespaced_deployment(name, namespace)
return deployment.spec.replicas
except ApiException as e:
return None
def get_statefulset_image(self, label_selector="application=spilo,cluster-name=acid-minimal-cluster", namespace='default'):
ssets = self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=label_selector, limit=1)
if len(ssets.items) == 0:
return None
return ssets.items[0].spec.template.spec.containers[0].image
def get_effective_pod_image(self, pod_name, namespace='default'):
'''
Get the Spilo image pod currently uses. In case of lazy rolling updates
it may differ from the one specified in the stateful set.
'''
pod = self.api.core_v1.list_namespaced_pod(
namespace, label_selector="statefulset.kubernetes.io/pod-name=" + pod_name)
if len(pod.items) == 0:
return None
return pod.items[0].spec.containers[0].image
def get_cluster_leader_pod(self, pg_cluster_name, namespace='default'):
labels = {
'application': 'spilo',
'cluster-name': pg_cluster_name,
'spilo-role': 'master',
}
pods = self.api.core_v1.list_namespaced_pod(
namespace, label_selector=to_selector(labels)).items
if pods:
return pods[0]
class K8sBase:
'''
K8s basic API wrapper class supposed to be inherited by other more specific classes for e2e tests
'''
RETRY_TIMEOUT_SEC = 1
def __init__(self, labels='x=y', namespace='default'):
self.api = K8sApi()
self.labels=labels
self.namespace=namespace
def get_pg_nodes(self, pg_cluster_labels='cluster-name=acid-minimal-cluster', namespace='default'):
master_pod_node = ''
replica_pod_nodes = []
podsList = self.api.core_v1.list_namespaced_pod(namespace, label_selector=pg_cluster_labels)
for pod in podsList.items:
if pod.metadata.labels.get('spilo-role') == 'master':
master_pod_node = pod.spec.node_name
elif pod.metadata.labels.get('spilo-role') == 'replica':
replica_pod_nodes.append(pod.spec.node_name)
return master_pod_node, replica_pod_nodes
def get_cluster_nodes(self, cluster_labels='cluster-name=acid-minimal-cluster', namespace='default'):
m = []
r = []
podsList = self.api.core_v1.list_namespaced_pod(namespace, label_selector=cluster_labels)
for pod in podsList.items:
if pod.metadata.labels.get('spilo-role') == 'master' and pod.status.phase == 'Running':
m.append(pod.spec.node_name)
elif pod.metadata.labels.get('spilo-role') == 'replica' and pod.status.phase == 'Running':
r.append(pod.spec.node_name)
return m, r
def wait_for_operator_pod_start(self):
self.wait_for_pod_start("name=postgres-operator")
def get_operator_pod(self):
pods = self.api.core_v1.list_namespaced_pod(
'default', label_selector='name=postgres-operator'
).items
if pods:
return pods[0]
return None
def get_operator_log(self):
operator_pod = self.get_operator_pod()
pod_name = operator_pod.metadata.name
return self.api.core_v1.read_namespaced_pod_log(
name=pod_name,
namespace='default'
)
def wait_for_pod_start(self, pod_labels, namespace='default'):
pod_phase = 'No pod running'
while pod_phase != 'Running':
pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=pod_labels).items
if pods:
pod_phase = pods[0].status.phase
time.sleep(self.RETRY_TIMEOUT_SEC)
def get_service_type(self, svc_labels, namespace='default'):
svc_type = ''
svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items
for svc in svcs:
svc_type = svc.spec.type
return svc_type
def check_service_annotations(self, svc_labels, annotations, namespace='default'):
svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items
for svc in svcs:
for key, value in annotations.items():
if key not in svc.metadata.annotations or svc.metadata.annotations[key] != value:
print("Expected key {} not found in annotations {}".format(key, svc.metadata.annotation))
return False
return True
def check_statefulset_annotations(self, sset_labels, annotations, namespace='default'):
ssets = self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=sset_labels, limit=1).items
for sset in ssets:
for key, value in annotations.items():
if key not in sset.metadata.annotations or sset.metadata.annotations[key] != value:
print("Expected key {} not found in annotations {}".format(key, sset.metadata.annotation))
return False
return True
def scale_cluster(self, number_of_instances, name="acid-minimal-cluster", namespace="default"):
body = {
"spec": {
"numberOfInstances": number_of_instances
}
}
self.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", namespace, "postgresqls", name, body)
def wait_for_running_pods(self, labels, number, namespace=''):
while self.count_pods_with_label(labels) != number:
time.sleep(self.RETRY_TIMEOUT_SEC)
def wait_for_pods_to_stop(self, labels, namespace=''):
while self.count_pods_with_label(labels) != 0:
time.sleep(self.RETRY_TIMEOUT_SEC)
def wait_for_service(self, labels, namespace='default'):
def get_services():
return self.api.core_v1.list_namespaced_service(
namespace, label_selector=labels
).items
while not get_services():
time.sleep(self.RETRY_TIMEOUT_SEC)
def count_pods_with_label(self, labels, namespace='default'):
return len(self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items)
def count_services_with_label(self, labels, namespace='default'):
return len(self.api.core_v1.list_namespaced_service(namespace, label_selector=labels).items)
def count_endpoints_with_label(self, labels, namespace='default'):
return len(self.api.core_v1.list_namespaced_endpoints(namespace, label_selector=labels).items)
def count_secrets_with_label(self, labels, namespace='default'):
return len(self.api.core_v1.list_namespaced_secret(namespace, label_selector=labels).items)
def count_statefulsets_with_label(self, labels, namespace='default'):
return len(self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=labels).items)
def count_deployments_with_label(self, labels, namespace='default'):
return len(self.api.apps_v1.list_namespaced_deployment(namespace, label_selector=labels).items)
def count_pdbs_with_label(self, labels, namespace='default'):
return len(self.api.policy_v1_beta1.list_namespaced_pod_disruption_budget(
namespace, label_selector=labels).items)
def count_running_pods(self, labels='application=spilo,cluster-name=acid-minimal-cluster', namespace='default'):
pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items
return len(list(filter(lambda x: x.status.phase=='Running', pods)))
def wait_for_pod_failover(self, failover_targets, labels, namespace='default'):
pod_phase = 'Failing over'
new_pod_node = ''
while (pod_phase != 'Running') or (new_pod_node not in failover_targets):
pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items
if pods:
new_pod_node = pods[0].spec.node_name
pod_phase = pods[0].status.phase
time.sleep(self.RETRY_TIMEOUT_SEC)
def get_logical_backup_job(self, namespace='default'):
return self.api.batch_v1_beta1.list_namespaced_cron_job(namespace, label_selector="application=spilo")
def wait_for_logical_backup_job(self, expected_num_of_jobs):
while (len(self.get_logical_backup_job().items) != expected_num_of_jobs):
time.sleep(self.RETRY_TIMEOUT_SEC)
def wait_for_logical_backup_job_deletion(self):
self.wait_for_logical_backup_job(expected_num_of_jobs=0)
def wait_for_logical_backup_job_creation(self):
self.wait_for_logical_backup_job(expected_num_of_jobs=1)
def delete_operator_pod(self, step="Delete operator deplyment"):
operator_pod = self.api.core_v1.list_namespaced_pod('default', label_selector="name=postgres-operator").items[0].metadata.name
self.api.apps_v1.patch_namespaced_deployment("postgres-operator","default", {"spec":{"template":{"metadata":{"annotations":{"step":"{}-{}".format(step, time.time())}}}}})
self.wait_for_operator_pod_start()
def update_config(self, config_map_patch, step="Updating operator deployment"):
self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch)
self.delete_operator_pod(step=step)
def create_with_kubectl(self, path):
return subprocess.run(
["kubectl", "apply", "-f", path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def exec_with_kubectl(self, pod, cmd):
return subprocess.run(["./exec.sh", pod, cmd],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def get_patroni_state(self, pod):
r = self.exec_with_kubectl(pod, "patronictl list -f json")
if not r.returncode == 0 or not r.stdout.decode()[0:1]=="[":
return []
return json.loads(r.stdout.decode())
def get_patroni_running_members(self, pod):
result = self.get_patroni_state(pod)
return list(filter(lambda x: x["State"]=="running", result))
def get_statefulset_image(self, label_selector="application=spilo,cluster-name=acid-minimal-cluster", namespace='default'):
ssets = self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=label_selector, limit=1)
if len(ssets.items) == 0:
return None
return ssets.items[0].spec.template.spec.containers[0].image
def get_effective_pod_image(self, pod_name, namespace='default'):
'''
Get the Spilo image pod currently uses. In case of lazy rolling updates
it may differ from the one specified in the stateful set.
'''
pod = self.api.core_v1.list_namespaced_pod(
namespace, label_selector="statefulset.kubernetes.io/pod-name=" + pod_name)
if len(pod.items) == 0:
return None
return pod.items[0].spec.containers[0].image
"""
Inspiriational classes towards easier writing of end to end tests with one cluster per test case
"""
class K8sOperator(K8sBase):
def __init__(self, labels="name=postgres-operator", namespace="default"):
super().__init__(labels, namespace)
class K8sPostgres(K8sBase):
def __init__(self, labels="cluster-name=acid-minimal-cluster", namespace="default"):
super().__init__(labels, namespace)
def get_pg_nodes(self):
master_pod_node = ''
replica_pod_nodes = []
podsList = self.api.core_v1.list_namespaced_pod(self.namespace, label_selector=self.labels)
for pod in podsList.items:
if pod.metadata.labels.get('spilo-role') == 'master':
master_pod_node = pod.spec.node_name
elif pod.metadata.labels.get('spilo-role') == 'replica':
replica_pod_nodes.append(pod.spec.node_name)
return master_pod_node, replica_pod_nodes

File diff suppressed because it is too large Load Diff

View File

@ -41,6 +41,8 @@ data:
enable_master_load_balancer: "false"
# enable_pod_antiaffinity: "false"
# enable_pod_disruption_budget: "true"
# enable_postgres_team_crd: "false"
# enable_postgres_team_crd_superusers: "false"
enable_replica_load_balancer: "false"
# enable_shm_volume: "true"
# enable_sidecars: "true"

View File

@ -0,0 +1,13 @@
apiVersion: "acid.zalan.do/v1"
kind: PostgresTeam
metadata:
name: custom-team-membership
spec:
additionalSuperuserTeams:
acid:
- "postgres_superusers"
additionalTeams:
acid: []
additionalMembers:
acid:
- "elephant"

View File

@ -26,6 +26,15 @@ rules:
- patch
- update
- watch
# operator only reads PostgresTeams
- apiGroups:
- acid.zalan.do
resources:
- postgresteams
verbs:
- get
- list
- watch
# to create or get/update CRDs when starting up
- apiGroups:
- apiextensions.k8s.io

View File

@ -325,6 +325,10 @@ spec:
properties:
enable_admin_role_for_users:
type: boolean
enable_postgres_team_crd:
type: boolean
enable_postgres_team_crd_superusers:
type: boolean
enable_team_superuser:
type: boolean
enable_teams_api:

View File

@ -4,6 +4,8 @@ metadata:
name: postgres-operator
spec:
replicas: 1
strategy:
type: "Recreate"
selector:
matchLabels:
name: postgres-operator

View File

@ -122,6 +122,8 @@ configuration:
enable_database_access: true
teams_api:
# enable_admin_role_for_users: true
# enable_postgres_team_crd: false
# enable_postgres_team_crd_superusers: false
enable_team_superuser: false
enable_teams_api: false
# pam_configuration: ""

View File

@ -0,0 +1,63 @@
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: postgresteams.acid.zalan.do
spec:
group: acid.zalan.do
names:
kind: PostgresTeam
listKind: PostgresTeamList
plural: postgresteams
singular: postgresteam
shortNames:
- pgteam
scope: Namespaced
subresources:
status: {}
version: v1
validation:
openAPIV3Schema:
type: object
required:
- kind
- apiVersion
- spec
properties:
kind:
type: string
enum:
- PostgresTeam
apiVersion:
type: string
enum:
- acid.zalan.do/v1
spec:
type: object
properties:
additionalSuperuserTeams:
type: object
description: "Map for teamId and associated additional superuser teams"
additionalProperties:
type: array
nullable: true
description: "List of teams to become Postgres superusers"
items:
type: string
additionalTeams:
type: object
description: "Map for teamId and associated additional teams"
additionalProperties:
type: array
nullable: true
description: "List of teams whose members will also be added to the Postgres cluster"
items:
type: string
additionalMembers:
type: object
description: "Map for teamId and associated additional users"
additionalProperties:
type: array
nullable: true
description: "List of users who will also be added to the Postgres cluster"
items:
type: string

View File

@ -1238,6 +1238,12 @@ var OperatorConfigCRDResourceValidation = apiextv1beta1.CustomResourceValidation
"enable_admin_role_for_users": {
Type: "boolean",
},
"enable_postgres_team_crd": {
Type: "boolean",
},
"enable_postgres_team_crd_superusers": {
Type: "boolean",
},
"enable_team_superuser": {
Type: "boolean",
},

View File

@ -135,16 +135,18 @@ type OperatorDebugConfiguration struct {
// TeamsAPIConfiguration defines the configuration of TeamsAPI
type TeamsAPIConfiguration struct {
EnableTeamsAPI bool `json:"enable_teams_api,omitempty"`
TeamsAPIUrl string `json:"teams_api_url,omitempty"`
TeamAPIRoleConfiguration map[string]string `json:"team_api_role_configuration,omitempty"`
EnableTeamSuperuser bool `json:"enable_team_superuser,omitempty"`
EnableAdminRoleForUsers bool `json:"enable_admin_role_for_users,omitempty"`
TeamAdminRole string `json:"team_admin_role,omitempty"`
PamRoleName string `json:"pam_role_name,omitempty"`
PamConfiguration string `json:"pam_configuration,omitempty"`
ProtectedRoles []string `json:"protected_role_names,omitempty"`
PostgresSuperuserTeams []string `json:"postgres_superuser_teams,omitempty"`
EnableTeamsAPI bool `json:"enable_teams_api,omitempty"`
TeamsAPIUrl string `json:"teams_api_url,omitempty"`
TeamAPIRoleConfiguration map[string]string `json:"team_api_role_configuration,omitempty"`
EnableTeamSuperuser bool `json:"enable_team_superuser,omitempty"`
EnableAdminRoleForUsers bool `json:"enable_admin_role_for_users,omitempty"`
TeamAdminRole string `json:"team_admin_role,omitempty"`
PamRoleName string `json:"pam_role_name,omitempty"`
PamConfiguration string `json:"pam_configuration,omitempty"`
ProtectedRoles []string `json:"protected_role_names,omitempty"`
PostgresSuperuserTeams []string `json:"postgres_superuser_teams,omitempty"`
EnablePostgresTeamCRD bool `json:"enable_postgres_team_crd,omitempty"`
EnablePostgresTeamCRDSuperusers bool `json:"enable_postgres_team_crd_superusers,omitempty"`
}
// LoggingRESTAPIConfiguration defines Logging API conf

View File

@ -0,0 +1,33 @@
package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// PostgresTeam defines Custom Resource Definition Object for team management.
type PostgresTeam struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec PostgresTeamSpec `json:"spec"`
}
// PostgresTeamSpec defines the specification for the PostgresTeam TPR.
type PostgresTeamSpec struct {
AdditionalSuperuserTeams map[string][]string `json:"additionalSuperuserTeams,omitempty"`
AdditionalTeams map[string][]string `json:"additionalTeams,omitempty"`
AdditionalMembers map[string][]string `json:"additionalMembers,omitempty"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// PostgresTeamList defines a list of PostgresTeam definitions.
type PostgresTeamList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata"`
Items []PostgresTeam `json:"items"`
}

View File

@ -1,11 +1,10 @@
package v1
import (
acidzalando "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do"
)
// APIVersion of the `postgresql` and `operator` CRDs
@ -44,6 +43,8 @@ func addKnownTypes(scheme *runtime.Scheme) error {
// TODO: User uppercase CRDResourceKind of our types in the next major API version
scheme.AddKnownTypeWithName(SchemeGroupVersion.WithKind("postgresql"), &Postgresql{})
scheme.AddKnownTypeWithName(SchemeGroupVersion.WithKind("postgresqlList"), &PostgresqlList{})
scheme.AddKnownTypeWithName(SchemeGroupVersion.WithKind("PostgresTeam"), &PostgresTeam{})
scheme.AddKnownTypeWithName(SchemeGroupVersion.WithKind("PostgresTeamList"), &PostgresTeamList{})
scheme.AddKnownTypeWithName(SchemeGroupVersion.WithKind("OperatorConfiguration"),
&OperatorConfiguration{})
scheme.AddKnownTypeWithName(SchemeGroupVersion.WithKind("OperatorConfigurationList"),

View File

@ -711,6 +711,127 @@ func (in *PostgresStatus) DeepCopy() *PostgresStatus {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *PostgresTeam) DeepCopyInto(out *PostgresTeam) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
in.Spec.DeepCopyInto(&out.Spec)
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PostgresTeam.
func (in *PostgresTeam) DeepCopy() *PostgresTeam {
if in == nil {
return nil
}
out := new(PostgresTeam)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *PostgresTeam) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *PostgresTeamList) DeepCopyInto(out *PostgresTeamList) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ListMeta.DeepCopyInto(&out.ListMeta)
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]PostgresTeam, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PostgresTeamList.
func (in *PostgresTeamList) DeepCopy() *PostgresTeamList {
if in == nil {
return nil
}
out := new(PostgresTeamList)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *PostgresTeamList) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *PostgresTeamSpec) DeepCopyInto(out *PostgresTeamSpec) {
*out = *in
if in.AdditionalSuperuserTeams != nil {
in, out := &in.AdditionalSuperuserTeams, &out.AdditionalSuperuserTeams
*out = make(map[string][]string, len(*in))
for key, val := range *in {
var outVal []string
if val == nil {
(*out)[key] = nil
} else {
in, out := &val, &outVal
*out = make([]string, len(*in))
copy(*out, *in)
}
(*out)[key] = outVal
}
}
if in.AdditionalTeams != nil {
in, out := &in.AdditionalTeams, &out.AdditionalTeams
*out = make(map[string][]string, len(*in))
for key, val := range *in {
var outVal []string
if val == nil {
(*out)[key] = nil
} else {
in, out := &val, &outVal
*out = make([]string, len(*in))
copy(*out, *in)
}
(*out)[key] = outVal
}
}
if in.AdditionalMembers != nil {
in, out := &in.AdditionalMembers, &out.AdditionalMembers
*out = make(map[string][]string, len(*in))
for key, val := range *in {
var outVal []string
if val == nil {
(*out)[key] = nil
} else {
in, out := &val, &outVal
*out = make([]string, len(*in))
copy(*out, *in)
}
(*out)[key] = outVal
}
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PostgresTeamSpec.
func (in *PostgresTeamSpec) DeepCopy() *PostgresTeamSpec {
if in == nil {
return nil
}
out := new(PostgresTeamSpec)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *PostgresUsersConfiguration) DeepCopyInto(out *PostgresUsersConfiguration) {
*out = *in

View File

@ -13,20 +13,11 @@ import (
"time"
"github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
policybeta1 "k8s.io/api/policy/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/reference"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
"github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/scheme"
"github.com/zalando/postgres-operator/pkg/spec"
pgteams "github.com/zalando/postgres-operator/pkg/teams"
"github.com/zalando/postgres-operator/pkg/util"
"github.com/zalando/postgres-operator/pkg/util/config"
"github.com/zalando/postgres-operator/pkg/util/constants"
@ -34,7 +25,16 @@ import (
"github.com/zalando/postgres-operator/pkg/util/patroni"
"github.com/zalando/postgres-operator/pkg/util/teams"
"github.com/zalando/postgres-operator/pkg/util/users"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
policybeta1 "k8s.io/api/policy/v1beta1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/reference"
)
var (
@ -48,6 +48,7 @@ var (
type Config struct {
OpConfig config.Config
RestConfig *rest.Config
PgTeamMap pgteams.PostgresTeamMap
InfrastructureRoles map[string]spec.PgUser // inherited from the controller
PodServiceAccount *v1.ServiceAccount
PodServiceAccountRoleBinding *rbacv1.RoleBinding
@ -343,11 +344,11 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
//TODO: improve me
if *c.Statefulset.Spec.Replicas != *statefulSet.Spec.Replicas {
match = false
reasons = append(reasons, "new statefulset's number of replicas doesn't match the current one")
reasons = append(reasons, "new statefulset's number of replicas does not match the current one")
}
if !reflect.DeepEqual(c.Statefulset.Annotations, statefulSet.Annotations) {
match = false
reasons = append(reasons, "new statefulset's annotations doesn't match the current one")
reasons = append(reasons, "new statefulset's annotations does not match the current one")
}
needsRollUpdate, reasons = c.compareContainers("initContainers", c.Statefulset.Spec.Template.Spec.InitContainers, statefulSet.Spec.Template.Spec.InitContainers, needsRollUpdate, reasons)
@ -364,24 +365,24 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
if c.Statefulset.Spec.Template.Spec.ServiceAccountName != statefulSet.Spec.Template.Spec.ServiceAccountName {
needsReplace = true
needsRollUpdate = true
reasons = append(reasons, "new statefulset's serviceAccountName service account name doesn't match the current one")
reasons = append(reasons, "new statefulset's serviceAccountName service account name does not match the current one")
}
if *c.Statefulset.Spec.Template.Spec.TerminationGracePeriodSeconds != *statefulSet.Spec.Template.Spec.TerminationGracePeriodSeconds {
needsReplace = true
needsRollUpdate = true
reasons = append(reasons, "new statefulset's terminationGracePeriodSeconds doesn't match the current one")
reasons = append(reasons, "new statefulset's terminationGracePeriodSeconds does not match the current one")
}
if !reflect.DeepEqual(c.Statefulset.Spec.Template.Spec.Affinity, statefulSet.Spec.Template.Spec.Affinity) {
needsReplace = true
needsRollUpdate = true
reasons = append(reasons, "new statefulset's pod affinity doesn't match the current one")
reasons = append(reasons, "new statefulset's pod affinity does not match the current one")
}
// Some generated fields like creationTimestamp make it not possible to use DeepCompare on Spec.Template.ObjectMeta
if !reflect.DeepEqual(c.Statefulset.Spec.Template.Labels, statefulSet.Spec.Template.Labels) {
needsReplace = true
needsRollUpdate = true
reasons = append(reasons, "new statefulset's metadata labels doesn't match the current one")
reasons = append(reasons, "new statefulset's metadata labels does not match the current one")
}
if (c.Statefulset.Spec.Selector != nil) && (statefulSet.Spec.Selector != nil) {
if !reflect.DeepEqual(c.Statefulset.Spec.Selector.MatchLabels, statefulSet.Spec.Selector.MatchLabels) {
@ -392,7 +393,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
return &compareStatefulsetResult{}
}
needsReplace = true
reasons = append(reasons, "new statefulset's selector doesn't match the current one")
reasons = append(reasons, "new statefulset's selector does not match the current one")
}
}
@ -406,7 +407,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
match = false
needsReplace = true
needsRollUpdate = true
reasons = append(reasons, "new statefulset's pod template security context in spec doesn't match the current one")
reasons = append(reasons, "new statefulset's pod template security context in spec does not match the current one")
}
if len(c.Statefulset.Spec.VolumeClaimTemplates) != len(statefulSet.Spec.VolumeClaimTemplates) {
needsReplace = true
@ -417,17 +418,17 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
// Some generated fields like creationTimestamp make it not possible to use DeepCompare on ObjectMeta
if name != statefulSet.Spec.VolumeClaimTemplates[i].Name {
needsReplace = true
reasons = append(reasons, fmt.Sprintf("new statefulset's name for volume %d doesn't match the current one", i))
reasons = append(reasons, fmt.Sprintf("new statefulset's name for volume %d does not match the current one", i))
continue
}
if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations) {
needsReplace = true
reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q doesn't match the current one", name))
reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q does not match the current one", name))
}
if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Spec, statefulSet.Spec.VolumeClaimTemplates[i].Spec) {
name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name
needsReplace = true
reasons = append(reasons, fmt.Sprintf("new statefulset's volumeClaimTemplates specification for volume %q doesn't match the current one", name))
reasons = append(reasons, fmt.Sprintf("new statefulset's volumeClaimTemplates specification for volume %q does not match the current one", name))
}
}
@ -437,14 +438,14 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
match = false
needsReplace = true
needsRollUpdate = true
reasons = append(reasons, "new statefulset's pod priority class in spec doesn't match the current one")
reasons = append(reasons, "new statefulset's pod priority class in spec does not match the current one")
}
// lazy Spilo update: modify the image in the statefulset itself but let its pods run with the old image
// until they are re-created for other reasons, for example node rotation
if c.OpConfig.EnableLazySpiloUpgrade && !reflect.DeepEqual(c.Statefulset.Spec.Template.Spec.Containers[0].Image, statefulSet.Spec.Template.Spec.Containers[0].Image) {
needsReplace = true
reasons = append(reasons, "lazy Spilo update: new statefulset's pod image doesn't match the current one")
reasons = append(reasons, "lazy Spilo update: new statefulset's pod image does not match the current one")
}
if needsRollUpdate || needsReplace {
@ -554,7 +555,7 @@ func (c *Cluster) enforceMinResourceLimits(spec *acidv1.PostgresSpec) error {
return fmt.Errorf("could not compare defined CPU limit %s with configured minimum value %s: %v", cpuLimit, minCPULimit, err)
}
if isSmaller {
c.logger.Warningf("defined CPU limit %s is below required minimum %s and will be set to it", cpuLimit, minCPULimit)
c.logger.Warningf("defined CPU limit %s is below required minimum %s and will be increased", cpuLimit, minCPULimit)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "ResourceLimits", "defined CPU limit %s is below required minimum %s and will be set to it", cpuLimit, minCPULimit)
spec.Resources.ResourceLimits.CPU = minCPULimit
}
@ -567,7 +568,7 @@ func (c *Cluster) enforceMinResourceLimits(spec *acidv1.PostgresSpec) error {
return fmt.Errorf("could not compare defined memory limit %s with configured minimum value %s: %v", memoryLimit, minMemoryLimit, err)
}
if isSmaller {
c.logger.Warningf("defined memory limit %s is below required minimum %s and will be set to it", memoryLimit, minMemoryLimit)
c.logger.Warningf("defined memory limit %s is below required minimum %s and will be increased", memoryLimit, minMemoryLimit)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "ResourceLimits", "defined memory limit %s is below required minimum %s and will be set to it", memoryLimit, minMemoryLimit)
spec.Resources.ResourceLimits.Memory = minMemoryLimit
}
@ -1095,7 +1096,7 @@ func (c *Cluster) initTeamMembers(teamID string, isPostgresSuperuserTeam bool) e
if c.shouldAvoidProtectedOrSystemRole(username, "API role") {
continue
}
if c.OpConfig.EnableTeamSuperuser || isPostgresSuperuserTeam {
if (c.OpConfig.EnableTeamSuperuser && teamID == c.Spec.TeamID) || isPostgresSuperuserTeam {
flags = append(flags, constants.RoleFlagSuperuser)
} else {
if c.OpConfig.TeamAdminRole != "" {
@ -1124,17 +1125,38 @@ func (c *Cluster) initTeamMembers(teamID string, isPostgresSuperuserTeam bool) e
func (c *Cluster) initHumanUsers() error {
var clusterIsOwnedBySuperuserTeam bool
superuserTeams := []string{}
if c.OpConfig.EnablePostgresTeamCRDSuperusers {
superuserTeams = c.PgTeamMap.GetAdditionalSuperuserTeams(c.Spec.TeamID, true)
}
for _, postgresSuperuserTeam := range c.OpConfig.PostgresSuperuserTeams {
err := c.initTeamMembers(postgresSuperuserTeam, true)
if err != nil {
return fmt.Errorf("Cannot create a team %q of Postgres superusers: %v", postgresSuperuserTeam, err)
if !(util.SliceContains(superuserTeams, postgresSuperuserTeam)) {
superuserTeams = append(superuserTeams, postgresSuperuserTeam)
}
if postgresSuperuserTeam == c.Spec.TeamID {
}
for _, superuserTeam := range superuserTeams {
err := c.initTeamMembers(superuserTeam, true)
if err != nil {
return fmt.Errorf("Cannot initialize members for team %q of Postgres superusers: %v", superuserTeam, err)
}
if superuserTeam == c.Spec.TeamID {
clusterIsOwnedBySuperuserTeam = true
}
}
additionalTeams := c.PgTeamMap.GetAdditionalTeams(c.Spec.TeamID, true)
for _, additionalTeam := range additionalTeams {
if !(util.SliceContains(superuserTeams, additionalTeam)) {
err := c.initTeamMembers(additionalTeam, false)
if err != nil {
return fmt.Errorf("Cannot initialize members for additional team %q for cluster owned by %q: %v", additionalTeam, c.Spec.TeamID, err)
}
}
}
if clusterIsOwnedBySuperuserTeam {
c.logger.Infof("Team %q owning the cluster is also a team of superusers. Created superuser roles for its members instead of admin roles.", c.Spec.TeamID)
return nil
@ -1142,7 +1164,7 @@ func (c *Cluster) initHumanUsers() error {
err := c.initTeamMembers(c.Spec.TeamID, false)
if err != nil {
return fmt.Errorf("Cannot create a team %q of admins owning the PG cluster: %v", c.Spec.TeamID, err)
return fmt.Errorf("Cannot initialize members for team %q who owns the Postgres cluster: %v", c.Spec.TeamID, err)
}
return nil

View File

@ -131,12 +131,12 @@ func (c *Cluster) initDbConnWithName(dbname string) error {
}
if _, ok := err.(*net.OpError); ok {
c.logger.Errorf("could not connect to PostgreSQL database: %v", err)
c.logger.Warningf("could not connect to Postgres database: %v", err)
return false, nil
}
if err2 := conn.Close(); err2 != nil {
c.logger.Errorf("error when closing PostgreSQL connection after another error: %v", err)
c.logger.Errorf("error when closing Postgres connection after another error: %v", err)
return false, err2
}
@ -151,7 +151,7 @@ func (c *Cluster) initDbConnWithName(dbname string) error {
conn.SetMaxIdleConns(-1)
if c.pgDb != nil {
msg := "Closing an existing connection before opening a new one to %s"
msg := "closing an existing connection before opening a new one to %s"
c.logger.Warningf(msg, dbname)
c.closeDbConn()
}
@ -166,7 +166,7 @@ func (c *Cluster) connectionIsClosed() bool {
}
func (c *Cluster) closeDbConn() (err error) {
c.setProcessName("closing db connection")
c.setProcessName("closing database connection")
if c.pgDb != nil {
c.logger.Debug("closing database connection")
if err = c.pgDb.Close(); err != nil {
@ -181,7 +181,7 @@ func (c *Cluster) closeDbConn() (err error) {
}
func (c *Cluster) readPgUsersFromDatabase(userNames []string) (users spec.PgUserMap, err error) {
c.setProcessName("reading users from the db")
c.setProcessName("reading users from the database")
var rows *sql.Rows
users = make(spec.PgUserMap)
if rows, err = c.pgDb.Query(getUserSQL, pq.Array(userNames)); err != nil {
@ -527,7 +527,7 @@ func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string, role Po
continue
}
c.logger.Infof("Install pooler lookup function into %s", dbname)
c.logger.Infof("install pooler lookup function into database '%s'", dbname)
// golang sql will do retries couple of times if pq driver reports
// connections issues (driver.ErrBadConn), but since our query is

View File

@ -1133,7 +1133,9 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
}
// generate the spilo container
c.logger.Debugf("Generating Spilo container, environment variables: %v", spiloEnvVars)
c.logger.Debugf("Generating Spilo container, environment variables")
c.logger.Debugf("%v", spiloEnvVars)
spiloContainer := generateContainer(c.containerName(),
&effectiveDockerImage,
resourceRequirements,
@ -2031,7 +2033,8 @@ func (c *Cluster) generateLogicalBackupPodEnvVars() []v1.EnvVar {
envVars = append(envVars, v1.EnvVar{Name: "AWS_SECRET_ACCESS_KEY", Value: c.OpConfig.LogicalBackup.LogicalBackupS3SecretAccessKey})
}
c.logger.Debugf("Generated logical backup env vars %v", envVars)
c.logger.Debugf("Generated logical backup env vars")
c.logger.Debugf("%v", envVars)
return envVars
}

View File

@ -304,9 +304,16 @@ func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool {
after this check succeeds but before a pod is re-created
*/
for _, pod := range pods.Items {
c.logger.Debugf("name=%s phase=%s ip=%s", pod.Name, pod.Status.Phase, pod.Status.PodIP)
}
for _, pod := range pods.Items {
state, err := c.patroni.GetPatroniMemberState(&pod)
if err != nil || state == "creating replica" {
if err != nil {
c.logger.Errorf("failed to get Patroni state for pod: %s", err)
return false
} else if state == "creating replica" {
c.logger.Warningf("cannot re-create replica %s: it is currently being initialized", pod.Name)
return false
}

View File

@ -149,7 +149,7 @@ func (c *Cluster) preScaleDown(newStatefulSet *appsv1.StatefulSet) error {
// setRollingUpdateFlagForStatefulSet sets the indicator or the rolling update requirement
// in the StatefulSet annotation.
func (c *Cluster) setRollingUpdateFlagForStatefulSet(sset *appsv1.StatefulSet, val bool) {
func (c *Cluster) setRollingUpdateFlagForStatefulSet(sset *appsv1.StatefulSet, val bool, msg string) {
anno := sset.GetAnnotations()
if anno == nil {
anno = make(map[string]string)
@ -157,13 +157,13 @@ func (c *Cluster) setRollingUpdateFlagForStatefulSet(sset *appsv1.StatefulSet, v
anno[rollingUpdateStatefulsetAnnotationKey] = strconv.FormatBool(val)
sset.SetAnnotations(anno)
c.logger.Debugf("statefulset's rolling update annotation has been set to %t", val)
c.logger.Debugf("set statefulset's rolling update annotation to %t: caller/reason %s", val, msg)
}
// applyRollingUpdateFlagforStatefulSet sets the rolling update flag for the cluster's StatefulSet
// and applies that setting to the actual running cluster.
func (c *Cluster) applyRollingUpdateFlagforStatefulSet(val bool) error {
c.setRollingUpdateFlagForStatefulSet(c.Statefulset, val)
c.setRollingUpdateFlagForStatefulSet(c.Statefulset, val, "applyRollingUpdateFlag")
sset, err := c.updateStatefulSetAnnotations(c.Statefulset.GetAnnotations())
if err != nil {
return err
@ -215,14 +215,13 @@ func (c *Cluster) mergeRollingUpdateFlagUsingCache(runningStatefulSet *appsv1.St
podsRollingUpdateRequired = false
} else {
c.logger.Infof("found a statefulset with an unfinished rolling update of the pods")
}
}
return podsRollingUpdateRequired
}
func (c *Cluster) updateStatefulSetAnnotations(annotations map[string]string) (*appsv1.StatefulSet, error) {
c.logger.Debugf("updating statefulset annotations")
c.logger.Debugf("patching statefulset annotations")
patchData, err := metaAnnotationsPatch(annotations)
if err != nil {
return nil, fmt.Errorf("could not form patch for the statefulset metadata: %v", err)

View File

@ -348,13 +348,13 @@ func (c *Cluster) syncStatefulSet() error {
if err != nil {
return fmt.Errorf("could not generate statefulset: %v", err)
}
c.setRollingUpdateFlagForStatefulSet(desiredSS, podsRollingUpdateRequired)
c.setRollingUpdateFlagForStatefulSet(desiredSS, podsRollingUpdateRequired, "from cache")
cmp := c.compareStatefulSetWith(desiredSS)
if !cmp.match {
if cmp.rollingUpdate && !podsRollingUpdateRequired {
podsRollingUpdateRequired = true
c.setRollingUpdateFlagForStatefulSet(desiredSS, podsRollingUpdateRequired)
c.setRollingUpdateFlagForStatefulSet(desiredSS, podsRollingUpdateRequired, "statefulset changes")
}
c.logStatefulSetChanges(c.Statefulset, desiredSS, false, cmp.reasons)
@ -497,11 +497,11 @@ func (c *Cluster) syncSecrets() error {
return fmt.Errorf("could not get current secret: %v", err)
}
if secretUsername != string(secret.Data["username"]) {
c.logger.Warningf("secret %q does not contain the role %q", secretSpec.Name, secretUsername)
c.logger.Warningf("secret %s does not contain the role %q", secretSpec.Name, secretUsername)
continue
}
c.Secrets[secret.UID] = secret
c.logger.Debugf("secret %q already exists, fetching its password", util.NameFromMeta(secret.ObjectMeta))
c.logger.Debugf("secret %s already exists, fetching its password", util.NameFromMeta(secret.ObjectMeta))
if secretUsername == c.systemUsers[constants.SuperuserKeyName].Name {
secretUsername = constants.SuperuserKeyName
userMap = c.systemUsers
@ -804,7 +804,7 @@ func (c *Cluster) syncLogicalBackupJob() error {
return fmt.Errorf("could not generate the desired logical backup job state: %v", err)
}
if match, reason := k8sutil.SameLogicalBackupJob(job, desiredJob); !match {
c.logger.Infof("logical job %q is not in the desired state and needs to be updated",
c.logger.Infof("logical job %s is not in the desired state and needs to be updated",
c.getLogicalBackupJobName(),
)
if reason != "" {
@ -825,12 +825,12 @@ func (c *Cluster) syncLogicalBackupJob() error {
c.logger.Info("could not find the cluster's logical backup job")
if err = c.createLogicalBackupJob(); err == nil {
c.logger.Infof("created missing logical backup job %q", jobName)
c.logger.Infof("created missing logical backup job %s", jobName)
} else {
if !k8sutil.ResourceAlreadyExists(err) {
return fmt.Errorf("could not create missing logical backup job: %v", err)
}
c.logger.Infof("logical backup job %q already exists", jobName)
c.logger.Infof("logical backup job %s already exists", jobName)
if _, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Get(context.TODO(), jobName, metav1.GetOptions{}); err != nil {
return fmt.Errorf("could not fetch existing logical backup job: %v", err)
}

View File

@ -18,12 +18,14 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"github.com/sirupsen/logrus"
acidzalando "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
"github.com/zalando/postgres-operator/pkg/spec"
"github.com/zalando/postgres-operator/pkg/util"
"github.com/zalando/postgres-operator/pkg/util/constants"
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
"github.com/zalando/postgres-operator/pkg/util/nicediff"
"github.com/zalando/postgres-operator/pkg/util/retryutil"
)
@ -166,40 +168,59 @@ func (c *Cluster) logPDBChanges(old, new *policybeta1.PodDisruptionBudget, isUpd
)
}
c.logger.Debugf("diff\n%s\n", util.PrettyDiff(old.Spec, new.Spec))
logNiceDiff(c.logger, old.Spec, new.Spec)
}
func logNiceDiff(log *logrus.Entry, old, new interface{}) {
o, erro := json.MarshalIndent(old, "", " ")
n, errn := json.MarshalIndent(new, "", " ")
if erro != nil || errn != nil {
panic("could not marshal API objects, should not happen")
}
nice := nicediff.Diff(string(o), string(n), true)
for _, s := range strings.Split(nice, "\n") {
// " is not needed in the value to understand
log.Debugf(strings.ReplaceAll(s, "\"", ""))
}
}
func (c *Cluster) logStatefulSetChanges(old, new *appsv1.StatefulSet, isUpdate bool, reasons []string) {
if isUpdate {
c.logger.Infof("statefulset %q has been changed", util.NameFromMeta(old.ObjectMeta))
c.logger.Infof("statefulset %s has been changed", util.NameFromMeta(old.ObjectMeta))
} else {
c.logger.Infof("statefulset %q is not in the desired state and needs to be updated",
c.logger.Infof("statefulset %s is not in the desired state and needs to be updated",
util.NameFromMeta(old.ObjectMeta),
)
}
logNiceDiff(c.logger, old.Spec, new.Spec)
if !reflect.DeepEqual(old.Annotations, new.Annotations) {
c.logger.Debugf("metadata.annotation diff\n%s\n", util.PrettyDiff(old.Annotations, new.Annotations))
c.logger.Debugf("metadata.annotation are different")
logNiceDiff(c.logger, old.Annotations, new.Annotations)
}
c.logger.Debugf("spec diff between old and new statefulsets: \n%s\n", util.PrettyDiff(old.Spec, new.Spec))
if len(reasons) > 0 {
for _, reason := range reasons {
c.logger.Infof("reason: %q", reason)
c.logger.Infof("reason: %s", reason)
}
}
}
func (c *Cluster) logServiceChanges(role PostgresRole, old, new *v1.Service, isUpdate bool, reason string) {
if isUpdate {
c.logger.Infof("%s service %q has been changed",
c.logger.Infof("%s service %s has been changed",
role, util.NameFromMeta(old.ObjectMeta),
)
} else {
c.logger.Infof("%s service %q is not in the desired state and needs to be updated",
c.logger.Infof("%s service %s is not in the desired state and needs to be updated",
role, util.NameFromMeta(old.ObjectMeta),
)
}
c.logger.Debugf("diff\n%s\n", util.PrettyDiff(old.Spec, new.Spec))
logNiceDiff(c.logger, old.Spec, new.Spec)
if reason != "" {
c.logger.Infof("reason: %s", reason)
@ -208,7 +229,7 @@ func (c *Cluster) logServiceChanges(role PostgresRole, old, new *v1.Service, isU
func (c *Cluster) logVolumeChanges(old, new acidv1.Volume) {
c.logger.Infof("volume specification has been changed")
c.logger.Debugf("diff\n%s\n", util.PrettyDiff(old, new))
logNiceDiff(c.logger, old, new)
}
func (c *Cluster) getTeamMembers(teamID string) ([]string, error) {
@ -217,24 +238,37 @@ func (c *Cluster) getTeamMembers(teamID string) ([]string, error) {
return nil, fmt.Errorf("no teamId specified")
}
c.logger.Debugf("fetching possible additional team members for team %q", teamID)
members := []string{}
additionalMembers := c.PgTeamMap[c.Spec.TeamID].AdditionalMembers
for _, member := range additionalMembers {
members = append(members, member)
}
if !c.OpConfig.EnableTeamsAPI {
c.logger.Debugf("team API is disabled, returning empty list of members for team %q", teamID)
return []string{}, nil
c.logger.Debugf("team API is disabled, only returning %d members for team %q", len(members), teamID)
return members, nil
}
token, err := c.oauthTokenGetter.getOAuthToken()
if err != nil {
c.logger.Warnf("could not get oauth token to authenticate to team service API, returning empty list of team members: %v", err)
return []string{}, nil
c.logger.Warnf("could not get oauth token to authenticate to team service API, only returning %d members for team %q: %v", len(members), teamID, err)
return members, nil
}
teamInfo, err := c.teamsAPIClient.TeamInfo(teamID, token)
if err != nil {
c.logger.Warnf("could not get team info for team %q, returning empty list of team members: %v", teamID, err)
return []string{}, nil
c.logger.Warnf("could not get team info for team %q, only returning %d members: %v", teamID, len(members), err)
return members, nil
}
return teamInfo.Members, nil
for _, member := range teamInfo.Members {
if !(util.SliceContains(members, member)) {
members = append(members, member)
}
}
return members, nil
}
func (c *Cluster) waitForPodLabel(podEvents chan PodEvent, stopChan chan struct{}, role *PostgresRole) (*v1.Pod, error) {

View File

@ -1,9 +1,12 @@
package controller
import (
"bytes"
"context"
"encoding/json"
"fmt"
"os"
"strings"
"sync"
"time"
@ -13,6 +16,7 @@ import (
"github.com/zalando/postgres-operator/pkg/cluster"
acidv1informer "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/acid.zalan.do/v1"
"github.com/zalando/postgres-operator/pkg/spec"
"github.com/zalando/postgres-operator/pkg/teams"
"github.com/zalando/postgres-operator/pkg/util"
"github.com/zalando/postgres-operator/pkg/util/config"
"github.com/zalando/postgres-operator/pkg/util/constants"
@ -31,8 +35,9 @@ import (
// Controller represents operator controller
type Controller struct {
config spec.ControllerConfig
opConfig *config.Config
config spec.ControllerConfig
opConfig *config.Config
pgTeamMap teams.PostgresTeamMap
logger *logrus.Entry
KubeClient k8sutil.KubernetesClient
@ -53,10 +58,11 @@ type Controller struct {
clusterHistory map[spec.NamespacedName]ringlog.RingLogger // history of the cluster changes
teamClusters map[string][]spec.NamespacedName
postgresqlInformer cache.SharedIndexInformer
podInformer cache.SharedIndexInformer
nodesInformer cache.SharedIndexInformer
podCh chan cluster.PodEvent
postgresqlInformer cache.SharedIndexInformer
postgresTeamInformer cache.SharedIndexInformer
podInformer cache.SharedIndexInformer
nodesInformer cache.SharedIndexInformer
podCh chan cluster.PodEvent
clusterEventQueues []*cache.FIFO // [workerID]Queue
lastClusterSyncTime int64
@ -73,6 +79,10 @@ func NewController(controllerConfig *spec.ControllerConfig, controllerId string)
logger := logrus.New()
if controllerConfig.EnableJsonLogging {
logger.SetFormatter(&logrus.JSONFormatter{})
} else {
if os.Getenv("LOG_NOQUOTE") != "" {
logger.SetFormatter(&logrus.TextFormatter{PadLevelText: true, DisableQuote: true})
}
}
var myComponentName = "postgres-operator"
@ -81,7 +91,10 @@ func NewController(controllerConfig *spec.ControllerConfig, controllerId string)
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(logger.Infof)
// disabling the sending of events also to the logoutput
// the operator currently duplicates a lot of log entries with this setup
// eventBroadcaster.StartLogging(logger.Infof)
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: myComponentName})
c := &Controller{
@ -190,10 +203,18 @@ func (c *Controller) warnOnDeprecatedOperatorParameters() {
}
}
func compactValue(v string) string {
var compact bytes.Buffer
if err := json.Compact(&compact, []byte(v)); err != nil {
panic("Hard coded json strings broken!")
}
return compact.String()
}
func (c *Controller) initPodServiceAccount() {
if c.opConfig.PodServiceAccountDefinition == "" {
c.opConfig.PodServiceAccountDefinition = `
stringValue := `
{
"apiVersion": "v1",
"kind": "ServiceAccount",
@ -201,6 +222,9 @@ func (c *Controller) initPodServiceAccount() {
"name": "postgres-pod"
}
}`
c.opConfig.PodServiceAccountDefinition = compactValue(stringValue)
}
// re-uses k8s internal parsing. See k8s client-go issue #193 for explanation
@ -230,7 +254,7 @@ func (c *Controller) initRoleBinding() {
// operator binds it to the cluster role with sufficient privileges
// we assume the role is created by the k8s administrator
if c.opConfig.PodServiceAccountRoleBindingDefinition == "" {
c.opConfig.PodServiceAccountRoleBindingDefinition = fmt.Sprintf(`
stringValue := fmt.Sprintf(`
{
"apiVersion": "rbac.authorization.k8s.io/v1",
"kind": "RoleBinding",
@ -249,6 +273,7 @@ func (c *Controller) initRoleBinding() {
}
]
}`, c.PodServiceAccount.Name, c.PodServiceAccount.Name, c.PodServiceAccount.Name)
c.opConfig.PodServiceAccountRoleBindingDefinition = compactValue(stringValue)
}
c.logger.Info("Parse role bindings")
// re-uses k8s internal parsing. See k8s client-go issue #193 for explanation
@ -267,7 +292,14 @@ func (c *Controller) initRoleBinding() {
}
// actual roles bindings are deployed at the time of Postgres/Spilo cluster creation
// actual roles bindings ar*logrus.Entrye deployed at the time of Postgres/Spilo cluster creation
}
func logMultiLineConfig(log *logrus.Entry, config string) {
lines := strings.Split(config, "\n")
for _, l := range lines {
log.Infof("%s", l)
}
}
func (c *Controller) initController() {
@ -297,11 +329,17 @@ func (c *Controller) initController() {
c.initSharedInformers()
if c.opConfig.EnablePostgresTeamCRD {
c.loadPostgresTeams()
} else {
c.pgTeamMap = teams.PostgresTeamMap{}
}
if c.opConfig.DebugLogging {
c.logger.Logger.Level = logrus.DebugLevel
}
c.logger.Infof("config: %s", c.opConfig.MustMarshal())
logMultiLineConfig(c.logger, c.opConfig.MustMarshal())
roleDefs := c.getInfrastructureRoleDefinitions()
if infraRoles, err := c.getInfrastructureRoles(roleDefs); err != nil {
@ -328,6 +366,7 @@ func (c *Controller) initController() {
func (c *Controller) initSharedInformers() {
// Postgresqls
c.postgresqlInformer = acidv1informer.NewPostgresqlInformer(
c.KubeClient.AcidV1ClientSet,
c.opConfig.WatchedNamespace,
@ -340,6 +379,20 @@ func (c *Controller) initSharedInformers() {
DeleteFunc: c.postgresqlDelete,
})
// PostgresTeams
if c.opConfig.EnablePostgresTeamCRD {
c.postgresTeamInformer = acidv1informer.NewPostgresTeamInformer(
c.KubeClient.AcidV1ClientSet,
c.opConfig.WatchedNamespace,
constants.QueueResyncPeriodTPR*6, // 30 min
cache.Indexers{})
c.postgresTeamInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.postgresTeamAdd,
UpdateFunc: c.postgresTeamUpdate,
})
}
// Pods
podLw := &cache.ListWatch{
ListFunc: c.podListFunc,
@ -400,6 +453,10 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
go c.apiserver.Run(stopCh, wg)
go c.kubeNodesInformer(stopCh, wg)
if c.opConfig.EnablePostgresTeamCRD {
go c.runPostgresTeamInformer(stopCh, wg)
}
c.logger.Info("started working in background")
}
@ -415,6 +472,12 @@ func (c *Controller) runPostgresqlInformer(stopCh <-chan struct{}, wg *sync.Wait
c.postgresqlInformer.Run(stopCh)
}
func (c *Controller) runPostgresTeamInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
c.postgresTeamInformer.Run(stopCh)
}
func queueClusterKey(eventType EventType, uid types.UID) string {
return fmt.Sprintf("%s-%s", eventType, uid)
}

View File

@ -42,7 +42,7 @@ func (c *Controller) nodeAdd(obj interface{}) {
return
}
c.logger.Debugf("new node has been added: %q (%s)", util.NameFromMeta(node.ObjectMeta), node.Spec.ProviderID)
c.logger.Debugf("new node has been added: %s (%s)", util.NameFromMeta(node.ObjectMeta), node.Spec.ProviderID)
// check if the node became not ready while the operator was down (otherwise we would have caught it in nodeUpdate)
if !c.nodeIsReady(node) {
@ -76,7 +76,7 @@ func (c *Controller) nodeUpdate(prev, cur interface{}) {
}
func (c *Controller) nodeIsReady(node *v1.Node) bool {
return (!node.Spec.Unschedulable || util.MapContains(node.Labels, c.opConfig.NodeReadinessLabel) ||
return (!node.Spec.Unschedulable || (len(c.opConfig.NodeReadinessLabel) > 0 && util.MapContains(node.Labels, c.opConfig.NodeReadinessLabel)) ||
util.MapContains(node.Labels, map[string]string{"master": "true"}))
}

View File

@ -15,7 +15,6 @@ const (
func newNodeTestController() *Controller {
var controller = NewController(&spec.ControllerConfig{}, "node-test")
controller.opConfig.NodeReadinessLabel = map[string]string{readyLabel: readyValue}
return controller
}
@ -36,27 +35,58 @@ var nodeTestController = newNodeTestController()
func TestNodeIsReady(t *testing.T) {
testName := "TestNodeIsReady"
var testTable = []struct {
in *v1.Node
out bool
in *v1.Node
out bool
readinessLabel map[string]string
}{
{
in: makeNode(map[string]string{"foo": "bar"}, true),
out: true,
in: makeNode(map[string]string{"foo": "bar"}, true),
out: true,
readinessLabel: map[string]string{readyLabel: readyValue},
},
{
in: makeNode(map[string]string{"foo": "bar"}, false),
out: false,
in: makeNode(map[string]string{"foo": "bar"}, false),
out: false,
readinessLabel: map[string]string{readyLabel: readyValue},
},
{
in: makeNode(map[string]string{readyLabel: readyValue}, false),
out: true,
in: makeNode(map[string]string{readyLabel: readyValue}, false),
out: true,
readinessLabel: map[string]string{readyLabel: readyValue},
},
{
in: makeNode(map[string]string{"foo": "bar", "master": "true"}, false),
out: true,
in: makeNode(map[string]string{"foo": "bar", "master": "true"}, false),
out: true,
readinessLabel: map[string]string{readyLabel: readyValue},
},
{
in: makeNode(map[string]string{"foo": "bar", "master": "true"}, false),
out: true,
readinessLabel: map[string]string{readyLabel: readyValue},
},
{
in: makeNode(map[string]string{"foo": "bar"}, true),
out: true,
readinessLabel: map[string]string{},
},
{
in: makeNode(map[string]string{"foo": "bar"}, false),
out: false,
readinessLabel: map[string]string{},
},
{
in: makeNode(map[string]string{readyLabel: readyValue}, false),
out: false,
readinessLabel: map[string]string{},
},
{
in: makeNode(map[string]string{"foo": "bar", "master": "true"}, false),
out: true,
readinessLabel: map[string]string{},
},
}
for _, tt := range testTable {
nodeTestController.opConfig.NodeReadinessLabel = tt.readinessLabel
if isReady := nodeTestController.nodeIsReady(tt.in); isReady != tt.out {
t.Errorf("%s: expected response %t doesn't match the actual %t for the node %#v",
testName, tt.out, isReady, tt.in)

View File

@ -163,6 +163,8 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur
result.PamConfiguration = util.Coalesce(fromCRD.TeamsAPI.PamConfiguration, "https://info.example.com/oauth2/tokeninfo?access_token= uid realm=/employees")
result.ProtectedRoles = util.CoalesceStrArr(fromCRD.TeamsAPI.ProtectedRoles, []string{"admin"})
result.PostgresSuperuserTeams = fromCRD.TeamsAPI.PostgresSuperuserTeams
result.EnablePostgresTeamCRD = fromCRD.TeamsAPI.EnablePostgresTeamCRD
result.EnablePostgresTeamCRDSuperusers = fromCRD.TeamsAPI.EnablePostgresTeamCRDSuperusers
// logging REST API config
result.APIPort = util.CoalesceInt(fromCRD.LoggingRESTAPI.APIPort, 8080)

View File

@ -225,11 +225,11 @@ func (c *Controller) processEvent(event ClusterEvent) {
switch event.EventType {
case EventAdd:
if clusterFound {
lg.Debugf("cluster already exists")
lg.Infof("Recieved add event for already existing Postgres cluster")
return
}
lg.Infof("creation of the cluster started")
lg.Infof("creating a new Postgres cluster")
cl = c.addCluster(lg, clusterName, event.NewSpec)

View File

@ -15,6 +15,7 @@ import (
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
"github.com/zalando/postgres-operator/pkg/cluster"
"github.com/zalando/postgres-operator/pkg/spec"
"github.com/zalando/postgres-operator/pkg/teams"
"github.com/zalando/postgres-operator/pkg/util"
"github.com/zalando/postgres-operator/pkg/util/config"
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
@ -30,6 +31,7 @@ func (c *Controller) makeClusterConfig() cluster.Config {
return cluster.Config{
RestConfig: c.config.RestConfig,
OpConfig: config.Copy(c.opConfig),
PgTeamMap: c.pgTeamMap,
InfrastructureRoles: infrastructureRoles,
PodServiceAccount: c.PodServiceAccount,
}
@ -394,6 +396,37 @@ func (c *Controller) getInfrastructureRole(
return roles, nil
}
func (c *Controller) loadPostgresTeams() {
// reset team map
c.pgTeamMap = teams.PostgresTeamMap{}
pgTeams, err := c.KubeClient.AcidV1ClientSet.AcidV1().PostgresTeams(c.opConfig.WatchedNamespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
c.logger.Errorf("could not list postgres team objects: %v", err)
}
c.pgTeamMap.Load(pgTeams)
c.logger.Debugf("Internal Postgres Team Cache: %#v", c.pgTeamMap)
}
func (c *Controller) postgresTeamAdd(obj interface{}) {
pgTeam, ok := obj.(*acidv1.PostgresTeam)
if !ok {
c.logger.Errorf("could not cast to PostgresTeam spec")
}
c.logger.Debugf("PostgreTeam %q added. Reloading postgres team CRDs and overwriting cached map", pgTeam.Name)
c.loadPostgresTeams()
}
func (c *Controller) postgresTeamUpdate(prev, obj interface{}) {
pgTeam, ok := obj.(*acidv1.PostgresTeam)
if !ok {
c.logger.Errorf("could not cast to PostgresTeam spec")
}
c.logger.Debugf("PostgreTeam %q updated. Reloading postgres team CRDs and overwriting cached map", pgTeam.Name)
c.loadPostgresTeams()
}
func (c *Controller) podClusterName(pod *v1.Pod) spec.NamespacedName {
if name, ok := pod.Labels[c.opConfig.ClusterNameLabel]; ok {
return spec.NamespacedName{

View File

@ -33,6 +33,7 @@ import (
type AcidV1Interface interface {
RESTClient() rest.Interface
OperatorConfigurationsGetter
PostgresTeamsGetter
PostgresqlsGetter
}
@ -45,6 +46,10 @@ func (c *AcidV1Client) OperatorConfigurations(namespace string) OperatorConfigur
return newOperatorConfigurations(c, namespace)
}
func (c *AcidV1Client) PostgresTeams(namespace string) PostgresTeamInterface {
return newPostgresTeams(c, namespace)
}
func (c *AcidV1Client) Postgresqls(namespace string) PostgresqlInterface {
return newPostgresqls(c, namespace)
}

View File

@ -38,6 +38,10 @@ func (c *FakeAcidV1) OperatorConfigurations(namespace string) v1.OperatorConfigu
return &FakeOperatorConfigurations{c, namespace}
}
func (c *FakeAcidV1) PostgresTeams(namespace string) v1.PostgresTeamInterface {
return &FakePostgresTeams{c, namespace}
}
func (c *FakeAcidV1) Postgresqls(namespace string) v1.PostgresqlInterface {
return &FakePostgresqls{c, namespace}
}

View File

@ -0,0 +1,136 @@
/*
Copyright 2020 Compose, Zalando SE
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
// Code generated by client-gen. DO NOT EDIT.
package fake
import (
"context"
acidzalandov1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
labels "k8s.io/apimachinery/pkg/labels"
schema "k8s.io/apimachinery/pkg/runtime/schema"
types "k8s.io/apimachinery/pkg/types"
watch "k8s.io/apimachinery/pkg/watch"
testing "k8s.io/client-go/testing"
)
// FakePostgresTeams implements PostgresTeamInterface
type FakePostgresTeams struct {
Fake *FakeAcidV1
ns string
}
var postgresteamsResource = schema.GroupVersionResource{Group: "acid.zalan.do", Version: "v1", Resource: "postgresteams"}
var postgresteamsKind = schema.GroupVersionKind{Group: "acid.zalan.do", Version: "v1", Kind: "PostgresTeam"}
// Get takes name of the postgresTeam, and returns the corresponding postgresTeam object, and an error if there is any.
func (c *FakePostgresTeams) Get(ctx context.Context, name string, options v1.GetOptions) (result *acidzalandov1.PostgresTeam, err error) {
obj, err := c.Fake.
Invokes(testing.NewGetAction(postgresteamsResource, c.ns, name), &acidzalandov1.PostgresTeam{})
if obj == nil {
return nil, err
}
return obj.(*acidzalandov1.PostgresTeam), err
}
// List takes label and field selectors, and returns the list of PostgresTeams that match those selectors.
func (c *FakePostgresTeams) List(ctx context.Context, opts v1.ListOptions) (result *acidzalandov1.PostgresTeamList, err error) {
obj, err := c.Fake.
Invokes(testing.NewListAction(postgresteamsResource, postgresteamsKind, c.ns, opts), &acidzalandov1.PostgresTeamList{})
if obj == nil {
return nil, err
}
label, _, _ := testing.ExtractFromListOptions(opts)
if label == nil {
label = labels.Everything()
}
list := &acidzalandov1.PostgresTeamList{ListMeta: obj.(*acidzalandov1.PostgresTeamList).ListMeta}
for _, item := range obj.(*acidzalandov1.PostgresTeamList).Items {
if label.Matches(labels.Set(item.Labels)) {
list.Items = append(list.Items, item)
}
}
return list, err
}
// Watch returns a watch.Interface that watches the requested postgresTeams.
func (c *FakePostgresTeams) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) {
return c.Fake.
InvokesWatch(testing.NewWatchAction(postgresteamsResource, c.ns, opts))
}
// Create takes the representation of a postgresTeam and creates it. Returns the server's representation of the postgresTeam, and an error, if there is any.
func (c *FakePostgresTeams) Create(ctx context.Context, postgresTeam *acidzalandov1.PostgresTeam, opts v1.CreateOptions) (result *acidzalandov1.PostgresTeam, err error) {
obj, err := c.Fake.
Invokes(testing.NewCreateAction(postgresteamsResource, c.ns, postgresTeam), &acidzalandov1.PostgresTeam{})
if obj == nil {
return nil, err
}
return obj.(*acidzalandov1.PostgresTeam), err
}
// Update takes the representation of a postgresTeam and updates it. Returns the server's representation of the postgresTeam, and an error, if there is any.
func (c *FakePostgresTeams) Update(ctx context.Context, postgresTeam *acidzalandov1.PostgresTeam, opts v1.UpdateOptions) (result *acidzalandov1.PostgresTeam, err error) {
obj, err := c.Fake.
Invokes(testing.NewUpdateAction(postgresteamsResource, c.ns, postgresTeam), &acidzalandov1.PostgresTeam{})
if obj == nil {
return nil, err
}
return obj.(*acidzalandov1.PostgresTeam), err
}
// Delete takes name of the postgresTeam and deletes it. Returns an error if one occurs.
func (c *FakePostgresTeams) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error {
_, err := c.Fake.
Invokes(testing.NewDeleteAction(postgresteamsResource, c.ns, name), &acidzalandov1.PostgresTeam{})
return err
}
// DeleteCollection deletes a collection of objects.
func (c *FakePostgresTeams) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error {
action := testing.NewDeleteCollectionAction(postgresteamsResource, c.ns, listOpts)
_, err := c.Fake.Invokes(action, &acidzalandov1.PostgresTeamList{})
return err
}
// Patch applies the patch and returns the patched postgresTeam.
func (c *FakePostgresTeams) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *acidzalandov1.PostgresTeam, err error) {
obj, err := c.Fake.
Invokes(testing.NewPatchSubresourceAction(postgresteamsResource, c.ns, name, pt, data, subresources...), &acidzalandov1.PostgresTeam{})
if obj == nil {
return nil, err
}
return obj.(*acidzalandov1.PostgresTeam), err
}

View File

@ -26,4 +26,6 @@ package v1
type OperatorConfigurationExpansion interface{}
type PostgresTeamExpansion interface{}
type PostgresqlExpansion interface{}

View File

@ -0,0 +1,184 @@
/*
Copyright 2020 Compose, Zalando SE
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
// Code generated by client-gen. DO NOT EDIT.
package v1
import (
"context"
"time"
v1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
scheme "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/scheme"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
types "k8s.io/apimachinery/pkg/types"
watch "k8s.io/apimachinery/pkg/watch"
rest "k8s.io/client-go/rest"
)
// PostgresTeamsGetter has a method to return a PostgresTeamInterface.
// A group's client should implement this interface.
type PostgresTeamsGetter interface {
PostgresTeams(namespace string) PostgresTeamInterface
}
// PostgresTeamInterface has methods to work with PostgresTeam resources.
type PostgresTeamInterface interface {
Create(ctx context.Context, postgresTeam *v1.PostgresTeam, opts metav1.CreateOptions) (*v1.PostgresTeam, error)
Update(ctx context.Context, postgresTeam *v1.PostgresTeam, opts metav1.UpdateOptions) (*v1.PostgresTeam, error)
Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error
Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.PostgresTeam, error)
List(ctx context.Context, opts metav1.ListOptions) (*v1.PostgresTeamList, error)
Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.PostgresTeam, err error)
PostgresTeamExpansion
}
// postgresTeams implements PostgresTeamInterface
type postgresTeams struct {
client rest.Interface
ns string
}
// newPostgresTeams returns a PostgresTeams
func newPostgresTeams(c *AcidV1Client, namespace string) *postgresTeams {
return &postgresTeams{
client: c.RESTClient(),
ns: namespace,
}
}
// Get takes name of the postgresTeam, and returns the corresponding postgresTeam object, and an error if there is any.
func (c *postgresTeams) Get(ctx context.Context, name string, options metav1.GetOptions) (result *v1.PostgresTeam, err error) {
result = &v1.PostgresTeam{}
err = c.client.Get().
Namespace(c.ns).
Resource("postgresteams").
Name(name).
VersionedParams(&options, scheme.ParameterCodec).
Do(ctx).
Into(result)
return
}
// List takes label and field selectors, and returns the list of PostgresTeams that match those selectors.
func (c *postgresTeams) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PostgresTeamList, err error) {
var timeout time.Duration
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
result = &v1.PostgresTeamList{}
err = c.client.Get().
Namespace(c.ns).
Resource("postgresteams").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout).
Do(ctx).
Into(result)
return
}
// Watch returns a watch.Interface that watches the requested postgresTeams.
func (c *postgresTeams) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
var timeout time.Duration
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
opts.Watch = true
return c.client.Get().
Namespace(c.ns).
Resource("postgresteams").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout).
Watch(ctx)
}
// Create takes the representation of a postgresTeam and creates it. Returns the server's representation of the postgresTeam, and an error, if there is any.
func (c *postgresTeams) Create(ctx context.Context, postgresTeam *v1.PostgresTeam, opts metav1.CreateOptions) (result *v1.PostgresTeam, err error) {
result = &v1.PostgresTeam{}
err = c.client.Post().
Namespace(c.ns).
Resource("postgresteams").
VersionedParams(&opts, scheme.ParameterCodec).
Body(postgresTeam).
Do(ctx).
Into(result)
return
}
// Update takes the representation of a postgresTeam and updates it. Returns the server's representation of the postgresTeam, and an error, if there is any.
func (c *postgresTeams) Update(ctx context.Context, postgresTeam *v1.PostgresTeam, opts metav1.UpdateOptions) (result *v1.PostgresTeam, err error) {
result = &v1.PostgresTeam{}
err = c.client.Put().
Namespace(c.ns).
Resource("postgresteams").
Name(postgresTeam.Name).
VersionedParams(&opts, scheme.ParameterCodec).
Body(postgresTeam).
Do(ctx).
Into(result)
return
}
// Delete takes name of the postgresTeam and deletes it. Returns an error if one occurs.
func (c *postgresTeams) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error {
return c.client.Delete().
Namespace(c.ns).
Resource("postgresteams").
Name(name).
Body(&opts).
Do(ctx).
Error()
}
// DeleteCollection deletes a collection of objects.
func (c *postgresTeams) DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error {
var timeout time.Duration
if listOpts.TimeoutSeconds != nil {
timeout = time.Duration(*listOpts.TimeoutSeconds) * time.Second
}
return c.client.Delete().
Namespace(c.ns).
Resource("postgresteams").
VersionedParams(&listOpts, scheme.ParameterCodec).
Timeout(timeout).
Body(&opts).
Do(ctx).
Error()
}
// Patch applies the patch and returns the patched postgresTeam.
func (c *postgresTeams) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.PostgresTeam, err error) {
result = &v1.PostgresTeam{}
err = c.client.Patch(pt).
Namespace(c.ns).
Resource("postgresteams").
Name(name).
SubResource(subresources...).
VersionedParams(&opts, scheme.ParameterCodec).
Body(data).
Do(ctx).
Into(result)
return
}

View File

@ -30,6 +30,8 @@ import (
// Interface provides access to all the informers in this group version.
type Interface interface {
// PostgresTeams returns a PostgresTeamInformer.
PostgresTeams() PostgresTeamInformer
// Postgresqls returns a PostgresqlInformer.
Postgresqls() PostgresqlInformer
}
@ -45,6 +47,11 @@ func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakList
return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}
// PostgresTeams returns a PostgresTeamInformer.
func (v *version) PostgresTeams() PostgresTeamInformer {
return &postgresTeamInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}
// Postgresqls returns a PostgresqlInformer.
func (v *version) Postgresqls() PostgresqlInformer {
return &postgresqlInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}

View File

@ -0,0 +1,96 @@
/*
Copyright 2020 Compose, Zalando SE
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
// Code generated by informer-gen. DO NOT EDIT.
package v1
import (
"context"
time "time"
acidzalandov1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
versioned "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned"
internalinterfaces "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/internalinterfaces"
v1 "github.com/zalando/postgres-operator/pkg/generated/listers/acid.zalan.do/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
watch "k8s.io/apimachinery/pkg/watch"
cache "k8s.io/client-go/tools/cache"
)
// PostgresTeamInformer provides access to a shared informer and lister for
// PostgresTeams.
type PostgresTeamInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.PostgresTeamLister
}
type postgresTeamInformer struct {
factory internalinterfaces.SharedInformerFactory
tweakListOptions internalinterfaces.TweakListOptionsFunc
namespace string
}
// NewPostgresTeamInformer constructs a new informer for PostgresTeam type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewPostgresTeamInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
return NewFilteredPostgresTeamInformer(client, namespace, resyncPeriod, indexers, nil)
}
// NewFilteredPostgresTeamInformer constructs a new informer for PostgresTeam type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredPostgresTeamInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.AcidV1().PostgresTeams(namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.AcidV1().PostgresTeams(namespace).Watch(context.TODO(), options)
},
},
&acidzalandov1.PostgresTeam{},
resyncPeriod,
indexers,
)
}
func (f *postgresTeamInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredPostgresTeamInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
func (f *postgresTeamInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&acidzalandov1.PostgresTeam{}, f.defaultInformer)
}
func (f *postgresTeamInformer) Lister() v1.PostgresTeamLister {
return v1.NewPostgresTeamLister(f.Informer().GetIndexer())
}

View File

@ -59,6 +59,8 @@ func (f *genericInformer) Lister() cache.GenericLister {
func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource) (GenericInformer, error) {
switch resource {
// Group=acid.zalan.do, Version=v1
case v1.SchemeGroupVersion.WithResource("postgresteams"):
return &genericInformer{resource: resource.GroupResource(), informer: f.Acid().V1().PostgresTeams().Informer()}, nil
case v1.SchemeGroupVersion.WithResource("postgresqls"):
return &genericInformer{resource: resource.GroupResource(), informer: f.Acid().V1().Postgresqls().Informer()}, nil

View File

@ -24,6 +24,14 @@ SOFTWARE.
package v1
// PostgresTeamListerExpansion allows custom methods to be added to
// PostgresTeamLister.
type PostgresTeamListerExpansion interface{}
// PostgresTeamNamespaceListerExpansion allows custom methods to be added to
// PostgresTeamNamespaceLister.
type PostgresTeamNamespaceListerExpansion interface{}
// PostgresqlListerExpansion allows custom methods to be added to
// PostgresqlLister.
type PostgresqlListerExpansion interface{}

View File

@ -0,0 +1,105 @@
/*
Copyright 2020 Compose, Zalando SE
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
// Code generated by lister-gen. DO NOT EDIT.
package v1
import (
v1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
)
// PostgresTeamLister helps list PostgresTeams.
// All objects returned here must be treated as read-only.
type PostgresTeamLister interface {
// List lists all PostgresTeams in the indexer.
// Objects returned here must be treated as read-only.
List(selector labels.Selector) (ret []*v1.PostgresTeam, err error)
// PostgresTeams returns an object that can list and get PostgresTeams.
PostgresTeams(namespace string) PostgresTeamNamespaceLister
PostgresTeamListerExpansion
}
// postgresTeamLister implements the PostgresTeamLister interface.
type postgresTeamLister struct {
indexer cache.Indexer
}
// NewPostgresTeamLister returns a new PostgresTeamLister.
func NewPostgresTeamLister(indexer cache.Indexer) PostgresTeamLister {
return &postgresTeamLister{indexer: indexer}
}
// List lists all PostgresTeams in the indexer.
func (s *postgresTeamLister) List(selector labels.Selector) (ret []*v1.PostgresTeam, err error) {
err = cache.ListAll(s.indexer, selector, func(m interface{}) {
ret = append(ret, m.(*v1.PostgresTeam))
})
return ret, err
}
// PostgresTeams returns an object that can list and get PostgresTeams.
func (s *postgresTeamLister) PostgresTeams(namespace string) PostgresTeamNamespaceLister {
return postgresTeamNamespaceLister{indexer: s.indexer, namespace: namespace}
}
// PostgresTeamNamespaceLister helps list and get PostgresTeams.
// All objects returned here must be treated as read-only.
type PostgresTeamNamespaceLister interface {
// List lists all PostgresTeams in the indexer for a given namespace.
// Objects returned here must be treated as read-only.
List(selector labels.Selector) (ret []*v1.PostgresTeam, err error)
// Get retrieves the PostgresTeam from the indexer for a given namespace and name.
// Objects returned here must be treated as read-only.
Get(name string) (*v1.PostgresTeam, error)
PostgresTeamNamespaceListerExpansion
}
// postgresTeamNamespaceLister implements the PostgresTeamNamespaceLister
// interface.
type postgresTeamNamespaceLister struct {
indexer cache.Indexer
namespace string
}
// List lists all PostgresTeams in the indexer for a given namespace.
func (s postgresTeamNamespaceLister) List(selector labels.Selector) (ret []*v1.PostgresTeam, err error) {
err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) {
ret = append(ret, m.(*v1.PostgresTeam))
})
return ret, err
}
// Get retrieves the PostgresTeam from the indexer for a given namespace and name.
func (s postgresTeamNamespaceLister) Get(name string) (*v1.PostgresTeam, error) {
obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name)
if err != nil {
return nil, err
}
if !exists {
return nil, errors.NewNotFound(v1.Resource("postgresteam"), name)
}
return obj.(*v1.PostgresTeam), nil
}

118
pkg/teams/postgres_team.go Normal file
View File

@ -0,0 +1,118 @@
package teams
import (
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
"github.com/zalando/postgres-operator/pkg/util"
)
// PostgresTeamMap is the operator's internal representation of all PostgresTeam CRDs
type PostgresTeamMap map[string]postgresTeamMembership
type postgresTeamMembership struct {
AdditionalSuperuserTeams []string
AdditionalTeams []string
AdditionalMembers []string
}
type teamHashSet map[string]map[string]struct{}
func (ths *teamHashSet) has(team string) bool {
_, ok := (*ths)[team]
return ok
}
func (ths *teamHashSet) add(newTeam string, newSet []string) {
set := make(map[string]struct{})
if ths.has(newTeam) {
set = (*ths)[newTeam]
}
for _, t := range newSet {
set[t] = struct{}{}
}
(*ths)[newTeam] = set
}
func (ths *teamHashSet) toMap() map[string][]string {
newTeamMap := make(map[string][]string)
for team, items := range *ths {
list := []string{}
for item := range items {
list = append(list, item)
}
newTeamMap[team] = list
}
return newTeamMap
}
func (ths *teamHashSet) mergeCrdMap(crdTeamMap map[string][]string) {
for t, at := range crdTeamMap {
ths.add(t, at)
}
}
func fetchTeams(teamset *map[string]struct{}, set teamHashSet) {
for key := range set {
(*teamset)[key] = struct{}{}
}
}
func (ptm *PostgresTeamMap) fetchAdditionalTeams(team string, superuserTeams bool, transitive bool, exclude []string) []string {
var teams []string
if superuserTeams {
teams = (*ptm)[team].AdditionalSuperuserTeams
} else {
teams = (*ptm)[team].AdditionalTeams
}
if transitive {
exclude = append(exclude, team)
for _, additionalTeam := range teams {
if !(util.SliceContains(exclude, additionalTeam)) {
transitiveTeams := (*ptm).fetchAdditionalTeams(additionalTeam, superuserTeams, transitive, exclude)
for _, transitiveTeam := range transitiveTeams {
if !(util.SliceContains(exclude, transitiveTeam)) && !(util.SliceContains(teams, transitiveTeam)) {
teams = append(teams, transitiveTeam)
}
}
}
}
}
return teams
}
// GetAdditionalTeams function to retrieve list of additional teams
func (ptm *PostgresTeamMap) GetAdditionalTeams(team string, transitive bool) []string {
return ptm.fetchAdditionalTeams(team, false, transitive, []string{})
}
// GetAdditionalSuperuserTeams function to retrieve list of additional superuser teams
func (ptm *PostgresTeamMap) GetAdditionalSuperuserTeams(team string, transitive bool) []string {
return ptm.fetchAdditionalTeams(team, true, transitive, []string{})
}
// Load function to import data from PostgresTeam CRD
func (ptm *PostgresTeamMap) Load(pgTeams *acidv1.PostgresTeamList) {
superuserTeamSet := teamHashSet{}
teamSet := teamHashSet{}
teamMemberSet := teamHashSet{}
teamIDs := make(map[string]struct{})
for _, pgTeam := range pgTeams.Items {
superuserTeamSet.mergeCrdMap(pgTeam.Spec.AdditionalSuperuserTeams)
teamSet.mergeCrdMap(pgTeam.Spec.AdditionalTeams)
teamMemberSet.mergeCrdMap(pgTeam.Spec.AdditionalMembers)
}
fetchTeams(&teamIDs, superuserTeamSet)
fetchTeams(&teamIDs, teamSet)
fetchTeams(&teamIDs, teamMemberSet)
for teamID := range teamIDs {
(*ptm)[teamID] = postgresTeamMembership{
AdditionalSuperuserTeams: util.CoalesceStrArr(superuserTeamSet.toMap()[teamID], []string{}),
AdditionalTeams: util.CoalesceStrArr(teamSet.toMap()[teamID], []string{}),
AdditionalMembers: util.CoalesceStrArr(teamMemberSet.toMap()[teamID], []string{}),
}
}
}

View File

@ -0,0 +1,194 @@
package teams
import (
"testing"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
"github.com/zalando/postgres-operator/pkg/util"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
var (
True = true
False = false
pgTeamList = acidv1.PostgresTeamList{
TypeMeta: metav1.TypeMeta{
Kind: "List",
APIVersion: "v1",
},
Items: []acidv1.PostgresTeam{
{
TypeMeta: metav1.TypeMeta{
Kind: "PostgresTeam",
APIVersion: "acid.zalan.do/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "teamAB",
},
Spec: acidv1.PostgresTeamSpec{
AdditionalSuperuserTeams: map[string][]string{"teamA": []string{"teamB", "team24x7"}, "teamB": []string{"teamA", "teamC", "team24x7"}},
AdditionalTeams: map[string][]string{"teamA": []string{"teamC"}, "teamB": []string{}},
AdditionalMembers: map[string][]string{"team24x7": []string{"optimusprime"}, "teamB": []string{"drno"}},
},
}, {
TypeMeta: metav1.TypeMeta{
Kind: "PostgresTeam",
APIVersion: "acid.zalan.do/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "teamC",
},
Spec: acidv1.PostgresTeamSpec{
AdditionalSuperuserTeams: map[string][]string{"teamC": []string{"team24x7"}},
AdditionalTeams: map[string][]string{"teamA": []string{"teamC"}, "teamC": []string{"teamA", "teamB", "acid"}},
AdditionalMembers: map[string][]string{"acid": []string{"batman"}},
},
},
},
}
)
// PostgresTeamMap is the operator's internal representation of all PostgresTeam CRDs
func TestLoadingPostgresTeamCRD(t *testing.T) {
tests := []struct {
name string
crd acidv1.PostgresTeamList
ptm PostgresTeamMap
error string
}{
{
"Check that CRD is imported correctly into the internal format",
pgTeamList,
PostgresTeamMap{
"teamA": {
AdditionalSuperuserTeams: []string{"teamB", "team24x7"},
AdditionalTeams: []string{"teamC"},
AdditionalMembers: []string{},
},
"teamB": {
AdditionalSuperuserTeams: []string{"teamA", "teamC", "team24x7"},
AdditionalTeams: []string{},
AdditionalMembers: []string{"drno"},
},
"teamC": {
AdditionalSuperuserTeams: []string{"team24x7"},
AdditionalTeams: []string{"teamA", "teamB", "acid"},
AdditionalMembers: []string{},
},
"team24x7": {
AdditionalSuperuserTeams: []string{},
AdditionalTeams: []string{},
AdditionalMembers: []string{"optimusprime"},
},
"acid": {
AdditionalSuperuserTeams: []string{},
AdditionalTeams: []string{},
AdditionalMembers: []string{"batman"},
},
},
"Mismatch between PostgresTeam CRD and internal map",
},
}
for _, tt := range tests {
postgresTeamMap := PostgresTeamMap{}
postgresTeamMap.Load(&tt.crd)
for team, ptmeamMembership := range postgresTeamMap {
if !util.IsEqualIgnoreOrder(ptmeamMembership.AdditionalSuperuserTeams, tt.ptm[team].AdditionalSuperuserTeams) {
t.Errorf("%s: %v: expected additional members %#v, got %#v", tt.name, tt.error, tt.ptm, postgresTeamMap)
}
if !util.IsEqualIgnoreOrder(ptmeamMembership.AdditionalTeams, tt.ptm[team].AdditionalTeams) {
t.Errorf("%s: %v: expected additional teams %#v, got %#v", tt.name, tt.error, tt.ptm, postgresTeamMap)
}
if !util.IsEqualIgnoreOrder(ptmeamMembership.AdditionalMembers, tt.ptm[team].AdditionalMembers) {
t.Errorf("%s: %v: expected additional superuser teams %#v, got %#v", tt.name, tt.error, tt.ptm, postgresTeamMap)
}
}
}
}
// TestGetAdditionalTeams if returns teams with and without transitive dependencies
func TestGetAdditionalTeams(t *testing.T) {
tests := []struct {
name string
team string
transitive bool
teams []string
error string
}{
{
"Check that additional teams are returned",
"teamA",
false,
[]string{"teamC"},
"GetAdditionalTeams returns wrong list",
},
{
"Check that additional teams are returned incl. transitive teams",
"teamA",
true,
[]string{"teamC", "teamB", "acid"},
"GetAdditionalTeams returns wrong list",
},
{
"Check that empty list is returned",
"teamB",
false,
[]string{},
"GetAdditionalTeams returns wrong list",
},
}
postgresTeamMap := PostgresTeamMap{}
postgresTeamMap.Load(&pgTeamList)
for _, tt := range tests {
additionalTeams := postgresTeamMap.GetAdditionalTeams(tt.team, tt.transitive)
if !util.IsEqualIgnoreOrder(additionalTeams, tt.teams) {
t.Errorf("%s: %v: expected additional teams %#v, got %#v", tt.name, tt.error, tt.teams, additionalTeams)
}
}
}
// TestGetAdditionalSuperuserTeams if returns teams with and without transitive dependencies
func TestGetAdditionalSuperuserTeams(t *testing.T) {
tests := []struct {
name string
team string
transitive bool
teams []string
error string
}{
{
"Check that additional superuser teams are returned",
"teamA",
false,
[]string{"teamB", "team24x7"},
"GetAdditionalSuperuserTeams returns wrong list",
},
{
"Check that additional superuser teams are returned incl. transitive superuser teams",
"teamA",
true,
[]string{"teamB", "teamC", "team24x7"},
"GetAdditionalSuperuserTeams returns wrong list",
},
{
"Check that empty list is returned",
"team24x7",
false,
[]string{},
"GetAdditionalSuperuserTeams returns wrong list",
},
}
postgresTeamMap := PostgresTeamMap{}
postgresTeamMap.Load(&pgTeamList)
for _, tt := range tests {
additionalTeams := postgresTeamMap.GetAdditionalSuperuserTeams(tt.team, tt.transitive)
if !util.IsEqualIgnoreOrder(additionalTeams, tt.teams) {
t.Errorf("%s: %v: expected additional teams %#v, got %#v", tt.name, tt.error, tt.teams, additionalTeams)
}
}
}

View File

@ -169,6 +169,8 @@ type Config struct {
EnableTeamSuperuser bool `name:"enable_team_superuser" default:"false"`
TeamAdminRole string `name:"team_admin_role" default:"admin"`
EnableAdminRoleForUsers bool `name:"enable_admin_role_for_users" default:"true"`
EnablePostgresTeamCRD bool `name:"enable_postgres_team_crd" default:"false"`
EnablePostgresTeamCRDSuperusers bool `name:"enable_postgres_team_crd_superusers" default:"false"`
EnableMasterLoadBalancer bool `name:"enable_master_load_balancer" default:"true"`
EnableReplicaLoadBalancer bool `name:"enable_replica_load_balancer" default:"false"`
CustomServiceAnnotations map[string]string `name:"custom_service_annotations"`
@ -199,7 +201,7 @@ type Config struct {
// MustMarshal marshals the config or panics
func (c Config) MustMarshal() string {
b, err := json.MarshalIndent(c, "", "\t")
b, err := json.MarshalIndent(c, "", " ")
if err != nil {
panic(err)
}

191
pkg/util/nicediff/diff.go Normal file
View File

@ -0,0 +1,191 @@
// Copyright 2013 Google Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package diff implements a linewise diff algorithm.
package nicediff
import (
"fmt"
"strings"
)
// Chunk represents a piece of the diff. A chunk will not have both added and
// deleted lines. Equal lines are always after any added or deleted lines.
// A Chunk may or may not have any lines in it, especially for the first or last
// chunk in a computation.
type Chunk struct {
Added []string
Deleted []string
Equal []string
}
func (c *Chunk) empty() bool {
return len(c.Added) == 0 && len(c.Deleted) == 0 && len(c.Equal) == 0
}
// Diff returns a string containing a line-by-line unified diff of the linewise
// changes required to make A into B. Each line is prefixed with '+', '-', or
// ' ' to indicate if it should be added, removed, or is correct respectively.
func Diff(A, B string, skipEqual bool) string {
aLines := strings.Split(A, "\n")
bLines := strings.Split(B, "\n")
return Render(DiffChunks(aLines, bLines), skipEqual)
}
// Render renders the slice of chunks into a representation that prefixes
// the lines with '+', '-', or ' ' depending on whether the line was added,
// removed, or equal (respectively).
func Render(chunks []Chunk, skipEqual bool) string {
buf := new(strings.Builder)
for _, c := range chunks {
for _, line := range c.Added {
fmt.Fprintf(buf, "+%s\n", line)
}
for _, line := range c.Deleted {
fmt.Fprintf(buf, "-%s\n", line)
}
if !skipEqual {
for _, line := range c.Equal {
fmt.Fprintf(buf, " %s\n", line)
}
}
}
return strings.TrimRight(buf.String(), "\n")
}
// DiffChunks uses an O(D(N+M)) shortest-edit-script algorithm
// to compute the edits required from A to B and returns the
// edit chunks.
func DiffChunks(a, b []string) []Chunk {
// algorithm: http://www.xmailserver.org/diff2.pdf
// We'll need these quantities a lot.
alen, blen := len(a), len(b) // M, N
// At most, it will require len(a) deletions and len(b) additions
// to transform a into b.
maxPath := alen + blen // MAX
if maxPath == 0 {
// degenerate case: two empty lists are the same
return nil
}
// Store the endpoint of the path for diagonals.
// We store only the a index, because the b index on any diagonal
// (which we know during the loop below) is aidx-diag.
// endpoint[maxPath] represents the 0 diagonal.
//
// Stated differently:
// endpoint[d] contains the aidx of a furthest reaching path in diagonal d
endpoint := make([]int, 2*maxPath+1) // V
saved := make([][]int, 0, 8) // Vs
save := func() {
dup := make([]int, len(endpoint))
copy(dup, endpoint)
saved = append(saved, dup)
}
var editDistance int // D
dLoop:
for editDistance = 0; editDistance <= maxPath; editDistance++ {
// The 0 diag(onal) represents equality of a and b. Each diagonal to
// the left is numbered one lower, to the right is one higher, from
// -alen to +blen. Negative diagonals favor differences from a,
// positive diagonals favor differences from b. The edit distance to a
// diagonal d cannot be shorter than d itself.
//
// The iterations of this loop cover either odds or evens, but not both,
// If odd indices are inputs, even indices are outputs and vice versa.
for diag := -editDistance; diag <= editDistance; diag += 2 { // k
var aidx int // x
switch {
case diag == -editDistance:
// This is a new diagonal; copy from previous iter
aidx = endpoint[maxPath-editDistance+1] + 0
case diag == editDistance:
// This is a new diagonal; copy from previous iter
aidx = endpoint[maxPath+editDistance-1] + 1
case endpoint[maxPath+diag+1] > endpoint[maxPath+diag-1]:
// diagonal d+1 was farther along, so use that
aidx = endpoint[maxPath+diag+1] + 0
default:
// diagonal d-1 was farther (or the same), so use that
aidx = endpoint[maxPath+diag-1] + 1
}
// On diagonal d, we can compute bidx from aidx.
bidx := aidx - diag // y
// See how far we can go on this diagonal before we find a difference.
for aidx < alen && bidx < blen && a[aidx] == b[bidx] {
aidx++
bidx++
}
// Store the end of the current edit chain.
endpoint[maxPath+diag] = aidx
// If we've found the end of both inputs, we're done!
if aidx >= alen && bidx >= blen {
save() // save the final path
break dLoop
}
}
save() // save the current path
}
if editDistance == 0 {
return nil
}
chunks := make([]Chunk, editDistance+1)
x, y := alen, blen
for d := editDistance; d > 0; d-- {
endpoint := saved[d]
diag := x - y
insert := diag == -d || (diag != d && endpoint[maxPath+diag-1] < endpoint[maxPath+diag+1])
x1 := endpoint[maxPath+diag]
var x0, xM, kk int
if insert {
kk = diag + 1
x0 = endpoint[maxPath+kk]
xM = x0
} else {
kk = diag - 1
x0 = endpoint[maxPath+kk]
xM = x0 + 1
}
y0 := x0 - kk
var c Chunk
if insert {
c.Added = b[y0:][:1]
} else {
c.Deleted = a[x0:][:1]
}
if xM < x1 {
c.Equal = a[xM:][:x1-xM]
}
x, y = x0, y0
chunks[d] = c
}
if x > 0 {
chunks[0].Equal = a[:x]
}
if chunks[0].empty() {
chunks = chunks[1:]
}
if len(chunks) == 0 {
return nil
}
return chunks
}

View File

@ -10,7 +10,9 @@ import (
"fmt"
"math/big"
"math/rand"
"reflect"
"regexp"
"sort"
"strings"
"time"
@ -134,6 +136,21 @@ func PrettyDiff(a, b interface{}) string {
return strings.Join(Diff(a, b), "\n")
}
// Compare two string slices while ignoring the order of elements
func IsEqualIgnoreOrder(a, b []string) bool {
if len(a) != len(b) {
return false
}
a_copy := make([]string, len(a))
b_copy := make([]string, len(b))
copy(a_copy, a)
copy(b_copy, b)
sort.Strings(a_copy)
sort.Strings(b_copy)
return reflect.DeepEqual(a_copy, b_copy)
}
// SubstractStringSlices finds elements in a that are not in b and return them as a result slice.
func SubstractStringSlices(a []string, b []string) (result []string, equal bool) {
// Slices are assumed to contain unique elements only
@ -176,6 +193,20 @@ func FindNamedStringSubmatch(r *regexp.Regexp, s string) map[string]string {
return res
}
// SliceContains
func SliceContains(slice interface{}, item interface{}) bool {
s := reflect.ValueOf(slice)
if s.Kind() != reflect.Slice {
panic("Invalid data-type")
}
for i := 0; i < s.Len(); i++ {
if s.Index(i).Interface() == item {
return true
}
}
return false
}
// MapContains returns true if and only if haystack contains all the keys from the needle with matching corresponding values
func MapContains(haystack, needle map[string]string) bool {
if len(haystack) < len(needle) {

View File

@ -43,6 +43,17 @@ var prettyDiffTest = []struct {
{[]int{1, 2, 3, 4}, []int{1, 2, 3, 4}, ""},
}
var isEqualIgnoreOrderTest = []struct {
inA []string
inB []string
outEqual bool
}{
{[]string{"a", "b", "c"}, []string{"a", "b", "c"}, true},
{[]string{"a", "b", "c"}, []string{"a", "c", "b"}, true},
{[]string{"a", "b"}, []string{"a", "c", "b"}, false},
{[]string{"a", "b", "c"}, []string{"a", "d", "c"}, false},
}
var substractTest = []struct {
inA []string
inB []string
@ -53,6 +64,16 @@ var substractTest = []struct {
{[]string{"a", "b", "c", "d"}, []string{"a", "bb", "c", "d"}, []string{"b"}, false},
}
var sliceContaintsTest = []struct {
slice []string
item string
out bool
}{
{[]string{"a", "b", "c"}, "a", true},
{[]string{"a", "b", "c"}, "d", false},
{[]string{}, "d", false},
}
var mapContaintsTest = []struct {
inA map[string]string
inB map[string]string
@ -136,6 +157,15 @@ func TestPrettyDiff(t *testing.T) {
}
}
func TestIsEqualIgnoreOrder(t *testing.T) {
for _, tt := range isEqualIgnoreOrderTest {
actualEqual := IsEqualIgnoreOrder(tt.inA, tt.inB)
if actualEqual != tt.outEqual {
t.Errorf("IsEqualIgnoreOrder expected: %t, got: %t", tt.outEqual, actualEqual)
}
}
}
func TestSubstractSlices(t *testing.T) {
for _, tt := range substractTest {
actualRes, actualEqual := SubstractStringSlices(tt.inA, tt.inB)
@ -160,6 +190,15 @@ func TestFindNamedStringSubmatch(t *testing.T) {
}
}
func TestSliceContains(t *testing.T) {
for _, tt := range sliceContaintsTest {
res := SliceContains(tt.slice, tt.item)
if res != tt.out {
t.Errorf("SliceContains expected: %#v, got: %#v", tt.out, res)
}
}
}
func TestMapContains(t *testing.T) {
for _, tt := range mapContaintsTest {
res := MapContains(tt.inA, tt.inB)
@ -180,3 +219,13 @@ func TestIsSmallerQuantity(t *testing.T) {
}
}
}
/*
func TestNiceDiff(t *testing.T) {
o := "a\nb\nc\n"
n := "b\nd\n"
d := nicediff.Diff(o, n, true)
t.Log(d)
// t.Errorf("Lets see output")
}
*/

View File

@ -104,6 +104,8 @@ USE_AWS_INSTANCE_PROFILE = (
getenv('USE_AWS_INSTANCE_PROFILE', 'false').lower() != 'false'
)
AWS_ENDPOINT = getenv('AWS_ENDPOINT')
tokens.configure()
tokens.manage('read-only')
tokens.start()
@ -1066,6 +1068,7 @@ def main(port, secret_key, debug, clusters: list):
logger.info(f'Tokeninfo URL: {TOKENINFO_URL}')
logger.info(f'Use AWS instance_profile: {USE_AWS_INSTANCE_PROFILE}')
logger.info(f'WAL-E S3 endpoint: {WALE_S3_ENDPOINT}')
logger.info(f'AWS S3 endpoint: {AWS_ENDPOINT}')
if TARGET_NAMESPACE is None:
@on_exception(

View File

@ -16,6 +16,8 @@ logger = getLogger(__name__)
session = Session()
AWS_ENDPOINT = getenv('AWS_ENDPOINT')
OPERATOR_CLUSTER_NAME_LABEL = getenv('OPERATOR_CLUSTER_NAME_LABEL', 'cluster-name')
COMMON_CLUSTER_LABEL = getenv('COMMON_CLUSTER_LABEL', '{"application":"spilo"}')
@ -266,7 +268,7 @@ def read_stored_clusters(bucket, prefix, delimiter='/'):
return [
prefix['Prefix'].split('/')[-2]
for prefix in these(
client('s3').list_objects(
client('s3', endpoint_url=AWS_ENDPOINT).list_objects(
Bucket=bucket,
Delimiter=delimiter,
Prefix=prefix,
@ -287,7 +289,7 @@ def read_versions(
return [
'base' if uid == 'wal' else uid
for prefix in these(
client('s3').list_objects(
client('s3', endpoint_url=AWS_ENDPOINT).list_objects(
Bucket=bucket,
Delimiter=delimiter,
Prefix=prefix + pg_cluster + delimiter,