split processEvent function into several functions
This commit is contained in:
		
							parent
							
								
									d39cfaab43
								
							
						
					
					
						commit
						8f5a8e21d3
					
				|  | @ -74,6 +74,94 @@ func (c *Controller) clusterWatchFunc(options api.ListOptions) (watch.Interface, | ||||||
| 	return req.Watch() | 	return req.Watch() | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func (c *Controller) processAddEvent(workerID uint32, clusterName spec.NamespacedName, spec *spec.Postgresql) { | ||||||
|  | 	log := c.logger.WithField("worker", workerID) | ||||||
|  | 
 | ||||||
|  | 	log.Infof("Creation of the '%s' cluster started", clusterName) | ||||||
|  | 
 | ||||||
|  | 	stopCh := make(chan struct{}) | ||||||
|  | 	cl := cluster.New(c.makeClusterConfig(), *spec, log) | ||||||
|  | 	cl.Run(stopCh) | ||||||
|  | 
 | ||||||
|  | 	c.clustersMu.Lock() | ||||||
|  | 	c.clusters[clusterName] = cl | ||||||
|  | 	c.stopChs[clusterName] = stopCh | ||||||
|  | 	c.clustersMu.Unlock() | ||||||
|  | 
 | ||||||
|  | 	if err := cl.Create(); err != nil { | ||||||
|  | 		cl.Error = fmt.Errorf("could not create cluster: %v", err) | ||||||
|  | 		log.Errorf("%v", cl.Error) | ||||||
|  | 
 | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	log.Infof("Cluster '%s' has been created", clusterName) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *Controller) processUpdateEvent(workerID uint32, clusterName spec.NamespacedName, cl *cluster.Cluster, newSpec *spec.Postgresql) { | ||||||
|  | 	log := c.logger.WithField("worker", workerID) | ||||||
|  | 
 | ||||||
|  | 	log.Infof("Update of the '%s' cluster started", clusterName) | ||||||
|  | 
 | ||||||
|  | 	if err := cl.Update(newSpec); err != nil { | ||||||
|  | 		cl.Error = fmt.Errorf("could not update cluster: %s", err) | ||||||
|  | 		log.Errorf("%v", cl.Error) | ||||||
|  | 
 | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	cl.Error = nil | ||||||
|  | 
 | ||||||
|  | 	log.Infof("Cluster '%s' has been updated", clusterName) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *Controller) processDeleteEvent(workerID uint32, clusterName spec.NamespacedName, cl *cluster.Cluster) { | ||||||
|  | 	log := c.logger.WithField("worker", workerID) | ||||||
|  | 
 | ||||||
|  | 	log.Infof("Deletion of the '%s' cluster started", clusterName) | ||||||
|  | 
 | ||||||
|  | 	if err := cl.Delete(); err != nil { | ||||||
|  | 		log.Errorf("could not delete cluster '%s': %s", clusterName, err) | ||||||
|  | 
 | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	close(c.stopChs[clusterName]) | ||||||
|  | 
 | ||||||
|  | 	c.clustersMu.Lock() | ||||||
|  | 	delete(c.clusters, clusterName) | ||||||
|  | 	delete(c.stopChs, clusterName) | ||||||
|  | 	c.clustersMu.Unlock() | ||||||
|  | 
 | ||||||
|  | 	log.Infof("Cluster '%s' has been deleted", clusterName) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *Controller) processSyncEvent(workerID uint32, cl *cluster.Cluster, clusterFound bool, clusterName spec.NamespacedName, newSpec *spec.Postgresql) { | ||||||
|  | 	log := c.logger.WithField("worker", workerID) | ||||||
|  | 
 | ||||||
|  | 	log.Infof("Syncing of the '%s' cluster started", clusterName) | ||||||
|  | 
 | ||||||
|  | 	// no race condition because a cluster is always processed by single worker
 | ||||||
|  | 	if !clusterFound { | ||||||
|  | 		stopCh := make(chan struct{}) | ||||||
|  | 		cl := cluster.New(c.makeClusterConfig(), *newSpec, log) | ||||||
|  | 		cl.Run(stopCh) | ||||||
|  | 
 | ||||||
|  | 		c.clustersMu.Lock() | ||||||
|  | 		c.clusters[clusterName] = cl | ||||||
|  | 		c.stopChs[clusterName] = stopCh | ||||||
|  | 		c.clustersMu.Unlock() | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if err := cl.Sync(); err != nil { | ||||||
|  | 		cl.Error = fmt.Errorf("could not sync cluster '%s': %s", clusterName, err) | ||||||
|  | 		log.Errorf("%v", cl) | ||||||
|  | 
 | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	cl.Error = nil | ||||||
|  | 
 | ||||||
|  | 	log.Infof("Cluster '%s' has been synced", clusterName) | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func (c *Controller) processEvent(obj interface{}) error { | func (c *Controller) processEvent(obj interface{}) error { | ||||||
| 	var clusterName spec.NamespacedName | 	var clusterName spec.NamespacedName | ||||||
| 
 | 
 | ||||||
|  | @ -81,7 +169,7 @@ func (c *Controller) processEvent(obj interface{}) error { | ||||||
| 	if !ok { | 	if !ok { | ||||||
| 		return fmt.Errorf("could not cast to ClusterEvent") | 		return fmt.Errorf("could not cast to ClusterEvent") | ||||||
| 	} | 	} | ||||||
| 	logger := c.logger.WithField("worker", event.WorkerID) | 	log := c.logger.WithField("worker", event.WorkerID) | ||||||
| 
 | 
 | ||||||
| 	if event.EventType == spec.EventAdd || event.EventType == spec.EventSync { | 	if event.EventType == spec.EventAdd || event.EventType == spec.EventSync { | ||||||
| 		clusterName = util.NameFromMeta(event.NewSpec.Metadata) | 		clusterName = util.NameFromMeta(event.NewSpec.Metadata) | ||||||
|  | @ -96,86 +184,27 @@ func (c *Controller) processEvent(obj interface{}) error { | ||||||
| 	switch event.EventType { | 	switch event.EventType { | ||||||
| 	case spec.EventAdd: | 	case spec.EventAdd: | ||||||
| 		if clusterFound { | 		if clusterFound { | ||||||
| 			logger.Debugf("Cluster '%s' already exists", clusterName) | 			log.Debugf("Cluster '%s' already exists", clusterName) | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		logger.Infof("Creation of the '%s' cluster started", clusterName) | 		c.processAddEvent(event.WorkerID, clusterName, event.NewSpec) | ||||||
| 
 |  | ||||||
| 		stopCh := make(chan struct{}) |  | ||||||
| 		cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) |  | ||||||
| 		cl.Run(stopCh) |  | ||||||
| 
 |  | ||||||
| 		c.clustersMu.Lock() |  | ||||||
| 		c.clusters[clusterName] = cl |  | ||||||
| 		c.stopChs[clusterName] = stopCh |  | ||||||
| 		c.clustersMu.Unlock() |  | ||||||
| 
 |  | ||||||
| 		if err := cl.Create(); err != nil { |  | ||||||
| 			cl.Error = fmt.Errorf("could not create cluster: %v", err) |  | ||||||
| 			logger.Errorf("%v", cl.Error) |  | ||||||
| 
 |  | ||||||
| 			return nil |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		logger.Infof("Cluster '%s' has been created", clusterName) |  | ||||||
| 	case spec.EventUpdate: | 	case spec.EventUpdate: | ||||||
| 		logger.Infof("Update of the '%s' cluster started", clusterName) |  | ||||||
| 
 |  | ||||||
| 		if !clusterFound { | 		if !clusterFound { | ||||||
| 			logger.Warnf("Cluster '%s' does not exist", clusterName) | 			log.Warnf("Cluster '%s' does not exist", clusterName) | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 		if err := cl.Update(event.NewSpec); err != nil { |  | ||||||
| 			cl.Error = fmt.Errorf("could not update cluster: %s", err) |  | ||||||
| 			logger.Errorf("%v", cl.Error) |  | ||||||
| 
 | 
 | ||||||
| 			return nil | 		c.processUpdateEvent(event.WorkerID, clusterName, cl, event.NewSpec) | ||||||
| 		} |  | ||||||
| 		cl.Error = nil |  | ||||||
| 		logger.Infof("Cluster '%s' has been updated", clusterName) |  | ||||||
| 	case spec.EventDelete: | 	case spec.EventDelete: | ||||||
| 		logger.Infof("Deletion of the '%s' cluster started", clusterName) |  | ||||||
| 		if !clusterFound { | 		if !clusterFound { | ||||||
| 			logger.Errorf("Unknown cluster: %s", clusterName) | 			log.Errorf("Unknown cluster: %s", clusterName) | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if err := cl.Delete(); err != nil { | 		c.processDeleteEvent(event.WorkerID, clusterName, cl) | ||||||
| 			logger.Errorf("could not delete cluster '%s': %s", clusterName, err) |  | ||||||
| 			return nil |  | ||||||
| 		} |  | ||||||
| 		close(c.stopChs[clusterName]) |  | ||||||
| 
 |  | ||||||
| 		c.clustersMu.Lock() |  | ||||||
| 		delete(c.clusters, clusterName) |  | ||||||
| 		delete(c.stopChs, clusterName) |  | ||||||
| 		c.clustersMu.Unlock() |  | ||||||
| 
 |  | ||||||
| 		logger.Infof("Cluster '%s' has been deleted", clusterName) |  | ||||||
| 	case spec.EventSync: | 	case spec.EventSync: | ||||||
| 		logger.Infof("Syncing of the '%s' cluster started", clusterName) | 		c.processSyncEvent(event.WorkerID, cl, clusterFound, clusterName, event.NewSpec) | ||||||
| 
 |  | ||||||
| 		// no race condition because a cluster is always processed by single worker
 |  | ||||||
| 		if !clusterFound { |  | ||||||
| 			stopCh := make(chan struct{}) |  | ||||||
| 			cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) |  | ||||||
| 			cl.Run(stopCh) |  | ||||||
| 
 |  | ||||||
| 			c.clustersMu.Lock() |  | ||||||
| 			c.clusters[clusterName] = cl |  | ||||||
| 			c.stopChs[clusterName] = stopCh |  | ||||||
| 			c.clustersMu.Unlock() |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		if err := cl.Sync(); err != nil { |  | ||||||
| 			cl.Error = fmt.Errorf("could not sync cluster '%s': %s", clusterName, err) |  | ||||||
| 			logger.Errorf("%v", cl) |  | ||||||
| 			return nil |  | ||||||
| 		} |  | ||||||
| 		cl.Error = nil |  | ||||||
| 
 |  | ||||||
| 		logger.Infof("Cluster '%s' has been synced", clusterName) |  | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue