From 82f58b57d8a36ab5e3bfbbee5cc2401ccc4319e8 Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Tue, 15 Aug 2017 12:11:06 +0200 Subject: [PATCH] add cluster and controller methods for getting status --- pkg/cluster/cluster.go | 17 ++++ pkg/cluster/resources.go | 20 +++++ pkg/controller/status.go | 162 +++++++++++++++++++++++++++++++++++++++ pkg/spec/types.go | 39 ++++++++++ 4 files changed, 238 insertions(+) create mode 100644 pkg/controller/status.go diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 618139d8a..84a391949 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -644,3 +644,20 @@ func (c *Cluster) initInfrastructureRoles() error { } return nil } + +// GetStatus provides status of the cluster +func (c *Cluster) GetStatus() *spec.ClusterStatus { + return &spec.ClusterStatus{ + Cluster: c.Spec.ClusterName, + Team: c.Spec.TeamID, + Status: c.Status, + Spec: c.Spec, + + MasterService: c.GetServiceMaster(), + ReplicaService: c.GetServiceReplica(), + Endpoint: c.GetEndpoint(), + StatefulSet: c.GetStatefulSet(), + + Error: c.Error, + } +} diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index a6f6cc70b..8939a7bda 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -419,3 +419,23 @@ func (c *Cluster) createRoles() (err error) { // TODO: figure out what to do with duplicate names (humans and robots) among pgUsers return c.syncRoles(false) } + +// GetServiceMaster returns cluster's kubernetes master Service +func (c *Cluster) GetServiceMaster() *v1.Service { + return c.Service[master] +} + +// GetServiceReplica returns cluster's kubernetes replica Service +func (c *Cluster) GetServiceReplica() *v1.Service { + return c.Service[replica] +} + +// GetEndpoint returns cluster's kubernetes Endpoint +func (c *Cluster) GetEndpoint() *v1.Endpoints { + return c.Endpoint +} + +// GetStatefulSet returns cluster's kubernetes StatefulSet +func (c *Cluster) GetStatefulSet() *v1beta1.StatefulSet { + return c.Statefulset +} diff --git a/pkg/controller/status.go b/pkg/controller/status.go new file mode 100644 index 000000000..4dc036a98 --- /dev/null +++ b/pkg/controller/status.go @@ -0,0 +1,162 @@ +package controller + +import ( + "fmt" + "sync/atomic" + + "github.com/Sirupsen/logrus" + + "github.com/zalando-incubator/postgres-operator/pkg/spec" + "github.com/zalando-incubator/postgres-operator/pkg/util/config" +) + +// ClusterStatus provides status of the cluster +func (c *Controller) ClusterStatus(team, cluster string) (*spec.ClusterStatus, error) { + clusterName := spec.NamespacedName{ + Namespace: c.opConfig.Namespace, + Name: team + "-" + cluster, + } + + c.clustersMu.RLock() + cl, ok := c.clusters[clusterName] + c.clustersMu.RUnlock() + if !ok { + return nil, fmt.Errorf("could not find cluster") + } + + status := cl.GetStatus() + status.Worker = c.clusterWorkerID(clusterName) + + return status, nil +} + +// TeamClusterList returns team-clusters map +func (c *Controller) TeamClusterList() map[string][]spec.NamespacedName { + return c.teamClusters +} + +// GetConfig returns controller config +func (c *Controller) GetConfig() *spec.ControllerConfig { + return &c.config +} + +// GetOperatorConfig returns operator config +func (c *Controller) GetOperatorConfig() *config.Config { + return c.opConfig +} + +// GetStatus dumps current config and status of the controller +func (c *Controller) GetStatus() *spec.ControllerStatus { + c.clustersMu.RLock() + clustersCnt := len(c.clusters) + c.clustersMu.RUnlock() + + return &spec.ControllerStatus{ + LastSyncTime: atomic.LoadInt64(&c.lastClusterSyncTime), + Clusters: clustersCnt, + } +} + +// ClusterLogs dumps cluster ring logs +func (c *Controller) ClusterLogs(team, name string) ([]*spec.LogEntry, error) { + clusterName := spec.NamespacedName{ + Namespace: c.opConfig.Namespace, + Name: team + "-" + name, + } + + c.clustersMu.RLock() + cl, ok := c.clusterLogs[clusterName] + c.clustersMu.RUnlock() + if !ok { + return nil, fmt.Errorf("could not find cluster") + } + + res := make([]*spec.LogEntry, 0) + for _, e := range cl.Walk() { + logEntry := e.(*spec.LogEntry) + logEntry.ClusterName = nil + + res = append(res, logEntry) + } + + return res, nil +} + +// WorkerLogs dumps logs of the worker +func (c *Controller) WorkerLogs(workerID uint32) ([]*spec.LogEntry, error) { + lg, ok := c.workerLogs[workerID] + if !ok { + return nil, fmt.Errorf("could not find worker") + } + + res := make([]*spec.LogEntry, 0) + for _, e := range lg.Walk() { + logEntry := e.(*spec.LogEntry) + logEntry.Worker = nil + + res = append(res, logEntry) + } + + return res, nil +} + +// Levels returns logrus levels for which hook must fire +func (c *Controller) Levels() []logrus.Level { + return logrus.AllLevels +} + +// Fire is a logrus hook +func (c *Controller) Fire(e *logrus.Entry) error { + var clusterName spec.NamespacedName + + v, ok := e.Data["cluster-name"] + if !ok { + return nil + } + clusterName = v.(spec.NamespacedName) + c.clustersMu.RLock() + clusterRingLog, ok := c.clusterLogs[clusterName] + c.clustersMu.RUnlock() + if !ok { + return nil + } + + logEntry := &spec.LogEntry{ + Time: e.Time, + Level: e.Level, + ClusterName: &clusterName, + Message: e.Message, + } + + if v, hasWorker := e.Data["worker"]; hasWorker { + id := v.(uint32) + + logEntry.Worker = &id + } + clusterRingLog.Insert(logEntry) + + if logEntry.Worker == nil { + return nil + } + c.workerLogs[*logEntry.Worker].Insert(logEntry) // workerLogs map is immutable. No need to lock it + + return nil +} + +// ListQueue dumps cluster event queue of the provided worker +func (c *Controller) ListQueue(workerID uint32) (*spec.QueueDump, error) { + if workerID >= uint32(len(c.clusterEventQueues)) { + return nil, fmt.Errorf("could not find worker") + } + + q := c.clusterEventQueues[workerID] + return &spec.QueueDump{ + Keys: q.ListKeys(), + List: q.List(), + }, nil +} + +// GetWorkersCnt returns number of the workers +func (c *Controller) GetWorkersCnt() uint32 { + return c.opConfig.Workers +} diff --git a/pkg/spec/types.go b/pkg/spec/types.go index 438edbdd5..43f388902 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -4,9 +4,12 @@ import ( "database/sql" "fmt" "strings" + "time" + "github.com/Sirupsen/logrus" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/pkg/apis/apps/v1beta1" "k8s.io/client-go/rest" ) @@ -73,6 +76,42 @@ type UserSyncer interface { ExecuteSyncRequests(req []PgSyncUserRequest, db *sql.DB) error } +// LogEntry describes log entry in the RingLogger +type LogEntry struct { + Time time.Time + Level logrus.Level + ClusterName *NamespacedName `json:",omitempty"` + Worker *uint32 `json:",omitempty"` + Message string +} + +// ClusterStatus describes status of the cluster +type ClusterStatus struct { + Team string + Cluster string + MasterService *v1.Service + ReplicaService *v1.Service + Endpoint *v1.Endpoints + StatefulSet *v1beta1.StatefulSet + + Worker uint32 + Status PostgresStatus + Spec PostgresSpec + Error error +} + +// ControllerStatus describes status of the controller +type ControllerStatus struct { + LastSyncTime int64 + Clusters int +} + +// QueueDump describes cache.FIFO queue +type QueueDump struct { + Keys []string + List []interface{} +} + // ControllerConfig describes configuration of the controller type ControllerConfig struct { RestConfig *rest.Config `json:"-"`