rename service field to services as it contains service per role
This commit is contained in:
		
							parent
							
								
									5a7a3fec17
								
							
						
					
					
						commit
						272d7e1bcf
					
				|  | @ -41,7 +41,7 @@ type Config struct { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type kubeResources struct { | type kubeResources struct { | ||||||
| 	Service     map[postgresRole]*v1.Service | 	Services    map[postgresRole]*v1.Service | ||||||
| 	Endpoint    *v1.Endpoints | 	Endpoint    *v1.Endpoints | ||||||
| 	Secrets     map[types.UID]*v1.Secret | 	Secrets     map[types.UID]*v1.Secret | ||||||
| 	Statefulset *v1beta1.StatefulSet | 	Statefulset *v1beta1.StatefulSet | ||||||
|  | @ -96,7 +96,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql | ||||||
| 		pgUsers:          make(map[string]spec.PgUser), | 		pgUsers:          make(map[string]spec.PgUser), | ||||||
| 		systemUsers:      make(map[string]spec.PgUser), | 		systemUsers:      make(map[string]spec.PgUser), | ||||||
| 		podSubscribers:   make(map[spec.NamespacedName]chan spec.PodEvent), | 		podSubscribers:   make(map[spec.NamespacedName]chan spec.PodEvent), | ||||||
| 		kubeResources:    kubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[postgresRole]*v1.Service)}, | 		kubeResources:    kubeResources{Secrets: make(map[types.UID]*v1.Secret), Services: make(map[postgresRole]*v1.Service)}, | ||||||
| 		masterLess:       false, | 		masterLess:       false, | ||||||
| 		userSyncStrategy: users.DefaultUserSyncStrategy{}, | 		userSyncStrategy: users.DefaultUserSyncStrategy{}, | ||||||
| 		deleteOptions:    &metav1.DeleteOptions{OrphanDependents: &orphanDependents}, | 		deleteOptions:    &metav1.DeleteOptions{OrphanDependents: &orphanDependents}, | ||||||
|  | @ -246,11 +246,11 @@ func (c *Cluster) Create() error { | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) sameServiceWith(role postgresRole, service *v1.Service) (match bool, reason string) { | func (c *Cluster) sameServiceWith(role postgresRole, service *v1.Service) (match bool, reason string) { | ||||||
| 	//TODO: improve comparison
 | 	//TODO: improve comparison
 | ||||||
| 	if c.Service[role].Spec.Type != service.Spec.Type { | 	if c.Services[role].Spec.Type != service.Spec.Type { | ||||||
| 		return false, fmt.Sprintf("new %s service's type %q doesn't match the current one %q", | 		return false, fmt.Sprintf("new %s service's type %q doesn't match the current one %q", | ||||||
| 			role, service.Spec.Type, c.Service[role].Spec.Type) | 			role, service.Spec.Type, c.Services[role].Spec.Type) | ||||||
| 	} | 	} | ||||||
| 	oldSourceRanges := c.Service[role].Spec.LoadBalancerSourceRanges | 	oldSourceRanges := c.Services[role].Spec.LoadBalancerSourceRanges | ||||||
| 	newSourceRanges := service.Spec.LoadBalancerSourceRanges | 	newSourceRanges := service.Spec.LoadBalancerSourceRanges | ||||||
| 	/* work around Kubernetes 1.6 serializing [] as nil. See https://github.com/kubernetes/kubernetes/issues/43203 */ | 	/* work around Kubernetes 1.6 serializing [] as nil. See https://github.com/kubernetes/kubernetes/issues/43203 */ | ||||||
| 	if (len(oldSourceRanges) == 0) && (len(newSourceRanges) == 0) { | 	if (len(oldSourceRanges) == 0) && (len(newSourceRanges) == 0) { | ||||||
|  | @ -260,7 +260,7 @@ func (c *Cluster) sameServiceWith(role postgresRole, service *v1.Service) (match | ||||||
| 		return false, fmt.Sprintf("new %s service's LoadBalancerSourceRange doesn't match the current one", role) | 		return false, fmt.Sprintf("new %s service's LoadBalancerSourceRange doesn't match the current one", role) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	oldDNSAnnotation := c.Service[role].Annotations[constants.ZalandoDNSNameAnnotation] | 	oldDNSAnnotation := c.Services[role].Annotations[constants.ZalandoDNSNameAnnotation] | ||||||
| 	newDNSAnnotation := service.Annotations[constants.ZalandoDNSNameAnnotation] | 	newDNSAnnotation := service.Annotations[constants.ZalandoDNSNameAnnotation] | ||||||
| 	if oldDNSAnnotation != newDNSAnnotation { | 	if oldDNSAnnotation != newDNSAnnotation { | ||||||
| 		return false, fmt.Sprintf("new %s service's %q annotation doesn't match the current one", role, constants.ZalandoDNSNameAnnotation) | 		return false, fmt.Sprintf("new %s service's %q annotation doesn't match the current one", role, constants.ZalandoDNSNameAnnotation) | ||||||
|  | @ -445,12 +445,12 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { | ||||||
| 		} | 		} | ||||||
| 		newService := c.generateService(role, &newSpec.Spec) | 		newService := c.generateService(role, &newSpec.Spec) | ||||||
| 		if match, reason := c.sameServiceWith(role, newService); !match { | 		if match, reason := c.sameServiceWith(role, newService); !match { | ||||||
| 			c.logServiceChanges(role, c.Service[role], newService, true, reason) | 			c.logServiceChanges(role, c.Services[role], newService, true, reason) | ||||||
| 			if err := c.updateService(role, newService); err != nil { | 			if err := c.updateService(role, newService); err != nil { | ||||||
| 				c.setStatus(spec.ClusterStatusUpdateFailed) | 				c.setStatus(spec.ClusterStatusUpdateFailed) | ||||||
| 				return fmt.Errorf("could not update %s service: %v", role, err) | 				return fmt.Errorf("could not update %s service: %v", role, err) | ||||||
| 			} | 			} | ||||||
| 			c.logger.Infof("%s service %q has been updated", role, util.NameFromMeta(c.Service[role].ObjectMeta)) | 			c.logger.Infof("%s service %q has been updated", role, util.NameFromMeta(c.Services[role].ObjectMeta)) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -31,9 +31,9 @@ func (c *Cluster) loadResources() error { | ||||||
| 	for i, svc := range services.Items { | 	for i, svc := range services.Items { | ||||||
| 		switch postgresRole(svc.Labels[c.OpConfig.PodRoleLabel]) { | 		switch postgresRole(svc.Labels[c.OpConfig.PodRoleLabel]) { | ||||||
| 		case replica: | 		case replica: | ||||||
| 			c.Service[replica] = &services.Items[i] | 			c.Services[replica] = &services.Items[i] | ||||||
| 		default: | 		default: | ||||||
| 			c.Service[master] = &services.Items[i] | 			c.Services[master] = &services.Items[i] | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | @ -91,7 +91,7 @@ func (c *Cluster) listResources() error { | ||||||
| 		c.logger.Infof("found endpoint: %q (uid: %q)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID) | 		c.logger.Infof("found endpoint: %q (uid: %q)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	for role, service := range c.Service { | 	for role, service := range c.Services { | ||||||
| 		c.logger.Infof("found %s service: %q (uid: %q)", role, util.NameFromMeta(service.ObjectMeta), service.UID) | 		c.logger.Infof("found %s service: %q (uid: %q)", role, util.NameFromMeta(service.ObjectMeta), service.UID) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | @ -231,7 +231,7 @@ func (c *Cluster) deleteStatefulSet() error { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) createService(role postgresRole) (*v1.Service, error) { | func (c *Cluster) createService(role postgresRole) (*v1.Service, error) { | ||||||
| 	if c.Service[role] != nil { | 	if c.Services[role] != nil { | ||||||
| 		return nil, fmt.Errorf("service already exists in the cluster") | 		return nil, fmt.Errorf("service already exists in the cluster") | ||||||
| 	} | 	} | ||||||
| 	serviceSpec := c.generateService(role, &c.Spec) | 	serviceSpec := c.generateService(role, &c.Spec) | ||||||
|  | @ -241,18 +241,18 @@ func (c *Cluster) createService(role postgresRole) (*v1.Service, error) { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	c.Service[role] = service | 	c.Services[role] = service | ||||||
| 	return service, nil | 	return service, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error { | func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error { | ||||||
| 	if c.Service[role] == nil { | 	if c.Services[role] == nil { | ||||||
| 		return fmt.Errorf("there is no service in the cluster") | 		return fmt.Errorf("there is no service in the cluster") | ||||||
| 	} | 	} | ||||||
| 	serviceName := util.NameFromMeta(c.Service[role].ObjectMeta) | 	serviceName := util.NameFromMeta(c.Services[role].ObjectMeta) | ||||||
| 	endpointName := util.NameFromMeta(c.Endpoint.ObjectMeta) | 	endpointName := util.NameFromMeta(c.Endpoint.ObjectMeta) | ||||||
| 	// TODO: check if it possible to change the service type with a patch in future versions of Kubernetes
 | 	// TODO: check if it possible to change the service type with a patch in future versions of Kubernetes
 | ||||||
| 	if newService.Spec.Type != c.Service[role].Spec.Type { | 	if newService.Spec.Type != c.Services[role].Spec.Type { | ||||||
| 		// service type has changed, need to replace the service completely.
 | 		// service type has changed, need to replace the service completely.
 | ||||||
| 		// we cannot use just pach the current service, since it may contain attributes incompatible with the new type.
 | 		// we cannot use just pach the current service, since it may contain attributes incompatible with the new type.
 | ||||||
| 		var ( | 		var ( | ||||||
|  | @ -263,12 +263,12 @@ func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error | ||||||
| 		if role == master { | 		if role == master { | ||||||
| 			// for the master service we need to re-create the endpoint as well. Get the up-to-date version of
 | 			// for the master service we need to re-create the endpoint as well. Get the up-to-date version of
 | ||||||
| 			// the addresses stored in it before the service is deleted (deletion of the service removes the endpooint)
 | 			// the addresses stored in it before the service is deleted (deletion of the service removes the endpooint)
 | ||||||
| 			currentEndpoint, err = c.KubeClient.Endpoints(c.Service[role].Namespace).Get(c.Service[role].Name, metav1.GetOptions{}) | 			currentEndpoint, err = c.KubeClient.Endpoints(c.Services[role].Namespace).Get(c.Services[role].Name, metav1.GetOptions{}) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				return fmt.Errorf("could not get current cluster endpoints: %v", err) | 				return fmt.Errorf("could not get current cluster endpoints: %v", err) | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 		err = c.KubeClient.Services(c.Service[role].Namespace).Delete(c.Service[role].Name, c.deleteOptions) | 		err = c.KubeClient.Services(c.Services[role].Namespace).Delete(c.Services[role].Name, c.deleteOptions) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return fmt.Errorf("could not delete service %q: %v", serviceName, err) | 			return fmt.Errorf("could not delete service %q: %v", serviceName, err) | ||||||
| 		} | 		} | ||||||
|  | @ -277,11 +277,11 @@ func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return fmt.Errorf("could not create service %q: %v", serviceName, err) | 			return fmt.Errorf("could not create service %q: %v", serviceName, err) | ||||||
| 		} | 		} | ||||||
| 		c.Service[role] = svc | 		c.Services[role] = svc | ||||||
| 		if role == master { | 		if role == master { | ||||||
| 			// create the new endpoint using the addresses obtained from the previous one
 | 			// create the new endpoint using the addresses obtained from the previous one
 | ||||||
| 			endpointSpec := c.generateMasterEndpoints(currentEndpoint.Subsets) | 			endpointSpec := c.generateMasterEndpoints(currentEndpoint.Subsets) | ||||||
| 			ep, err := c.KubeClient.Endpoints(c.Service[role].Namespace).Create(endpointSpec) | 			ep, err := c.KubeClient.Endpoints(c.Services[role].Namespace).Create(endpointSpec) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				return fmt.Errorf("could not create endpoint %q: %v", endpointName, err) | 				return fmt.Errorf("could not create endpoint %q: %v", endpointName, err) | ||||||
| 			} | 			} | ||||||
|  | @ -293,8 +293,8 @@ func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error | ||||||
| 	if len(newService.ObjectMeta.Annotations) > 0 { | 	if len(newService.ObjectMeta.Annotations) > 0 { | ||||||
| 		annotationsPatchData := metadataAnnotationsPatch(newService.ObjectMeta.Annotations) | 		annotationsPatchData := metadataAnnotationsPatch(newService.ObjectMeta.Annotations) | ||||||
| 
 | 
 | ||||||
| 		_, err := c.KubeClient.Services(c.Service[role].Namespace).Patch( | 		_, err := c.KubeClient.Services(c.Services[role].Namespace).Patch( | ||||||
| 			c.Service[role].Name, | 			c.Services[role].Name, | ||||||
| 			types.StrategicMergePatchType, | 			types.StrategicMergePatchType, | ||||||
| 			[]byte(annotationsPatchData), "") | 			[]byte(annotationsPatchData), "") | ||||||
| 
 | 
 | ||||||
|  | @ -308,30 +308,30 @@ func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error | ||||||
| 		return fmt.Errorf("could not form patch for the service %q: %v", serviceName, err) | 		return fmt.Errorf("could not form patch for the service %q: %v", serviceName, err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	svc, err := c.KubeClient.Services(c.Service[role].Namespace).Patch( | 	svc, err := c.KubeClient.Services(c.Services[role].Namespace).Patch( | ||||||
| 		c.Service[role].Name, | 		c.Services[role].Name, | ||||||
| 		types.MergePatchType, | 		types.MergePatchType, | ||||||
| 		patchData, "") | 		patchData, "") | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return fmt.Errorf("could not patch service %q: %v", serviceName, err) | 		return fmt.Errorf("could not patch service %q: %v", serviceName, err) | ||||||
| 	} | 	} | ||||||
| 	c.Service[role] = svc | 	c.Services[role] = svc | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) deleteService(role postgresRole) error { | func (c *Cluster) deleteService(role postgresRole) error { | ||||||
| 	c.logger.Debugf("deleting service %s", role) | 	c.logger.Debugf("deleting service %s", role) | ||||||
| 	if c.Service[role] == nil { | 	if c.Services[role] == nil { | ||||||
| 		return fmt.Errorf("there is no %s service in the cluster", role) | 		return fmt.Errorf("there is no %s service in the cluster", role) | ||||||
| 	} | 	} | ||||||
| 	service := c.Service[role] | 	service := c.Services[role] | ||||||
| 	err := c.KubeClient.Services(service.Namespace).Delete(service.Name, c.deleteOptions) | 	err := c.KubeClient.Services(service.Namespace).Delete(service.Name, c.deleteOptions) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	c.logger.Infof("%s service %q has been deleted", role, util.NameFromMeta(service.ObjectMeta)) | 	c.logger.Infof("%s service %q has been deleted", role, util.NameFromMeta(service.ObjectMeta)) | ||||||
| 	c.Service[role] = nil | 	c.Services[role] = nil | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -422,12 +422,12 @@ func (c *Cluster) createRoles() (err error) { | ||||||
| 
 | 
 | ||||||
| // GetServiceMaster returns cluster's kubernetes master Service
 | // GetServiceMaster returns cluster's kubernetes master Service
 | ||||||
| func (c *Cluster) GetServiceMaster() *v1.Service { | func (c *Cluster) GetServiceMaster() *v1.Service { | ||||||
| 	return c.Service[master] | 	return c.Services[master] | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // GetServiceReplica returns cluster's kubernetes replica Service
 | // GetServiceReplica returns cluster's kubernetes replica Service
 | ||||||
| func (c *Cluster) GetServiceReplica() *v1.Service { | func (c *Cluster) GetServiceReplica() *v1.Service { | ||||||
| 	return c.Service[replica] | 	return c.Services[replica] | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // GetEndpoint returns cluster's kubernetes Endpoint
 | // GetEndpoint returns cluster's kubernetes Endpoint
 | ||||||
|  |  | ||||||
|  | @ -43,7 +43,7 @@ func (c *Cluster) Sync() error { | ||||||
| 	c.logger.Debugf("syncing services") | 	c.logger.Debugf("syncing services") | ||||||
| 	for _, role := range []postgresRole{master, replica} { | 	for _, role := range []postgresRole{master, replica} { | ||||||
| 		if role == replica && !c.Spec.ReplicaLoadBalancer { | 		if role == replica && !c.Spec.ReplicaLoadBalancer { | ||||||
| 			if c.Service[role] != nil { | 			if c.Services[role] != nil { | ||||||
| 				// delete the left over replica service
 | 				// delete the left over replica service
 | ||||||
| 				if err := c.deleteService(role); err != nil { | 				if err := c.deleteService(role); err != nil { | ||||||
| 					return fmt.Errorf("could not delete obsolete %s service: %v", role, err) | 					return fmt.Errorf("could not delete obsolete %s service: %v", role, err) | ||||||
|  | @ -82,7 +82,7 @@ func (c *Cluster) Sync() error { | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) syncService(role postgresRole) error { | func (c *Cluster) syncService(role postgresRole) error { | ||||||
| 	cSpec := c.Spec | 	cSpec := c.Spec | ||||||
| 	if c.Service[role] == nil { | 	if c.Services[role] == nil { | ||||||
| 		c.logger.Infof("could not find the cluster's %s service", role) | 		c.logger.Infof("could not find the cluster's %s service", role) | ||||||
| 		svc, err := c.createService(role) | 		svc, err := c.createService(role) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
|  | @ -98,7 +98,7 @@ func (c *Cluster) syncService(role postgresRole) error { | ||||||
| 	if match { | 	if match { | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
| 	c.logServiceChanges(role, c.Service[role], desiredSvc, false, reason) | 	c.logServiceChanges(role, c.Services[role], desiredSvc, false, reason) | ||||||
| 
 | 
 | ||||||
| 	if err := c.updateService(role, desiredSvc); err != nil { | 	if err := c.updateService(role, desiredSvc); err != nil { | ||||||
| 		return fmt.Errorf("could not update %s service to match desired state: %v", role, err) | 		return fmt.Errorf("could not update %s service to match desired state: %v", role, err) | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue