Diagnostic API

This commit is contained in:
Murat Kabilov 2017-08-15 15:53:13 +02:00 committed by GitHub
commit 5a7a3fec17
12 changed files with 635 additions and 32 deletions

View File

@ -29,3 +29,5 @@ data:
teams_api_url: http://fake-teams-api.default.svc.cluster.local teams_api_url: http://fake-teams-api.default.svc.cluster.local
workers: "4" workers: "4"
enable_load_balancer: "true" enable_load_balancer: "true"
api_port: "8080"
ring_log_lines: "100"

219
pkg/apiserver/apiserver.go Normal file
View File

@ -0,0 +1,219 @@
package apiserver
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/pprof"
"regexp"
"strconv"
"sync"
"time"
"github.com/Sirupsen/logrus"
"github.com/zalando-incubator/postgres-operator/pkg/spec"
"github.com/zalando-incubator/postgres-operator/pkg/util"
"github.com/zalando-incubator/postgres-operator/pkg/util/config"
)
const (
httpAPITimeout = time.Minute * 1
shutdownTimeout = time.Second * 10
httpReadTimeout = time.Millisecond * 100
)
// ControllerInformer describes stats methods of a controller
type controllerInformer interface {
GetConfig() *spec.ControllerConfig
GetOperatorConfig() *config.Config
GetStatus() *spec.ControllerStatus
TeamClusterList() map[string][]spec.NamespacedName
ClusterStatus(team, cluster string) (*spec.ClusterStatus, error)
ClusterLogs(team, cluster string) ([]*spec.LogEntry, error)
WorkerLogs(workerID uint32) ([]*spec.LogEntry, error)
ListQueue(workerID uint32) (*spec.QueueDump, error)
GetWorkersCnt() uint32
}
// Server describes HTTP API server
type Server struct {
logger *logrus.Entry
http http.Server
controller controllerInformer
}
var (
clusterStatusURL = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/(?P<cluster>[a-zA-Z][a-zA-Z0-9]*)/?$`)
clusterLogsURL = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/(?P<cluster>[a-zA-Z][a-zA-Z0-9]*)/logs/?$`)
teamURL = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/?$`)
workerLogsURL = regexp.MustCompile(`^/workers/(?P<id>\d+)/logs/?$`)
workerEventsQueueURL = regexp.MustCompile(`^/workers/(?P<id>\d+)/queue/?$`)
workerAllQueue = regexp.MustCompile(`^/workers/all/queue/?$`)
clustersURL = "/clusters/"
)
// New creates new HTTP API server
func New(controller controllerInformer, port int, logger *logrus.Logger) *Server {
s := &Server{
logger: logger.WithField("pkg", "apiserver"),
controller: controller,
}
mux := http.NewServeMux()
mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
mux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
mux.Handle("/status/", http.HandlerFunc(s.controllerStatus))
mux.Handle("/config/", http.HandlerFunc(s.operatorConfig))
mux.HandleFunc("/clusters/", s.clusters)
mux.HandleFunc("/workers/", s.workers)
s.http = http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: http.TimeoutHandler(mux, httpAPITimeout, ""),
ReadTimeout: httpReadTimeout,
}
return s
}
// Run starts the HTTP server
func (s *Server) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
go func() {
err := s.http.ListenAndServe()
if err != http.ErrServerClosed {
s.logger.Fatalf("Could not start http server: %v", err)
}
}()
s.logger.Infof("Listening on %s", s.http.Addr)
<-stopCh
ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
defer cancel()
err := s.http.Shutdown(ctx)
if err == context.DeadlineExceeded {
s.logger.Warnf("Shutdown timeout exceeded. closing http server")
s.http.Close()
} else if err != nil {
s.logger.Errorf("Could not shutdown http server: %v", err)
}
s.logger.Infoln("Http server shut down")
}
func (s *Server) respond(obj interface{}, err error, w http.ResponseWriter) {
w.Header().Set("Content-Type", "application/json")
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(map[string]interface{}{"error": err.Error()})
return
}
err = json.NewEncoder(w).Encode(obj)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
s.logger.Errorf("Could not encode: %v", err)
}
}
func (s *Server) controllerStatus(w http.ResponseWriter, req *http.Request) {
s.respond(s.controller.GetStatus(), nil, w)
}
func (s *Server) operatorConfig(w http.ResponseWriter, req *http.Request) {
s.respond(map[string]interface{}{
"controller": s.controller.GetConfig(),
"operator": s.controller.GetOperatorConfig(),
}, nil, w)
}
func (s *Server) clusters(w http.ResponseWriter, req *http.Request) {
var (
resp interface{}
err error
)
if matches := util.FindNamedStringSubmatch(clusterStatusURL, req.URL.Path); matches != nil {
resp, err = s.controller.ClusterStatus(matches["team"], matches["cluster"])
} else if matches := util.FindNamedStringSubmatch(teamURL, req.URL.Path); matches != nil {
teamClusters := s.controller.TeamClusterList()
clusters, found := teamClusters[matches["team"]]
if !found {
s.respond(nil, fmt.Errorf("could not find clusters for the team"), w)
}
clusterNames := make([]string, 0)
for _, cluster := range clusters {
clusterNames = append(clusterNames, cluster.Name[len(matches["team"])+1:])
}
s.respond(clusterNames, nil, w)
return
} else if matches := util.FindNamedStringSubmatch(clusterLogsURL, req.URL.Path); matches != nil {
resp, err = s.controller.ClusterLogs(matches["team"], matches["cluster"])
} else if req.URL.Path == clustersURL {
res := make(map[string][]string)
for team, clusters := range s.controller.TeamClusterList() {
for _, cluster := range clusters {
res[team] = append(res[team], cluster.Name[len(team)+1:])
}
}
s.respond(res, nil, w)
return
} else {
s.respond(nil, fmt.Errorf("page not found"), w)
return
}
s.respond(resp, err, w)
}
func (s *Server) workers(w http.ResponseWriter, req *http.Request) {
var (
resp interface{}
err error
)
if workerAllQueue.MatchString(req.URL.Path) {
s.allQueues(w, req)
return
} else if matches := util.FindNamedStringSubmatch(workerLogsURL, req.URL.Path); matches != nil {
workerID, _ := strconv.Atoi(matches["id"])
resp, err = s.controller.WorkerLogs(uint32(workerID))
} else if matches := util.FindNamedStringSubmatch(workerEventsQueueURL, req.URL.Path); matches != nil {
workerID, _ := strconv.Atoi(matches["id"])
resp, err = s.controller.ListQueue(uint32(workerID))
} else {
s.respond(nil, fmt.Errorf("page not found"), w)
return
}
s.respond(resp, err, w)
}
func (s *Server) allQueues(w http.ResponseWriter, r *http.Request) {
workersCnt := s.controller.GetWorkersCnt()
resp := make(map[uint32]*spec.QueueDump, workersCnt)
for i := uint32(0); i < workersCnt; i++ {
queueDump, err := s.controller.ListQueue(i)
if err != nil {
s.respond(nil, err, w)
return
}
resp[i] = queueDump
}
s.respond(resp, nil, w)
}

View File

@ -644,3 +644,20 @@ func (c *Cluster) initInfrastructureRoles() error {
} }
return nil return nil
} }
// GetStatus provides status of the cluster
func (c *Cluster) GetStatus() *spec.ClusterStatus {
return &spec.ClusterStatus{
Cluster: c.Spec.ClusterName,
Team: c.Spec.TeamID,
Status: c.Status,
Spec: c.Spec,
MasterService: c.GetServiceMaster(),
ReplicaService: c.GetServiceReplica(),
Endpoint: c.GetEndpoint(),
StatefulSet: c.GetStatefulSet(),
Error: c.Error,
}
}

View File

@ -419,3 +419,23 @@ func (c *Cluster) createRoles() (err error) {
// TODO: figure out what to do with duplicate names (humans and robots) among pgUsers // TODO: figure out what to do with duplicate names (humans and robots) among pgUsers
return c.syncRoles(false) return c.syncRoles(false)
} }
// GetServiceMaster returns cluster's kubernetes master Service
func (c *Cluster) GetServiceMaster() *v1.Service {
return c.Service[master]
}
// GetServiceReplica returns cluster's kubernetes replica Service
func (c *Cluster) GetServiceReplica() *v1.Service {
return c.Service[replica]
}
// GetEndpoint returns cluster's kubernetes Endpoint
func (c *Cluster) GetEndpoint() *v1.Endpoints {
return c.Endpoint
}
// GetStatefulSet returns cluster's kubernetes StatefulSet
func (c *Cluster) GetStatefulSet() *v1beta1.StatefulSet {
return c.Statefulset
}

View File

@ -10,11 +10,13 @@ import (
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"github.com/zalando-incubator/postgres-operator/pkg/apiserver"
"github.com/zalando-incubator/postgres-operator/pkg/cluster" "github.com/zalando-incubator/postgres-operator/pkg/cluster"
"github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/spec"
"github.com/zalando-incubator/postgres-operator/pkg/util/config" "github.com/zalando-incubator/postgres-operator/pkg/util/config"
"github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/constants"
"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
"github.com/zalando-incubator/postgres-operator/pkg/util/ringlog"
) )
// Controller represents operator controller // Controller represents operator controller
@ -25,10 +27,14 @@ type Controller struct {
logger *logrus.Entry logger *logrus.Entry
KubeClient k8sutil.KubernetesClient KubeClient k8sutil.KubernetesClient
RestClient rest.Interface // kubernetes API group REST client RestClient rest.Interface // kubernetes API group REST client
apiserver *apiserver.Server
clustersMu sync.RWMutex stopCh chan struct{}
clusters map[spec.NamespacedName]*cluster.Cluster
stopChs map[spec.NamespacedName]chan struct{} clustersMu sync.RWMutex
clusters map[spec.NamespacedName]*cluster.Cluster
clusterLogs map[spec.NamespacedName]ringlog.RingLogger
teamClusters map[string][]spec.NamespacedName
postgresqlInformer cache.SharedIndexInformer postgresqlInformer cache.SharedIndexInformer
podInformer cache.SharedIndexInformer podInformer cache.SharedIndexInformer
@ -36,6 +42,8 @@ type Controller struct {
clusterEventQueues []*cache.FIFO // [workerID]Queue clusterEventQueues []*cache.FIFO // [workerID]Queue
lastClusterSyncTime int64 lastClusterSyncTime int64
workerLogs map[uint32]ringlog.RingLogger
} }
// NewController creates a new controller // NewController creates a new controller
@ -43,12 +51,16 @@ func NewController(controllerConfig *spec.ControllerConfig) *Controller {
logger := logrus.New() logger := logrus.New()
c := &Controller{ c := &Controller{
config: *controllerConfig, config: *controllerConfig,
opConfig: &config.Config{}, opConfig: &config.Config{},
logger: logger.WithField("pkg", "controller"), logger: logger.WithField("pkg", "controller"),
clusters: make(map[spec.NamespacedName]*cluster.Cluster), clusters: make(map[spec.NamespacedName]*cluster.Cluster),
podCh: make(chan spec.PodEvent), clusterLogs: make(map[spec.NamespacedName]ringlog.RingLogger),
teamClusters: make(map[string][]spec.NamespacedName),
stopCh: make(chan struct{}),
podCh: make(chan spec.PodEvent),
} }
logger.Hooks.Add(c)
return c return c
} }
@ -149,6 +161,7 @@ func (c *Controller) initController() {
}) })
c.clusterEventQueues = make([]*cache.FIFO, c.opConfig.Workers) c.clusterEventQueues = make([]*cache.FIFO, c.opConfig.Workers)
c.workerLogs = make(map[uint32]ringlog.RingLogger, c.opConfig.Workers)
for i := range c.clusterEventQueues { for i := range c.clusterEventQueues {
c.clusterEventQueues[i] = cache.NewFIFO(func(obj interface{}) (string, error) { c.clusterEventQueues[i] = cache.NewFIFO(func(obj interface{}) (string, error) {
e, ok := obj.(spec.ClusterEvent) e, ok := obj.(spec.ClusterEvent)
@ -159,19 +172,23 @@ func (c *Controller) initController() {
return fmt.Sprintf("%s-%s", e.EventType, e.UID), nil return fmt.Sprintf("%s-%s", e.EventType, e.UID), nil
}) })
} }
c.apiserver = apiserver.New(c, c.opConfig.APIPort, c.logger.Logger)
} }
// Run starts background controller processes // Run starts background controller processes
func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
c.initController() c.initController()
wg.Add(3) wg.Add(4)
go c.runPodInformer(stopCh, wg) go c.runPodInformer(stopCh, wg)
go c.runPostgresqlInformer(stopCh, wg) go c.runPostgresqlInformer(stopCh, wg)
go c.clusterResync(stopCh, wg) go c.clusterResync(stopCh, wg)
go c.apiserver.Run(stopCh, wg)
for i := range c.clusterEventQueues { for i := range c.clusterEventQueues {
wg.Add(1) wg.Add(1)
c.workerLogs[uint32(i)] = ringlog.New(c.opConfig.RingLogLines)
go c.processClusterEventsQueue(i, stopCh, wg) go c.processClusterEventsQueue(i, stopCh, wg)
} }

View File

@ -4,10 +4,12 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"reflect" "reflect"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/Sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
@ -18,6 +20,7 @@ import (
"github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/spec"
"github.com/zalando-incubator/postgres-operator/pkg/util" "github.com/zalando-incubator/postgres-operator/pkg/util"
"github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/constants"
"github.com/zalando-incubator/postgres-operator/pkg/util/ringlog"
) )
func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) { func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) {
@ -124,6 +127,21 @@ func (c *Controller) clusterWatchFunc(options metav1.ListOptions) (watch.Interfa
}), nil }), nil
} }
func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedName, pgSpec *spec.Postgresql) *cluster.Cluster {
cl := cluster.New(c.makeClusterConfig(), c.KubeClient, *pgSpec, lg)
cl.Run(c.stopCh)
teamName := strings.ToLower(cl.Spec.TeamID)
defer c.clustersMu.Unlock()
c.clustersMu.Lock()
c.teamClusters[teamName] = append(c.teamClusters[teamName], clusterName)
c.clusters[clusterName] = cl
c.clusterLogs[clusterName] = ringlog.New(c.opConfig.RingLogLines)
return cl
}
func (c *Controller) processEvent(obj interface{}) error { func (c *Controller) processEvent(obj interface{}) error {
var clusterName spec.NamespacedName var clusterName spec.NamespacedName
@ -153,14 +171,7 @@ func (c *Controller) processEvent(obj interface{}) error {
lg.Infof("creation of the cluster started") lg.Infof("creation of the cluster started")
stopCh := make(chan struct{}) cl = c.addCluster(lg, clusterName, event.NewSpec)
cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, lg)
cl.Run(stopCh)
c.clustersMu.Lock()
c.clusters[clusterName] = cl
c.stopChs[clusterName] = stopCh
c.clustersMu.Unlock()
if err := cl.Create(); err != nil { if err := cl.Create(); err != nil {
cl.Error = fmt.Errorf("could not create cluster: %v", err) cl.Error = fmt.Errorf("could not create cluster: %v", err)
@ -186,7 +197,9 @@ func (c *Controller) processEvent(obj interface{}) error {
cl.Error = nil cl.Error = nil
lg.Infoln("cluster has been updated") lg.Infoln("cluster has been updated")
case spec.EventDelete: case spec.EventDelete:
lg.Infof("deletion of the %q cluster started", clusterName) teamName := strings.ToLower(cl.Spec.TeamID)
lg.Infoln("Deletion of the cluster started")
if !clusterFound { if !clusterFound {
lg.Errorf("unknown cluster: %q", clusterName) lg.Errorf("unknown cluster: %q", clusterName)
return nil return nil
@ -196,12 +209,22 @@ func (c *Controller) processEvent(obj interface{}) error {
lg.Errorf("could not delete cluster: %v", err) lg.Errorf("could not delete cluster: %v", err)
return nil return nil
} }
close(c.stopChs[clusterName])
c.clustersMu.Lock() func() {
delete(c.clusters, clusterName) defer c.clustersMu.Unlock()
delete(c.stopChs, clusterName) c.clustersMu.Lock()
c.clustersMu.Unlock()
delete(c.clusters, clusterName)
delete(c.clusterLogs, clusterName)
for i, val := range c.teamClusters[teamName] { // on relativel
if val == clusterName {
copy(c.teamClusters[teamName][i:], c.teamClusters[teamName][i+1:])
c.teamClusters[teamName][len(c.teamClusters[teamName])-1] = spec.NamespacedName{}
c.teamClusters[teamName] = c.teamClusters[teamName][:len(c.teamClusters[teamName])-1]
break
}
}
}()
lg.Infof("cluster has been deleted") lg.Infof("cluster has been deleted")
case spec.EventSync: case spec.EventSync:
@ -209,18 +232,11 @@ func (c *Controller) processEvent(obj interface{}) error {
// no race condition because a cluster is always processed by single worker // no race condition because a cluster is always processed by single worker
if !clusterFound { if !clusterFound {
stopCh := make(chan struct{}) cl = c.addCluster(lg, clusterName, event.NewSpec)
cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, lg)
cl.Run(stopCh)
c.clustersMu.Lock()
c.clusters[clusterName] = cl
c.stopChs[clusterName] = stopCh
c.clustersMu.Unlock()
} }
if err := cl.Sync(); err != nil { if err := cl.Sync(); err != nil {
cl.Error = fmt.Errorf("could not sync cluster %q: %v", clusterName, err) cl.Error = fmt.Errorf("could not sync cluster: %v", err)
lg.Error(cl.Error) lg.Error(cl.Error)
return nil return nil
} }

162
pkg/controller/status.go Normal file
View File

@ -0,0 +1,162 @@
package controller
import (
"fmt"
"sync/atomic"
"github.com/Sirupsen/logrus"
"github.com/zalando-incubator/postgres-operator/pkg/spec"
"github.com/zalando-incubator/postgres-operator/pkg/util/config"
)
// ClusterStatus provides status of the cluster
func (c *Controller) ClusterStatus(team, cluster string) (*spec.ClusterStatus, error) {
clusterName := spec.NamespacedName{
Namespace: c.opConfig.Namespace,
Name: team + "-" + cluster,
}
c.clustersMu.RLock()
cl, ok := c.clusters[clusterName]
c.clustersMu.RUnlock()
if !ok {
return nil, fmt.Errorf("could not find cluster")
}
status := cl.GetStatus()
status.Worker = c.clusterWorkerID(clusterName)
return status, nil
}
// TeamClusterList returns team-clusters map
func (c *Controller) TeamClusterList() map[string][]spec.NamespacedName {
return c.teamClusters
}
// GetConfig returns controller config
func (c *Controller) GetConfig() *spec.ControllerConfig {
return &c.config
}
// GetOperatorConfig returns operator config
func (c *Controller) GetOperatorConfig() *config.Config {
return c.opConfig
}
// GetStatus dumps current config and status of the controller
func (c *Controller) GetStatus() *spec.ControllerStatus {
c.clustersMu.RLock()
clustersCnt := len(c.clusters)
c.clustersMu.RUnlock()
return &spec.ControllerStatus{
LastSyncTime: atomic.LoadInt64(&c.lastClusterSyncTime),
Clusters: clustersCnt,
}
}
// ClusterLogs dumps cluster ring logs
func (c *Controller) ClusterLogs(team, name string) ([]*spec.LogEntry, error) {
clusterName := spec.NamespacedName{
Namespace: c.opConfig.Namespace,
Name: team + "-" + name,
}
c.clustersMu.RLock()
cl, ok := c.clusterLogs[clusterName]
c.clustersMu.RUnlock()
if !ok {
return nil, fmt.Errorf("could not find cluster")
}
res := make([]*spec.LogEntry, 0)
for _, e := range cl.Walk() {
logEntry := e.(*spec.LogEntry)
logEntry.ClusterName = nil
res = append(res, logEntry)
}
return res, nil
}
// WorkerLogs dumps logs of the worker
func (c *Controller) WorkerLogs(workerID uint32) ([]*spec.LogEntry, error) {
lg, ok := c.workerLogs[workerID]
if !ok {
return nil, fmt.Errorf("could not find worker")
}
res := make([]*spec.LogEntry, 0)
for _, e := range lg.Walk() {
logEntry := e.(*spec.LogEntry)
logEntry.Worker = nil
res = append(res, logEntry)
}
return res, nil
}
// Levels returns logrus levels for which hook must fire
func (c *Controller) Levels() []logrus.Level {
return logrus.AllLevels
}
// Fire is a logrus hook
func (c *Controller) Fire(e *logrus.Entry) error {
var clusterName spec.NamespacedName
v, ok := e.Data["cluster-name"]
if !ok {
return nil
}
clusterName = v.(spec.NamespacedName)
c.clustersMu.RLock()
clusterRingLog, ok := c.clusterLogs[clusterName]
c.clustersMu.RUnlock()
if !ok {
return nil
}
logEntry := &spec.LogEntry{
Time: e.Time,
Level: e.Level,
ClusterName: &clusterName,
Message: e.Message,
}
if v, hasWorker := e.Data["worker"]; hasWorker {
id := v.(uint32)
logEntry.Worker = &id
}
clusterRingLog.Insert(logEntry)
if logEntry.Worker == nil {
return nil
}
c.workerLogs[*logEntry.Worker].Insert(logEntry) // workerLogs map is immutable. No need to lock it
return nil
}
// ListQueue dumps cluster event queue of the provided worker
func (c *Controller) ListQueue(workerID uint32) (*spec.QueueDump, error) {
if workerID >= uint32(len(c.clusterEventQueues)) {
return nil, fmt.Errorf("could not find worker")
}
q := c.clusterEventQueues[workerID]
return &spec.QueueDump{
Keys: q.ListKeys(),
List: q.List(),
}, nil
}
// GetWorkersCnt returns number of the workers
func (c *Controller) GetWorkersCnt() uint32 {
return c.opConfig.Workers
}

View File

@ -4,9 +4,12 @@ import (
"database/sql" "database/sql"
"fmt" "fmt"
"strings" "strings"
"time"
"github.com/Sirupsen/logrus"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/apis/apps/v1beta1"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
) )
@ -73,6 +76,42 @@ type UserSyncer interface {
ExecuteSyncRequests(req []PgSyncUserRequest, db *sql.DB) error ExecuteSyncRequests(req []PgSyncUserRequest, db *sql.DB) error
} }
// LogEntry describes log entry in the RingLogger
type LogEntry struct {
Time time.Time
Level logrus.Level
ClusterName *NamespacedName `json:",omitempty"`
Worker *uint32 `json:",omitempty"`
Message string
}
// ClusterStatus describes status of the cluster
type ClusterStatus struct {
Team string
Cluster string
MasterService *v1.Service
ReplicaService *v1.Service
Endpoint *v1.Endpoints
StatefulSet *v1beta1.StatefulSet
Worker uint32
Status PostgresStatus
Spec PostgresSpec
Error error
}
// ControllerStatus describes status of the controller
type ControllerStatus struct {
LastSyncTime int64
Clusters int
}
// QueueDump describes cache.FIFO queue
type QueueDump struct {
Keys []string
List []interface{}
}
// ControllerConfig describes configuration of the controller // ControllerConfig describes configuration of the controller
type ControllerConfig struct { type ControllerConfig struct {
RestConfig *rest.Config `json:"-"` RestConfig *rest.Config `json:"-"`

View File

@ -61,6 +61,8 @@ type Config struct {
MasterDNSNameFormat stringTemplate `name:"master_dns_name_format" default:"{cluster}.{team}.{hostedzone}"` MasterDNSNameFormat stringTemplate `name:"master_dns_name_format" default:"{cluster}.{team}.{hostedzone}"`
ReplicaDNSNameFormat stringTemplate `name:"replica_dns_name_format" default:"{cluster}-repl.{team}.{hostedzone}"` ReplicaDNSNameFormat stringTemplate `name:"replica_dns_name_format" default:"{cluster}-repl.{team}.{hostedzone}"`
Workers uint32 `name:"workers" default:"4"` Workers uint32 `name:"workers" default:"4"`
APIPort int `name:"api_port" default:"8080"`
RingLogLines int `name:"ring_log_lines" default:"100"`
} }
// MustMarshal marshals the config or panics // MustMarshal marshals the config or panics

View File

@ -0,0 +1,59 @@
package ringlog
import (
"container/list"
"sync"
)
// RingLogger describes ring logger methods
type RingLogger interface {
Insert(interface{})
Walk() []interface{}
}
// RingLog is a capped logger with fixed size
type RingLog struct {
sync.RWMutex
size int
list *list.List
}
// New creates new Ring logger
func New(size int) *RingLog {
r := RingLog{
list: list.New(),
size: size,
}
return &r
}
// Insert inserts new LogEntry into the ring logger
func (r *RingLog) Insert(obj interface{}) {
r.Lock()
defer r.Unlock()
r.list.PushBack(obj)
if r.list.Len() > r.size {
r.list.Remove(r.list.Front())
}
}
// Walk dumps all the LogEntries from the Ring logger
func (r *RingLog) Walk() []interface{} {
res := make([]interface{}, 0)
r.RLock()
defer r.RUnlock()
st := r.list.Front()
for i := 0; i < r.size; i++ {
if st == nil {
return res
}
res = append(res, st.Value)
st = st.Next()
}
return res
}

View File

@ -10,6 +10,8 @@ import (
"github.com/motomux/pretty" "github.com/motomux/pretty"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"regexp"
"github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/spec"
) )
@ -71,3 +73,29 @@ OUTER:
} }
return result, len(result) == 0 return result, len(result) == 0
} }
func FindNamedStringSubmatch(r *regexp.Regexp, s string) map[string]string {
matches := r.FindStringSubmatch(s)
grNames := r.SubexpNames()
if matches == nil {
return nil
}
groupMatches := 0
res := make(map[string]string, len(grNames))
for i, n := range grNames {
if n == "" {
continue
}
res[n] = matches[i]
groupMatches++
}
if groupMatches == 0 {
return nil
}
return res
}

View File

@ -6,6 +6,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"regexp"
"github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/spec"
) )
@ -44,6 +46,17 @@ var substractTest = []struct {
{[]string{"a", "b", "c", "d"}, []string{"a", "bb", "c", "d"}, []string{"b"}, false}, {[]string{"a", "b", "c", "d"}, []string{"a", "bb", "c", "d"}, []string{"b"}, false},
} }
var substringMatch = []struct {
inRegex *regexp.Regexp
inStr string
out map[string]string
}{
{regexp.MustCompile(`aaaa (?P<num>\d+) bbbb`), "aaaa 123 bbbb", map[string]string{"num": "123"}},
{regexp.MustCompile(`aaaa (?P<num>\d+) bbbb`), "a aa 123 bbbb", nil},
{regexp.MustCompile(`aaaa \d+ bbbb`), "aaaa 123 bbbb", nil},
{regexp.MustCompile(`aaaa (\d+) bbbb`), "aaaa 123 bbbb", nil},
}
func TestRandomPassword(t *testing.T) { func TestRandomPassword(t *testing.T) {
const pwdLength = 10 const pwdLength = 10
pwd := RandomPassword(pwdLength) pwd := RandomPassword(pwdLength)
@ -100,3 +113,12 @@ func TestSubstractSlices(t *testing.T) {
} }
} }
} }
func TestFindNamedStringSubmatch(t *testing.T) {
for _, tt := range substringMatch {
actualRes := FindNamedStringSubmatch(tt.inRegex, tt.inStr)
if !reflect.DeepEqual(actualRes, tt.out) {
t.Errorf("FindNamedStringSubmatch expected: %#v, got: %#v", tt.out, actualRes)
}
}
}