Printing config as multi log line entity, makes it readable and grepable on startup

This commit is contained in:
Jan Mußler 2020-10-21 08:33:31 +02:00
parent c6c4c4cc8a
commit 668ef51d9f
6 changed files with 510 additions and 232 deletions

View File

@ -18,6 +18,7 @@ RUN apt-get update \
&& curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.18.0/bin/linux/amd64/kubectl \ && curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.18.0/bin/linux/amd64/kubectl \
&& chmod +x ./kubectl \ && chmod +x ./kubectl \
&& mv ./kubectl /usr/local/bin/kubectl \ && mv ./kubectl /usr/local/bin/kubectl \
&& apt-get vim
&& apt-get clean \ && apt-get clean \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*

View File

@ -33,11 +33,15 @@ function start_kind(){
export KUBECONFIG="${kubeconfig_path}" export KUBECONFIG="${kubeconfig_path}"
kind create cluster --name ${cluster_name} --config kind-cluster-postgres-operator-e2e-tests.yaml kind create cluster --name ${cluster_name} --config kind-cluster-postgres-operator-e2e-tests.yaml
kind load docker-image "${operator_image}" --name ${cluster_name}
docker pull "${spilo_image}" docker pull "${spilo_image}"
kind load docker-image "${spilo_image}" --name ${cluster_name} kind load docker-image "${spilo_image}" --name ${cluster_name}
} }
function load_operator_image() {
export KUBECONFIG="${kubeconfig_path}"
kind load docker-image "${operator_image}" --name ${cluster_name}
}
function set_kind_api_server_ip(){ function set_kind_api_server_ip(){
echo "Setting up kind API server ip" echo "Setting up kind API server ip"
# use the actual kubeconfig to connect to the 'kind' API server # use the actual kubeconfig to connect to the 'kind' API server
@ -72,6 +76,7 @@ function main(){
[[ -z ${NOCLEANUP-} ]] && trap "clean_up" QUIT TERM EXIT [[ -z ${NOCLEANUP-} ]] && trap "clean_up" QUIT TERM EXIT
pull_images pull_images
[[ ! -f ${kubeconfig_path} ]] && start_kind [[ ! -f ${kubeconfig_path} ]] && start_kind
load_operator_image
set_kind_api_server_ip set_kind_api_server_ip
shift shift
run_tests $@ run_tests $@

468
e2e/tests/k8s_api.py Normal file
View File

@ -0,0 +1,468 @@
import json
import unittest
import time
import timeout_decorator
import subprocess
import warnings
import os
import yaml
from datetime import datetime
from kubernetes import client, config
class K8sApi:
def __init__(self):
# https://github.com/kubernetes-client/python/issues/309
warnings.simplefilter("ignore", ResourceWarning)
self.config = config.load_kube_config()
self.k8s_client = client.ApiClient()
self.core_v1 = client.CoreV1Api()
self.apps_v1 = client.AppsV1Api()
self.batch_v1_beta1 = client.BatchV1beta1Api()
self.custom_objects_api = client.CustomObjectsApi()
self.policy_v1_beta1 = client.PolicyV1beta1Api()
self.storage_v1_api = client.StorageV1Api()
class K8s:
'''
Wraps around K8s api client and helper methods.
'''
RETRY_TIMEOUT_SEC = 1
def __init__(self, labels='x=y', namespace='default'):
self.api = K8sApi()
self.labels=labels
self.namespace=namespace
def get_pg_nodes(self, pg_cluster_name, namespace='default'):
master_pod_node = ''
replica_pod_nodes = []
podsList = self.api.core_v1.list_namespaced_pod(namespace, label_selector=pg_cluster_name)
for pod in podsList.items:
if pod.metadata.labels.get('spilo-role') == 'master':
master_pod_node = pod.spec.node_name
elif pod.metadata.labels.get('spilo-role') == 'replica':
replica_pod_nodes.append(pod.spec.node_name)
return master_pod_node, replica_pod_nodes
def wait_for_operator_pod_start(self):
self. wait_for_pod_start("name=postgres-operator")
def get_operator_pod(self):
pods = self.api.core_v1.list_namespaced_pod(
'default', label_selector='name=postgres-operator'
).items
if pods:
return pods[0]
return None
def get_operator_log(self):
operator_pod = self.get_operator_pod()
pod_name = operator_pod.metadata.name
return self.api.core_v1.read_namespaced_pod_log(
name=pod_name,
namespace='default'
)
def wait_for_pod_start(self, pod_labels, namespace='default'):
pod_phase = 'No pod running'
while pod_phase != 'Running':
pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=pod_labels).items
if pods:
pod_phase = pods[0].status.phase
time.sleep(self.RETRY_TIMEOUT_SEC)
def get_service_type(self, svc_labels, namespace='default'):
svc_type = ''
svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items
for svc in svcs:
svc_type = svc.spec.type
return svc_type
def check_service_annotations(self, svc_labels, annotations, namespace='default'):
svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items
for svc in svcs:
for key, value in annotations.items():
if key not in svc.metadata.annotations or svc.metadata.annotations[key] != value:
print("Expected key {} not found in annotations {}".format(key, svc.metadata.annotation))
return False
return True
def check_statefulset_annotations(self, sset_labels, annotations, namespace='default'):
ssets = self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=sset_labels, limit=1).items
for sset in ssets:
for key, value in annotations.items():
if key not in sset.metadata.annotations or sset.metadata.annotations[key] != value:
print("Expected key {} not found in annotations {}".format(key, sset.metadata.annotation))
return False
return True
def wait_for_pg_to_scale(self, number_of_instances, namespace='default'):
body = {
"spec": {
"numberOfInstances": number_of_instances
}
}
_ = self.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", namespace, "postgresqls", "acid-minimal-cluster", body)
labels = 'application=spilo,cluster-name=acid-minimal-cluster'
while self.count_pods_with_label(labels) != number_of_instances:
time.sleep(self.RETRY_TIMEOUT_SEC)
def wait_for_running_pods(self, labels, number, namespace=''):
while self.count_pods_with_label(labels) != number:
time.sleep(self.RETRY_TIMEOUT_SEC)
def wait_for_pods_to_stop(self, labels, namespace=''):
while self.count_pods_with_label(labels) != 0:
time.sleep(self.RETRY_TIMEOUT_SEC)
def wait_for_service(self, labels, namespace='default'):
def get_services():
return self.api.core_v1.list_namespaced_service(
namespace, label_selector=labels
).items
while not get_services():
time.sleep(self.RETRY_TIMEOUT_SEC)
def count_pods_with_label(self, labels, namespace='default'):
return len(self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items)
def count_services_with_label(self, labels, namespace='default'):
return len(self.api.core_v1.list_namespaced_service(namespace, label_selector=labels).items)
def count_endpoints_with_label(self, labels, namespace='default'):
return len(self.api.core_v1.list_namespaced_endpoints(namespace, label_selector=labels).items)
def count_secrets_with_label(self, labels, namespace='default'):
return len(self.api.core_v1.list_namespaced_secret(namespace, label_selector=labels).items)
def count_statefulsets_with_label(self, labels, namespace='default'):
return len(self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=labels).items)
def count_deployments_with_label(self, labels, namespace='default'):
return len(self.api.apps_v1.list_namespaced_deployment(namespace, label_selector=labels).items)
def count_pdbs_with_label(self, labels, namespace='default'):
return len(self.api.policy_v1_beta1.list_namespaced_pod_disruption_budget(
namespace, label_selector=labels).items)
def count_running_pods(self, labels='application=spilo,cluster-name=acid-minimal-cluster', namespace='default'):
pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items
return len(list(filter(lambda x: x.status.phase=='Running', pods)))
def wait_for_pod_failover(self, failover_targets, labels, namespace='default'):
pod_phase = 'Failing over'
new_pod_node = ''
while (pod_phase != 'Running') or (new_pod_node not in failover_targets):
pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items
if pods:
new_pod_node = pods[0].spec.node_name
pod_phase = pods[0].status.phase
time.sleep(self.RETRY_TIMEOUT_SEC)
def get_logical_backup_job(self, namespace='default'):
return self.api.batch_v1_beta1.list_namespaced_cron_job(namespace, label_selector="application=spilo")
def wait_for_logical_backup_job(self, expected_num_of_jobs):
while (len(self.get_logical_backup_job().items) != expected_num_of_jobs):
time.sleep(self.RETRY_TIMEOUT_SEC)
def wait_for_logical_backup_job_deletion(self):
self.wait_for_logical_backup_job(expected_num_of_jobs=0)
def wait_for_logical_backup_job_creation(self):
self.wait_for_logical_backup_job(expected_num_of_jobs=1)
def delete_operator_pod(self, step="Delete operator deplyment"):
operator_pod = self.api.core_v1.list_namespaced_pod('default', label_selector="name=postgres-operator").items[0].metadata.name
self.api.apps_v1.patch_namespaced_deployment("postgres-operator","default", {"spec":{"template":{"metadata":{"annotations":{"step":"{}-{}".format(step, time.time())}}}}})
self.wait_for_operator_pod_start()
def update_config(self, config_map_patch, step="Updating operator deployment"):
self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch)
self.delete_operator_pod(step=step)
def create_with_kubectl(self, path):
return subprocess.run(
["kubectl", "apply", "-f", path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def exec_with_kubectl(self, pod, cmd):
return subprocess.run(["./exec.sh", pod, cmd],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def get_patroni_state(self, pod):
r = self.exec_with_kubectl(pod, "patronictl list -f json")
if not r.returncode == 0 or not r.stdout.decode()[0:1]=="[":
return []
return json.loads(r.stdout.decode())
def get_patroni_running_members(self, pod):
result = self.get_patroni_state(pod)
return list(filter(lambda x: x["State"]=="running", result))
def get_statefulset_image(self, label_selector="application=spilo,cluster-name=acid-minimal-cluster", namespace='default'):
ssets = self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=label_selector, limit=1)
if len(ssets.items) == 0:
return None
return ssets.items[0].spec.template.spec.containers[0].image
def get_effective_pod_image(self, pod_name, namespace='default'):
'''
Get the Spilo image pod currently uses. In case of lazy rolling updates
it may differ from the one specified in the stateful set.
'''
pod = self.api.core_v1.list_namespaced_pod(
namespace, label_selector="statefulset.kubernetes.io/pod-name=" + pod_name)
if len(pod.items) == 0:
return None
return pod.items[0].spec.containers[0].image
class K8sBase:
'''
To not fix all it once it is a copy of above
Wraps around K8s api client and helper methods.
'''
RETRY_TIMEOUT_SEC = 1
def __init__(self, labels='x=y', namespace='default'):
self.api = K8sApi()
self.labels=labels
self.namespace=namespace
def get_pg_nodes(self, pg_cluster_name, namespace='default'):
master_pod_node = ''
replica_pod_nodes = []
podsList = self.api.core_v1.list_namespaced_pod(namespace, label_selector=pg_cluster_name)
for pod in podsList.items:
if pod.metadata.labels.get('spilo-role') == 'master':
master_pod_node = pod.spec.node_name
elif pod.metadata.labels.get('spilo-role') == 'replica':
replica_pod_nodes.append(pod.spec.node_name)
return master_pod_node, replica_pod_nodes
def wait_for_operator_pod_start(self):
self. wait_for_pod_start("name=postgres-operator")
def get_operator_pod(self):
pods = self.api.core_v1.list_namespaced_pod(
'default', label_selector='name=postgres-operator'
).items
if pods:
return pods[0]
return None
def get_operator_log(self):
operator_pod = self.get_operator_pod()
pod_name = operator_pod.metadata.name
return self.api.core_v1.read_namespaced_pod_log(
name=pod_name,
namespace='default'
)
def wait_for_pod_start(self, pod_labels, namespace='default'):
pod_phase = 'No pod running'
while pod_phase != 'Running':
pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=pod_labels).items
if pods:
pod_phase = pods[0].status.phase
time.sleep(self.RETRY_TIMEOUT_SEC)
def get_service_type(self, svc_labels, namespace='default'):
svc_type = ''
svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items
for svc in svcs:
svc_type = svc.spec.type
return svc_type
def check_service_annotations(self, svc_labels, annotations, namespace='default'):
svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items
for svc in svcs:
for key, value in annotations.items():
if key not in svc.metadata.annotations or svc.metadata.annotations[key] != value:
print("Expected key {} not found in annotations {}".format(key, svc.metadata.annotation))
return False
return True
def check_statefulset_annotations(self, sset_labels, annotations, namespace='default'):
ssets = self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=sset_labels, limit=1).items
for sset in ssets:
for key, value in annotations.items():
if key not in sset.metadata.annotations or sset.metadata.annotations[key] != value:
print("Expected key {} not found in annotations {}".format(key, sset.metadata.annotation))
return False
return True
def wait_for_pg_to_scale(self, number_of_instances, namespace='default'):
body = {
"spec": {
"numberOfInstances": number_of_instances
}
}
_ = self.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", namespace, "postgresqls", "acid-minimal-cluster", body)
labels = 'application=spilo,cluster-name=acid-minimal-cluster'
while self.count_pods_with_label(labels) != number_of_instances:
time.sleep(self.RETRY_TIMEOUT_SEC)
def wait_for_running_pods(self, labels, number, namespace=''):
while self.count_pods_with_label(labels) != number:
time.sleep(self.RETRY_TIMEOUT_SEC)
def wait_for_pods_to_stop(self, labels, namespace=''):
while self.count_pods_with_label(labels) != 0:
time.sleep(self.RETRY_TIMEOUT_SEC)
def wait_for_service(self, labels, namespace='default'):
def get_services():
return self.api.core_v1.list_namespaced_service(
namespace, label_selector=labels
).items
while not get_services():
time.sleep(self.RETRY_TIMEOUT_SEC)
def count_pods_with_label(self, labels, namespace='default'):
return len(self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items)
def count_services_with_label(self, labels, namespace='default'):
return len(self.api.core_v1.list_namespaced_service(namespace, label_selector=labels).items)
def count_endpoints_with_label(self, labels, namespace='default'):
return len(self.api.core_v1.list_namespaced_endpoints(namespace, label_selector=labels).items)
def count_secrets_with_label(self, labels, namespace='default'):
return len(self.api.core_v1.list_namespaced_secret(namespace, label_selector=labels).items)
def count_statefulsets_with_label(self, labels, namespace='default'):
return len(self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=labels).items)
def count_deployments_with_label(self, labels, namespace='default'):
return len(self.api.apps_v1.list_namespaced_deployment(namespace, label_selector=labels).items)
def count_pdbs_with_label(self, labels, namespace='default'):
return len(self.api.policy_v1_beta1.list_namespaced_pod_disruption_budget(
namespace, label_selector=labels).items)
def count_running_pods(self, labels='application=spilo,cluster-name=acid-minimal-cluster', namespace='default'):
pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items
return len(list(filter(lambda x: x.status.phase=='Running', pods)))
def wait_for_pod_failover(self, failover_targets, labels, namespace='default'):
pod_phase = 'Failing over'
new_pod_node = ''
while (pod_phase != 'Running') or (new_pod_node not in failover_targets):
pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items
if pods:
new_pod_node = pods[0].spec.node_name
pod_phase = pods[0].status.phase
time.sleep(self.RETRY_TIMEOUT_SEC)
def get_logical_backup_job(self, namespace='default'):
return self.api.batch_v1_beta1.list_namespaced_cron_job(namespace, label_selector="application=spilo")
def wait_for_logical_backup_job(self, expected_num_of_jobs):
while (len(self.get_logical_backup_job().items) != expected_num_of_jobs):
time.sleep(self.RETRY_TIMEOUT_SEC)
def wait_for_logical_backup_job_deletion(self):
self.wait_for_logical_backup_job(expected_num_of_jobs=0)
def wait_for_logical_backup_job_creation(self):
self.wait_for_logical_backup_job(expected_num_of_jobs=1)
def delete_operator_pod(self, step="Delete operator deplyment"):
operator_pod = self.api.core_v1.list_namespaced_pod('default', label_selector="name=postgres-operator").items[0].metadata.name
self.api.apps_v1.patch_namespaced_deployment("postgres-operator","default", {"spec":{"template":{"metadata":{"annotations":{"step":"{}-{}".format(step, time.time())}}}}})
self.wait_for_operator_pod_start()
def update_config(self, config_map_patch, step="Updating operator deployment"):
self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch)
self.delete_operator_pod(step=step)
def create_with_kubectl(self, path):
return subprocess.run(
["kubectl", "apply", "-f", path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def exec_with_kubectl(self, pod, cmd):
return subprocess.run(["./exec.sh", pod, cmd],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def get_patroni_state(self, pod):
r = self.exec_with_kubectl(pod, "patronictl list -f json")
if not r.returncode == 0 or not r.stdout.decode()[0:1]=="[":
return []
return json.loads(r.stdout.decode())
def get_patroni_running_members(self, pod):
result = self.get_patroni_state(pod)
return list(filter(lambda x: x["State"]=="running", result))
def get_statefulset_image(self, label_selector="application=spilo,cluster-name=acid-minimal-cluster", namespace='default'):
ssets = self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=label_selector, limit=1)
if len(ssets.items) == 0:
return None
return ssets.items[0].spec.template.spec.containers[0].image
def get_effective_pod_image(self, pod_name, namespace='default'):
'''
Get the Spilo image pod currently uses. In case of lazy rolling updates
it may differ from the one specified in the stateful set.
'''
pod = self.api.core_v1.list_namespaced_pod(
namespace, label_selector="statefulset.kubernetes.io/pod-name=" + pod_name)
if len(pod.items) == 0:
return None
return pod.items[0].spec.containers[0].image
class K8sOperator(K8sBase):
def __init__(self, labels="name=postgres-operator", namespace="default"):
super().__init__(labels, namespace)
class K8sPostgres(K8sBase):
def __init__(self, labels="cluster-name=acid-minimal-cluster", namespace="default"):
super().__init__(labels, namespace)
def get_pg_nodes(self):
master_pod_node = ''
replica_pod_nodes = []
podsList = self.api.core_v1.list_namespaced_pod(self.namespace, label_selector=self.labels)
for pod in podsList.items:
if pod.metadata.labels.get('spilo-role') == 'master':
master_pod_node = pod.spec.node_name
elif pod.metadata.labels.get('spilo-role') == 'replica':
replica_pod_nodes.append(pod.spec.node_name)
return master_pod_node, replica_pod_nodes

View File

@ -10,6 +10,8 @@ import yaml
from datetime import datetime from datetime import datetime
from kubernetes import client, config from kubernetes import client, config
from k8s_api import K8s
SPILO_CURRENT = "registry.opensource.zalan.do/acid/spilo-12:1.6-p5" SPILO_CURRENT = "registry.opensource.zalan.do/acid/spilo-12:1.6-p5"
SPILO_LAZY = "registry.opensource.zalan.do/acid/spilo-cdp-12:1.6-p114" SPILO_LAZY = "registry.opensource.zalan.do/acid/spilo-cdp-12:1.6-p114"
@ -260,8 +262,7 @@ class EndToEndTestCase(unittest.TestCase):
k8s = self.k8s k8s = self.k8s
# update infrastructure roles description # update infrastructure roles description
secret_name = "postgresql-infrastructure-roles" secret_name = "postgresql-infrastructure-roles"
roles = "secretname: postgresql-infrastructure-roles-new, \ roles = "secretname: postgresql-infrastructure-roles-new,userkey: user, rolekey: memberof, passwordkey: password, defaultrolevalue: robot_zmon"
userkey: user, rolekey: memberof, passwordkey: password, defaultrolevalue: robot_zmon"
patch_infrastructure_roles = { patch_infrastructure_roles = {
"data": { "data": {
"infrastructure_roles_secret_name": secret_name, "infrastructure_roles_secret_name": secret_name,
@ -297,8 +298,8 @@ class EndToEndTestCase(unittest.TestCase):
"Origin": 2, "Origin": 2,
}) })
return True return True
else:
return False return False
self.eventuallyTrue(verify_role, "infrastructure role setup is not loaded") self.eventuallyTrue(verify_role, "infrastructure role setup is not loaded")
@ -361,6 +362,7 @@ class EndToEndTestCase(unittest.TestCase):
# verify only pod-0 which was deleted got new image from statefulset # verify only pod-0 which was deleted got new image from statefulset
self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0), conf_image, "Delete pod-0 did not get new spilo image") self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0), conf_image, "Delete pod-0 did not get new spilo image")
self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No two pods running after lazy rolling upgrade") self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No two pods running after lazy rolling upgrade")
self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)), 2, "Postgres status did not enter running")
self.assertNotEqual(lambda: k8s.get_effective_pod_image(pod1), SPILO_CURRENT, "pod-1 should not have change Docker image to {}".format(SPILO_CURRENT)) self.assertNotEqual(lambda: k8s.get_effective_pod_image(pod1), SPILO_CURRENT, "pod-1 should not have change Docker image to {}".format(SPILO_CURRENT))
# clean up # clean up
@ -872,229 +874,9 @@ class EndToEndTestCase(unittest.TestCase):
return True return True
class K8sApi:
def __init__(self):
# https://github.com/kubernetes-client/python/issues/309
warnings.simplefilter("ignore", ResourceWarning)
self.config = config.load_kube_config()
self.k8s_client = client.ApiClient()
self.core_v1 = client.CoreV1Api()
self.apps_v1 = client.AppsV1Api()
self.batch_v1_beta1 = client.BatchV1beta1Api()
self.custom_objects_api = client.CustomObjectsApi()
self.policy_v1_beta1 = client.PolicyV1beta1Api()
self.storage_v1_api = client.StorageV1Api()
class K8s:
'''
Wraps around K8s api client and helper methods.
'''
RETRY_TIMEOUT_SEC = 1
def __init__(self):
self.api = K8sApi()
def get_pg_nodes(self, pg_cluster_name, namespace='default'):
master_pod_node = ''
replica_pod_nodes = []
podsList = self.api.core_v1.list_namespaced_pod(namespace, label_selector=pg_cluster_name)
for pod in podsList.items:
if pod.metadata.labels.get('spilo-role') == 'master':
master_pod_node = pod.spec.node_name
elif pod.metadata.labels.get('spilo-role') == 'replica':
replica_pod_nodes.append(pod.spec.node_name)
return master_pod_node, replica_pod_nodes
def wait_for_operator_pod_start(self):
self. wait_for_pod_start("name=postgres-operator")
def get_operator_pod(self):
pods = self.api.core_v1.list_namespaced_pod(
'default', label_selector='name=postgres-operator'
).items
if pods:
return pods[0]
return None
def get_operator_log(self):
operator_pod = self.get_operator_pod()
pod_name = operator_pod.metadata.name
return self.api.core_v1.read_namespaced_pod_log(
name=pod_name,
namespace='default'
)
def wait_for_pod_start(self, pod_labels, namespace='default'):
pod_phase = 'No pod running'
while pod_phase != 'Running':
pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=pod_labels).items
if pods:
pod_phase = pods[0].status.phase
time.sleep(self.RETRY_TIMEOUT_SEC)
def get_service_type(self, svc_labels, namespace='default'):
svc_type = ''
svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items
for svc in svcs:
svc_type = svc.spec.type
return svc_type
def check_service_annotations(self, svc_labels, annotations, namespace='default'):
svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items
for svc in svcs:
for key, value in annotations.items():
if key not in svc.metadata.annotations or svc.metadata.annotations[key] != value:
print("Expected key {} not found in annotations {}".format(key, svc.metadata.annotation))
return False
return True
def check_statefulset_annotations(self, sset_labels, annotations, namespace='default'):
ssets = self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=sset_labels, limit=1).items
for sset in ssets:
for key, value in annotations.items():
if key not in sset.metadata.annotations or sset.metadata.annotations[key] != value:
print("Expected key {} not found in annotations {}".format(key, sset.metadata.annotation))
return False
return True
def wait_for_pg_to_scale(self, number_of_instances, namespace='default'):
body = {
"spec": {
"numberOfInstances": number_of_instances
}
}
_ = self.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", namespace, "postgresqls", "acid-minimal-cluster", body)
labels = 'application=spilo,cluster-name=acid-minimal-cluster'
while self.count_pods_with_label(labels) != number_of_instances:
time.sleep(self.RETRY_TIMEOUT_SEC)
def wait_for_running_pods(self, labels, number, namespace=''):
while self.count_pods_with_label(labels) != number:
time.sleep(self.RETRY_TIMEOUT_SEC)
def wait_for_pods_to_stop(self, labels, namespace=''):
while self.count_pods_with_label(labels) != 0:
time.sleep(self.RETRY_TIMEOUT_SEC)
def wait_for_service(self, labels, namespace='default'):
def get_services():
return self.api.core_v1.list_namespaced_service(
namespace, label_selector=labels
).items
while not get_services():
time.sleep(self.RETRY_TIMEOUT_SEC)
def count_pods_with_label(self, labels, namespace='default'):
return len(self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items)
def count_services_with_label(self, labels, namespace='default'):
return len(self.api.core_v1.list_namespaced_service(namespace, label_selector=labels).items)
def count_endpoints_with_label(self, labels, namespace='default'):
return len(self.api.core_v1.list_namespaced_endpoints(namespace, label_selector=labels).items)
def count_secrets_with_label(self, labels, namespace='default'):
return len(self.api.core_v1.list_namespaced_secret(namespace, label_selector=labels).items)
def count_statefulsets_with_label(self, labels, namespace='default'):
return len(self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=labels).items)
def count_deployments_with_label(self, labels, namespace='default'):
return len(self.api.apps_v1.list_namespaced_deployment(namespace, label_selector=labels).items)
def count_pdbs_with_label(self, labels, namespace='default'):
return len(self.api.policy_v1_beta1.list_namespaced_pod_disruption_budget(
namespace, label_selector=labels).items)
def count_running_pods(self, labels='application=spilo,cluster-name=acid-minimal-cluster', namespace='default'):
pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items
return len(list(filter(lambda x: x.status.phase=='Running', pods)))
def wait_for_pod_failover(self, failover_targets, labels, namespace='default'):
pod_phase = 'Failing over'
new_pod_node = ''
while (pod_phase != 'Running') or (new_pod_node not in failover_targets):
pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items
if pods:
new_pod_node = pods[0].spec.node_name
pod_phase = pods[0].status.phase
time.sleep(self.RETRY_TIMEOUT_SEC)
def get_logical_backup_job(self, namespace='default'):
return self.api.batch_v1_beta1.list_namespaced_cron_job(namespace, label_selector="application=spilo")
def wait_for_logical_backup_job(self, expected_num_of_jobs):
while (len(self.get_logical_backup_job().items) != expected_num_of_jobs):
time.sleep(self.RETRY_TIMEOUT_SEC)
def wait_for_logical_backup_job_deletion(self):
self.wait_for_logical_backup_job(expected_num_of_jobs=0)
def wait_for_logical_backup_job_creation(self):
self.wait_for_logical_backup_job(expected_num_of_jobs=1)
def delete_operator_pod(self, step="Delete operator deplyment"):
operator_pod = self.api.core_v1.list_namespaced_pod('default', label_selector="name=postgres-operator").items[0].metadata.name
self.api.apps_v1.patch_namespaced_deployment("postgres-operator","default", {"spec":{"template":{"metadata":{"annotations":{"step":"{}-{}".format(step, time.time())}}}}})
self.wait_for_operator_pod_start()
def update_config(self, config_map_patch, step="Updating operator deployment"):
self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch)
self.delete_operator_pod(step=step)
def create_with_kubectl(self, path):
return subprocess.run(
["kubectl", "apply", "-f", path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def exec_with_kubectl(self, pod, cmd):
return subprocess.run(["./exec.sh", pod, cmd],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def get_patroni_state(self, pod):
r = self.exec_with_kubectl(pod, "patronictl list -f json")
if not r.returncode == 0 or not r.stdout.decode()[0:1]=="[":
return []
return json.loads(r.stdout.decode())
def get_patroni_running_members(self, pod):
result = self.get_patroni_state(pod)
return list(filter(lambda x: x["State"]=="running", result))
def get_statefulset_image(self, label_selector="application=spilo,cluster-name=acid-minimal-cluster", namespace='default'):
ssets = self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=label_selector, limit=1)
if len(ssets.items) == 0:
return None
return ssets.items[0].spec.template.spec.containers[0].image
def get_effective_pod_image(self, pod_name, namespace='default'):
'''
Get the Spilo image pod currently uses. In case of lazy rolling updates
it may differ from the one specified in the stateful set.
'''
pod = self.api.core_v1.list_namespaced_pod(
namespace, label_selector="statefulset.kubernetes.io/pod-name=" + pod_name)
if len(pod.items) == 0:
return None
return pod.items[0].spec.containers[0].image
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -1,9 +1,12 @@
package controller package controller
import ( import (
"bytes"
"context" "context"
"encoding/json"
"fmt" "fmt"
"os" "os"
"strings"
"sync" "sync"
"time" "time"
@ -190,10 +193,18 @@ func (c *Controller) warnOnDeprecatedOperatorParameters() {
} }
} }
func compactValue(v string) string {
var compact bytes.Buffer
if err := json.Compact(&compact, []byte(v)); err != nil {
panic("Hard coded json strings broken!")
}
return compact.String()
}
func (c *Controller) initPodServiceAccount() { func (c *Controller) initPodServiceAccount() {
if c.opConfig.PodServiceAccountDefinition == "" { if c.opConfig.PodServiceAccountDefinition == "" {
c.opConfig.PodServiceAccountDefinition = ` stringValue := `
{ {
"apiVersion": "v1", "apiVersion": "v1",
"kind": "ServiceAccount", "kind": "ServiceAccount",
@ -201,6 +212,9 @@ func (c *Controller) initPodServiceAccount() {
"name": "postgres-pod" "name": "postgres-pod"
} }
}` }`
c.opConfig.PodServiceAccountDefinition = compactValue(stringValue)
} }
// re-uses k8s internal parsing. See k8s client-go issue #193 for explanation // re-uses k8s internal parsing. See k8s client-go issue #193 for explanation
@ -230,7 +244,7 @@ func (c *Controller) initRoleBinding() {
// operator binds it to the cluster role with sufficient privileges // operator binds it to the cluster role with sufficient privileges
// we assume the role is created by the k8s administrator // we assume the role is created by the k8s administrator
if c.opConfig.PodServiceAccountRoleBindingDefinition == "" { if c.opConfig.PodServiceAccountRoleBindingDefinition == "" {
c.opConfig.PodServiceAccountRoleBindingDefinition = fmt.Sprintf(` stringValue := fmt.Sprintf(`
{ {
"apiVersion": "rbac.authorization.k8s.io/v1", "apiVersion": "rbac.authorization.k8s.io/v1",
"kind": "RoleBinding", "kind": "RoleBinding",
@ -249,6 +263,7 @@ func (c *Controller) initRoleBinding() {
} }
] ]
}`, c.PodServiceAccount.Name, c.PodServiceAccount.Name, c.PodServiceAccount.Name) }`, c.PodServiceAccount.Name, c.PodServiceAccount.Name, c.PodServiceAccount.Name)
c.opConfig.PodServiceAccountRoleBindingDefinition = compactValue(stringValue)
} }
c.logger.Info("Parse role bindings") c.logger.Info("Parse role bindings")
// re-uses k8s internal parsing. See k8s client-go issue #193 for explanation // re-uses k8s internal parsing. See k8s client-go issue #193 for explanation
@ -267,7 +282,14 @@ func (c *Controller) initRoleBinding() {
} }
// actual roles bindings are deployed at the time of Postgres/Spilo cluster creation // actual roles bindings ar*logrus.Entrye deployed at the time of Postgres/Spilo cluster creation
}
func logMultiLineConfig(log *logrus.Entry, config string) {
lines := strings.Split(config, "\n")
for _, l := range lines {
log.Infof("%s", l)
}
} }
func (c *Controller) initController() { func (c *Controller) initController() {
@ -302,7 +324,7 @@ func (c *Controller) initController() {
c.logger.Logger.Level = logrus.DebugLevel c.logger.Logger.Level = logrus.DebugLevel
} }
c.logger.Infof("config: %s", c.opConfig.MustMarshal()) logMultiLineConfig(c.logger, c.opConfig.MustMarshal())
roleDefs := c.getInfrastructureRoleDefinitions() roleDefs := c.getInfrastructureRoleDefinitions()
if infraRoles, err := c.getInfrastructureRoles(roleDefs); err != nil { if infraRoles, err := c.getInfrastructureRoles(roleDefs); err != nil {

View File

@ -199,7 +199,7 @@ type Config struct {
// MustMarshal marshals the config or panics // MustMarshal marshals the config or panics
func (c Config) MustMarshal() string { func (c Config) MustMarshal() string {
b, err := json.MarshalIndent(c, "", "\t") b, err := json.MarshalIndent(c, "", " ")
if err != nil { if err != nil {
panic(err) panic(err)
} }