From 32aa7270e6f93e56105d992419d0b9d066f4dd4e Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Mon, 9 Oct 2017 16:56:27 +0200 Subject: [PATCH] Use round-robin strategy while assigning workers --- pkg/controller/controller.go | 3 +++ pkg/controller/util.go | 16 ++++++++++++++-- pkg/controller/util_test.go | 4 ++-- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 49fa15ccc..7f5315e27 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -30,6 +30,8 @@ type Controller struct { stopCh chan struct{} + curWorkerID uint32 //initialized with 0 + clusterWorkers map[spec.NamespacedName]uint32 clustersMu sync.RWMutex clusters map[spec.NamespacedName]*cluster.Cluster clusterLogs map[spec.NamespacedName]ringlog.RingLogger @@ -54,6 +56,7 @@ func NewController(controllerConfig *spec.ControllerConfig) *Controller { config: *controllerConfig, opConfig: &config.Config{}, logger: logger.WithField("pkg", "controller"), + clusterWorkers: make(map[spec.NamespacedName]uint32), clusters: make(map[spec.NamespacedName]*cluster.Cluster), clusterLogs: make(map[spec.NamespacedName]ringlog.RingLogger), clusterHistory: make(map[spec.NamespacedName]ringlog.RingLogger), diff --git a/pkg/controller/util.go b/pkg/controller/util.go index 3f847d224..5f91a8995 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -2,7 +2,6 @@ package controller import ( "fmt" - "hash/crc32" apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -30,7 +29,20 @@ func (c *Controller) makeClusterConfig() cluster.Config { } func (c *Controller) clusterWorkerID(clusterName spec.NamespacedName) uint32 { - return crc32.ChecksumIEEE([]byte(clusterName.String())) % c.opConfig.Workers + workerId, ok := c.clusterWorkers[clusterName] + if ok { + return workerId + } + + c.clusterWorkers[clusterName] = c.curWorkerID + + if c.curWorkerID == c.opConfig.Workers-1 { + c.curWorkerID = 0 + } else { + c.curWorkerID++ + } + + return c.clusterWorkers[clusterName] } func (c *Controller) createCRD() error { diff --git a/pkg/controller/util_test.go b/pkg/controller/util_test.go index a15ef4744..41b8e2fe1 100644 --- a/pkg/controller/util_test.go +++ b/pkg/controller/util_test.go @@ -95,11 +95,11 @@ func TestClusterWorkerID(t *testing.T) { }{ { in: spec.NamespacedName{Namespace: "foo", Name: "bar"}, - expected: 2, + expected: 0, }, { in: spec.NamespacedName{Namespace: "default", Name: "testcluster"}, - expected: 3, + expected: 1, }, } for _, test := range testTable {