Merge pull request #195 from zalando-incubator/databases-rest-endpoint
Add a REST endpoint to list databases in all clusters
This commit is contained in:
commit
bb5ce6cbbe
|
|
@ -33,6 +33,7 @@ type controllerInformer interface {
|
||||||
ClusterStatus(team, cluster string) (*spec.ClusterStatus, error)
|
ClusterStatus(team, cluster string) (*spec.ClusterStatus, error)
|
||||||
ClusterLogs(team, cluster string) ([]*spec.LogEntry, error)
|
ClusterLogs(team, cluster string) ([]*spec.LogEntry, error)
|
||||||
ClusterHistory(team, cluster string) ([]*spec.Diff, error)
|
ClusterHistory(team, cluster string) ([]*spec.Diff, error)
|
||||||
|
ClusterDatabasesMap() map[string][]string
|
||||||
WorkerLogs(workerID uint32) ([]*spec.LogEntry, error)
|
WorkerLogs(workerID uint32) ([]*spec.LogEntry, error)
|
||||||
ListQueue(workerID uint32) (*spec.QueueDump, error)
|
ListQueue(workerID uint32) (*spec.QueueDump, error)
|
||||||
GetWorkersCnt() uint32
|
GetWorkersCnt() uint32
|
||||||
|
|
@ -78,6 +79,7 @@ func New(controller controllerInformer, port int, logger *logrus.Logger) *Server
|
||||||
|
|
||||||
mux.HandleFunc("/clusters/", s.clusters)
|
mux.HandleFunc("/clusters/", s.clusters)
|
||||||
mux.HandleFunc("/workers/", s.workers)
|
mux.HandleFunc("/workers/", s.workers)
|
||||||
|
mux.HandleFunc("/databases/", s.databases)
|
||||||
|
|
||||||
s.http = http.Server{
|
s.http = http.Server{
|
||||||
Addr: fmt.Sprintf(":%d", port),
|
Addr: fmt.Sprintf(":%d", port),
|
||||||
|
|
@ -222,6 +224,14 @@ func (s *Server) workers(w http.ResponseWriter, req *http.Request) {
|
||||||
s.respond(resp, err, w)
|
s.respond(resp, err, w)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) databases(w http.ResponseWriter, req *http.Request) {
|
||||||
|
|
||||||
|
databaseNamesPerCluster := s.controller.ClusterDatabasesMap()
|
||||||
|
s.respond(databaseNamesPerCluster, nil, w)
|
||||||
|
return
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Server) allQueues(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) allQueues(w http.ResponseWriter, r *http.Request) {
|
||||||
workersCnt := s.controller.GetWorkersCnt()
|
workersCnt := s.controller.GetWorkersCnt()
|
||||||
resp := make(map[uint32]*spec.QueueDump, workersCnt)
|
resp := make(map[uint32]*spec.QueueDump, workersCnt)
|
||||||
|
|
|
||||||
|
|
@ -75,7 +75,8 @@ type Cluster struct {
|
||||||
oauthTokenGetter OAuthTokenGetter
|
oauthTokenGetter OAuthTokenGetter
|
||||||
KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place?
|
KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place?
|
||||||
currentProcess spec.Process
|
currentProcess spec.Process
|
||||||
processMu sync.RWMutex
|
processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
|
||||||
|
specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
type compareStatefulsetResult struct {
|
type compareStatefulsetResult struct {
|
||||||
|
|
@ -437,7 +438,7 @@ func (c *Cluster) Update(oldSpec, newSpec *spec.Postgresql) error {
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
c.setStatus(spec.ClusterStatusUpdating)
|
c.setStatus(spec.ClusterStatusUpdating)
|
||||||
c.Postgresql = *newSpec
|
c.setSpec(newSpec)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if updateFailed {
|
if updateFailed {
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,7 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
c.Postgresql = *newSpec
|
c.setSpec(newSpec)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,12 @@
|
||||||
package cluster
|
package cluster
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/gob"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -18,7 +21,6 @@ import (
|
||||||
"github.com/zalando-incubator/postgres-operator/pkg/util/constants"
|
"github.com/zalando-incubator/postgres-operator/pkg/util/constants"
|
||||||
"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
|
"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
|
||||||
"github.com/zalando-incubator/postgres-operator/pkg/util/retryutil"
|
"github.com/zalando-incubator/postgres-operator/pkg/util/retryutil"
|
||||||
"sort"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// OAuthTokenGetter provides the method for fetching OAuth tokens
|
// OAuthTokenGetter provides the method for fetching OAuth tokens
|
||||||
|
|
@ -386,3 +388,32 @@ func (c *Cluster) credentialSecretNameForCluster(username string, clusterName st
|
||||||
func masterCandidate(replicas []spec.NamespacedName) spec.NamespacedName {
|
func masterCandidate(replicas []spec.NamespacedName) spec.NamespacedName {
|
||||||
return replicas[rand.Intn(len(replicas))]
|
return replicas[rand.Intn(len(replicas))]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func cloneSpec(from *spec.Postgresql) (*spec.Postgresql, error) {
|
||||||
|
var (
|
||||||
|
buf bytes.Buffer
|
||||||
|
result *spec.Postgresql
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
enc := gob.NewEncoder(&buf)
|
||||||
|
if err = enc.Encode(*from); err != nil {
|
||||||
|
return nil, fmt.Errorf("could not encode the spec: %v", err)
|
||||||
|
}
|
||||||
|
dec := gob.NewDecoder(&buf)
|
||||||
|
if err = dec.Decode(&result); err != nil {
|
||||||
|
return nil, fmt.Errorf("could not decode the spec: %v", err)
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cluster) setSpec(newSpec *spec.Postgresql) {
|
||||||
|
c.specMu.Lock()
|
||||||
|
c.Postgresql = *newSpec
|
||||||
|
c.specMu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cluster) GetSpec() (*spec.Postgresql, error) {
|
||||||
|
c.specMu.RLock()
|
||||||
|
defer c.specMu.RUnlock()
|
||||||
|
return cloneSpec(&c.Postgresql)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ package controller
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sort"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
|
|
@ -32,6 +33,29 @@ func (c *Controller) ClusterStatus(team, cluster string) (*spec.ClusterStatus, e
|
||||||
return status, nil
|
return status, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ClusterDatabasesMap returns for each cluster the list of databases running there
|
||||||
|
func (c *Controller) ClusterDatabasesMap() map[string][]string {
|
||||||
|
|
||||||
|
m := make(map[string][]string)
|
||||||
|
|
||||||
|
// avoid modifying the cluster list while we are fetching each one of them.
|
||||||
|
c.clustersMu.RLock()
|
||||||
|
defer c.clustersMu.RUnlock()
|
||||||
|
for _, cluster := range c.clusters {
|
||||||
|
// GetSpec holds the specMu lock of a cluster
|
||||||
|
if spec, err := cluster.GetSpec(); err == nil {
|
||||||
|
for database := range spec.Spec.Databases {
|
||||||
|
m[cluster.Name] = append(m[cluster.Name], database)
|
||||||
|
}
|
||||||
|
sort.Strings(m[cluster.Name])
|
||||||
|
} else {
|
||||||
|
c.logger.Warningf("could not get the list of databases for cluster %q: %v", cluster.Name, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
// TeamClusterList returns team-clusters map
|
// TeamClusterList returns team-clusters map
|
||||||
func (c *Controller) TeamClusterList() map[string][]spec.NamespacedName {
|
func (c *Controller) TeamClusterList() map[string][]spec.NamespacedName {
|
||||||
return c.teamClusters
|
return c.teamClusters
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue