From 83760ebbef5c1954a11155cf62c56588dd872d1a Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Thu, 17 Aug 2017 12:24:23 +0200 Subject: [PATCH] discard cluster events from the queue on cluster delete; delete cluster from the clusters map before deleting cluster itself --- pkg/controller/postgresql.go | 43 +++++++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 11 deletions(-) diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 8dc142e14..2c446f0b8 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -193,26 +193,20 @@ func (c *Controller) processEvent(event spec.ClusterEvent) { cl.Error = nil lg.Infoln("cluster has been updated") case spec.EventDelete: - teamName := strings.ToLower(cl.Spec.TeamID) - - lg.Infoln("Deletion of the cluster started") if !clusterFound { lg.Errorf("unknown cluster: %q", clusterName) return } + lg.Infoln("deletion of the cluster started") - if err := cl.Delete(); err != nil { - lg.Errorf("could not delete cluster: %v", err) - return - } - + teamName := strings.ToLower(cl.Spec.TeamID) func() { defer c.clustersMu.Unlock() c.clustersMu.Lock() delete(c.clusters, clusterName) delete(c.clusterLogs, clusterName) - for i, val := range c.teamClusters[teamName] { // on relativel + for i, val := range c.teamClusters[teamName] { if val == clusterName { copy(c.teamClusters[teamName][i:], c.teamClusters[teamName][i+1:]) c.teamClusters[teamName][len(c.teamClusters[teamName])-1] = spec.NamespacedName{} @@ -222,6 +216,11 @@ func (c *Controller) processEvent(event spec.ClusterEvent) { } }() + if err := cl.Delete(); err != nil { + lg.Errorf("could not delete cluster: %v", err) + return + } + lg.Infof("cluster has been deleted") case spec.EventSync: lg.Infof("syncing of the cluster started") @@ -305,13 +304,35 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec NewSpec: new, WorkerID: workerID, } - //TODO: if we delete cluster, discard all the previous events for the cluster lg := c.logger.WithField("worker", workerID).WithField("cluster-name", clusterName) if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil { - lg.Errorf("error when queueing cluster event: %v", clusterEvent) + lg.Errorf("error while queueing cluster event: %v", clusterEvent) } lg.Infof("%q event has been queued", eventType) + + if eventType != spec.EventDelete { + return + } + + for _, evType := range []spec.EventType{spec.EventAdd, spec.EventSync, spec.EventUpdate} { + obj, exists, err := c.clusterEventQueues[workerID].GetByKey(queueClusterKey(evType, uid)) + if err != nil { + lg.Warningf("could not get event from the queue: %v", err) + continue + } + + if !exists { + continue + } + + err = c.clusterEventQueues[workerID].Delete(obj) + if err != nil { + lg.Warningf("could not delete event from the queue: %v", err) + } else { + lg.Debugf("event %q has been discarded for the cluster", evType) + } + } } func (c *Controller) postgresqlAdd(obj interface{}) {