diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index b0e0962bd..a61ce94fd 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -68,10 +68,14 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { c.initController() - go c.runInformers(stopCh) + go c.runPodInformer(stopCh, wg) + go c.runPostgresqlInformer(stopCh, wg) + go c.podEventsDispatcher(stopCh, wg) + go c.clusterResync(stopCh, wg) + go c.restAPIServer(stopCh, wg) for i := range c.clusterEventQueues { - go c.processClusterEventsQueue(i) + go c.processClusterEventsQueue(stopCh, i, wg) } c.logger.Info("Started working in background") @@ -140,11 +144,20 @@ func (c *Controller) initController() { } } -func (c *Controller) runInformers(stopCh <-chan struct{}) { - go c.postgresqlInformer.Run(stopCh) +func (c *Controller) runPodInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() + wg.Add(1) + go c.podInformer.Run(stopCh) - go c.podEventsDispatcher(stopCh) - go c.clusterResync(stopCh) + + <-stopCh +} + +func (c *Controller) runPostgresqlInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() + wg.Add(1) + + go c.postgresqlInformer.Run(stopCh) <-stopCh } diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index 3b31d439f..ba44625b6 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -1,6 +1,8 @@ package controller import ( + "sync" + "k8s.io/client-go/pkg/api" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/runtime" @@ -112,7 +114,10 @@ func (c *Controller) podDelete(obj interface{}) { c.podCh <- podEvent } -func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}) { +func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() + wg.Add(1) + c.logger.Debugln("Watching all pod events") for { select { diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 14997c37f..d01d53eb9 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -3,6 +3,7 @@ package controller import ( "fmt" "reflect" + "sync" "sync/atomic" "time" @@ -20,7 +21,9 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/util/constants" ) -func (c *Controller) clusterResync(stopCh <-chan struct{}) { +func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() + wg.Add(1) ticker := time.NewTicker(c.opConfig.ResyncPeriod) for { @@ -203,12 +206,17 @@ func (c *Controller) processEvent(obj interface{}) error { return nil } -func (c *Controller) processClusterEventsQueue(idx int) { - for { +func (c *Controller) processClusterEventsQueue(stopCh <-chan struct{}, idx int, wg *sync.WaitGroup) { + defer wg.Done() + wg.Add(1) + + go func() { if _, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)); err != nil { c.logger.Errorf("error when processing cluster events queue: %v", err) } - } + }() + + <-stopCh } func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec.EventType) { diff --git a/pkg/controller/rest-server.go b/pkg/controller/rest-server.go new file mode 100644 index 000000000..3f251e562 --- /dev/null +++ b/pkg/controller/rest-server.go @@ -0,0 +1,35 @@ +package controller + +import ( + "context" + "fmt" + "net/http" + "sync" + "time" +) + +func (c *Controller) restAPIServer(stopCh <-chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() + wg.Add(1) + + server := http.Server{ + Addr: fmt.Sprintf(":%d", c.opConfig.Port), + Handler: c, + } + + go func() { + err := server.ListenAndServe() + if err != http.ErrServerClosed { + c.logger.Fatalf("could not start http server: %v", err) + } + }() + c.logger.Infof("listening on %s", server.Addr) + + <-stopCh + ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) + server.Shutdown(ctx) +} + +func (c *Controller) ServeHTTP(w http.ResponseWriter, r *http.Request) { + fmt.Printf("Request: %+v\n", r.URL.String()) +} diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index cd038e9a8..cf0c5ed85 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -57,6 +57,7 @@ type Config struct { MasterDNSNameFormat stringTemplate `name:"master_dns_name_format" default:"{cluster}.{team}.{hostedzone}"` ReplicaDNSNameFormat stringTemplate `name:"replica_dns_name_format" default:"{cluster}-repl.{team}.{hostedzone}"` Workers uint32 `name:"workers" default:"4"` + Port uint32 `name:"port" default:"80"` } func (c Config) MustMarshal() string {