diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index b99a4bf71..0ad229493 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -93,7 +93,7 @@ func (s *Server) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { s.logger.Fatalf("Could not start http server: %v", err) } }() - s.logger.Infof("Listening on %s", s.http.Addr) + s.logger.Infof("listening on %s", s.http.Addr) <-stopCh diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 02a06cac2..5fa8d6b36 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -132,12 +132,12 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) { DoRaw() if k8sutil.ResourceNotFound(err) { - c.logger.Warningf("could not set status for the non-existing cluster") + c.logger.Warningf("could not set %q status for the non-existing cluster", status) return } if err != nil { - c.logger.Warningf("could not set status for the cluster: %v", err) + c.logger.Warningf("could not set %q status for the cluster: %v", status, err) } } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 8bc9632f6..eaf0a6a8c 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -6,6 +6,7 @@ import ( "github.com/Sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" @@ -169,7 +170,7 @@ func (c *Controller) initController() { return "", fmt.Errorf("could not cast to ClusterEvent") } - return fmt.Sprintf("%s-%s", e.EventType, e.UID), nil + return queueClusterKey(e.EventType, e.UID), nil }) } @@ -206,3 +207,7 @@ func (c *Controller) runPostgresqlInformer(stopCh <-chan struct{}, wg *sync.Wait c.postgresqlInformer.Run(stopCh) } + +func queueClusterKey(eventType spec.EventType, uid types.UID) string { + return fmt.Sprintf("%s-%s", eventType, uid) +} diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 8dc142e14..2c446f0b8 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -193,26 +193,20 @@ func (c *Controller) processEvent(event spec.ClusterEvent) { cl.Error = nil lg.Infoln("cluster has been updated") case spec.EventDelete: - teamName := strings.ToLower(cl.Spec.TeamID) - - lg.Infoln("Deletion of the cluster started") if !clusterFound { lg.Errorf("unknown cluster: %q", clusterName) return } + lg.Infoln("deletion of the cluster started") - if err := cl.Delete(); err != nil { - lg.Errorf("could not delete cluster: %v", err) - return - } - + teamName := strings.ToLower(cl.Spec.TeamID) func() { defer c.clustersMu.Unlock() c.clustersMu.Lock() delete(c.clusters, clusterName) delete(c.clusterLogs, clusterName) - for i, val := range c.teamClusters[teamName] { // on relativel + for i, val := range c.teamClusters[teamName] { if val == clusterName { copy(c.teamClusters[teamName][i:], c.teamClusters[teamName][i+1:]) c.teamClusters[teamName][len(c.teamClusters[teamName])-1] = spec.NamespacedName{} @@ -222,6 +216,11 @@ func (c *Controller) processEvent(event spec.ClusterEvent) { } }() + if err := cl.Delete(); err != nil { + lg.Errorf("could not delete cluster: %v", err) + return + } + lg.Infof("cluster has been deleted") case spec.EventSync: lg.Infof("syncing of the cluster started") @@ -305,13 +304,35 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec NewSpec: new, WorkerID: workerID, } - //TODO: if we delete cluster, discard all the previous events for the cluster lg := c.logger.WithField("worker", workerID).WithField("cluster-name", clusterName) if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil { - lg.Errorf("error when queueing cluster event: %v", clusterEvent) + lg.Errorf("error while queueing cluster event: %v", clusterEvent) } lg.Infof("%q event has been queued", eventType) + + if eventType != spec.EventDelete { + return + } + + for _, evType := range []spec.EventType{spec.EventAdd, spec.EventSync, spec.EventUpdate} { + obj, exists, err := c.clusterEventQueues[workerID].GetByKey(queueClusterKey(evType, uid)) + if err != nil { + lg.Warningf("could not get event from the queue: %v", err) + continue + } + + if !exists { + continue + } + + err = c.clusterEventQueues[workerID].Delete(obj) + if err != nil { + lg.Warningf("could not delete event from the queue: %v", err) + } else { + lg.Debugf("event %q has been discarded for the cluster", evType) + } + } } func (c *Controller) postgresqlAdd(obj interface{}) {