Implement replicaLoadBalancer boolean flag. (#38)
The flag adds a replica service with the name cluster_name-repl and
a DNS name that defaults to {cluster}-repl.{team}.{hostedzone}.
The implementation converted Service field of the cluster into a map
with one or two elements and deals with the cases when the new flag
is changed on a running cluster
(the update and the sync should create or delete the replica service).
In order to pick up master and replica service and master endpoint
when listing cluster resources.
* Update the spec when updating the cluster.
			
			
This commit is contained in:
		
							parent
							
								
									7b0ca31bfb
								
							
						
					
					
						commit
						dc36c4ca12
					
				|  | @ -34,7 +34,7 @@ var ( | |||
| 	userRegexp         = regexp.MustCompile(`^[a-z0-9]([-_a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-_a-z0-9]*[a-z0-9])?)*$`) | ||||
| ) | ||||
| 
 | ||||
| //TODO: remove struct duplication
 | ||||
| // Config contains operator-wide clients and configuration used from a cluster. TODO: remove struct duplication.
 | ||||
| type Config struct { | ||||
| 	KubeClient          *kubernetes.Clientset //TODO: move clients to the better place?
 | ||||
| 	RestClient          *rest.RESTClient | ||||
|  | @ -45,7 +45,7 @@ type Config struct { | |||
| } | ||||
| 
 | ||||
| type kubeResources struct { | ||||
| 	Service     *v1.Service | ||||
| 	Service     map[PostgresRole]*v1.Service | ||||
| 	Endpoint    *v1.Endpoints | ||||
| 	Secrets     map[types.UID]*v1.Secret | ||||
| 	Statefulset *v1beta1.StatefulSet | ||||
|  | @ -77,9 +77,10 @@ type compareStatefulsetResult struct { | |||
| 	reasons       []string | ||||
| } | ||||
| 
 | ||||
| // New creates a new cluster. This function should be called from a controller.
 | ||||
| func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { | ||||
| 	lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Metadata.Name) | ||||
| 	kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret)} | ||||
| 	kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[PostgresRole]*v1.Service)} | ||||
| 	orphanDependents := true | ||||
| 
 | ||||
| 	podEventsQueue := cache.NewFIFO(func(obj interface{}) (string, error) { | ||||
|  | @ -108,7 +109,7 @@ func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { | |||
| 	return cluster | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) ClusterName() spec.NamespacedName { | ||||
| func (c *Cluster) clusterName() spec.NamespacedName { | ||||
| 	return util.NameFromMeta(c.Metadata) | ||||
| } | ||||
| 
 | ||||
|  | @ -136,7 +137,7 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) { | |||
| 	} | ||||
| 
 | ||||
| 	if err != nil { | ||||
| 		c.logger.Warningf("could not set status for cluster '%s': %s", c.ClusterName(), err) | ||||
| 		c.logger.Warningf("could not set status for cluster '%s': %s", c.clusterName(), err) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
|  | @ -155,11 +156,10 @@ func (c *Cluster) initUsers() error { | |||
| 		return fmt.Errorf("could not init human users: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	c.logger.Debugf("Initialized users: %# v", util.Pretty(c.pgUsers)) | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // Create creates the new kubernetes objects associated with the cluster.
 | ||||
| func (c *Cluster) Create() error { | ||||
| 	c.mu.Lock() | ||||
| 	defer c.mu.Unlock() | ||||
|  | @ -182,11 +182,16 @@ func (c *Cluster) Create() error { | |||
| 	} | ||||
| 	c.logger.Infof("endpoint '%s' has been successfully created", util.NameFromMeta(ep.ObjectMeta)) | ||||
| 
 | ||||
| 	service, err := c.createService() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("could not create service: %v", err) | ||||
| 	for _, role := range []PostgresRole{Master, Replica} { | ||||
| 		if role == Replica && !c.Spec.ReplicaLoadBalancer { | ||||
| 			continue | ||||
| 		} | ||||
| 		service, err := c.createService(role) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("could not create %s service: %v", role, err) | ||||
| 		} | ||||
| 		c.logger.Infof("%s service '%s' has been successfully created", role, util.NameFromMeta(service.ObjectMeta)) | ||||
| 	} | ||||
| 	c.logger.Infof("service '%s' has been successfully created", util.NameFromMeta(service.ObjectMeta)) | ||||
| 
 | ||||
| 	if err = c.initUsers(); err != nil { | ||||
| 		return err | ||||
|  | @ -226,7 +231,7 @@ func (c *Cluster) Create() error { | |||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	err = c.ListResources() | ||||
| 	err = c.listResources() | ||||
| 	if err != nil { | ||||
| 		c.logger.Errorf("could not list resources: %s", err) | ||||
| 	} | ||||
|  | @ -234,14 +239,19 @@ func (c *Cluster) Create() error { | |||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) sameServiceWith(service *v1.Service) (match bool, reason string) { | ||||
| func (c *Cluster) sameServiceWith(role PostgresRole, service *v1.Service) (match bool, reason string) { | ||||
| 	//TODO: improve comparison
 | ||||
| 	if !reflect.DeepEqual(c.Service.Spec.LoadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) { | ||||
| 		reason = "new service's LoadBalancerSourceRange doesn't match the current one" | ||||
| 	} else { | ||||
| 		match = true | ||||
| 	match = true | ||||
| 	old := c.Service[role].Spec.LoadBalancerSourceRanges | ||||
| 	new := service.Spec.LoadBalancerSourceRanges | ||||
| 	/* work around Kubernetes 1.6 serializing [] as nil. See https://github.com/kubernetes/kubernetes/issues/43203 */ | ||||
| 	if (len(old) == 0) && (len(new) == 0) { | ||||
| 		return true, "" | ||||
| 	} | ||||
| 	return | ||||
| 	if !reflect.DeepEqual(old, new) { | ||||
| 		return false, fmt.Sprintf("new %s service's LoadBalancerSourceRange doesn't match the current one", role) | ||||
| 	} | ||||
| 	return true, "" | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) sameVolumeWith(volume spec.Volume) (match bool, reason string) { | ||||
|  | @ -377,6 +387,8 @@ func compareResoucesAssumeFirstNotNil(a *v1.ResourceRequirements, b *v1.Resource | |||
| 
 | ||||
| } | ||||
| 
 | ||||
| // Update changes Kubernetes objects according to the new specification. Unlike the sync case, the missing object.
 | ||||
| // (i.e. service) is treated as an error.
 | ||||
| func (c *Cluster) Update(newSpec *spec.Postgresql) error { | ||||
| 	c.mu.Lock() | ||||
| 	defer c.mu.Unlock() | ||||
|  | @ -385,14 +397,46 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { | |||
| 	c.logger.Debugf("Cluster update from version %s to %s", | ||||
| 		c.Metadata.ResourceVersion, newSpec.Metadata.ResourceVersion) | ||||
| 
 | ||||
| 	newService := c.genService(newSpec.Spec.AllowedSourceRanges) | ||||
| 	if match, reason := c.sameServiceWith(newService); !match { | ||||
| 		c.logServiceChanges(c.Service, newService, true, reason) | ||||
| 		if err := c.updateService(newService); err != nil { | ||||
| 			c.setStatus(spec.ClusterStatusUpdateFailed) | ||||
| 			return fmt.Errorf("could not update service: %v", err) | ||||
| 	/* Make sure we update when this function exists */ | ||||
| 	defer func() { | ||||
| 		c.Postgresql = *newSpec | ||||
| 	}() | ||||
| 
 | ||||
| 	for _, role := range []PostgresRole{Master, Replica} { | ||||
| 		if role == Replica { | ||||
| 			if !newSpec.Spec.ReplicaLoadBalancer { | ||||
| 				// old spec had a load balancer, but the new one doesn't
 | ||||
| 				if c.Spec.ReplicaLoadBalancer { | ||||
| 					err := c.deleteService(role) | ||||
| 					if err != nil { | ||||
| 						return fmt.Errorf("could not delete obsolete %s service: %v", role, err) | ||||
| 					} | ||||
| 					c.logger.Infof("deleted obsolete %s service", role) | ||||
| 				} | ||||
| 			} else { | ||||
| 				if !c.Spec.ReplicaLoadBalancer { | ||||
| 					// old spec didn't have a load balancer, but the one does
 | ||||
| 					service, err := c.createService(role) | ||||
| 					if err != nil { | ||||
| 						return fmt.Errorf("could not create new %s service: %v", role, err) | ||||
| 					} | ||||
| 					c.logger.Infof("%s service '%s' has been created", role, util.NameFromMeta(service.ObjectMeta)) | ||||
| 				} | ||||
| 			} | ||||
| 			// only proceeed further if both old and new load balancer were present
 | ||||
| 			if !(newSpec.Spec.ReplicaLoadBalancer && c.Spec.ReplicaLoadBalancer) { | ||||
| 				continue | ||||
| 			} | ||||
| 		} | ||||
| 		newService := c.genService(role, newSpec.Spec.AllowedSourceRanges) | ||||
| 		if match, reason := c.sameServiceWith(role, newService); !match { | ||||
| 			c.logServiceChanges(role, c.Service[role], newService, true, reason) | ||||
| 			if err := c.updateService(role, newService); err != nil { | ||||
| 				c.setStatus(spec.ClusterStatusUpdateFailed) | ||||
| 				return fmt.Errorf("could not update %s service: %v", role, err) | ||||
| 			} | ||||
| 			c.logger.Infof("%s service '%s' has been updated", role, util.NameFromMeta(c.Service[role].ObjectMeta)) | ||||
| 		} | ||||
| 		c.logger.Infof("service '%s' has been updated", util.NameFromMeta(c.Service.ObjectMeta)) | ||||
| 	} | ||||
| 
 | ||||
| 	newStatefulSet, err := c.genStatefulSet(newSpec.Spec) | ||||
|  | @ -448,6 +492,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { | |||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // Delete deletes the cluster and cleans up all objects associated with it (including statefulsets).
 | ||||
| func (c *Cluster) Delete() error { | ||||
| 	c.mu.Lock() | ||||
| 	defer c.mu.Unlock() | ||||
|  | @ -456,8 +501,13 @@ func (c *Cluster) Delete() error { | |||
| 		return fmt.Errorf("could not delete endpoint: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	if err := c.deleteService(); err != nil { | ||||
| 		return fmt.Errorf("could not delete service: %v", err) | ||||
| 	for _, role := range []PostgresRole{Master, Replica} { | ||||
| 		if role == Replica && !c.Spec.ReplicaLoadBalancer { | ||||
| 			continue | ||||
| 		} | ||||
| 		if err := c.deleteService(role); err != nil { | ||||
| 			return fmt.Errorf("could not delete %s service: %v", role, err) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	if err := c.deleteStatefulSet(); err != nil { | ||||
|  | @ -473,6 +523,7 @@ func (c *Cluster) Delete() error { | |||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // ReceivePodEvent is called back by the controller in order to add the cluster's pod event to the queue.
 | ||||
| func (c *Cluster) ReceivePodEvent(event spec.PodEvent) { | ||||
| 	c.podEventsQueue.Add(event) | ||||
| } | ||||
|  | @ -493,6 +544,7 @@ func (c *Cluster) processPodEvent(obj interface{}) error { | |||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // Run starts the pod event dispatching for the given cluster.
 | ||||
| func (c *Cluster) Run(stopCh <-chan struct{}) { | ||||
| 	go c.processPodEventQueue(stopCh) | ||||
| } | ||||
|  |  | |||
|  | @ -15,9 +15,9 @@ import ( | |||
| ) | ||||
| 
 | ||||
| const ( | ||||
| 	PGBinariesLocationTemplate       = "/usr/lib/postgresql/%s/bin" | ||||
| 	PatroniPGBinariesParameterName   = "pg_bin" | ||||
| 	PatroniPGParametersParameterName = "parameters" | ||||
| 	pgBinariesLocationTemplate       = "/usr/lib/postgresql/%s/bin" | ||||
| 	patroniPGBinariesParameterName   = "bin_dir" | ||||
| 	patroniPGParametersParameterName = "parameters" | ||||
| ) | ||||
| 
 | ||||
| type pgUser struct { | ||||
|  | @ -25,7 +25,7 @@ type pgUser struct { | |||
| 	Options  []string `json:"options"` | ||||
| } | ||||
| 
 | ||||
| type PatroniDCS struct { | ||||
| type patroniDCS struct { | ||||
| 	TTL                  uint32  `json:"ttl,omitempty"` | ||||
| 	LoopWait             uint32  `json:"loop_wait,omitempty"` | ||||
| 	RetryTimeout         uint32  `json:"retry_timeout,omitempty"` | ||||
|  | @ -36,7 +36,7 @@ type pgBootstrap struct { | |||
| 	Initdb []interface{}     `json:"initdb"` | ||||
| 	Users  map[string]pgUser `json:"users"` | ||||
| 	PgHBA  []string          `json:"pg_hba"` | ||||
| 	DCS    PatroniDCS        `json:"dcs,omitempty"` | ||||
| 	DCS    patroniDCS        `json:"dcs,omitempty"` | ||||
| } | ||||
| 
 | ||||
| type spiloConfiguration struct { | ||||
|  | @ -185,9 +185,9 @@ PATRONI_INITDB_PARAMS: | |||
| 	} | ||||
| 
 | ||||
| 	config.PgLocalConfiguration = make(map[string]interface{}) | ||||
| 	config.PgLocalConfiguration[PatroniPGBinariesParameterName] = fmt.Sprintf(PGBinariesLocationTemplate, pg.PgVersion) | ||||
| 	config.PgLocalConfiguration[patroniPGBinariesParameterName] = fmt.Sprintf(pgBinariesLocationTemplate, pg.PgVersion) | ||||
| 	if len(pg.Parameters) > 0 { | ||||
| 		config.PgLocalConfiguration[PatroniPGParametersParameterName] = pg.Parameters | ||||
| 		config.PgLocalConfiguration[patroniPGParametersParameterName] = pg.Parameters | ||||
| 	} | ||||
| 	config.Bootstrap.Users = map[string]pgUser{ | ||||
| 		c.OpConfig.PamRoleName: { | ||||
|  | @ -425,14 +425,22 @@ func (c *Cluster) genSingleUserSecret(namespace string, pgUser spec.PgUser) *v1. | |||
| 	return &secret | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) genService(allowedSourceRanges []string) *v1.Service { | ||||
| func (c *Cluster) genService(role PostgresRole, allowedSourceRanges []string) *v1.Service { | ||||
| 
 | ||||
| 	dnsNameFunction := c.masterDnsName | ||||
| 	name := c.Metadata.Name | ||||
| 	if role == Replica { | ||||
| 		dnsNameFunction = c.replicaDnsName | ||||
| 		name = name + "-repl" | ||||
| 	} | ||||
| 
 | ||||
| 	service := &v1.Service{ | ||||
| 		ObjectMeta: v1.ObjectMeta{ | ||||
| 			Name:      c.Metadata.Name, | ||||
| 			Name:      name, | ||||
| 			Namespace: c.Metadata.Namespace, | ||||
| 			Labels:    c.labelsSet(), | ||||
| 			Labels:    c.roleLabelsSet(role), | ||||
| 			Annotations: map[string]string{ | ||||
| 				constants.ZalandoDNSNameAnnotation: c.dnsName(), | ||||
| 				constants.ZalandoDNSNameAnnotation: dnsNameFunction(), | ||||
| 				constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, | ||||
| 			}, | ||||
| 		}, | ||||
|  | @ -442,16 +450,19 @@ func (c *Cluster) genService(allowedSourceRanges []string) *v1.Service { | |||
| 			LoadBalancerSourceRanges: allowedSourceRanges, | ||||
| 		}, | ||||
| 	} | ||||
| 	if role == Replica { | ||||
| 		service.Spec.Selector = map[string]string{c.OpConfig.PodRoleLabel: string(Replica)} | ||||
| 	} | ||||
| 
 | ||||
| 	return service | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) genEndpoints() *v1.Endpoints { | ||||
| func (c *Cluster) genMasterEndpoints() *v1.Endpoints { | ||||
| 	endpoints := &v1.Endpoints{ | ||||
| 		ObjectMeta: v1.ObjectMeta{ | ||||
| 			Name:      c.Metadata.Name, | ||||
| 			Namespace: c.Metadata.Namespace, | ||||
| 			Labels:    c.labelsSet(), | ||||
| 			Labels:    c.roleLabelsSet(Master), | ||||
| 		}, | ||||
| 	} | ||||
| 
 | ||||
|  |  | |||
|  | @ -24,20 +24,31 @@ func (c *Cluster) loadResources() error { | |||
| 	if err != nil { | ||||
| 		return fmt.Errorf("could not get list of services: %v", err) | ||||
| 	} | ||||
| 	if len(services.Items) > 1 { | ||||
| 	if len(services.Items) > 2 { | ||||
| 		return fmt.Errorf("too many(%d) services for a cluster", len(services.Items)) | ||||
| 	} else if len(services.Items) == 1 { | ||||
| 		c.Service = &services.Items[0] | ||||
| 	} | ||||
| 	for i, svc := range services.Items { | ||||
| 		switch PostgresRole(svc.Labels[c.OpConfig.PodRoleLabel]) { | ||||
| 		case Replica: | ||||
| 			c.Service[Replica] = &services.Items[i] | ||||
| 		default: | ||||
| 			c.Service[Master] = &services.Items[i] | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	endpoints, err := c.KubeClient.Endpoints(ns).List(listOptions) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("could not get list of endpoints: %v", err) | ||||
| 	} | ||||
| 	if len(endpoints.Items) > 1 { | ||||
| 	if len(endpoints.Items) > 2 { | ||||
| 		return fmt.Errorf("too many(%d) endpoints for a cluster", len(endpoints.Items)) | ||||
| 	} else if len(endpoints.Items) == 1 { | ||||
| 		c.Endpoint = &endpoints.Items[0] | ||||
| 	} | ||||
| 
 | ||||
| 	for i, ep := range endpoints.Items { | ||||
| 		if ep.Labels[c.OpConfig.PodRoleLabel] != string(Replica) { | ||||
| 			c.Endpoint = &endpoints.Items[i] | ||||
| 			break | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	secrets, err := c.KubeClient.Secrets(ns).List(listOptions) | ||||
|  | @ -58,14 +69,15 @@ func (c *Cluster) loadResources() error { | |||
| 	} | ||||
| 	if len(statefulSets.Items) > 1 { | ||||
| 		return fmt.Errorf("too many(%d) statefulsets for a cluster", len(statefulSets.Items)) | ||||
| 	} else if len(statefulSets.Items) == 1 { | ||||
| 	} | ||||
| 	if len(statefulSets.Items) == 1 { | ||||
| 		c.Statefulset = &statefulSets.Items[0] | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) ListResources() error { | ||||
| func (c *Cluster) listResources() error { | ||||
| 	if c.Statefulset != nil { | ||||
| 		c.logger.Infof("Found statefulset: %s (uid: %s)", util.NameFromMeta(c.Statefulset.ObjectMeta), c.Statefulset.UID) | ||||
| 	} | ||||
|  | @ -78,8 +90,8 @@ func (c *Cluster) ListResources() error { | |||
| 		c.logger.Infof("Found endpoint: %s (uid: %s)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID) | ||||
| 	} | ||||
| 
 | ||||
| 	if c.Service != nil { | ||||
| 		c.logger.Infof("Found service: %s (uid: %s)", util.NameFromMeta(c.Service.ObjectMeta), c.Service.UID) | ||||
| 	for role, service := range c.Service { | ||||
| 		c.logger.Infof("Found %s service: %s (uid: %s)", role, util.NameFromMeta(service.ObjectMeta), service.UID) | ||||
| 	} | ||||
| 
 | ||||
| 	pods, err := c.listPods() | ||||
|  | @ -217,57 +229,56 @@ func (c *Cluster) deleteStatefulSet() error { | |||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) createService() (*v1.Service, error) { | ||||
| 	if c.Service != nil { | ||||
| func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) { | ||||
| 	if c.Service[role] != nil { | ||||
| 		return nil, fmt.Errorf("service already exists in the cluster") | ||||
| 	} | ||||
| 	serviceSpec := c.genService(c.Spec.AllowedSourceRanges) | ||||
| 	serviceSpec := c.genService(role, c.Spec.AllowedSourceRanges) | ||||
| 
 | ||||
| 	service, err := c.KubeClient.Services(serviceSpec.Namespace).Create(serviceSpec) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	c.Service = service | ||||
| 
 | ||||
| 	c.Service[role] = service | ||||
| 	return service, nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) updateService(newService *v1.Service) error { | ||||
| 	if c.Service == nil { | ||||
| func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error { | ||||
| 	if c.Service[role] == nil { | ||||
| 		return fmt.Errorf("there is no service in the cluster") | ||||
| 	} | ||||
| 	serviceName := util.NameFromMeta(c.Service.ObjectMeta) | ||||
| 	serviceName := util.NameFromMeta(c.Service[role].ObjectMeta) | ||||
| 
 | ||||
| 	patchData, err := specPatch(newService.Spec) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("could not form patch for the service '%s': %v", serviceName, err) | ||||
| 	} | ||||
| 
 | ||||
| 	svc, err := c.KubeClient.Services(c.Service.Namespace).Patch( | ||||
| 		c.Service.Name, | ||||
| 	svc, err := c.KubeClient.Services(c.Service[role].Namespace).Patch( | ||||
| 		c.Service[role].Name, | ||||
| 		api.MergePatchType, | ||||
| 		patchData, "") | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("could not patch service '%s': %v", serviceName, err) | ||||
| 	} | ||||
| 	c.Service = svc | ||||
| 	c.Service[role] = svc | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) deleteService() error { | ||||
| 	c.logger.Debugln("Deleting service") | ||||
| 
 | ||||
| 	if c.Service == nil { | ||||
| 		return fmt.Errorf("there is no service in the cluster") | ||||
| func (c *Cluster) deleteService(role PostgresRole) error { | ||||
| 	c.logger.Debugf("Deleting service %s", role) | ||||
| 	if c.Service[role] == nil { | ||||
| 		return fmt.Errorf("There is no %s service in the cluster", role) | ||||
| 	} | ||||
| 	err := c.KubeClient.Services(c.Service.Namespace).Delete(c.Service.Name, c.deleteOptions) | ||||
| 	service := c.Service[role] | ||||
| 	err := c.KubeClient.Services(service.Namespace).Delete(service.Name, c.deleteOptions) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	c.logger.Infof("service '%s' has been deleted", util.NameFromMeta(c.Service.ObjectMeta)) | ||||
| 	c.Service = nil | ||||
| 
 | ||||
| 	c.logger.Infof("%s service '%s' has been deleted", role, util.NameFromMeta(service.ObjectMeta)) | ||||
| 	c.Service[role] = nil | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
|  | @ -275,7 +286,7 @@ func (c *Cluster) createEndpoint() (*v1.Endpoints, error) { | |||
| 	if c.Endpoint != nil { | ||||
| 		return nil, fmt.Errorf("endpoint already exists in the cluster") | ||||
| 	} | ||||
| 	endpointsSpec := c.genEndpoints() | ||||
| 	endpointsSpec := c.genMasterEndpoints() | ||||
| 
 | ||||
| 	endpoints, err := c.KubeClient.Endpoints(endpointsSpec.Namespace).Create(endpointsSpec) | ||||
| 	if err != nil { | ||||
|  |  | |||
|  | @ -8,6 +8,8 @@ import ( | |||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/volumes" | ||||
| ) | ||||
| 
 | ||||
| // Sync syncs the cluster, making sure the actual Kubernetes objects correspond to what is defined in the manifest.
 | ||||
| // Unlike the update, sync does not error out if some objects do not exist and takes care of creating them.
 | ||||
| func (c *Cluster) Sync() error { | ||||
| 	c.mu.Lock() | ||||
| 	defer c.mu.Unlock() | ||||
|  | @ -32,9 +34,20 @@ func (c *Cluster) Sync() error { | |||
| 	} | ||||
| 
 | ||||
| 	c.logger.Debugf("Syncing services") | ||||
| 	if err := c.syncService(); err != nil { | ||||
| 		if !k8sutil.ResourceAlreadyExists(err) { | ||||
| 			return fmt.Errorf("coud not sync services: %v", err) | ||||
| 	for _, role := range []PostgresRole{Master, Replica} { | ||||
| 		if role == Replica && !c.Spec.ReplicaLoadBalancer { | ||||
| 			if c.Service[role] != nil { | ||||
| 				// delete the left over replica service
 | ||||
| 				if err := c.deleteService(role); err != nil { | ||||
| 					return fmt.Errorf("could not delete obsolete %s service: %v", role) | ||||
| 				} | ||||
| 			} | ||||
| 			continue | ||||
| 		} | ||||
| 		if err := c.syncService(role); err != nil { | ||||
| 			if !k8sutil.ResourceAlreadyExists(err) { | ||||
| 				return fmt.Errorf("coud not sync %s service: %v", role, err) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
|  | @ -48,16 +61,15 @@ func (c *Cluster) Sync() error { | |||
| 	if !c.databaseAccessDisabled() { | ||||
| 		if err := c.initDbConn(); err != nil { | ||||
| 			return fmt.Errorf("could not init db connection: %v", err) | ||||
| 		} else { | ||||
| 			c.logger.Debugf("Syncing roles") | ||||
| 			if err := c.SyncRoles(); err != nil { | ||||
| 				return fmt.Errorf("could not sync roles: %v", err) | ||||
| 			} | ||||
| 		} | ||||
| 		c.logger.Debugf("Syncing roles") | ||||
| 		if err := c.syncRoles(); err != nil { | ||||
| 			return fmt.Errorf("could not sync roles: %v", err) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	c.logger.Debugf("Syncing persistent volumes") | ||||
| 	if err := c.SyncVolumes(); err != nil { | ||||
| 	if err := c.syncVolumes(); err != nil { | ||||
| 		return fmt.Errorf("could not sync persistent volumes: %v", err) | ||||
| 	} | ||||
| 
 | ||||
|  | @ -75,30 +87,30 @@ func (c *Cluster) syncSecrets() error { | |||
| 	return err | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) syncService() error { | ||||
| func (c *Cluster) syncService(role PostgresRole) error { | ||||
| 	cSpec := c.Spec | ||||
| 	if c.Service == nil { | ||||
| 		c.logger.Infof("could not find the cluster's service") | ||||
| 		svc, err := c.createService() | ||||
| 	if c.Service[role] == nil { | ||||
| 		c.logger.Infof("could not find the cluster's %s service", role) | ||||
| 		svc, err := c.createService(role) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("could not create missing service: %v", err) | ||||
| 			return fmt.Errorf("could not create missing %s service: %v", role, err) | ||||
| 		} | ||||
| 		c.logger.Infof("Created missing service '%s'", util.NameFromMeta(svc.ObjectMeta)) | ||||
| 		c.logger.Infof("Created missing %s service '%s'", role, util.NameFromMeta(svc.ObjectMeta)) | ||||
| 
 | ||||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
| 	desiredSvc := c.genService(cSpec.AllowedSourceRanges) | ||||
| 	match, reason := c.sameServiceWith(desiredSvc) | ||||
| 	desiredSvc := c.genService(role, cSpec.AllowedSourceRanges) | ||||
| 	match, reason := c.sameServiceWith(role, desiredSvc) | ||||
| 	if match { | ||||
| 		return nil | ||||
| 	} | ||||
| 	c.logServiceChanges(c.Service, desiredSvc, false, reason) | ||||
| 	c.logServiceChanges(role, c.Service[role], desiredSvc, false, reason) | ||||
| 
 | ||||
| 	if err := c.updateService(desiredSvc); err != nil { | ||||
| 		return fmt.Errorf("could not update service to match desired state: %v", err) | ||||
| 	if err := c.updateService(role, desiredSvc); err != nil { | ||||
| 		return fmt.Errorf("could not update %s service to match desired state: %v", role, err) | ||||
| 	} | ||||
| 	c.logger.Infof("service '%s' is in the desired state now", util.NameFromMeta(desiredSvc.ObjectMeta)) | ||||
| 	c.logger.Infof("%s service '%s' is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta)) | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
|  | @ -181,7 +193,7 @@ func (c *Cluster) syncStatefulSet() error { | |||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) SyncRoles() error { | ||||
| func (c *Cluster) syncRoles() error { | ||||
| 	var userNames []string | ||||
| 
 | ||||
| 	if err := c.initUsers(); err != nil { | ||||
|  | @ -201,9 +213,9 @@ func (c *Cluster) SyncRoles() error { | |||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| /* SyncVolume reads all persistent volumes and checks that their size matches the one declared in the statefulset */ | ||||
| func (c *Cluster) SyncVolumes() error { | ||||
| 	act, err := c.VolumesNeedResizing(c.Spec.Volume) | ||||
| // syncVolumes reads all persistent volumes and checks that their size matches the one declared in the statefulset.
 | ||||
| func (c *Cluster) syncVolumes() error { | ||||
| 	act, err := c.volumesNeedResizing(c.Spec.Volume) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("could not compare size of the volumes: %v", err) | ||||
| 	} | ||||
|  |  | |||
|  | @ -0,0 +1,8 @@ | |||
| package cluster | ||||
| 
 | ||||
| type PostgresRole string | ||||
| 
 | ||||
| const ( | ||||
| 	Master  PostgresRole = "master" | ||||
| 	Replica PostgresRole = "replica" | ||||
| ) | ||||
|  | @ -82,14 +82,14 @@ func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate | |||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) logServiceChanges(old, new *v1.Service, isUpdate bool, reason string) { | ||||
| func (c *Cluster) logServiceChanges(role PostgresRole, old, new *v1.Service, isUpdate bool, reason string) { | ||||
| 	if isUpdate { | ||||
| 		c.logger.Infof("service '%s' has been changed", | ||||
| 			util.NameFromMeta(old.ObjectMeta), | ||||
| 		c.logger.Infof("%s service '%s' has been changed", | ||||
| 			role, util.NameFromMeta(old.ObjectMeta), | ||||
| 		) | ||||
| 	} else { | ||||
| 		c.logger.Infof("service '%s  is not in the desired state and needs to be updated", | ||||
| 			util.NameFromMeta(old.ObjectMeta), | ||||
| 		c.logger.Infof("%s service '%s  is not in the desired state and needs to be updated", | ||||
| 			role, util.NameFromMeta(old.ObjectMeta), | ||||
| 		) | ||||
| 	} | ||||
| 	c.logger.Debugf("diff\n%s\n", util.PrettyDiff(old.Spec, new.Spec)) | ||||
|  | @ -145,7 +145,6 @@ func (c *Cluster) getTeamMembers() ([]string, error) { | |||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("could not get team info: %v", err) | ||||
| 	} | ||||
| 	c.logger.Debugf("Got from the Team API: %+v", *teamInfo) | ||||
| 
 | ||||
| 	return teamInfo.Members, nil | ||||
| } | ||||
|  | @ -263,14 +262,30 @@ func (c *Cluster) waitStatefulsetPodsReady() error { | |||
| } | ||||
| 
 | ||||
| func (c *Cluster) labelsSet() labels.Set { | ||||
| 	lbls := c.OpConfig.ClusterLabels | ||||
| 	lbls := make(map[string]string) | ||||
| 	for k, v := range c.OpConfig.ClusterLabels { | ||||
| 		lbls[k] = v | ||||
| 	} | ||||
| 	lbls[c.OpConfig.ClusterNameLabel] = c.Metadata.Name | ||||
| 
 | ||||
| 	return labels.Set(lbls) | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) dnsName() string { | ||||
| 	return strings.ToLower(c.OpConfig.DNSNameFormat.Format( | ||||
| func (c *Cluster) roleLabelsSet(role PostgresRole) labels.Set { | ||||
| 	lbls := c.labelsSet() | ||||
| 	lbls[c.OpConfig.PodRoleLabel] = string(role) | ||||
| 	return lbls | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) masterDnsName() string { | ||||
| 	return strings.ToLower(c.OpConfig.MasterDNSNameFormat.Format( | ||||
| 		"cluster", c.Spec.ClusterName, | ||||
| 		"team", c.teamName(), | ||||
| 		"hostedzone", c.OpConfig.DbHostedZone)) | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) replicaDnsName() string { | ||||
| 	return strings.ToLower(c.OpConfig.ReplicaDNSNameFormat.Format( | ||||
| 		"cluster", c.Spec.ClusterName, | ||||
| 		"team", c.teamName(), | ||||
| 		"hostedzone", c.OpConfig.DbHostedZone)) | ||||
|  |  | |||
|  | @ -60,15 +60,14 @@ func (c *Cluster) listPersistentVolumes() ([]*v1.PersistentVolume, error) { | |||
| 	for _, pvc := range pvcs { | ||||
| 		lastDash := strings.LastIndex(pvc.Name, "-") | ||||
| 		if lastDash > 0 && lastDash < len(pvc.Name)-1 { | ||||
| 			if pvcNumber, err := strconv.Atoi(pvc.Name[lastDash+1:]); err != nil { | ||||
| 			pvcNumber, err := strconv.Atoi(pvc.Name[lastDash+1:]) | ||||
| 			if err != nil { | ||||
| 				return nil, fmt.Errorf("could not convert last part of the persistent volume claim name %s to a number", pvc.Name) | ||||
| 			} else { | ||||
| 				if int32(pvcNumber) > lastPodIndex { | ||||
| 					c.logger.Debugf("Skipping persistent volume %s corresponding to a non-running pods", pvc.Name) | ||||
| 					continue | ||||
| 				} | ||||
| 			} | ||||
| 
 | ||||
| 			if int32(pvcNumber) > lastPodIndex { | ||||
| 				c.logger.Debugf("Skipping persistent volume %s corresponding to a non-running pods", pvc.Name) | ||||
| 				continue | ||||
| 			} | ||||
| 		} | ||||
| 		pv, err := c.KubeClient.PersistentVolumes().Get(pvc.Spec.VolumeName) | ||||
| 		if err != nil { | ||||
|  | @ -139,7 +138,7 @@ func (c *Cluster) resizeVolumes(newVolume spec.Volume, resizers []volumes.Volume | |||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) VolumesNeedResizing(newVolume spec.Volume) (bool, error) { | ||||
| func (c *Cluster) volumesNeedResizing(newVolume spec.Volume) (bool, error) { | ||||
| 	volumes, manifestSize, err := c.listVolumesWithManifestSize(newVolume) | ||||
| 	if err != nil { | ||||
| 		return false, err | ||||
|  |  | |||
|  | @ -62,7 +62,7 @@ func (c *Controller) podAdd(obj interface{}) { | |||
| 	} | ||||
| 
 | ||||
| 	podEvent := spec.PodEvent{ | ||||
| 		ClusterName:     c.PodClusterName(pod), | ||||
| 		ClusterName:     c.podClusterName(pod), | ||||
| 		PodName:         util.NameFromMeta(pod.ObjectMeta), | ||||
| 		CurPod:          pod, | ||||
| 		EventType:       spec.EventAdd, | ||||
|  | @ -84,7 +84,7 @@ func (c *Controller) podUpdate(prev, cur interface{}) { | |||
| 	} | ||||
| 
 | ||||
| 	podEvent := spec.PodEvent{ | ||||
| 		ClusterName:     c.PodClusterName(curPod), | ||||
| 		ClusterName:     c.podClusterName(curPod), | ||||
| 		PodName:         util.NameFromMeta(curPod.ObjectMeta), | ||||
| 		PrevPod:         prevPod, | ||||
| 		CurPod:          curPod, | ||||
|  | @ -102,7 +102,7 @@ func (c *Controller) podDelete(obj interface{}) { | |||
| 	} | ||||
| 
 | ||||
| 	podEvent := spec.PodEvent{ | ||||
| 		ClusterName:     c.PodClusterName(pod), | ||||
| 		ClusterName:     c.podClusterName(pod), | ||||
| 		PodName:         util.NameFromMeta(pod.ObjectMeta), | ||||
| 		CurPod:          pod, | ||||
| 		EventType:       spec.EventDelete, | ||||
|  |  | |||
|  | @ -115,7 +115,7 @@ Users: | |||
| 	return result, nil | ||||
| } | ||||
| 
 | ||||
| func (c *Controller) PodClusterName(pod *v1.Pod) spec.NamespacedName { | ||||
| func (c *Controller) podClusterName(pod *v1.Pod) spec.NamespacedName { | ||||
| 	if name, ok := pod.Labels[c.opConfig.ClusterNameLabel]; ok { | ||||
| 		return spec.NamespacedName{ | ||||
| 			Namespace: pod.Namespace, | ||||
|  |  | |||
|  | @ -11,6 +11,7 @@ import ( | |||
| 	"k8s.io/client-go/pkg/api/v1" | ||||
| ) | ||||
| 
 | ||||
| // MaintenanceWindow describes the time window when the operator is allowed to do maintenance on a cluster.
 | ||||
| type MaintenanceWindow struct { | ||||
| 	Everyday  bool | ||||
| 	Weekday   time.Weekday | ||||
|  | @ -18,26 +19,31 @@ type MaintenanceWindow struct { | |||
| 	EndTime   time.Time // End time
 | ||||
| } | ||||
| 
 | ||||
| // Volume describes a single volume in the manifest.
 | ||||
| type Volume struct { | ||||
| 	Size         string `json:"size"` | ||||
| 	StorageClass string `json:"storageClass"` | ||||
| } | ||||
| 
 | ||||
| // PostgresqlParam describes PostgreSQL version and pairs of configuration parameter name - values.
 | ||||
| type PostgresqlParam struct { | ||||
| 	PgVersion  string            `json:"version"` | ||||
| 	Parameters map[string]string `json:"parameters"` | ||||
| } | ||||
| 
 | ||||
| // ResourceDescription describes CPU and memory resources defined for a cluster.
 | ||||
| type ResourceDescription struct { | ||||
| 	CPU    string `json:"cpu"` | ||||
| 	Memory string `json:"memory"` | ||||
| } | ||||
| 
 | ||||
| // Resources describes requests and limits for the cluster resouces.
 | ||||
| type Resources struct { | ||||
| 	ResourceRequest ResourceDescription `json:"requests,omitempty"` | ||||
| 	ResourceLimits  ResourceDescription `json:"limits,omitempty"` | ||||
| } | ||||
| 
 | ||||
| // Patroni contains Patroni-specific configuration
 | ||||
| type Patroni struct { | ||||
| 	InitDB               map[string]string `json:"initdb"` | ||||
| 	PgHba                []string          `json:"pg_hba"` | ||||
|  | @ -47,10 +53,12 @@ type Patroni struct { | |||
| 	MaximumLagOnFailover float32           `json:"maximum_lag_on_failover"` // float32 because https://github.com/kubernetes/kubernetes/issues/30213
 | ||||
| } | ||||
| 
 | ||||
| type UserFlags []string | ||||
| type userFlags []string | ||||
| 
 | ||||
| // PostgresStatus contains status of the PostgreSQL cluster (running, creation failed etc.)
 | ||||
| type PostgresStatus string | ||||
| 
 | ||||
| // possible values for PostgreSQL cluster statuses
 | ||||
| const ( | ||||
| 	ClusterStatusUnknown      PostgresStatus = "" | ||||
| 	ClusterStatusCreating     PostgresStatus = "Creating" | ||||
|  | @ -61,7 +69,7 @@ const ( | |||
| 	ClusterStatusInvalid      PostgresStatus = "Invalid" | ||||
| ) | ||||
| 
 | ||||
| // PostgreSQL Third Party (resource) Object
 | ||||
| // Postgresql defines PostgreSQL Third Party (resource) Object.
 | ||||
| type Postgresql struct { | ||||
| 	unversioned.TypeMeta `json:",inline"` | ||||
| 	Metadata             v1.ObjectMeta `json:"metadata"` | ||||
|  | @ -71,6 +79,7 @@ type Postgresql struct { | |||
| 	Error  error          `json:"-"` | ||||
| } | ||||
| 
 | ||||
| // PostgresSpec defines the specification for the PostgreSQL TPR.
 | ||||
| type PostgresSpec struct { | ||||
| 	PostgresqlParam `json:"postgresql"` | ||||
| 	Volume          `json:"volume,omitempty"` | ||||
|  | @ -79,12 +88,14 @@ type PostgresSpec struct { | |||
| 
 | ||||
| 	TeamID              string               `json:"teamId"` | ||||
| 	AllowedSourceRanges []string             `json:"allowedSourceRanges"` | ||||
| 	ReplicaLoadBalancer bool                 `json:"replicaLoadBalancer,omitempty"` | ||||
| 	NumberOfInstances   int32                `json:"numberOfInstances"` | ||||
| 	Users               map[string]UserFlags `json:"users"` | ||||
| 	Users               map[string]userFlags `json:"users"` | ||||
| 	MaintenanceWindows  []MaintenanceWindow  `json:"maintenanceWindows,omitempty"` | ||||
| 	ClusterName         string               `json:"-"` | ||||
| } | ||||
| 
 | ||||
| // PostgresqlList defines a list of PostgreSQL clusters.
 | ||||
| type PostgresqlList struct { | ||||
| 	unversioned.TypeMeta `json:",inline"` | ||||
| 	Metadata             unversioned.ListMeta `json:"metadata"` | ||||
|  | @ -118,6 +129,7 @@ func parseWeekday(s string) (time.Weekday, error) { | |||
| 	return time.Weekday(weekday), nil | ||||
| } | ||||
| 
 | ||||
| // MarshalJSON converts a maintenance window definition to JSON.
 | ||||
| func (m *MaintenanceWindow) MarshalJSON() ([]byte, error) { | ||||
| 	if m.Everyday { | ||||
| 		return []byte(fmt.Sprintf("\"%s-%s\"", | ||||
|  | @ -131,6 +143,7 @@ func (m *MaintenanceWindow) MarshalJSON() ([]byte, error) { | |||
| 	} | ||||
| } | ||||
| 
 | ||||
| // UnmarshalJSON convets a JSON to the maintenance window definition.
 | ||||
| func (m *MaintenanceWindow) UnmarshalJSON(data []byte) error { | ||||
| 	var ( | ||||
| 		got MaintenanceWindow | ||||
|  | @ -176,18 +189,17 @@ func (m *MaintenanceWindow) UnmarshalJSON(data []byte) error { | |||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // GetObject implements Object interface for PostgreSQL TPR spec object.
 | ||||
| func (p *Postgresql) GetObjectKind() unversioned.ObjectKind { | ||||
| 	return &p.TypeMeta | ||||
| } | ||||
| 
 | ||||
| // GetObjectMeta implements ObjectMetaAccessor interface for PostgreSQL TPR spec object.
 | ||||
| func (p *Postgresql) GetObjectMeta() meta.Object { | ||||
| 	return &p.Metadata | ||||
| } | ||||
| 
 | ||||
| func (pl *PostgresqlList) GetObjectKind() unversioned.ObjectKind { | ||||
| 	return &pl.TypeMeta | ||||
| } | ||||
| 
 | ||||
| // GetListMeta implements ListMetaAccessor interface for PostgreSQL TPR List spec object.
 | ||||
| func (pl *PostgresqlList) GetListMeta() unversioned.List { | ||||
| 	return &pl.Metadata | ||||
| } | ||||
|  | @ -213,11 +225,12 @@ func extractClusterName(clusterName string, teamName string) (string, error) { | |||
| // resources and ugorji. If/when these issues are resolved, the code below
 | ||||
| // should no longer be required.
 | ||||
| //
 | ||||
| type PostgresqlListCopy PostgresqlList | ||||
| type PostgresqlCopy Postgresql | ||||
| type postgresqlListCopy PostgresqlList | ||||
| type postgresqlCopy Postgresql | ||||
| 
 | ||||
| // UnmarshalJSON converts a JSON into the PostgreSQL object.
 | ||||
| func (p *Postgresql) UnmarshalJSON(data []byte) error { | ||||
| 	var tmp PostgresqlCopy | ||||
| 	var tmp postgresqlCopy | ||||
| 
 | ||||
| 	err := json.Unmarshal(data, &tmp) | ||||
| 	if err != nil { | ||||
|  | @ -247,8 +260,9 @@ func (p *Postgresql) UnmarshalJSON(data []byte) error { | |||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // UnmarshalJSON converts a JSON into the PostgreSQL List object.
 | ||||
| func (pl *PostgresqlList) UnmarshalJSON(data []byte) error { | ||||
| 	var tmp PostgresqlListCopy | ||||
| 	var tmp postgresqlListCopy | ||||
| 
 | ||||
| 	err := json.Unmarshal(data, &tmp) | ||||
| 	if err != nil { | ||||
|  |  | |||
|  | @ -7,10 +7,13 @@ import ( | |||
| 	"k8s.io/client-go/pkg/types" | ||||
| ) | ||||
| 
 | ||||
| // EvenType contains type of the events for the TPRs and Pods received from Kubernetes
 | ||||
| type EventType string | ||||
| 
 | ||||
| // NamespacedName describes the namespace/name pairs used in Kubernetes names.
 | ||||
| type NamespacedName types.NamespacedName | ||||
| 
 | ||||
| // Possible values for the EventType
 | ||||
| const ( | ||||
| 	EventAdd    EventType = "ADD" | ||||
| 	EventUpdate EventType = "UPDATE" | ||||
|  | @ -18,6 +21,7 @@ const ( | |||
| 	EventSync   EventType = "SYNC" | ||||
| ) | ||||
| 
 | ||||
| // ClusterEvent carries the payload of the Cluster TPR events.
 | ||||
| type ClusterEvent struct { | ||||
| 	UID       types.UID | ||||
| 	EventType EventType | ||||
|  | @ -26,13 +30,15 @@ type ClusterEvent struct { | |||
| 	WorkerID  uint32 | ||||
| } | ||||
| 
 | ||||
| type SyncUserOperation int | ||||
| type syncUserOperation int | ||||
| 
 | ||||
| // Possible values for the sync user operation (removal of users is not supported yet)
 | ||||
| const ( | ||||
| 	PGSyncUserAdd = iota | ||||
| 	PGsyncUserAlter | ||||
| ) | ||||
| 
 | ||||
| // PodEvent describes the event for a single Pod
 | ||||
| type PodEvent struct { | ||||
| 	ResourceVersion string | ||||
| 	ClusterName     NamespacedName | ||||
|  | @ -42,6 +48,7 @@ type PodEvent struct { | |||
| 	EventType       EventType | ||||
| } | ||||
| 
 | ||||
| // PgUser contains information about a single user.
 | ||||
| type PgUser struct { | ||||
| 	Name     string | ||||
| 	Password string | ||||
|  | @ -49,13 +56,16 @@ type PgUser struct { | |||
| 	MemberOf []string | ||||
| } | ||||
| 
 | ||||
| // PgUserMap maps user names to the definitions.
 | ||||
| type PgUserMap map[string]PgUser | ||||
| 
 | ||||
| // PgSyncUserRequest has information about a single request to sync a user.
 | ||||
| type PgSyncUserRequest struct { | ||||
| 	Kind SyncUserOperation | ||||
| 	Kind syncUserOperation | ||||
| 	User PgUser | ||||
| } | ||||
| 
 | ||||
| // UserSyncer defines an interface for the implementations to sync users from the manifest to the DB.
 | ||||
| type UserSyncer interface { | ||||
| 	ProduceSyncRequests(dbUsers PgUserMap, newUsers PgUserMap) (req []PgSyncUserRequest) | ||||
| 	ExecuteSyncRequests(req []PgSyncUserRequest, db *sql.DB) error | ||||
|  | @ -69,10 +79,12 @@ func (n NamespacedName) String() string { | |||
| 	return types.NamespacedName(n).String() | ||||
| } | ||||
| 
 | ||||
| // MarshalJSON defines marshaling rule for the namespaced name type.
 | ||||
| func (n NamespacedName) MarshalJSON() ([]byte, error) { | ||||
| 	return []byte("\"" + n.String() + "\""), nil | ||||
| } | ||||
| 
 | ||||
| // Decode converts a (possibly unqualified) string into the namespaced name object.
 | ||||
| func (n *NamespacedName) Decode(value string) error { | ||||
| 	name := types.NewNamespacedNameFromString(value) | ||||
| 	if value != "" && name == (types.NamespacedName{}) { | ||||
|  |  | |||
|  | @ -43,19 +43,20 @@ type Config struct { | |||
| 	TPR | ||||
| 	Resources | ||||
| 	Auth | ||||
| 	Namespace          string         `name:"namespace"` | ||||
| 	EtcdHost           string         `name:"etcd_host" default:"etcd-client.default.svc.cluster.local:2379"` | ||||
| 	DockerImage        string         `name:"docker_image" default:"registry.opensource.zalan.do/acid/spiloprivate-9.6:1.2-p4"` | ||||
| 	ServiceAccountName string         `name:"service_account_name" default:"operator"` | ||||
| 	DbHostedZone       string         `name:"db_hosted_zone" default:"db.example.com"` | ||||
| 	EtcdScope          string         `name:"etcd_scope" default:"service"` | ||||
| 	WALES3Bucket       string         `name:"wal_s3_bucket"` | ||||
| 	KubeIAMRole        string         `name:"kube_iam_role"` | ||||
| 	DebugLogging       bool           `name:"debug_logging" default:"true"` | ||||
| 	EnableDBAccess     bool           `name:"enable_database_access" default:"true"` | ||||
| 	EnableTeamsAPI     bool           `name:"enable_teams_api" default:"true"` | ||||
| 	DNSNameFormat      stringTemplate `name:"dns_name_format" default:"{cluster}.{team}.{hostedzone}"` | ||||
| 	Workers            uint32         `name:"workers" default:"4"` | ||||
| 	Namespace            string         `name:"namespace"` | ||||
| 	EtcdHost             string         `name:"etcd_host" default:"etcd-client.default.svc.cluster.local:2379"` | ||||
| 	DockerImage          string         `name:"docker_image" default:"registry.opensource.zalan.do/acid/spiloprivate-9.6:1.2-p4"` | ||||
| 	ServiceAccountName   string         `name:"service_account_name" default:"operator"` | ||||
| 	DbHostedZone         string         `name:"db_hosted_zone" default:"db.example.com"` | ||||
| 	EtcdScope            string         `name:"etcd_scope" default:"service"` | ||||
| 	WALES3Bucket         string         `name:"wal_s3_bucket"` | ||||
| 	KubeIAMRole          string         `name:"kube_iam_role"` | ||||
| 	DebugLogging         bool           `name:"debug_logging" default:"true"` | ||||
| 	EnableDBAccess       bool           `name:"enable_database_access" default:"true"` | ||||
| 	EnableTeamsAPI       bool           `name:"enable_teams_api" default:"true"` | ||||
| 	MasterDNSNameFormat  stringTemplate `name:"master_dns_name_format" default:"{cluster}.{team}.{hostedzone}"` | ||||
| 	ReplicaDNSNameFormat stringTemplate `name:"replica_dns_name_format" default:"{cluster}-repl.{team}.{hostedzone}"` | ||||
| 	Workers              uint32         `name:"workers" default:"4"` | ||||
| } | ||||
| 
 | ||||
| func (c Config) MustMarshal() string { | ||||
|  |  | |||
|  | @ -9,7 +9,7 @@ import ( | |||
| 	"time" | ||||
| ) | ||||
| 
 | ||||
| type Decoder interface { | ||||
| type decoder interface { | ||||
| 	Decode(value string) error | ||||
| } | ||||
| 
 | ||||
|  | @ -21,15 +21,15 @@ type fieldInfo struct { | |||
| 
 | ||||
| type stringTemplate string | ||||
| 
 | ||||
| func decoderFrom(field reflect.Value) (d Decoder) { | ||||
| func decoderFrom(field reflect.Value) (d decoder) { | ||||
| 	// it may be impossible for a struct field to fail this check
 | ||||
| 	if !field.CanInterface() { | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	d, ok := field.Interface().(Decoder) | ||||
| 	d, ok := field.Interface().(decoder) | ||||
| 	if !ok && field.CanAddr() { | ||||
| 		d, _ = field.Addr().Interface().(Decoder) | ||||
| 		d, _ = field.Addr().Interface().(decoder) | ||||
| 	} | ||||
| 
 | ||||
| 	return d | ||||
|  |  | |||
|  | @ -1,5 +1,6 @@ | |||
| package constants | ||||
| 
 | ||||
| // Names and values in Kubernetes annotation for services, statefulsets and volumes
 | ||||
| const ( | ||||
| 	ZalandoDNSNameAnnotation           = "external-dns.alpha.kubernetes.io/hostname" | ||||
| 	ElbTimeoutAnnotationName           = "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout" | ||||
|  |  | |||
|  | @ -2,8 +2,11 @@ package constants | |||
| 
 | ||||
| import "time" | ||||
| 
 | ||||
| // AWS specific constants used by other modules
 | ||||
| const ( | ||||
| 	AWS_REGION       = "eu-central-1" | ||||
| 	// default region for AWS. TODO: move it to the operator configuration
 | ||||
| 	AWS_REGION = "eu-central-1" | ||||
| 	// EBS related constants
 | ||||
| 	EBSVolumeIDStart = "/vol-" | ||||
| 	EBSProvisioner   = "kubernetes.io/aws-ebs" | ||||
| 	//https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_VolumeModification.html
 | ||||
|  |  | |||
|  | @ -2,6 +2,7 @@ package constants | |||
| 
 | ||||
| import "time" | ||||
| 
 | ||||
| // General kubernetes-related constants
 | ||||
| const ( | ||||
| 	ListClustersURITemplate     = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/namespaces/%s/" + ResourceName       // Namespace
 | ||||
| 	WatchClustersURITemplate    = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/watch/namespaces/%s/" + ResourceName // Namespace
 | ||||
|  |  | |||
|  | @ -1,5 +1,6 @@ | |||
| package constants | ||||
| 
 | ||||
| // Different properties of the PostgreSQL Third Party Resources
 | ||||
| const ( | ||||
| 	TPRName        = "postgresql" | ||||
| 	TPRVendor      = "acid.zalan.do" | ||||
|  |  | |||
|  | @ -1,5 +1,6 @@ | |||
| package constants | ||||
| 
 | ||||
| // Measurement-unit definitions
 | ||||
| const ( | ||||
| 	Gigabyte = 1073741824 | ||||
| ) | ||||
|  |  | |||
|  | @ -11,19 +11,22 @@ var ( | |||
| ) | ||||
| 
 | ||||
| const ( | ||||
| 	EXT2      = "ext2" | ||||
| 	EXT3      = "ext3" | ||||
| 	EXT4      = "ext4" | ||||
| 	ext2      = "ext2" | ||||
| 	ext3      = "ext3" | ||||
| 	ext4      = "ext4" | ||||
| 	resize2fs = "resize2fs" | ||||
| ) | ||||
| 
 | ||||
| //  Ext234Resize implements the FilesystemResizer interface for the ext4/3/2fs.
 | ||||
| type Ext234Resize struct { | ||||
| } | ||||
| 
 | ||||
| // CanResizeFilesystem checks whether Ext234Resize can resize this filesystem.
 | ||||
| func (c *Ext234Resize) CanResizeFilesystem(fstype string) bool { | ||||
| 	return fstype == EXT2 || fstype == EXT3 || fstype == EXT4 | ||||
| 	return fstype == ext2 || fstype == ext3 || fstype == ext4 | ||||
| } | ||||
| 
 | ||||
| // ResizeFilesystem calls resize2fs to resize the filesystem if necessary.
 | ||||
| func (c *Ext234Resize) ResizeFilesystem(deviceName string, commandExecutor func(cmd string) (out string, err error)) error { | ||||
| 	command := fmt.Sprintf("%s %s 2>&1", resize2fs, deviceName) | ||||
| 	out, err := commandExecutor(command) | ||||
|  |  | |||
|  | @ -7,6 +7,7 @@ import ( | |||
| 
 | ||||
| type ConditionFunc func() (bool, error) | ||||
| 
 | ||||
| // Retry calls ConditionFunc until it returns boolean true, a timeout expires or an error occurs.
 | ||||
| func Retry(interval time.Duration, timeout time.Duration, f ConditionFunc) error { | ||||
| 	//TODO: make the retry exponential
 | ||||
| 	if timeout < interval { | ||||
|  |  | |||
|  | @ -9,7 +9,8 @@ import ( | |||
| 	"github.com/Sirupsen/logrus" | ||||
| ) | ||||
| 
 | ||||
| type InfrastructureAccount struct { | ||||
| // InfrastructureAccount defines an account of the team on some infrastructure (i.e AWS, Google) platform.
 | ||||
| type infrastructureAccount struct { | ||||
| 	ID          string `json:"id"` | ||||
| 	Name        string `json:"name"` | ||||
| 	Provider    string `json:"provider"` | ||||
|  | @ -20,7 +21,8 @@ type InfrastructureAccount struct { | |||
| 	Disabled    bool   `json:"disabled"` | ||||
| } | ||||
| 
 | ||||
| type Team struct { | ||||
| // Team defines informaiton for a single team, including the list of members and infrastructure accounts.
 | ||||
| type team struct { | ||||
| 	Dn           string   `json:"dn"` | ||||
| 	ID           string   `json:"id"` | ||||
| 	TeamName     string   `json:"id_name"` | ||||
|  | @ -34,15 +36,17 @@ type Team struct { | |||
| 	DeliveryLead string   `json:"delivery_lead"` | ||||
| 	ParentTeamID string   `json:"parent_team_id"` | ||||
| 
 | ||||
| 	InfrastructureAccounts []InfrastructureAccount `json:"infrastructure-accounts"` | ||||
| 	InfrastructureAccounts []infrastructureAccount `json:"infrastructure-accounts"` | ||||
| } | ||||
| 
 | ||||
| //
 | ||||
| type API struct { | ||||
| 	url        string | ||||
| 	httpClient *http.Client | ||||
| 	logger     *logrus.Entry | ||||
| } | ||||
| 
 | ||||
| // NewTeamsAPI creates an object to query the team API.
 | ||||
| func NewTeamsAPI(url string, log *logrus.Logger) *API { | ||||
| 	t := API{ | ||||
| 		url:        strings.TrimRight(url, "/"), | ||||
|  | @ -53,7 +57,8 @@ func NewTeamsAPI(url string, log *logrus.Logger) *API { | |||
| 	return &t | ||||
| } | ||||
| 
 | ||||
| func (t *API) TeamInfo(teamID, token string) (*Team, error) { | ||||
| // TeamInfo returns information about a given team using its ID and a token to authenticate to the API service.
 | ||||
| func (t *API) TeamInfo(teamID, token string) (*team, error) { | ||||
| 	url := fmt.Sprintf("%s/teams/%s", t.url, teamID) | ||||
| 	t.logger.Debugf("Request url: %s", url) | ||||
| 	req, err := http.NewRequest("GET", url, nil) | ||||
|  | @ -81,7 +86,7 @@ func (t *API) TeamInfo(teamID, token string) (*Team, error) { | |||
| 
 | ||||
| 		return nil, fmt.Errorf("team API query failed with status code %d", resp.StatusCode) | ||||
| 	} | ||||
| 	teamInfo := &Team{} | ||||
| 	teamInfo := &team{} | ||||
| 	d := json.NewDecoder(resp.Body) | ||||
| 	err = d.Decode(teamInfo) | ||||
| 	if err != nil { | ||||
|  |  | |||
|  | @ -17,7 +17,7 @@ var ( | |||
| var teamsAPItc = []struct { | ||||
| 	in     string | ||||
| 	inCode int | ||||
| 	out    *Team | ||||
| 	out    *team | ||||
| 	err    error | ||||
| }{ | ||||
| 	{`{ | ||||
|  | @ -66,7 +66,7 @@ var teamsAPItc = []struct { | |||
| "parent_team_id": "111221" | ||||
| }`, | ||||
| 		200, | ||||
| 		&Team{ | ||||
| 		&team{ | ||||
| 			Dn:           "cn=100100,ou=official,ou=foobar,dc=zalando,dc=net", | ||||
| 			ID:           "acid", | ||||
| 			TeamName:     "ACID", | ||||
|  | @ -79,7 +79,7 @@ var teamsAPItc = []struct { | |||
| 			CostCenter:   "00099999", | ||||
| 			DeliveryLead: "member4", | ||||
| 			ParentTeamID: "111221", | ||||
| 			InfrastructureAccounts: []InfrastructureAccount{ | ||||
| 			InfrastructureAccounts: []infrastructureAccount{ | ||||
| 				{ | ||||
| 					ID:          "1234512345", | ||||
| 					Name:        "acid", | ||||
|  |  | |||
|  | @ -18,9 +18,14 @@ const ( | |||
| 	inRoleTemplate   = `IN ROLE %s` | ||||
| ) | ||||
| 
 | ||||
| // DefaultUserSyncStrategy implements a user sync strategy that merges already existing database users
 | ||||
| // with those defined in the manifest, altering existing users when necessary. It will never strips
 | ||||
| // an existing roles of another role membership, nor it removes the already assigned flag
 | ||||
| // (except for the NOLOGIN). TODO: process other NOflags, i.e. NOSUPERUSER correctly.
 | ||||
| type DefaultUserSyncStrategy struct { | ||||
| } | ||||
| 
 | ||||
| // ProduceSyncRequests figures out the types of changes that need to happen with the given users.
 | ||||
| func (s DefaultUserSyncStrategy) ProduceSyncRequests(dbUsers spec.PgUserMap, | ||||
| 	newUsers spec.PgUserMap) (reqs []spec.PgSyncUserRequest) { | ||||
| 
 | ||||
|  | @ -55,6 +60,7 @@ func (s DefaultUserSyncStrategy) ProduceSyncRequests(dbUsers spec.PgUserMap, | |||
| 	return | ||||
| } | ||||
| 
 | ||||
| // ExecuteSyncRequests makes actual database changes from the requests passed in its arguments.
 | ||||
| func (s DefaultUserSyncStrategy) ExecuteSyncRequests(reqs []spec.PgSyncUserRequest, db *sql.DB) error { | ||||
| 	for _, r := range reqs { | ||||
| 		switch r.Kind { | ||||
|  |  | |||
|  | @ -3,7 +3,6 @@ package util | |||
| import ( | ||||
| 	"crypto/md5" | ||||
| 	"encoding/hex" | ||||
| 	"fmt" | ||||
| 	"math/rand" | ||||
| 	"strings" | ||||
| 	"time" | ||||
|  | @ -24,6 +23,7 @@ func init() { | |||
| 	rand.Seed(int64(time.Now().Unix())) | ||||
| } | ||||
| 
 | ||||
| // RandomPassword generates random alphanumeric password of a given length.
 | ||||
| func RandomPassword(n int) string { | ||||
| 	b := make([]byte, n) | ||||
| 	for i := range b { | ||||
|  | @ -33,6 +33,7 @@ func RandomPassword(n int) string { | |||
| 	return string(b) | ||||
| } | ||||
| 
 | ||||
| // NameFromMeta converts a metadata object to the NamespacedName name representation.
 | ||||
| func NameFromMeta(meta v1.ObjectMeta) spec.NamespacedName { | ||||
| 	return spec.NamespacedName{ | ||||
| 		Namespace: meta.Namespace, | ||||
|  | @ -40,6 +41,7 @@ func NameFromMeta(meta v1.ObjectMeta) spec.NamespacedName { | |||
| 	} | ||||
| } | ||||
| 
 | ||||
| // PGUserPassword is used to generate md5 password hash for a given user. It does nothing for already hashed passwords.
 | ||||
| func PGUserPassword(user spec.PgUser) string { | ||||
| 	if (len(user.Password) == md5.Size*2+len(md5prefix) && user.Password[:3] == md5prefix) || user.Password == "" { | ||||
| 		// Avoid processing already encrypted or empty passwords
 | ||||
|  | @ -49,17 +51,14 @@ func PGUserPassword(user spec.PgUser) string { | |||
| 	return md5prefix + hex.EncodeToString(s[:]) | ||||
| } | ||||
| 
 | ||||
| func Pretty(x interface{}) (f fmt.Formatter) { | ||||
| 	return pretty.Formatter(x) | ||||
| } | ||||
| 
 | ||||
| // PrettyDiff shows the diff between 2 objects in an easy to understand format. It is mainly used for debugging output.
 | ||||
| func PrettyDiff(a, b interface{}) (result string) { | ||||
| 	diff := pretty.Diff(a, b) | ||||
| 	return strings.Join(diff, "\n") | ||||
| } | ||||
| 
 | ||||
| // SubstractStringSlices finds elements in a that are not in b and return them as a result slice.
 | ||||
| func SubstractStringSlices(a []string, b []string) (result []string, equal bool) { | ||||
| 	// Find elements in a that are not in b and return them as a result slice
 | ||||
| 	// Slices are assumed to contain unique elements only
 | ||||
| OUTER: | ||||
| 	for _, vala := range a { | ||||
|  |  | |||
|  | @ -13,10 +13,12 @@ import ( | |||
| 	"k8s.io/client-go/pkg/api/v1" | ||||
| ) | ||||
| 
 | ||||
| // EBSVolumeResizer implements volume resizing interface for AWS EBS volumes.
 | ||||
| type EBSVolumeResizer struct { | ||||
| 	connection *ec2.EC2 | ||||
| } | ||||
| 
 | ||||
| // ConnectToProvider connects to AWS.
 | ||||
| func (c *EBSVolumeResizer) ConnectToProvider() error { | ||||
| 	sess, err := session.NewSession(&aws.Config{Region: aws.String(constants.AWS_REGION)}) | ||||
| 	if err != nil { | ||||
|  | @ -26,10 +28,12 @@ func (c *EBSVolumeResizer) ConnectToProvider() error { | |||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // IsConnectedToProvider checks if AWS connection is established.
 | ||||
| func (c *EBSVolumeResizer) IsConnectedToProvider() bool { | ||||
| 	return c.connection != nil | ||||
| } | ||||
| 
 | ||||
| // VolumeBelongsToProvider checks if the given persistent volume is backed by EBS.
 | ||||
| func (c *EBSVolumeResizer) VolumeBelongsToProvider(pv *v1.PersistentVolume) bool { | ||||
| 	return pv.Spec.AWSElasticBlockStore != nil && pv.Annotations[constants.VolumeStorateProvisionerAnnotation] == constants.EBSProvisioner | ||||
| } | ||||
|  | @ -47,6 +51,7 @@ func (c *EBSVolumeResizer) GetProviderVolumeID(pv *v1.PersistentVolume) (string, | |||
| 	return volumeID[idx:], nil | ||||
| } | ||||
| 
 | ||||
| // ResizeVolume actually calls AWS API to resize the EBS volume if necessary.
 | ||||
| func (c *EBSVolumeResizer) ResizeVolume(volumeId string, newSize int64) error { | ||||
| 	/* first check if the volume is already of a requested size */ | ||||
| 	volumeOutput, err := c.connection.DescribeVolumes(&ec2.DescribeVolumesInput{VolumeIds: []*string{&volumeId}}) | ||||
|  | @ -89,7 +94,8 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeId string, newSize int64) error { | |||
| 				return false, fmt.Errorf("describe volume modification didn't return one record for volume \"%s\"", volumeId) | ||||
| 			} | ||||
| 			if *out.VolumesModifications[0].VolumeId != volumeId { | ||||
| 				return false, fmt.Errorf("non-matching volume id when describing modifications: \"%s\" is different from \"%s\"") | ||||
| 				return false, fmt.Errorf("non-matching volume id when describing modifications: \"%s\" is different from \"%s\"", | ||||
| 					*out.VolumesModifications[0].VolumeId, volumeId) | ||||
| 			} | ||||
| 			return *out.VolumesModifications[0].ModificationState != constants.EBSVolumeStateModifying, nil | ||||
| 		}) | ||||
|  |  | |||
|  | @ -4,6 +4,7 @@ import ( | |||
| 	"k8s.io/client-go/pkg/api/v1" | ||||
| ) | ||||
| 
 | ||||
| // VolumeResizer defines the set of methods used to implememnt provider-specific resizing of persistent volumes.
 | ||||
| type VolumeResizer interface { | ||||
| 	ConnectToProvider() error | ||||
| 	IsConnectedToProvider() bool | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue