cluster history api endpoint
This commit is contained in:
parent
19de2a24b7
commit
9a66e09b88
|
|
@ -31,4 +31,5 @@ data:
|
||||||
enable_load_balancer: "true"
|
enable_load_balancer: "true"
|
||||||
api_port: "8080"
|
api_port: "8080"
|
||||||
ring_log_lines: "100"
|
ring_log_lines: "100"
|
||||||
|
cluster_history_entries: "1000"
|
||||||
pod_terminate_grace_period: 5m
|
pod_terminate_grace_period: 5m
|
||||||
|
|
|
||||||
|
|
@ -32,6 +32,7 @@ type controllerInformer interface {
|
||||||
TeamClusterList() map[string][]spec.NamespacedName
|
TeamClusterList() map[string][]spec.NamespacedName
|
||||||
ClusterStatus(team, cluster string) (*spec.ClusterStatus, error)
|
ClusterStatus(team, cluster string) (*spec.ClusterStatus, error)
|
||||||
ClusterLogs(team, cluster string) ([]*spec.LogEntry, error)
|
ClusterLogs(team, cluster string) ([]*spec.LogEntry, error)
|
||||||
|
ClusterHistory(team, cluster string) ([]*spec.Diff, error)
|
||||||
WorkerLogs(workerID uint32) ([]*spec.LogEntry, error)
|
WorkerLogs(workerID uint32) ([]*spec.LogEntry, error)
|
||||||
ListQueue(workerID uint32) (*spec.QueueDump, error)
|
ListQueue(workerID uint32) (*spec.QueueDump, error)
|
||||||
GetWorkersCnt() uint32
|
GetWorkersCnt() uint32
|
||||||
|
|
@ -47,6 +48,7 @@ type Server struct {
|
||||||
var (
|
var (
|
||||||
clusterStatusURL = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/(?P<cluster>[a-zA-Z][a-zA-Z0-9]*)/?$`)
|
clusterStatusURL = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/(?P<cluster>[a-zA-Z][a-zA-Z0-9]*)/?$`)
|
||||||
clusterLogsURL = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/(?P<cluster>[a-zA-Z][a-zA-Z0-9]*)/logs/?$`)
|
clusterLogsURL = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/(?P<cluster>[a-zA-Z][a-zA-Z0-9]*)/logs/?$`)
|
||||||
|
clusterHistoryURL = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/(?P<cluster>[a-zA-Z][a-zA-Z0-9]*)/history/?$`)
|
||||||
teamURL = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/?$`)
|
teamURL = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/?$`)
|
||||||
workerLogsURL = regexp.MustCompile(`^/workers/(?P<id>\d+)/logs/?$`)
|
workerLogsURL = regexp.MustCompile(`^/workers/(?P<id>\d+)/logs/?$`)
|
||||||
workerEventsQueueURL = regexp.MustCompile(`^/workers/(?P<id>\d+)/queue/?$`)
|
workerEventsQueueURL = regexp.MustCompile(`^/workers/(?P<id>\d+)/queue/?$`)
|
||||||
|
|
@ -160,6 +162,8 @@ func (s *Server) clusters(w http.ResponseWriter, req *http.Request) {
|
||||||
return
|
return
|
||||||
} else if matches := util.FindNamedStringSubmatch(clusterLogsURL, req.URL.Path); matches != nil {
|
} else if matches := util.FindNamedStringSubmatch(clusterLogsURL, req.URL.Path); matches != nil {
|
||||||
resp, err = s.controller.ClusterLogs(matches["team"], matches["cluster"])
|
resp, err = s.controller.ClusterLogs(matches["team"], matches["cluster"])
|
||||||
|
} else if matches := util.FindNamedStringSubmatch(clusterHistoryURL, req.URL.Path); matches != nil {
|
||||||
|
resp, err = s.controller.ClusterHistory(matches["team"], matches["cluster"])
|
||||||
} else if req.URL.Path == clustersURL {
|
} else if req.URL.Path == clustersURL {
|
||||||
res := make(map[string][]string)
|
res := make(map[string][]string)
|
||||||
for team, clusters := range s.controller.TeamClusterList() {
|
for team, clusters := range s.controller.TeamClusterList() {
|
||||||
|
|
@ -171,8 +175,7 @@ func (s *Server) clusters(w http.ResponseWriter, req *http.Request) {
|
||||||
s.respond(res, nil, w)
|
s.respond(res, nil, w)
|
||||||
return
|
return
|
||||||
} else {
|
} else {
|
||||||
s.respond(nil, fmt.Errorf("page not found"), w)
|
err = fmt.Errorf("page not found")
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
s.respond(resp, err, w)
|
s.respond(resp, err, w)
|
||||||
|
|
|
||||||
|
|
@ -35,6 +35,7 @@ type Controller struct {
|
||||||
clustersMu sync.RWMutex
|
clustersMu sync.RWMutex
|
||||||
clusters map[spec.NamespacedName]*cluster.Cluster
|
clusters map[spec.NamespacedName]*cluster.Cluster
|
||||||
clusterLogs map[spec.NamespacedName]ringlog.RingLogger
|
clusterLogs map[spec.NamespacedName]ringlog.RingLogger
|
||||||
|
clusterHistory map[spec.NamespacedName]ringlog.RingLogger // history of the cluster changes
|
||||||
teamClusters map[string][]spec.NamespacedName
|
teamClusters map[string][]spec.NamespacedName
|
||||||
|
|
||||||
postgresqlInformer cache.SharedIndexInformer
|
postgresqlInformer cache.SharedIndexInformer
|
||||||
|
|
@ -57,6 +58,7 @@ func NewController(controllerConfig *spec.ControllerConfig) *Controller {
|
||||||
logger: logger.WithField("pkg", "controller"),
|
logger: logger.WithField("pkg", "controller"),
|
||||||
clusters: make(map[spec.NamespacedName]*cluster.Cluster),
|
clusters: make(map[spec.NamespacedName]*cluster.Cluster),
|
||||||
clusterLogs: make(map[spec.NamespacedName]ringlog.RingLogger),
|
clusterLogs: make(map[spec.NamespacedName]ringlog.RingLogger),
|
||||||
|
clusterHistory: make(map[spec.NamespacedName]ringlog.RingLogger),
|
||||||
teamClusters: make(map[string][]spec.NamespacedName),
|
teamClusters: make(map[string][]spec.NamespacedName),
|
||||||
stopCh: make(chan struct{}),
|
stopCh: make(chan struct{}),
|
||||||
podCh: make(chan spec.PodEvent),
|
podCh: make(chan spec.PodEvent),
|
||||||
|
|
|
||||||
|
|
@ -138,12 +138,14 @@ func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedNam
|
||||||
c.teamClusters[teamName] = append(c.teamClusters[teamName], clusterName)
|
c.teamClusters[teamName] = append(c.teamClusters[teamName], clusterName)
|
||||||
c.clusters[clusterName] = cl
|
c.clusters[clusterName] = cl
|
||||||
c.clusterLogs[clusterName] = ringlog.New(c.opConfig.RingLogLines)
|
c.clusterLogs[clusterName] = ringlog.New(c.opConfig.RingLogLines)
|
||||||
|
c.clusterHistory[clusterName] = ringlog.New(c.opConfig.ClusterHistoryEntries)
|
||||||
|
|
||||||
return cl
|
return cl
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Controller) processEvent(event spec.ClusterEvent) {
|
func (c *Controller) processEvent(event spec.ClusterEvent) {
|
||||||
var clusterName spec.NamespacedName
|
var clusterName spec.NamespacedName
|
||||||
|
var clHistory ringlog.RingLogger
|
||||||
|
|
||||||
lg := c.logger.WithField("worker", event.WorkerID)
|
lg := c.logger.WithField("worker", event.WorkerID)
|
||||||
|
|
||||||
|
|
@ -156,6 +158,9 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
|
||||||
|
|
||||||
c.clustersMu.RLock()
|
c.clustersMu.RLock()
|
||||||
cl, clusterFound := c.clusters[clusterName]
|
cl, clusterFound := c.clusters[clusterName]
|
||||||
|
if clusterFound {
|
||||||
|
clHistory = c.clusterHistory[clusterName]
|
||||||
|
}
|
||||||
c.clustersMu.RUnlock()
|
c.clustersMu.RUnlock()
|
||||||
|
|
||||||
switch event.EventType {
|
switch event.EventType {
|
||||||
|
|
@ -192,6 +197,12 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
|
||||||
}
|
}
|
||||||
cl.Error = nil
|
cl.Error = nil
|
||||||
lg.Infoln("cluster has been updated")
|
lg.Infoln("cluster has been updated")
|
||||||
|
|
||||||
|
clHistory.Insert(&spec.Diff{
|
||||||
|
EventTime: event.EventTime,
|
||||||
|
ProcessTime: time.Now(),
|
||||||
|
Diff: util.Diff(event.OldSpec, event.NewSpec),
|
||||||
|
})
|
||||||
case spec.EventDelete:
|
case spec.EventDelete:
|
||||||
if !clusterFound {
|
if !clusterFound {
|
||||||
lg.Errorf("unknown cluster: %q", clusterName)
|
lg.Errorf("unknown cluster: %q", clusterName)
|
||||||
|
|
@ -211,6 +222,7 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
|
||||||
|
|
||||||
delete(c.clusters, clusterName)
|
delete(c.clusters, clusterName)
|
||||||
delete(c.clusterLogs, clusterName)
|
delete(c.clusterLogs, clusterName)
|
||||||
|
delete(c.clusterHistory, clusterName)
|
||||||
for i, val := range c.teamClusters[teamName] {
|
for i, val := range c.teamClusters[teamName] {
|
||||||
if val == clusterName {
|
if val == clusterName {
|
||||||
copy(c.teamClusters[teamName][i:], c.teamClusters[teamName][i+1:])
|
copy(c.teamClusters[teamName][i:], c.teamClusters[teamName][i+1:])
|
||||||
|
|
|
||||||
|
|
@ -166,3 +166,25 @@ func (c *Controller) ListQueue(workerID uint32) (*spec.QueueDump, error) {
|
||||||
func (c *Controller) GetWorkersCnt() uint32 {
|
func (c *Controller) GetWorkersCnt() uint32 {
|
||||||
return c.opConfig.Workers
|
return c.opConfig.Workers
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ClusterHistory dumps history of cluster changes
|
||||||
|
func (c *Controller) ClusterHistory(team, name string) ([]*spec.Diff, error) {
|
||||||
|
clusterName := spec.NamespacedName{
|
||||||
|
Namespace: c.opConfig.Namespace,
|
||||||
|
Name: team + "-" + name,
|
||||||
|
}
|
||||||
|
|
||||||
|
c.clustersMu.RLock()
|
||||||
|
cl, ok := c.clusterHistory[clusterName]
|
||||||
|
c.clustersMu.RUnlock()
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("could not find cluster")
|
||||||
|
}
|
||||||
|
|
||||||
|
res := make([]*spec.Diff, 0)
|
||||||
|
for _, e := range cl.Walk() {
|
||||||
|
res = append(res, e.(*spec.Diff))
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -101,6 +101,13 @@ type ClusterStatus struct {
|
||||||
Error error
|
Error error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Diff describes diff
|
||||||
|
type Diff struct {
|
||||||
|
EventTime time.Time
|
||||||
|
ProcessTime time.Time
|
||||||
|
Diff []string
|
||||||
|
}
|
||||||
|
|
||||||
// ControllerStatus describes status of the controller
|
// ControllerStatus describes status of the controller
|
||||||
type ControllerStatus struct {
|
type ControllerStatus struct {
|
||||||
LastSyncTime int64
|
LastSyncTime int64
|
||||||
|
|
|
||||||
|
|
@ -63,6 +63,7 @@ type Config struct {
|
||||||
Workers uint32 `name:"workers" default:"4"`
|
Workers uint32 `name:"workers" default:"4"`
|
||||||
APIPort int `name:"api_port" default:"8080"`
|
APIPort int `name:"api_port" default:"8080"`
|
||||||
RingLogLines int `name:"ring_log_lines" default:"100"`
|
RingLogLines int `name:"ring_log_lines" default:"100"`
|
||||||
|
ClusterHistoryEntries int `name:"cluster_history_entries" default:"1000"`
|
||||||
|
|
||||||
PodTerminateGracePeriod time.Duration `name:"pod_terminate_grace_period" default:"5m"`
|
PodTerminateGracePeriod time.Duration `name:"pod_terminate_grace_period" default:"5m"`
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue