skip clusters with invalid spec
This commit is contained in:
		
							parent
							
								
									1d2fb0091f
								
							
						
					
					
						commit
						356be8f0f1
					
				|  | @ -369,7 +369,10 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { | |||
| 		//TODO: update PVC
 | ||||
| 	} | ||||
| 
 | ||||
| 	newStatefulSet := c.genStatefulSet(newSpec.Spec) | ||||
| 	newStatefulSet, err := c.genStatefulSet(newSpec.Spec) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("Can't generate StatefulSet: %s", err) | ||||
| 	} | ||||
| 	sameSS, rollingUpdate, reason := c.compareStatefulSetWith(newStatefulSet) | ||||
| 
 | ||||
| 	if !sameSS { | ||||
|  |  | |||
|  | @ -43,7 +43,9 @@ type spiloConfiguration struct { | |||
| 	Bootstrap            pgBootstrap            `json:"bootstrap"` | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) resourceRequirements(resources spec.Resources) *v1.ResourceRequirements { | ||||
| func (c *Cluster) resourceRequirements(resources spec.Resources) (*v1.ResourceRequirements, error) { | ||||
| 	var err error | ||||
| 
 | ||||
| 	specRequests := resources.ResourceRequest | ||||
| 	specLimits := resources.ResourceLimits | ||||
| 
 | ||||
|  | @ -54,26 +56,47 @@ func (c *Cluster) resourceRequirements(resources spec.Resources) *v1.ResourceReq | |||
| 
 | ||||
| 	result := v1.ResourceRequirements{} | ||||
| 
 | ||||
| 	result.Requests = fillResourceList(specRequests, defaultRequests) | ||||
| 	result.Limits = fillResourceList(specLimits, defaultLimits) | ||||
| 	result.Requests, err = fillResourceList(specRequests, defaultRequests) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("Can't fill resource requests: %s", err) | ||||
| 	} | ||||
| 
 | ||||
| 	return &result | ||||
| 	result.Limits, err = fillResourceList(specLimits, defaultLimits) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("Can't fill resource limits: %s", err) | ||||
| 	} | ||||
| 
 | ||||
| 	return &result, nil | ||||
| } | ||||
| 
 | ||||
| func fillResourceList(spec spec.ResourceDescription, defaults spec.ResourceDescription) v1.ResourceList { | ||||
| func fillResourceList(spec spec.ResourceDescription, defaults spec.ResourceDescription) (v1.ResourceList, error) { | ||||
| 	var err error | ||||
| 	requests := v1.ResourceList{} | ||||
| 
 | ||||
| 	if spec.Cpu != "" { | ||||
| 		requests[v1.ResourceCPU] = resource.MustParse(spec.Cpu) | ||||
| 		requests[v1.ResourceCPU], err = resource.ParseQuantity(spec.Cpu) | ||||
| 		if err != nil { | ||||
| 			return nil, fmt.Errorf("Can't parse CPU quantity: %s", err) | ||||
| 		} | ||||
| 	} else { | ||||
| 		requests[v1.ResourceCPU] = resource.MustParse(defaults.Cpu) | ||||
| 		requests[v1.ResourceCPU], err = resource.ParseQuantity(defaults.Cpu) | ||||
| 		if err != nil { | ||||
| 			return nil, fmt.Errorf("Can't parse default CPU quantity: %s", err) | ||||
| 		} | ||||
| 	} | ||||
| 	if spec.Memory != "" { | ||||
| 		requests[v1.ResourceMemory] = resource.MustParse(spec.Memory) | ||||
| 	} else { | ||||
| 		requests[v1.ResourceMemory] = resource.MustParse(defaults.Memory) | ||||
| 		requests[v1.ResourceMemory], err = resource.ParseQuantity(spec.Memory) | ||||
| 		if err != nil { | ||||
| 			return nil, fmt.Errorf("Can't parse memory quantity: %s", err) | ||||
| 		} | ||||
| 	return requests | ||||
| 	} else { | ||||
| 		requests[v1.ResourceMemory], err = resource.ParseQuantity(defaults.Memory) | ||||
| 		if err != nil { | ||||
| 			return nil, fmt.Errorf("Can't parse default memory quantity: %s", err) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return requests, nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) generateSpiloJSONConfiguration(pg *spec.PostgresqlParam, patroni *spec.Patroni) string { | ||||
|  | @ -170,7 +193,6 @@ PATRONI_INITDB_PARAMS: | |||
| } | ||||
| 
 | ||||
| func (c *Cluster) genPodTemplate(resourceRequirements *v1.ResourceRequirements, pgParameters *spec.PostgresqlParam, patroniParameters *spec.Patroni) *v1.PodTemplateSpec { | ||||
| 
 | ||||
| 	spiloConfiguration := c.generateSpiloJSONConfiguration(pgParameters, patroniParameters) | ||||
| 
 | ||||
| 	envVars := []v1.EnvVar{ | ||||
|  | @ -290,10 +312,17 @@ func (c *Cluster) genPodTemplate(resourceRequirements *v1.ResourceRequirements, | |||
| 	return &template | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) genStatefulSet(spec spec.PostgresSpec) *v1beta1.StatefulSet { | ||||
| 	resourceRequirements := c.resourceRequirements(spec.Resources) | ||||
| func (c *Cluster) genStatefulSet(spec spec.PostgresSpec) (*v1beta1.StatefulSet, error) { | ||||
| 	resourceRequirements, err := c.resourceRequirements(spec.Resources) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	podTemplate := c.genPodTemplate(resourceRequirements, &spec.PostgresqlParam, &spec.Patroni) | ||||
| 	volumeClaimTemplate := persistentVolumeClaimTemplate(spec.Volume.Size, spec.Volume.StorageClass) | ||||
| 	volumeClaimTemplate, err := persistentVolumeClaimTemplate(spec.Volume.Size, spec.Volume.StorageClass) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	statefulSet := &v1beta1.StatefulSet{ | ||||
| 		ObjectMeta: v1.ObjectMeta{ | ||||
|  | @ -309,10 +338,10 @@ func (c *Cluster) genStatefulSet(spec spec.PostgresSpec) *v1beta1.StatefulSet { | |||
| 		}, | ||||
| 	} | ||||
| 
 | ||||
| 	return statefulSet | ||||
| 	return statefulSet, nil | ||||
| } | ||||
| 
 | ||||
| func persistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) *v1.PersistentVolumeClaim { | ||||
| func persistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) (*v1.PersistentVolumeClaim, error) { | ||||
| 	metadata := v1.ObjectMeta{ | ||||
| 		Name: constants.DataVolumeName, | ||||
| 	} | ||||
|  | @ -323,18 +352,24 @@ func persistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) *v1.Pe | |||
| 		metadata.Annotations = map[string]string{"volume.alpha.kubernetes.io/storage-class": "default"} | ||||
| 	} | ||||
| 
 | ||||
| 	quantity, err := resource.ParseQuantity(volumeSize) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("Can't parse volume size: %s", err) | ||||
| 	} | ||||
| 
 | ||||
| 	volumeClaim := &v1.PersistentVolumeClaim{ | ||||
| 		ObjectMeta: metadata, | ||||
| 		Spec: v1.PersistentVolumeClaimSpec{ | ||||
| 			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, | ||||
| 			Resources: v1.ResourceRequirements{ | ||||
| 				Requests: v1.ResourceList{ | ||||
| 					v1.ResourceStorage: resource.MustParse(volumeSize), | ||||
| 					v1.ResourceStorage: quantity, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	return volumeClaim | ||||
| 
 | ||||
| 	return volumeClaim, nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) genUserSecrets() (secrets map[string]*v1.Secret) { | ||||
|  |  | |||
|  | @ -106,7 +106,10 @@ func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) { | |||
| 	if c.Statefulset != nil { | ||||
| 		return nil, fmt.Errorf("StatefulSet already exists in the cluster") | ||||
| 	} | ||||
| 	statefulSetSpec := c.genStatefulSet(c.Spec) | ||||
| 	statefulSetSpec, err := c.genStatefulSet(c.Spec) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("Can't generate StatefulSet: %s", err) | ||||
| 	} | ||||
| 	statefulSet, err := c.KubeClient.StatefulSets(statefulSetSpec.Namespace).Create(statefulSetSpec) | ||||
| 	if k8sutil.ResourceAlreadyExists(err) { | ||||
| 		return nil, fmt.Errorf("StatefulSet '%s' already exists", util.NameFromMeta(statefulSetSpec.ObjectMeta)) | ||||
|  |  | |||
|  | @ -137,7 +137,11 @@ func (c *Cluster) syncStatefulSet() error { | |||
| 			match  bool | ||||
| 			reason string | ||||
| 		) | ||||
| 		desiredSS := c.genStatefulSet(cSpec) | ||||
| 		desiredSS, err := c.genStatefulSet(cSpec) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("Can't generate StatefulSet: %s", err) | ||||
| 		} | ||||
| 
 | ||||
| 		match, rollUpdate, reason = c.compareStatefulSetWith(desiredSS) | ||||
| 		if match { | ||||
| 			return nil | ||||
|  |  | |||
|  | @ -37,17 +37,30 @@ func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, e | |||
| 		return nil, fmt.Errorf("Can't extract list of postgresql objects: %s", err) | ||||
| 	} | ||||
| 
 | ||||
| 	var activeClustersCnt, failedClustersCnt int | ||||
| 	for _, obj := range objList { | ||||
| 		pg, ok := obj.(*spec.Postgresql) | ||||
| 		if !ok { | ||||
| 			return nil, fmt.Errorf("Can't cast object to postgresql") | ||||
| 		} | ||||
| 
 | ||||
| 		if pg.Error != nil { | ||||
| 			failedClustersCnt++ | ||||
| 			continue | ||||
| 		} | ||||
| 		c.queueClusterEvent(nil, pg, spec.EventSync) | ||||
| 
 | ||||
| 		c.logger.Debugf("Sync of the '%s' cluster has been queued", util.NameFromMeta(pg.Metadata)) | ||||
| 		activeClustersCnt++ | ||||
| 	} | ||||
| 	if len(objList) > 0 { | ||||
| 		c.logger.Infof("There are %d clusters currently running", len(objList)) | ||||
| 		if failedClustersCnt > 0 && activeClustersCnt == 0 { | ||||
| 			c.logger.Infof("There are no clusters running. %d are in the failed state", failedClustersCnt) | ||||
| 		} else if failedClustersCnt == 0 && activeClustersCnt > 0 { | ||||
| 			c.logger.Infof("There are %d clusters running", activeClustersCnt) | ||||
| 		} else { | ||||
| 			c.logger.Infof("There are %d clusters running and %d are in the failed state", activeClustersCnt, failedClustersCnt) | ||||
| 		} | ||||
| 	} else { | ||||
| 		c.logger.Infof("No clusters running") | ||||
| 	} | ||||
|  | @ -170,15 +183,29 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec | |||
| 	var ( | ||||
| 		uid          types.UID | ||||
| 		clusterName  spec.NamespacedName | ||||
| 		clusterError error | ||||
| 	) | ||||
| 
 | ||||
| 	if old != nil { | ||||
| 	if old != nil { //update, delete
 | ||||
| 		uid = old.Metadata.GetUID() | ||||
| 		clusterName = util.NameFromMeta(old.Metadata) | ||||
| 		if eventType == spec.EventUpdate && new.Error == nil && old != nil { | ||||
| 			eventType = spec.EventAdd | ||||
| 			clusterError = new.Error | ||||
| 		} else { | ||||
| 			clusterError = old.Error | ||||
| 		} | ||||
| 	} else { //add, sync
 | ||||
| 		uid = new.Metadata.GetUID() | ||||
| 		clusterName = util.NameFromMeta(new.Metadata) | ||||
| 		clusterError = new.Error | ||||
| 	} | ||||
| 
 | ||||
| 	if clusterError != nil && eventType != spec.EventDelete { | ||||
| 		c.logger.Debugf("Skipping %s event for invalid cluster %s (reason: %s)", eventType, clusterName, clusterError) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	workerId := c.clusterWorkerId(clusterName) | ||||
| 	clusterEvent := spec.ClusterEvent{ | ||||
| 		EventType: eventType, | ||||
|  |  | |||
|  | @ -38,8 +38,8 @@ type ResourceDescription struct { | |||
| } | ||||
| 
 | ||||
| type Resources struct { | ||||
| 	ResourceRequest ResourceDescription `json:"requests,omitempty""` | ||||
| 	ResourceLimits  ResourceDescription `json:"limits,omitempty""` | ||||
| 	ResourceRequest ResourceDescription `json:"requests,omitempty"` | ||||
| 	ResourceLimits  ResourceDescription `json:"limits,omitempty"` | ||||
| } | ||||
| 
 | ||||
| type Patroni struct { | ||||
|  | @ -62,6 +62,7 @@ const ( | |||
| 	ClusterStatusUpdateFailed PostgresStatus = "UpdateFailed" | ||||
| 	ClusterStatusAddFailed    PostgresStatus = "CreateFailed" | ||||
| 	ClusterStatusRunning      PostgresStatus = "Running" | ||||
| 	ClusterStatusInvalid      PostgresStatus = "Invalid" | ||||
| ) | ||||
| 
 | ||||
| // PostgreSQL Third Party (resource) Object
 | ||||
|  | @ -71,6 +72,7 @@ type Postgresql struct { | |||
| 
 | ||||
| 	Spec   PostgresSpec   `json:"spec"` | ||||
| 	Status PostgresStatus `json:"status"` | ||||
| 	Error  error          `json:"-"` | ||||
| } | ||||
| 
 | ||||
| type PostgresSpec struct { | ||||
|  | @ -189,6 +191,18 @@ func (pl *PostgresqlList) GetListMeta() unversioned.List { | |||
| 	return &pl.Metadata | ||||
| } | ||||
| 
 | ||||
| func clusterName(clusterName string, teamName string) (string, error) { | ||||
| 	teamNameLen := len(teamName) | ||||
| 	if len(clusterName) < teamNameLen+2 { | ||||
| 		return "", fmt.Errorf("Name is too short") | ||||
| 	} | ||||
| 	if strings.ToLower(clusterName[:teamNameLen+1]) != strings.ToLower(teamName)+"-" { | ||||
| 		return "", fmt.Errorf("Name must match {TEAM}-{NAME} format") | ||||
| 	} | ||||
| 
 | ||||
| 	return clusterName[teamNameLen+1:], nil | ||||
| } | ||||
| 
 | ||||
| // The code below is used only to work around a known problem with third-party
 | ||||
| // resources and ugorji. If/when these issues are resolved, the code below
 | ||||
| // should no longer be required.
 | ||||
|  | @ -196,31 +210,31 @@ func (pl *PostgresqlList) GetListMeta() unversioned.List { | |||
| type PostgresqlListCopy PostgresqlList | ||||
| type PostgresqlCopy Postgresql | ||||
| 
 | ||||
| func clusterName(clusterName string, teamName string) (string, error) { | ||||
| 	teamNameLen := len(teamName) | ||||
| 	if len(clusterName) < teamNameLen+2 { | ||||
| 		return "", fmt.Errorf("Name is too short") | ||||
| 	} | ||||
| 	if strings.ToLower(clusterName[:teamNameLen+1]) != strings.ToLower(teamName)+"-" { | ||||
| 		return "", fmt.Errorf("Name must start with the team name and dash") | ||||
| 	} | ||||
| 
 | ||||
| 	return clusterName[teamNameLen+1:], nil | ||||
| } | ||||
| 
 | ||||
| func (p *Postgresql) UnmarshalJSON(data []byte) error { | ||||
| 	tmp := PostgresqlCopy{} | ||||
| 	err := json.Unmarshal(data, &tmp) | ||||
| 	if err != nil { | ||||
| 		metaErr := json.Unmarshal(data, &tmp.Metadata) | ||||
| 		if metaErr != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 		tmp.Error = err | ||||
| 		tmp.Status = ClusterStatusInvalid | ||||
| 
 | ||||
| 		*p = Postgresql(tmp) | ||||
| 
 | ||||
| 		return nil | ||||
| 	} | ||||
| 	tmp2 := Postgresql(tmp) | ||||
| 
 | ||||
| 	clusterName, err := clusterName(tmp2.Metadata.Name, tmp2.Spec.TeamId) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if err == nil { | ||||
| 		tmp2.Spec.ClusterName = clusterName | ||||
| 	} else { | ||||
| 		tmp2.Error = err | ||||
| 		tmp2.Status = ClusterStatusInvalid | ||||
| 	} | ||||
| 	*p = tmp2 | ||||
| 
 | ||||
| 	return nil | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue