WIP: Hold the proper locks while examining the list of databases.
Introduce a new specMu lock to protect the cluster spec. It is held on update and sync, and when retrieving the spec in the API code. There is no need to acquire it for cluster creation and deletion: creation assigns the spec to the cluster before linking it to the controller, and deletion just removes the cluster from the list in the controller, both while holding the global clustersMu lock.
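A minimal, self-contained sketch of this pattern (the clusterSpec/demoCluster names are illustrative and not part of the operator; the real GetSpec in the diff below additionally returns a gob-based deep copy of the spec):

    package main

    import (
    	"fmt"
    	"sync"
    )

    // Illustrative stand-ins for spec.Postgresql and cluster.Cluster.
    type clusterSpec struct {
    	Version string
    }

    type demoCluster struct {
    	mu     sync.Mutex   // master mutex, held for the whole Update/Sync
    	specMu sync.RWMutex // protects only the spec, for cheap reads by the API
    	spec   clusterSpec
    }

    // setSpec is called on the Update/Sync path, with mu already held.
    func (c *demoCluster) setSpec(newSpec clusterSpec) {
    	c.specMu.Lock()
    	c.spec = newSpec
    	c.specMu.Unlock()
    }

    // getSpec is the API read path: it takes only specMu, never the master mutex.
    func (c *demoCluster) getSpec() clusterSpec {
    	c.specMu.RLock()
    	defer c.specMu.RUnlock()
    	return c.spec
    }

    func main() {
    	c := &demoCluster{}
    	c.setSpec(clusterSpec{Version: "9.6"})
    	fmt.Println(c.getSpec().Version)
    }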
commit 9720ac1f7e
parent 011458fb05
@@ -33,11 +33,11 @@ type controllerInformer interface {
 	ClusterStatus(team, cluster string) (*spec.ClusterStatus, error)
 	ClusterLogs(team, cluster string) ([]*spec.LogEntry, error)
 	ClusterHistory(team, cluster string) ([]*spec.Diff, error)
+	ClusterDatabasesMap() map[string][]string
 	WorkerLogs(workerID uint32) ([]*spec.LogEntry, error)
 	ListQueue(workerID uint32) (*spec.QueueDump, error)
 	GetWorkersCnt() uint32
 	WorkerStatus(workerID uint32) (*spec.WorkerStatus, error)
-	GetClusterDatabasesMap() map[string][]string
 }
 
 // Server describes HTTP API server
@@ -226,7 +226,7 @@ func (s *Server) workers(w http.ResponseWriter, req *http.Request) {
 
 func (s *Server) databases(w http.ResponseWriter, req *http.Request) {
 
-	databaseNamesPerCluster := s.controller.GetClusterDatabasesMap()
+	databaseNamesPerCluster := s.controller.ClusterDatabasesMap()
 	s.respond(databaseNamesPerCluster, nil, w)
 	return
 
@@ -75,7 +75,8 @@ type Cluster struct {
 	oauthTokenGetter OAuthTokenGetter
 	KubeClient       k8sutil.KubernetesClient //TODO: move clients to the better place?
 	currentProcess   spec.Process
-	processMu        sync.RWMutex
+	processMu        sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
+	specMu           sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
 }
 
 type compareStatefulsetResult struct {
@@ -437,7 +438,7 @@ func (c *Cluster) Update(oldSpec, newSpec *spec.Postgresql) error {
 	defer c.mu.Unlock()
 
 	c.setStatus(spec.ClusterStatusUpdating)
-	c.Postgresql = *newSpec
+	c.setSpec(newSpec)
 
 	defer func() {
 		if updateFailed {
@@ -20,7 +20,7 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
-	c.Postgresql = *newSpec
+	c.setSpec(newSpec)
 
 	defer func() {
 		if err != nil {
@@ -1,9 +1,12 @@
 package cluster
 
 import (
+	"bytes"
+	"encoding/gob"
 	"encoding/json"
 	"fmt"
 	"math/rand"
+	"sort"
 	"strings"
 	"time"
 
@@ -18,7 +21,6 @@ import (
 	"github.com/zalando-incubator/postgres-operator/pkg/util/constants"
 	"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
 	"github.com/zalando-incubator/postgres-operator/pkg/util/retryutil"
-	"sort"
 )
 
 // OAuthTokenGetter provides the method for fetching OAuth tokens
@@ -386,3 +388,32 @@ func (c *Cluster) credentialSecretNameForCluster(username string, clusterName st
 func masterCandidate(replicas []spec.NamespacedName) spec.NamespacedName {
 	return replicas[rand.Intn(len(replicas))]
 }
+
+func cloneSpec(from *spec.Postgresql) (*spec.Postgresql, error) {
+	var (
+		buf    bytes.Buffer
+		result *spec.Postgresql
+		err    error
+	)
+	enc := gob.NewEncoder(&buf)
+	if err = enc.Encode(*from); err != nil {
+		return nil, fmt.Errorf("could not encode the spec: %v", err)
+	}
+	dec := gob.NewDecoder(&buf)
+	if err = dec.Decode(&result); err != nil {
+		return nil, fmt.Errorf("could not decode the spec: %v", err)
+	}
+	return result, nil
+}
+
+func (c *Cluster) setSpec(newSpec *spec.Postgresql) {
+	c.specMu.Lock()
+	c.Postgresql = *newSpec
+	c.specMu.Unlock()
+}
+
+func (c *Cluster) GetSpec() (*spec.Postgresql, error) {
+	c.specMu.RLock()
+	defer c.specMu.RUnlock()
+	return cloneSpec(&c.Postgresql)
+}
@@ -33,13 +33,21 @@ func (c *Controller) ClusterStatus(team, cluster string) (*spec.ClusterStatus, e
 }
 
 // GetClusterDatabasesMap returns for each cluster the list of databases running there
-func (c *Controller) GetClusterDatabasesMap() map[string][]string {
+func (c *Controller) ClusterDatabasesMap() map[string][]string {
 
 	m := make(map[string][]string)
 
+	// avoid modifying the cluster list while we are fetching each one of them.
+	c.clustersMu.RLock()
+	defer c.clustersMu.RUnlock()
 	for _, cluster := range c.clusters {
-		for database := range cluster.Postgresql.Spec.Databases {
-			m[cluster.Name] = append(m[cluster.Name], database)
+		// GetSpec holds the specMu lock of a cluster
+		if spec, err := cluster.GetSpec(); err == nil {
+			for database := range spec.Spec.Databases {
+				m[cluster.Name] = append(m[cluster.Name], database)
+			}
+		} else {
+			c.logger.Warningf("could not get the list of databases for cluster %q: %v", cluster.Name, err)
 		}
 	}
 