Fix resync of the clusters

This commit is contained in:
Murat Kabilov 2017-06-08 11:51:48 +02:00 committed by GitHub
parent bdc2db97ac
commit e104a67260
5 changed files with 32 additions and 4 deletions

View File

@ -25,7 +25,6 @@ data:
resource_check_interval: 3s resource_check_interval: 3s
resource_check_timeout: 10m resource_check_timeout: 10m
resync_period: 5m resync_period: 5m
resync_period_pod: 5m
super_username: postgres super_username: postgres
teams_api_url: http://fake-teams-api.default.svc.cluster.local teams_api_url: http://fake-teams-api.default.svc.cluster.local
workers: "4" workers: "4"

View File

@ -3,6 +3,7 @@ package controller
import ( import (
"fmt" "fmt"
"sync" "sync"
"time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
@ -13,6 +14,7 @@ import (
"github.com/zalando-incubator/postgres-operator/pkg/cluster" "github.com/zalando-incubator/postgres-operator/pkg/cluster"
"github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/spec"
"github.com/zalando-incubator/postgres-operator/pkg/util/config" "github.com/zalando-incubator/postgres-operator/pkg/util/config"
"github.com/zalando-incubator/postgres-operator/pkg/util/constants"
"github.com/zalando-incubator/postgres-operator/pkg/util/teams" "github.com/zalando-incubator/postgres-operator/pkg/util/teams"
) )
@ -38,6 +40,8 @@ type Controller struct {
podCh chan spec.PodEvent podCh chan spec.PodEvent
clusterEventQueues []*cache.FIFO clusterEventQueues []*cache.FIFO
lastClusterSyncTime int64
} }
func New(controllerConfig *Config, operatorConfig *config.Config) *Controller { func New(controllerConfig *Config, operatorConfig *config.Config) *Controller {
@ -93,7 +97,7 @@ func (c *Controller) initController() {
c.postgresqlInformer = cache.NewSharedIndexInformer( c.postgresqlInformer = cache.NewSharedIndexInformer(
clusterLw, clusterLw,
&spec.Postgresql{}, &spec.Postgresql{},
c.opConfig.ResyncPeriod, constants.QueueResyncPeriodTPR,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
if err := c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ if err := c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@ -113,7 +117,7 @@ func (c *Controller) initController() {
c.podInformer = cache.NewSharedIndexInformer( c.podInformer = cache.NewSharedIndexInformer(
podLw, podLw,
&v1.Pod{}, &v1.Pod{},
c.opConfig.ResyncPeriodPod, constants.QueueResyncPeriodPod,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
if err := c.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ if err := c.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@ -141,6 +145,7 @@ func (c *Controller) runInformers(stopCh <-chan struct{}) {
go c.postgresqlInformer.Run(stopCh) go c.postgresqlInformer.Run(stopCh)
go c.podInformer.Run(stopCh) go c.podInformer.Run(stopCh)
go c.podEventsDispatcher(stopCh) go c.podEventsDispatcher(stopCh)
go c.clusterResync(stopCh)
<-stopCh <-stopCh
} }

View File

@ -3,6 +3,8 @@ package controller
import ( import (
"fmt" "fmt"
"reflect" "reflect"
"sync/atomic"
"time"
"k8s.io/client-go/pkg/api" "k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/api/meta" "k8s.io/client-go/pkg/api/meta"
@ -18,6 +20,19 @@ import (
"github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/constants"
) )
func (c *Controller) clusterResync(stopCh <-chan struct{}) {
ticker := time.NewTicker(c.opConfig.ResyncPeriod)
for {
select {
case <-ticker.C:
c.clusterListFunc(api.ListOptions{ResourceVersion: "0"})
case <-stopCh:
return
}
}
}
func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, error) { func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, error) {
c.logger.Info("Getting list of currently running clusters") c.logger.Info("Getting list of currently running clusters")
@ -37,6 +52,11 @@ func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, e
return nil, fmt.Errorf("could not extract list of postgresql objects: %v", err) return nil, fmt.Errorf("could not extract list of postgresql objects: %v", err)
} }
if time.Now().Unix()-atomic.LoadInt64(&c.lastClusterSyncTime) <= int64(c.opConfig.ResyncPeriod.Seconds()) {
c.logger.Debugln("skipping resync of clusters")
return object, err
}
var activeClustersCnt, failedClustersCnt int var activeClustersCnt, failedClustersCnt int
for _, obj := range objList { for _, obj := range objList {
pg, ok := obj.(*spec.Postgresql) pg, ok := obj.(*spec.Postgresql)
@ -63,6 +83,8 @@ func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, e
c.logger.Infof("No clusters running") c.logger.Infof("No clusters running")
} }
atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix())
return object, err return object, err
} }

View File

@ -15,7 +15,6 @@ type TPR struct {
} }
type Resources struct { type Resources struct {
ResyncPeriodPod time.Duration `name:"resync_period_pod" default:"5m"`
ResourceCheckInterval time.Duration `name:"resource_check_interval" default:"3s"` ResourceCheckInterval time.Duration `name:"resource_check_interval" default:"3s"`
ResourceCheckTimeout time.Duration `name:"resource_check_timeout" default:"10m"` ResourceCheckTimeout time.Duration `name:"resource_check_timeout" default:"10m"`
PodLabelWaitTimeout time.Duration `name:"pod_label_wait_timeout" default:"10m"` PodLabelWaitTimeout time.Duration `name:"pod_label_wait_timeout" default:"10m"`

View File

@ -10,4 +10,7 @@ const (
K8sAPIPath = "/api" K8sAPIPath = "/api"
StatefulsetDeletionInterval = 1 * time.Second StatefulsetDeletionInterval = 1 * time.Second
StatefulsetDeletionTimeout = 30 * time.Second StatefulsetDeletionTimeout = 30 * time.Second
QueueResyncPeriodPod = 5 * time.Minute
QueueResyncPeriodTPR = 5 * time.Minute
) )