diff --git a/controllers/runner_controller.go b/controllers/runner_controller.go
index 3b607f47..b624a6e1 100644
--- a/controllers/runner_controller.go
+++ b/controllers/runner_controller.go
@@ -602,7 +602,7 @@ func (r *RunnerReconciler) processRunnerCreation(ctx context.Context, runner v1a
 // - (false, err) when it postponed unregistration due to the runner being busy, or it tried to unregister the runner but failed due to
 // an error returned by the GitHub API.
 func (r *RunnerReconciler) unregisterRunner(ctx context.Context, enterprise, org, repo, name string) (bool, error) {
-	return unregisterRunner(ctx, r.GitHubClient, enterprise, org, repo, name)
+	return unregisterRunner(ctx, r.GitHubClient, enterprise, org, repo, name, nil)
 }
 
 func (r *RunnerReconciler) updateRegistrationToken(ctx context.Context, runner v1alpha1.Runner) (bool, error) {
diff --git a/controllers/runner_graceful_stop.go b/controllers/runner_graceful_stop.go
index 8a6dda2c..00f13c90 100644
--- a/controllers/runner_graceful_stop.go
+++ b/controllers/runner_graceful_stop.go
@@ -4,19 +4,33 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"strconv"
 	"time"
 
 	"github.com/actions-runner-controller/actions-runner-controller/github"
 	"github.com/go-logr/logr"
 	gogithub "github.com/google/go-github/v39/github"
 	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
 const (
+	// unregistrationCompleteTimestamp is the annotation that is added to the pod once the previously started unregistration process has been completed.
 	unregistrationCompleteTimestamp = "unregistration-complete-timestamp"
-	unregistrationStartTimestamp    = "unregistration-start-timestamp"
+
+	// unregistrationStartTimestamp is the annotation that contains the time when the requested unregistration process was started.
+	unregistrationStartTimestamp = "unregistration-start-timestamp"
+
+	// unregistrationRequestTimestamp is the annotation that contains the time when the unregistration was requested.
+	// This doesn't immediately start the unregistration. Instead, ARC first checks whether the runner has already been registered.
+	// If not, ARC waits for the registration to complete first, and only after that does it start the unregistration process.
+	// This is crucial to avoid a race in which ARC marks the runner pod for deletion while the actions runner is still registering itself to GitHub,
+	// leaving the assigned job hanging forever.
+	unregistrationRequestTimestamp = "unregistration-request-timestamp"
+
+	annotationKeyRunnerID = "runner-id"
 
 	// DefaultUnregistrationTimeout is the duration until ARC gives up retrying the combo of ListRunners API (to detect the runner ID by name)
 	// and RemoveRunner API (to actually unregister the runner) calls.
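The `unregistration-request-timestamp` annotation is the hand-off point between an upstream controller (for example the RunnerSet controller) and the runner pod controller: the upstream side only marks the pod, and the pod controller then drives the actual unregistration. As a rough sketch of what setting that mark looks like with the controller-runtime client (`requestUnregistration` is a hypothetical helper for illustration, not part of this patch):

```go
package controllers

import (
	"context"
	"time"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// requestUnregistration is an illustrative helper (not in the patch) that asks for a
// graceful unregistration by setting the "unregistration-request-timestamp" annotation.
func requestUnregistration(ctx context.Context, c client.Client, pod *corev1.Pod) error {
	if _, ok := pod.Annotations[unregistrationRequestTimestamp]; ok {
		// Already requested; keep the original timestamp.
		return nil
	}

	updated := pod.DeepCopy()
	if updated.Annotations == nil {
		updated.Annotations = map[string]string{}
	}
	updated.Annotations[unregistrationRequestTimestamp] = time.Now().Format(time.RFC3339)

	// Merge-patch only the annotation so concurrent updates to other fields aren't clobbered.
	return c.Patch(ctx, updated, client.MergeFrom(pod))
}
```

Using a merge patch keeps the request idempotent: once the annotation exists, later reconciliations leave the original timestamp in place.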
@@ -40,48 +54,61 @@ const (
 // When it wants to be retried later, the function returns a non-nil *ctrl.Result as the second return value, and may or may not populate the error in the third return value.
 // The caller is expected to return the returned ctrl.Result and error to postpone the current reconciliation loop and trigger a scheduled retry.
 func tickRunnerGracefulStop(ctx context.Context, unregistrationTimeout time.Duration, retryDelay time.Duration, log logr.Logger, ghClient *github.Client, c client.Client, enterprise, organization, repository, runner string, pod *corev1.Pod) (*corev1.Pod, *ctrl.Result, error) {
-	if pod != nil {
-		if _, ok := getAnnotation(pod, unregistrationStartTimestamp); !ok {
-			updated := pod.DeepCopy()
-			setAnnotation(updated, unregistrationStartTimestamp, time.Now().Format(time.RFC3339))
-			if err := c.Patch(ctx, updated, client.MergeFrom(pod)); err != nil {
-				log.Error(err, fmt.Sprintf("Failed to patch pod to have %s annotation", unregistrationStartTimestamp))
-				return nil, &ctrl.Result{}, err
-			}
-			pod = updated
-
-			log.Info("Runner has started unregistration")
-		} else {
-			log.Info("Runner has already started unregistration")
-		}
+	pod, err := annotatePodOnce(ctx, c, log, pod, unregistrationStartTimestamp, time.Now().Format(time.RFC3339))
+	if err != nil {
+		return nil, &ctrl.Result{}, err
 	}
 
 	if res, err := ensureRunnerUnregistration(ctx, unregistrationTimeout, retryDelay, log, ghClient, enterprise, organization, repository, runner, pod); res != nil {
 		return nil, res, err
 	}
 
-	if pod != nil {
-		if _, ok := getAnnotation(pod, unregistrationCompleteTimestamp); !ok {
-			updated := pod.DeepCopy()
-			setAnnotation(updated, unregistrationCompleteTimestamp, time.Now().Format(time.RFC3339))
-			if err := c.Patch(ctx, updated, client.MergeFrom(pod)); err != nil {
-				log.Error(err, fmt.Sprintf("Failed to patch pod to have %s annotation", unregistrationCompleteTimestamp))
-				return nil, &ctrl.Result{}, err
-			}
-			pod = updated
-
-			log.Info("Runner has completed unregistration")
-		} else {
-			log.Info("Runner has already completed unregistration")
-		}
+	pod, err = annotatePodOnce(ctx, c, log, pod, unregistrationCompleteTimestamp, time.Now().Format(time.RFC3339))
+	if err != nil {
+		return nil, &ctrl.Result{}, err
 	}
 
 	return pod, nil, nil
 }
 
+// annotatePodOnce annotates the pod if it hasn't been annotated with the key yet.
+// Returns the provided pod as-is if it was already annotated.
+// Returns the updated pod if the pod was missing the annotation and the update to add the annotation succeeded.
+func annotatePodOnce(ctx context.Context, c client.Client, log logr.Logger, pod *corev1.Pod, k, v string) (*corev1.Pod, error) {
+	if pod == nil {
+		return nil, nil
+	}
+
+	if _, ok := getAnnotation(&pod.ObjectMeta, k); ok {
+		return pod, nil
+	}
+
+	updated := pod.DeepCopy()
+	setAnnotation(&updated.ObjectMeta, k, v)
+	if err := c.Patch(ctx, updated, client.MergeFrom(pod)); err != nil {
+		log.Error(err, fmt.Sprintf("Failed to patch pod to have %s annotation", k))
+		return nil, err
+	}
+
+	log.V(2).Info("Annotated pod", "key", k, "value", v)
+
+	return updated, nil
+}
+
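`annotatePodOnce` collapses the two near-identical patch blocks above into a single idempotent helper: the first call patches the pod, and any later call returns it untouched. A sketch of how that behavior could be exercised in a unit test, assuming the controller-runtime fake client and `logr.Discard()` are available at the versions pinned in `go.mod` (the test itself is illustrative, not part of the patch):

```go
package controllers

import (
	"context"
	"testing"

	"github.com/go-logr/logr"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

// TestAnnotatePodOnceIsIdempotent is a hypothetical test sketch, not part of the patch.
func TestAnnotatePodOnceIsIdempotent(t *testing.T) {
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "runner-0", Namespace: "default"}}
	c := fake.NewClientBuilder().WithObjects(pod).Build()

	// First call patches the pod and records the timestamp.
	first, err := annotatePodOnce(context.Background(), c, logr.Discard(), pod, unregistrationStartTimestamp, "2022-01-01T00:00:00Z")
	if err != nil {
		t.Fatal(err)
	}

	// Second call must return the pod as-is and keep the original timestamp.
	second, err := annotatePodOnce(context.Background(), c, logr.Discard(), first, unregistrationStartTimestamp, "2022-02-02T00:00:00Z")
	if err != nil {
		t.Fatal(err)
	}

	if got := second.Annotations[unregistrationStartTimestamp]; got != "2022-01-01T00:00:00Z" {
		t.Errorf("annotation was overwritten: %s", got)
	}
}
```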
 // If the first return value is nil, it's safe to delete the runner pod.
 func ensureRunnerUnregistration(ctx context.Context, unregistrationTimeout time.Duration, retryDelay time.Duration, log logr.Logger, ghClient *github.Client, enterprise, organization, repository, runner string, pod *corev1.Pod) (*ctrl.Result, error) {
-	ok, err := unregisterRunner(ctx, ghClient, enterprise, organization, repository, runner)
+	var runnerID *int64
+
+	if id, ok := getAnnotation(&pod.ObjectMeta, annotationKeyRunnerID); ok {
+		v, err := strconv.ParseInt(id, 10, 64)
+		if err != nil {
+			return &ctrl.Result{}, err
+		}
+
+		runnerID = &v
+	}
+
+	ok, err := unregisterRunner(ctx, ghClient, enterprise, organization, repository, runner, runnerID)
 	if err != nil {
 		if errors.Is(err, &gogithub.RateLimitError{}) {
 			// We log the underlying error when we failed calling the GitHub API to list or unregister runners,
@@ -125,7 +152,7 @@ func ensureRunnerUnregistration(ctx context.Context, unregistrationTimeout time.
 		return &ctrl.Result{}, err
 	} else if ok {
-		log.Info("Runner has just been unregistered. Removing the runner pod.")
+		log.Info("Runner has just been unregistered.")
 	} else if pod == nil {
 		// `r.unregisterRunner()` returns `false, nil` if the runner is not found on GitHub.
 		// However, that doesn't always mean the pod can be safely removed.
@@ -174,22 +201,47 @@ func ensureRunnerUnregistration(ctx context.Context, unregistrationTimeout time.
 	return nil, nil
 }
 
-func getAnnotation(pod *corev1.Pod, key string) (string, bool) {
-	if pod.Annotations == nil {
+func ensureRunnerPodRegistered(ctx context.Context, log logr.Logger, ghClient *github.Client, c client.Client, enterprise, organization, repository, runner string, pod *corev1.Pod) (*corev1.Pod, *ctrl.Result, error) {
+	_, hasRunnerID := getAnnotation(&pod.ObjectMeta, annotationKeyRunnerID)
+	if runnerPodOrContainerIsStopped(pod) || hasRunnerID {
+		return pod, nil, nil
+	}
+
+	r, err := getRunner(ctx, ghClient, enterprise, organization, repository, runner)
+	if err != nil {
+		return nil, &ctrl.Result{RequeueAfter: 10 * time.Second}, err
+	}
+
+	if r == nil || r.ID == nil {
+		return nil, &ctrl.Result{RequeueAfter: 10 * time.Second}, err
+	}
+
+	id := *r.ID
+
+	updated, err := annotatePodOnce(ctx, c, log, pod, annotationKeyRunnerID, fmt.Sprintf("%d", id))
+	if err != nil {
+		return nil, &ctrl.Result{RequeueAfter: 10 * time.Second}, err
+	}
+
+	return updated, nil, nil
+}
+
+func getAnnotation(meta *metav1.ObjectMeta, key string) (string, bool) {
+	if meta.Annotations == nil {
 		return "", false
 	}
 
-	v, ok := pod.Annotations[key]
+	v, ok := meta.Annotations[key]
 
 	return v, ok
 }
 
-func setAnnotation(pod *corev1.Pod, key, value string) {
-	if pod.Annotations == nil {
-		pod.Annotations = map[string]string{}
+func setAnnotation(meta *metav1.ObjectMeta, key, value string) {
+	if meta.Annotations == nil {
+		meta.Annotations = map[string]string{}
 	}
 
-	pod.Annotations[key] = value
+	meta.Annotations[key] = value
 }
 
 // unregisterRunner unregisters the runner from GitHub Actions by name.
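Switching `getAnnotation`/`setAnnotation` from `*corev1.Pod` to `*metav1.ObjectMeta` is what lets the same helpers be reused on StatefulSets later in this patch. A minimal sketch of that shared usage (`exampleAnnotationHelpers` is illustrative only, not a function in the codebase):

```go
package controllers

import (
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
)

// Because the helpers now operate on metav1.ObjectMeta, the same code path can read
// and write annotations on both runner pods and their parent statefulsets.
func exampleAnnotationHelpers(pod *corev1.Pod, sts *appsv1.StatefulSet) {
	setAnnotation(&pod.ObjectMeta, annotationKeyRunnerID, "12345")
	setAnnotation(&sts.ObjectMeta, unregistrationRequestTimestamp, "2022-01-01T00:00:00Z")

	if id, ok := getAnnotation(&pod.ObjectMeta, annotationKeyRunnerID); ok {
		_ = id // "12345"
	}
}
```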
@@ -227,17 +279,19 @@ func setAnnotation(pod *corev1.Pod, key, value string) {
 // There isn't a single right grace period that works for everyone.
 // The longer the grace period is, the earlier a cluster resource shortage can occur due to throttled runner pod deletions,
 // while the shorter the grace period is, the more likely you may encounter the race issue.
-func unregisterRunner(ctx context.Context, client *github.Client, enterprise, org, repo, name string) (bool, error) {
-	runner, err := getRunner(ctx, client, enterprise, org, repo, name)
-	if err != nil {
-		return false, err
-	}
+func unregisterRunner(ctx context.Context, client *github.Client, enterprise, org, repo, name string, id *int64) (bool, error) {
+	if id == nil {
+		runner, err := getRunner(ctx, client, enterprise, org, repo, name)
+		if err != nil {
+			return false, err
+		}
 
-	if runner == nil || runner.ID == nil {
-		return false, nil
-	}
+		if runner == nil || runner.ID == nil {
+			return false, nil
+		}
 
-	id := *runner.ID
+		id = runner.ID
+	}
 
 	// For the record, historically ARC did not try to call RemoveRunner on a busy runner, but that's no longer true.
 	// The reason ARC did so was to avoid prematurely stopping a runner that was still running a job.
@@ -259,7 +313,7 @@ func unregisterRunner(ctx context.Context, client *github.Client, enterprise, or
 	// change from 60 seconds.
 	//
 	// TODO: Probably we can just remove the runner by ID without seeing if the runner is busy, by treating it as busy when a remove-runner call failed with 422?
-	if err := client.RemoveRunner(ctx, enterprise, org, repo, id); err != nil {
+	if err := client.RemoveRunner(ctx, enterprise, org, repo, *id); err != nil {
 		return false, err
 	}
 
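With the runner ID cached on the pod, `unregisterRunner` can go straight to `RemoveRunner` and skip the `ListRunners` scan, which is what keeps unregistration cheap under GitHub API rate limits. A hypothetical wrapper around the annotation-parsing step used by `ensureRunnerUnregistration` (not a function in the patch), assuming the annotation stores the decimal ID written by `ensureRunnerPodRegistered`:

```go
package controllers

import (
	"strconv"

	corev1 "k8s.io/api/core/v1"
)

// runnerIDFromPod returns the cached runner ID if the pod carries the "runner-id"
// annotation, or nil so the caller falls back to the ListRunners-by-name lookup.
func runnerIDFromPod(pod *corev1.Pod) (*int64, error) {
	raw, ok := getAnnotation(&pod.ObjectMeta, annotationKeyRunnerID)
	if !ok {
		return nil, nil
	}

	id, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		return nil, err
	}

	return &id, nil
}
```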
diff --git a/controllers/runner_pod_controller.go b/controllers/runner_pod_controller.go
index 6b5c978e..64874002 100644
--- a/controllers/runner_pod_controller.go
+++ b/controllers/runner_pod_controller.go
@@ -102,9 +102,13 @@ func (r *RunnerPodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
 				return ctrl.Result{}, err
 			}
 
+			log.V(2).Info("Added finalizer")
+
 			return ctrl.Result{}, nil
 		}
 	} else {
+		log.V(2).Info("Seen deletion-timestamp is already set")
+
 		finalizers, removed := removeFinalizer(runnerPod.ObjectMeta.Finalizers, runnerPodFinalizerName)
 
 		if removed {
@@ -122,7 +126,9 @@ func (r *RunnerPodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
 			return ctrl.Result{}, err
 		}
 
-		log.Info("Removed runner from GitHub", "repository", repo, "organization", org)
+		log.V(2).Info("Removed finalizer")
+
+		return ctrl.Result{}, nil
 	}
 
 	deletionTimeout := 1 * time.Minute
@@ -160,6 +166,35 @@ func (r *RunnerPodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
 		return ctrl.Result{}, nil
 	}
 
+	po, res, err := ensureRunnerPodRegistered(ctx, log, r.GitHubClient, r.Client, enterprise, org, repo, runnerPod.Name, &runnerPod)
+	if res != nil {
+		return *res, err
+	}
+
+	runnerPod = *po
+
+	if _, unregistrationRequested := getAnnotation(&runnerPod.ObjectMeta, unregistrationRequestTimestamp); unregistrationRequested {
+		log.V(2).Info("Progressing unregistration because unregistration-request timestamp is set")
+
+		// At this point we're sure that DeletionTimestamp is not set yet, but the unregistration process has been triggered by an upstream controller like the runnerset-controller.
+		//
+		// In a standard scenario, ARC starts the unregistration process before marking the pod for deletion at all,
+		// so that it isn't subject to terminationGracePeriod and can safely take hours to finish its work.
+		_, res, err := tickRunnerGracefulStop(ctx, r.unregistrationTimeout(), r.unregistrationRetryDelay(), log, r.GitHubClient, r.Client, enterprise, org, repo, runnerPod.Name, &runnerPod)
+		if res != nil {
+			return *res, err
+		}
+
+		// At this point we are sure that the runner has successfully unregistered, hence it is safe to be deleted.
+		// But we don't delete the pod here. Instead, we let the upstream controller/parent object delete this pod as
+		// a part of a cascade deletion.
+		// This is to prevent a parent object, like a statefulset, from recreating the deleted pod.
+		// If the pod were recreated, it would start a registration process that may race with the statefulset deleting the pod.
+		log.V(2).Info("Unregistration seems complete")
+
+		return ctrl.Result{}, nil
+	}
+
 	// If the pod has ended up succeeded we need to restart it
 	// Happens e.g. when dind is in runner and run completes
 	stopped := runnerPod.Status.Phase == corev1.PodSucceeded
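Condensed, the runner pod controller's reconcile path now does three things before any of its pre-existing restart or deletion handling: cache the runner ID, honor an unregistration request, and otherwise fall through. The sketch below restates that ordering under the assumption that it sits next to the real `Reconcile` method; it is illustrative only and omits the finalizer and deletion-timeout logic shown in the hunks above.

```go
package controllers

import (
	"context"

	"github.com/go-logr/logr"
	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"
)

// reconcileRunnerPodSketch is an illustrative condensation of the new control flow,
// not the actual Reconcile method.
func reconcileRunnerPodSketch(ctx context.Context, r *RunnerPodReconciler, log logr.Logger, runnerPod corev1.Pod, enterprise, org, repo string) (ctrl.Result, error) {
	// 1. Cache the runner ID on the pod so a later unregistration can call
	//    RemoveRunner directly instead of listing runners by name.
	po, res, err := ensureRunnerPodRegistered(ctx, log, r.GitHubClient, r.Client, enterprise, org, repo, runnerPod.Name, &runnerPod)
	if res != nil {
		return *res, err
	}
	runnerPod = *po

	// 2. If an upstream controller requested unregistration, drive the graceful
	//    stop, but leave the pod in place for the parent's cascade deletion.
	if _, requested := getAnnotation(&runnerPod.ObjectMeta, unregistrationRequestTimestamp); requested {
		if _, res, err := tickRunnerGracefulStop(ctx, r.unregistrationTimeout(), r.unregistrationRetryDelay(), log, r.GitHubClient, r.Client, enterprise, org, repo, runnerPod.Name, &runnerPod); res != nil {
			return *res, err
		}
		return ctrl.Result{}, nil
	}

	// 3. Otherwise continue with the pre-existing restart/deletion-timeout handling.
	return ctrl.Result{}, nil
}
```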
diff --git a/controllers/runnerset_controller.go b/controllers/runnerset_controller.go
index d4514f53..64d61867 100644
--- a/controllers/runnerset_controller.go
+++ b/controllers/runnerset_controller.go
@@ -18,6 +18,7 @@ package controllers
 
 import (
 	"context"
+	"fmt"
 	"reflect"
 	"sort"
 	"time"
@@ -154,10 +155,53 @@ func (r *RunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
 			return ctrl.Result{}, err
 		}
 
+		// Statefulset termination process 4/4: Let Kubernetes cascade-delete the statefulset and the pods.
 		if !res.statefulset.DeletionTimestamp.IsZero() {
 			continue
 		}
 
+		// Statefulset termination process 3/4: Set the deletionTimestamp to let Kubernetes start a cascade deletion of the statefulset and the pods.
+		if _, ok := getAnnotation(&res.statefulset.ObjectMeta, unregistrationCompleteTimestamp); ok {
+			if err := r.Client.Delete(ctx, res.statefulset); err != nil {
+				log.Error(err, "Failed to delete statefulset")
+				return ctrl.Result{}, err
+			}
+			continue
+		}
+
+		// Statefulset termination process 2/4: Set unregistrationCompleteTimestamp only if all the pods managed by the statefulset
+		// have either been unregistered or are being deleted.
+		if _, ok := getAnnotation(&res.statefulset.ObjectMeta, unregistrationRequestTimestamp); ok {
+			var deletionSafe int
+			for _, po := range res.pods {
+				if _, ok := getAnnotation(&po.ObjectMeta, unregistrationCompleteTimestamp); ok {
+					deletionSafe++
+				} else if !po.DeletionTimestamp.IsZero() {
+					deletionSafe++
+				}
+			}
+
+			log.V(2).Info("Marking statefulset for unregistration completion", "deletionSafe", deletionSafe, "total", res.total)
+
+			if deletionSafe == res.total {
+				if _, ok := getAnnotation(&res.statefulset.ObjectMeta, unregistrationCompleteTimestamp); !ok {
+					updated := res.statefulset.DeepCopy()
+					setAnnotation(&updated.ObjectMeta, unregistrationCompleteTimestamp, time.Now().Format(time.RFC3339))
+
+					if err := r.Client.Patch(ctx, updated, client.MergeFrom(res.statefulset)); err != nil {
+						log.Error(err, fmt.Sprintf("Failed to patch statefulset to have %s annotation", unregistrationCompleteTimestamp))
+						return ctrl.Result{}, err
+					}
+
+					log.V(2).Info("Redundant statefulset has been annotated to start the deletion")
+				} else {
+					log.V(2).Info("BUG: Redundant statefulset was already annotated to start the deletion")
+				}
+			}
+
+			continue
+		}
+
 		if res.statefulset.Annotations != nil {
 			if a, ok := res.statefulset.Annotations[SyncTimeAnnotationKey]; ok {
 				t, err := time.Parse(time.RFC3339, a)
@@ -250,34 +294,58 @@ func (r *RunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
 		log.V(2).Info("Created statefulset(s) to add more replicas", "num", num)
 
 		return ctrl.Result{}, nil
-	} else if newDesiredReplicas < running {
+	} else if newDesiredReplicas <= running {
 		var retained int
-		var lastIndex int
+
+		var delete []*podsForStatefulset
 
 		for i := len(currentStatefulSets) - 1; i >= 0; i-- {
 			ss := currentStatefulSets[i]
-			retained += ss.running
-			if retained >= newDesiredReplicas {
-				lastIndex = i
-				break
+
+			if ss.running == 0 || retained >= newDesiredReplicas {
+				delete = append(delete, ss)
+			} else if retained < newDesiredReplicas {
+				retained += ss.running
 			}
 		}
 
 		if retained == newDesiredReplicas {
-			for i := 0; i < lastIndex; i++ {
-				ss := currentStatefulSets[i]
+			for _, ss := range delete {
 				log := log.WithValues("statefulset", types.NamespacedName{Namespace: ss.statefulset.Namespace, Name: ss.statefulset.Name})
 
-				if err := r.Client.Delete(ctx, ss.statefulset); err != nil {
-					return ctrl.Result{}, err
+				// Statefulset termination process 1/4: Set unregistrationRequestTimestamp only after all the pods managed by the statefulset have
+				// started the unregistration process.
+				//
+				// NOTE: We just mark it instead of immediately starting the deletion process.
+				// Otherwise, the runner pod may hit terminationGracePeriod before the unregistration completes (the max terminationGracePeriod is limited to 1h by K8s and a job can run for longer than that),
+				// or actions/runner may potentially misbehave on a SIGTERM sent immediately by K8s.
+				// We'd better unregister first and then start a pod deletion process.
+				// The annotation works as a marker to start our pod unregistration and deletion process.
+				for _, po := range ss.pods {
+					if _, err := annotatePodOnce(ctx, r.Client, log, &po, unregistrationRequestTimestamp, time.Now().Format(time.RFC3339)); err != nil {
+						return ctrl.Result{}, err
+					}
+				}
+
+				if _, ok := getAnnotation(&ss.statefulset.ObjectMeta, unregistrationRequestTimestamp); !ok {
+					updated := ss.statefulset.DeepCopy()
+					setAnnotation(&updated.ObjectMeta, unregistrationRequestTimestamp, time.Now().Format(time.RFC3339))
+
+					if err := r.Client.Patch(ctx, updated, client.MergeFrom(ss.statefulset)); err != nil {
+						log.Error(err, fmt.Sprintf("Failed to patch statefulset to have %s annotation", unregistrationRequestTimestamp))
+						return ctrl.Result{}, err
+					}
+
+					log.V(2).Info("Redundant statefulset has been annotated to start the unregistration before deletion")
+				} else {
+					log.V(2).Info("BUG: Redundant statefulset was already annotated")
 				}
-				log.V(2).Info("Deleted redundant statefulset", "i", i, "lastIndex", lastIndex)
 			}
 
 			return ctrl.Result{}, err
 		} else if retained > newDesiredReplicas {
-			log.V(2).Info("Waiting sync before scale down", "retained", retained, "newDesiredReplicas", newDesiredReplicas, "lastIndex", lastIndex)
+			log.V(2).Info("Waiting sync before scale down", "retained", retained, "newDesiredReplicas", newDesiredReplicas)
 
 			return ctrl.Result{}, nil
 		} else {
-			log.Info("Invalid state", "retained", retained, "newDesiredReplicas", newDesiredReplicas, "lastIndex", lastIndex)
+			log.Info("Invalid state", "retained", retained, "newDesiredReplicas", newDesiredReplicas)
 			panic("crashed due to invalid state")
 		}
 	}
@@ -352,11 +420,15 @@ func (r *RunnerSetReconciler) getPodsForStatefulset(ctx context.Context, log log
 	var completed, running, terminating, pending, total int
 
+	var pods []corev1.Pod
+
 	for _, pod := range podList.Items {
 		if owner := metav1.GetControllerOf(&pod); owner == nil || owner.Kind != "StatefulSet" || owner.Name != ss.Name {
 			continue
 		}
 
+		pods = append(pods, pod)
+
 		total++
 
 		if runnerPodOrContainerIsStopped(&pod) {
@@ -385,7 +457,7 @@ func (r *RunnerSetReconciler) getPodsForStatefulset(ctx context.Context, log log
 		pending:      pending,
 		templateHash: templateHash,
 		statefulset:  ss,
-		pods:         podList.Items,
+		pods:         pods,
 	}, nil
 }
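The scale-down branch no longer tracks a `lastIndex` cutoff; instead it collects every statefulset that is either empty or beyond the retained-replica budget and marks each of them with `unregistration-request-timestamp`. A condensed, hypothetical extraction of that selection logic (`markForDeletion` is not a real function in the codebase; the actual code works on `*podsForStatefulset` and issues the patches inline):

```go
package controllers

// markForDeletion walks the per-statefulset running counts from the end of the list,
// keeps just enough running replicas to cover newDesiredReplicas, and returns the
// indices of the statefulsets that should be marked for unregistration, including
// statefulsets with no running pods.
func markForDeletion(runningPerStatefulSet []int, newDesiredReplicas int) (retained int, deleteIdx []int) {
	for i := len(runningPerStatefulSet) - 1; i >= 0; i-- {
		running := runningPerStatefulSet[i]
		if running == 0 || retained >= newDesiredReplicas {
			deleteIdx = append(deleteIdx, i)
		} else if retained < newDesiredReplicas {
			retained += running
		}
	}
	return retained, deleteIdx
}
```

Only when `retained` lands exactly on `newDesiredReplicas` does the controller proceed to mark the selected statefulsets; `retained > newDesiredReplicas` waits for the next sync, and anything else is treated as an invalid state.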