Cluster history API endpoint
This commit is contained in:
commit
48a43c5188
|
|
@ -32,4 +32,5 @@ data:
|
|||
enable_load_balancer: "true"
|
||||
api_port: "8080"
|
||||
ring_log_lines: "100"
|
||||
cluster_history_entries: "1000"
|
||||
pod_terminate_grace_period: 5m
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@ type controllerInformer interface {
|
|||
TeamClusterList() map[string][]spec.NamespacedName
|
||||
ClusterStatus(team, cluster string) (*spec.ClusterStatus, error)
|
||||
ClusterLogs(team, cluster string) ([]*spec.LogEntry, error)
|
||||
ClusterHistory(team, cluster string) ([]*spec.Diff, error)
|
||||
WorkerLogs(workerID uint32) ([]*spec.LogEntry, error)
|
||||
ListQueue(workerID uint32) (*spec.QueueDump, error)
|
||||
GetWorkersCnt() uint32
|
||||
|
|
@ -47,6 +48,7 @@ type Server struct {
|
|||
var (
|
||||
clusterStatusURL = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/(?P<cluster>[a-zA-Z][a-zA-Z0-9]*)/?$`)
|
||||
clusterLogsURL = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/(?P<cluster>[a-zA-Z][a-zA-Z0-9]*)/logs/?$`)
|
||||
clusterHistoryURL = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/(?P<cluster>[a-zA-Z][a-zA-Z0-9]*)/history/?$`)
|
||||
teamURL = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/?$`)
|
||||
workerLogsURL = regexp.MustCompile(`^/workers/(?P<id>\d+)/logs/?$`)
|
||||
workerEventsQueueURL = regexp.MustCompile(`^/workers/(?P<id>\d+)/queue/?$`)
|
||||
|
|
@ -160,6 +162,8 @@ func (s *Server) clusters(w http.ResponseWriter, req *http.Request) {
|
|||
return
|
||||
} else if matches := util.FindNamedStringSubmatch(clusterLogsURL, req.URL.Path); matches != nil {
|
||||
resp, err = s.controller.ClusterLogs(matches["team"], matches["cluster"])
|
||||
} else if matches := util.FindNamedStringSubmatch(clusterHistoryURL, req.URL.Path); matches != nil {
|
||||
resp, err = s.controller.ClusterHistory(matches["team"], matches["cluster"])
|
||||
} else if req.URL.Path == clustersURL {
|
||||
res := make(map[string][]string)
|
||||
for team, clusters := range s.controller.TeamClusterList() {
|
||||
|
|
@ -171,8 +175,7 @@ func (s *Server) clusters(w http.ResponseWriter, req *http.Request) {
|
|||
s.respond(res, nil, w)
|
||||
return
|
||||
} else {
|
||||
s.respond(nil, fmt.Errorf("page not found"), w)
|
||||
return
|
||||
err = fmt.Errorf("page not found")
|
||||
}
|
||||
|
||||
s.respond(resp, err, w)
|
||||
|
|
|
|||
|
|
@ -535,9 +535,11 @@ func (c *Cluster) generateMasterEndpoints(subsets []v1.EndpointSubset) *v1.Endpo
|
|||
|
||||
func (c *Cluster) generateCloneEnvironment(description *spec.CloneDescription) []v1.EnvVar {
|
||||
result := make([]v1.EnvVar, 0)
|
||||
|
||||
if description.ClusterName == "" {
|
||||
return result
|
||||
}
|
||||
|
||||
cluster := description.ClusterName
|
||||
result = append(result, v1.EnvVar{Name: "CLONE_SCOPE", Value: cluster})
|
||||
if description.EndTimestamp == "" {
|
||||
|
|
@ -563,11 +565,11 @@ func (c *Cluster) generateCloneEnvironment(description *spec.CloneDescription) [
|
|||
})
|
||||
} else {
|
||||
// cloning with S3, find out the bucket to clone
|
||||
clone_wal_s3_bucket := c.OpConfig.WALES3Bucket
|
||||
result = append(result, v1.EnvVar{Name: "CLONE_METHOD", Value: "CLONE_WITH_WALE"})
|
||||
result = append(result, v1.EnvVar{Name: "CLONE_WAL_S3_BUCKET", Value: clone_wal_s3_bucket})
|
||||
result = append(result, v1.EnvVar{Name: "CLONE_WAL_S3_BUCKET", Value: c.OpConfig.WALES3Bucket})
|
||||
result = append(result, v1.EnvVar{Name: "CLONE_TARGET_TIME", Value: description.EndTimestamp})
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -32,10 +32,11 @@ type Controller struct {
|
|||
|
||||
stopCh chan struct{}
|
||||
|
||||
clustersMu sync.RWMutex
|
||||
clusters map[spec.NamespacedName]*cluster.Cluster
|
||||
clusterLogs map[spec.NamespacedName]ringlog.RingLogger
|
||||
teamClusters map[string][]spec.NamespacedName
|
||||
clustersMu sync.RWMutex
|
||||
clusters map[spec.NamespacedName]*cluster.Cluster
|
||||
clusterLogs map[spec.NamespacedName]ringlog.RingLogger
|
||||
clusterHistory map[spec.NamespacedName]ringlog.RingLogger // history of the cluster changes
|
||||
teamClusters map[string][]spec.NamespacedName
|
||||
|
||||
postgresqlInformer cache.SharedIndexInformer
|
||||
podInformer cache.SharedIndexInformer
|
||||
|
|
@ -52,14 +53,15 @@ func NewController(controllerConfig *spec.ControllerConfig) *Controller {
|
|||
logger := logrus.New()
|
||||
|
||||
c := &Controller{
|
||||
config: *controllerConfig,
|
||||
opConfig: &config.Config{},
|
||||
logger: logger.WithField("pkg", "controller"),
|
||||
clusters: make(map[spec.NamespacedName]*cluster.Cluster),
|
||||
clusterLogs: make(map[spec.NamespacedName]ringlog.RingLogger),
|
||||
teamClusters: make(map[string][]spec.NamespacedName),
|
||||
stopCh: make(chan struct{}),
|
||||
podCh: make(chan spec.PodEvent),
|
||||
config: *controllerConfig,
|
||||
opConfig: &config.Config{},
|
||||
logger: logger.WithField("pkg", "controller"),
|
||||
clusters: make(map[spec.NamespacedName]*cluster.Cluster),
|
||||
clusterLogs: make(map[spec.NamespacedName]ringlog.RingLogger),
|
||||
clusterHistory: make(map[spec.NamespacedName]ringlog.RingLogger),
|
||||
teamClusters: make(map[string][]spec.NamespacedName),
|
||||
stopCh: make(chan struct{}),
|
||||
podCh: make(chan spec.PodEvent),
|
||||
}
|
||||
logger.Hooks.Add(c)
|
||||
|
||||
|
|
|
|||
|
|
@ -138,12 +138,14 @@ func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedNam
|
|||
c.teamClusters[teamName] = append(c.teamClusters[teamName], clusterName)
|
||||
c.clusters[clusterName] = cl
|
||||
c.clusterLogs[clusterName] = ringlog.New(c.opConfig.RingLogLines)
|
||||
c.clusterHistory[clusterName] = ringlog.New(c.opConfig.ClusterHistoryEntries)
|
||||
|
||||
return cl
|
||||
}
|
||||
|
||||
func (c *Controller) processEvent(event spec.ClusterEvent) {
|
||||
var clusterName spec.NamespacedName
|
||||
var clHistory ringlog.RingLogger
|
||||
|
||||
lg := c.logger.WithField("worker", event.WorkerID)
|
||||
|
||||
|
|
@ -156,6 +158,9 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
|
|||
|
||||
c.clustersMu.RLock()
|
||||
cl, clusterFound := c.clusters[clusterName]
|
||||
if clusterFound {
|
||||
clHistory = c.clusterHistory[clusterName]
|
||||
}
|
||||
c.clustersMu.RUnlock()
|
||||
|
||||
switch event.EventType {
|
||||
|
|
@ -192,6 +197,12 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
|
|||
}
|
||||
cl.Error = nil
|
||||
lg.Infoln("cluster has been updated")
|
||||
|
||||
clHistory.Insert(&spec.Diff{
|
||||
EventTime: event.EventTime,
|
||||
ProcessTime: time.Now(),
|
||||
Diff: util.Diff(event.OldSpec, event.NewSpec),
|
||||
})
|
||||
case spec.EventDelete:
|
||||
if !clusterFound {
|
||||
lg.Errorf("unknown cluster: %q", clusterName)
|
||||
|
|
@ -211,6 +222,7 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
|
|||
|
||||
delete(c.clusters, clusterName)
|
||||
delete(c.clusterLogs, clusterName)
|
||||
delete(c.clusterHistory, clusterName)
|
||||
for i, val := range c.teamClusters[teamName] {
|
||||
if val == clusterName {
|
||||
copy(c.teamClusters[teamName][i:], c.teamClusters[teamName][i+1:])
|
||||
|
|
@ -298,6 +310,7 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec
|
|||
|
||||
workerID := c.clusterWorkerID(clusterName)
|
||||
clusterEvent := spec.ClusterEvent{
|
||||
EventTime: time.Now(),
|
||||
EventType: eventType,
|
||||
UID: uid,
|
||||
OldSpec: old,
|
||||
|
|
|
|||
|
|
@ -166,3 +166,25 @@ func (c *Controller) ListQueue(workerID uint32) (*spec.QueueDump, error) {
|
|||
func (c *Controller) GetWorkersCnt() uint32 {
|
||||
return c.opConfig.Workers
|
||||
}
|
||||
|
||||
// ClusterHistory dumps history of cluster changes
|
||||
func (c *Controller) ClusterHistory(team, name string) ([]*spec.Diff, error) {
|
||||
clusterName := spec.NamespacedName{
|
||||
Namespace: c.opConfig.Namespace,
|
||||
Name: team + "-" + name,
|
||||
}
|
||||
|
||||
c.clustersMu.RLock()
|
||||
cl, ok := c.clusterHistory[clusterName]
|
||||
c.clustersMu.RUnlock()
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("could not find cluster")
|
||||
}
|
||||
|
||||
res := make([]*spec.Diff, 0)
|
||||
for _, e := range cl.Walk() {
|
||||
res = append(res, e.(*spec.Diff))
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -48,8 +48,7 @@ func (c *Controller) clusterWorkerID(clusterName spec.NamespacedName) uint32 {
|
|||
func (c *Controller) createTPR() error {
|
||||
tpr := thirdPartyResource(constants.TPRName)
|
||||
|
||||
_, err := c.KubeClient.ThirdPartyResources().Create(tpr)
|
||||
if err != nil {
|
||||
if _, err := c.KubeClient.ThirdPartyResources().Create(tpr); err != nil {
|
||||
if !k8sutil.ResourceAlreadyExists(err) {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ const (
|
|||
|
||||
// ClusterEvent carries the payload of the Cluster TPR events.
|
||||
type ClusterEvent struct {
|
||||
EventTime time.Time
|
||||
UID types.UID
|
||||
EventType EventType
|
||||
OldSpec *Postgresql
|
||||
|
|
@ -100,6 +101,13 @@ type ClusterStatus struct {
|
|||
Error error
|
||||
}
|
||||
|
||||
// Diff describes diff
|
||||
type Diff struct {
|
||||
EventTime time.Time
|
||||
ProcessTime time.Time
|
||||
Diff []string
|
||||
}
|
||||
|
||||
// ControllerStatus describes status of the controller
|
||||
type ControllerStatus struct {
|
||||
LastSyncTime int64
|
||||
|
|
|
|||
|
|
@ -47,23 +47,24 @@ type Config struct {
|
|||
TPR
|
||||
Resources
|
||||
Auth
|
||||
Namespace string `name:"namespace"`
|
||||
EtcdHost string `name:"etcd_host" default:"etcd-client.default.svc.cluster.local:2379"`
|
||||
DockerImage string `name:"docker_image" default:"registry.opensource.zalan.do/acid/spiloprivate-9.6:1.2-p4"`
|
||||
ServiceAccountName string `name:"service_account_name" default:"operator"`
|
||||
DbHostedZone string `name:"db_hosted_zone" default:"db.example.com"`
|
||||
EtcdScope string `name:"etcd_scope" default:"service"`
|
||||
WALES3Bucket string `name:"wal_s3_bucket"`
|
||||
KubeIAMRole string `name:"kube_iam_role"`
|
||||
DebugLogging bool `name:"debug_logging" default:"true"`
|
||||
EnableDBAccess bool `name:"enable_database_access" default:"true"`
|
||||
EnableTeamsAPI bool `name:"enable_teams_api" default:"true"`
|
||||
EnableLoadBalancer bool `name:"enable_load_balancer" default:"true"`
|
||||
MasterDNSNameFormat stringTemplate `name:"master_dns_name_format" default:"{cluster}.{team}.{hostedzone}"`
|
||||
ReplicaDNSNameFormat stringTemplate `name:"replica_dns_name_format" default:"{cluster}-repl.{team}.{hostedzone}"`
|
||||
Workers uint32 `name:"workers" default:"4"`
|
||||
APIPort int `name:"api_port" default:"8080"`
|
||||
RingLogLines int `name:"ring_log_lines" default:"100"`
|
||||
Namespace string `name:"namespace"`
|
||||
EtcdHost string `name:"etcd_host" default:"etcd-client.default.svc.cluster.local:2379"`
|
||||
DockerImage string `name:"docker_image" default:"registry.opensource.zalan.do/acid/spiloprivate-9.6:1.2-p4"`
|
||||
ServiceAccountName string `name:"service_account_name" default:"operator"`
|
||||
DbHostedZone string `name:"db_hosted_zone" default:"db.example.com"`
|
||||
EtcdScope string `name:"etcd_scope" default:"service"`
|
||||
WALES3Bucket string `name:"wal_s3_bucket"`
|
||||
KubeIAMRole string `name:"kube_iam_role"`
|
||||
DebugLogging bool `name:"debug_logging" default:"true"`
|
||||
EnableDBAccess bool `name:"enable_database_access" default:"true"`
|
||||
EnableTeamsAPI bool `name:"enable_teams_api" default:"true"`
|
||||
EnableLoadBalancer bool `name:"enable_load_balancer" default:"true"`
|
||||
MasterDNSNameFormat stringTemplate `name:"master_dns_name_format" default:"{cluster}.{team}.{hostedzone}"`
|
||||
ReplicaDNSNameFormat stringTemplate `name:"replica_dns_name_format" default:"{cluster}-repl.{team}.{hostedzone}"`
|
||||
Workers uint32 `name:"workers" default:"4"`
|
||||
APIPort int `name:"api_port" default:"8080"`
|
||||
RingLogLines int `name:"ring_log_lines" default:"100"`
|
||||
ClusterHistoryEntries int `name:"cluster_history_entries" default:"1000"`
|
||||
|
||||
PodTerminateGracePeriod time.Duration `name:"pod_terminate_grace_period" default:"5m"`
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ func New(size int) *RingLog {
|
|||
return &r
|
||||
}
|
||||
|
||||
// Insert inserts new LogEntry into the ring logger
|
||||
// Insert inserts new entry into the ring logger
|
||||
func (r *RingLog) Insert(obj interface{}) {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
|
@ -39,7 +39,7 @@ func (r *RingLog) Insert(obj interface{}) {
|
|||
}
|
||||
}
|
||||
|
||||
// Walk dumps all the LogEntries from the Ring logger
|
||||
// Walk dumps all the entries from the Ring logger
|
||||
func (r *RingLog) Walk() []interface{} {
|
||||
res := make([]interface{}, 0)
|
||||
|
||||
|
|
|
|||
|
|
@ -52,10 +52,14 @@ func PGUserPassword(user spec.PgUser) string {
|
|||
return md5prefix + hex.EncodeToString(s[:])
|
||||
}
|
||||
|
||||
// Diff returns diffs between 2 objects
|
||||
func Diff(a, b interface{}) []string {
|
||||
return pretty.Diff(a, b)
|
||||
}
|
||||
|
||||
// PrettyDiff shows the diff between 2 objects in an easy to understand format. It is mainly used for debugging output.
|
||||
func PrettyDiff(a, b interface{}) (result string) {
|
||||
diff := pretty.Diff(a, b)
|
||||
return strings.Join(diff, "\n")
|
||||
func PrettyDiff(a, b interface{}) string {
|
||||
return strings.Join(Diff(a, b), "\n")
|
||||
}
|
||||
|
||||
// SubstractStringSlices finds elements in a that are not in b and return them as a result slice.
|
||||
|
|
@ -73,6 +77,7 @@ OUTER:
|
|||
return result, len(result) == 0
|
||||
}
|
||||
|
||||
// FindNamedStringSubmatch returns a map of strings holding the text of the matches of the r regular expression
|
||||
func FindNamedStringSubmatch(r *regexp.Regexp, s string) map[string]string {
|
||||
matches := r.FindStringSubmatch(s)
|
||||
grNames := r.SubexpNames()
|
||||
|
|
|
|||
Loading…
Reference in New Issue