From e2c8163b8c25e44230353afe35069ef62d66ad80 Mon Sep 17 00:00:00 2001
From: Yusuke Kuoka
Date: Mon, 27 Jun 2022 18:31:48 +0900
Subject: [PATCH] Make webhook-based scale race-free (#1477)

* Make webhook-based scale operation asynchronous

This prevents a race condition in the webhook-based autoscaler where it
receives a new webhook event while still processing a previous one, and
both end up scaling up the same horizontal runner autoscaler.

Ref #1321

* Fix typos

* Update rather than Patch HRA to avoid race among webhook-based autoscaler servers

* Batch capacity reservation updates for efficient use of apiserver

* Fix potential never-ending HRA update conflicts in batch update

* Extract batchScaler out of webhook-based autoscaler for testability

* Fix log levels and batch scaler hang on start

* Correlate webhook event with scale trigger amount in logs

* Fix log message
---
 cmd/githubwebhookserver/main.go               |   3 +
 ...orizontal_runner_autoscaler_batch_scale.go | 207 ++++++++++++++++++
 .../horizontal_runner_autoscaler_webhook.go   |  86 +++-----
 ...zontal_runner_autoscaler_webhook_worker.go |  55 +++++
 ...l_runner_autoscaler_webhook_worker_test.go |  36 +++
 controllers/integration_test.go               |   2 +-
 6 files changed, 328 insertions(+), 61 deletions(-)
 create mode 100644 controllers/horizontal_runner_autoscaler_batch_scale.go
 create mode 100644 controllers/horizontal_runner_autoscaler_webhook_worker.go
 create mode 100644 controllers/horizontal_runner_autoscaler_webhook_worker_test.go

diff --git a/cmd/githubwebhookserver/main.go b/cmd/githubwebhookserver/main.go
index 4e3e604b..05c90ac3 100644
--- a/cmd/githubwebhookserver/main.go
+++ b/cmd/githubwebhookserver/main.go
@@ -72,6 +72,7 @@ func main() {
 		enableLeaderElection bool
 		syncPeriod           time.Duration
 		logLevel             string
+		queueLimit           int
 
 		ghClient *github.Client
 	)
@@ -92,6 +93,7 @@ func main() {
 		"Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
 	flag.DurationVar(&syncPeriod, "sync-period", 10*time.Minute, "Determines the minimum frequency at which K8s resources managed by this controller are reconciled. When you use autoscaling, set to a lower value like 10 minute, because this corresponds to the minimum time to react on demand change")
 	flag.StringVar(&logLevel, "log-level", logging.LogLevelDebug, `The verbosity of the logging. Valid values are "debug", "info", "warn", "error". Defaults to "debug".`)
+	flag.IntVar(&queueLimit, "queue-limit", controllers.DefaultQueueLimit, `The maximum length of the scale operation queue. The scale operation is enqueued for every matching webhook event, and the server returns HTTP status 500 when the queue is already full at the time of the enqueue attempt.`)
 	flag.StringVar(&webhookSecretToken, "github-webhook-secret-token", "", "The personal access token of GitHub.")
 	flag.StringVar(&c.Token, "github-token", c.Token, "The personal access token of GitHub.")
 	flag.Int64Var(&c.AppID, "github-app-id", c.AppID, "The application ID of GitHub App.")
@@ -164,6 +166,7 @@ func main() {
 		SecretKeyBytes: []byte(webhookSecretToken),
 		Namespace:      watchNamespace,
 		GitHubClient:   ghClient,
+		QueueLimit:     queueLimit,
 	}
 
 	if err = hraGitHubWebhook.SetupWithManager(mgr); err != nil {
diff --git a/controllers/horizontal_runner_autoscaler_batch_scale.go b/controllers/horizontal_runner_autoscaler_batch_scale.go
new file mode 100644
index 00000000..d89d85cd
--- /dev/null
+++ b/controllers/horizontal_runner_autoscaler_batch_scale.go
@@ -0,0 +1,207 @@
+package controllers
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/actions-runner-controller/actions-runner-controller/api/v1alpha1"
+	"github.com/go-logr/logr"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+type batchScaler struct {
+	Ctx      context.Context
+	Client   client.Client
+	Log      logr.Logger
+	interval time.Duration
+
+	queue       chan *ScaleTarget
+	workerStart sync.Once
+}
+
+func newBatchScaler(ctx context.Context, client client.Client, log logr.Logger) *batchScaler {
+	return &batchScaler{
+		Ctx:      ctx,
+		Client:   client,
+		Log:      log,
+		interval: 3 * time.Second,
+	}
+}
+
+type batchScaleOperation struct {
+	namespacedName types.NamespacedName
+	scaleOps       []scaleOperation
+}
+
+type scaleOperation struct {
+	trigger v1alpha1.ScaleUpTrigger
+	log     logr.Logger
+}
+
+// Add adds the scale target to the unbounded queue, blocking until the target is successfully added to the queue.
+// All the targets in the queue are dequeued every 3 seconds, grouped by the HRA, and applied.
+// In the happy path, batchScaler updates each HRA only once, even when the HRA had two or more associated webhook events within the 3-second interval,
+// which results in fewer K8s API calls and fewer HRA update conflicts when your ARC installation receives a lot of webhook events.
+func (s *batchScaler) Add(st *ScaleTarget) {
+	if st == nil {
+		return
+	}
+
+	s.workerStart.Do(func() {
+		var expBackoff = []time.Duration{time.Second, 2 * time.Second, 4 * time.Second, 8 * time.Second, 16 * time.Second}
+
+		s.queue = make(chan *ScaleTarget)
+
+		log := s.Log
+
+		go func() {
+			log.Info("Starting batch worker")
+			defer log.Info("Stopped batch worker")
+
+			for {
+				select {
+				case <-s.Ctx.Done():
+					return
+				default:
+				}
+
+				log.V(2).Info("Batch worker is dequeueing operations")
+
+				batches := map[types.NamespacedName]batchScaleOperation{}
+				after := time.After(s.interval)
+				var ops uint
+
+			batch:
+				for {
+					select {
+					case <-after:
+						after = nil
+						break batch
+					case st := <-s.queue:
+						nsName := types.NamespacedName{
+							Namespace: st.HorizontalRunnerAutoscaler.Namespace,
+							Name:      st.HorizontalRunnerAutoscaler.Name,
+						}
+						b, ok := batches[nsName]
+						if !ok {
+							b = batchScaleOperation{
+								namespacedName: nsName,
+							}
+						}
+						b.scaleOps = append(b.scaleOps, scaleOperation{
+							log:     *st.log,
+							trigger: st.ScaleUpTrigger,
+						})
+						batches[nsName] = b
+						ops++
+					}
+				}
+
+				log.V(2).Info("Batch worker dequeued operations", "ops", ops, "batches", len(batches))
+
+			retry:
+				for i := 0; ; i++ {
+					failed := map[types.NamespacedName]batchScaleOperation{}
+
+					for nsName, b := range batches {
+						b := b
+						if err := s.batchScale(context.Background(), b); err != nil {
+							log.V(2).Info("Failed to scale due to error", "error", err)
+							failed[nsName] = b
+						} else {
+							log.V(2).Info("Successfully ran batch scale", "hra", b.namespacedName)
+						}
+					}
+
+					if len(failed) == 0 {
+						break retry
+					}
+
+					batches = failed
+
+					delay := 16 * time.Second
+					if i < len(expBackoff) {
+						delay = expBackoff[i]
+					}
+					time.Sleep(delay)
+				}
+			}
+		}()
+	})
+
+	s.queue <- st
+}
+
+func (s *batchScaler) batchScale(ctx context.Context, batch batchScaleOperation) error {
+	var hra v1alpha1.HorizontalRunnerAutoscaler
+
+	if err := s.Client.Get(ctx, batch.namespacedName, &hra); err != nil {
+		return err
+	}
+
+	copy := hra.DeepCopy()
+
+	copy.Spec.CapacityReservations = getValidCapacityReservations(copy)
+
+	var added, completed int
+
+	for _, scale := range batch.scaleOps {
+		amount := 1
+
+		if scale.trigger.Amount != 0 {
+			amount = scale.trigger.Amount
+		}
+
+		scale.log.V(2).Info("Adding capacity reservation", "amount", amount)
+
+		if amount > 0 {
+			now := time.Now()
+			copy.Spec.CapacityReservations = append(copy.Spec.CapacityReservations, v1alpha1.CapacityReservation{
+				EffectiveTime:  metav1.Time{Time: now},
+				ExpirationTime: metav1.Time{Time: now.Add(scale.trigger.Duration.Duration)},
+				Replicas:       amount,
+			})
+
+			added += amount
+		} else if amount < 0 {
+			var reservations []v1alpha1.CapacityReservation
+
+			var found bool
+
+			for _, r := range copy.Spec.CapacityReservations {
+				if !found && r.Replicas+amount == 0 {
+					found = true
+				} else {
+					reservations = append(reservations, r)
+				}
+			}
+
+			copy.Spec.CapacityReservations = reservations
+
+			completed += amount
+		}
+	}
+
+	before := len(hra.Spec.CapacityReservations)
+	expired := before - len(copy.Spec.CapacityReservations)
+	after := len(copy.Spec.CapacityReservations)
+
+	s.Log.V(1).Info(
+		fmt.Sprintf("Updating hra %s for capacityReservations update", hra.Name),
+		"before", before,
+		"expired", expired,
"added", added, + "completed", completed, + "after", after, + ) + + if err := s.Client.Update(ctx, copy); err != nil { + return fmt.Errorf("updating horizontalrunnerautoscaler to add capacity reservation: %w", err) + } + + return nil +} diff --git a/controllers/horizontal_runner_autoscaler_webhook.go b/controllers/horizontal_runner_autoscaler_webhook.go index 31e83320..93182b55 100644 --- a/controllers/horizontal_runner_autoscaler_webhook.go +++ b/controllers/horizontal_runner_autoscaler_webhook.go @@ -23,9 +23,9 @@ import ( "io/ioutil" "net/http" "strings" + "sync" "time" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -46,6 +46,8 @@ const ( keyPrefixEnterprise = "enterprises/" keyRunnerGroup = "/group/" + + DefaultQueueLimit = 100 ) // HorizontalRunnerAutoscalerGitHubWebhook autoscales a HorizontalRunnerAutoscaler and the RunnerDeployment on each @@ -68,6 +70,15 @@ type HorizontalRunnerAutoscalerGitHubWebhook struct { // Set to empty for letting it watch for all namespaces. Namespace string Name string + + // QueueLimit is the maximum length of the bounded queue of scale targets and their associated operations + // A scale target is enqueued on each retrieval of each eligible webhook event, so that it is processed asynchronously. + QueueLimit int + + worker *worker + workerInit sync.Once + workerStart sync.Once + batchCh chan *ScaleTarget } func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) Reconcile(_ context.Context, request reconcile.Request) (reconcile.Result, error) { @@ -312,9 +323,19 @@ func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) Handle(w http.Respons return } - if err := autoscaler.tryScale(context.TODO(), target); err != nil { - log.Error(err, "could not scale up") + autoscaler.workerInit.Do(func() { + batchScaler := newBatchScaler(context.Background(), autoscaler.Client, autoscaler.Log) + queueLimit := autoscaler.QueueLimit + if queueLimit == 0 { + queueLimit = DefaultQueueLimit + } + autoscaler.worker = newWorker(context.Background(), queueLimit, batchScaler.Add) + }) + + target.log = &log + if ok := autoscaler.worker.Add(target); !ok { + log.Error(err, "Could not scale up due to queue full") return } @@ -383,6 +404,8 @@ func matchTriggerConditionAgainstEvent(types []string, eventAction *string) bool type ScaleTarget struct { v1alpha1.HorizontalRunnerAutoscaler v1alpha1.ScaleUpTrigger + + log *logr.Logger } func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) searchScaleTargets(hras []v1alpha1.HorizontalRunnerAutoscaler, f func(v1alpha1.ScaleUpTrigger) bool) []ScaleTarget { @@ -770,63 +793,6 @@ HRA: return nil, nil } -func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) tryScale(ctx context.Context, target *ScaleTarget) error { - if target == nil { - return nil - } - - copy := target.HorizontalRunnerAutoscaler.DeepCopy() - - amount := 1 - - if target.ScaleUpTrigger.Amount != 0 { - amount = target.ScaleUpTrigger.Amount - } - - capacityReservations := getValidCapacityReservations(copy) - - if amount > 0 { - now := time.Now() - copy.Spec.CapacityReservations = append(capacityReservations, v1alpha1.CapacityReservation{ - EffectiveTime: metav1.Time{Time: now}, - ExpirationTime: metav1.Time{Time: now.Add(target.ScaleUpTrigger.Duration.Duration)}, - Replicas: amount, - }) - } else if amount < 0 { - var reservations []v1alpha1.CapacityReservation - - var found bool - - for _, r := range capacityReservations { - if !found && r.Replicas+amount == 0 { - found = true - } 
-			} else {
-				reservations = append(reservations, r)
-			}
-		}
-
-		copy.Spec.CapacityReservations = reservations
-	}
-
-	before := len(target.HorizontalRunnerAutoscaler.Spec.CapacityReservations)
-	expired := before - len(capacityReservations)
-	after := len(copy.Spec.CapacityReservations)
-
-	autoscaler.Log.V(1).Info(
-		fmt.Sprintf("Patching hra %s for capacityReservations update", target.HorizontalRunnerAutoscaler.Name),
-		"before", before,
-		"expired", expired,
-		"amount", amount,
-		"after", after,
-	)
-
-	if err := autoscaler.Client.Patch(ctx, copy, client.MergeFrom(&target.HorizontalRunnerAutoscaler)); err != nil {
-		return fmt.Errorf("patching horizontalrunnerautoscaler to add capacity reservation: %w", err)
-	}
-
-	return nil
-}
-
 func getValidCapacityReservations(autoscaler *v1alpha1.HorizontalRunnerAutoscaler) []v1alpha1.CapacityReservation {
 	var capacityReservations []v1alpha1.CapacityReservation
 
diff --git a/controllers/horizontal_runner_autoscaler_webhook_worker.go b/controllers/horizontal_runner_autoscaler_webhook_worker.go
new file mode 100644
index 00000000..f674f458
--- /dev/null
+++ b/controllers/horizontal_runner_autoscaler_webhook_worker.go
@@ -0,0 +1,55 @@
+package controllers
+
+import (
+	"context"
+)
+
+// worker has a non-blocking bounded queue of scale targets; it dequeues the scale targets and executes the scale operations one by one.
+type worker struct {
+	scaleTargetQueue chan *ScaleTarget
+	work             func(*ScaleTarget)
+	done             chan struct{}
+}
+
+func newWorker(ctx context.Context, queueLimit int, work func(*ScaleTarget)) *worker {
+	w := &worker{
+		scaleTargetQueue: make(chan *ScaleTarget, queueLimit),
+		work:             work,
+		done:             make(chan struct{}),
+	}
+
+	go func() {
+		defer close(w.done)
+
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case t := <-w.scaleTargetQueue:
+				work(t)
+			}
+		}
+	}()
+
+	return w
+}
+
+// Add adds the scale target to the bounded queue, returning true on a successful enqueue and false otherwise.
+// When false is returned, the queue is already full and the enqueue operation must be retried later.
+// If the enqueue was triggered by an external source and there's no intermediate queue that we can use,
+// you must instruct the source to resend the original request later.
+// If you're building a webhook server around this worker, this means you must return an HTTP error from the webhook server,
+// so that (hopefully) the sender can resend the webhook event later, or at least a human operator can notice or be notified about the
+// webhook delivery failure so that a manual retry can be done later.
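+// Note that Add itself never blocks: the select below either enqueues the target into the buffered channel right away, or falls through to the default case and reports failure.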
+func (w *worker) Add(st *ScaleTarget) bool {
+	select {
+	case w.scaleTargetQueue <- st:
+		return true
+	default:
+		return false
+	}
+}
+
+func (w *worker) Done() chan struct{} {
+	return w.done
+}
diff --git a/controllers/horizontal_runner_autoscaler_webhook_worker_test.go b/controllers/horizontal_runner_autoscaler_webhook_worker_test.go
new file mode 100644
index 00000000..e2bf0cd6
--- /dev/null
+++ b/controllers/horizontal_runner_autoscaler_webhook_worker_test.go
@@ -0,0 +1,36 @@
+package controllers
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestWorker_Add(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	w := newWorker(ctx, 2, func(st *ScaleTarget) {})
+	require.True(t, w.Add(&ScaleTarget{}))
+	require.True(t, w.Add(&ScaleTarget{}))
+	require.False(t, w.Add(&ScaleTarget{}))
+}
+
+func TestWorker_Work(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	var count int
+
+	w := newWorker(ctx, 1, func(st *ScaleTarget) {
+		count++
+		cancel()
+	})
+	require.True(t, w.Add(&ScaleTarget{}))
+	require.False(t, w.Add(&ScaleTarget{}))
+
+	<-w.Done()
+
+	require.Equal(t, count, 1)
+}
diff --git a/controllers/integration_test.go b/controllers/integration_test.go
index 1ff989c4..2f04c9b4 100644
--- a/controllers/integration_test.go
+++ b/controllers/integration_test.go
@@ -1367,7 +1367,7 @@ func (env *testEnvironment) ExpectRegisteredNumberCountEventuallyEquals(want int
 			return len(rs)
 		},
-		time.Second*5, time.Millisecond*500).Should(Equal(want), optionalDescriptions...)
+		time.Second*10, time.Millisecond*500).Should(Equal(want), optionalDescriptions...)
 }
 
 func (env *testEnvironment) SendOrgPullRequestEvent(org, repo, branch, action string) {
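
The following is an illustrative sketch, not part of the patch above. It shows how the pieces introduced by this change compose: a batchScaler that coalesces HRA updates, wrapped behind a bounded worker queue, with the handler returning HTTP 500 when the queue is full so the webhook delivery can be retried. The constructor name newScaleUpHandler and its signature are hypothetical; newBatchScaler, newWorker, worker.Add, DefaultQueueLimit, and the ScaleTarget.log field are taken from the diff.

// Illustrative sketch only, assuming it lives in package controllers alongside the new files.
package controllers

import (
	"context"
	"net/http"

	"github.com/go-logr/logr"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func newScaleUpHandler(ctx context.Context, c client.Client, log logr.Logger, queueLimit int) func(http.ResponseWriter, *ScaleTarget) {
	if queueLimit == 0 {
		queueLimit = DefaultQueueLimit
	}

	// batchScaler.Add blocks until the batch worker picks the target up, so it is
	// used as the worker's work function instead of being called from the handler.
	bs := newBatchScaler(ctx, c, log)
	wk := newWorker(ctx, queueLimit, bs.Add)

	return func(w http.ResponseWriter, target *ScaleTarget) {
		target.log = &log

		// A full queue means the server is overloaded: respond with 500 so the
		// sender records the delivery as failed and it can be redelivered later.
		if ok := wk.Add(target); !ok {
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		w.WriteHeader(http.StatusOK)
	}
}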