From 8f5a8e21d3b85f0235541e6d3266153ac354b52a Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Mon, 29 May 2017 15:25:36 +0200 Subject: [PATCH] split processEvent function into several functions --- pkg/controller/postgresql.go | 163 +++++++++++++++++++++-------------- 1 file changed, 96 insertions(+), 67 deletions(-) diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 5e1919ecb..fcdb8403c 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -74,6 +74,94 @@ func (c *Controller) clusterWatchFunc(options api.ListOptions) (watch.Interface, return req.Watch() } +func (c *Controller) processAddEvent(workerID uint32, clusterName spec.NamespacedName, spec *spec.Postgresql) { + log := c.logger.WithField("worker", workerID) + + log.Infof("Creation of the '%s' cluster started", clusterName) + + stopCh := make(chan struct{}) + cl := cluster.New(c.makeClusterConfig(), *spec, log) + cl.Run(stopCh) + + c.clustersMu.Lock() + c.clusters[clusterName] = cl + c.stopChs[clusterName] = stopCh + c.clustersMu.Unlock() + + if err := cl.Create(); err != nil { + cl.Error = fmt.Errorf("could not create cluster: %v", err) + log.Errorf("%v", cl.Error) + + return + } + + log.Infof("Cluster '%s' has been created", clusterName) +} + +func (c *Controller) processUpdateEvent(workerID uint32, clusterName spec.NamespacedName, cl *cluster.Cluster, newSpec *spec.Postgresql) { + log := c.logger.WithField("worker", workerID) + + log.Infof("Update of the '%s' cluster started", clusterName) + + if err := cl.Update(newSpec); err != nil { + cl.Error = fmt.Errorf("could not update cluster: %s", err) + log.Errorf("%v", cl.Error) + + return + } + cl.Error = nil + + log.Infof("Cluster '%s' has been updated", clusterName) +} + +func (c *Controller) processDeleteEvent(workerID uint32, clusterName spec.NamespacedName, cl *cluster.Cluster) { + log := c.logger.WithField("worker", workerID) + + log.Infof("Deletion of the '%s' cluster started", clusterName) + + if err := cl.Delete(); err != nil { + log.Errorf("could not delete cluster '%s': %s", clusterName, err) + + return + } + close(c.stopChs[clusterName]) + + c.clustersMu.Lock() + delete(c.clusters, clusterName) + delete(c.stopChs, clusterName) + c.clustersMu.Unlock() + + log.Infof("Cluster '%s' has been deleted", clusterName) +} + +func (c *Controller) processSyncEvent(workerID uint32, cl *cluster.Cluster, clusterFound bool, clusterName spec.NamespacedName, newSpec *spec.Postgresql) { + log := c.logger.WithField("worker", workerID) + + log.Infof("Syncing of the '%s' cluster started", clusterName) + + // no race condition because a cluster is always processed by single worker + if !clusterFound { + stopCh := make(chan struct{}) + cl := cluster.New(c.makeClusterConfig(), *newSpec, log) + cl.Run(stopCh) + + c.clustersMu.Lock() + c.clusters[clusterName] = cl + c.stopChs[clusterName] = stopCh + c.clustersMu.Unlock() + } + + if err := cl.Sync(); err != nil { + cl.Error = fmt.Errorf("could not sync cluster '%s': %s", clusterName, err) + log.Errorf("%v", cl) + + return + } + cl.Error = nil + + log.Infof("Cluster '%s' has been synced", clusterName) +} + func (c *Controller) processEvent(obj interface{}) error { var clusterName spec.NamespacedName @@ -81,7 +169,7 @@ func (c *Controller) processEvent(obj interface{}) error { if !ok { return fmt.Errorf("could not cast to ClusterEvent") } - logger := c.logger.WithField("worker", event.WorkerID) + log := c.logger.WithField("worker", event.WorkerID) if event.EventType == spec.EventAdd || event.EventType == spec.EventSync { clusterName = util.NameFromMeta(event.NewSpec.Metadata) @@ -96,86 +184,27 @@ func (c *Controller) processEvent(obj interface{}) error { switch event.EventType { case spec.EventAdd: if clusterFound { - logger.Debugf("Cluster '%s' already exists", clusterName) + log.Debugf("Cluster '%s' already exists", clusterName) return nil } - logger.Infof("Creation of the '%s' cluster started", clusterName) - - stopCh := make(chan struct{}) - cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) - cl.Run(stopCh) - - c.clustersMu.Lock() - c.clusters[clusterName] = cl - c.stopChs[clusterName] = stopCh - c.clustersMu.Unlock() - - if err := cl.Create(); err != nil { - cl.Error = fmt.Errorf("could not create cluster: %v", err) - logger.Errorf("%v", cl.Error) - - return nil - } - - logger.Infof("Cluster '%s' has been created", clusterName) + c.processAddEvent(event.WorkerID, clusterName, event.NewSpec) case spec.EventUpdate: - logger.Infof("Update of the '%s' cluster started", clusterName) - if !clusterFound { - logger.Warnf("Cluster '%s' does not exist", clusterName) + log.Warnf("Cluster '%s' does not exist", clusterName) return nil } - if err := cl.Update(event.NewSpec); err != nil { - cl.Error = fmt.Errorf("could not update cluster: %s", err) - logger.Errorf("%v", cl.Error) - return nil - } - cl.Error = nil - logger.Infof("Cluster '%s' has been updated", clusterName) + c.processUpdateEvent(event.WorkerID, clusterName, cl, event.NewSpec) case spec.EventDelete: - logger.Infof("Deletion of the '%s' cluster started", clusterName) if !clusterFound { - logger.Errorf("Unknown cluster: %s", clusterName) + log.Errorf("Unknown cluster: %s", clusterName) return nil } - if err := cl.Delete(); err != nil { - logger.Errorf("could not delete cluster '%s': %s", clusterName, err) - return nil - } - close(c.stopChs[clusterName]) - - c.clustersMu.Lock() - delete(c.clusters, clusterName) - delete(c.stopChs, clusterName) - c.clustersMu.Unlock() - - logger.Infof("Cluster '%s' has been deleted", clusterName) + c.processDeleteEvent(event.WorkerID, clusterName, cl) case spec.EventSync: - logger.Infof("Syncing of the '%s' cluster started", clusterName) - - // no race condition because a cluster is always processed by single worker - if !clusterFound { - stopCh := make(chan struct{}) - cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) - cl.Run(stopCh) - - c.clustersMu.Lock() - c.clusters[clusterName] = cl - c.stopChs[clusterName] = stopCh - c.clustersMu.Unlock() - } - - if err := cl.Sync(); err != nil { - cl.Error = fmt.Errorf("could not sync cluster '%s': %s", clusterName, err) - logger.Errorf("%v", cl) - return nil - } - cl.Error = nil - - logger.Infof("Cluster '%s' has been synced", clusterName) + c.processSyncEvent(event.WorkerID, cl, clusterFound, clusterName, event.NewSpec) } return nil