patch/update services on type change (#824)
* use Update when disabling LoadBalancer + added e2e test
This commit is contained in:
		
							parent
							
								
									744c71d16b
								
							
						
					
					
						commit
						3b10dc645d
					
				|  | @ -58,6 +58,55 @@ class EndToEndTestCase(unittest.TestCase): | |||
|         k8s.create_with_kubectl("manifests/minimal-postgres-manifest.yaml") | ||||
|         k8s.wait_for_pod_start('spilo-role=master') | ||||
| 
 | ||||
|     @timeout_decorator.timeout(TEST_TIMEOUT_SEC) | ||||
|     def test_enable_load_balancer(self): | ||||
|         ''' | ||||
|         Test if services are updated when enabling/disabling load balancers | ||||
|         ''' | ||||
| 
 | ||||
|         k8s = self.k8s | ||||
|         cluster_label = 'version=acid-minimal-cluster' | ||||
| 
 | ||||
|         # enable load balancer services | ||||
|         pg_patch_enable_lbs = { | ||||
|             "spec": { | ||||
|                 "enableMasterLoadBalancer": True, | ||||
|                 "enableReplicaLoadBalancer": True | ||||
|             } | ||||
|         } | ||||
|         k8s.api.custom_objects_api.patch_namespaced_custom_object( | ||||
|             "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_enable_lbs) | ||||
|         # wait for service recreation | ||||
|         time.sleep(60) | ||||
| 
 | ||||
|         master_svc_type = k8s.get_service_type(cluster_label + ',spilo-role=master') | ||||
|         self.assertEqual(master_svc_type, 'LoadBalancer', | ||||
|                          "Expected LoadBalancer service type for master, found {}".format(master_svc_type)) | ||||
| 
 | ||||
|         repl_svc_type = k8s.get_service_type(cluster_label + ',spilo-role=replica') | ||||
|         self.assertEqual(repl_svc_type, 'LoadBalancer', | ||||
|                          "Expected LoadBalancer service type for replica, found {}".format(repl_svc_type)) | ||||
| 
 | ||||
|         # disable load balancer services again | ||||
|         pg_patch_disable_lbs = { | ||||
|             "spec": { | ||||
|                 "enableMasterLoadBalancer": False, | ||||
|                 "enableReplicaLoadBalancer": False | ||||
|             } | ||||
|         } | ||||
|         k8s.api.custom_objects_api.patch_namespaced_custom_object( | ||||
|             "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_disable_lbs) | ||||
|         # wait for service recreation | ||||
|         time.sleep(60) | ||||
| 
 | ||||
|         master_svc_type = k8s.get_service_type(cluster_label + ',spilo-role=master') | ||||
|         self.assertEqual(master_svc_type, 'ClusterIP', | ||||
|                          "Expected ClusterIP service type for master, found {}".format(master_svc_type)) | ||||
| 
 | ||||
|         repl_svc_type = k8s.get_service_type(cluster_label + ',spilo-role=replica') | ||||
|         self.assertEqual(repl_svc_type, 'ClusterIP', | ||||
|                          "Expected ClusterIP service type for replica, found {}".format(repl_svc_type)) | ||||
| 
 | ||||
|     @timeout_decorator.timeout(TEST_TIMEOUT_SEC) | ||||
|     def test_min_resource_limits(self): | ||||
|         ''' | ||||
|  | @ -362,6 +411,13 @@ class K8s: | |||
|                 pod_phase = pods[0].status.phase | ||||
|             time.sleep(self.RETRY_TIMEOUT_SEC) | ||||
| 
 | ||||
|     def get_service_type(self, svc_labels, namespace='default'): | ||||
|         svc_type = '' | ||||
|         svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items | ||||
|         for svc in svcs: | ||||
|             svc_type = svc.spec.type | ||||
|         return svc_type | ||||
| 
 | ||||
|     def check_service_annotations(self, svc_labels, annotations, namespace='default'): | ||||
|         svcs = self.api.core_v1.list_namespaced_service(namespace, label_selector=svc_labels, limit=1).items | ||||
|         for svc in svcs: | ||||
|  |  | |||
|  | @ -114,6 +114,7 @@ rules: | |||
|   - delete | ||||
|   - get | ||||
|   - patch | ||||
|   - update | ||||
| # to CRUD the StatefulSet which controls the Postgres cluster instances | ||||
| - apiGroups: | ||||
|   - apps | ||||
|  |  | |||
|  | @ -366,6 +366,11 @@ func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) { | |||
| } | ||||
| 
 | ||||
| func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error { | ||||
| 	var ( | ||||
| 		svc *v1.Service | ||||
| 		err error | ||||
| 	) | ||||
| 
 | ||||
| 	c.setProcessName("updating %v service", role) | ||||
| 
 | ||||
| 	if c.Services[role] == nil { | ||||
|  | @ -373,70 +378,6 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error | |||
| 	} | ||||
| 
 | ||||
| 	serviceName := util.NameFromMeta(c.Services[role].ObjectMeta) | ||||
| 	endpointName := util.NameFromMeta(c.Endpoints[role].ObjectMeta) | ||||
| 	// TODO: check if it possible to change the service type with a patch in future versions of Kubernetes
 | ||||
| 	if newService.Spec.Type != c.Services[role].Spec.Type { | ||||
| 		// service type has changed, need to replace the service completely.
 | ||||
| 		// we cannot use just patch the current service, since it may contain attributes incompatible with the new type.
 | ||||
| 		var ( | ||||
| 			currentEndpoint *v1.Endpoints | ||||
| 			err             error | ||||
| 		) | ||||
| 
 | ||||
| 		if role == Master { | ||||
| 			// for the master service we need to re-create the endpoint as well. Get the up-to-date version of
 | ||||
| 			// the addresses stored in it before the service is deleted (deletion of the service removes the endpoint)
 | ||||
| 			currentEndpoint, err = c.KubeClient.Endpoints(c.Namespace).Get(c.endpointName(role), metav1.GetOptions{}) | ||||
| 			if err != nil { | ||||
| 				return fmt.Errorf("could not get current cluster %s endpoints: %v", role, err) | ||||
| 			} | ||||
| 		} | ||||
| 		err = c.KubeClient.Services(serviceName.Namespace).Delete(serviceName.Name, c.deleteOptions) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("could not delete service %q: %v", serviceName, err) | ||||
| 		} | ||||
| 
 | ||||
| 		// wait until the service is truly deleted
 | ||||
| 		c.logger.Debugf("waiting for service to be deleted") | ||||
| 
 | ||||
| 		err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, | ||||
| 			func() (bool, error) { | ||||
| 				_, err2 := c.KubeClient.Services(serviceName.Namespace).Get(serviceName.Name, metav1.GetOptions{}) | ||||
| 				if err2 == nil { | ||||
| 					return false, nil | ||||
| 				} | ||||
| 				if k8sutil.ResourceNotFound(err2) { | ||||
| 					return true, nil | ||||
| 				} | ||||
| 				return false, err2 | ||||
| 			}) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("could not delete service %q: %v", serviceName, err) | ||||
| 		} | ||||
| 
 | ||||
| 		// make sure we clear the stored service and endpoint status if the subsequent create fails.
 | ||||
| 		c.Services[role] = nil | ||||
| 		c.Endpoints[role] = nil | ||||
| 		if role == Master { | ||||
| 			// create the new endpoint using the addresses obtained from the previous one
 | ||||
| 			endpointSpec := c.generateEndpoint(role, currentEndpoint.Subsets) | ||||
| 			ep, err := c.KubeClient.Endpoints(endpointSpec.Namespace).Create(endpointSpec) | ||||
| 			if err != nil { | ||||
| 				return fmt.Errorf("could not create endpoint %q: %v", endpointName, err) | ||||
| 			} | ||||
| 
 | ||||
| 			c.Endpoints[role] = ep | ||||
| 		} | ||||
| 
 | ||||
| 		svc, err := c.KubeClient.Services(serviceName.Namespace).Create(newService) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("could not create service %q: %v", serviceName, err) | ||||
| 		} | ||||
| 
 | ||||
| 		c.Services[role] = svc | ||||
| 
 | ||||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
| 	// update the service annotation in order to propagate ELB notation.
 | ||||
| 	if len(newService.ObjectMeta.Annotations) > 0 { | ||||
|  | @ -454,18 +395,30 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error | |||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	patchData, err := specPatch(newService.Spec) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("could not form patch for the service %q: %v", serviceName, err) | ||||
| 	} | ||||
| 	// now, patch the service spec, but when disabling LoadBalancers do update instead
 | ||||
| 	// patch does not work because of LoadBalancerSourceRanges field (even if set to nil)
 | ||||
| 	oldServiceType := c.Services[role].Spec.Type | ||||
| 	newServiceType := newService.Spec.Type | ||||
| 	if newServiceType == "ClusterIP" && newServiceType != oldServiceType { | ||||
| 		newService.ResourceVersion = c.Services[role].ResourceVersion | ||||
| 		newService.Spec.ClusterIP = c.Services[role].Spec.ClusterIP | ||||
| 		svc, err = c.KubeClient.Services(serviceName.Namespace).Update(newService) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("could not update service %q: %v", serviceName, err) | ||||
| 		} | ||||
| 	} else { | ||||
| 		patchData, err := specPatch(newService.Spec) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("could not form patch for the service %q: %v", serviceName, err) | ||||
| 		} | ||||
| 
 | ||||
| 	// update the service spec
 | ||||
| 	svc, err := c.KubeClient.Services(serviceName.Namespace).Patch( | ||||
| 		serviceName.Name, | ||||
| 		types.MergePatchType, | ||||
| 		patchData, "") | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("could not patch service %q: %v", serviceName, err) | ||||
| 		svc, err = c.KubeClient.Services(serviceName.Namespace).Patch( | ||||
| 			serviceName.Name, | ||||
| 			types.MergePatchType, | ||||
| 			patchData, "") | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("could not patch service %q: %v", serviceName, err) | ||||
| 		} | ||||
| 	} | ||||
| 	c.Services[role] = svc | ||||
| 
 | ||||
|  |  | |||
|  | @ -116,7 +116,7 @@ func (c *Cluster) syncServices() error { | |||
| 		c.logger.Debugf("syncing %s service", role) | ||||
| 
 | ||||
| 		if err := c.syncEndpoint(role); err != nil { | ||||
| 			return fmt.Errorf("could not sync %s endpont: %v", role, err) | ||||
| 			return fmt.Errorf("could not sync %s endpoint: %v", role, err) | ||||
| 		} | ||||
| 
 | ||||
| 		if err := c.syncService(role); err != nil { | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue