diff --git a/manifests/configmap.yaml b/manifests/configmap.yaml index 270df6b0e..c7d770c69 100644 --- a/manifests/configmap.yaml +++ b/manifests/configmap.yaml @@ -29,3 +29,5 @@ data: teams_api_url: http://fake-teams-api.default.svc.cluster.local workers: "4" enable_load_balancer: "true" + api_port: "8080" + ring_log_lines: "100" diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go new file mode 100644 index 000000000..b99a4bf71 --- /dev/null +++ b/pkg/apiserver/apiserver.go @@ -0,0 +1,219 @@ +package apiserver + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/pprof" + "regexp" + "strconv" + "sync" + "time" + + "github.com/Sirupsen/logrus" + + "github.com/zalando-incubator/postgres-operator/pkg/spec" + "github.com/zalando-incubator/postgres-operator/pkg/util" + "github.com/zalando-incubator/postgres-operator/pkg/util/config" +) + +const ( + httpAPITimeout = time.Minute * 1 + shutdownTimeout = time.Second * 10 + httpReadTimeout = time.Millisecond * 100 +) + +// ControllerInformer describes stats methods of a controller +type controllerInformer interface { + GetConfig() *spec.ControllerConfig + GetOperatorConfig() *config.Config + GetStatus() *spec.ControllerStatus + TeamClusterList() map[string][]spec.NamespacedName + ClusterStatus(team, cluster string) (*spec.ClusterStatus, error) + ClusterLogs(team, cluster string) ([]*spec.LogEntry, error) + WorkerLogs(workerID uint32) ([]*spec.LogEntry, error) + ListQueue(workerID uint32) (*spec.QueueDump, error) + GetWorkersCnt() uint32 +} + +// Server describes HTTP API server +type Server struct { + logger *logrus.Entry + http http.Server + controller controllerInformer +} + +var ( + clusterStatusURL = regexp.MustCompile(`^/clusters/(?P[a-zA-Z][a-zA-Z0-9]*)/(?P[a-zA-Z][a-zA-Z0-9]*)/?$`) + clusterLogsURL = regexp.MustCompile(`^/clusters/(?P[a-zA-Z][a-zA-Z0-9]*)/(?P[a-zA-Z][a-zA-Z0-9]*)/logs/?$`) + teamURL = regexp.MustCompile(`^/clusters/(?P[a-zA-Z][a-zA-Z0-9]*)/?$`) + workerLogsURL = regexp.MustCompile(`^/workers/(?P\d+)/logs/?$`) + workerEventsQueueURL = regexp.MustCompile(`^/workers/(?P\d+)/queue/?$`) + workerAllQueue = regexp.MustCompile(`^/workers/all/queue/?$`) + clustersURL = "/clusters/" +) + +// New creates new HTTP API server +func New(controller controllerInformer, port int, logger *logrus.Logger) *Server { + s := &Server{ + logger: logger.WithField("pkg", "apiserver"), + controller: controller, + } + mux := http.NewServeMux() + + mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index)) + mux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline)) + mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile)) + mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol)) + mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace)) + + mux.Handle("/status/", http.HandlerFunc(s.controllerStatus)) + mux.Handle("/config/", http.HandlerFunc(s.operatorConfig)) + + mux.HandleFunc("/clusters/", s.clusters) + mux.HandleFunc("/workers/", s.workers) + + s.http = http.Server{ + Addr: fmt.Sprintf(":%d", port), + Handler: http.TimeoutHandler(mux, httpAPITimeout, ""), + ReadTimeout: httpReadTimeout, + } + + return s +} + +// Run starts the HTTP server +func (s *Server) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() + + go func() { + err := s.http.ListenAndServe() + if err != http.ErrServerClosed { + s.logger.Fatalf("Could not start http server: %v", err) + } + }() + s.logger.Infof("Listening on %s", s.http.Addr) + + <-stopCh + + ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout) + defer cancel() + err := s.http.Shutdown(ctx) + if err == context.DeadlineExceeded { + s.logger.Warnf("Shutdown timeout exceeded. closing http server") + s.http.Close() + } else if err != nil { + s.logger.Errorf("Could not shutdown http server: %v", err) + } + s.logger.Infoln("Http server shut down") +} + +func (s *Server) respond(obj interface{}, err error, w http.ResponseWriter) { + w.Header().Set("Content-Type", "application/json") + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + json.NewEncoder(w).Encode(map[string]interface{}{"error": err.Error()}) + return + } + + err = json.NewEncoder(w).Encode(obj) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + s.logger.Errorf("Could not encode: %v", err) + } +} + +func (s *Server) controllerStatus(w http.ResponseWriter, req *http.Request) { + s.respond(s.controller.GetStatus(), nil, w) +} + +func (s *Server) operatorConfig(w http.ResponseWriter, req *http.Request) { + s.respond(map[string]interface{}{ + "controller": s.controller.GetConfig(), + "operator": s.controller.GetOperatorConfig(), + }, nil, w) +} + +func (s *Server) clusters(w http.ResponseWriter, req *http.Request) { + var ( + resp interface{} + err error + ) + + if matches := util.FindNamedStringSubmatch(clusterStatusURL, req.URL.Path); matches != nil { + resp, err = s.controller.ClusterStatus(matches["team"], matches["cluster"]) + } else if matches := util.FindNamedStringSubmatch(teamURL, req.URL.Path); matches != nil { + teamClusters := s.controller.TeamClusterList() + clusters, found := teamClusters[matches["team"]] + if !found { + s.respond(nil, fmt.Errorf("could not find clusters for the team"), w) + } + + clusterNames := make([]string, 0) + for _, cluster := range clusters { + clusterNames = append(clusterNames, cluster.Name[len(matches["team"])+1:]) + } + + s.respond(clusterNames, nil, w) + return + } else if matches := util.FindNamedStringSubmatch(clusterLogsURL, req.URL.Path); matches != nil { + resp, err = s.controller.ClusterLogs(matches["team"], matches["cluster"]) + } else if req.URL.Path == clustersURL { + res := make(map[string][]string) + for team, clusters := range s.controller.TeamClusterList() { + for _, cluster := range clusters { + res[team] = append(res[team], cluster.Name[len(team)+1:]) + } + } + + s.respond(res, nil, w) + return + } else { + s.respond(nil, fmt.Errorf("page not found"), w) + return + } + + s.respond(resp, err, w) +} + +func (s *Server) workers(w http.ResponseWriter, req *http.Request) { + var ( + resp interface{} + err error + ) + + if workerAllQueue.MatchString(req.URL.Path) { + s.allQueues(w, req) + return + } else if matches := util.FindNamedStringSubmatch(workerLogsURL, req.URL.Path); matches != nil { + workerID, _ := strconv.Atoi(matches["id"]) + + resp, err = s.controller.WorkerLogs(uint32(workerID)) + } else if matches := util.FindNamedStringSubmatch(workerEventsQueueURL, req.URL.Path); matches != nil { + workerID, _ := strconv.Atoi(matches["id"]) + + resp, err = s.controller.ListQueue(uint32(workerID)) + } else { + s.respond(nil, fmt.Errorf("page not found"), w) + return + } + + s.respond(resp, err, w) +} + +func (s *Server) allQueues(w http.ResponseWriter, r *http.Request) { + workersCnt := s.controller.GetWorkersCnt() + resp := make(map[uint32]*spec.QueueDump, workersCnt) + for i := uint32(0); i < workersCnt; i++ { + queueDump, err := s.controller.ListQueue(i) + if err != nil { + s.respond(nil, err, w) + return + } + + resp[i] = queueDump + } + + s.respond(resp, nil, w) +} diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 618139d8a..84a391949 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -644,3 +644,20 @@ func (c *Cluster) initInfrastructureRoles() error { } return nil } + +// GetStatus provides status of the cluster +func (c *Cluster) GetStatus() *spec.ClusterStatus { + return &spec.ClusterStatus{ + Cluster: c.Spec.ClusterName, + Team: c.Spec.TeamID, + Status: c.Status, + Spec: c.Spec, + + MasterService: c.GetServiceMaster(), + ReplicaService: c.GetServiceReplica(), + Endpoint: c.GetEndpoint(), + StatefulSet: c.GetStatefulSet(), + + Error: c.Error, + } +} diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index a6f6cc70b..8939a7bda 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -419,3 +419,23 @@ func (c *Cluster) createRoles() (err error) { // TODO: figure out what to do with duplicate names (humans and robots) among pgUsers return c.syncRoles(false) } + +// GetServiceMaster returns cluster's kubernetes master Service +func (c *Cluster) GetServiceMaster() *v1.Service { + return c.Service[master] +} + +// GetServiceReplica returns cluster's kubernetes replica Service +func (c *Cluster) GetServiceReplica() *v1.Service { + return c.Service[replica] +} + +// GetEndpoint returns cluster's kubernetes Endpoint +func (c *Cluster) GetEndpoint() *v1.Endpoints { + return c.Endpoint +} + +// GetStatefulSet returns cluster's kubernetes StatefulSet +func (c *Cluster) GetStatefulSet() *v1beta1.StatefulSet { + return c.Statefulset +} diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 7c4dc6328..8bc9632f6 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -10,11 +10,13 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" + "github.com/zalando-incubator/postgres-operator/pkg/apiserver" "github.com/zalando-incubator/postgres-operator/pkg/cluster" "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util/config" "github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" + "github.com/zalando-incubator/postgres-operator/pkg/util/ringlog" ) // Controller represents operator controller @@ -25,10 +27,14 @@ type Controller struct { logger *logrus.Entry KubeClient k8sutil.KubernetesClient RestClient rest.Interface // kubernetes API group REST client + apiserver *apiserver.Server - clustersMu sync.RWMutex - clusters map[spec.NamespacedName]*cluster.Cluster - stopChs map[spec.NamespacedName]chan struct{} + stopCh chan struct{} + + clustersMu sync.RWMutex + clusters map[spec.NamespacedName]*cluster.Cluster + clusterLogs map[spec.NamespacedName]ringlog.RingLogger + teamClusters map[string][]spec.NamespacedName postgresqlInformer cache.SharedIndexInformer podInformer cache.SharedIndexInformer @@ -36,6 +42,8 @@ type Controller struct { clusterEventQueues []*cache.FIFO // [workerID]Queue lastClusterSyncTime int64 + + workerLogs map[uint32]ringlog.RingLogger } // NewController creates a new controller @@ -43,12 +51,16 @@ func NewController(controllerConfig *spec.ControllerConfig) *Controller { logger := logrus.New() c := &Controller{ - config: *controllerConfig, - opConfig: &config.Config{}, - logger: logger.WithField("pkg", "controller"), - clusters: make(map[spec.NamespacedName]*cluster.Cluster), - podCh: make(chan spec.PodEvent), + config: *controllerConfig, + opConfig: &config.Config{}, + logger: logger.WithField("pkg", "controller"), + clusters: make(map[spec.NamespacedName]*cluster.Cluster), + clusterLogs: make(map[spec.NamespacedName]ringlog.RingLogger), + teamClusters: make(map[string][]spec.NamespacedName), + stopCh: make(chan struct{}), + podCh: make(chan spec.PodEvent), } + logger.Hooks.Add(c) return c } @@ -149,6 +161,7 @@ func (c *Controller) initController() { }) c.clusterEventQueues = make([]*cache.FIFO, c.opConfig.Workers) + c.workerLogs = make(map[uint32]ringlog.RingLogger, c.opConfig.Workers) for i := range c.clusterEventQueues { c.clusterEventQueues[i] = cache.NewFIFO(func(obj interface{}) (string, error) { e, ok := obj.(spec.ClusterEvent) @@ -159,19 +172,23 @@ func (c *Controller) initController() { return fmt.Sprintf("%s-%s", e.EventType, e.UID), nil }) } + + c.apiserver = apiserver.New(c, c.opConfig.APIPort, c.logger.Logger) } // Run starts background controller processes func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { c.initController() - wg.Add(3) + wg.Add(4) go c.runPodInformer(stopCh, wg) go c.runPostgresqlInformer(stopCh, wg) go c.clusterResync(stopCh, wg) + go c.apiserver.Run(stopCh, wg) for i := range c.clusterEventQueues { wg.Add(1) + c.workerLogs[uint32(i)] = ringlog.New(c.opConfig.RingLogLines) go c.processClusterEventsQueue(i, stopCh, wg) } diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 3407164a4..64c2a32d4 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -4,10 +4,12 @@ import ( "encoding/json" "fmt" "reflect" + "strings" "sync" "sync/atomic" "time" + "github.com/Sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -18,6 +20,7 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util" "github.com/zalando-incubator/postgres-operator/pkg/util/constants" + "github.com/zalando-incubator/postgres-operator/pkg/util/ringlog" ) func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) { @@ -124,6 +127,21 @@ func (c *Controller) clusterWatchFunc(options metav1.ListOptions) (watch.Interfa }), nil } +func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedName, pgSpec *spec.Postgresql) *cluster.Cluster { + cl := cluster.New(c.makeClusterConfig(), c.KubeClient, *pgSpec, lg) + cl.Run(c.stopCh) + teamName := strings.ToLower(cl.Spec.TeamID) + + defer c.clustersMu.Unlock() + c.clustersMu.Lock() + + c.teamClusters[teamName] = append(c.teamClusters[teamName], clusterName) + c.clusters[clusterName] = cl + c.clusterLogs[clusterName] = ringlog.New(c.opConfig.RingLogLines) + + return cl +} + func (c *Controller) processEvent(obj interface{}) error { var clusterName spec.NamespacedName @@ -153,14 +171,7 @@ func (c *Controller) processEvent(obj interface{}) error { lg.Infof("creation of the cluster started") - stopCh := make(chan struct{}) - cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, lg) - cl.Run(stopCh) - - c.clustersMu.Lock() - c.clusters[clusterName] = cl - c.stopChs[clusterName] = stopCh - c.clustersMu.Unlock() + cl = c.addCluster(lg, clusterName, event.NewSpec) if err := cl.Create(); err != nil { cl.Error = fmt.Errorf("could not create cluster: %v", err) @@ -186,7 +197,9 @@ func (c *Controller) processEvent(obj interface{}) error { cl.Error = nil lg.Infoln("cluster has been updated") case spec.EventDelete: - lg.Infof("deletion of the %q cluster started", clusterName) + teamName := strings.ToLower(cl.Spec.TeamID) + + lg.Infoln("Deletion of the cluster started") if !clusterFound { lg.Errorf("unknown cluster: %q", clusterName) return nil @@ -196,12 +209,22 @@ func (c *Controller) processEvent(obj interface{}) error { lg.Errorf("could not delete cluster: %v", err) return nil } - close(c.stopChs[clusterName]) - c.clustersMu.Lock() - delete(c.clusters, clusterName) - delete(c.stopChs, clusterName) - c.clustersMu.Unlock() + func() { + defer c.clustersMu.Unlock() + c.clustersMu.Lock() + + delete(c.clusters, clusterName) + delete(c.clusterLogs, clusterName) + for i, val := range c.teamClusters[teamName] { // on relativel + if val == clusterName { + copy(c.teamClusters[teamName][i:], c.teamClusters[teamName][i+1:]) + c.teamClusters[teamName][len(c.teamClusters[teamName])-1] = spec.NamespacedName{} + c.teamClusters[teamName] = c.teamClusters[teamName][:len(c.teamClusters[teamName])-1] + break + } + } + }() lg.Infof("cluster has been deleted") case spec.EventSync: @@ -209,18 +232,11 @@ func (c *Controller) processEvent(obj interface{}) error { // no race condition because a cluster is always processed by single worker if !clusterFound { - stopCh := make(chan struct{}) - cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, lg) - cl.Run(stopCh) - - c.clustersMu.Lock() - c.clusters[clusterName] = cl - c.stopChs[clusterName] = stopCh - c.clustersMu.Unlock() + cl = c.addCluster(lg, clusterName, event.NewSpec) } if err := cl.Sync(); err != nil { - cl.Error = fmt.Errorf("could not sync cluster %q: %v", clusterName, err) + cl.Error = fmt.Errorf("could not sync cluster: %v", err) lg.Error(cl.Error) return nil } diff --git a/pkg/controller/status.go b/pkg/controller/status.go new file mode 100644 index 000000000..4dc036a98 --- /dev/null +++ b/pkg/controller/status.go @@ -0,0 +1,162 @@ +package controller + +import ( + "fmt" + "sync/atomic" + + "github.com/Sirupsen/logrus" + + "github.com/zalando-incubator/postgres-operator/pkg/spec" + "github.com/zalando-incubator/postgres-operator/pkg/util/config" +) + +// ClusterStatus provides status of the cluster +func (c *Controller) ClusterStatus(team, cluster string) (*spec.ClusterStatus, error) { + clusterName := spec.NamespacedName{ + Namespace: c.opConfig.Namespace, + Name: team + "-" + cluster, + } + + c.clustersMu.RLock() + cl, ok := c.clusters[clusterName] + c.clustersMu.RUnlock() + if !ok { + return nil, fmt.Errorf("could not find cluster") + } + + status := cl.GetStatus() + status.Worker = c.clusterWorkerID(clusterName) + + return status, nil +} + +// TeamClusterList returns team-clusters map +func (c *Controller) TeamClusterList() map[string][]spec.NamespacedName { + return c.teamClusters +} + +// GetConfig returns controller config +func (c *Controller) GetConfig() *spec.ControllerConfig { + return &c.config +} + +// GetOperatorConfig returns operator config +func (c *Controller) GetOperatorConfig() *config.Config { + return c.opConfig +} + +// GetStatus dumps current config and status of the controller +func (c *Controller) GetStatus() *spec.ControllerStatus { + c.clustersMu.RLock() + clustersCnt := len(c.clusters) + c.clustersMu.RUnlock() + + return &spec.ControllerStatus{ + LastSyncTime: atomic.LoadInt64(&c.lastClusterSyncTime), + Clusters: clustersCnt, + } +} + +// ClusterLogs dumps cluster ring logs +func (c *Controller) ClusterLogs(team, name string) ([]*spec.LogEntry, error) { + clusterName := spec.NamespacedName{ + Namespace: c.opConfig.Namespace, + Name: team + "-" + name, + } + + c.clustersMu.RLock() + cl, ok := c.clusterLogs[clusterName] + c.clustersMu.RUnlock() + if !ok { + return nil, fmt.Errorf("could not find cluster") + } + + res := make([]*spec.LogEntry, 0) + for _, e := range cl.Walk() { + logEntry := e.(*spec.LogEntry) + logEntry.ClusterName = nil + + res = append(res, logEntry) + } + + return res, nil +} + +// WorkerLogs dumps logs of the worker +func (c *Controller) WorkerLogs(workerID uint32) ([]*spec.LogEntry, error) { + lg, ok := c.workerLogs[workerID] + if !ok { + return nil, fmt.Errorf("could not find worker") + } + + res := make([]*spec.LogEntry, 0) + for _, e := range lg.Walk() { + logEntry := e.(*spec.LogEntry) + logEntry.Worker = nil + + res = append(res, logEntry) + } + + return res, nil +} + +// Levels returns logrus levels for which hook must fire +func (c *Controller) Levels() []logrus.Level { + return logrus.AllLevels +} + +// Fire is a logrus hook +func (c *Controller) Fire(e *logrus.Entry) error { + var clusterName spec.NamespacedName + + v, ok := e.Data["cluster-name"] + if !ok { + return nil + } + clusterName = v.(spec.NamespacedName) + c.clustersMu.RLock() + clusterRingLog, ok := c.clusterLogs[clusterName] + c.clustersMu.RUnlock() + if !ok { + return nil + } + + logEntry := &spec.LogEntry{ + Time: e.Time, + Level: e.Level, + ClusterName: &clusterName, + Message: e.Message, + } + + if v, hasWorker := e.Data["worker"]; hasWorker { + id := v.(uint32) + + logEntry.Worker = &id + } + clusterRingLog.Insert(logEntry) + + if logEntry.Worker == nil { + return nil + } + c.workerLogs[*logEntry.Worker].Insert(logEntry) // workerLogs map is immutable. No need to lock it + + return nil +} + +// ListQueue dumps cluster event queue of the provided worker +func (c *Controller) ListQueue(workerID uint32) (*spec.QueueDump, error) { + if workerID >= uint32(len(c.clusterEventQueues)) { + return nil, fmt.Errorf("could not find worker") + } + + q := c.clusterEventQueues[workerID] + return &spec.QueueDump{ + Keys: q.ListKeys(), + List: q.List(), + }, nil +} + +// GetWorkersCnt returns number of the workers +func (c *Controller) GetWorkersCnt() uint32 { + return c.opConfig.Workers +} diff --git a/pkg/spec/types.go b/pkg/spec/types.go index 438edbdd5..43f388902 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -4,9 +4,12 @@ import ( "database/sql" "fmt" "strings" + "time" + "github.com/Sirupsen/logrus" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/pkg/apis/apps/v1beta1" "k8s.io/client-go/rest" ) @@ -73,6 +76,42 @@ type UserSyncer interface { ExecuteSyncRequests(req []PgSyncUserRequest, db *sql.DB) error } +// LogEntry describes log entry in the RingLogger +type LogEntry struct { + Time time.Time + Level logrus.Level + ClusterName *NamespacedName `json:",omitempty"` + Worker *uint32 `json:",omitempty"` + Message string +} + +// ClusterStatus describes status of the cluster +type ClusterStatus struct { + Team string + Cluster string + MasterService *v1.Service + ReplicaService *v1.Service + Endpoint *v1.Endpoints + StatefulSet *v1beta1.StatefulSet + + Worker uint32 + Status PostgresStatus + Spec PostgresSpec + Error error +} + +// ControllerStatus describes status of the controller +type ControllerStatus struct { + LastSyncTime int64 + Clusters int +} + +// QueueDump describes cache.FIFO queue +type QueueDump struct { + Keys []string + List []interface{} +} + // ControllerConfig describes configuration of the controller type ControllerConfig struct { RestConfig *rest.Config `json:"-"` diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index 18a902cc1..086d7200f 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -61,6 +61,8 @@ type Config struct { MasterDNSNameFormat stringTemplate `name:"master_dns_name_format" default:"{cluster}.{team}.{hostedzone}"` ReplicaDNSNameFormat stringTemplate `name:"replica_dns_name_format" default:"{cluster}-repl.{team}.{hostedzone}"` Workers uint32 `name:"workers" default:"4"` + APIPort int `name:"api_port" default:"8080"` + RingLogLines int `name:"ring_log_lines" default:"100"` } // MustMarshal marshals the config or panics diff --git a/pkg/util/ringlog/ringlog.go b/pkg/util/ringlog/ringlog.go new file mode 100644 index 000000000..1c26ce3ba --- /dev/null +++ b/pkg/util/ringlog/ringlog.go @@ -0,0 +1,59 @@ +package ringlog + +import ( + "container/list" + "sync" +) + +// RingLogger describes ring logger methods +type RingLogger interface { + Insert(interface{}) + Walk() []interface{} +} + +// RingLog is a capped logger with fixed size +type RingLog struct { + sync.RWMutex + size int + list *list.List +} + +// New creates new Ring logger +func New(size int) *RingLog { + r := RingLog{ + list: list.New(), + size: size, + } + + return &r +} + +// Insert inserts new LogEntry into the ring logger +func (r *RingLog) Insert(obj interface{}) { + r.Lock() + defer r.Unlock() + + r.list.PushBack(obj) + if r.list.Len() > r.size { + r.list.Remove(r.list.Front()) + } +} + +// Walk dumps all the LogEntries from the Ring logger +func (r *RingLog) Walk() []interface{} { + res := make([]interface{}, 0) + + r.RLock() + defer r.RUnlock() + + st := r.list.Front() + for i := 0; i < r.size; i++ { + if st == nil { + return res + } + res = append(res, st.Value) + st = st.Next() + } + + return res +} diff --git a/pkg/util/util.go b/pkg/util/util.go index 187a9809e..73273a2fb 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -10,6 +10,8 @@ import ( "github.com/motomux/pretty" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "regexp" + "github.com/zalando-incubator/postgres-operator/pkg/spec" ) @@ -71,3 +73,29 @@ OUTER: } return result, len(result) == 0 } + +func FindNamedStringSubmatch(r *regexp.Regexp, s string) map[string]string { + matches := r.FindStringSubmatch(s) + grNames := r.SubexpNames() + + if matches == nil { + return nil + } + + groupMatches := 0 + res := make(map[string]string, len(grNames)) + for i, n := range grNames { + if n == "" { + continue + } + + res[n] = matches[i] + groupMatches++ + } + + if groupMatches == 0 { + return nil + } + + return res +} diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 1f49a0ed3..b94852dfb 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -6,6 +6,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "regexp" + "github.com/zalando-incubator/postgres-operator/pkg/spec" ) @@ -44,6 +46,17 @@ var substractTest = []struct { {[]string{"a", "b", "c", "d"}, []string{"a", "bb", "c", "d"}, []string{"b"}, false}, } +var substringMatch = []struct { + inRegex *regexp.Regexp + inStr string + out map[string]string +}{ + {regexp.MustCompile(`aaaa (?P\d+) bbbb`), "aaaa 123 bbbb", map[string]string{"num": "123"}}, + {regexp.MustCompile(`aaaa (?P\d+) bbbb`), "a aa 123 bbbb", nil}, + {regexp.MustCompile(`aaaa \d+ bbbb`), "aaaa 123 bbbb", nil}, + {regexp.MustCompile(`aaaa (\d+) bbbb`), "aaaa 123 bbbb", nil}, +} + func TestRandomPassword(t *testing.T) { const pwdLength = 10 pwd := RandomPassword(pwdLength) @@ -100,3 +113,12 @@ func TestSubstractSlices(t *testing.T) { } } } + +func TestFindNamedStringSubmatch(t *testing.T) { + for _, tt := range substringMatch { + actualRes := FindNamedStringSubmatch(tt.inRegex, tt.inStr) + if !reflect.DeepEqual(actualRes, tt.out) { + t.Errorf("FindNamedStringSubmatch expected: %#v, got: %#v", tt.out, actualRes) + } + } +}