From e104a67260579ef672041af8731906200e799b18 Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Thu, 8 Jun 2017 11:51:48 +0200 Subject: [PATCH 1/4] Fix resync of the clusters --- manifests/configmap.yaml | 1 - pkg/controller/controller.go | 9 +++++++-- pkg/controller/postgresql.go | 22 ++++++++++++++++++++++ pkg/util/config/config.go | 1 - pkg/util/constants/kubernetes.go | 3 +++ 5 files changed, 32 insertions(+), 4 deletions(-) diff --git a/manifests/configmap.yaml b/manifests/configmap.yaml index 89c352b5e..f5f1920f7 100644 --- a/manifests/configmap.yaml +++ b/manifests/configmap.yaml @@ -25,7 +25,6 @@ data: resource_check_interval: 3s resource_check_timeout: 10m resync_period: 5m - resync_period_pod: 5m super_username: postgres teams_api_url: http://fake-teams-api.default.svc.cluster.local workers: "4" diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 8a3c3f110..151e9e7f7 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -3,6 +3,7 @@ package controller import ( "fmt" "sync" + "time" "github.com/Sirupsen/logrus" "k8s.io/client-go/kubernetes" @@ -13,6 +14,7 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/cluster" "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util/config" + "github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/teams" ) @@ -38,6 +40,8 @@ type Controller struct { podCh chan spec.PodEvent clusterEventQueues []*cache.FIFO + + lastClusterSyncTime int64 } func New(controllerConfig *Config, operatorConfig *config.Config) *Controller { @@ -93,7 +97,7 @@ func (c *Controller) initController() { c.postgresqlInformer = cache.NewSharedIndexInformer( clusterLw, &spec.Postgresql{}, - c.opConfig.ResyncPeriod, + constants.QueueResyncPeriodTPR, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) if err := c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -113,7 +117,7 @@ func (c *Controller) initController() { c.podInformer = cache.NewSharedIndexInformer( podLw, &v1.Pod{}, - c.opConfig.ResyncPeriodPod, + constants.QueueResyncPeriodPod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) if err := c.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -141,6 +145,7 @@ func (c *Controller) runInformers(stopCh <-chan struct{}) { go c.postgresqlInformer.Run(stopCh) go c.podInformer.Run(stopCh) go c.podEventsDispatcher(stopCh) + go c.clusterResync(stopCh) <-stopCh } diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index c202c9590..14997c37f 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -3,6 +3,8 @@ package controller import ( "fmt" "reflect" + "sync/atomic" + "time" "k8s.io/client-go/pkg/api" "k8s.io/client-go/pkg/api/meta" @@ -18,6 +20,19 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/util/constants" ) +func (c *Controller) clusterResync(stopCh <-chan struct{}) { + ticker := time.NewTicker(c.opConfig.ResyncPeriod) + + for { + select { + case <-ticker.C: + c.clusterListFunc(api.ListOptions{ResourceVersion: "0"}) + case <-stopCh: + return + } + } +} + func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, error) { c.logger.Info("Getting list of currently running clusters") @@ -37,6 +52,11 @@ func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, e return nil, fmt.Errorf("could not extract list of postgresql objects: %v", err) } + if time.Now().Unix()-atomic.LoadInt64(&c.lastClusterSyncTime) <= int64(c.opConfig.ResyncPeriod.Seconds()) { + c.logger.Debugln("skipping resync of clusters") + return object, err + } + var activeClustersCnt, failedClustersCnt int for _, obj := range objList { pg, ok := obj.(*spec.Postgresql) @@ -63,6 +83,8 @@ func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, e c.logger.Infof("No clusters running") } + atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix()) + return object, err } diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index 6c5178ab8..7676e3e6a 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -15,7 +15,6 @@ type TPR struct { } type Resources struct { - ResyncPeriodPod time.Duration `name:"resync_period_pod" default:"5m"` ResourceCheckInterval time.Duration `name:"resource_check_interval" default:"3s"` ResourceCheckTimeout time.Duration `name:"resource_check_timeout" default:"10m"` PodLabelWaitTimeout time.Duration `name:"pod_label_wait_timeout" default:"10m"` diff --git a/pkg/util/constants/kubernetes.go b/pkg/util/constants/kubernetes.go index 048a83034..3a56aa35a 100644 --- a/pkg/util/constants/kubernetes.go +++ b/pkg/util/constants/kubernetes.go @@ -10,4 +10,7 @@ const ( K8sAPIPath = "/api" StatefulsetDeletionInterval = 1 * time.Second StatefulsetDeletionTimeout = 30 * time.Second + + QueueResyncPeriodPod = 5 * time.Minute + QueueResyncPeriodTPR = 5 * time.Minute ) From 1540a2ba65dcf1c34c56e8ee66449ea50851a461 Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Thu, 8 Jun 2017 11:53:57 +0200 Subject: [PATCH 2/4] fix typos; remove unnecessary tests; go fmt -s --- pkg/cluster/cluster.go | 2 +- pkg/cluster/sync.go | 2 +- pkg/cluster/volumes.go | 6 +++--- pkg/spec/postgresql_test.go | 24 ++++++++++++------------ pkg/util/teams/teams.go | 2 +- pkg/util/util_test.go | 16 ---------------- 6 files changed, 18 insertions(+), 34 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 83f068ecd..eaa56ec46 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -430,7 +430,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { c.logger.Infof("%s service '%s' has been created", role, util.NameFromMeta(service.ObjectMeta)) } } - // only proceeed further if both old and new load balancer were present + // only proceed further if both old and new load balancer were present if !(newSpec.Spec.ReplicaLoadBalancer && c.Spec.ReplicaLoadBalancer) { continue } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index c7c071333..7dd2b9ee4 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -39,7 +39,7 @@ func (c *Cluster) Sync() error { if c.Service[role] != nil { // delete the left over replica service if err := c.deleteService(role); err != nil { - return fmt.Errorf("could not delete obsolete %s service: %v", role) + return fmt.Errorf("could not delete obsolete %s service: %v", role, err) } } continue diff --git a/pkg/cluster/volumes.go b/pkg/cluster/volumes.go index 47b3effaa..920004472 100644 --- a/pkg/cluster/volumes.go +++ b/pkg/cluster/volumes.go @@ -109,8 +109,8 @@ func (c *Cluster) resizeVolumes(newVolume spec.Volume, resizers []volumes.Volume return fmt.Errorf("could not connect to the volume provider: %v", err) } defer func() { - err2 := resizer.DisconnectFromProvider(); if err2 != nil { - c.logger.Errorf("%v", err2) + if err := resizer.DisconnectFromProvider(); err != nil { + c.logger.Errorf("%v", err) } }() } @@ -127,7 +127,7 @@ func (c *Cluster) resizeVolumes(newVolume spec.Volume, resizers []volumes.Volume if err := c.resizePostgresFilesystem(podName, []filesystems.FilesystemResizer{&filesystems.Ext234Resize{}}); err != nil { return fmt.Errorf("could not resize the filesystem on pod '%s': %v", podName, err) } - c.logger.Debugf("filesystem resize successfull on volume %s", pv.Name) + c.logger.Debugf("filesystem resize successful on volume %s", pv.Name) pv.Spec.Capacity[v1.ResourceStorage] = newQuantity c.logger.Debugf("updating persistent volume definition for volume %s", pv.Name) if _, err := c.KubeClient.PersistentVolumes().Update(pv); err != nil { diff --git a/pkg/spec/postgresql_test.go b/pkg/spec/postgresql_test.go index 48314d376..d067b8d55 100644 --- a/pkg/spec/postgresql_test.go +++ b/pkg/spec/postgresql_test.go @@ -223,7 +223,7 @@ var unmarshalCluster = []struct { TeamID: "ACID", AllowedSourceRanges: []string{"127.0.0.1/32"}, NumberOfInstances: 2, - Users: map[string]UserFlags{"zalando": {"superuser", "createdb"}}, + Users: map[string]userFlags{"zalando": {"superuser", "createdb"}}, MaintenanceWindows: []MaintenanceWindow{{ Everyday: false, Weekday: time.Monday, @@ -237,7 +237,7 @@ var unmarshalCluster = []struct { }, { Everyday: true, - Weekday: time.Sunday, + Weekday: time.Sunday, StartTime: mustParseTime("05:00"), EndTime: mustParseTime("05:15"), }, @@ -263,13 +263,13 @@ var unmarshalCluster = []struct { }, []byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"teapot-testcluster1","creationTimestamp":null},"spec":{"postgresql":{"version":"","parameters":null},"volume":{"size":"","storageClass":""},"patroni":{"initdb":null,"pg_hba":null,"ttl":0,"loop_wait":0,"retry_timeout":0,"maximum_lag_on_failover":0},"resources":{"requests":{"cpu":"","memory":""},"limits":{"cpu":"","memory":""}},"teamId":"acid","allowedSourceRanges":null,"numberOfInstances":0,"users":null},"status":"Invalid"}`), nil}, {[]byte(`{"kind": "Postgresql","apiVersion": "acid.zalan.do/v1"`), - Postgresql{}, - []byte{}, - errors.New("unexpected end of JSON input")}, + Postgresql{}, + []byte{}, + errors.New("unexpected end of JSON input")}, {[]byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"acid-testcluster","creationTimestamp":qaz},"spec":{"postgresql":{"version":"","parameters":null},"volume":{"size":"","storageClass":""},"patroni":{"initdb":null,"pg_hba":null,"ttl":0,"loop_wait":0,"retry_timeout":0,"maximum_lag_on_failover":0},"resources":{"requests":{"cpu":"","memory":""},"limits":{"cpu":"","memory":""}},"teamId":"acid","allowedSourceRanges":null,"numberOfInstances":0,"users":null},"status":"Invalid"}`), - Postgresql{}, - []byte{}, - errors.New("invalid character 'q' looking for beginning of value")}} + Postgresql{}, + []byte{}, + errors.New("invalid character 'q' looking for beginning of value")}} var postgresqlList = []struct { in []byte @@ -282,7 +282,7 @@ var postgresqlList = []struct { Kind: "List", APIVersion: "v1", }, - Items: []Postgresql{Postgresql{ + Items: []Postgresql{{ TypeMeta: unversioned.TypeMeta{ Kind: "Postgresql", APIVersion: "acid.zalan.do/v1", @@ -309,8 +309,8 @@ var postgresqlList = []struct { }, nil}, {[]byte(`{"apiVersion":"v1","items":[{"apiVersion":"acid.zalan.do/v1","kind":"Postgresql","metadata":{"labels":{"team":"acid"},"name":"acid-testcluster42","namespace"`), - PostgresqlList{}, - errors.New("unexpected end of JSON input")}} + PostgresqlList{}, + errors.New("unexpected end of JSON input")}} func mustParseTime(s string) time.Time { v, err := time.Parse("15:04", s) @@ -382,7 +382,7 @@ func TestUnmarshalMaintenanceWindow(t *testing.T) { } if !reflect.DeepEqual(m, tt.out) { - t.Errorf("Expected maintenace window: %#v, got: %#v", tt.out, m) + t.Errorf("Expected maintenance window: %#v, got: %#v", tt.out, m) } } } diff --git a/pkg/util/teams/teams.go b/pkg/util/teams/teams.go index b00e46271..566dbb517 100644 --- a/pkg/util/teams/teams.go +++ b/pkg/util/teams/teams.go @@ -72,7 +72,7 @@ func (t *API) TeamInfo(teamID, token string) (tm *team, er error) { return nil, err } defer func() { - if err:= resp.Body.Close(); err != nil { + if err := resp.Body.Close(); err != nil { er = fmt.Errorf("error when closing response; %v", err) tm = nil } diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index ddf42760f..067f64927 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -1,7 +1,6 @@ package util import ( - "fmt" "reflect" "testing" @@ -26,13 +25,6 @@ var pgUsers = []struct { MemberOf: []string{}}, "md592f413f3974bdf3799bb6fecb5f9f2c6"}} -var prettyTest = []struct { - in interface{} - out string -}{ - {pgUsers, `[{{test password [] []} md587f77988ccb5aa917c93201ba314fcd4} {{test md592f413f3974bdf3799bb6fecb5f9f2c6 [] []} md592f413f3974bdf3799bb6fecb5f9f2c6}]`}, -} - var prettyDiffTest = []struct { inA interface{} inB interface{} @@ -86,14 +78,6 @@ func TestPGUserPassword(t *testing.T) { } } -func TestPretty(t *testing.T) { - for _, tt := range prettyTest { - if actual := Pretty(tt.in); fmt.Sprintf("%v", actual) != tt.out { - t.Errorf("Pretty expected: %s, got: %s", tt.out, actual) - } - } -} - func TestPrettyDiff(t *testing.T) { for _, tt := range prettyDiffTest { if actual := PrettyDiff(tt.inA, tt.inB); actual != tt.out { From a9d746878b6832f75582e26ff5ae805c8126c2f5 Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Thu, 8 Jun 2017 11:57:39 +0200 Subject: [PATCH 3/4] fix golint warn --- pkg/spec/postgresql.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/spec/postgresql.go b/pkg/spec/postgresql.go index c73e104f9..172fe06f8 100644 --- a/pkg/spec/postgresql.go +++ b/pkg/spec/postgresql.go @@ -135,12 +135,12 @@ func (m *MaintenanceWindow) MarshalJSON() ([]byte, error) { return []byte(fmt.Sprintf("\"%s-%s\"", m.StartTime.Format("15:04"), m.EndTime.Format("15:04"))), nil - } else { - return []byte(fmt.Sprintf("\"%s:%s-%s\"", - m.Weekday.String()[:3], - m.StartTime.Format("15:04"), - m.EndTime.Format("15:04"))), nil } + + return []byte(fmt.Sprintf("\"%s:%s-%s\"", + m.Weekday.String()[:3], + m.StartTime.Format("15:04"), + m.EndTime.Format("15:04"))), nil } // UnmarshalJSON convets a JSON to the maintenance window definition. From e0dacd0ca9bffb8bf1d3b55245ed823ae41a4ab0 Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Thu, 8 Jun 2017 16:05:54 +0200 Subject: [PATCH 4/4] Remove an unused export. --- pkg/controller/controller.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 151e9e7f7..b0e0962bd 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -3,7 +3,6 @@ package controller import ( "fmt" "sync" - "time" "github.com/Sirupsen/logrus" "k8s.io/client-go/kubernetes"