diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index a61ce94fd..5ab0e0fef 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -5,7 +5,7 @@ import ( "sync" "github.com/Sirupsen/logrus" - "k8s.io/client-go/kubernetes" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" @@ -14,21 +14,26 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util/config" "github.com/zalando-incubator/postgres-operator/pkg/util/constants" - "github.com/zalando-incubator/postgres-operator/pkg/util/teams" + "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" ) type Config struct { RestConfig *rest.Config - KubeClient *kubernetes.Clientset - RestClient *rest.RESTClient - TeamsAPIClient *teams.API InfrastructureRoles map[string]spec.PgUser + + NoDatabaseAccess bool + NoTeamsAPI bool + ConfigMapName spec.NamespacedName + Namespace string } type Controller struct { - Config + config Config opConfig *config.Config - logger *logrus.Entry + + logger *logrus.Entry + KubeClient k8sutil.KubernetesClient + RestClient rest.Interface // kubernetes API group REST client clustersMu sync.RWMutex clusters map[spec.NamespacedName]*cluster.Cluster @@ -38,23 +43,16 @@ type Controller struct { podInformer cache.SharedIndexInformer podCh chan spec.PodEvent - clusterEventQueues []*cache.FIFO - + clusterEventQueues []*cache.FIFO lastClusterSyncTime int64 } -func New(controllerConfig *Config, operatorConfig *config.Config) *Controller { +func NewController(controllerConfig *Config) *Controller { logger := logrus.New() - if operatorConfig.DebugLogging { - logger.Level = logrus.DebugLevel - } - - controllerConfig.TeamsAPIClient = teams.NewTeamsAPI(operatorConfig.TeamsAPIUrl, logger) - return &Controller{ - Config: *controllerConfig, - opConfig: operatorConfig, + config: *controllerConfig, + opConfig: &config.Config{}, logger: logger.WithField("pkg", "controller"), clusters: make(map[spec.NamespacedName]*cluster.Cluster), stopChs: make(map[spec.NamespacedName]chan struct{}), @@ -62,54 +60,82 @@ func New(controllerConfig *Config, operatorConfig *config.Config) *Controller { } } -func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { - defer wg.Done() - wg.Add(1) +func (c *Controller) initClients() { + client, err := k8sutil.ClientSet(c.config.RestConfig) + if err != nil { + c.logger.Fatalf("couldn't create client: %v", err) + } + c.KubeClient = k8sutil.NewFromKubernetesInterface(client) - c.initController() + c.RestClient, err = k8sutil.KubernetesRestClient(*c.config.RestConfig) + if err != nil { + c.logger.Fatalf("couldn't create rest client: %v", err) + } +} - go c.runPodInformer(stopCh, wg) - go c.runPostgresqlInformer(stopCh, wg) - go c.podEventsDispatcher(stopCh, wg) - go c.clusterResync(stopCh, wg) - go c.restAPIServer(stopCh, wg) +func (c *Controller) initOperatorConfig() { + configMapData := make(map[string]string) - for i := range c.clusterEventQueues { - go c.processClusterEventsQueue(stopCh, i, wg) + if c.config.ConfigMapName != (spec.NamespacedName{}) { + configMap, err := c.KubeClient.ConfigMaps(c.config.ConfigMapName.Namespace). + Get(c.config.ConfigMapName.Name, metav1.GetOptions{}) + if err != nil { + panic(err) + } + + configMapData = configMap.Data + } else { + c.logger.Infoln("No ConfigMap specified. Loading default values") } - c.logger.Info("Started working in background") + if configMapData["namespace"] == "" { // Namespace in ConfigMap has priority over env var + configMapData["namespace"] = c.config.Namespace + } + if c.config.NoDatabaseAccess { + configMapData["enable_database_access"] = "false" + } + if c.config.NoTeamsAPI { + configMapData["enable_teams_api"] = "false" + } + + c.opConfig = config.NewFromMap(configMapData) } func (c *Controller) initController() { + c.initClients() + c.initOperatorConfig() + + c.logger.Infof("Config: %s", c.opConfig.MustMarshal()) + + if c.opConfig.DebugLogging { + c.logger.Level = logrus.DebugLevel + } + if err := c.createTPR(); err != nil { c.logger.Fatalf("could not register ThirdPartyResource: %v", err) } - if infraRoles, err := c.getInfrastructureRoles(); err != nil { + if infraRoles, err := c.getInfrastructureRoles(&c.opConfig.InfrastructureRolesSecretName); err != nil { c.logger.Warningf("could not get infrastructure roles: %v", err) } else { - c.InfrastructureRoles = infraRoles + c.config.InfrastructureRoles = infraRoles } // Postgresqls - clusterLw := &cache.ListWatch{ - ListFunc: c.clusterListFunc, - WatchFunc: c.clusterWatchFunc, - } c.postgresqlInformer = cache.NewSharedIndexInformer( - clusterLw, + &cache.ListWatch{ + ListFunc: c.clusterListFunc, + WatchFunc: c.clusterWatchFunc, + }, &spec.Postgresql{}, constants.QueueResyncPeriodTPR, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + cache.Indexers{}) - if err := c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.postgresqlAdd, UpdateFunc: c.postgresqlUpdate, DeleteFunc: c.postgresqlDelete, - }); err != nil { - c.logger.Fatalf("could not add event handlers: %v", err) - } + }) // Pods podLw := &cache.ListWatch{ @@ -123,13 +149,11 @@ func (c *Controller) initController() { constants.QueueResyncPeriodPod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) - if err := c.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + c.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.podAdd, UpdateFunc: c.podUpdate, DeleteFunc: c.podDelete, - }); err != nil { - c.logger.Fatalf("could not add event handlers: %v", err) - } + }) c.clusterEventQueues = make([]*cache.FIFO, c.opConfig.Workers) for i := range c.clusterEventQueues { @@ -144,20 +168,31 @@ func (c *Controller) initController() { } } +func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { + c.initController() + + wg.Add(4) + go c.runPodInformer(stopCh, wg) + go c.runPostgresqlInformer(stopCh, wg) + go c.podEventsDispatcher(stopCh, wg) + go c.clusterResync(stopCh, wg) + + for i := range c.clusterEventQueues { + wg.Add(1) + go c.processClusterEventsQueue(i, stopCh, wg) + } + + c.logger.Info("Started working in background") +} + func (c *Controller) runPodInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) { defer wg.Done() - wg.Add(1) - go c.podInformer.Run(stopCh) - - <-stopCh + c.podInformer.Run(stopCh) } func (c *Controller) runPostgresqlInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) { defer wg.Done() - wg.Add(1) - go c.postgresqlInformer.Run(stopCh) - - <-stopCh -} + c.postgresqlInformer.Run(stopCh) +} \ No newline at end of file diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index 7be7b3497..a8308e86e 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -101,7 +101,6 @@ func (c *Controller) podDelete(obj interface{}) { func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}, wg *sync.WaitGroup) { defer wg.Done() - wg.Add(1) c.logger.Debugln("Watching all pod events") for { diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 6875916ab..c832476f7 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -22,7 +22,6 @@ import ( func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) { defer wg.Done() - wg.Add(1) ticker := time.NewTicker(c.opConfig.ResyncPeriod) for { @@ -229,9 +228,8 @@ func (c *Controller) processEvent(obj interface{}) error { return nil } -func (c *Controller) processClusterEventsQueue(stopCh <-chan struct{}, idx int, wg *sync.WaitGroup) { +func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{}, wg *sync.WaitGroup) { defer wg.Done() - wg.Add(1) go func() { if _, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)); err != nil {