pass error from process event function to the PopProcessFunc func
This commit is contained in:
		
							parent
							
								
									8f5a8e21d3
								
							
						
					
					
						commit
						0baae91a0e
					
				|  | @ -74,7 +74,7 @@ func (c *Controller) clusterWatchFunc(options api.ListOptions) (watch.Interface, | |||
| 	return req.Watch() | ||||
| } | ||||
| 
 | ||||
| func (c *Controller) processAddEvent(workerID uint32, clusterName spec.NamespacedName, spec *spec.Postgresql) { | ||||
| func (c *Controller) processAddEvent(workerID uint32, clusterName spec.NamespacedName, spec *spec.Postgresql) error { | ||||
| 	log := c.logger.WithField("worker", workerID) | ||||
| 
 | ||||
| 	log.Infof("Creation of the '%s' cluster started", clusterName) | ||||
|  | @ -92,13 +92,14 @@ func (c *Controller) processAddEvent(workerID uint32, clusterName spec.Namespace | |||
| 		cl.Error = fmt.Errorf("could not create cluster: %v", err) | ||||
| 		log.Errorf("%v", cl.Error) | ||||
| 
 | ||||
| 		return | ||||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
| 	log.Infof("Cluster '%s' has been created", clusterName) | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c *Controller) processUpdateEvent(workerID uint32, clusterName spec.NamespacedName, cl *cluster.Cluster, newSpec *spec.Postgresql) { | ||||
| func (c *Controller) processUpdateEvent(workerID uint32, clusterName spec.NamespacedName, cl *cluster.Cluster, newSpec *spec.Postgresql) error { | ||||
| 	log := c.logger.WithField("worker", workerID) | ||||
| 
 | ||||
| 	log.Infof("Update of the '%s' cluster started", clusterName) | ||||
|  | @ -107,14 +108,16 @@ func (c *Controller) processUpdateEvent(workerID uint32, clusterName spec.Namesp | |||
| 		cl.Error = fmt.Errorf("could not update cluster: %s", err) | ||||
| 		log.Errorf("%v", cl.Error) | ||||
| 
 | ||||
| 		return | ||||
| 		return nil | ||||
| 	} | ||||
| 	cl.Error = nil | ||||
| 
 | ||||
| 	log.Infof("Cluster '%s' has been updated", clusterName) | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c *Controller) processDeleteEvent(workerID uint32, clusterName spec.NamespacedName, cl *cluster.Cluster) { | ||||
| func (c *Controller) processDeleteEvent(workerID uint32, clusterName spec.NamespacedName, cl *cluster.Cluster) error { | ||||
| 	log := c.logger.WithField("worker", workerID) | ||||
| 
 | ||||
| 	log.Infof("Deletion of the '%s' cluster started", clusterName) | ||||
|  | @ -122,7 +125,7 @@ func (c *Controller) processDeleteEvent(workerID uint32, clusterName spec.Namesp | |||
| 	if err := cl.Delete(); err != nil { | ||||
| 		log.Errorf("could not delete cluster '%s': %s", clusterName, err) | ||||
| 
 | ||||
| 		return | ||||
| 		return nil | ||||
| 	} | ||||
| 	close(c.stopChs[clusterName]) | ||||
| 
 | ||||
|  | @ -132,9 +135,11 @@ func (c *Controller) processDeleteEvent(workerID uint32, clusterName spec.Namesp | |||
| 	c.clustersMu.Unlock() | ||||
| 
 | ||||
| 	log.Infof("Cluster '%s' has been deleted", clusterName) | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c *Controller) processSyncEvent(workerID uint32, cl *cluster.Cluster, clusterFound bool, clusterName spec.NamespacedName, newSpec *spec.Postgresql) { | ||||
| func (c *Controller) processSyncEvent(workerID uint32, cl *cluster.Cluster, clusterFound bool, clusterName spec.NamespacedName, newSpec *spec.Postgresql) error { | ||||
| 	log := c.logger.WithField("worker", workerID) | ||||
| 
 | ||||
| 	log.Infof("Syncing of the '%s' cluster started", clusterName) | ||||
|  | @ -155,11 +160,13 @@ func (c *Controller) processSyncEvent(workerID uint32, cl *cluster.Cluster, clus | |||
| 		cl.Error = fmt.Errorf("could not sync cluster '%s': %s", clusterName, err) | ||||
| 		log.Errorf("%v", cl) | ||||
| 
 | ||||
| 		return | ||||
| 		return nil | ||||
| 	} | ||||
| 	cl.Error = nil | ||||
| 
 | ||||
| 	log.Infof("Cluster '%s' has been synced", clusterName) | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c *Controller) processEvent(obj interface{}) error { | ||||
|  | @ -188,25 +195,27 @@ func (c *Controller) processEvent(obj interface{}) error { | |||
| 			return nil | ||||
| 		} | ||||
| 
 | ||||
| 		c.processAddEvent(event.WorkerID, clusterName, event.NewSpec) | ||||
| 		return c.processAddEvent(event.WorkerID, clusterName, event.NewSpec) | ||||
| 	case spec.EventUpdate: | ||||
| 		if !clusterFound { | ||||
| 			log.Warnf("Cluster '%s' does not exist", clusterName) | ||||
| 			return nil | ||||
| 		} | ||||
| 
 | ||||
| 		c.processUpdateEvent(event.WorkerID, clusterName, cl, event.NewSpec) | ||||
| 		return c.processUpdateEvent(event.WorkerID, clusterName, cl, event.NewSpec) | ||||
| 	case spec.EventDelete: | ||||
| 		if !clusterFound { | ||||
| 			log.Errorf("Unknown cluster: %s", clusterName) | ||||
| 			return nil | ||||
| 		} | ||||
| 
 | ||||
| 		c.processDeleteEvent(event.WorkerID, clusterName, cl) | ||||
| 		return c.processDeleteEvent(event.WorkerID, clusterName, cl) | ||||
| 	case spec.EventSync: | ||||
| 		c.processSyncEvent(event.WorkerID, cl, clusterFound, clusterName, event.NewSpec) | ||||
| 		return c.processSyncEvent(event.WorkerID, cl, clusterFound, clusterName, event.NewSpec) | ||||
| 	} | ||||
| 
 | ||||
| 	c.logger.Errorf("unknown event type: %v", event.EventType) | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue