239 lines
		
	
	
		
			5.7 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			239 lines
		
	
	
		
			5.7 KiB
		
	
	
	
		
			Go
		
	
	
	
package controller
 | 
						|
 | 
						|
import (
 | 
						|
	"fmt"
 | 
						|
	"sort"
 | 
						|
	"sync/atomic"
 | 
						|
 | 
						|
	"github.com/sirupsen/logrus"
 | 
						|
 | 
						|
	"github.com/zalando-incubator/postgres-operator/pkg/cluster"
 | 
						|
	"github.com/zalando-incubator/postgres-operator/pkg/spec"
 | 
						|
	"github.com/zalando-incubator/postgres-operator/pkg/util"
 | 
						|
	"github.com/zalando-incubator/postgres-operator/pkg/util/config"
 | 
						|
	"k8s.io/apimachinery/pkg/types"
 | 
						|
)
 | 
						|
 | 
						|
// ClusterStatus provides status of the cluster
 | 
						|
func (c *Controller) ClusterStatus(team, namespace, cluster string) (*cluster.Status, error) {
 | 
						|
 | 
						|
	clusterName := spec.NamespacedName{
 | 
						|
		Namespace: namespace,
 | 
						|
		Name:      team + "-" + cluster,
 | 
						|
	}
 | 
						|
 | 
						|
	c.clustersMu.RLock()
 | 
						|
	cl, ok := c.clusters[clusterName]
 | 
						|
	c.clustersMu.RUnlock()
 | 
						|
	if !ok {
 | 
						|
		return nil, fmt.Errorf("could not find cluster")
 | 
						|
	}
 | 
						|
 | 
						|
	status := cl.GetStatus()
 | 
						|
	status.Worker = c.clusterWorkerID(clusterName)
 | 
						|
 | 
						|
	return status, nil
 | 
						|
}
 | 
						|
 | 
						|
// ClusterDatabasesMap returns for each cluster the list of databases running there
 | 
						|
func (c *Controller) ClusterDatabasesMap() map[string][]string {
 | 
						|
 | 
						|
	m := make(map[string][]string)
 | 
						|
 | 
						|
	// avoid modifying the cluster list while we are fetching each one of them.
 | 
						|
	c.clustersMu.RLock()
 | 
						|
	defer c.clustersMu.RUnlock()
 | 
						|
	for _, cluster := range c.clusters {
 | 
						|
		// GetSpec holds the specMu lock of a cluster
 | 
						|
		if spec, err := cluster.GetSpec(); err == nil {
 | 
						|
			for database := range spec.Spec.Databases {
 | 
						|
				m[cluster.Name] = append(m[cluster.Name], database)
 | 
						|
			}
 | 
						|
			sort.Strings(m[cluster.Name])
 | 
						|
		} else {
 | 
						|
			c.logger.Warningf("could not get the list of databases for cluster %q: %v", cluster.Name, err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return m
 | 
						|
}
 | 
						|
 | 
						|
// TeamClusterList returns team-clusters map
 | 
						|
func (c *Controller) TeamClusterList() map[string][]spec.NamespacedName {
 | 
						|
	return c.teamClusters
 | 
						|
}
 | 
						|
 | 
						|
// GetConfig returns controller config
 | 
						|
func (c *Controller) GetConfig() *spec.ControllerConfig {
 | 
						|
	return &c.config
 | 
						|
}
 | 
						|
 | 
						|
// GetOperatorConfig returns operator config
 | 
						|
func (c *Controller) GetOperatorConfig() *config.Config {
 | 
						|
	return c.opConfig
 | 
						|
}
 | 
						|
 | 
						|
// GetStatus dumps current config and status of the controller
 | 
						|
func (c *Controller) GetStatus() *spec.ControllerStatus {
 | 
						|
	c.clustersMu.RLock()
 | 
						|
	clustersCnt := len(c.clusters)
 | 
						|
	c.clustersMu.RUnlock()
 | 
						|
 | 
						|
	queueSizes := make(map[int]int, c.opConfig.Workers)
 | 
						|
	for workerID, queue := range c.clusterEventQueues {
 | 
						|
		queueSizes[workerID] = len(queue.ListKeys())
 | 
						|
	}
 | 
						|
 | 
						|
	return &spec.ControllerStatus{
 | 
						|
		LastSyncTime:    atomic.LoadInt64(&c.lastClusterSyncTime),
 | 
						|
		Clusters:        clustersCnt,
 | 
						|
		WorkerQueueSize: queueSizes,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// ClusterLogs dumps cluster ring logs
 | 
						|
func (c *Controller) ClusterLogs(team, namespace, name string) ([]*spec.LogEntry, error) {
 | 
						|
 | 
						|
	clusterName := spec.NamespacedName{
 | 
						|
		Namespace: namespace,
 | 
						|
		Name:      team + "-" + name,
 | 
						|
	}
 | 
						|
 | 
						|
	c.clustersMu.RLock()
 | 
						|
	cl, ok := c.clusterLogs[clusterName]
 | 
						|
	c.clustersMu.RUnlock()
 | 
						|
	if !ok {
 | 
						|
		return nil, fmt.Errorf("could not find cluster")
 | 
						|
	}
 | 
						|
 | 
						|
	res := make([]*spec.LogEntry, 0)
 | 
						|
	for _, e := range cl.Walk() {
 | 
						|
		logEntry := e.(*spec.LogEntry)
 | 
						|
		logEntry.ClusterName = nil
 | 
						|
 | 
						|
		res = append(res, logEntry)
 | 
						|
	}
 | 
						|
 | 
						|
	return res, nil
 | 
						|
}
 | 
						|
 | 
						|
// WorkerLogs dumps logs of the worker
 | 
						|
func (c *Controller) WorkerLogs(workerID uint32) ([]*spec.LogEntry, error) {
 | 
						|
	lg, ok := c.workerLogs[workerID]
 | 
						|
	if !ok {
 | 
						|
		return nil, fmt.Errorf("could not find worker")
 | 
						|
	}
 | 
						|
 | 
						|
	res := make([]*spec.LogEntry, 0)
 | 
						|
	for _, e := range lg.Walk() {
 | 
						|
		logEntry := e.(*spec.LogEntry)
 | 
						|
		logEntry.Worker = nil
 | 
						|
 | 
						|
		res = append(res, logEntry)
 | 
						|
	}
 | 
						|
 | 
						|
	return res, nil
 | 
						|
}
 | 
						|
 | 
						|
// Levels returns logrus levels for which hook must fire
 | 
						|
func (c *Controller) Levels() []logrus.Level {
 | 
						|
	return logrus.AllLevels
 | 
						|
}
 | 
						|
 | 
						|
// Fire is a logrus hook
 | 
						|
func (c *Controller) Fire(e *logrus.Entry) error {
 | 
						|
	var clusterName spec.NamespacedName
 | 
						|
 | 
						|
	v, ok := e.Data["cluster-name"]
 | 
						|
	if !ok {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	clusterName = v.(spec.NamespacedName)
 | 
						|
	c.clustersMu.RLock()
 | 
						|
	clusterRingLog, ok := c.clusterLogs[clusterName]
 | 
						|
	c.clustersMu.RUnlock()
 | 
						|
	if !ok {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	logEntry := &spec.LogEntry{
 | 
						|
		Time:        e.Time,
 | 
						|
		Level:       e.Level,
 | 
						|
		ClusterName: &clusterName,
 | 
						|
		Message:     e.Message,
 | 
						|
	}
 | 
						|
 | 
						|
	if v, hasWorker := e.Data["worker"]; hasWorker {
 | 
						|
		id := v.(uint32)
 | 
						|
 | 
						|
		logEntry.Worker = &id
 | 
						|
	}
 | 
						|
	clusterRingLog.Insert(logEntry)
 | 
						|
 | 
						|
	if logEntry.Worker == nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	c.workerLogs[*logEntry.Worker].Insert(logEntry) // workerLogs map is immutable. No need to lock it
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// ListQueue dumps cluster event queue of the provided worker
 | 
						|
func (c *Controller) ListQueue(workerID uint32) (*spec.QueueDump, error) {
 | 
						|
	if workerID >= uint32(len(c.clusterEventQueues)) {
 | 
						|
		return nil, fmt.Errorf("could not find worker")
 | 
						|
	}
 | 
						|
 | 
						|
	q := c.clusterEventQueues[workerID]
 | 
						|
	return &spec.QueueDump{
 | 
						|
		Keys: q.ListKeys(),
 | 
						|
		List: q.List(),
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
// GetWorkersCnt returns number of the workers
 | 
						|
func (c *Controller) GetWorkersCnt() uint32 {
 | 
						|
	return c.opConfig.Workers
 | 
						|
}
 | 
						|
 | 
						|
//WorkerStatus provides status of the worker
 | 
						|
func (c *Controller) WorkerStatus(workerID uint32) (*cluster.WorkerStatus, error) {
 | 
						|
	obj, ok := c.curWorkerCluster.Load(workerID)
 | 
						|
	if !ok || obj == nil {
 | 
						|
		return nil, nil
 | 
						|
	}
 | 
						|
 | 
						|
	cl, ok := obj.(*cluster.Cluster)
 | 
						|
	if !ok {
 | 
						|
		return nil, fmt.Errorf("could not cast to Cluster struct")
 | 
						|
	}
 | 
						|
 | 
						|
	return &cluster.WorkerStatus{
 | 
						|
		CurrentCluster: types.NamespacedName(util.NameFromMeta(cl.ObjectMeta)),
 | 
						|
		CurrentProcess: cl.GetCurrentProcess(),
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
// ClusterHistory dumps history of cluster changes
 | 
						|
func (c *Controller) ClusterHistory(team, namespace, name string) ([]*spec.Diff, error) {
 | 
						|
 | 
						|
	clusterName := spec.NamespacedName{
 | 
						|
		Namespace: namespace,
 | 
						|
		Name:      team + "-" + name,
 | 
						|
	}
 | 
						|
 | 
						|
	c.clustersMu.RLock()
 | 
						|
	cl, ok := c.clusterHistory[clusterName]
 | 
						|
	c.clustersMu.RUnlock()
 | 
						|
	if !ok {
 | 
						|
		return nil, fmt.Errorf("could not find cluster")
 | 
						|
	}
 | 
						|
 | 
						|
	res := make([]*spec.Diff, 0)
 | 
						|
	for _, e := range cl.Walk() {
 | 
						|
		res = append(res, e.(*spec.Diff))
 | 
						|
	}
 | 
						|
 | 
						|
	return res, nil
 | 
						|
}
 |