add cluster and controller methods for getting status
This commit is contained in:
		
							parent
							
								
									4ee28e3818
								
							
						
					
					
						commit
						82f58b57d8
					
				|  | @ -644,3 +644,20 @@ func (c *Cluster) initInfrastructureRoles() error { | ||||||
| 	} | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | // GetStatus provides status of the cluster
 | ||||||
|  | func (c *Cluster) GetStatus() *spec.ClusterStatus { | ||||||
|  | 	return &spec.ClusterStatus{ | ||||||
|  | 		Cluster: c.Spec.ClusterName, | ||||||
|  | 		Team:    c.Spec.TeamID, | ||||||
|  | 		Status:  c.Status, | ||||||
|  | 		Spec:    c.Spec, | ||||||
|  | 
 | ||||||
|  | 		MasterService:  c.GetServiceMaster(), | ||||||
|  | 		ReplicaService: c.GetServiceReplica(), | ||||||
|  | 		Endpoint:       c.GetEndpoint(), | ||||||
|  | 		StatefulSet:    c.GetStatefulSet(), | ||||||
|  | 
 | ||||||
|  | 		Error: c.Error, | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | @ -419,3 +419,23 @@ func (c *Cluster) createRoles() (err error) { | ||||||
| 	// TODO: figure out what to do with duplicate names (humans and robots) among pgUsers
 | 	// TODO: figure out what to do with duplicate names (humans and robots) among pgUsers
 | ||||||
| 	return c.syncRoles(false) | 	return c.syncRoles(false) | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | // GetServiceMaster returns cluster's kubernetes master Service
 | ||||||
|  | func (c *Cluster) GetServiceMaster() *v1.Service { | ||||||
|  | 	return c.Service[master] | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // GetServiceReplica returns cluster's kubernetes replica Service
 | ||||||
|  | func (c *Cluster) GetServiceReplica() *v1.Service { | ||||||
|  | 	return c.Service[replica] | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // GetEndpoint returns cluster's kubernetes Endpoint
 | ||||||
|  | func (c *Cluster) GetEndpoint() *v1.Endpoints { | ||||||
|  | 	return c.Endpoint | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // GetStatefulSet returns cluster's kubernetes StatefulSet
 | ||||||
|  | func (c *Cluster) GetStatefulSet() *v1beta1.StatefulSet { | ||||||
|  | 	return c.Statefulset | ||||||
|  | } | ||||||
|  |  | ||||||
|  | @ -0,0 +1,162 @@ | ||||||
|  | package controller | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	"sync/atomic" | ||||||
|  | 
 | ||||||
|  | 	"github.com/Sirupsen/logrus" | ||||||
|  | 
 | ||||||
|  | 	"github.com/zalando-incubator/postgres-operator/pkg/spec" | ||||||
|  | 	"github.com/zalando-incubator/postgres-operator/pkg/util/config" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | // ClusterStatus provides status of the cluster
 | ||||||
|  | func (c *Controller) ClusterStatus(team, cluster string) (*spec.ClusterStatus, error) { | ||||||
|  | 	clusterName := spec.NamespacedName{ | ||||||
|  | 		Namespace: c.opConfig.Namespace, | ||||||
|  | 		Name:      team + "-" + cluster, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	c.clustersMu.RLock() | ||||||
|  | 	cl, ok := c.clusters[clusterName] | ||||||
|  | 	c.clustersMu.RUnlock() | ||||||
|  | 	if !ok { | ||||||
|  | 		return nil, fmt.Errorf("could not find cluster") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	status := cl.GetStatus() | ||||||
|  | 	status.Worker = c.clusterWorkerID(clusterName) | ||||||
|  | 
 | ||||||
|  | 	return status, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // TeamClusterList returns team-clusters map
 | ||||||
|  | func (c *Controller) TeamClusterList() map[string][]spec.NamespacedName { | ||||||
|  | 	return c.teamClusters | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // GetConfig returns controller config
 | ||||||
|  | func (c *Controller) GetConfig() *spec.ControllerConfig { | ||||||
|  | 	return &c.config | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // GetOperatorConfig returns operator config
 | ||||||
|  | func (c *Controller) GetOperatorConfig() *config.Config { | ||||||
|  | 	return c.opConfig | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // GetStatus dumps current config and status of the controller
 | ||||||
|  | func (c *Controller) GetStatus() *spec.ControllerStatus { | ||||||
|  | 	c.clustersMu.RLock() | ||||||
|  | 	clustersCnt := len(c.clusters) | ||||||
|  | 	c.clustersMu.RUnlock() | ||||||
|  | 
 | ||||||
|  | 	return &spec.ControllerStatus{ | ||||||
|  | 		LastSyncTime: atomic.LoadInt64(&c.lastClusterSyncTime), | ||||||
|  | 		Clusters:     clustersCnt, | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // ClusterLogs dumps cluster ring logs
 | ||||||
|  | func (c *Controller) ClusterLogs(team, name string) ([]*spec.LogEntry, error) { | ||||||
|  | 	clusterName := spec.NamespacedName{ | ||||||
|  | 		Namespace: c.opConfig.Namespace, | ||||||
|  | 		Name:      team + "-" + name, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	c.clustersMu.RLock() | ||||||
|  | 	cl, ok := c.clusterLogs[clusterName] | ||||||
|  | 	c.clustersMu.RUnlock() | ||||||
|  | 	if !ok { | ||||||
|  | 		return nil, fmt.Errorf("could not find cluster") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	res := make([]*spec.LogEntry, 0) | ||||||
|  | 	for _, e := range cl.Walk() { | ||||||
|  | 		logEntry := e.(*spec.LogEntry) | ||||||
|  | 		logEntry.ClusterName = nil | ||||||
|  | 
 | ||||||
|  | 		res = append(res, logEntry) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return res, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // WorkerLogs dumps logs of the worker
 | ||||||
|  | func (c *Controller) WorkerLogs(workerID uint32) ([]*spec.LogEntry, error) { | ||||||
|  | 	lg, ok := c.workerLogs[workerID] | ||||||
|  | 	if !ok { | ||||||
|  | 		return nil, fmt.Errorf("could not find worker") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	res := make([]*spec.LogEntry, 0) | ||||||
|  | 	for _, e := range lg.Walk() { | ||||||
|  | 		logEntry := e.(*spec.LogEntry) | ||||||
|  | 		logEntry.Worker = nil | ||||||
|  | 
 | ||||||
|  | 		res = append(res, logEntry) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return res, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Levels returns logrus levels for which hook must fire
 | ||||||
|  | func (c *Controller) Levels() []logrus.Level { | ||||||
|  | 	return logrus.AllLevels | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Fire is a logrus hook
 | ||||||
|  | func (c *Controller) Fire(e *logrus.Entry) error { | ||||||
|  | 	var clusterName spec.NamespacedName | ||||||
|  | 
 | ||||||
|  | 	v, ok := e.Data["cluster-name"] | ||||||
|  | 	if !ok { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 	clusterName = v.(spec.NamespacedName) | ||||||
|  | 	c.clustersMu.RLock() | ||||||
|  | 	clusterRingLog, ok := c.clusterLogs[clusterName] | ||||||
|  | 	c.clustersMu.RUnlock() | ||||||
|  | 	if !ok { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	logEntry := &spec.LogEntry{ | ||||||
|  | 		Time:        e.Time, | ||||||
|  | 		Level:       e.Level, | ||||||
|  | 		ClusterName: &clusterName, | ||||||
|  | 		Message:     e.Message, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if v, hasWorker := e.Data["worker"]; hasWorker { | ||||||
|  | 		id := v.(uint32) | ||||||
|  | 
 | ||||||
|  | 		logEntry.Worker = &id | ||||||
|  | 	} | ||||||
|  | 	clusterRingLog.Insert(logEntry) | ||||||
|  | 
 | ||||||
|  | 	if logEntry.Worker == nil { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 	c.workerLogs[*logEntry.Worker].Insert(logEntry) // workerLogs map is immutable. No need to lock it
 | ||||||
|  | 
 | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // ListQueue dumps cluster event queue of the provided worker
 | ||||||
|  | func (c *Controller) ListQueue(workerID uint32) (*spec.QueueDump, error) { | ||||||
|  | 	if workerID >= uint32(len(c.clusterEventQueues)) { | ||||||
|  | 		return nil, fmt.Errorf("could not find worker") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	q := c.clusterEventQueues[workerID] | ||||||
|  | 	return &spec.QueueDump{ | ||||||
|  | 		Keys: q.ListKeys(), | ||||||
|  | 		List: q.List(), | ||||||
|  | 	}, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // GetWorkersCnt returns number of the workers
 | ||||||
|  | func (c *Controller) GetWorkersCnt() uint32 { | ||||||
|  | 	return c.opConfig.Workers | ||||||
|  | } | ||||||
|  | @ -4,9 +4,12 @@ import ( | ||||||
| 	"database/sql" | 	"database/sql" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"strings" | 	"strings" | ||||||
|  | 	"time" | ||||||
| 
 | 
 | ||||||
|  | 	"github.com/Sirupsen/logrus" | ||||||
| 	"k8s.io/apimachinery/pkg/types" | 	"k8s.io/apimachinery/pkg/types" | ||||||
| 	"k8s.io/client-go/pkg/api/v1" | 	"k8s.io/client-go/pkg/api/v1" | ||||||
|  | 	"k8s.io/client-go/pkg/apis/apps/v1beta1" | ||||||
| 	"k8s.io/client-go/rest" | 	"k8s.io/client-go/rest" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | @ -73,6 +76,42 @@ type UserSyncer interface { | ||||||
| 	ExecuteSyncRequests(req []PgSyncUserRequest, db *sql.DB) error | 	ExecuteSyncRequests(req []PgSyncUserRequest, db *sql.DB) error | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // LogEntry describes log entry in the RingLogger
 | ||||||
|  | type LogEntry struct { | ||||||
|  | 	Time        time.Time | ||||||
|  | 	Level       logrus.Level | ||||||
|  | 	ClusterName *NamespacedName `json:",omitempty"` | ||||||
|  | 	Worker      *uint32         `json:",omitempty"` | ||||||
|  | 	Message     string | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // ClusterStatus describes status of the cluster
 | ||||||
|  | type ClusterStatus struct { | ||||||
|  | 	Team           string | ||||||
|  | 	Cluster        string | ||||||
|  | 	MasterService  *v1.Service | ||||||
|  | 	ReplicaService *v1.Service | ||||||
|  | 	Endpoint       *v1.Endpoints | ||||||
|  | 	StatefulSet    *v1beta1.StatefulSet | ||||||
|  | 
 | ||||||
|  | 	Worker uint32 | ||||||
|  | 	Status PostgresStatus | ||||||
|  | 	Spec   PostgresSpec | ||||||
|  | 	Error  error | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // ControllerStatus describes status of the controller
 | ||||||
|  | type ControllerStatus struct { | ||||||
|  | 	LastSyncTime int64 | ||||||
|  | 	Clusters     int | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // QueueDump describes cache.FIFO queue
 | ||||||
|  | type QueueDump struct { | ||||||
|  | 	Keys []string | ||||||
|  | 	List []interface{} | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // ControllerConfig describes configuration of the controller
 | // ControllerConfig describes configuration of the controller
 | ||||||
| type ControllerConfig struct { | type ControllerConfig struct { | ||||||
| 	RestConfig          *rest.Config `json:"-"` | 	RestConfig          *rest.Config `json:"-"` | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue