From 011458fb0575fcfa65f82ca9fb19517e44128646 Mon Sep 17 00:00:00 2001 From: Sergey Dudoladov Date: Thu, 21 Dec 2017 17:28:55 +0100 Subject: [PATCH 1/4] Add a REST endpoint to list databases in all clusters --- pkg/apiserver/apiserver.go | 10 ++++++++++ pkg/controller/status.go | 14 ++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 2d9c497cd..80409f6d1 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -37,6 +37,7 @@ type controllerInformer interface { ListQueue(workerID uint32) (*spec.QueueDump, error) GetWorkersCnt() uint32 WorkerStatus(workerID uint32) (*spec.WorkerStatus, error) + GetClusterDatabasesMap() map[string][]string } // Server describes HTTP API server @@ -78,6 +79,7 @@ func New(controller controllerInformer, port int, logger *logrus.Logger) *Server mux.HandleFunc("/clusters/", s.clusters) mux.HandleFunc("/workers/", s.workers) + mux.HandleFunc("/databases/", s.databases) s.http = http.Server{ Addr: fmt.Sprintf(":%d", port), @@ -222,6 +224,14 @@ func (s *Server) workers(w http.ResponseWriter, req *http.Request) { s.respond(resp, err, w) } +func (s *Server) databases(w http.ResponseWriter, req *http.Request) { + + databaseNamesPerCluster := s.controller.GetClusterDatabasesMap() + s.respond(databaseNamesPerCluster, nil, w) + return + +} + func (s *Server) allQueues(w http.ResponseWriter, r *http.Request) { workersCnt := s.controller.GetWorkersCnt() resp := make(map[uint32]*spec.QueueDump, workersCnt) diff --git a/pkg/controller/status.go b/pkg/controller/status.go index 27465fdd7..b5edcec87 100644 --- a/pkg/controller/status.go +++ b/pkg/controller/status.go @@ -32,6 +32,20 @@ func (c *Controller) ClusterStatus(team, cluster string) (*spec.ClusterStatus, e return status, nil } +// GetClusterDatabasesMap returns for each cluster the list of databases running there +func (c *Controller) GetClusterDatabasesMap() map[string][]string { + + m := make(map[string][]string) + + for _, cluster := range c.clusters { + for database := range cluster.Postgresql.Spec.Databases { + m[cluster.Name] = append(m[cluster.Name], database) + } + } + + return m +} + // TeamClusterList returns team-clusters map func (c *Controller) TeamClusterList() map[string][]spec.NamespacedName { return c.teamClusters From b8bf97ab768a4b2f89561c1d723eb74d41cfdd5e Mon Sep 17 00:00:00 2001 From: Sergey Dudoladov Date: Fri, 22 Dec 2017 12:53:57 +0100 Subject: [PATCH 2/4] Integrate comments from code reviews --- pkg/apiserver/apiserver.go | 4 ++-- pkg/controller/status.go | 8 ++++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 80409f6d1..97a971e75 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -37,7 +37,7 @@ type controllerInformer interface { ListQueue(workerID uint32) (*spec.QueueDump, error) GetWorkersCnt() uint32 WorkerStatus(workerID uint32) (*spec.WorkerStatus, error) - GetClusterDatabasesMap() map[string][]string + ClusterDatabasesMap() map[string][]string } // Server describes HTTP API server @@ -226,7 +226,7 @@ func (s *Server) workers(w http.ResponseWriter, req *http.Request) { func (s *Server) databases(w http.ResponseWriter, req *http.Request) { - databaseNamesPerCluster := s.controller.GetClusterDatabasesMap() + databaseNamesPerCluster := s.controller.ClusterDatabasesMap() s.respond(databaseNamesPerCluster, nil, w) return diff --git a/pkg/controller/status.go b/pkg/controller/status.go index b5edcec87..f0f948efb 100644 --- a/pkg/controller/status.go +++ b/pkg/controller/status.go @@ -32,16 +32,20 @@ func (c *Controller) ClusterStatus(team, cluster string) (*spec.ClusterStatus, e return status, nil } -// GetClusterDatabasesMap returns for each cluster the list of databases running there -func (c *Controller) GetClusterDatabasesMap() map[string][]string { +// ClusterDatabasesMap returns for each cluster the list of databases running there +func (c *Controller) ClusterDatabasesMap() map[string][]string { m := make(map[string][]string) + c.clustersMu.RLock() for _, cluster := range c.clusters { + cluster.Lock() for database := range cluster.Postgresql.Spec.Databases { m[cluster.Name] = append(m[cluster.Name], database) } + cluster.Unlock() } + c.clustersMu.RUnlock() return m } From 9720ac1f7e86557ffa54e4f0bbf19ca8651e5182 Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Fri, 22 Dec 2017 13:06:11 +0100 Subject: [PATCH 3/4] WIP: Hold the proper locks while examining the list of databases. Introduce a new lock called specMu lock to protect the cluster spec. This lock is held on update and sync, and when retrieving the spec in the API code. There is no need to acquire it for cluster creation and deletion: creation assigns the spec to the cluster before linking it to the controller, and deletion just removes the cluster from the list in the controller, both holding the global clustersMu Lock. --- pkg/apiserver/apiserver.go | 4 ++-- pkg/cluster/cluster.go | 5 +++-- pkg/cluster/sync.go | 2 +- pkg/cluster/util.go | 33 ++++++++++++++++++++++++++++++++- pkg/controller/status.go | 14 +++++++++++--- 5 files changed, 49 insertions(+), 9 deletions(-) diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 80409f6d1..2b0b2f98e 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -33,11 +33,11 @@ type controllerInformer interface { ClusterStatus(team, cluster string) (*spec.ClusterStatus, error) ClusterLogs(team, cluster string) ([]*spec.LogEntry, error) ClusterHistory(team, cluster string) ([]*spec.Diff, error) + ClusterDatabasesMap() map[string][]string WorkerLogs(workerID uint32) ([]*spec.LogEntry, error) ListQueue(workerID uint32) (*spec.QueueDump, error) GetWorkersCnt() uint32 WorkerStatus(workerID uint32) (*spec.WorkerStatus, error) - GetClusterDatabasesMap() map[string][]string } // Server describes HTTP API server @@ -226,7 +226,7 @@ func (s *Server) workers(w http.ResponseWriter, req *http.Request) { func (s *Server) databases(w http.ResponseWriter, req *http.Request) { - databaseNamesPerCluster := s.controller.GetClusterDatabasesMap() + databaseNamesPerCluster := s.controller.ClusterDatabasesMap() s.respond(databaseNamesPerCluster, nil, w) return diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 66aed2d74..3ed4c156d 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -75,7 +75,8 @@ type Cluster struct { oauthTokenGetter OAuthTokenGetter KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place? currentProcess spec.Process - processMu sync.RWMutex + processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex + specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex } type compareStatefulsetResult struct { @@ -437,7 +438,7 @@ func (c *Cluster) Update(oldSpec, newSpec *spec.Postgresql) error { defer c.mu.Unlock() c.setStatus(spec.ClusterStatusUpdating) - c.Postgresql = *newSpec + c.setSpec(newSpec) defer func() { if updateFailed { diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 4a24b311e..864f2a833 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -20,7 +20,7 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) { c.mu.Lock() defer c.mu.Unlock() - c.Postgresql = *newSpec + c.setSpec(newSpec) defer func() { if err != nil { diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 580b87251..9bd292271 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -1,9 +1,12 @@ package cluster import ( + "bytes" + "encoding/gob" "encoding/json" "fmt" "math/rand" + "sort" "strings" "time" @@ -18,7 +21,6 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" "github.com/zalando-incubator/postgres-operator/pkg/util/retryutil" - "sort" ) // OAuthTokenGetter provides the method for fetching OAuth tokens @@ -386,3 +388,32 @@ func (c *Cluster) credentialSecretNameForCluster(username string, clusterName st func masterCandidate(replicas []spec.NamespacedName) spec.NamespacedName { return replicas[rand.Intn(len(replicas))] } + +func cloneSpec(from *spec.Postgresql) (*spec.Postgresql, error) { + var ( + buf bytes.Buffer + result *spec.Postgresql + err error + ) + enc := gob.NewEncoder(&buf) + if err = enc.Encode(*from); err != nil { + return nil, fmt.Errorf("could not encode the spec: %v", err) + } + dec := gob.NewDecoder(&buf) + if err = dec.Decode(&result); err != nil { + return nil, fmt.Errorf("could not decode the spec: %v", err) + } + return result, nil +} + +func (c *Cluster) setSpec(newSpec *spec.Postgresql) { + c.specMu.Lock() + c.Postgresql = *newSpec + c.specMu.Unlock() +} + +func (c *Cluster) GetSpec() (*spec.Postgresql, error) { + c.specMu.RLock() + defer c.specMu.RUnlock() + return cloneSpec(&c.Postgresql) +} diff --git a/pkg/controller/status.go b/pkg/controller/status.go index b5edcec87..fa1703aa3 100644 --- a/pkg/controller/status.go +++ b/pkg/controller/status.go @@ -33,13 +33,21 @@ func (c *Controller) ClusterStatus(team, cluster string) (*spec.ClusterStatus, e } // GetClusterDatabasesMap returns for each cluster the list of databases running there -func (c *Controller) GetClusterDatabasesMap() map[string][]string { +func (c *Controller) ClusterDatabasesMap() map[string][]string { m := make(map[string][]string) + // avoid modifying the cluster list while we are fetching each one of them. + c.clustersMu.RLock() + defer c.clustersMu.RUnlock() for _, cluster := range c.clusters { - for database := range cluster.Postgresql.Spec.Databases { - m[cluster.Name] = append(m[cluster.Name], database) + // GetSpec holds the specMu lock of a cluster + if spec, err := cluster.GetSpec(); err == nil { + for database := range spec.Spec.Databases { + m[cluster.Name] = append(m[cluster.Name], database) + } + } else { + c.logger.Warningf("could not get the list of databases for cluster %q: %v", cluster.Name, err) } } From 5c8bd04169587d754c56ec4f5872319fcde3f64a Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Fri, 22 Dec 2017 15:48:13 +0100 Subject: [PATCH 4/4] Sort database by name. --- pkg/controller/status.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/controller/status.go b/pkg/controller/status.go index f6d5a3027..bc9480e36 100644 --- a/pkg/controller/status.go +++ b/pkg/controller/status.go @@ -2,6 +2,7 @@ package controller import ( "fmt" + "sort" "sync/atomic" "github.com/Sirupsen/logrus" @@ -46,6 +47,7 @@ func (c *Controller) ClusterDatabasesMap() map[string][]string { for database := range spec.Spec.Databases { m[cluster.Name] = append(m[cluster.Name], database) } + sort.Strings(m[cluster.Name]) } else { c.logger.Warningf("could not get the list of databases for cluster %q: %v", cluster.Name, err) }