diagnostic rest api [WIP]
This commit is contained in:
		
							parent
							
								
									a8ed1e25b4
								
							
						
					
					
						commit
						27b485641e
					
				|  | @ -68,10 +68,14 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { | |||
| 
 | ||||
| 	c.initController() | ||||
| 
 | ||||
| 	go c.runInformers(stopCh) | ||||
| 	go c.runPodInformer(stopCh, wg) | ||||
| 	go c.runPostgresqlInformer(stopCh, wg) | ||||
| 	go c.podEventsDispatcher(stopCh, wg) | ||||
| 	go c.clusterResync(stopCh, wg) | ||||
| 	go c.restAPIServer(stopCh, wg) | ||||
| 
 | ||||
| 	for i := range c.clusterEventQueues { | ||||
| 		go c.processClusterEventsQueue(i) | ||||
| 		go c.processClusterEventsQueue(stopCh, i, wg) | ||||
| 	} | ||||
| 
 | ||||
| 	c.logger.Info("Started working in background") | ||||
|  | @ -140,11 +144,20 @@ func (c *Controller) initController() { | |||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (c *Controller) runInformers(stopCh <-chan struct{}) { | ||||
| 	go c.postgresqlInformer.Run(stopCh) | ||||
| func (c *Controller) runPodInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) { | ||||
| 	defer wg.Done() | ||||
| 	wg.Add(1) | ||||
| 
 | ||||
| 	go c.podInformer.Run(stopCh) | ||||
| 	go c.podEventsDispatcher(stopCh) | ||||
| 	go c.clusterResync(stopCh) | ||||
| 
 | ||||
| 	<-stopCh | ||||
| } | ||||
| 
 | ||||
| func (c *Controller) runPostgresqlInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) { | ||||
| 	defer wg.Done() | ||||
| 	wg.Add(1) | ||||
| 
 | ||||
| 	go c.postgresqlInformer.Run(stopCh) | ||||
| 
 | ||||
| 	<-stopCh | ||||
| } | ||||
|  |  | |||
|  | @ -1,6 +1,8 @@ | |||
| package controller | ||||
| 
 | ||||
| import ( | ||||
| 	"sync" | ||||
| 
 | ||||
| 	"k8s.io/client-go/pkg/api" | ||||
| 	"k8s.io/client-go/pkg/api/v1" | ||||
| 	"k8s.io/client-go/pkg/runtime" | ||||
|  | @ -112,7 +114,10 @@ func (c *Controller) podDelete(obj interface{}) { | |||
| 	c.podCh <- podEvent | ||||
| } | ||||
| 
 | ||||
| func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}) { | ||||
| func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}, wg *sync.WaitGroup) { | ||||
| 	defer wg.Done() | ||||
| 	wg.Add(1) | ||||
| 
 | ||||
| 	c.logger.Debugln("Watching all pod events") | ||||
| 	for { | ||||
| 		select { | ||||
|  |  | |||
|  | @ -3,6 +3,7 @@ package controller | |||
| import ( | ||||
| 	"fmt" | ||||
| 	"reflect" | ||||
| 	"sync" | ||||
| 	"sync/atomic" | ||||
| 	"time" | ||||
| 
 | ||||
|  | @ -20,7 +21,9 @@ import ( | |||
| 	"github.com/zalando-incubator/postgres-operator/pkg/util/constants" | ||||
| ) | ||||
| 
 | ||||
| func (c *Controller) clusterResync(stopCh <-chan struct{}) { | ||||
| func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) { | ||||
| 	defer wg.Done() | ||||
| 	wg.Add(1) | ||||
| 	ticker := time.NewTicker(c.opConfig.ResyncPeriod) | ||||
| 
 | ||||
| 	for { | ||||
|  | @ -203,12 +206,17 @@ func (c *Controller) processEvent(obj interface{}) error { | |||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c *Controller) processClusterEventsQueue(idx int) { | ||||
| 	for { | ||||
| func (c *Controller) processClusterEventsQueue(stopCh <-chan struct{}, idx int, wg *sync.WaitGroup) { | ||||
| 	defer wg.Done() | ||||
| 	wg.Add(1) | ||||
| 
 | ||||
| 	go func() { | ||||
| 		if _, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)); err != nil { | ||||
| 			c.logger.Errorf("error when processing cluster events queue: %v", err) | ||||
| 		} | ||||
| 	} | ||||
| 	}() | ||||
| 
 | ||||
| 	<-stopCh | ||||
| } | ||||
| 
 | ||||
| func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec.EventType) { | ||||
|  |  | |||
|  | @ -0,0 +1,35 @@ | |||
| package controller | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"net/http" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| ) | ||||
| 
 | ||||
| func (c *Controller) restAPIServer(stopCh <-chan struct{}, wg *sync.WaitGroup) { | ||||
| 	defer wg.Done() | ||||
| 	wg.Add(1) | ||||
| 
 | ||||
| 	server := http.Server{ | ||||
| 		Addr:    fmt.Sprintf(":%d", c.opConfig.Port), | ||||
| 		Handler: c, | ||||
| 	} | ||||
| 
 | ||||
| 	go func() { | ||||
| 		err := server.ListenAndServe() | ||||
| 		if err != http.ErrServerClosed { | ||||
| 			c.logger.Fatalf("could not start http server: %v", err) | ||||
| 		} | ||||
| 	}() | ||||
| 	c.logger.Infof("listening on %s", server.Addr) | ||||
| 
 | ||||
| 	<-stopCh | ||||
| 	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) | ||||
| 	server.Shutdown(ctx) | ||||
| } | ||||
| 
 | ||||
| func (c *Controller) ServeHTTP(w http.ResponseWriter, r *http.Request) { | ||||
| 	fmt.Printf("Request: %+v\n", r.URL.String()) | ||||
| } | ||||
|  | @ -57,6 +57,7 @@ type Config struct { | |||
| 	MasterDNSNameFormat  stringTemplate `name:"master_dns_name_format" default:"{cluster}.{team}.{hostedzone}"` | ||||
| 	ReplicaDNSNameFormat stringTemplate `name:"replica_dns_name_format" default:"{cluster}-repl.{team}.{hostedzone}"` | ||||
| 	Workers              uint32         `name:"workers" default:"4"` | ||||
| 	Port                 uint32         `name:"port" default:"80"` | ||||
| } | ||||
| 
 | ||||
| func (c Config) MustMarshal() string { | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue