From 668ef51d9fa76a1adda16efc5ae9610d50433718 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Mu=C3=9Fler?= Date: Wed, 21 Oct 2020 08:33:31 +0200 Subject: [PATCH] Printing config as multi log line entity, makes it readable and grepable on startup --- e2e/Dockerfile | 1 + e2e/run.sh | 11 +- e2e/tests/k8s_api.py | 468 +++++++++++++++++++++++++++++++++++ e2e/tests/test_e2e.py | 230 +---------------- pkg/controller/controller.go | 30 ++- pkg/util/config/config.go | 2 +- 6 files changed, 510 insertions(+), 232 deletions(-) create mode 100644 e2e/tests/k8s_api.py diff --git a/e2e/Dockerfile b/e2e/Dockerfile index d35a62618..5881ca5f2 100644 --- a/e2e/Dockerfile +++ b/e2e/Dockerfile @@ -18,6 +18,7 @@ RUN apt-get update \ && curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.18.0/bin/linux/amd64/kubectl \ && chmod +x ./kubectl \ && mv ./kubectl /usr/local/bin/kubectl \ + && apt-get vim && apt-get clean \ && rm -rf /var/lib/apt/lists/* diff --git a/e2e/run.sh b/e2e/run.sh index abdefcd1d..a96a1add5 100755 --- a/e2e/run.sh +++ b/e2e/run.sh @@ -32,11 +32,15 @@ function start_kind(){ fi export KUBECONFIG="${kubeconfig_path}" - kind create cluster --name ${cluster_name} --config kind-cluster-postgres-operator-e2e-tests.yaml - kind load docker-image "${operator_image}" --name ${cluster_name} + kind create cluster --name ${cluster_name} --config kind-cluster-postgres-operator-e2e-tests.yaml docker pull "${spilo_image}" kind load docker-image "${spilo_image}" --name ${cluster_name} -} +} + +function load_operator_image() { + export KUBECONFIG="${kubeconfig_path}" + kind load docker-image "${operator_image}" --name ${cluster_name} +} function set_kind_api_server_ip(){ echo "Setting up kind API server ip" @@ -72,6 +76,7 @@ function main(){ [[ -z ${NOCLEANUP-} ]] && trap "clean_up" QUIT TERM EXIT pull_images [[ ! -f ${kubeconfig_path} ]] && start_kind + load_operator_image set_kind_api_server_ip shift run_tests $@ diff --git a/e2e/tests/k8s_api.py b/e2e/tests/k8s_api.py new file mode 100644 index 000000000..2b455ff19 --- /dev/null +++ b/e2e/tests/k8s_api.py @@ -0,0 +1,468 @@ +import json +import unittest +import time +import timeout_decorator +import subprocess +import warnings +import os +import yaml + +from datetime import datetime +from kubernetes import client, config + +class K8sApi: + + def __init__(self): + + # https://github.com/kubernetes-client/python/issues/309 + warnings.simplefilter("ignore", ResourceWarning) + + self.config = config.load_kube_config() + self.k8s_client = client.ApiClient() + + self.core_v1 = client.CoreV1Api() + self.apps_v1 = client.AppsV1Api() + self.batch_v1_beta1 = client.BatchV1beta1Api() + self.custom_objects_api = client.CustomObjectsApi() + self.policy_v1_beta1 = client.PolicyV1beta1Api() + self.storage_v1_api = client.StorageV1Api() + + +class K8s: + ''' + Wraps around K8s api client and helper methods. + ''' + + RETRY_TIMEOUT_SEC = 1 + + def __init__(self, labels='x=y', namespace='default'): + self.api = K8sApi() + self.labels=labels + self.namespace=namespace + + def get_pg_nodes(self, pg_cluster_name, namespace='default'): + master_pod_node = '' + replica_pod_nodes = [] + podsList = self.api.core_v1.list_namespaced_pod(namespace, label_selector=pg_cluster_name) + for pod in podsList.items: + if pod.metadata.labels.get('spilo-role') == 'master': + master_pod_node = pod.spec.node_name + elif pod.metadata.labels.get('spilo-role') == 'replica': + replica_pod_nodes.append(pod.spec.node_name) + + return master_pod_node, replica_pod_nodes + + def wait_for_operator_pod_start(self): + self. wait_for_pod_start("name=postgres-operator") + + def get_operator_pod(self): + pods = self.api.core_v1.list_namespaced_pod( + 'default', label_selector='name=postgres-operator' + ).items + + if pods: + return pods[0] + + return None + + def get_operator_log(self): + operator_pod = self.get_operator_pod() + pod_name = operator_pod.metadata.name + return self.api.core_v1.read_namespaced_pod_log( + name=pod_name, + namespace='default' + ) + + def wait_for_pod_start(self, pod_labels, namespace='default'): + pod_phase = 'No pod running' + while pod_phase != 'Running': + pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=pod_labels).items + if pods: + pod_phase = pods[0].status.phase + + time.sleep(self.RETRY_TIMEOUT_SEC) + + def get_service_type(self, svc_labels, namespace='default'): + svc_type = '' + svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items + for svc in svcs: + svc_type = svc.spec.type + return svc_type + + def check_service_annotations(self, svc_labels, annotations, namespace='default'): + svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items + for svc in svcs: + for key, value in annotations.items(): + if key not in svc.metadata.annotations or svc.metadata.annotations[key] != value: + print("Expected key {} not found in annotations {}".format(key, svc.metadata.annotation)) + return False + return True + + def check_statefulset_annotations(self, sset_labels, annotations, namespace='default'): + ssets = self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=sset_labels, limit=1).items + for sset in ssets: + for key, value in annotations.items(): + if key not in sset.metadata.annotations or sset.metadata.annotations[key] != value: + print("Expected key {} not found in annotations {}".format(key, sset.metadata.annotation)) + return False + return True + + def wait_for_pg_to_scale(self, number_of_instances, namespace='default'): + + body = { + "spec": { + "numberOfInstances": number_of_instances + } + } + _ = self.api.custom_objects_api.patch_namespaced_custom_object( + "acid.zalan.do", "v1", namespace, "postgresqls", "acid-minimal-cluster", body) + + labels = 'application=spilo,cluster-name=acid-minimal-cluster' + while self.count_pods_with_label(labels) != number_of_instances: + time.sleep(self.RETRY_TIMEOUT_SEC) + + def wait_for_running_pods(self, labels, number, namespace=''): + while self.count_pods_with_label(labels) != number: + time.sleep(self.RETRY_TIMEOUT_SEC) + + def wait_for_pods_to_stop(self, labels, namespace=''): + while self.count_pods_with_label(labels) != 0: + time.sleep(self.RETRY_TIMEOUT_SEC) + + def wait_for_service(self, labels, namespace='default'): + def get_services(): + return self.api.core_v1.list_namespaced_service( + namespace, label_selector=labels + ).items + + while not get_services(): + time.sleep(self.RETRY_TIMEOUT_SEC) + + def count_pods_with_label(self, labels, namespace='default'): + return len(self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items) + + def count_services_with_label(self, labels, namespace='default'): + return len(self.api.core_v1.list_namespaced_service(namespace, label_selector=labels).items) + + def count_endpoints_with_label(self, labels, namespace='default'): + return len(self.api.core_v1.list_namespaced_endpoints(namespace, label_selector=labels).items) + + def count_secrets_with_label(self, labels, namespace='default'): + return len(self.api.core_v1.list_namespaced_secret(namespace, label_selector=labels).items) + + def count_statefulsets_with_label(self, labels, namespace='default'): + return len(self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=labels).items) + + def count_deployments_with_label(self, labels, namespace='default'): + return len(self.api.apps_v1.list_namespaced_deployment(namespace, label_selector=labels).items) + + def count_pdbs_with_label(self, labels, namespace='default'): + return len(self.api.policy_v1_beta1.list_namespaced_pod_disruption_budget( + namespace, label_selector=labels).items) + + def count_running_pods(self, labels='application=spilo,cluster-name=acid-minimal-cluster', namespace='default'): + pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items + return len(list(filter(lambda x: x.status.phase=='Running', pods))) + + def wait_for_pod_failover(self, failover_targets, labels, namespace='default'): + pod_phase = 'Failing over' + new_pod_node = '' + + while (pod_phase != 'Running') or (new_pod_node not in failover_targets): + pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items + if pods: + new_pod_node = pods[0].spec.node_name + pod_phase = pods[0].status.phase + time.sleep(self.RETRY_TIMEOUT_SEC) + + def get_logical_backup_job(self, namespace='default'): + return self.api.batch_v1_beta1.list_namespaced_cron_job(namespace, label_selector="application=spilo") + + def wait_for_logical_backup_job(self, expected_num_of_jobs): + while (len(self.get_logical_backup_job().items) != expected_num_of_jobs): + time.sleep(self.RETRY_TIMEOUT_SEC) + + def wait_for_logical_backup_job_deletion(self): + self.wait_for_logical_backup_job(expected_num_of_jobs=0) + + def wait_for_logical_backup_job_creation(self): + self.wait_for_logical_backup_job(expected_num_of_jobs=1) + + def delete_operator_pod(self, step="Delete operator deplyment"): + operator_pod = self.api.core_v1.list_namespaced_pod('default', label_selector="name=postgres-operator").items[0].metadata.name + self.api.apps_v1.patch_namespaced_deployment("postgres-operator","default", {"spec":{"template":{"metadata":{"annotations":{"step":"{}-{}".format(step, time.time())}}}}}) + self.wait_for_operator_pod_start() + + def update_config(self, config_map_patch, step="Updating operator deployment"): + self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch) + self.delete_operator_pod(step=step) + + def create_with_kubectl(self, path): + return subprocess.run( + ["kubectl", "apply", "-f", path], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + def exec_with_kubectl(self, pod, cmd): + return subprocess.run(["./exec.sh", pod, cmd], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + def get_patroni_state(self, pod): + r = self.exec_with_kubectl(pod, "patronictl list -f json") + if not r.returncode == 0 or not r.stdout.decode()[0:1]=="[": + return [] + return json.loads(r.stdout.decode()) + + def get_patroni_running_members(self, pod): + result = self.get_patroni_state(pod) + return list(filter(lambda x: x["State"]=="running", result)) + + def get_statefulset_image(self, label_selector="application=spilo,cluster-name=acid-minimal-cluster", namespace='default'): + ssets = self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=label_selector, limit=1) + if len(ssets.items) == 0: + return None + return ssets.items[0].spec.template.spec.containers[0].image + + def get_effective_pod_image(self, pod_name, namespace='default'): + ''' + Get the Spilo image pod currently uses. In case of lazy rolling updates + it may differ from the one specified in the stateful set. + ''' + pod = self.api.core_v1.list_namespaced_pod( + namespace, label_selector="statefulset.kubernetes.io/pod-name=" + pod_name) + + if len(pod.items) == 0: + return None + return pod.items[0].spec.containers[0].image + + +class K8sBase: + ''' + To not fix all it once it is a copy of above + Wraps around K8s api client and helper methods. + ''' + + RETRY_TIMEOUT_SEC = 1 + + def __init__(self, labels='x=y', namespace='default'): + self.api = K8sApi() + self.labels=labels + self.namespace=namespace + + def get_pg_nodes(self, pg_cluster_name, namespace='default'): + master_pod_node = '' + replica_pod_nodes = [] + podsList = self.api.core_v1.list_namespaced_pod(namespace, label_selector=pg_cluster_name) + for pod in podsList.items: + if pod.metadata.labels.get('spilo-role') == 'master': + master_pod_node = pod.spec.node_name + elif pod.metadata.labels.get('spilo-role') == 'replica': + replica_pod_nodes.append(pod.spec.node_name) + + return master_pod_node, replica_pod_nodes + + def wait_for_operator_pod_start(self): + self. wait_for_pod_start("name=postgres-operator") + + def get_operator_pod(self): + pods = self.api.core_v1.list_namespaced_pod( + 'default', label_selector='name=postgres-operator' + ).items + + if pods: + return pods[0] + + return None + + def get_operator_log(self): + operator_pod = self.get_operator_pod() + pod_name = operator_pod.metadata.name + return self.api.core_v1.read_namespaced_pod_log( + name=pod_name, + namespace='default' + ) + + def wait_for_pod_start(self, pod_labels, namespace='default'): + pod_phase = 'No pod running' + while pod_phase != 'Running': + pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=pod_labels).items + if pods: + pod_phase = pods[0].status.phase + + time.sleep(self.RETRY_TIMEOUT_SEC) + + def get_service_type(self, svc_labels, namespace='default'): + svc_type = '' + svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items + for svc in svcs: + svc_type = svc.spec.type + return svc_type + + def check_service_annotations(self, svc_labels, annotations, namespace='default'): + svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items + for svc in svcs: + for key, value in annotations.items(): + if key not in svc.metadata.annotations or svc.metadata.annotations[key] != value: + print("Expected key {} not found in annotations {}".format(key, svc.metadata.annotation)) + return False + return True + + def check_statefulset_annotations(self, sset_labels, annotations, namespace='default'): + ssets = self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=sset_labels, limit=1).items + for sset in ssets: + for key, value in annotations.items(): + if key not in sset.metadata.annotations or sset.metadata.annotations[key] != value: + print("Expected key {} not found in annotations {}".format(key, sset.metadata.annotation)) + return False + return True + + def wait_for_pg_to_scale(self, number_of_instances, namespace='default'): + + body = { + "spec": { + "numberOfInstances": number_of_instances + } + } + _ = self.api.custom_objects_api.patch_namespaced_custom_object( + "acid.zalan.do", "v1", namespace, "postgresqls", "acid-minimal-cluster", body) + + labels = 'application=spilo,cluster-name=acid-minimal-cluster' + while self.count_pods_with_label(labels) != number_of_instances: + time.sleep(self.RETRY_TIMEOUT_SEC) + + def wait_for_running_pods(self, labels, number, namespace=''): + while self.count_pods_with_label(labels) != number: + time.sleep(self.RETRY_TIMEOUT_SEC) + + def wait_for_pods_to_stop(self, labels, namespace=''): + while self.count_pods_with_label(labels) != 0: + time.sleep(self.RETRY_TIMEOUT_SEC) + + def wait_for_service(self, labels, namespace='default'): + def get_services(): + return self.api.core_v1.list_namespaced_service( + namespace, label_selector=labels + ).items + + while not get_services(): + time.sleep(self.RETRY_TIMEOUT_SEC) + + def count_pods_with_label(self, labels, namespace='default'): + return len(self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items) + + def count_services_with_label(self, labels, namespace='default'): + return len(self.api.core_v1.list_namespaced_service(namespace, label_selector=labels).items) + + def count_endpoints_with_label(self, labels, namespace='default'): + return len(self.api.core_v1.list_namespaced_endpoints(namespace, label_selector=labels).items) + + def count_secrets_with_label(self, labels, namespace='default'): + return len(self.api.core_v1.list_namespaced_secret(namespace, label_selector=labels).items) + + def count_statefulsets_with_label(self, labels, namespace='default'): + return len(self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=labels).items) + + def count_deployments_with_label(self, labels, namespace='default'): + return len(self.api.apps_v1.list_namespaced_deployment(namespace, label_selector=labels).items) + + def count_pdbs_with_label(self, labels, namespace='default'): + return len(self.api.policy_v1_beta1.list_namespaced_pod_disruption_budget( + namespace, label_selector=labels).items) + + def count_running_pods(self, labels='application=spilo,cluster-name=acid-minimal-cluster', namespace='default'): + pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items + return len(list(filter(lambda x: x.status.phase=='Running', pods))) + + def wait_for_pod_failover(self, failover_targets, labels, namespace='default'): + pod_phase = 'Failing over' + new_pod_node = '' + + while (pod_phase != 'Running') or (new_pod_node not in failover_targets): + pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items + if pods: + new_pod_node = pods[0].spec.node_name + pod_phase = pods[0].status.phase + time.sleep(self.RETRY_TIMEOUT_SEC) + + def get_logical_backup_job(self, namespace='default'): + return self.api.batch_v1_beta1.list_namespaced_cron_job(namespace, label_selector="application=spilo") + + def wait_for_logical_backup_job(self, expected_num_of_jobs): + while (len(self.get_logical_backup_job().items) != expected_num_of_jobs): + time.sleep(self.RETRY_TIMEOUT_SEC) + + def wait_for_logical_backup_job_deletion(self): + self.wait_for_logical_backup_job(expected_num_of_jobs=0) + + def wait_for_logical_backup_job_creation(self): + self.wait_for_logical_backup_job(expected_num_of_jobs=1) + + def delete_operator_pod(self, step="Delete operator deplyment"): + operator_pod = self.api.core_v1.list_namespaced_pod('default', label_selector="name=postgres-operator").items[0].metadata.name + self.api.apps_v1.patch_namespaced_deployment("postgres-operator","default", {"spec":{"template":{"metadata":{"annotations":{"step":"{}-{}".format(step, time.time())}}}}}) + self.wait_for_operator_pod_start() + + def update_config(self, config_map_patch, step="Updating operator deployment"): + self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch) + self.delete_operator_pod(step=step) + + def create_with_kubectl(self, path): + return subprocess.run( + ["kubectl", "apply", "-f", path], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + def exec_with_kubectl(self, pod, cmd): + return subprocess.run(["./exec.sh", pod, cmd], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + def get_patroni_state(self, pod): + r = self.exec_with_kubectl(pod, "patronictl list -f json") + if not r.returncode == 0 or not r.stdout.decode()[0:1]=="[": + return [] + return json.loads(r.stdout.decode()) + + def get_patroni_running_members(self, pod): + result = self.get_patroni_state(pod) + return list(filter(lambda x: x["State"]=="running", result)) + + def get_statefulset_image(self, label_selector="application=spilo,cluster-name=acid-minimal-cluster", namespace='default'): + ssets = self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=label_selector, limit=1) + if len(ssets.items) == 0: + return None + return ssets.items[0].spec.template.spec.containers[0].image + + def get_effective_pod_image(self, pod_name, namespace='default'): + ''' + Get the Spilo image pod currently uses. In case of lazy rolling updates + it may differ from the one specified in the stateful set. + ''' + pod = self.api.core_v1.list_namespaced_pod( + namespace, label_selector="statefulset.kubernetes.io/pod-name=" + pod_name) + + if len(pod.items) == 0: + return None + return pod.items[0].spec.containers[0].image + + +class K8sOperator(K8sBase): + def __init__(self, labels="name=postgres-operator", namespace="default"): + super().__init__(labels, namespace) + +class K8sPostgres(K8sBase): + def __init__(self, labels="cluster-name=acid-minimal-cluster", namespace="default"): + super().__init__(labels, namespace) + + def get_pg_nodes(self): + master_pod_node = '' + replica_pod_nodes = [] + podsList = self.api.core_v1.list_namespaced_pod(self.namespace, label_selector=self.labels) + for pod in podsList.items: + if pod.metadata.labels.get('spilo-role') == 'master': + master_pod_node = pod.spec.node_name + elif pod.metadata.labels.get('spilo-role') == 'replica': + replica_pod_nodes.append(pod.spec.node_name) + + return master_pod_node, replica_pod_nodes \ No newline at end of file diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 1c5aad81f..f06ebef88 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -10,6 +10,8 @@ import yaml from datetime import datetime from kubernetes import client, config +from k8s_api import K8s + SPILO_CURRENT = "registry.opensource.zalan.do/acid/spilo-12:1.6-p5" SPILO_LAZY = "registry.opensource.zalan.do/acid/spilo-cdp-12:1.6-p114" @@ -260,8 +262,7 @@ class EndToEndTestCase(unittest.TestCase): k8s = self.k8s # update infrastructure roles description secret_name = "postgresql-infrastructure-roles" - roles = "secretname: postgresql-infrastructure-roles-new, \ - userkey: user, rolekey: memberof, passwordkey: password, defaultrolevalue: robot_zmon" + roles = "secretname: postgresql-infrastructure-roles-new,userkey: user, rolekey: memberof, passwordkey: password, defaultrolevalue: robot_zmon" patch_infrastructure_roles = { "data": { "infrastructure_roles_secret_name": secret_name, @@ -297,8 +298,8 @@ class EndToEndTestCase(unittest.TestCase): "Origin": 2, }) return True - else: - return False + + return False self.eventuallyTrue(verify_role, "infrastructure role setup is not loaded") @@ -361,6 +362,7 @@ class EndToEndTestCase(unittest.TestCase): # verify only pod-0 which was deleted got new image from statefulset self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0), conf_image, "Delete pod-0 did not get new spilo image") self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No two pods running after lazy rolling upgrade") + self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)), 2, "Postgres status did not enter running") self.assertNotEqual(lambda: k8s.get_effective_pod_image(pod1), SPILO_CURRENT, "pod-1 should not have change Docker image to {}".format(SPILO_CURRENT)) # clean up @@ -872,229 +874,9 @@ class EndToEndTestCase(unittest.TestCase): return True -class K8sApi: - - def __init__(self): - - # https://github.com/kubernetes-client/python/issues/309 - warnings.simplefilter("ignore", ResourceWarning) - - self.config = config.load_kube_config() - self.k8s_client = client.ApiClient() - - self.core_v1 = client.CoreV1Api() - self.apps_v1 = client.AppsV1Api() - self.batch_v1_beta1 = client.BatchV1beta1Api() - self.custom_objects_api = client.CustomObjectsApi() - self.policy_v1_beta1 = client.PolicyV1beta1Api() - self.storage_v1_api = client.StorageV1Api() -class K8s: - ''' - Wraps around K8s api client and helper methods. - ''' - RETRY_TIMEOUT_SEC = 1 - - def __init__(self): - self.api = K8sApi() - - def get_pg_nodes(self, pg_cluster_name, namespace='default'): - master_pod_node = '' - replica_pod_nodes = [] - podsList = self.api.core_v1.list_namespaced_pod(namespace, label_selector=pg_cluster_name) - for pod in podsList.items: - if pod.metadata.labels.get('spilo-role') == 'master': - master_pod_node = pod.spec.node_name - elif pod.metadata.labels.get('spilo-role') == 'replica': - replica_pod_nodes.append(pod.spec.node_name) - - return master_pod_node, replica_pod_nodes - - def wait_for_operator_pod_start(self): - self. wait_for_pod_start("name=postgres-operator") - - def get_operator_pod(self): - pods = self.api.core_v1.list_namespaced_pod( - 'default', label_selector='name=postgres-operator' - ).items - - if pods: - return pods[0] - - return None - - def get_operator_log(self): - operator_pod = self.get_operator_pod() - pod_name = operator_pod.metadata.name - return self.api.core_v1.read_namespaced_pod_log( - name=pod_name, - namespace='default' - ) - - def wait_for_pod_start(self, pod_labels, namespace='default'): - pod_phase = 'No pod running' - while pod_phase != 'Running': - pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=pod_labels).items - if pods: - pod_phase = pods[0].status.phase - - time.sleep(self.RETRY_TIMEOUT_SEC) - - def get_service_type(self, svc_labels, namespace='default'): - svc_type = '' - svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items - for svc in svcs: - svc_type = svc.spec.type - return svc_type - - def check_service_annotations(self, svc_labels, annotations, namespace='default'): - svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items - for svc in svcs: - for key, value in annotations.items(): - if key not in svc.metadata.annotations or svc.metadata.annotations[key] != value: - print("Expected key {} not found in annotations {}".format(key, svc.metadata.annotation)) - return False - return True - - def check_statefulset_annotations(self, sset_labels, annotations, namespace='default'): - ssets = self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=sset_labels, limit=1).items - for sset in ssets: - for key, value in annotations.items(): - if key not in sset.metadata.annotations or sset.metadata.annotations[key] != value: - print("Expected key {} not found in annotations {}".format(key, sset.metadata.annotation)) - return False - return True - - def wait_for_pg_to_scale(self, number_of_instances, namespace='default'): - - body = { - "spec": { - "numberOfInstances": number_of_instances - } - } - _ = self.api.custom_objects_api.patch_namespaced_custom_object( - "acid.zalan.do", "v1", namespace, "postgresqls", "acid-minimal-cluster", body) - - labels = 'application=spilo,cluster-name=acid-minimal-cluster' - while self.count_pods_with_label(labels) != number_of_instances: - time.sleep(self.RETRY_TIMEOUT_SEC) - - def wait_for_running_pods(self, labels, number, namespace=''): - while self.count_pods_with_label(labels) != number: - time.sleep(self.RETRY_TIMEOUT_SEC) - - def wait_for_pods_to_stop(self, labels, namespace=''): - while self.count_pods_with_label(labels) != 0: - time.sleep(self.RETRY_TIMEOUT_SEC) - - def wait_for_service(self, labels, namespace='default'): - def get_services(): - return self.api.core_v1.list_namespaced_service( - namespace, label_selector=labels - ).items - - while not get_services(): - time.sleep(self.RETRY_TIMEOUT_SEC) - - def count_pods_with_label(self, labels, namespace='default'): - return len(self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items) - - def count_services_with_label(self, labels, namespace='default'): - return len(self.api.core_v1.list_namespaced_service(namespace, label_selector=labels).items) - - def count_endpoints_with_label(self, labels, namespace='default'): - return len(self.api.core_v1.list_namespaced_endpoints(namespace, label_selector=labels).items) - - def count_secrets_with_label(self, labels, namespace='default'): - return len(self.api.core_v1.list_namespaced_secret(namespace, label_selector=labels).items) - - def count_statefulsets_with_label(self, labels, namespace='default'): - return len(self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=labels).items) - - def count_deployments_with_label(self, labels, namespace='default'): - return len(self.api.apps_v1.list_namespaced_deployment(namespace, label_selector=labels).items) - - def count_pdbs_with_label(self, labels, namespace='default'): - return len(self.api.policy_v1_beta1.list_namespaced_pod_disruption_budget( - namespace, label_selector=labels).items) - - def count_running_pods(self, labels='application=spilo,cluster-name=acid-minimal-cluster', namespace='default'): - pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items - return len(list(filter(lambda x: x.status.phase=='Running', pods))) - - def wait_for_pod_failover(self, failover_targets, labels, namespace='default'): - pod_phase = 'Failing over' - new_pod_node = '' - - while (pod_phase != 'Running') or (new_pod_node not in failover_targets): - pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items - if pods: - new_pod_node = pods[0].spec.node_name - pod_phase = pods[0].status.phase - time.sleep(self.RETRY_TIMEOUT_SEC) - - def get_logical_backup_job(self, namespace='default'): - return self.api.batch_v1_beta1.list_namespaced_cron_job(namespace, label_selector="application=spilo") - - def wait_for_logical_backup_job(self, expected_num_of_jobs): - while (len(self.get_logical_backup_job().items) != expected_num_of_jobs): - time.sleep(self.RETRY_TIMEOUT_SEC) - - def wait_for_logical_backup_job_deletion(self): - self.wait_for_logical_backup_job(expected_num_of_jobs=0) - - def wait_for_logical_backup_job_creation(self): - self.wait_for_logical_backup_job(expected_num_of_jobs=1) - - def delete_operator_pod(self, step="Delete operator deplyment"): - operator_pod = self.api.core_v1.list_namespaced_pod('default', label_selector="name=postgres-operator").items[0].metadata.name - self.api.apps_v1.patch_namespaced_deployment("postgres-operator","default", {"spec":{"template":{"metadata":{"annotations":{"step":"{}-{}".format(step, time.time())}}}}}) - self.wait_for_operator_pod_start() - - def update_config(self, config_map_patch, step="Updating operator deployment"): - self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch) - self.delete_operator_pod(step=step) - - def create_with_kubectl(self, path): - return subprocess.run( - ["kubectl", "apply", "-f", path], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - - def exec_with_kubectl(self, pod, cmd): - return subprocess.run(["./exec.sh", pod, cmd], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - - def get_patroni_state(self, pod): - r = self.exec_with_kubectl(pod, "patronictl list -f json") - if not r.returncode == 0 or not r.stdout.decode()[0:1]=="[": - return [] - return json.loads(r.stdout.decode()) - - def get_patroni_running_members(self, pod): - result = self.get_patroni_state(pod) - return list(filter(lambda x: x["State"]=="running", result)) - - def get_statefulset_image(self, label_selector="application=spilo,cluster-name=acid-minimal-cluster", namespace='default'): - ssets = self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=label_selector, limit=1) - if len(ssets.items) == 0: - return None - return ssets.items[0].spec.template.spec.containers[0].image - - def get_effective_pod_image(self, pod_name, namespace='default'): - ''' - Get the Spilo image pod currently uses. In case of lazy rolling updates - it may differ from the one specified in the stateful set. - ''' - pod = self.api.core_v1.list_namespaced_pod( - namespace, label_selector="statefulset.kubernetes.io/pod-name=" + pod_name) - - if len(pod.items) == 0: - return None - return pod.items[0].spec.containers[0].image if __name__ == '__main__': diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 8e9f02029..78aac3392 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -1,9 +1,12 @@ package controller import ( + "bytes" "context" + "encoding/json" "fmt" "os" + "strings" "sync" "time" @@ -190,10 +193,18 @@ func (c *Controller) warnOnDeprecatedOperatorParameters() { } } +func compactValue(v string) string { + var compact bytes.Buffer + if err := json.Compact(&compact, []byte(v)); err != nil { + panic("Hard coded json strings broken!") + } + return compact.String() +} + func (c *Controller) initPodServiceAccount() { if c.opConfig.PodServiceAccountDefinition == "" { - c.opConfig.PodServiceAccountDefinition = ` + stringValue := ` { "apiVersion": "v1", "kind": "ServiceAccount", @@ -201,6 +212,9 @@ func (c *Controller) initPodServiceAccount() { "name": "postgres-pod" } }` + + c.opConfig.PodServiceAccountDefinition = compactValue(stringValue) + } // re-uses k8s internal parsing. See k8s client-go issue #193 for explanation @@ -230,7 +244,7 @@ func (c *Controller) initRoleBinding() { // operator binds it to the cluster role with sufficient privileges // we assume the role is created by the k8s administrator if c.opConfig.PodServiceAccountRoleBindingDefinition == "" { - c.opConfig.PodServiceAccountRoleBindingDefinition = fmt.Sprintf(` + stringValue := fmt.Sprintf(` { "apiVersion": "rbac.authorization.k8s.io/v1", "kind": "RoleBinding", @@ -249,6 +263,7 @@ func (c *Controller) initRoleBinding() { } ] }`, c.PodServiceAccount.Name, c.PodServiceAccount.Name, c.PodServiceAccount.Name) + c.opConfig.PodServiceAccountRoleBindingDefinition = compactValue(stringValue) } c.logger.Info("Parse role bindings") // re-uses k8s internal parsing. See k8s client-go issue #193 for explanation @@ -267,7 +282,14 @@ func (c *Controller) initRoleBinding() { } - // actual roles bindings are deployed at the time of Postgres/Spilo cluster creation + // actual roles bindings ar*logrus.Entrye deployed at the time of Postgres/Spilo cluster creation +} + +func logMultiLineConfig(log *logrus.Entry, config string) { + lines := strings.Split(config, "\n") + for _, l := range lines { + log.Infof("%s", l) + } } func (c *Controller) initController() { @@ -302,7 +324,7 @@ func (c *Controller) initController() { c.logger.Logger.Level = logrus.DebugLevel } - c.logger.Infof("config: %s", c.opConfig.MustMarshal()) + logMultiLineConfig(c.logger, c.opConfig.MustMarshal()) roleDefs := c.getInfrastructureRoleDefinitions() if infraRoles, err := c.getInfrastructureRoles(roleDefs); err != nil { diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index 7a1ae8a41..35991248b 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -199,7 +199,7 @@ type Config struct { // MustMarshal marshals the config or panics func (c Config) MustMarshal() string { - b, err := json.MarshalIndent(c, "", "\t") + b, err := json.MarshalIndent(c, "", " ") if err != nil { panic(err) }