This commit is contained in:
Nikola Jokic 2026-05-16 01:10:30 +02:00
parent 8cb3f49049
commit c6643e9108
No known key found for this signature in database
GPG Key ID: 419BB425B0E501B0
5 changed files with 389 additions and 95 deletions

View File

@ -54,6 +54,10 @@ const (
// EphemeralRunnerSetPhaseOutdated is set when at least one ephemeral runner
// contains the outdated phase
EphemeralRunnerSetPhaseOutdated EphemeralRunnerSetPhase = "Outdated"
// EphemeralRunnerSetPhaseDraining is set when the runner set is being actively
// drained. In this phase, the desired replicas are expected to be 0 and patchID
// is expected to be 0.
EphemeralRunnerSetPhaseDraining EphemeralRunnerSetPhase = "Draining"
)
// +kubebuilder:object:root=true

View File

@ -46,8 +46,9 @@ const (
// annotationKeyValuesHash is hash of the entire values json.
// This is used to determine if the values have changed, so we can
// re-create listener.
annotationKeyValuesHash = "actions.github.com/values-hash"
annotationKeyChangeHash = "actions.github.com/change-hash"
annotationKeyValuesHash = "actions.github.com/values-hash"
annotationKeyChangeHash = "actions.github.com/change-hash"
annotationKeyRunnerSetDraining = "actions.github.com/runner-set-draining"
autoscalingRunnerSetFinalizerName = "autoscalingrunnerset.actions.github.com/finalizer"
runnerScaleSetIDAnnotationKey = "runner-scale-set-id"
@ -106,7 +107,7 @@ func (r *AutoscalingRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl
}
log.Info("Deleting resources")
done, err := r.cleanUpResources(ctx, autoscalingRunnerSet, nil, log)
done, err := r.cleanUpResources(ctx, autoscalingRunnerSet, log)
if err != nil {
log.Error(err, "Failed to clean up resources during deletion")
return ctrl.Result{}, err
@ -138,14 +139,16 @@ func (r *AutoscalingRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl
if !v1alpha1.IsVersionAllowed(autoscalingRunnerSet.Labels[LabelKeyKubernetesVersion], build.Version) {
if err := r.Delete(ctx, autoscalingRunnerSet); err != nil {
log.Error(err, "Failed to delete autoscaling runner set on version mismatch",
log.Error(
err, "Failed to delete autoscaling runner set on version mismatch",
"buildVersion", build.Version,
"autoscalingRunnerSetVersion", autoscalingRunnerSet.Labels[LabelKeyKubernetesVersion],
)
return ctrl.Result{}, nil
}
log.Info("Autoscaling runner set version doesn't match the build version. Deleting the resource.",
log.Info(
"Autoscaling runner set version doesn't match the build version. Deleting the resource.",
"buildVersion", build.Version,
"autoscalingRunnerSetVersion", autoscalingRunnerSet.Labels[LabelKeyKubernetesVersion],
)
@ -252,7 +255,7 @@ func (r *AutoscalingRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl
}
}
done, err := r.cleanUpResources(ctx, autoscalingRunnerSet, latestRunnerSet, log)
done, err := r.cleanUpResources(ctx, autoscalingRunnerSet, log)
if err != nil {
log.Error(err, "Failed to clean up resources for outdated runner set")
return ctrl.Result{}, err
@ -287,17 +290,22 @@ func (r *AutoscalingRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl
if latestRunnerSet.Annotations[annotationKeyRunnerSpecHash] != autoscalingRunnerSet.RunnerSetSpecHash() {
if r.drainingJobs(&latestRunnerSet.Status) {
log.Info("Latest runner set spec hash does not match the current autoscaling runner set. Waiting for the running and pending runners to finish:", "running", latestRunnerSet.Status.RunningEphemeralRunners, "pending", latestRunnerSet.Status.PendingEphemeralRunners)
log.Info("Scaling down the number of desired replicas to 0")
log.Info("Scaling down the number of desired replicas to 0 and setting draining phase")
// We are in the process of draining the jobs. The listener has been deleted and the ephemeral runner set replicas
// need to scale down to 0
err := patch(ctx, r.Client, latestRunnerSet, func(obj *v1alpha1.EphemeralRunnerSet) {
obj.Spec.Replicas = 0
obj.Spec.PatchID = 0
if obj.Annotations == nil {
obj.Annotations = map[string]string{}
}
obj.Annotations[annotationKeyRunnerSetDraining] = "true"
})
if err != nil {
log.Error(err, "Failed to patch runner set to set desired count to 0")
return ctrl.Result{}, err
}
return ctrl.Result{}, err
return ctrl.Result{}, nil
}
log.Info("Latest runner set spec hash does not match the current autoscaling runner set. Creating a new runner set")
return r.createEphemeralRunnerSet(ctx, autoscalingRunnerSet, log)
@ -305,8 +313,19 @@ func (r *AutoscalingRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl
oldRunnerSets := existingRunnerSets.old()
if len(oldRunnerSets) > 0 {
log.Info("Ensuring old ephemeral runner sets are draining", "count", len(oldRunnerSets))
drainingReady, err := r.ensureOldRunnerSetsDraining(ctx, oldRunnerSets, log)
if err != nil {
log.Error(err, "Failed to ensure old runner sets are draining")
return ctrl.Result{}, err
}
if !drainingReady {
log.Info("Waiting for old ephemeral runner sets to enter draining state")
return ctrl.Result{RequeueAfter: 2 * time.Second}, nil
}
log.Info("Cleanup old ephemeral runner sets", "count", len(oldRunnerSets))
err := r.deleteEphemeralRunnerSets(ctx, oldRunnerSets, log)
err = r.deleteEphemeralRunnerSets(ctx, oldRunnerSets, log)
if err != nil {
log.Error(err, "Failed to clean up old runner sets")
return ctrl.Result{}, err
@ -331,7 +350,7 @@ func (r *AutoscalingRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl
return ctrl.Result{}, nil
}
func (r *AutoscalingRunnerSetReconciler) cleanUpResources(ctx context.Context, autoscalingRunnerSet *v1alpha1.AutoscalingRunnerSet, latestRunnerSet *v1alpha1.EphemeralRunnerSet, log logr.Logger) (bool, error) {
func (r *AutoscalingRunnerSetReconciler) cleanUpResources(ctx context.Context, autoscalingRunnerSet *v1alpha1.AutoscalingRunnerSet, log logr.Logger) (bool, error) {
log.Info("Deleting the listener")
done, err := r.cleanupListener(ctx, autoscalingRunnerSet, log)
if err != nil {
@ -430,6 +449,15 @@ func (r *AutoscalingRunnerSetReconciler) cleanupEphemeralRunnerSets(ctx context.
return true, nil
}
drainingReady, err := r.ensureOldRunnerSetsDraining(ctx, runnerSets.all(), logger)
if err != nil {
return false, fmt.Errorf("failed to set ephemeral runner sets into draining mode: %w", err)
}
if !drainingReady {
logger.Info("Waiting for ephemeral runner sets to enter draining mode")
return false, nil
}
logger.Info("Deleting all ephemeral runner sets", "count", runnerSets.count())
if err := r.deleteEphemeralRunnerSets(ctx, runnerSets.all(), logger); err != nil {
return false, fmt.Errorf("failed to delete ephemeral runner sets: %w", err)
@ -440,6 +468,14 @@ func (r *AutoscalingRunnerSetReconciler) cleanupEphemeralRunnerSets(ctx context.
func (r *AutoscalingRunnerSetReconciler) deleteEphemeralRunnerSets(ctx context.Context, oldRunnerSets []v1alpha1.EphemeralRunnerSet, logger logr.Logger) error {
for i := range oldRunnerSets {
rs := &oldRunnerSets[i]
if !r.runnerSetInDrainingMode(rs) {
logger.Info("Skip deleting ephemeral runner set until it is in draining mode", "name", rs.Name, "phase", rs.Status.Phase, "replicas", rs.Spec.Replicas, "patchID", rs.Spec.PatchID)
continue
}
if r.hasRunningOrPendingRunners(&rs.Status) {
logger.Info("Skip deleting ephemeral runner set while it still has running or pending runners", "name", rs.Name, "running", rs.Status.RunningEphemeralRunners, "pending", rs.Status.PendingEphemeralRunners)
continue
}
// already deleted but contains finalizer so it still exists
if !rs.DeletionTimestamp.IsZero() {
logger.Info("Skip ephemeral runner set since it is already marked for deletion", "name", rs.Name)
@ -454,6 +490,47 @@ func (r *AutoscalingRunnerSetReconciler) deleteEphemeralRunnerSets(ctx context.C
return nil
}
// ensureOldRunnerSetsDraining patches every runner set in oldRunnerSets that is
// neither marked for deletion nor already in draining mode so that its replicas
// and patchID are 0 and the draining annotation is "true".
//
// It returns true only when no runner set needed patching during this call.
// When at least one patch was issued it returns false, so the caller can wait
// before proceeding. The first patch failure aborts the loop and is returned
// wrapped with the runner set name.
func (r *AutoscalingRunnerSetReconciler) ensureOldRunnerSetsDraining(ctx context.Context, oldRunnerSets []v1alpha1.EphemeralRunnerSet, logger logr.Logger) (bool, error) {
	// Mutation applied to each runner set that still has to enter draining mode.
	markDraining := func(obj *v1alpha1.EphemeralRunnerSet) {
		obj.Spec.Replicas = 0
		obj.Spec.PatchID = 0
		if obj.Annotations == nil {
			obj.Annotations = map[string]string{}
		}
		obj.Annotations[annotationKeyRunnerSetDraining] = "true"
	}

	patchedCount := 0
	for idx := range oldRunnerSets {
		current := &oldRunnerSets[idx]
		if !current.DeletionTimestamp.IsZero() || r.runnerSetInDrainingMode(current) {
			continue
		}

		patchedCount++
		logger.Info("Patching old ephemeral runner set into draining mode", "name", current.Name)
		// Patch a copy so the element held in oldRunnerSets is left untouched.
		if err := patch(ctx, r.Client, current.DeepCopy(), markDraining); err != nil {
			return false, fmt.Errorf("failed to patch old EphemeralRunnerSet %s into draining mode: %w", current.Name, err)
		}
	}

	return patchedCount == 0, nil
}
// runnerSetInDrainingMode reports whether rs has spec replicas 0, spec patchID 0
// and the draining annotation set to "true".
func (r *AutoscalingRunnerSetReconciler) runnerSetInDrainingMode(rs *v1alpha1.EphemeralRunnerSet) bool {
	if rs.Spec.Replicas != 0 || rs.Spec.PatchID != 0 {
		return false
	}
	marker, found := rs.Annotations[annotationKeyRunnerSetDraining]
	return found && marker == "true"
}
// hasRunningOrPendingRunners reports whether the sum of the running and pending
// ephemeral runner counters in status is greater than zero. A nil status yields
// false.
func (r *AutoscalingRunnerSetReconciler) hasRunningOrPendingRunners(status *v1alpha1.EphemeralRunnerSetStatus) bool {
	if status != nil {
		active := status.RunningEphemeralRunners + status.PendingEphemeralRunners
		return active > 0
	}
	return false
}
func (r *AutoscalingRunnerSetReconciler) removeFinalizersFromDependentResources(ctx context.Context, autoscalingRunnerSet *v1alpha1.AutoscalingRunnerSet, logger logr.Logger) error {
c := autoscalingRunnerSetFinalizerDependencyCleaner{
client: r.Client,
@ -537,7 +614,8 @@ func (r *AutoscalingRunnerSetReconciler) createRunnerScaleSet(ctx context.Contex
RunnerSetting: scaleset.RunnerSetting{
DisableUpdate: true,
},
})
},
)
if err != nil {
logger.Error(err, "Failed to create a new runner scale set on Actions service")
return ctrl.Result{}, err

View File

@ -153,7 +153,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() {
return created.Finalizers[0], nil
},
autoscalingRunnerSetTestTimeout,
autoscalingRunnerSetTestInterval).Should(BeEquivalentTo(autoscalingRunnerSetFinalizerName), "AutoScalingRunnerSet should have a finalizer")
autoscalingRunnerSetTestInterval,
).Should(BeEquivalentTo(autoscalingRunnerSetFinalizerName), "AutoScalingRunnerSet should have a finalizer")
// Check if runner scale set is created on service
Eventually(
@ -202,7 +203,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() {
return fmt.Sprintf("%s/%s", created.Labels[LabelKeyGitHubOrganization], created.Labels[LabelKeyGitHubRepository]), nil
},
autoscalingRunnerSetTestTimeout,
autoscalingRunnerSetTestInterval).Should(BeEquivalentTo("owner/repo"), "RunnerScaleSet should be created/fetched and update the AutoScalingRunnerSet's label")
autoscalingRunnerSetTestInterval,
).Should(BeEquivalentTo("owner/repo"), "RunnerScaleSet should be created/fetched and update the AutoScalingRunnerSet's label")
// Check if ephemeral runner set is created
Eventually(
@ -216,7 +218,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() {
return len(runnerSetList.Items), nil
},
autoscalingRunnerSetTestTimeout,
autoscalingRunnerSetTestInterval).Should(BeEquivalentTo(1), "Only one EphemeralRunnerSet should be created")
autoscalingRunnerSetTestInterval,
).Should(BeEquivalentTo(1), "Only one EphemeralRunnerSet should be created")
// Check if listener is created
Eventually(
@ -224,7 +227,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() {
return k8sClient.Get(ctx, client.ObjectKey{Name: scaleSetListenerName(autoscalingRunnerSet), Namespace: autoscalingRunnerSet.Namespace}, new(v1alpha1.AutoscalingListener))
},
autoscalingRunnerSetTestTimeout,
autoscalingRunnerSetTestInterval).Should(Succeed(), "Listener should be created")
autoscalingRunnerSetTestInterval,
).Should(Succeed(), "Listener should be created")
// Check if status is updated
runnerSetList := new(v1alpha1.EphemeralRunnerSetList)
@ -242,7 +246,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() {
return k8sClient.Get(ctx, client.ObjectKey{Name: scaleSetListenerName(autoscalingRunnerSet), Namespace: autoscalingRunnerSet.Namespace}, new(v1alpha1.AutoscalingListener))
},
autoscalingRunnerSetTestTimeout,
autoscalingRunnerSetTestInterval).Should(Succeed(), "Listener should be created")
autoscalingRunnerSetTestInterval,
).Should(Succeed(), "Listener should be created")
// Delete the AutoScalingRunnerSet
err := k8sClient.Delete(ctx, autoscalingRunnerSet)
@ -259,7 +264,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() {
return fmt.Errorf("listener is not deleted")
},
autoscalingRunnerSetTestTimeout,
autoscalingRunnerSetTestInterval).Should(Succeed(), "Listener should be deleted")
autoscalingRunnerSetTestInterval,
).Should(Succeed(), "Listener should be deleted")
// Check if all the EphemeralRunnerSets are deleted
Eventually(
@ -277,7 +283,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() {
return nil
},
autoscalingRunnerSetTestTimeout,
autoscalingRunnerSetTestInterval).Should(Succeed(), "All EphemeralRunnerSet should be deleted")
autoscalingRunnerSetTestInterval,
).Should(Succeed(), "All EphemeralRunnerSet should be deleted")
// Check if the AutoScalingRunnerSet is deleted
Eventually(
@ -290,7 +297,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() {
return fmt.Errorf("AutoScalingRunnerSet is not deleted")
},
autoscalingRunnerSetTestTimeout,
autoscalingRunnerSetTestInterval).Should(Succeed(), "AutoScalingRunnerSet should be deleted")
autoscalingRunnerSetTestInterval,
).Should(Succeed(), "AutoScalingRunnerSet should be deleted")
})
})
@ -303,7 +311,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() {
return k8sClient.Get(ctx, client.ObjectKey{Name: scaleSetListenerName(autoscalingRunnerSet), Namespace: autoscalingRunnerSet.Namespace}, listener)
},
autoscalingRunnerSetTestTimeout,
autoscalingRunnerSetTestInterval).Should(Succeed(), "Listener should be created")
autoscalingRunnerSetTestInterval,
).Should(Succeed(), "Listener should be created")
runnerSetList := new(v1alpha1.EphemeralRunnerSetList)
err := k8sClient.List(ctx, runnerSetList, client.InNamespace(autoscalingRunnerSet.Namespace))
@ -339,7 +348,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() {
return runnerSetList.Items[0].Annotations[annotationKeyRunnerSpecHash], nil
},
autoscalingRunnerSetTestTimeout,
autoscalingRunnerSetTestInterval).ShouldNot(BeEquivalentTo(runnerSet.Annotations[annotationKeyRunnerSpecHash]), "New EphemeralRunnerSet should be created")
autoscalingRunnerSetTestInterval,
).ShouldNot(BeEquivalentTo(runnerSet.Annotations[annotationKeyRunnerSpecHash]), "New EphemeralRunnerSet should be created")
// We should create a new listener
Eventually(
@ -353,7 +363,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() {
return listener.Spec.EphemeralRunnerSetName, nil
},
autoscalingRunnerSetTestTimeout,
autoscalingRunnerSetTestInterval).ShouldNot(BeEquivalentTo(runnerSet.Name), "New Listener should be created")
autoscalingRunnerSetTestInterval,
).ShouldNot(BeEquivalentTo(runnerSet.Name), "New Listener should be created")
// Only update the Spec for the AutoScalingListener
// This should trigger re-creation of the Listener only
@ -389,7 +400,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() {
return string(runnerSetList.Items[0].UID), nil
},
autoscalingRunnerSetTestTimeout,
autoscalingRunnerSetTestInterval).Should(BeEquivalentTo(string(runnerSet.UID)), "New EphemeralRunnerSet should not be created")
autoscalingRunnerSetTestInterval,
).Should(BeEquivalentTo(string(runnerSet.UID)), "New EphemeralRunnerSet should not be created")
// We should only re-create a new listener
Eventually(
@ -403,7 +415,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() {
return string(listener.UID), nil
},
autoscalingRunnerSetTestTimeout,
autoscalingRunnerSetTestInterval).ShouldNot(BeEquivalentTo(string(listener.UID)), "New Listener should be created")
autoscalingRunnerSetTestInterval,
).ShouldNot(BeEquivalentTo(string(listener.UID)), "New Listener should be created")
// Only update the values hash for the autoscaling runner set
// This should trigger re-creation of the Listener only
@ -438,7 +451,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() {
return string(runnerSetList.Items[0].UID), nil
},
autoscalingRunnerSetTestTimeout,
autoscalingRunnerSetTestInterval).Should(BeEquivalentTo(string(runnerSet.UID)), "New EphemeralRunnerSet should not be created")
autoscalingRunnerSetTestInterval,
).Should(BeEquivalentTo(string(runnerSet.UID)), "New EphemeralRunnerSet should not be created")
// We should only re-create a new listener
Eventually(
@ -452,7 +466,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() {
return string(listener.UID), nil
},
autoscalingRunnerSetTestTimeout,
autoscalingRunnerSetTestInterval).ShouldNot(BeEquivalentTo(string(listener.UID)), "New Listener should be created")
autoscalingRunnerSetTestInterval,
).ShouldNot(BeEquivalentTo(string(listener.UID)), "New Listener should be created")
})
It("It should update RunnerScaleSet's runner group on service when it changes", func() {
@ -463,7 +478,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() {
return k8sClient.Get(ctx, client.ObjectKey{Name: scaleSetListenerName(autoscalingRunnerSet), Namespace: autoscalingRunnerSet.Namespace}, new(v1alpha1.AutoscalingListener))
},
autoscalingRunnerSetTestTimeout,
autoscalingRunnerSetTestInterval).Should(Succeed(), "Listener should be created")
autoscalingRunnerSetTestInterval,
).Should(Succeed(), "Listener should be created")
patched := autoscalingRunnerSet.DeepCopy()
patched.Spec.RunnerGroup = "testgroup2"
@ -485,7 +501,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() {
return updated.Annotations[AnnotationKeyGitHubRunnerGroupName], nil
},
autoscalingRunnerSetTestTimeout,
autoscalingRunnerSetTestInterval).Should(BeEquivalentTo("testgroup2"), "AutoScalingRunnerSet should have the new runner group in its annotation")
autoscalingRunnerSetTestInterval,
).Should(BeEquivalentTo("testgroup2"), "AutoScalingRunnerSet should have the new runner group in its annotation")
// delete the annotation and it should be re-added
patched = autoscalingRunnerSet.DeepCopy()
@ -511,6 +528,65 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() {
autoscalingRunnerSetTestInterval,
).Should(BeEquivalentTo("testgroup2"), "AutoScalingRunnerSet should have the runner group in its annotation")
})
// Verifies that after the AutoscalingRunnerSet is patched (new PriorityClassName
// and values-hash annotation), the runner set that existed beforehand ends up with
// replicas=0, patchID=0 and the draining annotation set to "true", or is already gone.
It("It should patch old ephemeral runner sets into draining mode before deleting them", func() {
runnerSetList := new(v1alpha1.EphemeralRunnerSetList)
// Wait until exactly one EphemeralRunnerSet is listed in the namespace.
Eventually(
func() (int, error) {
err := k8sClient.List(ctx, runnerSetList, client.InNamespace(autoscalingRunnerSet.Namespace))
if err != nil {
return 0, err
}
return len(runnerSetList.Items), nil
},
autoscalingRunnerSetTestTimeout,
autoscalingRunnerSetTestInterval,
).Should(BeEquivalentTo(1), "Only one EphemeralRunnerSet should exist initially")
// Remember the runner set that exists before the spec change.
oldRunnerSet := runnerSetList.Items[0]
// Change the pod template and the values-hash annotation in a single merge patch.
patched := autoscalingRunnerSet.DeepCopy()
patched.Spec.Template.Spec.PriorityClassName = "test-priority-class-draining"
if patched.Annotations == nil {
patched.Annotations = make(map[string]string)
}
patched.Annotations[annotationKeyValuesHash] = "draining-mode-hash"
err := k8sClient.Patch(ctx, patched, client.MergeFrom(autoscalingRunnerSet))
Expect(err).NotTo(HaveOccurred(), "failed to patch AutoScalingRunnerSet")
// Track the patched object in autoscalingRunnerSet from here on.
autoscalingRunnerSet = patched.DeepCopy()
// At some point after the patch at least two runner sets must be listed at once.
Eventually(
func() (int, error) {
list := new(v1alpha1.EphemeralRunnerSetList)
err := k8sClient.List(ctx, list, client.InNamespace(autoscalingRunnerSet.Namespace))
if err != nil {
return 0, err
}
return len(list.Items), nil
},
autoscalingRunnerSetTestTimeout,
autoscalingRunnerSetTestInterval,
).Should(BeNumerically(">=", 2), "Expected both old and new EphemeralRunnerSet during transition")
// Poll the old runner set until it carries the draining markers.
Eventually(
func() (string, error) {
old := new(v1alpha1.EphemeralRunnerSet)
err := k8sClient.Get(ctx, client.ObjectKey{Name: oldRunnerSet.Name, Namespace: oldRunnerSet.Namespace}, old)
if err != nil {
// NotFound is reported as "true" so the assertion also passes when the
// old runner set has already been deleted.
if errors.IsNotFound(err) {
return "true", nil
}
return "", err
}
// Returning an error keeps Eventually retrying until the spec has been zeroed.
if old.Spec.PatchID != 0 || old.Spec.Replicas != 0 {
return "", fmt.Errorf("old runner set not patched into draining mode yet: patchID=%d replicas=%d", old.Spec.PatchID, old.Spec.Replicas)
}
return old.Annotations[annotationKeyRunnerSetDraining], nil
},
autoscalingRunnerSetTestTimeout,
autoscalingRunnerSetTestInterval,
).Should(Equal("true"), "Old runner set should be marked as draining")
})
})
Context("When updating an AutoscalingRunnerSet with running or pending jobs", func() {
@ -642,13 +718,14 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() {
).Should(BeTrue(), "AutoscalingRunnerSet should be created")
runnerSetList := new(v1alpha1.EphemeralRunnerSetList)
Eventually(func() (int, error) {
err := k8sClient.List(ctx, runnerSetList, client.InNamespace(ars.Namespace))
if err != nil {
return 0, err
}
return len(runnerSetList.Items), nil
},
Eventually(
func() (int, error) {
err := k8sClient.List(ctx, runnerSetList, client.InNamespace(ars.Namespace))
if err != nil {
return 0, err
}
return len(runnerSetList.Items), nil
},
autoscalingRunnerSetTestTimeout,
autoscalingRunnerSetTestInterval,
).Should(BeEquivalentTo(1), "Failed to fetch runner set list")

View File

@ -195,6 +195,13 @@ func (r *EphemeralRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.R
}
total := ephemeralRunnersByState.scaleTotal()
if r.isDrainingMode(ephemeralRunnerSet) {
if err := r.reconcileDrainingRunnerSet(ctx, ephemeralRunnerSet, ephemeralRunnersByState, total, log); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, r.updateStatus(ctx, ephemeralRunnerSet, ephemeralRunnersByState, log)
}
if ephemeralRunnerSet.Spec.PatchID == 0 || ephemeralRunnerSet.Spec.PatchID != ephemeralRunnersByState.latestPatchID {
defer func() {
if err := r.cleanupFinishedEphemeralRunners(ctx, ephemeralRunnersByState.finished, log); err != nil {
@ -240,6 +247,8 @@ func (r *EphemeralRunnerSetReconciler) updateStatus(ctx context.Context, ephemer
total := state.scaleTotal()
var phase v1alpha1.EphemeralRunnerSetPhase
switch {
case r.isDrainingMode(ephemeralRunnerSet):
phase = v1alpha1.EphemeralRunnerSetPhaseDraining
case len(state.outdated) > 0:
phase = v1alpha1.EphemeralRunnerSetPhaseOutdated
case ephemeralRunnerSet.Status.Phase == "":
@ -270,6 +279,30 @@ func (r *EphemeralRunnerSetReconciler) updateStatus(ctx context.Context, ephemer
return nil
}
// isDrainingMode reports whether ephemeralRunnerSet is non-nil, has spec patchID 0
// and spec replicas 0, and has the draining annotation set to "true".
func (r *EphemeralRunnerSetReconciler) isDrainingMode(ephemeralRunnerSet *v1alpha1.EphemeralRunnerSet) bool {
	switch {
	case ephemeralRunnerSet == nil:
		return false
	case ephemeralRunnerSet.Spec.PatchID != 0, ephemeralRunnerSet.Spec.Replicas != 0:
		return false
	default:
		return ephemeralRunnerSet.Annotations[annotationKeyRunnerSetDraining] == "true"
	}
}
// reconcileDrainingRunnerSet handles one reconcile pass for a runner set that is
// in draining mode.
//
// It first deletes every runner returned by state.terminated(), ignoring NotFound
// errors. If total is 0 it then returns; otherwise it calls
// deleteIdleEphemeralRunners with the pending and running runners and their
// combined count. The first error encountered is returned unchanged.
func (r *EphemeralRunnerSetReconciler) reconcileDrainingRunnerSet(ctx context.Context, ephemeralRunnerSet *v1alpha1.EphemeralRunnerSet, state *ephemeralRunnersByState, total int, log logr.Logger) error {
	for _, runner := range state.terminated() {
		log.Info("Deleting terminated ephemeral runner in draining mode", "name", runner.Name)
		err := r.Delete(ctx, runner)
		if err == nil || kerrors.IsNotFound(err) {
			continue
		}
		return err
	}

	if total != 0 {
		pendingCount, runningCount := len(state.pending), len(state.running)
		log.Info("Draining ephemeral runner set aggressively", "pending", pendingCount, "running", runningCount)
		return r.deleteIdleEphemeralRunners(ctx, ephemeralRunnerSet, state.pending, state.running, pendingCount+runningCount, log)
	}

	return nil
}
func (r *EphemeralRunnerSetReconciler) cleanupFinishedEphemeralRunners(ctx context.Context, finishedEphemeralRunners []*v1alpha1.EphemeralRunner, log logr.Logger) error {
// cleanup finished runners and proceed
var errs []error

View File

@ -113,7 +113,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() {
return created.Finalizers[0], nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(ephemeralRunnerSetFinalizerName), "EphemeralRunnerSet should have a finalizer")
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(ephemeralRunnerSetFinalizerName), "EphemeralRunnerSet should have a finalizer")
// Check that the number of ephemeral runners stays at 0
Consistently(
@ -126,7 +127,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() {
return len(runnerList.Items), nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(0), "No EphemeralRunner should be created")
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(0), "No EphemeralRunner should be created")
// Check that the status stays at 0
Consistently(
@ -140,7 +142,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() {
return int(runnerSet.Status.CurrentReplicas), nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(0), "EphemeralRunnerSet status should be 0")
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(0), "EphemeralRunnerSet status should be 0")
// Scaling up the EphemeralRunnerSet
updated := created.DeepCopy()
@ -178,7 +181,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() {
return len(runnerList.Items), nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(5), "5 EphemeralRunner should be created")
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(5), "5 EphemeralRunner should be created")
// Check if the status is updated
Eventually(
@ -192,7 +196,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() {
return int(runnerSet.Status.CurrentReplicas), nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(5), "EphemeralRunnerSet status should be 5")
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(5), "EphemeralRunnerSet status should be 5")
})
})
@ -238,7 +243,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() {
return len(runnerList.Items), nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(5), "5 EphemeralRunner should be created")
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(5), "5 EphemeralRunner should be created")
// Delete the EphemeralRunnerSet
err = k8sClient.Delete(ctx, created)
@ -255,7 +261,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() {
return len(runnerList.Items), nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(0), "All EphemeralRunner should be deleted")
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(0), "All EphemeralRunner should be deleted")
// Check if the EphemeralRunnerSet is deleted
Eventually(
@ -273,7 +280,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() {
return fmt.Errorf("EphemeralRunnerSet is not deleted")
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval).Should(Succeed(), "EphemeralRunnerSet should be deleted")
ephemeralRunnerSetTestInterval,
).Should(Succeed(), "EphemeralRunnerSet should be deleted")
})
})
@ -1092,6 +1100,94 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() {
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(desiredStatus), "Status is not eventually updated to the desired one")
})
// Verifies draining mode on an EphemeralRunnerSet: scale up to two runners, mark
// them Running, then patch the set to replicas=0, patchID=0 with the draining
// annotation. Expects all runners to disappear, the phase to become Draining and
// the annotation to stay "true".
It("Should aggressively drain when patchID is 0 and replicas are 0", func() {
ers := new(v1alpha1.EphemeralRunnerSet)
err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers)
Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet")
// Scale up to two replicas with a non-zero patchID.
updated := ers.DeepCopy()
updated.Spec.Replicas = 2
updated.Spec.PatchID = 1
err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet")
runnerList := new(v1alpha1.EphemeralRunnerList)
// Wait for two runners to be listed.
// NOTE(review): the helper presumably also strips finalizers, as its name says — confirm.
Eventually(
func() (int, error) {
err := listEphemeralRunnersAndRemoveFinalizers(ctx, k8sClient, runnerList, ephemeralRunnerSet.Namespace)
if err != nil {
return -1, err
}
return len(runnerList.Items), nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created")
// Give each runner a RunnerID and the Running phase through the status subresource.
for i := range runnerList.Items {
updatedRunner := runnerList.Items[i].DeepCopy()
updatedRunner.Status.RunnerID = i + 1000
updatedRunner.Status.Phase = v1alpha1.EphemeralRunnerPhaseRunning
err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[i]))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
}
// Re-read the runner set so the next merge patch is computed against a fresh copy.
ers = new(v1alpha1.EphemeralRunnerSet)
err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers)
Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet")
// Put the runner set into draining mode: replicas 0, patchID 0, annotation "true".
updated = ers.DeepCopy()
updated.Spec.Replicas = 0
updated.Spec.PatchID = 0
if updated.Annotations == nil {
updated.Annotations = map[string]string{}
}
updated.Annotations[annotationKeyRunnerSetDraining] = "true"
err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers))
Expect(err).NotTo(HaveOccurred(), "failed to patch EphemeralRunnerSet into draining mode")
// All runners, including the ones marked Running, must eventually be gone.
Eventually(
func() (int, error) {
err := listEphemeralRunnersAndRemoveFinalizers(ctx, k8sClient, runnerList, ephemeralRunnerSet.Namespace)
if err != nil {
return -1, err
}
return len(runnerList.Items), nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(0), "All runners should be aggressively drained in draining mode")
// The status phase must eventually report Draining.
Eventually(
func() (v1alpha1.EphemeralRunnerSetPhase, error) {
runnerSet := new(v1alpha1.EphemeralRunnerSet)
err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, runnerSet)
if err != nil {
return "", err
}
return runnerSet.Status.Phase, nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval,
).Should(Equal(v1alpha1.EphemeralRunnerSetPhaseDraining), "Phase should be Draining while in draining mode")
// The draining annotation must stay "true" for the whole observation window.
Consistently(
func() (string, error) {
runnerSet := new(v1alpha1.EphemeralRunnerSet)
err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, runnerSet)
if err != nil {
return "", err
}
return runnerSet.Annotations[annotationKeyRunnerSetDraining], nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval,
).Should(Equal("true"), "Draining annotation should remain set")
})
})
})
@ -1175,29 +1271,30 @@ var _ = Describe("Test EphemeralRunnerSet controller with proxy settings", func(
err = k8sClient.Create(ctx, ephemeralRunnerSet)
Expect(err).NotTo(HaveOccurred(), "failed to create EphemeralRunnerSet")
Eventually(func(g Gomega) {
// Compiled / flattened proxy secret should exist at this point
actualProxySecret := &corev1.Secret{}
err = k8sClient.Get(ctx, client.ObjectKey{
Namespace: autoscalingNS.Name,
Name: proxyEphemeralRunnerSetSecretName(ephemeralRunnerSet),
}, actualProxySecret)
g.Expect(err).NotTo(HaveOccurred(), "failed to get compiled / flattened proxy secret")
secretFetcher := func(name string) (*corev1.Secret, error) {
secret := &corev1.Secret{}
Eventually(
func(g Gomega) {
// Compiled / flattened proxy secret should exist at this point
actualProxySecret := &corev1.Secret{}
err = k8sClient.Get(ctx, client.ObjectKey{
Namespace: autoscalingNS.Name,
Name: name,
}, secret)
return secret, err
}
Name: proxyEphemeralRunnerSetSecretName(ephemeralRunnerSet),
}, actualProxySecret)
g.Expect(err).NotTo(HaveOccurred(), "failed to get compiled / flattened proxy secret")
// Assert that the proxy secret is created with the correct values
expectedData, err := ephemeralRunnerSet.Spec.EphemeralRunnerSpec.Proxy.ToSecretData(secretFetcher)
g.Expect(err).NotTo(HaveOccurred(), "failed to get proxy secret data")
g.Expect(actualProxySecret.Data).To(Equal(expectedData))
},
secretFetcher := func(name string) (*corev1.Secret, error) {
secret := &corev1.Secret{}
err = k8sClient.Get(ctx, client.ObjectKey{
Namespace: autoscalingNS.Name,
Name: name,
}, secret)
return secret, err
}
// Assert that the proxy secret is created with the correct values
expectedData, err := ephemeralRunnerSet.Spec.EphemeralRunnerSpec.Proxy.ToSecretData(secretFetcher)
g.Expect(err).NotTo(HaveOccurred(), "failed to get proxy secret data")
g.Expect(actualProxySecret.Data).To(Equal(expectedData))
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval,
).Should(Succeed(), "compiled / flattened proxy secret should exist")
@ -1250,34 +1347,37 @@ var _ = Describe("Test EphemeralRunnerSet controller with proxy settings", func(
return len(runnerList.Items), nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(1), "1 EphemeralRunner should exist")
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(1), "1 EphemeralRunner should exist")
// Delete the EphemeralRunnerSet
err = k8sClient.Delete(ctx, ephemeralRunnerSet)
Expect(err).NotTo(HaveOccurred(), "failed to delete EphemeralRunnerSet")
Eventually(func(g Gomega) (int, error) {
runnerList := new(v1alpha1.EphemeralRunnerList)
err := listEphemeralRunnersAndRemoveFinalizers(ctx, k8sClient, runnerList, ephemeralRunnerSet.Namespace)
if err != nil {
return -1, err
}
return len(runnerList.Items), nil
},
Eventually(
func(g Gomega) (int, error) {
runnerList := new(v1alpha1.EphemeralRunnerList)
err := listEphemeralRunnersAndRemoveFinalizers(ctx, k8sClient, runnerList, ephemeralRunnerSet.Namespace)
if err != nil {
return -1, err
}
return len(runnerList.Items), nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(0), "EphemeralRunners should be deleted")
// Assert that the proxy secret is deleted
Eventually(func(g Gomega) {
proxySecret := &corev1.Secret{}
err = k8sClient.Get(ctx, client.ObjectKey{
Namespace: autoscalingNS.Name,
Name: proxyEphemeralRunnerSetSecretName(ephemeralRunnerSet),
}, proxySecret)
g.Expect(err).To(HaveOccurred(), "proxy secret should be deleted")
g.Expect(kerrors.IsNotFound(err)).To(BeTrue(), "proxy secret should be deleted")
},
Eventually(
func(g Gomega) {
proxySecret := &corev1.Secret{}
err = k8sClient.Get(ctx, client.ObjectKey{
Namespace: autoscalingNS.Name,
Name: proxyEphemeralRunnerSetSecretName(ephemeralRunnerSet),
}, proxySecret)
g.Expect(err).To(HaveOccurred(), "proxy secret should be deleted")
g.Expect(kerrors.IsNotFound(err)).To(BeTrue(), "proxy secret should be deleted")
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval,
).Should(Succeed(), "proxy secret should be deleted")
@ -1350,14 +1450,15 @@ var _ = Describe("Test EphemeralRunnerSet controller with proxy settings", func(
Expect(err).NotTo(HaveOccurred(), "failed to create EphemeralRunnerSet")
runnerList := new(v1alpha1.EphemeralRunnerList)
Eventually(func() (int, error) {
err := listEphemeralRunnersAndRemoveFinalizers(ctx, k8sClient, runnerList, ephemeralRunnerSet.Namespace)
if err != nil {
return -1, err
}
Eventually(
func() (int, error) {
err := listEphemeralRunnersAndRemoveFinalizers(ctx, k8sClient, runnerList, ephemeralRunnerSet.Namespace)
if err != nil {
return -1, err
}
return len(runnerList.Items), nil
},
return len(runnerList.Items), nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(1), "failed to create ephemeral runner")
@ -1474,14 +1575,15 @@ var _ = Describe("Test EphemeralRunnerSet controller with custom root CA", func(
Expect(err).NotTo(HaveOccurred(), "failed to create EphemeralRunnerSet")
runnerList := new(v1alpha1.EphemeralRunnerList)
Eventually(func() (int, error) {
err := listEphemeralRunnersAndRemoveFinalizers(ctx, k8sClient, runnerList, ephemeralRunnerSet.Namespace)
if err != nil {
return -1, err
}
Eventually(
func() (int, error) {
err := listEphemeralRunnersAndRemoveFinalizers(ctx, k8sClient, runnerList, ephemeralRunnerSet.Namespace)
if err != nil {
return -1, err
}
return len(runnerList.Items), nil
},
return len(runnerList.Items), nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(1), "failed to create ephemeral runner")