This commit is contained in:
Nikola Jokic 2026-05-25 23:09:51 +02:00
parent 30879de182
commit d1af7d66ea
No known key found for this signature in database
GPG Key ID: 419BB425B0E501B0
13 changed files with 279 additions and 312 deletions

View File

@ -50,9 +50,6 @@ args:
{{- with .Values.controller.manager.config.runnerMaxConcurrentReconciles }}
- "--runner-max-concurrent-reconciles={{ . }}"
{{- end }}
{{- with .Values.controller.manager.config.updateStrategy }}
- "--update-strategy={{ . }}"
{{- end }}
{{- if .Values.controller.metrics }}
{{- with .Values.controller.metrics }}
- "--listener-metrics-addr={{ .listenerAddr }}"

View File

@ -31,9 +31,6 @@ controller:
# The maximum number of concurrent reconciles which can be run by the EphemeralRunner controller.
runnerMaxConcurrentReconciles: 2
# How the controller handles upgrades with running jobs: "immediate" or "eventual".
updateStrategy: "immediate"
# List of label prefixes that should NOT be propagated to internal resources.
excludeLabelPropagationPrefixes: []
# Example:

View File

@ -70,9 +70,6 @@ spec:
{{- with .Values.flags.runnerMaxConcurrentReconciles }}
- "--runner-max-concurrent-reconciles={{ . }}"
{{- end }}
{{- with .Values.flags.updateStrategy }}
- "--update-strategy={{ . }}"
{{- end }}
{{- if .Values.metrics }}
{{- with .Values.metrics }}
- "--listener-metrics-addr={{ .listenerAddr }}"

View File

@ -363,7 +363,6 @@ func TestTemplate_ControllerDeployment_Defaults(t *testing.T) {
"--auto-scaling-runner-set-only",
"--log-level=debug",
"--log-format=text",
"--update-strategy=immediate",
"--metrics-addr=0",
"--listener-metrics-addr=0",
"--listener-metrics-endpoint=",
@ -431,7 +430,6 @@ func TestTemplate_ControllerDeployment_Customize(t *testing.T) {
"topologySpreadConstraints[0].maxSkew": "1",
"topologySpreadConstraints[0].topologyKey": "foo",
"priorityClassName": "test-priority-class",
"flags.updateStrategy": "eventual",
"flags.logLevel": "info",
"flags.logFormat": "json",
"volumes[0].name": "customMount",
@ -516,7 +514,6 @@ func TestTemplate_ControllerDeployment_Customize(t *testing.T) {
"--auto-scaler-image-pull-secrets=dockerhub",
"--log-level=info",
"--log-format=json",
"--update-strategy=eventual",
"--listener-metrics-addr=0",
"--listener-metrics-endpoint=",
"--metrics-addr=0",
@ -645,7 +642,6 @@ func TestTemplate_EnableLeaderElection(t *testing.T) {
"--leader-election-id=test-arc-gha-rs-controller",
"--log-level=debug",
"--log-format=text",
"--update-strategy=immediate",
"--listener-metrics-addr=0",
"--listener-metrics-endpoint=",
"--metrics-addr=0",
@ -687,7 +683,6 @@ func TestTemplate_ControllerDeployment_ForwardImagePullSecrets(t *testing.T) {
"--auto-scaler-image-pull-secrets=ghcr",
"--log-level=debug",
"--log-format=text",
"--update-strategy=immediate",
"--listener-metrics-addr=0",
"--listener-metrics-endpoint=",
"--metrics-addr=0",
@ -778,7 +773,6 @@ func TestTemplate_ControllerDeployment_WatchSingleNamespace(t *testing.T) {
"--log-level=debug",
"--log-format=text",
"--watch-single-namespace=demo",
"--update-strategy=immediate",
"--listener-metrics-addr=0",
"--listener-metrics-endpoint=",
"--metrics-addr=0",

View File

@ -115,22 +115,6 @@ flags:
# It may also increase the load on the API server and the external service (e.g. GitHub API).
runnerMaxConcurrentReconciles: 2
## Defines how the controller should handle upgrades while having running jobs.
##
## The strategies available are:
## - "immediate": (default) The controller will immediately apply the change causing the
## recreation of the listener and ephemeral runner set. This can lead to an
## overprovisioning of runners, if there are pending / running jobs. This should not
## be a problem at a small scale, but it could lead to a significant increase of
## resources if you have a lot of jobs running concurrently.
##
## - "eventual": The controller will remove the listener and ephemeral runner set
## immediately, but will not recreate them (to apply changes) until all
## pending / running jobs have completed.
## This can lead to a longer time to apply the change but it will ensure
## that you don't have any overprovisioning of runners.
updateStrategy: "immediate"
## Defines a list of prefixes that should not be propagated to internal resources.
## This is useful when you have labels that are used for internal purposes and should not be propagated to internal resources.
## See https://github.com/actions/actions-runner-controller/issues/3533 for more information.

View File

@ -19,7 +19,6 @@ package actionsgithubcom
import (
"context"
"fmt"
"sort"
"strconv"
"strings"
"time"
@ -28,6 +27,7 @@ import (
"github.com/actions/actions-runner-controller/build"
"github.com/actions/scaleset"
"github.com/go-logr/logr"
"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
@ -53,24 +53,6 @@ const (
runnerScaleSetIDAnnotationKey = "runner-scale-set-id"
)
// UpdateStrategy defines how the controller should handle upgrades while
// having running jobs.
type UpdateStrategy string

// Supported update strategies.
const (
	// UpdateStrategyImmediate is the "immediate" strategy (the default): the
	// controller applies the change right away, which recreates the listener
	// and the ephemeral runner set. With pending / running jobs this can
	// overprovision runners. That is usually harmless at a small scale, but
	// it can noticeably increase resource usage when many jobs run
	// concurrently.
	UpdateStrategyImmediate UpdateStrategy = "immediate"

	// UpdateStrategyEventual is the "eventual" strategy: the controller
	// removes the listener and the ephemeral runner set immediately, but
	// recreates them (to apply the change) only after all pending / running
	// jobs have completed. Changes take longer to apply, but runners are
	// never overprovisioned.
	UpdateStrategyEventual UpdateStrategy = "eventual"
)
// AutoscalingRunnerSetReconciler reconciles a AutoscalingRunnerSet object
type AutoscalingRunnerSetReconciler struct {
client.Client
@ -79,7 +61,6 @@ type AutoscalingRunnerSetReconciler struct {
ControllerNamespace string
DefaultRunnerScaleSetListenerImage string
DefaultRunnerScaleSetListenerImagePullSecrets []string
UpdateStrategy UpdateStrategy
ResourceBuilder
}
@ -106,7 +87,7 @@ func (r *AutoscalingRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl
}
log.Info("Deleting resources")
done, err := r.cleanUpResources(ctx, autoscalingRunnerSet, nil, log)
done, err := r.cleanUpResources(ctx, autoscalingRunnerSet, log)
if err != nil {
log.Error(err, "Failed to clean up resources during deletion")
return ctrl.Result{}, err
@ -166,166 +147,145 @@ func (r *AutoscalingRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl
log.Info("Successfully added finalizer")
}
// Something has changed, work-out what
if targetHash := autoscalingRunnerSet.Hash(); autoscalingRunnerSet.Annotations[annotationKeyChangeHash] != targetHash {
if err := patch(ctx, r.Client, autoscalingRunnerSet, func(obj *v1alpha1.AutoscalingRunnerSet) {
if obj.Annotations == nil {
obj.Annotations = map[string]string{}
}
obj.Annotations[annotationKeyChangeHash] = targetHash
}); err != nil {
log.Error(err, "Failed to update autoscaling runner set with change hash annotation")
return ctrl.Result{}, err
}
if err := r.updateStatus(ctx, autoscalingRunnerSet, nil, v1alpha1.AutoscalingRunnerSetPhasePending, log); err != nil {
log.Error(err, "Failed to update autoscaling runner set status to pending")
return ctrl.Result{}, err
}
// TODO: work
}
outdated := autoscalingRunnerSet.Status.Phase == v1alpha1.AutoscalingRunnerSetPhaseOutdated
if !outdated {
scaleSetIDRaw, ok := autoscalingRunnerSet.Annotations[runnerScaleSetIDAnnotationKey]
if !ok {
// Need to create a new runner scale set on Actions service
log.Info("Runner scale set id annotation does not exist. Creating a new runner scale set.")
return r.createRunnerScaleSet(ctx, autoscalingRunnerSet, log)
}
if id, err := strconv.Atoi(scaleSetIDRaw); err != nil || id <= 0 {
log.Info("Runner scale set id annotation is not an id, or is <= 0. Creating a new runner scale set.")
// something modified the scaleSetId. Try to create one
return r.createRunnerScaleSet(ctx, autoscalingRunnerSet, log)
}
// Make sure the runner group of the scale set is up to date
currentRunnerGroupName, ok := autoscalingRunnerSet.Annotations[AnnotationKeyGitHubRunnerGroupName]
if !ok || (len(autoscalingRunnerSet.Spec.RunnerGroup) > 0 && !strings.EqualFold(currentRunnerGroupName, autoscalingRunnerSet.Spec.RunnerGroup)) {
log.Info("AutoScalingRunnerSet runner group changed. Updating the runner scale set.")
return r.updateRunnerScaleSetRunnerGroup(ctx, autoscalingRunnerSet, log)
}
// Make sure the runner scale set name is up to date
currentRunnerScaleSetName, ok := autoscalingRunnerSet.Annotations[AnnotationKeyGitHubRunnerScaleSetName]
if !ok || (len(autoscalingRunnerSet.Spec.RunnerScaleSetName) > 0 && !strings.EqualFold(currentRunnerScaleSetName, autoscalingRunnerSet.Spec.RunnerScaleSetName)) {
log.Info("AutoScalingRunnerSet runner scale set name changed. Updating the runner scale set.")
return r.updateRunnerScaleSetName(ctx, autoscalingRunnerSet, log)
}
}
existingRunnerSets, err := r.listEphemeralRunnerSets(ctx, autoscalingRunnerSet)
if err != nil {
log.Error(err, "Failed to list existing ephemeral runner sets")
return ctrl.Result{}, err
}
latestRunnerSet := existingRunnerSets.latest()
if latestRunnerSet == nil && !outdated {
log.Info("Latest runner set does not exist. Creating a new runner set.")
return r.createEphemeralRunnerSet(ctx, autoscalingRunnerSet, log)
}
for _, runnerSet := range existingRunnerSets.all() {
log.Info("Find existing ephemeral runner set", "name", runnerSet.Name, "specHash", runnerSet.Annotations[annotationKeyRunnerSpecHash])
}
outdated = outdated || (latestRunnerSet != nil && latestRunnerSet.Status.Phase == v1alpha1.EphemeralRunnerSetPhaseOutdated)
// Make sure the AutoscalingListener is up and running in the controller namespace
listener := new(v1alpha1.AutoscalingListener)
listenerFound := true
if err := r.Get(ctx, client.ObjectKey{Namespace: r.ControllerNamespace, Name: scaleSetListenerName(autoscalingRunnerSet)}, listener); err != nil {
if !kerrors.IsNotFound(err) {
log.Error(err, "Failed to get AutoscalingListener resource")
return ctrl.Result{}, err
}
listenerFound = false
log.Info("AutoscalingListener does not exist.")
}
if outdated {
log.Info("Ephemeral runner set is outdated")
if autoscalingRunnerSet.Status.Phase != v1alpha1.AutoscalingRunnerSetPhaseOutdated {
if err := r.updateStatus(ctx, autoscalingRunnerSet, latestRunnerSet, v1alpha1.AutoscalingRunnerSetPhaseOutdated, log); err != nil {
log.Error(err, "Failed to update autoscaling runner set status to outdated")
return ctrl.Result{}, err
}
}
done, err := r.cleanUpResources(ctx, autoscalingRunnerSet, latestRunnerSet, log)
log.Info("Autoscaling runner set is in outdated phase")
done, err := r.cleanUpResources(ctx, autoscalingRunnerSet, log)
if err != nil {
log.Error(err, "Failed to clean up resources for outdated runner set")
return ctrl.Result{}, err
}
if done {
log.Info("Successfully cleaned up resources for outdated runner set")
return ctrl.Result{}, nil
}
log.Info("Waiting for resources to be cleaned up for outdated runner set")
return ctrl.Result{
RequeueAfter: 5 * time.Second,
}, nil
}
// Our listener pod is out of date, so we need to delete it to get a new recreate.
listenerValuesHashChanged := listener.Annotations[annotationKeyValuesHash] != autoscalingRunnerSet.Annotations[annotationKeyValuesHash]
listenerSpecHashChanged := listener.Annotations[annotationKeyRunnerSpecHash] != autoscalingRunnerSet.ListenerSpecHash()
if listenerFound && (listenerValuesHashChanged ||
listenerSpecHashChanged ||
latestRunnerSet == nil ||
listener.Spec.EphemeralRunnerSetName != latestRunnerSet.Name) {
log.Info("RunnerScaleSetListener is out of date. Deleting it so that it is recreated", "name", listener.Name)
if err := r.Delete(ctx, listener); err != nil {
log.Error(err, "Failed to delete AutoscalingListener resource")
return ctrl.Result{}, err
}
log.Info("Deleted RunnerScaleSetListener since existing one is out of date")
return ctrl.Result{}, nil
if shouldCreateScaleSet(autoscalingRunnerSet) {
log.Info("Creating runner scale set")
return r.createRunnerScaleSet(ctx, autoscalingRunnerSet, log)
}
if latestRunnerSet.Annotations[annotationKeyRunnerSpecHash] != autoscalingRunnerSet.RunnerSetSpecHash() {
if r.drainingJobs(&latestRunnerSet.Status) {
log.Info("Latest runner set spec hash does not match the current autoscaling runner set. Waiting for the running and pending runners to finish:", "running", latestRunnerSet.Status.RunningEphemeralRunners, "pending", latestRunnerSet.Status.PendingEphemeralRunners)
log.Info("Scaling down the number of desired replicas to 0")
// We are in the process of draining the jobs. The listener has been deleted and the ephemeral runner set replicas
// need to scale down to 0
err := patch(ctx, r.Client, latestRunnerSet, func(obj *v1alpha1.EphemeralRunnerSet) {
obj.Spec.Replicas = 0
obj.Spec.PatchID = 0
})
if err != nil {
log.Error(err, "Failed to patch runner set to set desired count to 0")
}
return ctrl.Result{}, err
}
log.Info("Latest runner set spec hash does not match the current autoscaling runner set. Creating a new runner set")
// Make sure the runner group of the scale set is up to date
currentRunnerGroupName, ok := autoscalingRunnerSet.Annotations[AnnotationKeyGitHubRunnerGroupName]
if !ok || (len(autoscalingRunnerSet.Spec.RunnerGroup) > 0 && !strings.EqualFold(currentRunnerGroupName, autoscalingRunnerSet.Spec.RunnerGroup)) {
log.Info("AutoScalingRunnerSet runner group changed. Updating the runner scale set.")
return r.updateRunnerScaleSetRunnerGroup(ctx, autoscalingRunnerSet, log)
}
// Make sure the runner scale set name is up to date
currentRunnerScaleSetName, ok := autoscalingRunnerSet.Annotations[AnnotationKeyGitHubRunnerScaleSetName]
if !ok || (len(autoscalingRunnerSet.Spec.RunnerScaleSetName) > 0 && !strings.EqualFold(currentRunnerScaleSetName, autoscalingRunnerSet.Spec.RunnerScaleSetName)) {
log.Info("AutoScalingRunnerSet runner scale set name changed. Updating the runner scale set.")
return r.updateRunnerScaleSetName(ctx, autoscalingRunnerSet, log)
}
var ephemeralRunnerSet v1alpha1.EphemeralRunnerSet
err := r.Get(
ctx,
types.NamespacedName{
Namespace: autoscalingRunnerSet.Namespace,
Name: autoscalingRunnerSet.Name,
},
&ephemeralRunnerSet,
)
switch {
case kerrors.IsNotFound(err):
log.Info("Creating ephemeral runner set")
return r.createEphemeralRunnerSet(ctx, autoscalingRunnerSet, log)
}
oldRunnerSets := existingRunnerSets.old()
if len(oldRunnerSets) > 0 {
log.Info("Cleanup old ephemeral runner sets", "count", len(oldRunnerSets))
err := r.deleteEphemeralRunnerSets(ctx, oldRunnerSets, log)
case err != nil:
log.Error(err, "Failed to get ephemeral runner")
return ctrl.Result{}, err
case ephemeralRunnerSet.Status.Phase == v1alpha1.EphemeralRunnerSetPhaseOutdated:
log.Info("Ephemeral runner set is outdated. Cleaning up resources for the outdated runner set")
// TODO: patch autoscaling runner set as outdated
default:
desired, err := r.newEphemeralRunnerSet(autoscalingRunnerSet)
if err != nil {
log.Error(err, "Failed to clean up old runner sets")
return ctrl.Result{}, err
}
}
// Make sure the AutoscalingListener is up and running in the controller namespace
if !listenerFound {
if r.drainingJobs(&latestRunnerSet.Status) {
log.Info("Creating a new AutoscalingListener is waiting for the running and pending runners to finish. Waiting for the running and pending runners to finish:", "running", latestRunnerSet.Status.RunningEphemeralRunners, "pending", latestRunnerSet.Status.PendingEphemeralRunners)
log.Error(err, "Failed to generate ephemeral runner set spec")
return ctrl.Result{}, nil
}
if !cmp.Equal(ephemeralRunnerSet.Spec.EphemeralRunnerSpec, desired.Spec.EphemeralRunnerSpec) ||
!cmp.Equal(ephemeralRunnerSet.Spec.EphemeralRunnerMetadata, desired.Spec.EphemeralRunnerMetadata) {
desired.Labels = r.filterAndMergeLabels(ephemeralRunnerSet.Labels, desired.Labels)
desired.Annotations = r.mergeAnnotations(ephemeralRunnerSet.Annotations, desired.Annotations)
log.Info("Updating ephemeral runner set spec to match the desired spec")
if err := r.Patch(ctx, desired, client.MergeFrom(&ephemeralRunnerSet)); err != nil {
log.Error(err, "Failed to patch ephemeral runner set to match the desired spec")
return ctrl.Result{}, err
}
log.Info("Successfully patched ephemeral runner set spec to match the desired spec")
return ctrl.Result{}, nil
}
log.Info("Creating a new AutoscalingListener for the runner set", "ephemeralRunnerSetName", latestRunnerSet.Name)
return r.createAutoScalingListenerForRunnerSet(ctx, autoscalingRunnerSet, latestRunnerSet, log)
}
if err := r.updateStatus(ctx, autoscalingRunnerSet, latestRunnerSet, v1alpha1.AutoscalingRunnerSetPhaseRunning, log); err != nil {
var listener v1alpha1.AutoscalingListener
err = r.Get(
ctx,
types.NamespacedName{
Namespace: r.ControllerNamespace,
Name: scaleSetListenerName(autoscalingRunnerSet),
},
&listener,
)
switch {
case kerrors.IsNotFound(err):
log.Info("AutoscalingListener does not exist, creating autoscaling listener")
return r.createAutoScalingListenerForRunnerSet(ctx, autoscalingRunnerSet, &ephemeralRunnerSet, log)
case err != nil:
log.Error(err, "Failed to get AutoscalingListener resource")
return ctrl.Result{}, err
default:
desired, err := r.newAutoScalingListener(
autoscalingRunnerSet,
&ephemeralRunnerSet,
r.ControllerNamespace,
r.DefaultRunnerScaleSetListenerImage,
nil, // TODO: remove
)
if err != nil {
log.Error(err, "Failed to generate AutoscalingListener spec")
return ctrl.Result{}, nil
}
if !cmp.Equal(listener.Spec, desired.Spec) ||
!cmp.Equal(listener.Annotations, desired.Labels) ||
!cmp.Equal(listener.Annotations, desired.Annotations) {
desired.Labels = r.filterAndMergeLabels(listener.Labels, desired.Labels)
desired.Annotations = r.mergeAnnotations(listener.Annotations, desired.Annotations)
log.Info("Updating AutoscalingListener spec to match the desired spec")
if err := r.Patch(ctx, desired, client.MergeFrom(&listener)); err != nil {
log.Error(err, "Failed to patch AutoscalingListener to match the desired spec")
return ctrl.Result{}, err
}
log.Info("Successfully patched AutoscalingListener spec to match the desired spec")
return ctrl.Result{}, nil
}
}
log.Info("Autoscaling runner set is up to date and ready")
if err := r.updateStatus(
ctx,
autoscalingRunnerSet,
&ephemeralRunnerSet,
v1alpha1.AutoscalingRunnerSetPhaseRunning,
log,
); err != nil {
log.Error(err, "Failed to update autoscaling runner set status to running")
return ctrl.Result{}, err
}
@ -333,7 +293,7 @@ func (r *AutoscalingRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl
return ctrl.Result{}, nil
}
func (r *AutoscalingRunnerSetReconciler) cleanUpResources(ctx context.Context, autoscalingRunnerSet *v1alpha1.AutoscalingRunnerSet, latestRunnerSet *v1alpha1.EphemeralRunnerSet, log logr.Logger) (bool, error) {
func (r *AutoscalingRunnerSetReconciler) cleanUpResources(ctx context.Context, autoscalingRunnerSet *v1alpha1.AutoscalingRunnerSet, log logr.Logger) (bool, error) {
log.Info("Deleting the listener")
done, err := r.cleanupListener(ctx, autoscalingRunnerSet, log)
if err != nil {
@ -347,7 +307,7 @@ func (r *AutoscalingRunnerSetReconciler) cleanUpResources(ctx context.Context, a
}
log.Info("deleting ephemeral runner sets")
done, err = r.cleanupEphemeralRunnerSets(ctx, autoscalingRunnerSet, log)
done, err = r.cleanupEphemeralRunnerSet(ctx, autoscalingRunnerSet, log)
if err != nil {
log.Error(err, "Failed to clean up ephemeral runner sets")
return false, err
@ -390,20 +350,17 @@ func (r *AutoscalingRunnerSetReconciler) updateStatus(ctx context.Context, autos
return nil
}
// drainingJobs reports whether the controller has to wait for in-flight work
// before applying a change, which prevents overprovisioning of runners.
//
// This path is reached when the runner scale set has been patched with a new
// runner spec while ephemeral runners are still running. The safest approach
// is to let them finish before a new runner set is created.
//
// It returns true only when the update strategy is "eventual" and the given
// status reports at least one running or pending ephemeral runner.
func (r *AutoscalingRunnerSetReconciler) drainingJobs(latestRunnerSetStatus *v1alpha1.EphemeralRunnerSetStatus) bool {
	// Any other strategy never waits, so the status is not even inspected.
	if r.UpdateStrategy != UpdateStrategyEventual {
		return false
	}
	inFlight := latestRunnerSetStatus.RunningEphemeralRunners + latestRunnerSetStatus.PendingEphemeralRunners
	return inFlight > 0
}
func (r *AutoscalingRunnerSetReconciler) cleanupListener(ctx context.Context, autoscalingRunnerSet *v1alpha1.AutoscalingRunnerSet, logger logr.Logger) (done bool, err error) {
logger.Info("Cleaning up the listener")
var listener v1alpha1.AutoscalingListener
err = r.Get(ctx, client.ObjectKey{Namespace: r.ControllerNamespace, Name: scaleSetListenerName(autoscalingRunnerSet)}, &listener)
err = r.Get(
ctx,
client.ObjectKey{
Namespace: r.ControllerNamespace,
Name: scaleSetListenerName(autoscalingRunnerSet),
},
&listener,
)
switch {
case err == nil:
if listener.DeletionTimestamp.IsZero() {
@ -421,39 +378,32 @@ func (r *AutoscalingRunnerSetReconciler) cleanupListener(ctx context.Context, au
return true, nil
}
func (r *AutoscalingRunnerSetReconciler) cleanupEphemeralRunnerSets(ctx context.Context, autoscalingRunnerSet *v1alpha1.AutoscalingRunnerSet, logger logr.Logger) (done bool, err error) {
logger.Info("Cleaning up ephemeral runner sets")
runnerSets, err := r.listEphemeralRunnerSets(ctx, autoscalingRunnerSet)
if err != nil {
return false, fmt.Errorf("failed to list ephemeral runner sets: %w", err)
}
if runnerSets.empty() {
logger.Info("All ephemeral runner sets are deleted")
return true, nil
func (r *AutoscalingRunnerSetReconciler) cleanupEphemeralRunnerSet(ctx context.Context, autoscalingRunnerSet *v1alpha1.AutoscalingRunnerSet, logger logr.Logger) (done bool, err error) {
logger.Info("Cleaning up ephemeral runner set")
var ers v1alpha1.EphemeralRunnerSet
err = r.Get(
ctx,
client.ObjectKey{
Namespace: autoscalingRunnerSet.Namespace,
Name: autoscalingRunnerSet.Name,
},
&ers,
)
switch {
case err == nil:
if ers.DeletionTimestamp.IsZero() {
logger.Info("Deleting the ephemeral runner set")
if err := r.Delete(ctx, &ers); err != nil {
return false, fmt.Errorf("failed to delete ephemeral runner set: %w", err)
}
}
return false, nil
case !kerrors.IsNotFound(err):
return false, fmt.Errorf("failed to get ephemeral runner set: %w", err)
}
logger.Info("Deleting all ephemeral runner sets", "count", runnerSets.count())
if err := r.deleteEphemeralRunnerSets(ctx, runnerSets.all(), logger); err != nil {
return false, fmt.Errorf("failed to delete ephemeral runner sets: %w", err)
}
return false, nil
}
func (r *AutoscalingRunnerSetReconciler) deleteEphemeralRunnerSets(ctx context.Context, oldRunnerSets []v1alpha1.EphemeralRunnerSet, logger logr.Logger) error {
for i := range oldRunnerSets {
rs := &oldRunnerSets[i]
// already deleted but contains finalizer so it still exists
if !rs.DeletionTimestamp.IsZero() {
logger.Info("Skip ephemeral runner set since it is already marked for deletion", "name", rs.Name)
continue
}
logger.Info("Deleting ephemeral runner set", "name", rs.Name)
if err := r.Delete(ctx, rs); err != nil {
return fmt.Errorf("failed to delete EphemeralRunnerSet resource: %w", err)
}
logger.Info("Deleted ephemeral runner set", "name", rs.Name)
}
return nil
logger.Info("Ephemeral runner set is deleted")
return true, nil
}
func (r *AutoscalingRunnerSetReconciler) removeFinalizersFromDependentResources(ctx context.Context, autoscalingRunnerSet *v1alpha1.AutoscalingRunnerSet, logger logr.Logger) error {
@ -669,7 +619,7 @@ func (r *AutoscalingRunnerSetReconciler) deleteRunnerScaleSet(ctx context.Contex
//
// 2. The scale set has been deleted by the controller.
// In that case, the controller will clean up annotation because the scale set does not exist anymore.
// Removal of the scale set id is also useful because permission cleanup will eventually lose permission
// Removal of the scale set id is also useful because permission cleanup will later lose permission
// assigned to it on a GitHub secret, causing actions client from secret to result in permission denied
//
// 3. Annotation is removed manually.
@ -741,7 +691,13 @@ func (r *AutoscalingRunnerSetReconciler) createAutoScalingListenerForRunnerSet(c
})
}
autoscalingListener, err := r.newAutoScalingListener(autoscalingRunnerSet, ephemeralRunnerSet, r.ControllerNamespace, r.DefaultRunnerScaleSetListenerImage, imagePullSecrets)
autoscalingListener, err := r.newAutoScalingListener(
autoscalingRunnerSet,
ephemeralRunnerSet,
r.ControllerNamespace,
r.DefaultRunnerScaleSetListenerImage,
imagePullSecrets,
)
if err != nil {
log.Error(err, "Could not create AutoscalingListener spec")
return ctrl.Result{}, err
@ -757,13 +713,13 @@ func (r *AutoscalingRunnerSetReconciler) createAutoScalingListenerForRunnerSet(c
return ctrl.Result{}, nil
}
func (r *AutoscalingRunnerSetReconciler) listEphemeralRunnerSets(ctx context.Context, autoscalingRunnerSet *v1alpha1.AutoscalingRunnerSet) (*EphemeralRunnerSets, error) {
list := new(v1alpha1.EphemeralRunnerSetList)
if err := r.List(ctx, list, client.InNamespace(autoscalingRunnerSet.Namespace), client.MatchingFields{resourceOwnerKey: autoscalingRunnerSet.Name}); err != nil {
return nil, fmt.Errorf("failed to list ephemeral runner sets: %w", err)
func shouldCreateScaleSet(autoscalingRunnerSet *v1alpha1.AutoscalingRunnerSet) bool {
scaleSetIDRaw, ok := autoscalingRunnerSet.Annotations[runnerScaleSetIDAnnotationKey]
if !ok {
return true
}
return &EphemeralRunnerSets{list: list}, nil
id, err := strconv.Atoi(scaleSetIDRaw)
return err != nil || id <= 0
}
// SetupWithManager sets up the controller with the Manager.
@ -1103,54 +1059,3 @@ func (c *autoscalingRunnerSetFinalizerDependencyCleaner) removeManagerRoleFinali
return
}
}
// EphemeralRunnerSets wraps an EphemeralRunnerSetList and provides helpers to
// pick the latest item, the older items, or all items.
//
// NOTE: if this logic should be used for other resources,
// consider using generics.
type EphemeralRunnerSets struct {
// list is the wrapped list; empty() treats a nil list as empty.
list *v1alpha1.EphemeralRunnerSetList
// sorted is consulted by latest() and old() to decide whether sort() must be called first.
sorted bool
}
// latest returns a deep copy of the first item of the list once it has been
// ordered by sort(), or nil when the collection is empty.
func (rs *EphemeralRunnerSets) latest() *v1alpha1.EphemeralRunnerSet {
	switch {
	case rs.empty():
		return nil
	case !rs.sorted:
		rs.sort()
	}
	newest := &rs.list.Items[0]
	return newest.DeepCopy()
}
// old returns deep copies of every item except the first one of the list once
// it has been ordered by sort(). It returns nil when the collection is empty.
func (rs *EphemeralRunnerSets) old() []v1alpha1.EphemeralRunnerSet {
	if rs.empty() {
		return nil
	}
	if !rs.sorted {
		rs.sort()
	}
	// Work on a deep copy so callers cannot modify the wrapped list.
	return rs.list.DeepCopy().Items[1:]
}
// all returns deep copies of every item in the wrapped list, in the list's
// current order, or nil when the collection is empty.
func (rs *EphemeralRunnerSets) all() []v1alpha1.EphemeralRunnerSet {
	if rs.empty() {
		return nil
	}
	return rs.list.DeepCopy().Items
}
// empty reports whether there is nothing to work with: either no list is
// wrapped at all, or the wrapped list has no items.
func (rs *EphemeralRunnerSets) empty() bool {
	if rs.list == nil {
		return true
	}
	return len(rs.list.Items) == 0
}
// sort orders the wrapped items in place by creation timestamp, newest first,
// and records that the list is ordered so that latest() and old() do not sort
// it again on every call. A nil list is left untouched.
func (rs *EphemeralRunnerSets) sort() {
	if rs.list == nil {
		return
	}
	items := rs.list.Items
	sort.Slice(items, func(i, j int) bool {
		return items[i].GetCreationTimestamp().After(items[j].GetCreationTimestamp().Time)
	})
	// Without this the sorted flag is read but never set, making it useless.
	rs.sorted = true
}
// count returns the number of wrapped items. A nil list counts as zero items,
// matching empty(), instead of panicking on a nil dereference.
func (rs *EphemeralRunnerSets) count() int {
	if rs.list == nil {
		return 0
	}
	return len(rs.list.Items)
}

View File

@ -441,6 +441,114 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() {
Expect(listener.Spec.EphemeralRunnerSetName).To(Equal(latestERSName), "listener should still reference latest ERS")
})
It("should converge multiple ERS to one canonical ERS named after ARS and stay idempotent", func() {
// wait until initial canonical ERS exists
canonical := new(v1alpha1.EphemeralRunnerSet)
Eventually(
func() error {
err := k8sClient.Get(ctx, client.ObjectKey{Name: autoscalingRunnerSet.Name, Namespace: autoscalingRunnerSet.Namespace}, canonical)
if err != nil {
return err
}
return nil
},
autoscalingRunnerSetTestTimeout,
autoscalingRunnerSetTestInterval,
).Should(Succeed(), "canonical ERS should be created")
// create extra non-canonical ERS that belong to the same ARS
extraOne := canonical.DeepCopy()
extraOne.ObjectMeta = metav1.ObjectMeta{
Name: autoscalingRunnerSet.Name + "-legacy-1",
Namespace: autoscalingRunnerSet.Namespace,
Labels: canonical.Labels,
Annotations: map[string]string{
annotationKeyRunnerSpecHash: canonical.Annotations[annotationKeyRunnerSpecHash],
},
OwnerReferences: canonical.OwnerReferences,
}
extraTwo := canonical.DeepCopy()
extraTwo.ObjectMeta = metav1.ObjectMeta{
Name: autoscalingRunnerSet.Name + "-legacy-2",
Namespace: autoscalingRunnerSet.Namespace,
Labels: canonical.Labels,
Annotations: map[string]string{
annotationKeyRunnerSpecHash: canonical.Annotations[annotationKeyRunnerSpecHash],
},
OwnerReferences: canonical.OwnerReferences,
}
Expect(k8sClient.Create(ctx, extraOne)).To(Succeed(), "should create extra non-canonical ERS #1")
Expect(k8sClient.Create(ctx, extraTwo)).To(Succeed(), "should create extra non-canonical ERS #2")
// controller should converge to exactly one active canonical ERS
Eventually(
func() (int, error) {
runnerSetList := new(v1alpha1.EphemeralRunnerSetList)
err := k8sClient.List(ctx, runnerSetList, client.InNamespace(autoscalingRunnerSet.Namespace))
if err != nil {
return 0, err
}
activeCount := 0
for _, ers := range runnerSetList.Items {
for _, owner := range ers.OwnerReferences {
if owner.UID != autoscalingRunnerSet.UID {
continue
}
if ers.Name != autoscalingRunnerSet.Name {
return 0, fmt.Errorf("found non-canonical ERS %q while converging", ers.Name)
}
if ers.DeletionTimestamp.IsZero() {
activeCount++
}
break
}
}
if activeCount != 1 {
return activeCount, fmt.Errorf("expected exactly 1 active canonical ERS, got %d", activeCount)
}
return activeCount, nil
},
autoscalingRunnerSetTestTimeout,
autoscalingRunnerSetTestInterval,
).Should(Equal(1), "controller should converge to one active canonical ERS")
// idempotency: repeated reconciles should keep same single canonical ERS
Consistently(
func() (int, error) {
runnerSetList := new(v1alpha1.EphemeralRunnerSetList)
err := k8sClient.List(ctx, runnerSetList, client.InNamespace(autoscalingRunnerSet.Namespace))
if err != nil {
return 0, err
}
activeCount := 0
for _, ers := range runnerSetList.Items {
for _, owner := range ers.OwnerReferences {
if owner.UID != autoscalingRunnerSet.UID {
continue
}
if ers.Name != autoscalingRunnerSet.Name {
return 0, fmt.Errorf("found non-canonical ERS %q after convergence", ers.Name)
}
if ers.DeletionTimestamp.IsZero() {
activeCount++
}
break
}
}
return activeCount, nil
},
time.Second*5,
autoscalingRunnerSetTestInterval,
).Should(Equal(1), "reconcile should stay idempotent with one active canonical ERS")
})
Context("When updating a new AutoScalingRunnerSet", func() {
It("It should re-create EphemeralRunnerSet and Listener as needed when updating AutoScalingRunnerSet", func() {
// Wait till the listener is created
@ -491,19 +599,20 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() {
).ShouldNot(BeEquivalentTo(runnerSet.Annotations[annotationKeyRunnerSpecHash]), "New EphemeralRunnerSet should be created")
// We should create a new listener
oldListenerUID := listener.UID
Eventually(
func() (string, error) {
func() (types.UID, error) {
listener := new(v1alpha1.AutoscalingListener)
err := k8sClient.Get(ctx, client.ObjectKey{Name: scaleSetListenerName(autoscalingRunnerSet), Namespace: autoscalingRunnerSet.Namespace}, listener)
if err != nil {
return "", err
}
return listener.Spec.EphemeralRunnerSetName, nil
return listener.UID, nil
},
autoscalingRunnerSetTestTimeout,
autoscalingRunnerSetTestInterval,
).ShouldNot(BeEquivalentTo(runnerSet.Name), "New Listener should be created")
).ShouldNot(BeEquivalentTo(oldListenerUID), "New Listener should be created")
// Only update the Spec for the AutoScalingListener
// This should trigger re-creation of the Listener only
@ -670,9 +779,7 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() {
})
Context("When updating an AutoscalingRunnerSet with running or pending jobs", func() {
It("It should wait for running and pending jobs to finish before applying the update. Update Strategy is set to eventual.", func() {
// Switch update strategy to eventual (drain jobs )
controller.UpdateStrategy = UpdateStrategyEventual
It("It should wait for running and pending jobs to finish before applying the update.", func() {
// Wait till the listener is created
listener := new(v1alpha1.AutoscalingListener)
Eventually(

View File

@ -585,7 +585,7 @@ func (b *ResourceBuilder) newEphemeralRunnerSet(autoscalingRunnerSet *v1alpha1.A
newEphemeralRunnerSet := &v1alpha1.EphemeralRunnerSet{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
GenerateName: autoscalingRunnerSet.Name + "-",
GenerateName: autoscalingRunnerSet.Name,
Namespace: autoscalingRunnerSet.Namespace,
Labels: labels,
Annotations: annotations,

14
main.go
View File

@ -91,7 +91,6 @@ func main() {
autoScalingRunnerSetOnly bool
enableLeaderElection bool
disableAdmissionWebhook bool
updateStrategy string
leaderElectionID string
port int
syncPeriod time.Duration
@ -160,7 +159,6 @@ func main() {
flag.StringVar(&logLevel, "log-level", logging.LogLevelDebug, `The verbosity of the logging. Valid values are "debug", "info", "warn", "error". Defaults to "debug".`)
flag.StringVar(&logFormat, "log-format", "text", `The log format. Valid options are "text" and "json". Defaults to "text"`)
flag.BoolVar(&autoScalingRunnerSetOnly, "auto-scaling-runner-set-only", false, "Make controller only reconcile AutoRunnerScaleSet object.")
flag.StringVar(&updateStrategy, "update-strategy", "immediate", `Resources reconciliation strategy on upgrade with running/pending jobs. Valid values are: "immediate", "eventual". Defaults to "immediate".`)
flag.Var(&autoScalerImagePullSecrets, "auto-scaler-image-pull-secrets", "The default image-pull secret name for auto-scaler listener container.")
flag.IntVar(&k8sClientRateLimiterQPS, "k8s-client-rate-limiter-qps", 20, "The QPS value of the K8s client rate limiter.")
flag.IntVar(&k8sClientRateLimiterBurst, "k8s-client-rate-limiter-burst", 30, "The burst value of the K8s client rate limiter.")
@ -211,14 +209,6 @@ func main() {
defaultNamespaces[watchSingleNamespace] = cache.Config{}
defaultNamespaces[managerNamespace] = cache.Config{}
}
switch updateStrategy {
case "eventual", "immediate":
log.Info(`Update strategy set to:`, "updateStrategy", updateStrategy)
default:
log.Info(`Update strategy not recognized. Defaulting to "immediately"`, "updateStrategy", updateStrategy)
updateStrategy = "immediate"
}
}
if actionsgithubcom.SetListenerLoggingParameters(logLevel, logFormat) {
@ -320,7 +310,8 @@ func main() {
switch workqueueRateLimiter {
case "typed_rate_limiter":
log.Info("Using typed rate limiter (per-item only, no global token bucket)")
controllerOpts = append(controllerOpts,
controllerOpts = append(
controllerOpts,
actionsgithubcom.WithTypedRateLimiter(workqueue.DefaultTypedItemBasedRateLimiter[reconcile.Request]()),
)
case "bucket_rate_limiter", "":
@ -336,7 +327,6 @@ func main() {
Scheme: mgr.GetScheme(),
ControllerNamespace: managerNamespace,
DefaultRunnerScaleSetListenerImage: managerImage,
UpdateStrategy: actionsgithubcom.UpdateStrategy(updateStrategy),
DefaultRunnerScaleSetListenerImagePullSecrets: autoScalerImagePullSecrets,
ResourceBuilder: rb,
}).SetupWithManager(mgr, controllerOpts...); err != nil {

View File

@ -25,7 +25,6 @@ function install_arc() {
--namespace "arc-systems" \
--create-namespace \
--set controller.manager.container.image="${IMAGE_NAME}:${IMAGE_TAG}" \
--set controller.manager.config.updateStrategy="eventual" \
"${ROOT_DIR}/charts/gha-runner-scale-set-controller-experimental" \
--debug

View File

@ -26,7 +26,6 @@ function install_arc() {
--create-namespace \
--set image.repository="${IMAGE_NAME}" \
--set image.tag="${IMAGE_TAG}" \
--set flags.updateStrategy="eventual" \
"${ROOT_DIR}/charts/gha-runner-scale-set-controller" \
--debug

View File

@ -25,7 +25,6 @@ function install_arc() {
--namespace "${ARC_NAMESPACE}" \
--create-namespace \
--set controller.manager.container.image="${IMAGE_NAME}:${IMAGE_TAG}" \
--set controller.manager.config.updateStrategy="eventual" \
"${ROOT_DIR}/charts/gha-runner-scale-set-controller-experimental" \
--debug

View File

@ -26,7 +26,6 @@ function install_arc() {
--create-namespace \
--set image.repository="${IMAGE_NAME}" \
--set image.tag="${IMAGE_TAG}" \
--set flags.updateStrategy="eventual" \
"${ROOT_DIR}/charts/gha-runner-scale-set-controller" \
--debug