From 4d4fc394dfe6516f9a61c3f5783751ffdec29698 Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Wed, 26 Jul 2017 17:00:52 +0200 Subject: [PATCH] api server --- pkg/apiserver/apiserver.go | 104 ++++++++++++++++++++++++++++++++++ pkg/cluster/cluster.go | 10 ++-- pkg/controller/controller.go | 8 ++- pkg/controller/rest-server.go | 43 -------------- pkg/controller/status.go | 52 +++++++++++++++++ 5 files changed, 167 insertions(+), 50 deletions(-) create mode 100644 pkg/apiserver/apiserver.go delete mode 100644 pkg/controller/rest-server.go create mode 100644 pkg/controller/status.go diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go new file mode 100644 index 000000000..6a2c0dfb8 --- /dev/null +++ b/pkg/apiserver/apiserver.go @@ -0,0 +1,104 @@ +package apiserver + +import ( + "context" + "fmt" + "net/http" + "net/http/pprof" + "sync" + "regexp" + + "github.com/Sirupsen/logrus" + "encoding/json" +) + +type ClusterInformer interface { + Status() interface{} + ClusterStatus(team, cluster string) interface{} +} + +type Server struct { + logger *logrus.Entry + http http.Server + controller ClusterInformer +} + +var ( + clusterStatusURL = regexp.MustCompile("^/clusters/(?P[a-zA-Z][a-zA-Z0-9]*)/(?P[a-zA-Z][a-zA-Z0-9]*)/?$") + teamURL = regexp.MustCompile("^/clusters/(?P[a-zA-Z][a-zA-Z0-9]*)/?$") +) + +func HandlerFunc(i interface{}) http.HandlerFunc { + b, err := json.Marshal(i) + if err != nil { + return func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, "Could not marshal %T: %v", i, err) + } + } + + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.Write(b) + } +} + +func New(controller ClusterInformer, port int, logger *logrus.Logger) *Server { + s := &Server{ + logger: logger.WithField("pkg", "apiserver"), + controller: controller, + } + + mux := http.NewServeMux() + + mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index)) + mux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline)) + mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile)) + mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol)) + mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace)) + mux.Handle("/status", HandlerFunc(s.controller.Status())) + mux.HandleFunc("/clusters/", func(w http.ResponseWriter, req *http.Request) { + var resp interface{} + if matches := clusterStatusURL.FindAllStringSubmatch(req.URL.Path, -1); matches != nil { + resp = s.controller.ClusterStatus(matches[0][1], matches[0][2]) + } else if matches := teamURL.FindAllStringSubmatch(req.URL.Path, -1); matches != nil { + // TODO + } else { + http.NotFound(w, req) + return + } + + b, err := json.Marshal(resp) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, "Could not marshal %T: %v", resp, err) + } else { + w.Header().Set("Content-Type", "application/json") + w.Write(b) + } + }) + + s.http = http.Server{ + Addr: fmt.Sprintf(":%d", port), + Handler: mux, + } + + return s +} + +func (s *Server) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() + + go func() { + err := s.http.ListenAndServe() + if err != http.ErrServerClosed { + s.logger.Fatalf("could not start http server: %v", err) + } + }() + s.logger.Infof("listening on %s", s.http.Addr) + + <-stopCh + + ctx, _ := context.WithCancel(context.Background()) + s.http.Shutdown(ctx) +} diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 1d7ffef95..2f34000d8 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -36,11 +36,11 @@ var ( // Config contains operator-wide clients and configuration used from a cluster. TODO: remove struct duplication. type Config struct { OpConfig config.Config - RestConfig *rest.Config + RestConfig *rest.Config `json:"-"` InfrastructureRoles map[string]spec.PgUser // inherited from the controller } -type kubeResources struct { +type KubeResources struct { Service map[PostgresRole]*v1.Service Endpoint *v1.Endpoints Secrets map[types.UID]*v1.Secret @@ -50,7 +50,7 @@ type kubeResources struct { } type Cluster struct { - kubeResources + KubeResources spec.Postgresql Config logger *logrus.Entry @@ -79,7 +79,7 @@ type compareStatefulsetResult struct { // New creates a new cluster. This function should be called from a controller. func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Name) - kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[PostgresRole]*v1.Service)} + kubeResources := KubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[PostgresRole]*v1.Service)} orphanDependents := true podEventsQueue := cache.NewFIFO(func(obj interface{}) (string, error) { @@ -98,7 +98,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql pgUsers: make(map[string]spec.PgUser), systemUsers: make(map[string]spec.PgUser), podSubscribers: make(map[spec.NamespacedName]chan spec.PodEvent), - kubeResources: kubeResources, + KubeResources: kubeResources, masterLess: false, userSyncStrategy: users.DefaultUserSyncStrategy{}, deleteOptions: &metav1.DeleteOptions{OrphanDependents: &orphanDependents}, diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index c46bf0bc1..1b9ccad1b 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -15,10 +15,11 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/util/config" "github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" + "github.com/zalando-incubator/postgres-operator/pkg/apiserver" ) type Config struct { - RestConfig *rest.Config + RestConfig *rest.Config `json:"-"` InfrastructureRoles map[string]spec.PgUser NoDatabaseAccess bool @@ -34,6 +35,7 @@ type Controller struct { logger *logrus.Entry KubeClient k8sutil.KubernetesClient RestClient rest.Interface // kubernetes API group REST client + apiserver *apiserver.Server clustersMu sync.RWMutex clusters map[spec.NamespacedName]*cluster.Cluster @@ -166,6 +168,8 @@ func (c *Controller) initController() { return fmt.Sprintf("%s-%s", e.EventType, e.UID), nil }) } + + c.apiserver = apiserver.New(c, c.opConfig.APIPort, c.logger.Logger) } func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { @@ -176,7 +180,7 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { go c.runPostgresqlInformer(stopCh, wg) go c.podEventsDispatcher(stopCh, wg) go c.clusterResync(stopCh, wg) - go c.RunAPIServer(stopCh, wg) + go c.apiserver.Run(stopCh, wg) for i := range c.clusterEventQueues { wg.Add(1) diff --git a/pkg/controller/rest-server.go b/pkg/controller/rest-server.go deleted file mode 100644 index 017a0ae0b..000000000 --- a/pkg/controller/rest-server.go +++ /dev/null @@ -1,43 +0,0 @@ -package controller - -import ( - "context" - "fmt" - "net/http" - "net/http/pprof" - "sync" -) - -func (c *Controller) RunAPIServer(stopCh <-chan struct{}, wg *sync.WaitGroup) { - defer wg.Done() - - mux := http.NewServeMux() - - mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index)) - mux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline)) - mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile)) - mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol)) - mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace)) - - server := http.Server{ - Addr: fmt.Sprintf(":%d", c.opConfig.APIPort), - Handler: mux, - } - - go func() { - err := server.ListenAndServe() - if err != http.ErrServerClosed { - c.logger.Fatalf("could not start http server: %v", err) - } - }() - c.logger.Infof("listening on %s", server.Addr) - - <-stopCh - - ctx, _ := context.WithCancel(context.Background()) - server.Shutdown(ctx) -} - -func (c *Controller) ServeHTTP(w http.ResponseWriter, r *http.Request) { - fmt.Printf("Request: %+v\n", r.URL.String()) -} diff --git a/pkg/controller/status.go b/pkg/controller/status.go new file mode 100644 index 000000000..bd364f825 --- /dev/null +++ b/pkg/controller/status.go @@ -0,0 +1,52 @@ +package controller + +import ( + "github.com/zalando-incubator/postgres-operator/pkg/util/config" + "github.com/zalando-incubator/postgres-operator/pkg/spec" + "github.com/zalando-incubator/postgres-operator/pkg/cluster" +) + +type controllerStatus struct { + ControllerConfig Config + OperatorConfig config.Config + LastSyncTime int64 +} + +type clusterStatus struct { + Team string + Cluster string + + Config cluster.Config + Status spec.PostgresStatus + Resources cluster.KubeResources + Spec spec.PostgresSpec +} + +func (c *Controller) ClusterStatus(team, cluster string) interface{} { + clusterName := spec.NamespacedName{ + Namespace: c.opConfig.Namespace, + Name: team + "-" + cluster, + } + + cl, ok := c.clusters[clusterName] + if !ok { + return struct{}{} + } + + return clusterStatus{ + Config: cl.Config, + Cluster: cl.Spec.ClusterName, + Team: cl.Spec.TeamID, + Status: cl.Status, + Resources: cl.KubeResources, + Spec: cl.Spec, + } +} + +func (c *Controller) Status() interface{} { + return controllerStatus{ + ControllerConfig: c.config, + OperatorConfig: *c.opConfig, + LastSyncTime: c.lastClusterSyncTime, + } +}