Make webhook-based scale race-free (#1477)

* Make webhook-based scale operation asynchronous

This prevents race condition in the webhook-based autoscaler when it received another webhook event while processing another webhook event and both ended up
scaling up the same horizontal runner autoscaler.

Ref #1321

* Fix typos

* Update rather than Patch HRA to avoid race among webhook-based autoscaler servers

* Batch capacity reservation updates for efficient use of apiserver

* Fix potential never-ending HRA update conflicts in batch update

* Extract batchScaler out of webhook-based autoscaler for testability

* Fix log levels and batch scaler hang on start

* Correlate webhook event with scale trigger amount in logs

* Fix log message
This commit is contained in:
Yusuke Kuoka 2022-06-27 18:31:48 +09:00 committed by GitHub
parent 84d16c1c12
commit e2c8163b8c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 328 additions and 61 deletions

View File

@ -72,6 +72,7 @@ func main() {
enableLeaderElection bool enableLeaderElection bool
syncPeriod time.Duration syncPeriod time.Duration
logLevel string logLevel string
queueLimit int
ghClient *github.Client ghClient *github.Client
) )
@ -92,6 +93,7 @@ func main() {
"Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.") "Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
flag.DurationVar(&syncPeriod, "sync-period", 10*time.Minute, "Determines the minimum frequency at which K8s resources managed by this controller are reconciled. When you use autoscaling, set to a lower value like 10 minute, because this corresponds to the minimum time to react on demand change") flag.DurationVar(&syncPeriod, "sync-period", 10*time.Minute, "Determines the minimum frequency at which K8s resources managed by this controller are reconciled. When you use autoscaling, set to a lower value like 10 minute, because this corresponds to the minimum time to react on demand change")
flag.StringVar(&logLevel, "log-level", logging.LogLevelDebug, `The verbosity of the logging. Valid values are "debug", "info", "warn", "error". Defaults to "debug".`) flag.StringVar(&logLevel, "log-level", logging.LogLevelDebug, `The verbosity of the logging. Valid values are "debug", "info", "warn", "error". Defaults to "debug".`)
flag.IntVar(&queueLimit, "queue-limit", controllers.DefaultQueueLimit, `The maximum length of the scale operation queue. The scale opration is enqueued per every matching webhook event, and the server returns a 500 HTTP status when the queue was already full on enqueue attempt.`)
flag.StringVar(&webhookSecretToken, "github-webhook-secret-token", "", "The personal access token of GitHub.") flag.StringVar(&webhookSecretToken, "github-webhook-secret-token", "", "The personal access token of GitHub.")
flag.StringVar(&c.Token, "github-token", c.Token, "The personal access token of GitHub.") flag.StringVar(&c.Token, "github-token", c.Token, "The personal access token of GitHub.")
flag.Int64Var(&c.AppID, "github-app-id", c.AppID, "The application ID of GitHub App.") flag.Int64Var(&c.AppID, "github-app-id", c.AppID, "The application ID of GitHub App.")
@ -164,6 +166,7 @@ func main() {
SecretKeyBytes: []byte(webhookSecretToken), SecretKeyBytes: []byte(webhookSecretToken),
Namespace: watchNamespace, Namespace: watchNamespace,
GitHubClient: ghClient, GitHubClient: ghClient,
QueueLimit: queueLimit,
} }
if err = hraGitHubWebhook.SetupWithManager(mgr); err != nil { if err = hraGitHubWebhook.SetupWithManager(mgr); err != nil {

View File

@ -0,0 +1,207 @@
package controllers
import (
"context"
"fmt"
"sync"
"time"
"github.com/actions-runner-controller/actions-runner-controller/api/v1alpha1"
"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
type batchScaler struct {
Ctx context.Context
Client client.Client
Log logr.Logger
interval time.Duration
queue chan *ScaleTarget
workerStart sync.Once
}
func newBatchScaler(ctx context.Context, client client.Client, log logr.Logger) *batchScaler {
return &batchScaler{
Ctx: ctx,
Client: client,
Log: log,
interval: 3 * time.Second,
}
}
type batchScaleOperation struct {
namespacedName types.NamespacedName
scaleOps []scaleOperation
}
type scaleOperation struct {
trigger v1alpha1.ScaleUpTrigger
log logr.Logger
}
// Add the scale target to the unbounded queue, blocking until the target is successfully added to the queue.
// All the targets in the queue are dequeued every 3 seconds, grouped by the HRA, and applied.
// In a happy path, batchScaler update each HRA only once, even though the HRA had two or more associated webhook events in the 3 seconds interval,
// which results in less K8s API calls and less HRA update conflicts in case your ARC installation receives a lot of webhook events
func (s *batchScaler) Add(st *ScaleTarget) {
if st == nil {
return
}
s.workerStart.Do(func() {
var expBackoff = []time.Duration{time.Second, 2 * time.Second, 4 * time.Second, 8 * time.Second, 16 * time.Second}
s.queue = make(chan *ScaleTarget)
log := s.Log
go func() {
log.Info("Starting batch worker")
defer log.Info("Stopped batch worker")
for {
select {
case <-s.Ctx.Done():
return
default:
}
log.V(2).Info("Batch worker is dequeueing operations")
batches := map[types.NamespacedName]batchScaleOperation{}
after := time.After(s.interval)
var ops uint
batch:
for {
select {
case <-after:
after = nil
break batch
case st := <-s.queue:
nsName := types.NamespacedName{
Namespace: st.HorizontalRunnerAutoscaler.Namespace,
Name: st.HorizontalRunnerAutoscaler.Name,
}
b, ok := batches[nsName]
if !ok {
b = batchScaleOperation{
namespacedName: nsName,
}
}
b.scaleOps = append(b.scaleOps, scaleOperation{
log: *st.log,
trigger: st.ScaleUpTrigger,
})
batches[nsName] = b
ops++
}
}
log.V(2).Info("Batch worker dequeued operations", "ops", ops, "batches", len(batches))
retry:
for i := 0; ; i++ {
failed := map[types.NamespacedName]batchScaleOperation{}
for nsName, b := range batches {
b := b
if err := s.batchScale(context.Background(), b); err != nil {
log.V(2).Info("Failed to scale due to error", "error", err)
failed[nsName] = b
} else {
log.V(2).Info("Successfully ran batch scale", "hra", b.namespacedName)
}
}
if len(failed) == 0 {
break retry
}
batches = failed
delay := 16 * time.Second
if i < len(expBackoff) {
delay = expBackoff[i]
}
time.Sleep(delay)
}
}
}()
})
s.queue <- st
}
func (s *batchScaler) batchScale(ctx context.Context, batch batchScaleOperation) error {
var hra v1alpha1.HorizontalRunnerAutoscaler
if err := s.Client.Get(ctx, batch.namespacedName, &hra); err != nil {
return err
}
copy := hra.DeepCopy()
copy.Spec.CapacityReservations = getValidCapacityReservations(copy)
var added, completed int
for _, scale := range batch.scaleOps {
amount := 1
if scale.trigger.Amount != 0 {
amount = scale.trigger.Amount
}
scale.log.V(2).Info("Adding capacity reservation", "amount", amount)
if amount > 0 {
now := time.Now()
copy.Spec.CapacityReservations = append(copy.Spec.CapacityReservations, v1alpha1.CapacityReservation{
EffectiveTime: metav1.Time{Time: now},
ExpirationTime: metav1.Time{Time: now.Add(scale.trigger.Duration.Duration)},
Replicas: amount,
})
added += amount
} else if amount < 0 {
var reservations []v1alpha1.CapacityReservation
var found bool
for _, r := range copy.Spec.CapacityReservations {
if !found && r.Replicas+amount == 0 {
found = true
} else {
reservations = append(reservations, r)
}
}
copy.Spec.CapacityReservations = reservations
completed += amount
}
}
before := len(hra.Spec.CapacityReservations)
expired := before - len(copy.Spec.CapacityReservations)
after := len(copy.Spec.CapacityReservations)
s.Log.V(1).Info(
fmt.Sprintf("Updating hra %s for capacityReservations update", hra.Name),
"before", before,
"expired", expired,
"added", added,
"completed", completed,
"after", after,
)
if err := s.Client.Update(ctx, copy); err != nil {
return fmt.Errorf("updating horizontalrunnerautoscaler to add capacity reservation: %w", err)
}
return nil
}

View File

@ -23,9 +23,9 @@ import (
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"strings" "strings"
"sync"
"time" "time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/reconcile"
@ -46,6 +46,8 @@ const (
keyPrefixEnterprise = "enterprises/" keyPrefixEnterprise = "enterprises/"
keyRunnerGroup = "/group/" keyRunnerGroup = "/group/"
DefaultQueueLimit = 100
) )
// HorizontalRunnerAutoscalerGitHubWebhook autoscales a HorizontalRunnerAutoscaler and the RunnerDeployment on each // HorizontalRunnerAutoscalerGitHubWebhook autoscales a HorizontalRunnerAutoscaler and the RunnerDeployment on each
@ -68,6 +70,15 @@ type HorizontalRunnerAutoscalerGitHubWebhook struct {
// Set to empty for letting it watch for all namespaces. // Set to empty for letting it watch for all namespaces.
Namespace string Namespace string
Name string Name string
// QueueLimit is the maximum length of the bounded queue of scale targets and their associated operations
// A scale target is enqueued on each retrieval of each eligible webhook event, so that it is processed asynchronously.
QueueLimit int
worker *worker
workerInit sync.Once
workerStart sync.Once
batchCh chan *ScaleTarget
} }
func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) Reconcile(_ context.Context, request reconcile.Request) (reconcile.Result, error) { func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) Reconcile(_ context.Context, request reconcile.Request) (reconcile.Result, error) {
@ -312,9 +323,19 @@ func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) Handle(w http.Respons
return return
} }
if err := autoscaler.tryScale(context.TODO(), target); err != nil { autoscaler.workerInit.Do(func() {
log.Error(err, "could not scale up") batchScaler := newBatchScaler(context.Background(), autoscaler.Client, autoscaler.Log)
queueLimit := autoscaler.QueueLimit
if queueLimit == 0 {
queueLimit = DefaultQueueLimit
}
autoscaler.worker = newWorker(context.Background(), queueLimit, batchScaler.Add)
})
target.log = &log
if ok := autoscaler.worker.Add(target); !ok {
log.Error(err, "Could not scale up due to queue full")
return return
} }
@ -383,6 +404,8 @@ func matchTriggerConditionAgainstEvent(types []string, eventAction *string) bool
type ScaleTarget struct { type ScaleTarget struct {
v1alpha1.HorizontalRunnerAutoscaler v1alpha1.HorizontalRunnerAutoscaler
v1alpha1.ScaleUpTrigger v1alpha1.ScaleUpTrigger
log *logr.Logger
} }
func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) searchScaleTargets(hras []v1alpha1.HorizontalRunnerAutoscaler, f func(v1alpha1.ScaleUpTrigger) bool) []ScaleTarget { func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) searchScaleTargets(hras []v1alpha1.HorizontalRunnerAutoscaler, f func(v1alpha1.ScaleUpTrigger) bool) []ScaleTarget {
@ -770,63 +793,6 @@ HRA:
return nil, nil return nil, nil
} }
func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) tryScale(ctx context.Context, target *ScaleTarget) error {
if target == nil {
return nil
}
copy := target.HorizontalRunnerAutoscaler.DeepCopy()
amount := 1
if target.ScaleUpTrigger.Amount != 0 {
amount = target.ScaleUpTrigger.Amount
}
capacityReservations := getValidCapacityReservations(copy)
if amount > 0 {
now := time.Now()
copy.Spec.CapacityReservations = append(capacityReservations, v1alpha1.CapacityReservation{
EffectiveTime: metav1.Time{Time: now},
ExpirationTime: metav1.Time{Time: now.Add(target.ScaleUpTrigger.Duration.Duration)},
Replicas: amount,
})
} else if amount < 0 {
var reservations []v1alpha1.CapacityReservation
var found bool
for _, r := range capacityReservations {
if !found && r.Replicas+amount == 0 {
found = true
} else {
reservations = append(reservations, r)
}
}
copy.Spec.CapacityReservations = reservations
}
before := len(target.HorizontalRunnerAutoscaler.Spec.CapacityReservations)
expired := before - len(capacityReservations)
after := len(copy.Spec.CapacityReservations)
autoscaler.Log.V(1).Info(
fmt.Sprintf("Patching hra %s for capacityReservations update", target.HorizontalRunnerAutoscaler.Name),
"before", before,
"expired", expired,
"amount", amount,
"after", after,
)
if err := autoscaler.Client.Patch(ctx, copy, client.MergeFrom(&target.HorizontalRunnerAutoscaler)); err != nil {
return fmt.Errorf("patching horizontalrunnerautoscaler to add capacity reservation: %w", err)
}
return nil
}
func getValidCapacityReservations(autoscaler *v1alpha1.HorizontalRunnerAutoscaler) []v1alpha1.CapacityReservation { func getValidCapacityReservations(autoscaler *v1alpha1.HorizontalRunnerAutoscaler) []v1alpha1.CapacityReservation {
var capacityReservations []v1alpha1.CapacityReservation var capacityReservations []v1alpha1.CapacityReservation

View File

@ -0,0 +1,55 @@
package controllers
import (
"context"
)
// worker is a worker that has a non-blocking bounded queue of scale targets, dequeues scale target and executes the scale operation one by one.
type worker struct {
scaleTargetQueue chan *ScaleTarget
work func(*ScaleTarget)
done chan struct{}
}
func newWorker(ctx context.Context, queueLimit int, work func(*ScaleTarget)) *worker {
w := &worker{
scaleTargetQueue: make(chan *ScaleTarget, queueLimit),
work: work,
done: make(chan struct{}),
}
go func() {
defer close(w.done)
for {
select {
case <-ctx.Done():
return
case t := <-w.scaleTargetQueue:
work(t)
}
}
}()
return w
}
// Add the scale target to the bounded queue, returning the result as a bool value. It returns true on successful enqueue, and returns false otherwise.
// When returned false, the queue is already full so the enqueue operation must be retried later.
// If the enqueue was triggered by an external source and there's no intermediate queue that we can use,
// you must instruct the source to resend the original request later.
// In case you're building a webhook server around this worker, this means that you must return a http error to the webhook server,
// so that (hopefully) the sender can resend the webhook event later, or at least the human operator can notice or be notified about the
// webhook develiery failure so that a manual retry can be done later.
func (w *worker) Add(st *ScaleTarget) bool {
select {
case w.scaleTargetQueue <- st:
return true
default:
return false
}
}
func (w *worker) Done() chan struct{} {
return w.done
}

View File

@ -0,0 +1,36 @@
package controllers
import (
"context"
"testing"
"github.com/stretchr/testify/require"
)
func TestWorker_Add(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
w := newWorker(ctx, 2, func(st *ScaleTarget) {})
require.True(t, w.Add(&ScaleTarget{}))
require.True(t, w.Add(&ScaleTarget{}))
require.False(t, w.Add(&ScaleTarget{}))
}
func TestWorker_Work(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var count int
w := newWorker(ctx, 1, func(st *ScaleTarget) {
count++
cancel()
})
require.True(t, w.Add(&ScaleTarget{}))
require.False(t, w.Add(&ScaleTarget{}))
<-w.Done()
require.Equal(t, count, 1)
}

View File

@ -1367,7 +1367,7 @@ func (env *testEnvironment) ExpectRegisteredNumberCountEventuallyEquals(want int
return len(rs) return len(rs)
}, },
time.Second*5, time.Millisecond*500).Should(Equal(want), optionalDescriptions...) time.Second*10, time.Millisecond*500).Should(Equal(want), optionalDescriptions...)
} }
func (env *testEnvironment) SendOrgPullRequestEvent(org, repo, branch, action string) { func (env *testEnvironment) SendOrgPullRequestEvent(org, repo, branch, action string) {