Merge branch 'fix/graceful-shutdown' into feature/diagnostic-rest-api

# Conflicts:
#	pkg/controller/controller.go
#	pkg/controller/pod.go
#	pkg/controller/postgresql.go
This commit is contained in:
Murat Kabilov 2017-07-25 17:09:22 +02:00
commit 54defa8070
3 changed files with 91 additions and 59 deletions

View File

@ -5,7 +5,7 @@ import (
"sync" "sync"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"k8s.io/client-go/kubernetes" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
@ -14,21 +14,26 @@ import (
"github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/spec"
"github.com/zalando-incubator/postgres-operator/pkg/util/config" "github.com/zalando-incubator/postgres-operator/pkg/util/config"
"github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/constants"
"github.com/zalando-incubator/postgres-operator/pkg/util/teams" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
) )
type Config struct { type Config struct {
RestConfig *rest.Config RestConfig *rest.Config
KubeClient *kubernetes.Clientset
RestClient *rest.RESTClient
TeamsAPIClient *teams.API
InfrastructureRoles map[string]spec.PgUser InfrastructureRoles map[string]spec.PgUser
NoDatabaseAccess bool
NoTeamsAPI bool
ConfigMapName spec.NamespacedName
Namespace string
} }
type Controller struct { type Controller struct {
Config config Config
opConfig *config.Config opConfig *config.Config
logger *logrus.Entry
logger *logrus.Entry
KubeClient k8sutil.KubernetesClient
RestClient rest.Interface // kubernetes API group REST client
clustersMu sync.RWMutex clustersMu sync.RWMutex
clusters map[spec.NamespacedName]*cluster.Cluster clusters map[spec.NamespacedName]*cluster.Cluster
@ -38,23 +43,16 @@ type Controller struct {
podInformer cache.SharedIndexInformer podInformer cache.SharedIndexInformer
podCh chan spec.PodEvent podCh chan spec.PodEvent
clusterEventQueues []*cache.FIFO clusterEventQueues []*cache.FIFO
lastClusterSyncTime int64 lastClusterSyncTime int64
} }
func New(controllerConfig *Config, operatorConfig *config.Config) *Controller { func NewController(controllerConfig *Config) *Controller {
logger := logrus.New() logger := logrus.New()
if operatorConfig.DebugLogging {
logger.Level = logrus.DebugLevel
}
controllerConfig.TeamsAPIClient = teams.NewTeamsAPI(operatorConfig.TeamsAPIUrl, logger)
return &Controller{ return &Controller{
Config: *controllerConfig, config: *controllerConfig,
opConfig: operatorConfig, opConfig: &config.Config{},
logger: logger.WithField("pkg", "controller"), logger: logger.WithField("pkg", "controller"),
clusters: make(map[spec.NamespacedName]*cluster.Cluster), clusters: make(map[spec.NamespacedName]*cluster.Cluster),
stopChs: make(map[spec.NamespacedName]chan struct{}), stopChs: make(map[spec.NamespacedName]chan struct{}),
@ -62,54 +60,82 @@ func New(controllerConfig *Config, operatorConfig *config.Config) *Controller {
} }
} }
func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { func (c *Controller) initClients() {
defer wg.Done() client, err := k8sutil.ClientSet(c.config.RestConfig)
wg.Add(1) if err != nil {
c.logger.Fatalf("couldn't create client: %v", err)
}
c.KubeClient = k8sutil.NewFromKubernetesInterface(client)
c.initController() c.RestClient, err = k8sutil.KubernetesRestClient(*c.config.RestConfig)
if err != nil {
c.logger.Fatalf("couldn't create rest client: %v", err)
}
}
go c.runPodInformer(stopCh, wg) func (c *Controller) initOperatorConfig() {
go c.runPostgresqlInformer(stopCh, wg) configMapData := make(map[string]string)
go c.podEventsDispatcher(stopCh, wg)
go c.clusterResync(stopCh, wg)
go c.restAPIServer(stopCh, wg)
for i := range c.clusterEventQueues { if c.config.ConfigMapName != (spec.NamespacedName{}) {
go c.processClusterEventsQueue(stopCh, i, wg) configMap, err := c.KubeClient.ConfigMaps(c.config.ConfigMapName.Namespace).
Get(c.config.ConfigMapName.Name, metav1.GetOptions{})
if err != nil {
panic(err)
}
configMapData = configMap.Data
} else {
c.logger.Infoln("No ConfigMap specified. Loading default values")
} }
c.logger.Info("Started working in background") if configMapData["namespace"] == "" { // Namespace in ConfigMap has priority over env var
configMapData["namespace"] = c.config.Namespace
}
if c.config.NoDatabaseAccess {
configMapData["enable_database_access"] = "false"
}
if c.config.NoTeamsAPI {
configMapData["enable_teams_api"] = "false"
}
c.opConfig = config.NewFromMap(configMapData)
} }
func (c *Controller) initController() { func (c *Controller) initController() {
c.initClients()
c.initOperatorConfig()
c.logger.Infof("Config: %s", c.opConfig.MustMarshal())
if c.opConfig.DebugLogging {
c.logger.Level = logrus.DebugLevel
}
if err := c.createTPR(); err != nil { if err := c.createTPR(); err != nil {
c.logger.Fatalf("could not register ThirdPartyResource: %v", err) c.logger.Fatalf("could not register ThirdPartyResource: %v", err)
} }
if infraRoles, err := c.getInfrastructureRoles(); err != nil { if infraRoles, err := c.getInfrastructureRoles(&c.opConfig.InfrastructureRolesSecretName); err != nil {
c.logger.Warningf("could not get infrastructure roles: %v", err) c.logger.Warningf("could not get infrastructure roles: %v", err)
} else { } else {
c.InfrastructureRoles = infraRoles c.config.InfrastructureRoles = infraRoles
} }
// Postgresqls // Postgresqls
clusterLw := &cache.ListWatch{
ListFunc: c.clusterListFunc,
WatchFunc: c.clusterWatchFunc,
}
c.postgresqlInformer = cache.NewSharedIndexInformer( c.postgresqlInformer = cache.NewSharedIndexInformer(
clusterLw, &cache.ListWatch{
ListFunc: c.clusterListFunc,
WatchFunc: c.clusterWatchFunc,
},
&spec.Postgresql{}, &spec.Postgresql{},
constants.QueueResyncPeriodTPR, constants.QueueResyncPeriodTPR,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) cache.Indexers{})
if err := c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.postgresqlAdd, AddFunc: c.postgresqlAdd,
UpdateFunc: c.postgresqlUpdate, UpdateFunc: c.postgresqlUpdate,
DeleteFunc: c.postgresqlDelete, DeleteFunc: c.postgresqlDelete,
}); err != nil { })
c.logger.Fatalf("could not add event handlers: %v", err)
}
// Pods // Pods
podLw := &cache.ListWatch{ podLw := &cache.ListWatch{
@ -123,13 +149,11 @@ func (c *Controller) initController() {
constants.QueueResyncPeriodPod, constants.QueueResyncPeriodPod,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
if err := c.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ c.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.podAdd, AddFunc: c.podAdd,
UpdateFunc: c.podUpdate, UpdateFunc: c.podUpdate,
DeleteFunc: c.podDelete, DeleteFunc: c.podDelete,
}); err != nil { })
c.logger.Fatalf("could not add event handlers: %v", err)
}
c.clusterEventQueues = make([]*cache.FIFO, c.opConfig.Workers) c.clusterEventQueues = make([]*cache.FIFO, c.opConfig.Workers)
for i := range c.clusterEventQueues { for i := range c.clusterEventQueues {
@ -144,20 +168,31 @@ func (c *Controller) initController() {
} }
} }
func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
c.initController()
wg.Add(4)
go c.runPodInformer(stopCh, wg)
go c.runPostgresqlInformer(stopCh, wg)
go c.podEventsDispatcher(stopCh, wg)
go c.clusterResync(stopCh, wg)
for i := range c.clusterEventQueues {
wg.Add(1)
go c.processClusterEventsQueue(i, stopCh, wg)
}
c.logger.Info("Started working in background")
}
func (c *Controller) runPodInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) { func (c *Controller) runPodInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
wg.Add(1)
go c.podInformer.Run(stopCh) c.podInformer.Run(stopCh)
<-stopCh
} }
func (c *Controller) runPostgresqlInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) { func (c *Controller) runPostgresqlInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
wg.Add(1)
go c.postgresqlInformer.Run(stopCh) c.postgresqlInformer.Run(stopCh)
}
<-stopCh
}

View File

@ -101,7 +101,6 @@ func (c *Controller) podDelete(obj interface{}) {
func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}, wg *sync.WaitGroup) { func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
wg.Add(1)
c.logger.Debugln("Watching all pod events") c.logger.Debugln("Watching all pod events")
for { for {

View File

@ -22,7 +22,6 @@ import (
func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) { func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
wg.Add(1)
ticker := time.NewTicker(c.opConfig.ResyncPeriod) ticker := time.NewTicker(c.opConfig.ResyncPeriod)
for { for {
@ -229,9 +228,8 @@ func (c *Controller) processEvent(obj interface{}) error {
return nil return nil
} }
func (c *Controller) processClusterEventsQueue(stopCh <-chan struct{}, idx int, wg *sync.WaitGroup) { func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
wg.Add(1)
go func() { go func() {
if _, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)); err != nil { if _, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)); err != nil {