From 9720ac1f7e86557ffa54e4f0bbf19ca8651e5182 Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Fri, 22 Dec 2017 13:06:11 +0100 Subject: [PATCH] WIP: Hold the proper locks while examining the list of databases. Introduce a new lock called specMu to protect the cluster spec. This lock is held on update and sync, and when retrieving the spec in the API code. There is no need to acquire it for cluster creation and deletion: creation assigns the spec to the cluster before linking it to the controller, and deletion just removes the cluster from the list in the controller, both while holding the global clustersMu lock. --- pkg/apiserver/apiserver.go | 4 ++-- pkg/cluster/cluster.go | 5 +++-- pkg/cluster/sync.go | 2 +- pkg/cluster/util.go | 33 ++++++++++++++++++++++++++++++++- pkg/controller/status.go | 14 +++++++++++--- 5 files changed, 49 insertions(+), 9 deletions(-) diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 80409f6d1..2b0b2f98e 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -33,11 +33,11 @@ type controllerInformer interface { ClusterStatus(team, cluster string) (*spec.ClusterStatus, error) ClusterLogs(team, cluster string) ([]*spec.LogEntry, error) ClusterHistory(team, cluster string) ([]*spec.Diff, error) + ClusterDatabasesMap() map[string][]string WorkerLogs(workerID uint32) ([]*spec.LogEntry, error) ListQueue(workerID uint32) (*spec.QueueDump, error) GetWorkersCnt() uint32 WorkerStatus(workerID uint32) (*spec.WorkerStatus, error) - GetClusterDatabasesMap() map[string][]string } // Server describes HTTP API server @@ -226,7 +226,7 @@ func (s *Server) workers(w http.ResponseWriter, req *http.Request) { func (s *Server) databases(w http.ResponseWriter, req *http.Request) { - databaseNamesPerCluster := s.controller.GetClusterDatabasesMap() + databaseNamesPerCluster := s.controller.ClusterDatabasesMap() s.respond(databaseNamesPerCluster, nil, w) return diff --git a/pkg/cluster/cluster.go 
b/pkg/cluster/cluster.go index 66aed2d74..3ed4c156d 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -75,7 +75,8 @@ type Cluster struct { oauthTokenGetter OAuthTokenGetter KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place? currentProcess spec.Process - processMu sync.RWMutex + processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex + specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex } type compareStatefulsetResult struct { @@ -437,7 +438,7 @@ func (c *Cluster) Update(oldSpec, newSpec *spec.Postgresql) error { defer c.mu.Unlock() c.setStatus(spec.ClusterStatusUpdating) - c.Postgresql = *newSpec + c.setSpec(newSpec) defer func() { if updateFailed { diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 4a24b311e..864f2a833 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -20,7 +20,7 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) { c.mu.Lock() defer c.mu.Unlock() - c.Postgresql = *newSpec + c.setSpec(newSpec) defer func() { if err != nil { diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 580b87251..9bd292271 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -1,9 +1,12 @@ package cluster import ( + "bytes" + "encoding/gob" "encoding/json" "fmt" "math/rand" + "sort" "strings" "time" @@ -18,7 +21,6 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" "github.com/zalando-incubator/postgres-operator/pkg/util/retryutil" - "sort" ) // OAuthTokenGetter provides the method for fetching OAuth tokens @@ -386,3 +388,32 @@ func (c *Cluster) credentialSecretNameForCluster(username string, clusterName st func masterCandidate(replicas []spec.NamespacedName) spec.NamespacedName { return replicas[rand.Intn(len(replicas))] } + +func cloneSpec(from *spec.Postgresql) (*spec.Postgresql, error) { + var ( + 
buf bytes.Buffer + result *spec.Postgresql + err error + ) + enc := gob.NewEncoder(&buf) + if err = enc.Encode(*from); err != nil { + return nil, fmt.Errorf("could not encode the spec: %v", err) + } + dec := gob.NewDecoder(&buf) + if err = dec.Decode(&result); err != nil { + return nil, fmt.Errorf("could not decode the spec: %v", err) + } + return result, nil +} + +func (c *Cluster) setSpec(newSpec *spec.Postgresql) { + c.specMu.Lock() + c.Postgresql = *newSpec + c.specMu.Unlock() +} + +func (c *Cluster) GetSpec() (*spec.Postgresql, error) { + c.specMu.RLock() + defer c.specMu.RUnlock() + return cloneSpec(&c.Postgresql) +} diff --git a/pkg/controller/status.go b/pkg/controller/status.go index b5edcec87..fa1703aa3 100644 --- a/pkg/controller/status.go +++ b/pkg/controller/status.go @@ -33,13 +33,21 @@ func (c *Controller) ClusterStatus(team, cluster string) (*spec.ClusterStatus, e } -// GetClusterDatabasesMap returns for each cluster the list of databases running there -func (c *Controller) GetClusterDatabasesMap() map[string][]string { +// ClusterDatabasesMap returns for each cluster the list of databases running there +func (c *Controller) ClusterDatabasesMap() map[string][]string { m := make(map[string][]string) + // avoid modifying the cluster list while we are fetching each one of them. + c.clustersMu.RLock() + defer c.clustersMu.RUnlock() for _, cluster := range c.clusters { - for database := range cluster.Postgresql.Spec.Databases { - m[cluster.Name] = append(m[cluster.Name], database) + // GetSpec holds the specMu lock of a cluster + if spec, err := cluster.GetSpec(); err == nil { + for database := range spec.Spec.Databases { + m[cluster.Name] = append(m[cluster.Name], database) + } + } else { + c.logger.Warningf("could not get the list of databases for cluster %q: %v", cluster.Name, err) } }