From c6643e91084d65f7e5999f5c34b34387b96886cf Mon Sep 17 00:00:00 2001 From: Nikola Jokic Date: Sat, 16 May 2026 01:10:30 +0200 Subject: [PATCH] wip --- .../v1alpha1/ephemeralrunnerset_types.go | 4 + .../autoscalingrunnerset_controller.go | 100 +++++++- .../autoscalingrunnerset_controller_test.go | 125 ++++++++-- .../ephemeralrunnerset_controller.go | 33 +++ .../ephemeralrunnerset_controller_test.go | 222 +++++++++++++----- 5 files changed, 389 insertions(+), 95 deletions(-) diff --git a/apis/actions.github.com/v1alpha1/ephemeralrunnerset_types.go b/apis/actions.github.com/v1alpha1/ephemeralrunnerset_types.go index 299b7850..8a1a46f6 100644 --- a/apis/actions.github.com/v1alpha1/ephemeralrunnerset_types.go +++ b/apis/actions.github.com/v1alpha1/ephemeralrunnerset_types.go @@ -54,6 +54,10 @@ const ( // EphemeralRunnerSetPhaseOutdated is set when at least one ephemeral runner // contains the outdated phase EphemeralRunnerSetPhaseOutdated EphemeralRunnerSetPhase = "Outdated" + // EphemeralRunnerSetPhaseDraining is set when the runner set is being actively + // drained. In this phase, the desired replicas are expected to be 0 and patchID + // is expected to be 0. + EphemeralRunnerSetPhaseDraining EphemeralRunnerSetPhase = "Draining" ) // +kubebuilder:object:root=true diff --git a/controllers/actions.github.com/autoscalingrunnerset_controller.go b/controllers/actions.github.com/autoscalingrunnerset_controller.go index f31bfd39..608dcd55 100644 --- a/controllers/actions.github.com/autoscalingrunnerset_controller.go +++ b/controllers/actions.github.com/autoscalingrunnerset_controller.go @@ -46,8 +46,9 @@ const ( // annotationKeyValuesHash is hash of the entire values json. // This is used to determine if the values have changed, so we can // re-create listener. - annotationKeyValuesHash = "actions.github.com/values-hash" - annotationKeyChangeHash = "actions.github.com/change-hash" + annotationKeyValuesHash = "actions.github.com/values-hash" + annotationKeyChangeHash = "actions.github.com/change-hash" + annotationKeyRunnerSetDraining = "actions.github.com/runner-set-draining" autoscalingRunnerSetFinalizerName = "autoscalingrunnerset.actions.github.com/finalizer" runnerScaleSetIDAnnotationKey = "runner-scale-set-id" @@ -106,7 +107,7 @@ func (r *AutoscalingRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl } log.Info("Deleting resources") - done, err := r.cleanUpResources(ctx, autoscalingRunnerSet, nil, log) + done, err := r.cleanUpResources(ctx, autoscalingRunnerSet, log) if err != nil { log.Error(err, "Failed to clean up resources during deletion") return ctrl.Result{}, err @@ -138,14 +139,16 @@ func (r *AutoscalingRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl if !v1alpha1.IsVersionAllowed(autoscalingRunnerSet.Labels[LabelKeyKubernetesVersion], build.Version) { if err := r.Delete(ctx, autoscalingRunnerSet); err != nil { - log.Error(err, "Failed to delete autoscaling runner set on version mismatch", + log.Error( + err, "Failed to delete autoscaling runner set on version mismatch", "buildVersion", build.Version, "autoscalingRunnerSetVersion", autoscalingRunnerSet.Labels[LabelKeyKubernetesVersion], ) return ctrl.Result{}, nil } - log.Info("Autoscaling runner set version doesn't match the build version. Deleting the resource.", + log.Info( + "Autoscaling runner set version doesn't match the build version. Deleting the resource.", "buildVersion", build.Version, "autoscalingRunnerSetVersion", autoscalingRunnerSet.Labels[LabelKeyKubernetesVersion], ) @@ -252,7 +255,7 @@ func (r *AutoscalingRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl } } - done, err := r.cleanUpResources(ctx, autoscalingRunnerSet, latestRunnerSet, log) + done, err := r.cleanUpResources(ctx, autoscalingRunnerSet, log) if err != nil { log.Error(err, "Failed to clean up resources for outdated runner set") return ctrl.Result{}, err @@ -287,17 +290,22 @@ func (r *AutoscalingRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl if latestRunnerSet.Annotations[annotationKeyRunnerSpecHash] != autoscalingRunnerSet.RunnerSetSpecHash() { if r.drainingJobs(&latestRunnerSet.Status) { log.Info("Latest runner set spec hash does not match the current autoscaling runner set. Waiting for the running and pending runners to finish:", "running", latestRunnerSet.Status.RunningEphemeralRunners, "pending", latestRunnerSet.Status.PendingEphemeralRunners) - log.Info("Scaling down the number of desired replicas to 0") + log.Info("Scaling down the number of desired replicas to 0 and setting draining phase") // We are in the process of draining the jobs. The listener has been deleted and the ephemeral runner set replicas // need to scale down to 0 err := patch(ctx, r.Client, latestRunnerSet, func(obj *v1alpha1.EphemeralRunnerSet) { obj.Spec.Replicas = 0 obj.Spec.PatchID = 0 + if obj.Annotations == nil { + obj.Annotations = map[string]string{} + } + obj.Annotations[annotationKeyRunnerSetDraining] = "true" }) if err != nil { log.Error(err, "Failed to patch runner set to set desired count to 0") + return ctrl.Result{}, err } - return ctrl.Result{}, err + return ctrl.Result{}, nil } log.Info("Latest runner set spec hash does not match the current autoscaling runner set. Creating a new runner set") return r.createEphemeralRunnerSet(ctx, autoscalingRunnerSet, log) @@ -305,8 +313,19 @@ func (r *AutoscalingRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl oldRunnerSets := existingRunnerSets.old() if len(oldRunnerSets) > 0 { + log.Info("Ensuring old ephemeral runner sets are draining", "count", len(oldRunnerSets)) + drainingReady, err := r.ensureOldRunnerSetsDraining(ctx, oldRunnerSets, log) + if err != nil { + log.Error(err, "Failed to ensure old runner sets are draining") + return ctrl.Result{}, err + } + if !drainingReady { + log.Info("Waiting for old ephemeral runner sets to enter draining state") + return ctrl.Result{RequeueAfter: 2 * time.Second}, nil + } + log.Info("Cleanup old ephemeral runner sets", "count", len(oldRunnerSets)) - err := r.deleteEphemeralRunnerSets(ctx, oldRunnerSets, log) + err = r.deleteEphemeralRunnerSets(ctx, oldRunnerSets, log) if err != nil { log.Error(err, "Failed to clean up old runner sets") return ctrl.Result{}, err @@ -331,7 +350,7 @@ func (r *AutoscalingRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl return ctrl.Result{}, nil } -func (r *AutoscalingRunnerSetReconciler) cleanUpResources(ctx context.Context, autoscalingRunnerSet *v1alpha1.AutoscalingRunnerSet, latestRunnerSet *v1alpha1.EphemeralRunnerSet, log logr.Logger) (bool, error) { +func (r *AutoscalingRunnerSetReconciler) cleanUpResources(ctx context.Context, autoscalingRunnerSet *v1alpha1.AutoscalingRunnerSet, log logr.Logger) (bool, error) { log.Info("Deleting the listener") done, err := r.cleanupListener(ctx, autoscalingRunnerSet, log) if err != nil { @@ -430,6 +449,15 @@ func (r *AutoscalingRunnerSetReconciler) cleanupEphemeralRunnerSets(ctx context. return true, nil } + drainingReady, err := r.ensureOldRunnerSetsDraining(ctx, runnerSets.all(), logger) + if err != nil { + return false, fmt.Errorf("failed to set ephemeral runner sets into draining mode: %w", err) + } + if !drainingReady { + logger.Info("Waiting for ephemeral runner sets to enter draining mode") + return false, nil + } + logger.Info("Deleting all ephemeral runner sets", "count", runnerSets.count()) if err := r.deleteEphemeralRunnerSets(ctx, runnerSets.all(), logger); err != nil { return false, fmt.Errorf("failed to delete ephemeral runner sets: %w", err) @@ -440,6 +468,14 @@ func (r *AutoscalingRunnerSetReconciler) cleanupEphemeralRunnerSets(ctx context. func (r *AutoscalingRunnerSetReconciler) deleteEphemeralRunnerSets(ctx context.Context, oldRunnerSets []v1alpha1.EphemeralRunnerSet, logger logr.Logger) error { for i := range oldRunnerSets { rs := &oldRunnerSets[i] + if !r.runnerSetInDrainingMode(rs) { + logger.Info("Skip deleting ephemeral runner set until it is in draining mode", "name", rs.Name, "phase", rs.Status.Phase, "replicas", rs.Spec.Replicas, "patchID", rs.Spec.PatchID) + continue + } + if r.hasRunningOrPendingRunners(&rs.Status) { + logger.Info("Skip deleting ephemeral runner set while it still has running or pending runners", "name", rs.Name, "running", rs.Status.RunningEphemeralRunners, "pending", rs.Status.PendingEphemeralRunners) + continue + } // already deleted but contains finalizer so it still exists if !rs.DeletionTimestamp.IsZero() { logger.Info("Skip ephemeral runner set since it is already marked for deletion", "name", rs.Name) @@ -454,6 +490,47 @@ func (r *AutoscalingRunnerSetReconciler) deleteEphemeralRunnerSets(ctx context.C return nil } +func (r *AutoscalingRunnerSetReconciler) ensureOldRunnerSetsDraining(ctx context.Context, oldRunnerSets []v1alpha1.EphemeralRunnerSet, logger logr.Logger) (bool, error) { + allReady := true + for i := range oldRunnerSets { + rs := &oldRunnerSets[i] + if !rs.DeletionTimestamp.IsZero() { + continue + } + if r.runnerSetInDrainingMode(rs) { + continue + } + + allReady = false + runnerSetCopy := rs.DeepCopy() + logger.Info("Patching old ephemeral runner set into draining mode", "name", rs.Name) + err := patch(ctx, r.Client, runnerSetCopy, func(obj *v1alpha1.EphemeralRunnerSet) { + obj.Spec.Replicas = 0 + obj.Spec.PatchID = 0 + if obj.Annotations == nil { + obj.Annotations = map[string]string{} + } + obj.Annotations[annotationKeyRunnerSetDraining] = "true" + }) + if err != nil { + return false, fmt.Errorf("failed to patch old EphemeralRunnerSet %s into draining mode: %w", rs.Name, err) + } + } + + return allReady, nil +} + +func (r *AutoscalingRunnerSetReconciler) runnerSetInDrainingMode(rs *v1alpha1.EphemeralRunnerSet) bool { + return rs.Spec.Replicas == 0 && rs.Spec.PatchID == 0 && rs.Annotations[annotationKeyRunnerSetDraining] == "true" +} + +func (r *AutoscalingRunnerSetReconciler) hasRunningOrPendingRunners(status *v1alpha1.EphemeralRunnerSetStatus) bool { + if status == nil { + return false + } + return (status.RunningEphemeralRunners + status.PendingEphemeralRunners) > 0 +} + func (r *AutoscalingRunnerSetReconciler) removeFinalizersFromDependentResources(ctx context.Context, autoscalingRunnerSet *v1alpha1.AutoscalingRunnerSet, logger logr.Logger) error { c := autoscalingRunnerSetFinalizerDependencyCleaner{ client: r.Client, @@ -537,7 +614,8 @@ func (r *AutoscalingRunnerSetReconciler) createRunnerScaleSet(ctx context.Contex RunnerSetting: scaleset.RunnerSetting{ DisableUpdate: true, }, - }) + }, + ) if err != nil { logger.Error(err, "Failed to create a new runner scale set on Actions service") return ctrl.Result{}, err diff --git a/controllers/actions.github.com/autoscalingrunnerset_controller_test.go b/controllers/actions.github.com/autoscalingrunnerset_controller_test.go index a2d9a6b2..beac204f 100644 --- a/controllers/actions.github.com/autoscalingrunnerset_controller_test.go +++ b/controllers/actions.github.com/autoscalingrunnerset_controller_test.go @@ -153,7 +153,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() { return created.Finalizers[0], nil }, autoscalingRunnerSetTestTimeout, - autoscalingRunnerSetTestInterval).Should(BeEquivalentTo(autoscalingRunnerSetFinalizerName), "AutoScalingRunnerSet should have a finalizer") + autoscalingRunnerSetTestInterval, + ).Should(BeEquivalentTo(autoscalingRunnerSetFinalizerName), "AutoScalingRunnerSet should have a finalizer") // Check if runner scale set is created on service Eventually( @@ -202,7 +203,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() { return fmt.Sprintf("%s/%s", created.Labels[LabelKeyGitHubOrganization], created.Labels[LabelKeyGitHubRepository]), nil }, autoscalingRunnerSetTestTimeout, - autoscalingRunnerSetTestInterval).Should(BeEquivalentTo("owner/repo"), "RunnerScaleSet should be created/fetched and update the AutoScalingRunnerSet's label") + autoscalingRunnerSetTestInterval, + ).Should(BeEquivalentTo("owner/repo"), "RunnerScaleSet should be created/fetched and update the AutoScalingRunnerSet's label") // Check if ephemeral runner set is created Eventually( @@ -216,7 +218,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() { return len(runnerSetList.Items), nil }, autoscalingRunnerSetTestTimeout, - autoscalingRunnerSetTestInterval).Should(BeEquivalentTo(1), "Only one EphemeralRunnerSet should be created") + autoscalingRunnerSetTestInterval, + ).Should(BeEquivalentTo(1), "Only one EphemeralRunnerSet should be created") // Check if listener is created Eventually( @@ -224,7 +227,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() { return k8sClient.Get(ctx, client.ObjectKey{Name: scaleSetListenerName(autoscalingRunnerSet), Namespace: autoscalingRunnerSet.Namespace}, new(v1alpha1.AutoscalingListener)) }, autoscalingRunnerSetTestTimeout, - autoscalingRunnerSetTestInterval).Should(Succeed(), "Listener should be created") + autoscalingRunnerSetTestInterval, + ).Should(Succeed(), "Listener should be created") // Check if status is updated runnerSetList := new(v1alpha1.EphemeralRunnerSetList) @@ -242,7 +246,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() { return k8sClient.Get(ctx, client.ObjectKey{Name: scaleSetListenerName(autoscalingRunnerSet), Namespace: autoscalingRunnerSet.Namespace}, new(v1alpha1.AutoscalingListener)) }, autoscalingRunnerSetTestTimeout, - autoscalingRunnerSetTestInterval).Should(Succeed(), "Listener should be created") + autoscalingRunnerSetTestInterval, + ).Should(Succeed(), "Listener should be created") // Delete the AutoScalingRunnerSet err := k8sClient.Delete(ctx, autoscalingRunnerSet) @@ -259,7 +264,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() { return fmt.Errorf("listener is not deleted") }, autoscalingRunnerSetTestTimeout, - autoscalingRunnerSetTestInterval).Should(Succeed(), "Listener should be deleted") + autoscalingRunnerSetTestInterval, + ).Should(Succeed(), "Listener should be deleted") // Check if all the EphemeralRunnerSet is deleted Eventually( @@ -277,7 +283,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() { return nil }, autoscalingRunnerSetTestTimeout, - autoscalingRunnerSetTestInterval).Should(Succeed(), "All EphemeralRunnerSet should be deleted") + autoscalingRunnerSetTestInterval, + ).Should(Succeed(), "All EphemeralRunnerSet should be deleted") // Check if the AutoScalingRunnerSet is deleted Eventually( @@ -290,7 +297,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() { return fmt.Errorf("AutoScalingRunnerSet is not deleted") }, autoscalingRunnerSetTestTimeout, - autoscalingRunnerSetTestInterval).Should(Succeed(), "AutoScalingRunnerSet should be deleted") + autoscalingRunnerSetTestInterval, + ).Should(Succeed(), "AutoScalingRunnerSet should be deleted") }) }) @@ -303,7 +311,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() { return k8sClient.Get(ctx, client.ObjectKey{Name: scaleSetListenerName(autoscalingRunnerSet), Namespace: autoscalingRunnerSet.Namespace}, listener) }, autoscalingRunnerSetTestTimeout, - autoscalingRunnerSetTestInterval).Should(Succeed(), "Listener should be created") + autoscalingRunnerSetTestInterval, + ).Should(Succeed(), "Listener should be created") runnerSetList := new(v1alpha1.EphemeralRunnerSetList) err := k8sClient.List(ctx, runnerSetList, client.InNamespace(autoscalingRunnerSet.Namespace)) @@ -339,7 +348,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() { return runnerSetList.Items[0].Annotations[annotationKeyRunnerSpecHash], nil }, autoscalingRunnerSetTestTimeout, - autoscalingRunnerSetTestInterval).ShouldNot(BeEquivalentTo(runnerSet.Annotations[annotationKeyRunnerSpecHash]), "New EphemeralRunnerSet should be created") + autoscalingRunnerSetTestInterval, + ).ShouldNot(BeEquivalentTo(runnerSet.Annotations[annotationKeyRunnerSpecHash]), "New EphemeralRunnerSet should be created") // We should create a new listener Eventually( @@ -353,7 +363,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() { return listener.Spec.EphemeralRunnerSetName, nil }, autoscalingRunnerSetTestTimeout, - autoscalingRunnerSetTestInterval).ShouldNot(BeEquivalentTo(runnerSet.Name), "New Listener should be created") + autoscalingRunnerSetTestInterval, + ).ShouldNot(BeEquivalentTo(runnerSet.Name), "New Listener should be created") // Only update the Spec for the AutoScalingListener // This should trigger re-creation of the Listener only @@ -389,7 +400,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() { return string(runnerSetList.Items[0].UID), nil }, autoscalingRunnerSetTestTimeout, - autoscalingRunnerSetTestInterval).Should(BeEquivalentTo(string(runnerSet.UID)), "New EphemeralRunnerSet should not be created") + autoscalingRunnerSetTestInterval, + ).Should(BeEquivalentTo(string(runnerSet.UID)), "New EphemeralRunnerSet should not be created") // We should only re-create a new listener Eventually( @@ -403,7 +415,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() { return string(listener.UID), nil }, autoscalingRunnerSetTestTimeout, - autoscalingRunnerSetTestInterval).ShouldNot(BeEquivalentTo(string(listener.UID)), "New Listener should be created") + autoscalingRunnerSetTestInterval, + ).ShouldNot(BeEquivalentTo(string(listener.UID)), "New Listener should be created") // Only update the values hash for the autoscaling runner set // This should trigger re-creation of the Listener only @@ -438,7 +451,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() { return string(runnerSetList.Items[0].UID), nil }, autoscalingRunnerSetTestTimeout, - autoscalingRunnerSetTestInterval).Should(BeEquivalentTo(string(runnerSet.UID)), "New EphemeralRunnerSet should not be created") + autoscalingRunnerSetTestInterval, + ).Should(BeEquivalentTo(string(runnerSet.UID)), "New EphemeralRunnerSet should not be created") // We should only re-create a new listener Eventually( @@ -452,7 +466,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() { return string(listener.UID), nil }, autoscalingRunnerSetTestTimeout, - autoscalingRunnerSetTestInterval).ShouldNot(BeEquivalentTo(string(listener.UID)), "New Listener should be created") + autoscalingRunnerSetTestInterval, + ).ShouldNot(BeEquivalentTo(string(listener.UID)), "New Listener should be created") }) It("It should update RunnerScaleSet's runner group on service when it changes", func() { @@ -463,7 +478,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() { return k8sClient.Get(ctx, client.ObjectKey{Name: scaleSetListenerName(autoscalingRunnerSet), Namespace: autoscalingRunnerSet.Namespace}, new(v1alpha1.AutoscalingListener)) }, autoscalingRunnerSetTestTimeout, - autoscalingRunnerSetTestInterval).Should(Succeed(), "Listener should be created") + autoscalingRunnerSetTestInterval, + ).Should(Succeed(), "Listener should be created") patched := autoscalingRunnerSet.DeepCopy() patched.Spec.RunnerGroup = "testgroup2" @@ -485,7 +501,8 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() { return updated.Annotations[AnnotationKeyGitHubRunnerGroupName], nil }, autoscalingRunnerSetTestTimeout, - autoscalingRunnerSetTestInterval).Should(BeEquivalentTo("testgroup2"), "AutoScalingRunnerSet should have the new runner group in its annotation") + autoscalingRunnerSetTestInterval, + ).Should(BeEquivalentTo("testgroup2"), "AutoScalingRunnerSet should have the new runner group in its annotation") // delete the annotation and it should be re-added patched = autoscalingRunnerSet.DeepCopy() @@ -511,6 +528,65 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() { autoscalingRunnerSetTestInterval, ).Should(BeEquivalentTo("testgroup2"), "AutoScalingRunnerSet should have the runner group in its annotation") }) + + It("It should patch old ephemeral runner sets into draining mode before deleting them", func() { + runnerSetList := new(v1alpha1.EphemeralRunnerSetList) + Eventually( + func() (int, error) { + err := k8sClient.List(ctx, runnerSetList, client.InNamespace(autoscalingRunnerSet.Namespace)) + if err != nil { + return 0, err + } + return len(runnerSetList.Items), nil + }, + autoscalingRunnerSetTestTimeout, + autoscalingRunnerSetTestInterval, + ).Should(BeEquivalentTo(1), "Only one EphemeralRunnerSet should exist initially") + + oldRunnerSet := runnerSetList.Items[0] + + patched := autoscalingRunnerSet.DeepCopy() + patched.Spec.Template.Spec.PriorityClassName = "test-priority-class-draining" + if patched.Annotations == nil { + patched.Annotations = make(map[string]string) + } + patched.Annotations[annotationKeyValuesHash] = "draining-mode-hash" + err := k8sClient.Patch(ctx, patched, client.MergeFrom(autoscalingRunnerSet)) + Expect(err).NotTo(HaveOccurred(), "failed to patch AutoScalingRunnerSet") + autoscalingRunnerSet = patched.DeepCopy() + + Eventually( + func() (int, error) { + list := new(v1alpha1.EphemeralRunnerSetList) + err := k8sClient.List(ctx, list, client.InNamespace(autoscalingRunnerSet.Namespace)) + if err != nil { + return 0, err + } + return len(list.Items), nil + }, + autoscalingRunnerSetTestTimeout, + autoscalingRunnerSetTestInterval, + ).Should(BeNumerically(">=", 2), "Expected both old and new EphemeralRunnerSet during transition") + + Eventually( + func() (string, error) { + old := new(v1alpha1.EphemeralRunnerSet) + err := k8sClient.Get(ctx, client.ObjectKey{Name: oldRunnerSet.Name, Namespace: oldRunnerSet.Namespace}, old) + if err != nil { + if errors.IsNotFound(err) { + return "true", nil + } + return "", err + } + if old.Spec.PatchID != 0 || old.Spec.Replicas != 0 { + return "", fmt.Errorf("old runner set not patched into draining mode yet: patchID=%d replicas=%d", old.Spec.PatchID, old.Spec.Replicas) + } + return old.Annotations[annotationKeyRunnerSetDraining], nil + }, + autoscalingRunnerSetTestTimeout, + autoscalingRunnerSetTestInterval, + ).Should(Equal("true"), "Old runner set should be marked as draining") + }) }) Context("When updating an AutoscalingRunnerSet with running or pending jobs", func() { @@ -642,13 +718,14 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() { ).Should(BeTrue(), "AutoscalingRunnerSet should be created") runnerSetList := new(v1alpha1.EphemeralRunnerSetList) - Eventually(func() (int, error) { - err := k8sClient.List(ctx, runnerSetList, client.InNamespace(ars.Namespace)) - if err != nil { - return 0, err - } - return len(runnerSetList.Items), nil - }, + Eventually( + func() (int, error) { + err := k8sClient.List(ctx, runnerSetList, client.InNamespace(ars.Namespace)) + if err != nil { + return 0, err + } + return len(runnerSetList.Items), nil + }, autoscalingRunnerSetTestTimeout, autoscalingRunnerSetTestInterval, ).Should(BeEquivalentTo(1), "Failed to fetch runner set list") diff --git a/controllers/actions.github.com/ephemeralrunnerset_controller.go b/controllers/actions.github.com/ephemeralrunnerset_controller.go index acfcba57..6ed3c7ea 100644 --- a/controllers/actions.github.com/ephemeralrunnerset_controller.go +++ b/controllers/actions.github.com/ephemeralrunnerset_controller.go @@ -195,6 +195,13 @@ func (r *EphemeralRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.R } total := ephemeralRunnersByState.scaleTotal() + if r.isDrainingMode(ephemeralRunnerSet) { + if err := r.reconcileDrainingRunnerSet(ctx, ephemeralRunnerSet, ephemeralRunnersByState, total, log); err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{}, r.updateStatus(ctx, ephemeralRunnerSet, ephemeralRunnersByState, log) + } + if ephemeralRunnerSet.Spec.PatchID == 0 || ephemeralRunnerSet.Spec.PatchID != ephemeralRunnersByState.latestPatchID { defer func() { if err := r.cleanupFinishedEphemeralRunners(ctx, ephemeralRunnersByState.finished, log); err != nil { @@ -240,6 +247,8 @@ func (r *EphemeralRunnerSetReconciler) updateStatus(ctx context.Context, ephemer total := state.scaleTotal() var phase v1alpha1.EphemeralRunnerSetPhase switch { + case r.isDrainingMode(ephemeralRunnerSet): + phase = v1alpha1.EphemeralRunnerSetPhaseDraining case len(state.outdated) > 0: phase = v1alpha1.EphemeralRunnerSetPhaseOutdated case ephemeralRunnerSet.Status.Phase == "": @@ -270,6 +279,30 @@ func (r *EphemeralRunnerSetReconciler) updateStatus(ctx context.Context, ephemer return nil } +func (r *EphemeralRunnerSetReconciler) isDrainingMode(ephemeralRunnerSet *v1alpha1.EphemeralRunnerSet) bool { + if ephemeralRunnerSet == nil { + return false + } + return ephemeralRunnerSet.Spec.PatchID == 0 && ephemeralRunnerSet.Spec.Replicas == 0 && ephemeralRunnerSet.Annotations[annotationKeyRunnerSetDraining] == "true" +} + +func (r *EphemeralRunnerSetReconciler) reconcileDrainingRunnerSet(ctx context.Context, ephemeralRunnerSet *v1alpha1.EphemeralRunnerSet, state *ephemeralRunnersByState, total int, log logr.Logger) error { + terminated := state.terminated() + for i := range terminated { + log.Info("Deleting terminated ephemeral runner in draining mode", "name", terminated[i].Name) + if err := r.Delete(ctx, terminated[i]); err != nil && !kerrors.IsNotFound(err) { + return err + } + } + + if total == 0 { + return nil + } + + log.Info("Draining ephemeral runner set aggressively", "pending", len(state.pending), "running", len(state.running)) + return r.deleteIdleEphemeralRunners(ctx, ephemeralRunnerSet, state.pending, state.running, len(state.pending)+len(state.running), log) +} + func (r *EphemeralRunnerSetReconciler) cleanupFinishedEphemeralRunners(ctx context.Context, finishedEphemeralRunners []*v1alpha1.EphemeralRunner, log logr.Logger) error { // cleanup finished runners and proceed var errs []error diff --git a/controllers/actions.github.com/ephemeralrunnerset_controller_test.go b/controllers/actions.github.com/ephemeralrunnerset_controller_test.go index 4a0a41b3..585e7a95 100644 --- a/controllers/actions.github.com/ephemeralrunnerset_controller_test.go +++ b/controllers/actions.github.com/ephemeralrunnerset_controller_test.go @@ -113,7 +113,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { return created.Finalizers[0], nil }, ephemeralRunnerSetTestTimeout, - ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(ephemeralRunnerSetFinalizerName), "EphemeralRunnerSet should have a finalizer") + ephemeralRunnerSetTestInterval, + ).Should(BeEquivalentTo(ephemeralRunnerSetFinalizerName), "EphemeralRunnerSet should have a finalizer") // Check if the number of ephemeral runners are stay 0 Consistently( @@ -126,7 +127,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { return len(runnerList.Items), nil }, ephemeralRunnerSetTestTimeout, - ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(0), "No EphemeralRunner should be created") + ephemeralRunnerSetTestInterval, + ).Should(BeEquivalentTo(0), "No EphemeralRunner should be created") // Check if the status stay 0 Consistently( @@ -140,7 +142,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { return int(runnerSet.Status.CurrentReplicas), nil }, ephemeralRunnerSetTestTimeout, - ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(0), "EphemeralRunnerSet status should be 0") + ephemeralRunnerSetTestInterval, + ).Should(BeEquivalentTo(0), "EphemeralRunnerSet status should be 0") // Scaling up the EphemeralRunnerSet updated := created.DeepCopy() @@ -178,7 +181,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { return len(runnerList.Items), nil }, ephemeralRunnerSetTestTimeout, - ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(5), "5 EphemeralRunner should be created") + ephemeralRunnerSetTestInterval, + ).Should(BeEquivalentTo(5), "5 EphemeralRunner should be created") // Check if the status is updated Eventually( @@ -192,7 +196,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { return int(runnerSet.Status.CurrentReplicas), nil }, ephemeralRunnerSetTestTimeout, - ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(5), "EphemeralRunnerSet status should be 5") + ephemeralRunnerSetTestInterval, + ).Should(BeEquivalentTo(5), "EphemeralRunnerSet status should be 5") }) }) @@ -238,7 +243,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { return len(runnerList.Items), nil }, ephemeralRunnerSetTestTimeout, - ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(5), "5 EphemeralRunner should be created") + ephemeralRunnerSetTestInterval, + ).Should(BeEquivalentTo(5), "5 EphemeralRunner should be created") // Delete the EphemeralRunnerSet err = k8sClient.Delete(ctx, created) @@ -255,7 +261,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { return len(runnerList.Items), nil }, ephemeralRunnerSetTestTimeout, - ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(0), "All EphemeralRunner should be deleted") + ephemeralRunnerSetTestInterval, + ).Should(BeEquivalentTo(0), "All EphemeralRunner should be deleted") // Check if the EphemeralRunnerSet is deleted Eventually( @@ -273,7 +280,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { return fmt.Errorf("EphemeralRunnerSet is not deleted") }, ephemeralRunnerSetTestTimeout, - ephemeralRunnerSetTestInterval).Should(Succeed(), "EphemeralRunnerSet should be deleted") + ephemeralRunnerSetTestInterval, + ).Should(Succeed(), "EphemeralRunnerSet should be deleted") }) }) @@ -1092,6 +1100,94 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { ephemeralRunnerSetTestInterval, ).Should(BeEquivalentTo(desiredStatus), "Status is not eventually updated to the desired one") }) + + It("Should aggressively drain when patchID is 0 and replicas are 0", func() { + ers := new(v1alpha1.EphemeralRunnerSet) + err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers) + Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") + + updated := ers.DeepCopy() + updated.Spec.Replicas = 2 + updated.Spec.PatchID = 1 + + err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers)) + Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") + + runnerList := new(v1alpha1.EphemeralRunnerList) + Eventually( + func() (int, error) { + err := listEphemeralRunnersAndRemoveFinalizers(ctx, k8sClient, runnerList, ephemeralRunnerSet.Namespace) + if err != nil { + return -1, err + } + + return len(runnerList.Items), nil + }, + ephemeralRunnerSetTestTimeout, + ephemeralRunnerSetTestInterval, + ).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created") + + for i := range runnerList.Items { + updatedRunner := runnerList.Items[i].DeepCopy() + updatedRunner.Status.RunnerID = i + 1000 + updatedRunner.Status.Phase = v1alpha1.EphemeralRunnerPhaseRunning + err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[i])) + Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") + } + + ers = new(v1alpha1.EphemeralRunnerSet) + err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers) + Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") + + updated = ers.DeepCopy() + updated.Spec.Replicas = 0 + updated.Spec.PatchID = 0 + if updated.Annotations == nil { + updated.Annotations = map[string]string{} + } + updated.Annotations[annotationKeyRunnerSetDraining] = "true" + + err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers)) + Expect(err).NotTo(HaveOccurred(), "failed to patch EphemeralRunnerSet into draining mode") + + Eventually( + func() (int, error) { + err := listEphemeralRunnersAndRemoveFinalizers(ctx, k8sClient, runnerList, ephemeralRunnerSet.Namespace) + if err != nil { + return -1, err + } + return len(runnerList.Items), nil + }, + ephemeralRunnerSetTestTimeout, + ephemeralRunnerSetTestInterval, + ).Should(BeEquivalentTo(0), "All runners should be aggressively drained in draining mode") + + Eventually( + func() (v1alpha1.EphemeralRunnerSetPhase, error) { + runnerSet := new(v1alpha1.EphemeralRunnerSet) + err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, runnerSet) + if err != nil { + return "", err + } + return runnerSet.Status.Phase, nil + }, + ephemeralRunnerSetTestTimeout, + ephemeralRunnerSetTestInterval, + ).Should(Equal(v1alpha1.EphemeralRunnerSetPhaseDraining), "Phase should be Draining while in draining mode") + + Consistently( + func() (string, error) { + runnerSet := new(v1alpha1.EphemeralRunnerSet) + err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, runnerSet) + if err != nil { + return "", err + } + return runnerSet.Annotations[annotationKeyRunnerSetDraining], nil + }, + ephemeralRunnerSetTestTimeout, + ephemeralRunnerSetTestInterval, + ).Should(Equal("true"), "Draining annotation should remain set") + }) }) }) @@ -1175,29 +1271,30 @@ var _ = Describe("Test EphemeralRunnerSet controller with proxy settings", func( err = k8sClient.Create(ctx, ephemeralRunnerSet) Expect(err).NotTo(HaveOccurred(), "failed to create EphemeralRunnerSet") - Eventually(func(g Gomega) { - // Compiled / flattened proxy secret should exist at this point - actualProxySecret := &corev1.Secret{} - err = k8sClient.Get(ctx, client.ObjectKey{ - Namespace: autoscalingNS.Name, - Name: proxyEphemeralRunnerSetSecretName(ephemeralRunnerSet), - }, actualProxySecret) - g.Expect(err).NotTo(HaveOccurred(), "failed to get compiled / flattened proxy secret") - - secretFetcher := func(name string) (*corev1.Secret, error) { - secret := &corev1.Secret{} + Eventually( + func(g Gomega) { + // Compiled / flattened proxy secret should exist at this point + actualProxySecret := &corev1.Secret{} err = k8sClient.Get(ctx, client.ObjectKey{ Namespace: autoscalingNS.Name, - Name: name, - }, secret) - return secret, err - } + Name: proxyEphemeralRunnerSetSecretName(ephemeralRunnerSet), + }, actualProxySecret) + g.Expect(err).NotTo(HaveOccurred(), "failed to get compiled / flattened proxy secret") - // Assert that the proxy secret is created with the correct values - expectedData, err := ephemeralRunnerSet.Spec.EphemeralRunnerSpec.Proxy.ToSecretData(secretFetcher) - g.Expect(err).NotTo(HaveOccurred(), "failed to get proxy secret data") - g.Expect(actualProxySecret.Data).To(Equal(expectedData)) - }, + secretFetcher := func(name string) (*corev1.Secret, error) { + secret := &corev1.Secret{} + err = k8sClient.Get(ctx, client.ObjectKey{ + Namespace: autoscalingNS.Name, + Name: name, + }, secret) + return secret, err + } + + // Assert that the proxy secret is created with the correct values + expectedData, err := ephemeralRunnerSet.Spec.EphemeralRunnerSpec.Proxy.ToSecretData(secretFetcher) + g.Expect(err).NotTo(HaveOccurred(), "failed to get proxy secret data") + g.Expect(actualProxySecret.Data).To(Equal(expectedData)) + }, ephemeralRunnerSetTestTimeout, ephemeralRunnerSetTestInterval, ).Should(Succeed(), "compiled / flattened proxy secret should exist") @@ -1250,34 +1347,37 @@ var _ = Describe("Test EphemeralRunnerSet controller with proxy settings", func( return len(runnerList.Items), nil }, ephemeralRunnerSetTestTimeout, - ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(1), "1 EphemeralRunner should exist") + ephemeralRunnerSetTestInterval, + ).Should(BeEquivalentTo(1), "1 EphemeralRunner should exist") // Delete the EphemeralRunnerSet err = k8sClient.Delete(ctx, ephemeralRunnerSet) Expect(err).NotTo(HaveOccurred(), "failed to delete EphemeralRunnerSet") - Eventually(func(g Gomega) (int, error) { - runnerList := new(v1alpha1.EphemeralRunnerList) - err := listEphemeralRunnersAndRemoveFinalizers(ctx, k8sClient, runnerList, ephemeralRunnerSet.Namespace) - if err != nil { - return -1, err - } - return len(runnerList.Items), nil - }, + Eventually( + func(g Gomega) (int, error) { + runnerList := new(v1alpha1.EphemeralRunnerList) + err := listEphemeralRunnersAndRemoveFinalizers(ctx, k8sClient, runnerList, ephemeralRunnerSet.Namespace) + if err != nil { + return -1, err + } + return len(runnerList.Items), nil + }, ephemeralRunnerSetTestTimeout, ephemeralRunnerSetTestInterval, ).Should(BeEquivalentTo(0), "EphemeralRunners should be deleted") // Assert that the proxy secret is deleted - Eventually(func(g Gomega) { - proxySecret := &corev1.Secret{} - err = k8sClient.Get(ctx, client.ObjectKey{ - Namespace: autoscalingNS.Name, - Name: proxyEphemeralRunnerSetSecretName(ephemeralRunnerSet), - }, proxySecret) - g.Expect(err).To(HaveOccurred(), "proxy secret should be deleted") - g.Expect(kerrors.IsNotFound(err)).To(BeTrue(), "proxy secret should be deleted") - }, + Eventually( + func(g Gomega) { + proxySecret := &corev1.Secret{} + err = k8sClient.Get(ctx, client.ObjectKey{ + Namespace: autoscalingNS.Name, + Name: proxyEphemeralRunnerSetSecretName(ephemeralRunnerSet), + }, proxySecret) + g.Expect(err).To(HaveOccurred(), "proxy secret should be deleted") + g.Expect(kerrors.IsNotFound(err)).To(BeTrue(), "proxy secret should be deleted") + }, ephemeralRunnerSetTestTimeout, ephemeralRunnerSetTestInterval, ).Should(Succeed(), "proxy secret should be deleted") @@ -1350,14 +1450,15 @@ var _ = Describe("Test EphemeralRunnerSet controller with proxy settings", func( Expect(err).NotTo(HaveOccurred(), "failed to create EphemeralRunnerSet") runnerList := new(v1alpha1.EphemeralRunnerList) - Eventually(func() (int, error) { - err := listEphemeralRunnersAndRemoveFinalizers(ctx, k8sClient, runnerList, ephemeralRunnerSet.Namespace) - if err != nil { - return -1, err - } + Eventually( + func() (int, error) { + err := listEphemeralRunnersAndRemoveFinalizers(ctx, k8sClient, runnerList, ephemeralRunnerSet.Namespace) + if err != nil { + return -1, err + } - return len(runnerList.Items), nil - }, + return len(runnerList.Items), nil + }, ephemeralRunnerSetTestTimeout, ephemeralRunnerSetTestInterval, ).Should(BeEquivalentTo(1), "failed to create ephemeral runner") @@ -1474,14 +1575,15 @@ var _ = Describe("Test EphemeralRunnerSet controller with custom root CA", func( Expect(err).NotTo(HaveOccurred(), "failed to create EphemeralRunnerSet") runnerList := new(v1alpha1.EphemeralRunnerList) - Eventually(func() (int, error) { - err := listEphemeralRunnersAndRemoveFinalizers(ctx, k8sClient, runnerList, ephemeralRunnerSet.Namespace) - if err != nil { - return -1, err - } + Eventually( + func() (int, error) { + err := listEphemeralRunnersAndRemoveFinalizers(ctx, k8sClient, runnerList, ephemeralRunnerSet.Namespace) + if err != nil { + return -1, err + } - return len(runnerList.Items), nil - }, + return len(runnerList.Items), nil + }, ephemeralRunnerSetTestTimeout, ephemeralRunnerSetTestInterval, ).Should(BeEquivalentTo(1), "failed to create ephemeral runner")