discard cluster events from the queue on cluster delete;
delete cluster from the clusters map before deleting cluster itself
This commit is contained in:
		
							parent
							
								
									f2c23021bb
								
							
						
					
					
						commit
						83760ebbef
					
				|  | @ -193,26 +193,20 @@ func (c *Controller) processEvent(event spec.ClusterEvent) { | ||||||
| 		cl.Error = nil | 		cl.Error = nil | ||||||
| 		lg.Infoln("cluster has been updated") | 		lg.Infoln("cluster has been updated") | ||||||
| 	case spec.EventDelete: | 	case spec.EventDelete: | ||||||
| 		teamName := strings.ToLower(cl.Spec.TeamID) |  | ||||||
| 
 |  | ||||||
| 		lg.Infoln("Deletion of the cluster started") |  | ||||||
| 		if !clusterFound { | 		if !clusterFound { | ||||||
| 			lg.Errorf("unknown cluster: %q", clusterName) | 			lg.Errorf("unknown cluster: %q", clusterName) | ||||||
| 			return | 			return | ||||||
| 		} | 		} | ||||||
|  | 		lg.Infoln("deletion of the cluster started") | ||||||
| 
 | 
 | ||||||
| 		if err := cl.Delete(); err != nil { | 		teamName := strings.ToLower(cl.Spec.TeamID) | ||||||
| 			lg.Errorf("could not delete cluster: %v", err) |  | ||||||
| 			return |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		func() { | 		func() { | ||||||
| 			defer c.clustersMu.Unlock() | 			defer c.clustersMu.Unlock() | ||||||
| 			c.clustersMu.Lock() | 			c.clustersMu.Lock() | ||||||
| 
 | 
 | ||||||
| 			delete(c.clusters, clusterName) | 			delete(c.clusters, clusterName) | ||||||
| 			delete(c.clusterLogs, clusterName) | 			delete(c.clusterLogs, clusterName) | ||||||
| 			for i, val := range c.teamClusters[teamName] { // on relativel
 | 			for i, val := range c.teamClusters[teamName] { | ||||||
| 				if val == clusterName { | 				if val == clusterName { | ||||||
| 					copy(c.teamClusters[teamName][i:], c.teamClusters[teamName][i+1:]) | 					copy(c.teamClusters[teamName][i:], c.teamClusters[teamName][i+1:]) | ||||||
| 					c.teamClusters[teamName][len(c.teamClusters[teamName])-1] = spec.NamespacedName{} | 					c.teamClusters[teamName][len(c.teamClusters[teamName])-1] = spec.NamespacedName{} | ||||||
|  | @ -222,6 +216,11 @@ func (c *Controller) processEvent(event spec.ClusterEvent) { | ||||||
| 			} | 			} | ||||||
| 		}() | 		}() | ||||||
| 
 | 
 | ||||||
|  | 		if err := cl.Delete(); err != nil { | ||||||
|  | 			lg.Errorf("could not delete cluster: %v", err) | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
| 		lg.Infof("cluster has been deleted") | 		lg.Infof("cluster has been deleted") | ||||||
| 	case spec.EventSync: | 	case spec.EventSync: | ||||||
| 		lg.Infof("syncing of the cluster started") | 		lg.Infof("syncing of the cluster started") | ||||||
|  | @ -305,13 +304,35 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec | ||||||
| 		NewSpec:   new, | 		NewSpec:   new, | ||||||
| 		WorkerID:  workerID, | 		WorkerID:  workerID, | ||||||
| 	} | 	} | ||||||
| 	//TODO: if we delete cluster, discard all the previous events for the cluster
 |  | ||||||
| 
 | 
 | ||||||
| 	lg := c.logger.WithField("worker", workerID).WithField("cluster-name", clusterName) | 	lg := c.logger.WithField("worker", workerID).WithField("cluster-name", clusterName) | ||||||
| 	if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil { | 	if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil { | ||||||
| 		lg.Errorf("error when queueing cluster event: %v", clusterEvent) | 		lg.Errorf("error while queueing cluster event: %v", clusterEvent) | ||||||
| 	} | 	} | ||||||
| 	lg.Infof("%q event has been queued", eventType) | 	lg.Infof("%q event has been queued", eventType) | ||||||
|  | 
 | ||||||
|  | 	if eventType != spec.EventDelete { | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	for _, evType := range []spec.EventType{spec.EventAdd, spec.EventSync, spec.EventUpdate} { | ||||||
|  | 		obj, exists, err := c.clusterEventQueues[workerID].GetByKey(queueClusterKey(evType, uid)) | ||||||
|  | 		if err != nil { | ||||||
|  | 			lg.Warningf("could not get event from the queue: %v", err) | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if !exists { | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		err = c.clusterEventQueues[workerID].Delete(obj) | ||||||
|  | 		if err != nil { | ||||||
|  | 			lg.Warningf("could not delete event from the queue: %v", err) | ||||||
|  | 		} else { | ||||||
|  | 			lg.Debugf("event %q has been discarded for the cluster", evType) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Controller) postgresqlAdd(obj interface{}) { | func (c *Controller) postgresqlAdd(obj interface{}) { | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue