fix e2e and some flake8

Rafia Sabih 2020-11-04 10:32:36 +01:00
parent 9de57f0182
commit 0a0f1c7488
2 changed files with 161 additions and 119 deletions

View File

@@ -1,18 +1,14 @@
 import json
-import unittest
 import time
-import timeout_decorator
 import subprocess
 import warnings
-import os
-import yaml
-from datetime import datetime
 from kubernetes import client, config
 from kubernetes.client.rest import ApiException


 def to_selector(labels):
-    return ",".join(["=".join(l) for l in labels.items()])
+    return ",".join(["=".join(lbl) for lbl in labels.items()])


 class K8sApi:
@@ -108,7 +104,6 @@ class K8s:
             time.sleep(self.RETRY_TIMEOUT_SEC)

     def get_service_type(self, svc_labels, namespace='default'):
         svc_type = ''
         svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items
@@ -211,7 +206,7 @@ class K8s:
         self.wait_for_logical_backup_job(expected_num_of_jobs=1)

     def delete_operator_pod(self, step="Delete operator deplyment"):
-        operator_pod = self.api.core_v1.list_namespaced_pod('default', label_selector="name=postgres-operator").items[0].metadata.name
+        # operator_pod = self.api.core_v1.list_namespaced_pod('default', label_selector="name=postgres-operator").items[0].metadata.name
         self.api.apps_v1.patch_namespaced_deployment("postgres-operator", "default", {"spec": {"template": {"metadata": {"annotations": {"step": "{}-{}".format(step, time.time())}}}}})
         self.wait_for_operator_pod_start()
@@ -251,7 +246,6 @@ class K8s:
         return json.loads(r.stdout.decode())

     def get_patroni_running_members(self, pod="acid-minimal-cluster-0"):
         result = self.get_patroni_state(pod)
         return list(filter(lambda x: "State" in x and x["State"] == "running", result))
@@ -463,7 +457,7 @@ class K8sBase:
         self.wait_for_logical_backup_job(expected_num_of_jobs=1)

     def delete_operator_pod(self, step="Delete operator deplyment"):
-        operator_pod = self.api.core_v1.list_namespaced_pod('default', label_selector="name=postgres-operator").items[0].metadata.name
+        # operator_pod = self.api.core_v1.list_namespaced_pod('default', label_selector="name=postgres-operator").items[0].metadata.name
         self.api.apps_v1.patch_namespaced_deployment("postgres-operator", "default", {"spec": {"template": {"metadata": {"annotations": {"step": "{}-{}".format(step, time.time())}}}}})
         self.wait_for_operator_pod_start()
@@ -514,10 +508,13 @@ class K8sBase:
 """
 Inspiriational classes towards easier writing of end to end tests with one cluster per test case
 """


 class K8sOperator(K8sBase):
     def __init__(self, labels="name=postgres-operator", namespace="default"):
         super().__init__(labels, namespace)


 class K8sPostgres(K8sBase):
     def __init__(self, labels="cluster-name=acid-minimal-cluster", namespace="default"):
         super().__init__(labels, namespace)
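The to_selector() change above renames the loop variable l to lbl, the usual fix for flake8's E741 ("ambiguous variable name") warning, and the dropped imports at the top of the file address F401 (unused import); delete_operator_pod() now restarts the operator by patching a timestamped annotation into the deployment's pod template instead of looking the pod up by name. A minimal sketch of what the selector helper produces, assuming an ordinary dict of Kubernetes labels (the example values below are illustrative only):

def to_selector(labels):
    # join {"key": "value", ...} into the "key=value,key2=value2" form
    # expected by the Kubernetes API's label_selector parameter
    return ",".join(["=".join(lbl) for lbl in labels.items()])


pod_labels = {'connection-pooler': 'acid-minimal-cluster-pooler'}
print(to_selector(pod_labels))  # connection-pooler=acid-minimal-cluster-pooler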

View File

@@ -2,21 +2,20 @@ import json
 import unittest
 import time
 import timeout_decorator
-import subprocess
-import warnings
 import os
 import yaml
 from datetime import datetime
-from kubernetes import client, config
+from kubernetes import client
 from tests.k8s_api import K8s

 SPILO_CURRENT = "registry.opensource.zalan.do/acid/spilo-12:1.6-p5"
 SPILO_LAZY = "registry.opensource.zalan.do/acid/spilo-cdp-12:1.6-p114"


 def to_selector(labels):
-    return ",".join(["=".join(l) for l in labels.items()])
+    return ",".join(["=".join(lbl) for lbl in labels.items()])


 def clean_list(values):
@@ -163,15 +162,15 @@ class EndToEndTestCase(unittest.TestCase):
         the end turn connection pooler off to not interfere with other tests.
         '''
         k8s = self.k8s
-        service_labels = {
-            'cluster-name': 'acid-minimal-cluster',
-        }
-        pod_labels = dict({
-            'connection-pooler': 'acid-minimal-cluster-pooler',
-        })
-        pod_selector = to_selector(pod_labels)
-        service_selector = to_selector(service_labels)
+        # service_labels = {
+        #     'cluster-name': 'acid-minimal-cluster',
+        # }
+        # pod_labels = dict({
+        #     'connection-pooler': 'acid-minimal-cluster-pooler',
+        # })
+        # pod_selector = to_selector(pod_labels)
+        # service_selector = to_selector(service_labels)

         # enable connection pooler
         k8s.api.custom_objects_api.patch_namespaced_custom_object(
@@ -184,10 +183,17 @@ class EndToEndTestCase(unittest.TestCase):
                 }
             })
-        self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(), 2, "Deployment replicas is 2 default")
-        self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler"), 2, "No pooler pods found")
-        self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler-repl"), 2, "No pooler replica pods found")
-        self.eventuallyEqual(lambda: k8s.count_services_with_label('application=db-connection-pooler,cluster-name=acid-minimal-cluster'), 2, "No pooler service found")
+        self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(), 2,
+                             "Deployment replicas is 2 default")
+        self.eventuallyEqual(lambda: k8s.count_running_pods(
+                             "connection-pooler=acid-minimal-cluster-pooler"),
+                             2, "No pooler pods found")
+        self.eventuallyEqual(lambda: k8s.count_running_pods(
+                             "connection-pooler=acid-minimal-cluster-pooler-repl"),
+                             2, "No pooler replica pods found")
+        self.eventuallyEqual(lambda: k8s.count_services_with_label(
+                             'application=db-connection-pooler,cluster-name=acid-minimal-cluster'),
+                             2, "No pooler service found")

         # Turn off only master connection pooler
         k8s.api.custom_objects_api.patch_namespaced_custom_object(
@@ -200,11 +206,19 @@ class EndToEndTestCase(unittest.TestCase):
                 }
             })
-        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0":"idle"}, "Operator does not get in sync")
-        self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(), 2, "Deployment replicas is 2 default")
-        self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler"), 0, "Master pooler pods not deleted")
-        self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler-repl"), 2, "Pooler replica pods not found")
-        self.eventuallyEqual(lambda: k8s.count_services_with_label('application=db-connection-pooler,cluster-name=acid-minimal-cluster'), 1, "No pooler service found")
+        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"},
+                             "Operator does not get in sync")
+        self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(), 2,
+                             "Deployment replicas is 2 default")
+        self.eventuallyEqual(lambda: k8s.count_running_pods(
+                             "connection-pooler=acid-minimal-cluster-pooler"),
+                             0, "Master pooler pods not deleted")
+        self.eventuallyEqual(lambda: k8s.count_running_pods(
+                             "connection-pooler=acid-minimal-cluster-pooler-repl"),
+                             2, "Pooler replica pods not found")
+        self.eventuallyEqual(lambda: k8s.count_services_with_label(
+                             'application=db-connection-pooler,cluster-name=acid-minimal-cluster'),
+                             1, "No pooler service found")

         # Turn off only replica connection pooler
         k8s.api.custom_objects_api.patch_namespaced_custom_object(
@@ -217,12 +231,16 @@ class EndToEndTestCase(unittest.TestCase):
                 }
             })
-        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0":"idle"}, "Operator does not get in sync")
-        self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(), 2, "Deployment replicas is 2 default")
-        self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler"), 2, "Master pooler pods not found")
-        self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler-repl"), 0, "Pooler replica pods not deleted")
-        self.eventuallyEqual(lambda: k8s.count_services_with_label('application=db-connection-pooler,cluster-name=acid-minimal-cluster'), 1, "No pooler service found")
+        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"},
+                             "Operator does not get in sync")
+        self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(), 2,
+                             "Deployment replicas is 2 default")
+        self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler"),
+                             2, "Master pooler pods not found")
+        self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler-repl"),
+                             0, "Pooler replica pods not deleted")
+        self.eventuallyEqual(lambda: k8s.count_services_with_label('application=db-connection-pooler,cluster-name=acid-minimal-cluster'),
+                             1, "No pooler service found")

         # scale up connection pooler deployment
         k8s.api.custom_objects_api.patch_namespaced_custom_object(
@@ -236,8 +254,10 @@ class EndToEndTestCase(unittest.TestCase):
                 }
             })
-        self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(), 3, "Deployment replicas is scaled to 3")
-        self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler"), 3, "Scale up of pooler pods does not work")
+        self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(), 3,
+                             "Deployment replicas is scaled to 3")
+        self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler"),
+                             3, "Scale up of pooler pods does not work")

         # turn it off, keeping config should be overwritten by false
         k8s.api.custom_objects_api.patch_namespaced_custom_object(
@@ -251,8 +271,10 @@ class EndToEndTestCase(unittest.TestCase):
             })
-        self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler"), 0, "Pooler pods not scaled down")
-        self.eventuallyEqual(lambda: k8s.count_services_with_label('application=db-connection-pooler,cluster-name=acid-minimal-cluster'), 0, "Pooler service not removed")
+        self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler"),
+                             0, "Pooler pods not scaled down")
+        self.eventuallyEqual(lambda: k8s.count_services_with_label('application=db-connection-pooler,cluster-name=acid-minimal-cluster'),
+                             0, "Pooler service not removed")

         # Verify that all the databases have pooler schema installed.
         # Do this via psql, since otherwise we need to deal with
@@ -379,7 +401,8 @@ class EndToEndTestCase(unittest.TestCase):
         k8s = self.k8s
         # update infrastructure roles description
         secret_name = "postgresql-infrastructure-roles"
-        roles = "secretname: postgresql-infrastructure-roles-new, userkey: user, rolekey: memberof, passwordkey: password, defaultrolevalue: robot_zmon"
+        roles = "secretname: postgresql-infrastructure-roles-new, userkey: user,"\
+                "rolekey: memberof, passwordkey: password, defaultrolevalue: robot_zmon"
         patch_infrastructure_roles = {
             "data": {
                 "infrastructure_roles_secret_name": secret_name,
@@ -387,7 +410,8 @@ class EndToEndTestCase(unittest.TestCase):
             },
         }
         k8s.update_config(patch_infrastructure_roles)
-        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0":"idle"}, "Operator does not get in sync")
+        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"},
+                             "Operator does not get in sync")

         try:
             # check that new roles are represented in the config by requesting the
@@ -397,7 +421,8 @@ class EndToEndTestCase(unittest.TestCase):
                 try:
                     operator_pod = k8s.get_operator_pod()
                     get_config_cmd = "wget --quiet -O - localhost:8080/config"
-                    result = k8s.exec_with_kubectl(operator_pod.metadata.name, get_config_cmd)
+                    result = k8s.exec_with_kubectl(operator_pod.metadata.name,
+                                                   get_config_cmd)
                     try:
                         roles_dict = (json.loads(result.stdout)
                                       .get("controller", {})
@@ -424,7 +449,6 @@ class EndToEndTestCase(unittest.TestCase):
             self.eventuallyTrue(verify_role, "infrastructure role setup is not loaded")

         except timeout_decorator.TimeoutError:
             print('Operator log: {}'.format(k8s.get_operator_log()))
             raise
@@ -432,13 +456,15 @@ class EndToEndTestCase(unittest.TestCase):
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     def test_lazy_spilo_upgrade(self):
         '''
-        Test lazy upgrade for the Spilo image: operator changes a stateful set but lets pods run with the old image
-        until they are recreated for reasons other than operator's activity. That works because the operator configures
-        stateful sets to use "onDelete" pod update policy.
+        Test lazy upgrade for the Spilo image: operator changes a stateful set
+        but lets pods run with the old image until they are recreated for
+        reasons other than operator's activity. That works because the operator
+        configures stateful sets to use "onDelete" pod update policy.

         The test covers:
         1) enabling lazy upgrade in existing operator deployment
-        2) forcing the normal rolling upgrade by changing the operator configmap and restarting its pod
+        2) forcing the normal rolling upgrade by changing the operator
+           configmap and restarting its pod
         '''

         k8s = self.k8s
@@ -446,8 +472,10 @@ class EndToEndTestCase(unittest.TestCase):
         pod0 = 'acid-minimal-cluster-0'
         pod1 = 'acid-minimal-cluster-1'

-        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running")
-        self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)), 2, "Postgres status did not enter running")
+        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2,
+                             "No 2 pods running")
+        self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)),
+                             2, "Postgres status did not enter running")

         patch_lazy_spilo_upgrade = {
             "data": {
@@ -455,14 +483,20 @@ class EndToEndTestCase(unittest.TestCase):
                 "enable_lazy_spilo_upgrade": "false"
             }
         }
-        k8s.update_config(patch_lazy_spilo_upgrade, step="Init baseline image version")
+        k8s.update_config(patch_lazy_spilo_upgrade,
+                          step="Init baseline image version")

-        self.eventuallyEqual(lambda: k8s.get_statefulset_image(), SPILO_CURRENT, "Statefulset not updated initially")
-        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running")
-        self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)), 2, "Postgres status did not enter running")
-        self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0), SPILO_CURRENT, "Rolling upgrade was not executed")
-        self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod1), SPILO_CURRENT, "Rolling upgrade was not executed")
+        self.eventuallyEqual(lambda: k8s.get_statefulset_image(), SPILO_CURRENT,
+                             "Statefulset not updated initially")
+        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2,
+                             "No 2 pods running")
+        self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)),
+                             2, "Postgres status did not enter running")
+        self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0),
+                             SPILO_CURRENT, "Rolling upgrade was not executed")
+        self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod1),
+                             SPILO_CURRENT, "Rolling upgrade was not executed")

         # update docker image in config and enable the lazy upgrade
         conf_image = SPILO_LAZY
@@ -472,18 +506,25 @@ class EndToEndTestCase(unittest.TestCase):
                 "enable_lazy_spilo_upgrade": "true"
             }
         }
-        k8s.update_config(patch_lazy_spilo_upgrade,step="patch image and lazy upgrade")
-        self.eventuallyEqual(lambda: k8s.get_statefulset_image(), conf_image, "Statefulset not updated to next Docker image")
+        k8s.update_config(patch_lazy_spilo_upgrade,
+                          step="patch image and lazy upgrade")
+        self.eventuallyEqual(lambda: k8s.get_statefulset_image(), conf_image,
+                             "Statefulset not updated to next Docker image")

         try:
             # restart the pod to get a container with the new image
             k8s.api.core_v1.delete_namespaced_pod(pod0, 'default')

             # verify only pod-0 which was deleted got new image from statefulset
-            self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0), conf_image, "Delete pod-0 did not get new spilo image")
-            self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No two pods running after lazy rolling upgrade")
-            self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)), 2, "Postgres status did not enter running")
-            self.assertNotEqual(lambda: k8s.get_effective_pod_image(pod1), SPILO_CURRENT, "pod-1 should not have change Docker image to {}".format(SPILO_CURRENT))
+            self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0),
+                                 conf_image, "Delete pod-0 did not get new spilo image")
+            self.eventuallyEqual(lambda: k8s.count_running_pods(), 2,
+                                 "No two pods running after lazy rolling upgrade")
+            self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)),
+                                 2, "Postgres status did not enter running")
+            self.assertNotEqual(lambda: k8s.get_effective_pod_image(pod1),
+                                SPILO_CURRENT,
+                                "pod-1 should not have change Docker image to {}".format(SPILO_CURRENT))

             # clean up
             unpatch_lazy_spilo_upgrade = {
@@ -495,9 +536,14 @@ class EndToEndTestCase(unittest.TestCase):
             # at this point operator will complete the normal rolling upgrade
             # so we additonally test if disabling the lazy upgrade - forcing the normal rolling upgrade - works
-            self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0), conf_image, "Rolling upgrade was not executed", 50, 3)
-            self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod1), conf_image, "Rolling upgrade was not executed", 50, 3)
-            self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)), 2, "Postgres status did not enter running")
+            self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0),
+                                 conf_image, "Rolling upgrade was not executed",
+                                 50, 3)
+            self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod1),
+                                 conf_image, "Rolling upgrade was not executed",
+                                 50, 3)
+            self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)),
+                                 2, "Postgres status did not enter running")

         except timeout_decorator.TimeoutError:
             print('Operator log: {}'.format(k8s.get_operator_log()))
@@ -794,7 +840,6 @@ class EndToEndTestCase(unittest.TestCase):
         self.eventuallyTrue(lambda: k8s.check_statefulset_annotations(cluster_label, annotations), "Annotations missing")

     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     @unittest.skip("Skipping this test until fixed")
     def test_zzz_taint_based_eviction(self):
@@ -863,7 +908,7 @@ class EndToEndTestCase(unittest.TestCase):
             }
         }
         k8s.update_config(patch_delete_annotations)
-        time.sleep(5)
+        time.sleep(15)
         self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

         try:
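Throughout these tests, eventuallyEqual() polls a lambda until it returns the expected value; the extra 50, 3 arguments added in the lazy-upgrade test read as an explicit retry count and polling interval. A rough sketch of such a polling assertion, assuming that signature (the real helper lives in the test base class and may differ):

import time


def eventually_equal(get_value, expected, message, retries=60, interval=2):
    # Re-evaluate get_value() until it matches expected or retries run out,
    # sleeping `interval` seconds between attempts; the 50, 3 overrides above
    # would mean "up to 50 attempts, 3 seconds apart".
    last = None
    for _ in range(retries):
        last = get_value()
        if last == expected:
            return
        time.sleep(interval)
    raise AssertionError("{} (last value: {!r})".format(message, last))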