api server
This commit is contained in:
		
							parent
							
								
									102b72e8bf
								
							
						
					
					
						commit
						4d4fc394df
					
				|  | @ -0,0 +1,104 @@ | |||
| package apiserver | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"net/http" | ||||
| 	"net/http/pprof" | ||||
| 	"sync" | ||||
| 	"regexp" | ||||
| 
 | ||||
| 	"github.com/Sirupsen/logrus" | ||||
| 	"encoding/json" | ||||
| ) | ||||
| 
 | ||||
| type ClusterInformer interface { | ||||
| 	Status() interface{} | ||||
| 	ClusterStatus(team, cluster string) interface{} | ||||
| } | ||||
| 
 | ||||
| type Server struct { | ||||
| 	logger     *logrus.Entry | ||||
| 	http       http.Server | ||||
| 	controller ClusterInformer | ||||
| } | ||||
| 
 | ||||
| var ( | ||||
| 	clusterStatusURL = regexp.MustCompile("^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/(?P<cluster>[a-zA-Z][a-zA-Z0-9]*)/?$") | ||||
| 	teamURL          = regexp.MustCompile("^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/?$") | ||||
| ) | ||||
| 
 | ||||
| func HandlerFunc(i interface{}) http.HandlerFunc { | ||||
| 	b, err := json.Marshal(i) | ||||
| 	if err != nil { | ||||
| 		return func(w http.ResponseWriter, r *http.Request) { | ||||
| 			w.WriteHeader(http.StatusInternalServerError) | ||||
| 			fmt.Fprintf(w, "Could not marshal %T: %v", i, err) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return func(w http.ResponseWriter, r *http.Request) { | ||||
| 		w.Header().Set("Content-Type", "application/json") | ||||
| 		w.Write(b) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func New(controller ClusterInformer, port int, logger *logrus.Logger) *Server { | ||||
| 	s := &Server{ | ||||
| 		logger:     logger.WithField("pkg", "apiserver"), | ||||
| 		controller: controller, | ||||
| 	} | ||||
| 
 | ||||
| 	mux := http.NewServeMux() | ||||
| 
 | ||||
| 	mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index)) | ||||
| 	mux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline)) | ||||
| 	mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile)) | ||||
| 	mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol)) | ||||
| 	mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace)) | ||||
| 	mux.Handle("/status", HandlerFunc(s.controller.Status())) | ||||
| 	mux.HandleFunc("/clusters/", func(w http.ResponseWriter, req *http.Request) { | ||||
| 		var resp interface{} | ||||
| 		if matches := clusterStatusURL.FindAllStringSubmatch(req.URL.Path, -1); matches != nil { | ||||
| 			resp = s.controller.ClusterStatus(matches[0][1], matches[0][2]) | ||||
| 		} else if matches := teamURL.FindAllStringSubmatch(req.URL.Path, -1); matches != nil { | ||||
| 			// TODO
 | ||||
| 		} else { | ||||
| 			http.NotFound(w, req) | ||||
| 			return | ||||
| 		} | ||||
| 
 | ||||
| 		b, err := json.Marshal(resp) | ||||
| 		if err != nil { | ||||
| 			w.WriteHeader(http.StatusInternalServerError) | ||||
| 			fmt.Fprintf(w, "Could not marshal %T: %v", resp, err) | ||||
| 		} else { | ||||
| 			w.Header().Set("Content-Type", "application/json") | ||||
| 			w.Write(b) | ||||
| 		} | ||||
| 	}) | ||||
| 
 | ||||
| 	s.http = http.Server{ | ||||
| 		Addr:    fmt.Sprintf(":%d", port), | ||||
| 		Handler: mux, | ||||
| 	} | ||||
| 
 | ||||
| 	return s | ||||
| } | ||||
| 
 | ||||
| func (s *Server) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { | ||||
| 	defer wg.Done() | ||||
| 
 | ||||
| 	go func() { | ||||
| 		err := s.http.ListenAndServe() | ||||
| 		if err != http.ErrServerClosed { | ||||
| 			s.logger.Fatalf("could not start http server: %v", err) | ||||
| 		} | ||||
| 	}() | ||||
| 	s.logger.Infof("listening on %s", s.http.Addr) | ||||
| 
 | ||||
| 	<-stopCh | ||||
| 
 | ||||
| 	ctx, _ := context.WithCancel(context.Background()) | ||||
| 	s.http.Shutdown(ctx) | ||||
| } | ||||
|  | @ -36,11 +36,11 @@ var ( | |||
| // Config contains operator-wide clients and configuration used from a cluster. TODO: remove struct duplication.
 | ||||
| type Config struct { | ||||
| 	OpConfig            config.Config | ||||
| 	RestConfig          *rest.Config | ||||
| 	RestConfig          *rest.Config `json:"-"` | ||||
| 	InfrastructureRoles map[string]spec.PgUser // inherited from the controller
 | ||||
| } | ||||
| 
 | ||||
| type kubeResources struct { | ||||
| type KubeResources struct { | ||||
| 	Service     map[PostgresRole]*v1.Service | ||||
| 	Endpoint    *v1.Endpoints | ||||
| 	Secrets     map[types.UID]*v1.Secret | ||||
|  | @ -50,7 +50,7 @@ type kubeResources struct { | |||
| } | ||||
| 
 | ||||
| type Cluster struct { | ||||
| 	kubeResources | ||||
| 	KubeResources | ||||
| 	spec.Postgresql | ||||
| 	Config | ||||
| 	logger           *logrus.Entry | ||||
|  | @ -79,7 +79,7 @@ type compareStatefulsetResult struct { | |||
| // New creates a new cluster. This function should be called from a controller.
 | ||||
| func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { | ||||
| 	lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Name) | ||||
| 	kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[PostgresRole]*v1.Service)} | ||||
| 	kubeResources := KubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[PostgresRole]*v1.Service)} | ||||
| 	orphanDependents := true | ||||
| 
 | ||||
| 	podEventsQueue := cache.NewFIFO(func(obj interface{}) (string, error) { | ||||
|  | @ -98,7 +98,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql | |||
| 		pgUsers:          make(map[string]spec.PgUser), | ||||
| 		systemUsers:      make(map[string]spec.PgUser), | ||||
| 		podSubscribers:   make(map[spec.NamespacedName]chan spec.PodEvent), | ||||
| 		kubeResources:    kubeResources, | ||||
| 		KubeResources:    kubeResources, | ||||
| 		masterLess:       false, | ||||
| 		userSyncStrategy: users.DefaultUserSyncStrategy{}, | ||||
| 		deleteOptions:    &metav1.DeleteOptions{OrphanDependents: &orphanDependents}, | ||||
|  |  | |||
|  | @ -15,10 +15,11 @@ import ( | |||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/config" | ||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/constants" | ||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" | ||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/apiserver" | ||||
| ) | ||||
| 
 | ||||
| type Config struct { | ||||
| 	RestConfig          *rest.Config | ||||
| 	RestConfig          *rest.Config `json:"-"` | ||||
| 	InfrastructureRoles map[string]spec.PgUser | ||||
| 
 | ||||
| 	NoDatabaseAccess bool | ||||
|  | @ -34,6 +35,7 @@ type Controller struct { | |||
| 	logger     *logrus.Entry | ||||
| 	KubeClient k8sutil.KubernetesClient | ||||
| 	RestClient rest.Interface // kubernetes API group REST client
 | ||||
| 	apiserver  *apiserver.Server | ||||
| 
 | ||||
| 	clustersMu sync.RWMutex | ||||
| 	clusters   map[spec.NamespacedName]*cluster.Cluster | ||||
|  | @ -166,6 +168,8 @@ func (c *Controller) initController() { | |||
| 			return fmt.Sprintf("%s-%s", e.EventType, e.UID), nil | ||||
| 		}) | ||||
| 	} | ||||
| 
 | ||||
| 	c.apiserver = apiserver.New(c, c.opConfig.APIPort, c.logger.Logger) | ||||
| } | ||||
| 
 | ||||
| func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { | ||||
|  | @ -176,7 +180,7 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { | |||
| 	go c.runPostgresqlInformer(stopCh, wg) | ||||
| 	go c.podEventsDispatcher(stopCh, wg) | ||||
| 	go c.clusterResync(stopCh, wg) | ||||
| 	go c.RunAPIServer(stopCh, wg) | ||||
| 	go c.apiserver.Run(stopCh, wg) | ||||
| 
 | ||||
| 	for i := range c.clusterEventQueues { | ||||
| 		wg.Add(1) | ||||
|  |  | |||
|  | @ -1,43 +0,0 @@ | |||
| package controller | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"net/http" | ||||
| 	"net/http/pprof" | ||||
| 	"sync" | ||||
| ) | ||||
| 
 | ||||
| func (c *Controller) RunAPIServer(stopCh <-chan struct{}, wg *sync.WaitGroup) { | ||||
| 	defer wg.Done() | ||||
| 
 | ||||
| 	mux := http.NewServeMux() | ||||
| 
 | ||||
| 	mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index)) | ||||
| 	mux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline)) | ||||
| 	mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile)) | ||||
| 	mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol)) | ||||
| 	mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace)) | ||||
| 
 | ||||
| 	server := http.Server{ | ||||
| 		Addr:    fmt.Sprintf(":%d", c.opConfig.APIPort), | ||||
| 		Handler: mux, | ||||
| 	} | ||||
| 
 | ||||
| 	go func() { | ||||
| 		err := server.ListenAndServe() | ||||
| 		if err != http.ErrServerClosed { | ||||
| 			c.logger.Fatalf("could not start http server: %v", err) | ||||
| 		} | ||||
| 	}() | ||||
| 	c.logger.Infof("listening on %s", server.Addr) | ||||
| 
 | ||||
| 	<-stopCh | ||||
| 
 | ||||
| 	ctx, _ := context.WithCancel(context.Background()) | ||||
| 	server.Shutdown(ctx) | ||||
| } | ||||
| 
 | ||||
| func (c *Controller) ServeHTTP(w http.ResponseWriter, r *http.Request) { | ||||
| 	fmt.Printf("Request: %+v\n", r.URL.String()) | ||||
| } | ||||
|  | @ -0,0 +1,52 @@ | |||
| package controller | ||||
| 
 | ||||
| import ( | ||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/config" | ||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/spec" | ||||
| 	"github.com/zalando-incubator/postgres-operator/pkg/cluster" | ||||
| ) | ||||
| 
 | ||||
| type controllerStatus struct { | ||||
| 	ControllerConfig Config | ||||
| 	OperatorConfig   config.Config | ||||
| 	LastSyncTime     int64 | ||||
| } | ||||
| 
 | ||||
| type clusterStatus struct { | ||||
| 	Team    string | ||||
| 	Cluster string | ||||
| 
 | ||||
| 	Config    cluster.Config | ||||
| 	Status    spec.PostgresStatus | ||||
| 	Resources cluster.KubeResources | ||||
| 	Spec      spec.PostgresSpec | ||||
| } | ||||
| 
 | ||||
| func (c *Controller) ClusterStatus(team, cluster string) interface{} { | ||||
| 	clusterName := spec.NamespacedName{ | ||||
| 		Namespace: c.opConfig.Namespace, | ||||
| 		Name:      team + "-" + cluster, | ||||
| 	} | ||||
| 
 | ||||
| 	cl, ok := c.clusters[clusterName] | ||||
| 	if !ok { | ||||
| 		return struct{}{} | ||||
| 	} | ||||
| 
 | ||||
| 	return clusterStatus{ | ||||
| 		Config:    cl.Config, | ||||
| 		Cluster:   cl.Spec.ClusterName, | ||||
| 		Team:      cl.Spec.TeamID, | ||||
| 		Status:    cl.Status, | ||||
| 		Resources: cl.KubeResources, | ||||
| 		Spec:      cl.Spec, | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (c *Controller) Status() interface{} { | ||||
| 	return controllerStatus{ | ||||
| 		ControllerConfig: c.config, | ||||
| 		OperatorConfig:   *c.opConfig, | ||||
| 		LastSyncTime:     c.lastClusterSyncTime, | ||||
| 	} | ||||
| } | ||||
		Loading…
	
		Reference in New Issue