import json
import unittest
import time
import timeout_decorator
import subprocess
import warnings
import os
import yaml

from datetime import datetime
from kubernetes import client, config


def to_selector(labels):
    return ",".join(["=".join(l) for l in labels.items()])
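# Illustrative example (not part of the original tests):
# to_selector({'application': 'spilo', 'spilo-role': 'master'}) returns
# the label selector string "application=spilo,spilo-role=master".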


class EndToEndTestCase(unittest.TestCase):
    '''
    Test interaction of the operator with multiple K8s components.
    '''

    # `kind` pods may get stuck in the `Terminating` phase for a few minutes; hence the high test timeout
    TEST_TIMEOUT_SEC = 600

    @classmethod
    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
    def setUpClass(cls):
        '''
        Deploy operator to a "kind" cluster created by run.sh using examples from /manifests.
        This operator deployment is to be shared among all tests.

        run.sh deletes the 'kind' cluster after a successful run along with all operator-related entities.
        In the case of test failure the cluster will stay up to enable manual examination;
        the next invocation of "make test" will re-create it.
        '''

        # set a single K8s wrapper for all tests
        k8s = cls.k8s = K8s()

        # remove the pre-existing local storage class; the hostpath class is created
        # below from e2e-storage-class.yaml
        k8s.api.storage_v1_api.delete_storage_class("standard")

        # operator deploys the pod service account there on start up
        # needed for test_multi_namespace_support()
        cls.namespace = "test"
        v1_namespace = client.V1Namespace(metadata=client.V1ObjectMeta(name=cls.namespace))
        k8s.api.core_v1.create_namespace(v1_namespace)

        # submit the most recent operator image built on the Docker host
        with open("manifests/postgres-operator.yaml", 'r+') as f:
            operator_deployment = yaml.safe_load(f)
            operator_deployment["spec"]["template"]["spec"]["containers"][0]["image"] = os.environ['OPERATOR_IMAGE']
            # rewrite the manifest in place instead of appending a second document
            f.seek(0)
            yaml.dump(operator_deployment, f, Dumper=yaml.Dumper)
            f.truncate()

        for filename in ["operator-service-account-rbac.yaml",
                         "configmap.yaml",
                         "postgres-operator.yaml",
                         "infrastructure-roles.yaml",
                         "infrastructure-roles-new.yaml",
                         "e2e-storage-class.yaml"]:
            result = k8s.create_with_kubectl("manifests/" + filename)
            print("stdout: {}, stderr: {}".format(result.stdout, result.stderr))

        k8s.wait_for_operator_pod_start()

        actual_operator_image = k8s.api.core_v1.list_namespaced_pod(
            'default', label_selector='name=postgres-operator').items[0].spec.containers[0].image
        print("Tested operator image: {}".format(actual_operator_image))  # shows up after tests finish

        result = k8s.create_with_kubectl("manifests/minimal-postgres-manifest.yaml")
        print('stdout: {}, stderr: {}'.format(result.stdout, result.stderr))
        try:
            k8s.wait_for_pod_start('spilo-role=master')
            k8s.wait_for_pod_start('spilo-role=replica')
        except timeout_decorator.TimeoutError:
            print('Operator log: {}'.format(k8s.get_operator_log()))
            raise

    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
    def test_enable_disable_connection_pooler(self):
        '''
        Start with a database without connection pooler, then turn it on, scale it up,
        and turn it off again. Test different ways of doing this (via
        enableConnectionPooler or the connectionPooler configuration section). At
        the end turn the connection pooler off so it does not interfere with other tests.
        '''
        k8s = self.k8s
        service_labels = {
            'cluster-name': 'acid-minimal-cluster',
        }
        pod_labels = {
            'connection-pooler': 'acid-minimal-cluster-pooler',
        }

        pod_selector = to_selector(pod_labels)
        service_selector = to_selector(service_labels)
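        # With the labels above these resolve to (illustrative):
        #   pod_selector     == "connection-pooler=acid-minimal-cluster-pooler"
        #   service_selector == "cluster-name=acid-minimal-cluster"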

        try:
            # enable connection pooler
            k8s.api.custom_objects_api.patch_namespaced_custom_object(
                'acid.zalan.do', 'v1', 'default',
                'postgresqls', 'acid-minimal-cluster',
                {
                    'spec': {
                        'enableConnectionPooler': True,
                    }
                })
            k8s.wait_for_pod_start(pod_selector)

            pods = k8s.api.core_v1.list_namespaced_pod(
                'default', label_selector=pod_selector
            ).items

            self.assertTrue(pods, 'No connection pooler pods')

            k8s.wait_for_service(service_selector)
            services = k8s.api.core_v1.list_namespaced_service(
                'default', label_selector=service_selector
            ).items
            services = [
                s for s in services
                if s.metadata.name.endswith('pooler')
            ]

            self.assertTrue(services, 'No connection pooler service')

            # scale up connection pooler deployment
            k8s.api.custom_objects_api.patch_namespaced_custom_object(
                'acid.zalan.do', 'v1', 'default',
                'postgresqls', 'acid-minimal-cluster',
                {
                    'spec': {
                        'connectionPooler': {
                            'numberOfInstances': 2,
                        },
                    }
                })

            k8s.wait_for_running_pods(pod_selector, 2)

            # turn it off, keeping configuration section
            k8s.api.custom_objects_api.patch_namespaced_custom_object(
                'acid.zalan.do', 'v1', 'default',
                'postgresqls', 'acid-minimal-cluster',
                {
                    'spec': {
                        'enableConnectionPooler': False,
                    }
                })
            k8s.wait_for_pods_to_stop(pod_selector)

        except timeout_decorator.TimeoutError:
            print('Operator log: {}'.format(k8s.get_operator_log()))
            raise

    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
    def test_enable_load_balancer(self):
        '''
        Test if services are updated when enabling/disabling load balancers
        '''

        k8s = self.k8s
        cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'

        try:
            # enable load balancer services
            pg_patch_enable_lbs = {
                "spec": {
                    "enableMasterLoadBalancer": True,
                    "enableReplicaLoadBalancer": True
                }
            }
            k8s.api.custom_objects_api.patch_namespaced_custom_object(
                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_enable_lbs)
            # wait for service recreation
            time.sleep(60)

            master_svc_type = k8s.get_service_type(cluster_label + ',spilo-role=master')
            self.assertEqual(master_svc_type, 'LoadBalancer',
                             "Expected LoadBalancer service type for master, found {}".format(master_svc_type))

            repl_svc_type = k8s.get_service_type(cluster_label + ',spilo-role=replica')
            self.assertEqual(repl_svc_type, 'LoadBalancer',
                             "Expected LoadBalancer service type for replica, found {}".format(repl_svc_type))

            # disable load balancer services again
            pg_patch_disable_lbs = {
                "spec": {
                    "enableMasterLoadBalancer": False,
                    "enableReplicaLoadBalancer": False
                }
            }
            k8s.api.custom_objects_api.patch_namespaced_custom_object(
                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_disable_lbs)
            # wait for service recreation
            time.sleep(60)

            master_svc_type = k8s.get_service_type(cluster_label + ',spilo-role=master')
            self.assertEqual(master_svc_type, 'ClusterIP',
                             "Expected ClusterIP service type for master, found {}".format(master_svc_type))

            repl_svc_type = k8s.get_service_type(cluster_label + ',spilo-role=replica')
            self.assertEqual(repl_svc_type, 'ClusterIP',
                             "Expected ClusterIP service type for replica, found {}".format(repl_svc_type))

        except timeout_decorator.TimeoutError:
            print('Operator log: {}'.format(k8s.get_operator_log()))
            raise

    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
    def test_infrastructure_roles(self):
        '''
        Test using external secrets for infrastructure roles
        '''
        k8s = self.k8s
        # update infrastructure roles description
        secret_name = "postgresql-infrastructure-roles"
        roles = "secretname: postgresql-infrastructure-roles-new, userkey: user, rolekey: memberof, passwordkey: password, defaultrolevalue: robot_zmon"
        patch_infrastructure_roles = {
            "data": {
                "infrastructure_roles_secret_name": secret_name,
                "infrastructure_roles_secrets": roles,
            },
        }
        k8s.update_config(patch_infrastructure_roles)

        # wait a little before proceeding
        time.sleep(30)

        try:
            # check that new roles are represented in the config by requesting the
            # operator configuration via API
            operator_pod = k8s.get_operator_pod()
            get_config_cmd = "wget --quiet -O - localhost:8080/config"
            result = k8s.exec_with_kubectl(operator_pod.metadata.name, get_config_cmd)
            roles_dict = (json.loads(result.stdout)
                          .get("controller", {})
                          .get("InfrastructureRoles"))

            self.assertTrue("robot_zmon_acid_monitoring_new" in roles_dict)
            role = roles_dict["robot_zmon_acid_monitoring_new"]
            role.pop("Password", None)
            self.assertDictEqual(role, {
                "Name": "robot_zmon_acid_monitoring_new",
                "Flags": None,
                "MemberOf": ["robot_zmon"],
                "Parameters": None,
                "AdminRole": "",
                "Origin": 2,
            })

        except timeout_decorator.TimeoutError:
            print('Operator log: {}'.format(k8s.get_operator_log()))
            raise

    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
    def test_lazy_spilo_upgrade(self):
        '''
        Test lazy upgrade for the Spilo image: operator changes a stateful set but lets pods run with the old image
        until they are recreated for reasons other than operator's activity. That works because the operator configures
        stateful sets to use "onDelete" pod update policy.

        The test covers:
        1) enabling lazy upgrade in an existing operator deployment
        2) forcing the normal rolling upgrade by changing the operator configmap and restarting its pod
        '''

        k8s = self.k8s

        # update the docker image in the config and enable the lazy upgrade
        conf_image = "registry.opensource.zalan.do/acid/spilo-cdp-12:1.6-p114"
        patch_lazy_spilo_upgrade = {
            "data": {
                "docker_image": conf_image,
                "enable_lazy_spilo_upgrade": "true"
            }
        }
        k8s.update_config(patch_lazy_spilo_upgrade)

        pod0 = 'acid-minimal-cluster-0'
        pod1 = 'acid-minimal-cluster-1'

        try:
            # restart the pod to get a container with the new image
            k8s.api.core_v1.delete_namespaced_pod(pod0, 'default')
            time.sleep(60)

            # lazy update works if the restarted pod and older pods run different Spilo versions
            new_image = k8s.get_effective_pod_image(pod0)
            old_image = k8s.get_effective_pod_image(pod1)
            self.assertNotEqual(new_image, old_image,
                                "Lazy update failed: pods have the same image {}".format(new_image))

            # sanity check
            assert_msg = "Image {} of a new pod differs from {} in operator conf".format(new_image, conf_image)
            self.assertEqual(new_image, conf_image, assert_msg)

            # clean up
            unpatch_lazy_spilo_upgrade = {
                "data": {
                    "enable_lazy_spilo_upgrade": "false",
                }
            }
            k8s.update_config(unpatch_lazy_spilo_upgrade)

            # at this point the operator will complete the normal rolling upgrade,
            # so we additionally test if disabling the lazy upgrade - forcing the normal rolling upgrade - works

            # XXX there is no easy way to wait until the end of Sync()
            time.sleep(60)

            image0 = k8s.get_effective_pod_image(pod0)
            image1 = k8s.get_effective_pod_image(pod1)

            assert_msg = "Disabling lazy upgrade failed: pods still have different images {} and {}".format(image0, image1)
            self.assertEqual(image0, image1, assert_msg)

        except timeout_decorator.TimeoutError:
            print('Operator log: {}'.format(k8s.get_operator_log()))
            raise

    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
    def test_logical_backup_cron_job(self):
        '''
        Ensure we can (a) create the cron job at user request for a specific PG cluster,
        (b) update the cluster-wide image for the logical backup pod, and
        (c) delete the job at user request.

        Limitations:
        (a) Does not run the actual batch job because there is no S3 mock to upload backups to
        (b) Assumes 'acid-minimal-cluster' exists as defined in setUpClass
        '''

        k8s = self.k8s

        # create the cron job
        schedule = "7 7 7 7 *"
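        # cron schedule read as: minute 7, hour 7, on the 7th day of July, any weekday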
        pg_patch_enable_backup = {
            "spec": {
                "enableLogicalBackup": True,
                "logicalBackupSchedule": schedule
            }
        }
        k8s.api.custom_objects_api.patch_namespaced_custom_object(
            "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_enable_backup)

        try:
            k8s.wait_for_logical_backup_job_creation()

            jobs = k8s.get_logical_backup_job().items
            self.assertEqual(1, len(jobs), "Expected 1 logical backup job, found {}".format(len(jobs)))

            job = jobs[0]
            self.assertEqual(job.metadata.name, "logical-backup-acid-minimal-cluster",
                             "Expected job name {}, found {}"
                             .format("logical-backup-acid-minimal-cluster", job.metadata.name))
            self.assertEqual(job.spec.schedule, schedule,
                             "Expected {} schedule, found {}"
                             .format(schedule, job.spec.schedule))

            # update the cluster-wide image of the logical backup pod
            image = "test-image-name"
            patch_logical_backup_image = {
                "data": {
                    "logical_backup_docker_image": image,
                }
            }
            k8s.update_config(patch_logical_backup_image)

            jobs = k8s.get_logical_backup_job().items
            actual_image = jobs[0].spec.job_template.spec.template.spec.containers[0].image
            self.assertEqual(actual_image, image,
                             "Expected job image {}, found {}".format(image, actual_image))

            # delete the logical backup cron job
            pg_patch_disable_backup = {
                "spec": {
                    "enableLogicalBackup": False,
                }
            }
            k8s.api.custom_objects_api.patch_namespaced_custom_object(
                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_disable_backup)
            k8s.wait_for_logical_backup_job_deletion()
            jobs = k8s.get_logical_backup_job().items
            self.assertEqual(0, len(jobs),
                             "Expected 0 logical backup jobs, found {}".format(len(jobs)))

        except timeout_decorator.TimeoutError:
            print('Operator log: {}'.format(k8s.get_operator_log()))
            raise

    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
    def test_min_resource_limits(self):
        '''
        Lower resource limits below configured minimum and let operator fix it
        '''
        k8s = self.k8s
        cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
        labels = 'spilo-role=master,' + cluster_label
        _, failover_targets = k8s.get_pg_nodes(cluster_label)

        # configure minimum boundaries for CPU and memory limits
        minCPULimit = '500m'
        minMemoryLimit = '500Mi'
        patch_min_resource_limits = {
            "data": {
                "min_cpu_limit": minCPULimit,
                "min_memory_limit": minMemoryLimit
            }
        }
        k8s.update_config(patch_min_resource_limits)

        # lower resource limits below minimum
        pg_patch_resources = {
            "spec": {
                "resources": {
                    "requests": {
                        "cpu": "10m",
                        "memory": "50Mi"
                    },
                    "limits": {
                        "cpu": "200m",
                        "memory": "200Mi"
                    }
                }
            }
        }
        k8s.api.custom_objects_api.patch_namespaced_custom_object(
            "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_resources)

        try:
            k8s.wait_for_pod_failover(failover_targets, labels)
            k8s.wait_for_pod_start('spilo-role=replica')

            pods = k8s.api.core_v1.list_namespaced_pod(
                'default', label_selector=labels).items
            self.assert_master_is_unique()
            masterPod = pods[0]

            self.assertEqual(masterPod.spec.containers[0].resources.limits['cpu'], minCPULimit,
                             "Expected CPU limit {}, found {}"
                             .format(minCPULimit, masterPod.spec.containers[0].resources.limits['cpu']))
            self.assertEqual(masterPod.spec.containers[0].resources.limits['memory'], minMemoryLimit,
                             "Expected memory limit {}, found {}"
                             .format(minMemoryLimit, masterPod.spec.containers[0].resources.limits['memory']))

        except timeout_decorator.TimeoutError:
            print('Operator log: {}'.format(k8s.get_operator_log()))
            raise

    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
    def test_multi_namespace_support(self):
        '''
        Create a customized Postgres cluster in a non-default namespace.
        '''
        k8s = self.k8s

        with open("manifests/complete-postgres-manifest.yaml", 'r+') as f:
            pg_manifest = yaml.safe_load(f)
            pg_manifest["metadata"]["namespace"] = self.namespace
            # rewrite the manifest in place instead of appending a second document
            f.seek(0)
            yaml.dump(pg_manifest, f, Dumper=yaml.Dumper)
            f.truncate()

        try:
            k8s.create_with_kubectl("manifests/complete-postgres-manifest.yaml")
            k8s.wait_for_pod_start("spilo-role=master", self.namespace)
            self.assert_master_is_unique(self.namespace, "acid-test-cluster")

        except timeout_decorator.TimeoutError:
            print('Operator log: {}'.format(k8s.get_operator_log()))
            raise

    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
    def test_node_readiness_label(self):
        '''
        Remove the node readiness label from the master node. This must cause a failover.
        '''
        k8s = self.k8s
        cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
        readiness_label = 'lifecycle-status'
        readiness_value = 'ready'

        try:
            # get nodes of master and replica(s) (expected target of new master)
            current_master_node, current_replica_nodes = k8s.get_pg_nodes(cluster_label)
            num_replicas = len(current_replica_nodes)
            failover_targets = self.get_failover_targets(current_master_node, current_replica_nodes)

            # add node_readiness_label to potential failover nodes
            patch_readiness_label = {
                "metadata": {
                    "labels": {
                        readiness_label: readiness_value
                    }
                }
            }
            for failover_target in failover_targets:
                k8s.api.core_v1.patch_node(failover_target, patch_readiness_label)

            # define node_readiness_label in the config map, which should trigger a failover of the master
            patch_readiness_label_config = {
                "data": {
                    "node_readiness_label": readiness_label + ':' + readiness_value,
                }
            }
            k8s.update_config(patch_readiness_label_config)
            new_master_node, new_replica_nodes = self.assert_failover(
                current_master_node, num_replicas, failover_targets, cluster_label)

            # also patch the node where the master ran before
            k8s.api.core_v1.patch_node(current_master_node, patch_readiness_label)

            # wait a little before proceeding with the pod distribution test
            time.sleep(30)

            # toggle pod anti-affinity to move the replica away from the master node
            self.assert_distributed_pods(new_master_node, new_replica_nodes, cluster_label)

        except timeout_decorator.TimeoutError:
            print('Operator log: {}'.format(k8s.get_operator_log()))
            raise

    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
    def test_scaling(self):
        '''
        Scale up from 2 to 3 and back to 2 pods by updating the Postgres manifest at runtime.
        '''
        k8s = self.k8s
        labels = "application=spilo,cluster-name=acid-minimal-cluster"

        try:
            k8s.wait_for_pg_to_scale(3)
            self.assertEqual(3, k8s.count_pods_with_label(labels))
            self.assert_master_is_unique()

            k8s.wait_for_pg_to_scale(2)
            self.assertEqual(2, k8s.count_pods_with_label(labels))
            self.assert_master_is_unique()

        except timeout_decorator.TimeoutError:
            print('Operator log: {}'.format(k8s.get_operator_log()))
            raise

    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
    def test_service_annotations(self):
        '''
        Create a Postgres cluster with service annotations and check them.
        '''
        k8s = self.k8s
        patch_custom_service_annotations = {
            "data": {
                "custom_service_annotations": "foo:bar",
            }
        }
        k8s.update_config(patch_custom_service_annotations)

        try:
            pg_patch_custom_annotations = {
                "spec": {
                    "serviceAnnotations": {
                        "annotation.key": "value",
                        "foo": "bar",
                    }
                }
            }
            k8s.api.custom_objects_api.patch_namespaced_custom_object(
                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_custom_annotations)

            # wait a little before proceeding
            time.sleep(30)
            annotations = {
                "annotation.key": "value",
                "foo": "bar",
            }
            self.assertTrue(k8s.check_service_annotations(
                "cluster-name=acid-minimal-cluster,spilo-role=master", annotations))
            self.assertTrue(k8s.check_service_annotations(
                "cluster-name=acid-minimal-cluster,spilo-role=replica", annotations))

        except timeout_decorator.TimeoutError:
            print('Operator log: {}'.format(k8s.get_operator_log()))
            raise

        # clean up
        unpatch_custom_service_annotations = {
            "data": {
                "custom_service_annotations": "",
            }
        }
        k8s.update_config(unpatch_custom_service_annotations)

    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
    def test_statefulset_annotation_propagation(self):
        '''
        Inject an annotation into the Postgresql CRD and check its propagation to the stateful set
        '''
        k8s = self.k8s
        cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'

        patch_sset_propagate_annotations = {
            "data": {
                "downscaler_annotations": "deployment-time,downscaler/*",
            }
        }
        k8s.update_config(patch_sset_propagate_annotations)

        try:
            pg_crd_annotations = {
                "metadata": {
                    "annotations": {
                        "deployment-time": "2020-04-30 12:00:00",
                        "downscaler/downtime_replicas": "0",
                    },
                }
            }
            k8s.api.custom_objects_api.patch_namespaced_custom_object(
                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_crd_annotations)

            # wait a little before proceeding
            time.sleep(60)
            annotations = {
                "deployment-time": "2020-04-30 12:00:00",
                "downscaler/downtime_replicas": "0",
            }
            self.assertTrue(k8s.check_statefulset_annotations(cluster_label, annotations))

        except timeout_decorator.TimeoutError:
            print('Operator log: {}'.format(k8s.get_operator_log()))
            raise

    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
    def test_taint_based_eviction(self):
        '''
        Add the taint "postgres=:NoExecute" to the node with the master. This must cause a failover.
        '''
        k8s = self.k8s
        cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'

        # get nodes of master and replica(s) (expected target of new master)
        current_master_node, current_replica_nodes = k8s.get_pg_nodes(cluster_label)
        num_replicas = len(current_replica_nodes)
        failover_targets = self.get_failover_targets(current_master_node, current_replica_nodes)

        # taint the node with postgres=:NoExecute to force a failover
        body = {
            "spec": {
                "taints": [
                    {
                        "effect": "NoExecute",
                        "key": "postgres"
                    }
                ]
            }
        }

        try:
            # patch the node and test if the master fails over to one of the expected nodes
            k8s.api.core_v1.patch_node(current_master_node, body)
            new_master_node, new_replica_nodes = self.assert_failover(
                current_master_node, num_replicas, failover_targets, cluster_label)

            # add a toleration to the pods
            patch_toleration_config = {
                "data": {
                    "toleration": "key:postgres,operator:Exists,effect:NoExecute"
                }
            }
            k8s.update_config(patch_toleration_config)

            # wait a little before proceeding with the pod distribution test
            time.sleep(30)

            # toggle pod anti-affinity to move the replica away from the master node
            self.assert_distributed_pods(new_master_node, new_replica_nodes, cluster_label)

        except timeout_decorator.TimeoutError:
            print('Operator log: {}'.format(k8s.get_operator_log()))
            raise

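    # The next test is presumably prefixed with "x" so that unittest's default
    # alphabetical ordering runs this destructive deletion test after the others.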
    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
    def test_x_cluster_deletion(self):
        '''
        Test deletion with configured protection
        '''
        k8s = self.k8s
        cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'

        # configure delete protection
        patch_delete_annotations = {
            "data": {
                "delete_annotation_date_key": "delete-date",
                "delete_annotation_name_key": "delete-clustername"
            }
        }
        k8s.update_config(patch_delete_annotations)

        try:
            # this delete attempt should be ignored because the required annotations are missing
            k8s.api.custom_objects_api.delete_namespaced_custom_object(
                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster")

            # check that pods and services are still there
            k8s.wait_for_running_pods(cluster_label, 2)
            k8s.wait_for_service(cluster_label)

            # recreate the Postgres cluster resource
            k8s.create_with_kubectl("manifests/minimal-postgres-manifest.yaml")

            # wait a little before proceeding
            time.sleep(10)

            # add annotations to the manifest
            deleteDate = datetime.today().strftime('%Y-%m-%d')
            pg_patch_delete_annotations = {
                "metadata": {
                    "annotations": {
                        "delete-date": deleteDate,
                        "delete-clustername": "acid-minimal-cluster",
                    }
                }
            }
            k8s.api.custom_objects_api.patch_namespaced_custom_object(
                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_delete_annotations)

            # wait a little before proceeding
            time.sleep(10)
            k8s.wait_for_running_pods(cluster_label, 2)
            k8s.wait_for_service(cluster_label)

            # now the delete process should be triggered
            k8s.api.custom_objects_api.delete_namespaced_custom_object(
                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster")

            # wait until the cluster is deleted
            time.sleep(120)

            # check that everything has been deleted
            self.assertEqual(0, k8s.count_pods_with_label(cluster_label))
            self.assertEqual(0, k8s.count_services_with_label(cluster_label))
            self.assertEqual(0, k8s.count_endpoints_with_label(cluster_label))
            self.assertEqual(0, k8s.count_statefulsets_with_label(cluster_label))
            self.assertEqual(0, k8s.count_deployments_with_label(cluster_label))
            self.assertEqual(0, k8s.count_pdbs_with_label(cluster_label))
            self.assertEqual(0, k8s.count_secrets_with_label(cluster_label))

        except timeout_decorator.TimeoutError:
            print('Operator log: {}'.format(k8s.get_operator_log()))
            raise

    def get_failover_targets(self, master_node, replica_nodes):
        '''
        If all pods live on the same node, failover will happen to other worker(s)
        '''
        k8s = self.k8s
        k8s_master_exclusion = 'kubernetes.io/hostname!=postgres-operator-e2e-tests-control-plane'
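        # node selector that leaves out the kind control-plane node when picking failover targets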

        failover_targets = [x for x in replica_nodes if x != master_node]
        if len(failover_targets) == 0:
            nodes = k8s.api.core_v1.list_node(label_selector=k8s_master_exclusion)
            for n in nodes.items:
                if n.metadata.name != master_node:
                    failover_targets.append(n.metadata.name)

        return failover_targets

    def assert_failover(self, current_master_node, num_replicas, failover_targets, cluster_label):
        '''
        Check if master is failing over. The replica should move first to be the switchover target
        '''
        k8s = self.k8s
        k8s.wait_for_pod_failover(failover_targets, 'spilo-role=master,' + cluster_label)
        k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)

        new_master_node, new_replica_nodes = k8s.get_pg_nodes(cluster_label)
        self.assertNotEqual(current_master_node, new_master_node,
                            "Master on {} did not fail over to one of {}".format(current_master_node, failover_targets))
        self.assertEqual(num_replicas, len(new_replica_nodes),
                         "Expected {} replicas, found {}".format(num_replicas, len(new_replica_nodes)))
        self.assert_master_is_unique()

        return new_master_node, new_replica_nodes

    def assert_master_is_unique(self, namespace='default', clusterName="acid-minimal-cluster"):
        '''
        Check that there is a single pod in the k8s cluster with the label "spilo-role=master"
        To be called manually after operations that affect pods
        '''
        k8s = self.k8s
        labels = 'spilo-role=master,cluster-name=' + clusterName

        num_of_master_pods = k8s.count_pods_with_label(labels, namespace)
        self.assertEqual(num_of_master_pods, 1, "Expected 1 master pod, found {}".format(num_of_master_pods))

    def assert_distributed_pods(self, master_node, replica_nodes, cluster_label):
        '''
        Other tests can lead to the situation that master and replica are on the same node.
        Toggle pod anti-affinity to distribute pods across nodes (the replica in particular).
        '''
        k8s = self.k8s
        failover_targets = self.get_failover_targets(master_node, replica_nodes)

        # enable pod anti-affinity in the config map, which should trigger movement of the replica
        patch_enable_antiaffinity = {
            "data": {
                "enable_pod_antiaffinity": "true"
            }
        }
        k8s.update_config(patch_enable_antiaffinity)
        self.assert_failover(master_node, len(replica_nodes), failover_targets, cluster_label)

        # now disable pod anti-affinity again, which will cause yet another failover
        patch_disable_antiaffinity = {
            "data": {
                "enable_pod_antiaffinity": "false"
            }
        }
        k8s.update_config(patch_disable_antiaffinity)
        k8s.wait_for_pod_start('spilo-role=master')
        k8s.wait_for_pod_start('spilo-role=replica')


class K8sApi:

    def __init__(self):

        # https://github.com/kubernetes-client/python/issues/309
        warnings.simplefilter("ignore", ResourceWarning)

        self.config = config.load_kube_config()
        self.k8s_client = client.ApiClient()

        self.core_v1 = client.CoreV1Api()
        self.apps_v1 = client.AppsV1Api()
        self.batch_v1_beta1 = client.BatchV1beta1Api()
        self.custom_objects_api = client.CustomObjectsApi()
        self.policy_v1_beta1 = client.PolicyV1beta1Api()
        self.storage_v1_api = client.StorageV1Api()


class K8s:
    '''
    Wraps around K8s api client and helper methods.
    '''

    RETRY_TIMEOUT_SEC = 10

    def __init__(self):
        self.api = K8sApi()

    def get_pg_nodes(self, pg_cluster_name, namespace='default'):
        master_pod_node = ''
        replica_pod_nodes = []
        podsList = self.api.core_v1.list_namespaced_pod(namespace, label_selector=pg_cluster_name)
        for pod in podsList.items:
            if pod.metadata.labels.get('spilo-role') == 'master':
                master_pod_node = pod.spec.node_name
            elif pod.metadata.labels.get('spilo-role') == 'replica':
                replica_pod_nodes.append(pod.spec.node_name)

        return master_pod_node, replica_pod_nodes

    def wait_for_operator_pod_start(self):
        self.wait_for_pod_start("name=postgres-operator")
        # HACK operator must register CRD and/or Sync existing PG clusters after start up
        # for local execution ~ 10 seconds suffices
        time.sleep(60)

    def get_operator_pod(self):
        pods = self.api.core_v1.list_namespaced_pod(
            'default', label_selector='name=postgres-operator'
        ).items

        if pods:
            return pods[0]

        return None

    def get_operator_log(self):
        operator_pod = self.get_operator_pod()
        pod_name = operator_pod.metadata.name
        return self.api.core_v1.read_namespaced_pod_log(
            name=pod_name,
            namespace='default'
        )

    def wait_for_pod_start(self, pod_labels, namespace='default'):
        pod_phase = 'No pod running'
        while pod_phase != 'Running':
            pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=pod_labels).items
            if pods:
                pod_phase = pods[0].status.phase

            time.sleep(self.RETRY_TIMEOUT_SEC)
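        # Note: this loop has no internal deadline; the timeout_decorator wrapping
        # each test is what bounds the overall waiting time.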

    def get_service_type(self, svc_labels, namespace='default'):
        svc_type = ''
        svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items
        for svc in svcs:
            svc_type = svc.spec.type
        return svc_type

    def check_service_annotations(self, svc_labels, annotations, namespace='default'):
        svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items
        for svc in svcs:
            for key, value in annotations.items():
                if key not in svc.metadata.annotations or svc.metadata.annotations[key] != value:
                    print("Expected key {} not found in annotations {}".format(key, svc.metadata.annotations))
                    return False
        return True

    def check_statefulset_annotations(self, sset_labels, annotations, namespace='default'):
        ssets = self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=sset_labels, limit=1).items
        for sset in ssets:
            for key, value in annotations.items():
                if key not in sset.metadata.annotations or sset.metadata.annotations[key] != value:
                    print("Expected key {} not found in annotations {}".format(key, sset.metadata.annotations))
                    return False
        return True

    def wait_for_pg_to_scale(self, number_of_instances, namespace='default'):

        body = {
            "spec": {
                "numberOfInstances": number_of_instances
            }
        }
        _ = self.api.custom_objects_api.patch_namespaced_custom_object(
            "acid.zalan.do", "v1", namespace, "postgresqls", "acid-minimal-cluster", body)

        labels = 'application=spilo,cluster-name=acid-minimal-cluster'
        while self.count_pods_with_label(labels) != number_of_instances:
            time.sleep(self.RETRY_TIMEOUT_SEC)

    def wait_for_running_pods(self, labels, number, namespace=''):
        while self.count_pods_with_label(labels) != number:
            time.sleep(self.RETRY_TIMEOUT_SEC)

    def wait_for_pods_to_stop(self, labels, namespace=''):
        while self.count_pods_with_label(labels) != 0:
            time.sleep(self.RETRY_TIMEOUT_SEC)

    def wait_for_service(self, labels, namespace='default'):
        def get_services():
            return self.api.core_v1.list_namespaced_service(
                namespace, label_selector=labels
            ).items

        while not get_services():
            time.sleep(self.RETRY_TIMEOUT_SEC)

    def count_pods_with_label(self, labels, namespace='default'):
        return len(self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items)

    def count_services_with_label(self, labels, namespace='default'):
        return len(self.api.core_v1.list_namespaced_service(namespace, label_selector=labels).items)

    def count_endpoints_with_label(self, labels, namespace='default'):
        return len(self.api.core_v1.list_namespaced_endpoints(namespace, label_selector=labels).items)

    def count_secrets_with_label(self, labels, namespace='default'):
        return len(self.api.core_v1.list_namespaced_secret(namespace, label_selector=labels).items)

    def count_statefulsets_with_label(self, labels, namespace='default'):
        return len(self.api.apps_v1.list_namespaced_stateful_set(namespace, label_selector=labels).items)

    def count_deployments_with_label(self, labels, namespace='default'):
        return len(self.api.apps_v1.list_namespaced_deployment(namespace, label_selector=labels).items)

    def count_pdbs_with_label(self, labels, namespace='default'):
        return len(self.api.policy_v1_beta1.list_namespaced_pod_disruption_budget(
            namespace, label_selector=labels).items)

    def wait_for_pod_failover(self, failover_targets, labels, namespace='default'):
        pod_phase = 'Failing over'
        new_pod_node = ''

        while (pod_phase != 'Running') or (new_pod_node not in failover_targets):
            pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items
            if pods:
                new_pod_node = pods[0].spec.node_name
                pod_phase = pods[0].status.phase
            time.sleep(self.RETRY_TIMEOUT_SEC)

    def get_logical_backup_job(self, namespace='default'):
        return self.api.batch_v1_beta1.list_namespaced_cron_job(namespace, label_selector="application=spilo")

    def wait_for_logical_backup_job(self, expected_num_of_jobs):
        while (len(self.get_logical_backup_job().items) != expected_num_of_jobs):
            time.sleep(self.RETRY_TIMEOUT_SEC)

    def wait_for_logical_backup_job_deletion(self):
        self.wait_for_logical_backup_job(expected_num_of_jobs=0)

    def wait_for_logical_backup_job_creation(self):
        self.wait_for_logical_backup_job(expected_num_of_jobs=1)

    def delete_operator_pod(self):
        operator_pod = self.api.core_v1.list_namespaced_pod(
            'default', label_selector="name=postgres-operator").items[0].metadata.name
        self.api.core_v1.delete_namespaced_pod(operator_pod, "default")  # restart reloads the conf
        self.wait_for_operator_pod_start()

    def update_config(self, config_map_patch):
        self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch)
        self.delete_operator_pod()
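        # Illustrative usage: update_config({"data": {"enable_pod_antiaffinity": "true"}})
        # patches the operator ConfigMap and restarts the operator pod so the change takes effect.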

    def create_with_kubectl(self, path):
        return subprocess.run(
            ["kubectl", "create", "-f", path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)

    def exec_with_kubectl(self, pod, cmd):
        return subprocess.run(["./exec.sh", pod, cmd],
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
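        # Both helpers return a subprocess.CompletedProcess; callers read .stdout/.stderr.
        # exec.sh is a helper script shipped alongside these e2e tests, presumably a thin
        # wrapper around "kubectl exec" for running a command inside the given pod.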

    def get_effective_pod_image(self, pod_name, namespace='default'):
        '''
        Get the Spilo image the pod currently uses. In case of lazy rolling updates
        it may differ from the one specified in the stateful set.
        '''
        pod = self.api.core_v1.list_namespaced_pod(
            namespace, label_selector="statefulset.kubernetes.io/pod-name=" + pod_name)
        return pod.items[0].spec.containers[0].image


if __name__ == '__main__':
    unittest.main()