use Update when disabling LoadBalancer + added e2e test

This commit is contained in:
Felix Kunde 2020-02-13 11:39:08 +01:00
parent e339b8fe1d
commit 64b5cd1e95
4 changed files with 88 additions and 12 deletions

View File

@ -58,6 +58,56 @@ class EndToEndTestCase(unittest.TestCase):
k8s.create_with_kubectl("manifests/minimal-postgres-manifest.yaml")
k8s.wait_for_pod_start('spilo-role=master')
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_enable_load_balancer(self):
'''
Test if services are updated when enabling/disabling load balancers
'''
k8s = self.k8s
cluster_label = 'version=acid-minimal-cluster'
# enable load balancer services
pg_patch_enable_lbs = {
"spec": {
"enableMasterLoadBalancer": True,
"enableReplicaLoadBalancer": True
}
}
k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_enable_lbs)
# wait for service recreation
time.sleep(10)
master_svc_type = k8s.get_service_type(cluster_label + ',spilo-role=master')
self.assertEqual(master_svc_type, 'LoadBalancer',
"Expected LoadBalancer service type for master, found {}".format(master_svc_type))
repl_svc_type = k8s.get_service_type(cluster_label + ',spilo-role=replica')
self.assertEqual(repl_svc_type, 'LoadBalancer',
"Expected LoadBalancer service type for replica, found {}".format(repl_svc_type))
# disable load balancer services again
pg_patch_disable_lbs = {
"spec": {
"enableMasterLoadBalancer": False,
"enableReplicaLoadBalancer": False
}
}
k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_disable_lbs)
# wait for service recreation
time.sleep(10)
master_svc_type = k8s.get_service_type(cluster_label + ',spilo-role=master')
self.assertEqual(master_svc_type, 'ClusterIP',
"Expected ClusterIP service type for master, found {}".format(master_svc_type))
repl_svc_type = k8s.get_service_type(cluster_label + ',spilo-role=replica')
self.assertEqual(repl_svc_type, 'ClusterIP',
"Expected ClusterIP service type for replica, found {}".format(repl_svc_type))
"""
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_min_resource_limits(self):
'''
@ -287,6 +337,7 @@ class EndToEndTestCase(unittest.TestCase):
}
}
k8s.update_config(unpatch_custom_service_annotations)
"""
def assert_master_is_unique(self, namespace='default', version="acid-minimal-cluster"):
'''
@ -353,6 +404,13 @@ class K8s:
pod_phase = pods[0].status.phase
time.sleep(self.RETRY_TIMEOUT_SEC)
def get_service_type(self, svc_labels, namespace='default'):
svc_type = ''
svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items
for svc in svcs:
svc_type = svc.spec.type
return svc_type
def check_service_annotations(self, svc_labels, annotations, namespace='default'):
svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items
for svc in svcs:

View File

@ -114,6 +114,7 @@ rules:
- delete
- get
- patch
- update
# to CRUD the StatefulSet which controls the Postgres cluster instances
- apiGroups:
- apps

View File

@ -366,6 +366,11 @@ func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) {
}
func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error {
var (
svc *v1.Service
err error
)
c.setProcessName("updating %v service", role)
if c.Services[role] == nil {
@ -390,18 +395,30 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
}
}
patchData, err := specPatch(newService.Spec)
if err != nil {
return fmt.Errorf("could not form patch for the service %q: %v", serviceName, err)
}
// when disabling LoadBalancers patch does not work because of LoadBalancerSourceRanges field (even if nil)
oldServiceType := c.Services[role].Spec.Type
newServiceType := newService.Spec.Type
if newServiceType == "ClusterIP" && newServiceType != oldServiceType {
newService.ResourceVersion = c.Services[role].ResourceVersion
newService.Spec.ClusterIP = c.Services[role].Spec.ClusterIP
svc, err = c.KubeClient.Services(serviceName.Namespace).Update(newService)
if err != nil {
return fmt.Errorf("could not update service %q: %v", serviceName, err)
}
} else {
patchData, err := specPatch(newService.Spec)
if err != nil {
return fmt.Errorf("could not form patch for the service %q: %v", serviceName, err)
}
// update the service spec
svc, err := c.KubeClient.Services(serviceName.Namespace).Patch(
serviceName.Name,
types.MergePatchType,
patchData, "")
if err != nil {
return fmt.Errorf("could not patch service %q: %v", serviceName, err)
// update the service spec
svc, err = c.KubeClient.Services(serviceName.Namespace).Patch(
serviceName.Name,
types.MergePatchType,
patchData, "")
if err != nil {
return fmt.Errorf("could not patch service %q: %v", serviceName, err)
}
}
c.Services[role] = svc

View File

@ -116,7 +116,7 @@ func (c *Cluster) syncServices() error {
c.logger.Debugf("syncing %s service", role)
if err := c.syncEndpoint(role); err != nil {
return fmt.Errorf("could not sync %s endpont: %v", role, err)
return fmt.Errorf("could not sync %s endpoint: %v", role, err)
}
if err := c.syncService(role); err != nil {