Merge pull request #1179 from actions-runner-controller/refactor-runner-and-runnerset

Refactor the Runner and RunnerReplicaSet controllers so that they use the same library code that powers RunnerSet.

RunnerSet is StatefulSet-based while RunnerReplicaSet/Runner is Pod-based, so it had been hard to unify the implementations even though they look very similar in many aspects.

This change finally resolves that issue by first introducing a library that implements the generic logic used to reconcile RunnerSet, and then adding an adapter that lets the generic logic manage runner pods via Runner instead of via StatefulSet.
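At the core of the new library is a small adapter interface that abstracts "a thing that owns runner pods", so the same generic reconciliation code can drive both a Runner (one pod) and a StatefulSet (a batch of pods). As orientation, here is a minimal sketch of that shape; it mirrors the `owner` interface added in the new runner_pod_owner.go later in this diff, with explanatory comments added here:

package controllers

import (
    "context"

    corev1 "k8s.io/api/core/v1"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

// owner abstracts a resource that owns runner pods.
type owner interface {
    client.Object
    // pods lists the runner pods currently managed by this owner.
    pods(context.Context, client.Client) ([]corev1.Pod, error)
    // templateHash identifies the pod template revision the owner was created from.
    templateHash() (string, bool)
    // withAnnotation returns a copy of the owner with the annotation added,
    // used to mark unregistration progress before deletion.
    withAnnotation(k, v string) client.Object
    // synced reports whether the owner has created all the pods it is supposed to.
    synced() bool
}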

Follow-up for #1127, #1167, and #1178
Yusuke Kuoka 2022-03-06 15:56:51 +09:00 committed by GitHub
commit bed927052d
14 changed files with 790 additions and 1242 deletions

View File

@ -84,8 +84,7 @@ if [ -n "${TEST_REPO}" ]; then
cat acceptance/testdata/runnerset.envsubst.yaml | TEST_ENTERPRISE= TEST_ORG= RUNNER_MIN_REPLICAS=${REPO_RUNNER_MIN_REPLICAS} NAME=repo-runnerset envsubst | kubectl apply -f -
else
echo 'Deploying runnerdeployment and hra. Set USE_RUNNERSET if you want to deploy runnerset instead.'
cat acceptance/testdata/repo.runnerdeploy.yaml | envsubst | kubectl apply -f -
cat acceptance/testdata/repo.hra.yaml | envsubst | kubectl apply -f -
cat acceptance/testdata/runnerdeploy.envsubst.yaml | TEST_ENTERPRISE= TEST_ORG= RUNNER_MIN_REPLICAS=${REPO_RUNNER_MIN_REPLICAS} NAME=repo-runnerdeploy envsubst | kubectl apply -f -
fi
else
echo 'Skipped deploying runnerdeployment and hra. Set TEST_REPO to "yourorg/yourrepo" to deploy.'

View File

@ -1,36 +0,0 @@
apiVersion: actions.summerwind.dev/v1alpha1
kind: HorizontalRunnerAutoscaler
metadata:
  name: org
spec:
  scaleTargetRef:
    name: org-runnerdeploy
  scaleUpTriggers:
  - githubEvent:
      checkRun:
        types: ["created"]
        status: "queued"
    amount: 1
    duration: "1m"
  scheduledOverrides:
  - startTime: "2021-05-11T16:05:00+09:00"
    endTime: "2021-05-11T16:40:00+09:00"
    minReplicas: 2
  - startTime: "2021-05-01T00:00:00+09:00"
    endTime: "2021-05-03T00:00:00+09:00"
    recurrenceRule:
      frequency: Weekly
      untilTime: "2022-05-01T00:00:00+09:00"
    minReplicas: 0
  minReplicas: 0
  maxReplicas: 5
  # Used to test that HRA is working for org runners
  metrics:
  - type: PercentageRunnersBusy
    scaleUpThreshold: '0.75'
    scaleDownThreshold: '0.3'
    scaleUpFactor: '2'
    scaleDownFactor: '0.5'
  - type: TotalNumberOfQueuedAndInProgressWorkflowRuns
    repositoryNames:
    - ${TEST_ORG_REPO}

View File

@ -1,44 +0,0 @@
apiVersion: actions.summerwind.dev/v1alpha1
kind: RunnerDeployment
metadata:
  name: org-runnerdeploy
spec:
  # replicas: 1
  template:
    spec:
      organization: ${TEST_ORG}
      #
      # Custom runner image
      #
      image: ${RUNNER_NAME}:${RUNNER_TAG}
      imagePullPolicy: IfNotPresent
      # Whether to pass --ephemeral (true) or --once (false, deprecated)
      env:
      - name: RUNNER_FEATURE_FLAG_EPHEMERAL
        value: "${RUNNER_FEATURE_FLAG_EPHEMERAL}"
      #
      # dockerd within runner container
      #
      ## Replace `mumoshu/actions-runner-dind:dev` with your dind image
      #dockerdWithinRunnerContainer: true
      #image: mumoshu/actions-runner-dind:dev
      #
      # Set the MTU used by dockerd-managed network interfaces (including docker-build-ubuntu)
      #
      #dockerMTU: 1450
      # Runner group
      # labels:
      # - "mylabel 1"
      # - "mylabel 2"
      labels:
      - "${RUNNER_LABEL}"
      #
      # Non-standard working directory
      #
      # workDir: "/"

View File

@ -1,25 +0,0 @@
apiVersion: actions.summerwind.dev/v1alpha1
kind: HorizontalRunnerAutoscaler
metadata:
  name: actions-runner-aos-autoscaler
spec:
  scaleTargetRef:
    name: example-runnerdeploy
  scaleUpTriggers:
  - githubEvent:
      checkRun:
        types: ["created"]
        status: "queued"
    amount: 1
    duration: "1m"
  minReplicas: 0
  maxReplicas: 5
  metrics:
  - type: PercentageRunnersBusy
    scaleUpThreshold: '0.75'
    scaleDownThreshold: '0.3'
    scaleUpFactor: '2'
    scaleDownFactor: '0.5'
  - type: TotalNumberOfQueuedAndInProgressWorkflowRuns
    repositoryNames:
    - ${TEST_REPO}

View File

@ -1,44 +0,0 @@
apiVersion: actions.summerwind.dev/v1alpha1
kind: RunnerDeployment
metadata:
  name: example-runnerdeploy
spec:
  # replicas: 1
  template:
    spec:
      repository: ${TEST_REPO}
      #
      # Custom runner image
      #
      image: ${RUNNER_NAME}:${RUNNER_TAG}
      imagePullPolicy: IfNotPresent
      # Whether to pass --ephemeral (true) or --once (false, deprecated)
      env:
      - name: RUNNER_FEATURE_FLAG_EPHEMERAL
        value: "${RUNNER_FEATURE_FLAG_EPHEMERAL}"
      #
      # dockerd within runner container
      #
      ## Replace `mumoshu/actions-runner-dind:dev` with your dind image
      #dockerdWithinRunnerContainer: true
      #image: mumoshu/actions-runner-dind:dev
      #
      # Set the MTU used by dockerd-managed network interfaces (including docker-build-ubuntu)
      #
      #dockerMTU: 1450
      # Runner group
      # labels:
      # - "mylabel 1"
      # - "mylabel 2"
      labels:
      - "${RUNNER_LABEL}"
      #
      # Non-standard working directory
      #
      # workDir: "/"

View File

@ -1,29 +0,0 @@
apiVersion: actions.summerwind.dev/v1alpha1
kind: HorizontalRunnerAutoscaler
metadata:
  name: example-runnerset
spec:
  scaleTargetRef:
    kind: RunnerSet
    name: example-runnerset
  scaleUpTriggers:
  - githubEvent:
      checkRun:
        types: ["created"]
        status: "queued"
    amount: 1
    duration: "1m"
  # RunnerSet doesn't support scale from/to zero yet
  minReplicas: 1
  maxReplicas: 5
  # This should be less than 600 (seconds, the default) for faster testing
  scaleDownDelaySecondsAfterScaleOut: 60
  metrics:
  - type: PercentageRunnersBusy
    scaleUpThreshold: '0.75'
    scaleDownThreshold: '0.3'
    scaleUpFactor: '2'
    scaleDownFactor: '0.5'
  - type: TotalNumberOfQueuedAndInProgressWorkflowRuns
    repositoryNames:
    - ${TEST_REPO}

View File

@ -18,15 +18,12 @@ package controllers
import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/actions-runner-controller/actions-runner-controller/hash"
"github.com/go-logr/logr"
gogithub "github.com/google/go-github/v39/github"
"k8s.io/apimachinery/pkg/util/wait"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
@ -92,12 +89,6 @@ func (r *RunnerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return ctrl.Result{}, client.IgnoreNotFound(err)
}
err := runner.Validate()
if err != nil {
log.Info("Failed to validate runner spec", "error", err.Error())
return ctrl.Result{}, nil
}
if runner.ObjectMeta.DeletionTimestamp.IsZero() {
finalizers, added := addFinalizer(runner.ObjectMeta.Finalizers, finalizerName)
@ -125,34 +116,6 @@ func (r *RunnerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return r.processRunnerDeletion(runner, ctx, log, &pod)
}
registrationOnly := metav1.HasAnnotation(runner.ObjectMeta, annotationKeyRegistrationOnly)
if registrationOnly && runner.Status.Phase != "" {
// At this point we are sure that the registration-only runner has successfully been configured and
// is of `offline` status, because we set runner.Status.Phase to that of the runner pod only after
// successful registration.
var pod corev1.Pod
if err := r.Get(ctx, req.NamespacedName, &pod); err != nil {
if !kerrors.IsNotFound(err) {
log.Info(fmt.Sprintf("Retrying soon as we failed to get registration-only runner pod: %v", err))
return ctrl.Result{Requeue: true}, nil
}
} else if err := r.Delete(ctx, &pod); err != nil {
if !kerrors.IsNotFound(err) {
log.Info(fmt.Sprintf("Retrying soon as we failed to delete registration-only runner pod: %v", err))
return ctrl.Result{Requeue: true}, nil
}
}
log.Info("Successfully deleted registration-only runner pod to free node and cluster resource")
// Return here to not recreate the deleted pod, because recreating it is a waste of cluster and node resources,
// and also defeats the original purpose of scale-from/to-zero we're trying to implement by using the registration-only runner.
return ctrl.Result{}, nil
}
var pod corev1.Pod
if err := r.Get(ctx, req.NamespacedName, &pod); err != nil {
if !kerrors.IsNotFound(err) {
@ -162,281 +125,31 @@ func (r *RunnerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return r.processRunnerCreation(ctx, runner, log)
}
// Pod already exists
if !pod.ObjectMeta.DeletionTimestamp.IsZero() {
return r.processRunnerPodDeletion(ctx, runner, log, pod)
phase := string(pod.Status.Phase)
if phase == "" {
phase = "Created"
}
// If pod has ended up succeeded we need to restart it
// Happens e.g. when dind is in runner and run completes
stopped := runnerPodOrContainerIsStopped(&pod)
ephemeral := runner.Spec.Ephemeral == nil || *runner.Spec.Ephemeral
if stopped && ephemeral {
log.V(1).Info("Ephemeral runner has been stopped successfully. Marking this runner for deletion.")
// This is the key to making ephemeral runners work reliably with webhook-based autoscaling.
// See https://github.com/actions-runner-controller/actions-runner-controller/issues/911#issuecomment-1046161384 for more context.
//
// In the next reconciliation loop, this triggers a runner unregistration.
// (Note that the unregistration can fail safely because an ephemeral runner usually unregisters itself from GitHub; we do it just for confirmation)
//
// See the code path above that is executed when `runner.ObjectMeta.DeletionTimestamp.IsZero()` isn't true,
// which handles the unregistration, the removal of the completed pod, and so on.
if err := r.Delete(ctx, &runner); err != nil {
log.V(1).Error(err, "Retrying to mark this runner for deletion in 10 seconds.")
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
if runner.Status.Phase != phase {
if pod.Status.Phase == corev1.PodRunning {
// Seeing this message, you can expect the runner to become `Running` soon.
log.V(1).Info(
"Runner appears to have been registered and running.",
"podCreationTimestamp", pod.CreationTimestamp,
)
}
return ctrl.Result{Requeue: true}, nil
}
updated := runner.DeepCopy()
updated.Status.Phase = phase
updated.Status.Reason = pod.Status.Reason
updated.Status.Message = pod.Status.Message
restart := stopped
if registrationOnly && stopped {
restart = false
log.Info(
"Observed that registration-only runner for scaling-from-zero has successfully stopped. " +
"Unlike other pods, this one will be recreated only when runner spec changes.",
)
}
if updated, err := r.updateRegistrationToken(ctx, runner); err != nil {
return ctrl.Result{}, err
} else if updated {
return ctrl.Result{Requeue: true}, nil
}
newPod, err := r.newPod(runner)
if err != nil {
log.Error(err, "Could not create pod")
return ctrl.Result{}, err
}
if registrationOnly {
newPod.Spec.Containers[0].Env = append(
newPod.Spec.Containers[0].Env,
corev1.EnvVar{
Name: "RUNNER_REGISTRATION_ONLY",
Value: "true",
},
)
}
var registrationRecheckDelay time.Duration
// all checks done below only decide whether a restart is needed
// if a restart was already decided before, there is no need for the checks
// saving API calls and scary log messages
if !restart {
registrationCheckInterval := time.Minute
if r.RegistrationRecheckInterval > 0 {
registrationCheckInterval = r.RegistrationRecheckInterval
}
// We want to call ListRunners GitHub Actions API only once per runner per minute.
// This if block, in conjunction with:
// return ctrl.Result{RequeueAfter: registrationRecheckDelay}, nil
// achieves that.
if lastCheckTime := runner.Status.LastRegistrationCheckTime; lastCheckTime != nil {
nextCheckTime := lastCheckTime.Add(registrationCheckInterval)
now := time.Now()
// Requeue scheduled by RequeueAfter can happen a bit earlier (like dozens of milliseconds)
// so to avoid an excessive, ineffective retry, we heuristically ignore the remaining delay in case it is
// shorter than 1s
requeueAfter := nextCheckTime.Sub(now) - time.Second
if requeueAfter > 0 {
log.Info(
fmt.Sprintf("Skipped registration check because it's deferred until %s. Retrying in %s at latest", nextCheckTime, requeueAfter),
"lastRegistrationCheckTime", lastCheckTime,
"registrationCheckInterval", registrationCheckInterval,
)
// Without RequeueAfter, the controller may not retry as scheduled. Instead, it must wait until the
// next sync period passes, which can be much later than nextCheckTime.
//
// We need to requeue on this reconciliation even though we have already scheduled the initial
// requeue previously with `return ctrl.Result{RequeueAfter: registrationRecheckDelay}, nil`.
// Apparently, the workqueue used by controller-runtime seems to deduplicate and reset the delay on
// other requeues, so the initial scheduled requeue may have been reset due to a requeue on
// spec/status change.
return ctrl.Result{RequeueAfter: requeueAfter}, nil
}
}
notFound := false
offline := false
runnerBusy, err := r.GitHubClient.IsRunnerBusy(ctx, runner.Spec.Enterprise, runner.Spec.Organization, runner.Spec.Repository, runner.Name)
currentTime := time.Now()
if err != nil {
var notFoundException *github.RunnerNotFound
var offlineException *github.RunnerOffline
if errors.As(err, &notFoundException) {
notFound = true
} else if errors.As(err, &offlineException) {
offline = true
} else {
var e *gogithub.RateLimitError
if errors.As(err, &e) {
// We log the underlying error when we fail calling the GitHub API to list or unregister runners,
// or when the runner is still busy.
log.Error(
err,
fmt.Sprintf(
"Failed to check if runner is busy due to Github API rate limit. Retrying in %s to avoid excessive GitHub API calls",
retryDelayOnGitHubAPIRateLimitError,
),
)
return ctrl.Result{RequeueAfter: retryDelayOnGitHubAPIRateLimitError}, err
}
return ctrl.Result{}, err
}
}
// See the `newPod` function called above for more information
// about when this hash changes.
curHash := pod.Labels[LabelKeyPodTemplateHash]
newHash := newPod.Labels[LabelKeyPodTemplateHash]
if !runnerBusy && curHash != newHash {
restart = true
}
registrationTimeout := 10 * time.Minute
durationAfterRegistrationTimeout := currentTime.Sub(pod.CreationTimestamp.Add(registrationTimeout))
registrationDidTimeout := durationAfterRegistrationTimeout > 0
if notFound {
if registrationDidTimeout {
log.Info(
"Runner failed to register itself to GitHub in timely manner. "+
"Recreating the pod to see if it resolves the issue. "+
"CAUTION: If you see this a lot, you should investigate the root cause. "+
"See https://github.com/actions-runner-controller/actions-runner-controller/issues/288",
"podCreationTimestamp", pod.CreationTimestamp,
"currentTime", currentTime,
"configuredRegistrationTimeout", registrationTimeout,
)
restart = true
} else {
log.V(1).Info(
"Runner pod exists but we failed to check if runner is busy. Apparently it still needs more time.",
"runnerName", runner.Name,
)
}
} else if offline {
if registrationOnly {
log.Info(
"Observed that registration-only runner for scaling-from-zero has successfully been registered.",
"podCreationTimestamp", pod.CreationTimestamp,
"currentTime", currentTime,
"configuredRegistrationTimeout", registrationTimeout,
)
} else if registrationDidTimeout {
if runnerBusy {
log.Info(
"Timeout out while waiting for the runner to be online, but observed that it's busy at the same time."+
"This is a known (unintuitive) behaviour of a runner that is already running a job. Please see https://github.com/actions-runner-controller/actions-runner-controller/issues/911",
"podCreationTimestamp", pod.CreationTimestamp,
"currentTime", currentTime,
"configuredRegistrationTimeout", registrationTimeout,
)
} else {
log.Info(
"Already existing GitHub runner still appears offline . "+
"Recreating the pod to see if it resolves the issue. "+
"CAUTION: If you see this a lot, you should investigate the root cause. ",
"podCreationTimestamp", pod.CreationTimestamp,
"currentTime", currentTime,
"configuredRegistrationTimeout", registrationTimeout,
)
restart = true
}
} else {
log.V(1).Info(
"Runner pod exists but the GitHub runner appears to be still offline. Waiting for runner to get online ...",
"runnerName", runner.Name,
)
}
}
if (notFound || (offline && !registrationOnly)) && !registrationDidTimeout {
registrationRecheckJitter := 10 * time.Second
if r.RegistrationRecheckJitter > 0 {
registrationRecheckJitter = r.RegistrationRecheckJitter
}
registrationRecheckDelay = registrationCheckInterval + wait.Jitter(registrationRecheckJitter, 0.1)
if err := r.Status().Patch(ctx, updated, client.MergeFrom(&runner)); err != nil {
log.Error(err, "Failed to update runner status for Phase/Reason/Message")
return ctrl.Result{}, err
}
}
// Don't do anything if there's no need to restart the runner
if !restart {
// This guard enables us to update runner.Status.Phase to `Running` only after
// the runner is registered to GitHub.
if registrationRecheckDelay > 0 {
log.V(1).Info(fmt.Sprintf("Rechecking the runner registration in %s", registrationRecheckDelay))
updated := runner.DeepCopy()
updated.Status.LastRegistrationCheckTime = &metav1.Time{Time: time.Now()}
if err := r.Status().Patch(ctx, updated, client.MergeFrom(&runner)); err != nil {
log.Error(err, "Failed to update runner status for LastRegistrationCheckTime")
return ctrl.Result{}, err
}
return ctrl.Result{RequeueAfter: registrationRecheckDelay}, nil
}
if runner.Status.Phase != string(pod.Status.Phase) {
if pod.Status.Phase == corev1.PodRunning {
// Seeing this message, you can expect the runner to become `Running` soon.
log.Info(
"Runner appears to have registered and running.",
"podCreationTimestamp", pod.CreationTimestamp,
)
}
updated := runner.DeepCopy()
updated.Status.Phase = string(pod.Status.Phase)
updated.Status.Reason = pod.Status.Reason
updated.Status.Message = pod.Status.Message
if err := r.Status().Patch(ctx, updated, client.MergeFrom(&runner)); err != nil {
log.Error(err, "Failed to update runner status for Phase/Reason/Message")
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}
updatedPod, res, err := tickRunnerGracefulStop(ctx, r.unregistrationTimeout(), r.unregistrationRetryDelay(), log, r.GitHubClient, r.Client, runner.Spec.Enterprise, runner.Spec.Organization, runner.Spec.Repository, runner.Name, &pod)
if res != nil {
return *res, err
}
// Only delete the pod if we successfully unregistered the runner or the runner is already deleted from the service.
// This should help us avoid a race condition where the runner picks up a job after we have decided it is not busy.
if err := r.Delete(ctx, updatedPod); err != nil {
log.Error(err, "Failed to delete pod resource")
return ctrl.Result{}, err
}
r.Recorder.Event(&runner, corev1.EventTypeNormal, "PodDeleted", fmt.Sprintf("Deleted pod '%s'", newPod.Name))
log.Info("Deleted runner pod", "repository", runner.Spec.Repository)
return ctrl.Result{}, nil
}
@ -480,11 +193,6 @@ func (r *RunnerReconciler) processRunnerDeletion(runner v1alpha1.Runner, ctx con
finalizers, removed := removeFinalizer(runner.ObjectMeta.Finalizers, finalizerName)
if removed {
_, res, err := tickRunnerGracefulStop(ctx, r.unregistrationTimeout(), r.unregistrationRetryDelay(), log, r.GitHubClient, r.Client, runner.Spec.Enterprise, runner.Spec.Organization, runner.Spec.Repository, runner.Name, pod)
if res != nil {
return *res, err
}
newRunner := runner.DeepCopy()
newRunner.ObjectMeta.Finalizers = finalizers
@ -499,60 +207,6 @@ func (r *RunnerReconciler) processRunnerDeletion(runner v1alpha1.Runner, ctx con
return ctrl.Result{}, nil
}
func (r *RunnerReconciler) unregistrationTimeout() time.Duration {
unregistrationTimeout := DefaultUnregistrationTimeout
if r.UnregistrationTimeout > 0 {
unregistrationTimeout = r.UnregistrationTimeout
}
return unregistrationTimeout
}
func (r *RunnerReconciler) unregistrationRetryDelay() time.Duration {
retryDelay := DefaultUnregistrationRetryDelay
if r.UnregistrationRetryDelay > 0 {
retryDelay = r.UnregistrationRetryDelay
}
return retryDelay
}
func (r *RunnerReconciler) processRunnerPodDeletion(ctx context.Context, runner v1alpha1.Runner, log logr.Logger, pod corev1.Pod) (reconcile.Result, error) {
deletionTimeout := 1 * time.Minute
currentTime := time.Now()
deletionDidTimeout := currentTime.Sub(pod.DeletionTimestamp.Add(deletionTimeout)) > 0
if deletionDidTimeout {
log.Info(
fmt.Sprintf("Failed to delete pod within %s. ", deletionTimeout)+
"This is typically the case when a Kubernetes node became unreachable "+
"and the kube controller started evicting nodes. Forcefully deleting the pod to not get stuck.",
"podDeletionTimestamp", pod.DeletionTimestamp,
"currentTime", currentTime,
"configuredDeletionTimeout", deletionTimeout,
)
var force int64 = 0
// forcefully delete runner as we would otherwise get stuck if the node stays unreachable
if err := r.Delete(ctx, &pod, &client.DeleteOptions{GracePeriodSeconds: &force}); err != nil {
// probably
if !kerrors.IsNotFound(err) {
log.Error(err, "Failed to forcefully delete pod resource ...")
return ctrl.Result{}, err
}
// forceful deletion finally succeeded
return ctrl.Result{Requeue: true}, nil
}
r.Recorder.Event(&runner, corev1.EventTypeNormal, "PodDeleted", fmt.Sprintf("Forcefully deleted pod '%s'", pod.Name))
log.Info("Forcefully deleted runner pod", "repository", runner.Spec.Repository)
// give kube manager a little time to forcefully delete the stuck pod
return ctrl.Result{RequeueAfter: 3 * time.Second}, nil
} else {
return ctrl.Result{}, nil
}
}
func (r *RunnerReconciler) processRunnerCreation(ctx context.Context, runner v1alpha1.Runner, log logr.Logger) (reconcile.Result, error) {
if updated, err := r.updateRegistrationToken(ctx, runner); err != nil {
return ctrl.Result{}, err
@ -584,6 +238,7 @@ func (r *RunnerReconciler) processRunnerCreation(ctx context.Context, runner v1a
r.Recorder.Event(&runner, corev1.EventTypeNormal, "PodCreated", fmt.Sprintf("Created pod '%s'", newPod.Name))
log.Info("Created runner pod", "repository", runner.Spec.Repository)
return ctrl.Result{}, nil
}
@ -696,7 +351,7 @@ func (r *RunnerReconciler) newPod(runner v1alpha1.Runner) (corev1.Pod, error) {
registrationOnly := metav1.HasAnnotation(runner.ObjectMeta, annotationKeyRegistrationOnly)
pod, err := newRunnerPod(template, runner.Spec.RunnerConfig, r.RunnerImage, r.RunnerImagePullSecrets, r.DockerImage, r.DockerRegistryMirror, r.GitHubClient.GithubBaseURL, registrationOnly)
pod, err := newRunnerPod(runner.Name, template, runner.Spec.RunnerConfig, r.RunnerImage, r.RunnerImagePullSecrets, r.DockerImage, r.DockerRegistryMirror, r.GitHubClient.GithubBaseURL, registrationOnly)
if err != nil {
return pod, err
}
@ -813,7 +468,7 @@ func mutatePod(pod *corev1.Pod, token string) *corev1.Pod {
return updated
}
func newRunnerPod(template corev1.Pod, runnerSpec v1alpha1.RunnerConfig, defaultRunnerImage string, defaultRunnerImagePullSecrets []string, defaultDockerImage, defaultDockerRegistryMirror string, githubBaseURL string, registrationOnly bool) (corev1.Pod, error) {
func newRunnerPod(runnerName string, template corev1.Pod, runnerSpec v1alpha1.RunnerConfig, defaultRunnerImage string, defaultRunnerImagePullSecrets []string, defaultDockerImage, defaultDockerRegistryMirror string, githubBaseURL string, registrationOnly bool) (corev1.Pod, error) {
var (
privileged bool = true
dockerdInRunner bool = runnerSpec.DockerdWithinRunnerContainer != nil && *runnerSpec.DockerdWithinRunnerContainer
@ -822,6 +477,12 @@ func newRunnerPod(template corev1.Pod, runnerSpec v1alpha1.RunnerConfig, default
dockerdInRunnerPrivileged bool = dockerdInRunner
)
template = *template.DeepCopy()
// This label selector is used by default when rd.Spec.Selector is empty.
template.ObjectMeta.Labels = CloneAndAddLabel(template.ObjectMeta.Labels, LabelKeyRunnerSetName, runnerName)
template.ObjectMeta.Labels = CloneAndAddLabel(template.ObjectMeta.Labels, LabelKeyPodMutation, LabelValuePodMutation)
workDir := runnerSpec.WorkDir
if workDir == "" {
workDir = "/runner/_work"

View File

@ -52,7 +52,7 @@ func annotatePodOnce(ctx context.Context, c client.Client, log logr.Logger, pod
return nil, nil
}
if _, ok := getAnnotation(&pod.ObjectMeta, k); ok {
if _, ok := getAnnotation(pod, k); ok {
return pod, nil
}
@ -72,7 +72,7 @@ func annotatePodOnce(ctx context.Context, c client.Client, log logr.Logger, pod
func ensureRunnerUnregistration(ctx context.Context, unregistrationTimeout time.Duration, retryDelay time.Duration, log logr.Logger, ghClient *github.Client, enterprise, organization, repository, runner string, pod *corev1.Pod) (*ctrl.Result, error) {
var runnerID *int64
if id, ok := getAnnotation(&pod.ObjectMeta, AnnotationKeyRunnerID); ok {
if id, ok := getAnnotation(pod, AnnotationKeyRunnerID); ok {
v, err := strconv.ParseInt(id, 10, 64)
if err != nil {
return &ctrl.Result{}, err
@ -175,7 +175,7 @@ func ensureRunnerUnregistration(ctx context.Context, unregistrationTimeout time.
}
func ensureRunnerPodRegistered(ctx context.Context, log logr.Logger, ghClient *github.Client, c client.Client, enterprise, organization, repository, runner string, pod *corev1.Pod) (*corev1.Pod, *ctrl.Result, error) {
_, hasRunnerID := getAnnotation(&pod.ObjectMeta, AnnotationKeyRunnerID)
_, hasRunnerID := getAnnotation(pod, AnnotationKeyRunnerID)
if runnerPodOrContainerIsStopped(pod) || hasRunnerID {
return pod, nil, nil
}
@ -199,12 +199,12 @@ func ensureRunnerPodRegistered(ctx context.Context, log logr.Logger, ghClient *g
return updated, nil, nil
}
func getAnnotation(meta *metav1.ObjectMeta, key string) (string, bool) {
if meta.Annotations == nil {
func getAnnotation(obj client.Object, key string) (string, bool) {
if obj.GetAnnotations() == nil {
return "", false
}
v, ok := meta.Annotations[key]
v, ok := obj.GetAnnotations()[key]
return v, ok
}
@ -237,7 +237,7 @@ func podConditionTransitionTimeAfter(pod *corev1.Pod, tpe corev1.PodConditionTyp
}
func podRunnerID(pod *corev1.Pod) string {
id, _ := getAnnotation(&pod.ObjectMeta, AnnotationKeyRunnerID)
id, _ := getAnnotation(pod, AnnotationKeyRunnerID)
return id
}
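For context on this refactor: client.Object is implemented by every Kubernetes API object and exposes GetAnnotations(), so one helper can now read annotations from pods and their owner resources alike, instead of requiring callers to reach into ObjectMeta. A minimal self-contained sketch of the new shape; the annotation key below is a hypothetical example, not one ARC uses:

package controllers

import (
    corev1 "k8s.io/api/core/v1"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

// getAnnotation reads an annotation from any Kubernetes object.
func getAnnotation(obj client.Object, key string) (string, bool) {
    if obj.GetAnnotations() == nil {
        return "", false
    }
    v, ok := obj.GetAnnotations()[key]
    return v, ok
}

func example(pod *corev1.Pod) (string, bool) {
    // A *corev1.Pod satisfies client.Object, so it is passed as-is,
    // where the old signature required &pod.ObjectMeta.
    return getAnnotation(pod, "example.com/runner-id") // hypothetical key
}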

View File

@ -165,7 +165,7 @@ func (r *RunnerPodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
runnerPod = *po
if _, unregistrationRequested := getAnnotation(&runnerPod.ObjectMeta, AnnotationKeyUnregistrationRequestTimestamp); unregistrationRequested {
if _, unregistrationRequested := getAnnotation(&runnerPod, AnnotationKeyUnregistrationRequestTimestamp); unregistrationRequested {
log.V(2).Info("Progressing unregistration because unregistration-request timestamp is set")
// At this point we're sure that DeletionTimestamp is not set yet, but the unregistration process is triggered by an upstream controller like runnerset-controller.

View File

@ -0,0 +1,564 @@
package controllers
import (
"context"
"fmt"
"sort"
"time"
"github.com/actions-runner-controller/actions-runner-controller/api/v1alpha1"
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
type podsForOwner struct {
total int
completed int
running int
terminating int
regTimeout int
pending int
templateHash string
runner *v1alpha1.Runner
statefulSet *appsv1.StatefulSet
owner owner
object client.Object
synced bool
pods []corev1.Pod
}
type owner interface {
client.Object
pods(context.Context, client.Client) ([]corev1.Pod, error)
templateHash() (string, bool)
withAnnotation(k, v string) client.Object
synced() bool
}
type ownerRunner struct {
client.Object
Log logr.Logger
Runner *v1alpha1.Runner
}
var _ owner = (*ownerRunner)(nil)
func (r *ownerRunner) pods(ctx context.Context, c client.Client) ([]corev1.Pod, error) {
var pod corev1.Pod
if err := c.Get(ctx, types.NamespacedName{Namespace: r.Runner.Namespace, Name: r.Runner.Name}, &pod); err != nil {
if errors.IsNotFound(err) {
return nil, nil
}
r.Log.Error(err, "Failed to get pod managed by runner")
return nil, err
}
return []corev1.Pod{pod}, nil
}
func (r *ownerRunner) templateHash() (string, bool) {
return getRunnerTemplateHash(r.Runner)
}
func (r *ownerRunner) withAnnotation(k, v string) client.Object {
copy := r.Runner.DeepCopy()
setAnnotation(&copy.ObjectMeta, k, v)
return copy
}
func (r *ownerRunner) synced() bool {
return r.Runner.Status.Phase != ""
}
type ownerStatefulSet struct {
client.Object
Log logr.Logger
StatefulSet *appsv1.StatefulSet
}
var _ owner = (*ownerStatefulSet)(nil)
func (s *ownerStatefulSet) pods(ctx context.Context, c client.Client) ([]corev1.Pod, error) {
var podList corev1.PodList
if err := c.List(ctx, &podList, client.MatchingLabels(s.StatefulSet.Spec.Template.ObjectMeta.Labels)); err != nil {
s.Log.Error(err, "Failed to list pods managed by statefulset")
return nil, err
}
var pods []corev1.Pod
for _, pod := range podList.Items {
if owner := metav1.GetControllerOf(&pod); owner == nil || owner.Kind != "StatefulSet" || owner.Name != s.StatefulSet.Name {
continue
}
pods = append(pods, pod)
}
return pods, nil
}
func (s *ownerStatefulSet) templateHash() (string, bool) {
return getRunnerTemplateHash(s.StatefulSet)
}
func (s *ownerStatefulSet) withAnnotation(k, v string) client.Object {
copy := s.StatefulSet.DeepCopy()
setAnnotation(&copy.ObjectMeta, k, v)
return copy
}
func (s *ownerStatefulSet) synced() bool {
var replicas int32 = 1
if s.StatefulSet.Spec.Replicas != nil {
replicas = *s.StatefulSet.Spec.Replicas
}
if s.StatefulSet.Status.Replicas != replicas {
s.Log.V(2).Info("Waiting for statefulset to sync", "desiredReplicas", replicas, "currentReplicas", s.StatefulSet.Status.Replicas)
return false
}
return true
}
func getPodsForOwner(ctx context.Context, c client.Client, log logr.Logger, o client.Object) (*podsForOwner, error) {
var (
owner owner
runner *v1alpha1.Runner
statefulSet *appsv1.StatefulSet
object client.Object
)
switch v := o.(type) {
case *v1alpha1.Runner:
owner = &ownerRunner{
Log: log,
Runner: v,
Object: v,
}
runner = v
object = v
case *appsv1.StatefulSet:
owner = &ownerStatefulSet{
Log: log,
StatefulSet: v,
Object: v,
}
statefulSet = v
object = v
default:
return nil, fmt.Errorf("BUG: Unsupported runner pods owner %v(%T)", v, v)
}
pods, err := owner.pods(ctx, c)
if err != nil {
return nil, err
}
var completed, running, terminating, regTimeout, pending, total int
for _, pod := range pods {
total++
if runnerPodOrContainerIsStopped(&pod) {
completed++
} else if pod.Status.Phase == corev1.PodRunning {
if podRunnerID(&pod) == "" && podConditionTransitionTimeAfter(&pod, corev1.PodReady, registrationTimeout) {
log.Info(
"Runner failed to register itself to GitHub in timely manner. "+
"Recreating the pod to see if it resolves the issue. "+
"CAUTION: If you see this a lot, you should investigate the root cause. "+
"See https://github.com/actions-runner-controller/actions-runner-controller/issues/288",
"creationTimestamp", pod.CreationTimestamp,
"readyTransitionTime", podConditionTransitionTime(&pod, corev1.PodReady, corev1.ConditionTrue),
"configuredRegistrationTimeout", registrationTimeout,
)
regTimeout++
} else {
running++
}
} else if !pod.DeletionTimestamp.IsZero() {
terminating++
} else {
// pending includes running-but-registration-timed-out runner pods too
pending++
}
}
templateHash, ok := owner.templateHash()
if !ok {
log.Info("Failed to get template hash of statefulset. It must be in an invalid state. Please manually delete the statefulset so that it is recreated")
return nil, nil
}
synced := owner.synced()
return &podsForOwner{
total: total,
completed: completed,
running: running,
terminating: terminating,
regTimeout: regTimeout,
pending: pending,
templateHash: templateHash,
runner: runner,
statefulSet: statefulSet,
owner: owner,
object: object,
synced: synced,
pods: pods,
}, nil
}
func getRunnerTemplateHash(r client.Object) (string, bool) {
hash, ok := r.GetLabels()[LabelKeyRunnerTemplateHash]
return hash, ok
}
type state struct {
podsForOwners map[string][]*podsForOwner
lastSyncTime *time.Time
}
type result struct {
currentObjects []*podsForOwner
}
// Why must `create` be a function rather than a client.Object? That's because we use it to create one or more objects on scale up.
//
// We use client.Create to create a necessary number of client.Object. client.Create mutates the passed object on a successful creation.
// It seems to set .Revision at least, and the existence of .Revision lets client.Create fail due to the K8s restriction that an object being just created
// can't have .Revision.
// Now, imagine that you are to add 2 runner replicas on scale up.
// We create one resource object per replica, which ends up calling client.Create twice.
// If we were reusing one client.Object across client.Create calls, only the first call would succeed.
// The second call would fail because the first call mutated the client.Object to have .Revision.
// Passing a factory function of client.Object and creating a brand-new client.Object per client.Create call resolves this issue,
// allowing us to create two or more replicas in one reconciliation loop without being rejected by K8s.
func syncRunnerPodsOwners(ctx context.Context, c client.Client, log logr.Logger, effectiveTime *metav1.Time, newDesiredReplicas int, create func() client.Object, ephemeral bool, owners []client.Object) (*result, error) {
state, err := collectPodsForOwners(ctx, c, log, owners)
if err != nil || state == nil {
return nil, err
}
podsForOwnersPerTemplateHash, lastSyncTime := state.podsForOwners, state.lastSyncTime
// # Why do we recreate statefulsets instead of updating their desired replicas?
//
// A statefulset cannot add more pods when not all the pods are running.
// Our ephemeral runners' pods that have finished running become Completed(Phase=Succeeded).
// So creating one statefulset per batch of ephemeral runners is the only way for us to add more replicas.
//
// # Why do we recreate statefulsets instead of updating fields other than replicas?
//
// That's because Kubernetes doesn't allow updating anything other than replicas, template, and updateStrategy.
// And the nature of ephemeral runner pods requires you to create a statefulset per batch of new runner pods, so
// we have really no other choice.
//
// If you're curious, the below is the error message you will get when you tried to update forbidden StatefulSet field(s):
//
// 2021-06-13T07:19:52.760Z ERROR actions-runner-controller.runnerset Failed to patch statefulset
// {"runnerset": "default/example-runnerset", "error": "StatefulSet.apps \"example-runnerset\" is invalid: s
// pec: Forbidden: updates to statefulset spec for fields other than 'replicas', 'template', and 'updateStrategy'
// are forbidden"}
//
// Even though the error message includes "Forbidden", this error's reason is "Invalid".
// So we used to match these errors by using errors.IsInvalid. But that's another story...
desiredTemplateHash, ok := getRunnerTemplateHash(create())
if !ok {
log.Info("Failed to get template hash of desired owner resource. It must be in an invalid state. Please manually delete the owner so that it is recreated")
return nil, nil
}
currentObjects := podsForOwnersPerTemplateHash[desiredTemplateHash]
sort.SliceStable(currentObjects, func(i, j int) bool {
return currentObjects[i].owner.GetCreationTimestamp().Time.Before(currentObjects[j].owner.GetCreationTimestamp().Time)
})
if len(currentObjects) > 0 {
timestampFirst := currentObjects[0].owner.GetCreationTimestamp()
timestampLast := currentObjects[len(currentObjects)-1].owner.GetCreationTimestamp()
var names []string
for _, ss := range currentObjects {
names = append(names, ss.owner.GetName())
}
log.V(2).Info("Detected some current object(s)", "creationTimestampFirst", timestampFirst, "creationTimestampLast", timestampLast, "names", names)
}
var pending, running, regTimeout int
for _, ss := range currentObjects {
pending += ss.pending
running += ss.running
regTimeout += ss.regTimeout
}
numOwners := len(owners)
var hashes []string
for h, _ := range state.podsForOwners {
hashes = append(hashes, h)
}
log.V(2).Info(
"Found some pods across owner(s)",
"pending", pending,
"running", running,
"regTimeout", regTimeout,
"desired", newDesiredReplicas,
"owners", numOwners,
)
maybeRunning := pending + running
if newDesiredReplicas > maybeRunning && ephemeral && lastSyncTime != nil && effectiveTime != nil && lastSyncTime.After(effectiveTime.Time) {
log.V(2).Info("Detected that some ephemeral runners have disappeared. Usually this is due to that ephemeral runner completions so ARC does not create new runners until EffectiveTime is updated.", "lastSyncTime", metav1.Time{Time: *lastSyncTime}, "effectiveTime", *effectiveTime, "desired", newDesiredReplicas, "pending", pending, "running", running)
} else if newDesiredReplicas > maybeRunning {
num := newDesiredReplicas - maybeRunning
for i := 0; i < num; i++ {
// Add more replicas
if err := c.Create(ctx, create()); err != nil {
return nil, err
}
}
log.V(1).Info("Created replica(s)",
"created", num,
"templateHashDesired", desiredTemplateHash,
"replicasDesired", newDesiredReplicas,
"replicasMaybeRunning", maybeRunning,
"templateHashObserved", hashes,
)
return nil, nil
} else if newDesiredReplicas <= running {
// If you use ephemeral runners with webhook-based autoscaler and the runner controller is working normally,
// you're unlikely to fall into this branch.
//
// That's because all the stakeholders work like this:
//
// 1. A runner pod completes with the runner container exiting with code 0
// 2. ARC runner controller detects the pod completion, marks the owner(runner or statefulset) resource on k8s for deletion (=Runner.DeletionTimestamp becomes non-zero)
// 3. GitHub triggers a corresponding workflow_job "complete" webhook event
// 4. ARC github-webhook-server (webhook-based autoscaler) receives the webhook event and updates HRA, removing the oldest capacity reservation
// 5. ARC horizontalrunnerautoscaler updates RunnerDeployment's desired replicas based on capacity reservations
// 6. ARC runnerdeployment controller updates RunnerReplicaSet's desired replicas
// 7. (We're here) ARC runnerset or runnerreplicaset controller starts reconciling the owner resource (statefulset or runner)
//
// In a normally working ARC installation, the runner that was used to run the workflow job should already have been
// marked for deletion by the runner controller.
// This runnerreplicaset controller doesn't count marked runners into the `running` value, hence you're unlikely to
// fall into this branch when you're using ephemeral runners with webhook-based-autoscaler.
var retained int
var delete []*podsForOwner
for i := len(currentObjects) - 1; i >= 0; i-- {
ss := currentObjects[i]
if ss.running == 0 || retained >= newDesiredReplicas {
// In case the desired replicas is satisfied until i-1, or this owner has no running pods,
// this owner can be considered safe for deletion.
// Note that we already waited on this owner to create pods by waiting for
// `.Status.Replicas`(=total number of pods managed by owner, regardless of the runner is Running or Completed) to match the desired replicas in a previous step.
// So `.running == 0` means "the owner has created the desired number of pods before, and all of them are completed now".
delete = append(delete, ss)
} else if retained < newDesiredReplicas {
retained += ss.running
}
}
if retained == newDesiredReplicas {
for _, ss := range delete {
log := log.WithValues("owner", types.NamespacedName{Namespace: ss.owner.GetNamespace(), Name: ss.owner.GetName()})
// Statefulset termination process 1/4: Set unregistrationRequestTimestamp only after all the pods managed by the statefulset have
// started the unregistration process.
//
// NOTE: We just mark it instead of immediately starting the deletion process.
// Otherwise, the runner pod may hit terminationGracePeriod before the unregistration completes (the max terminationGracePeriod is limited to 1h by K8s, and a job can run for longer than that),
// or actions/runner may potentially misbehave on a SIGTERM immediately sent by K8s.
// We'd better unregister first and then start the pod deletion process.
// The annotation works as a mark to start our pod unregistration and deletion process.
for _, po := range ss.pods {
if _, err := annotatePodOnce(ctx, c, log, &po, AnnotationKeyUnregistrationRequestTimestamp, time.Now().Format(time.RFC3339)); err != nil {
return nil, err
}
}
if _, ok := getAnnotation(ss.owner, AnnotationKeyUnregistrationRequestTimestamp); !ok {
updated := ss.owner.withAnnotation(AnnotationKeyUnregistrationRequestTimestamp, time.Now().Format(time.RFC3339))
if err := c.Patch(ctx, updated, client.MergeFrom(ss.object)); err != nil {
log.Error(err, fmt.Sprintf("Failed to patch object to have %s annotation", AnnotationKeyUnregistrationRequestTimestamp))
return nil, err
}
log.V(2).Info("Redundant object has been annotated to start the unregistration before deletion")
} else {
log.V(2).Info("BUG: Redundant object was already annotated")
}
}
return nil, err
} else if retained > newDesiredReplicas {
log.V(2).Info("Waiting sync before scale down", "retained", retained, "newDesiredReplicas", newDesiredReplicas)
return nil, nil
} else {
log.Info("Invalid state", "retained", retained, "newDesiredReplicas", newDesiredReplicas)
panic("crashed due to invalid state")
}
}
for _, sss := range podsForOwnersPerTemplateHash {
for _, ss := range sss {
if ss.templateHash != desiredTemplateHash {
if ss.owner.GetDeletionTimestamp().IsZero() {
if err := c.Delete(ctx, ss.object); err != nil {
log.Error(err, "Unable to delete object")
return nil, err
}
log.V(2).Info("Deleted redundant and outdated object")
}
return nil, nil
}
}
}
return &result{
currentObjects: currentObjects,
}, nil
}
func collectPodsForOwners(ctx context.Context, c client.Client, log logr.Logger, owners []client.Object) (*state, error) {
podsForOwnerPerTemplateHash := map[string][]*podsForOwner{}
// lastSyncTime becomes non-nil only when there are one or more owner(s), hence the same number of runner pods.
// It's used to prevent runnerset-controller from recreating "completed ephemeral runners".
// This is needed to prevent runners from being terminated prematurely.
// See https://github.com/actions-runner-controller/actions-runner-controller/issues/911 for more context.
//
// This becomes nil when there are zero statefulset(s). That's fine because then there should be zero statefulset(s) to be recreated either, hence
// we don't need to guard with lastSyncTime.
var lastSyncTime *time.Time
for _, ss := range owners {
log := log.WithValues("owner", types.NamespacedName{Namespace: ss.GetNamespace(), Name: ss.GetName()})
res, err := getPodsForOwner(ctx, c, log, ss)
if err != nil {
return nil, err
}
if res.templateHash == "" {
log.Info("validation error: runner pod owner must have template hash", "object", res.object)
return nil, nil
}
// Statefulset termination process 4/4: Let Kubernetes cascade-delete the statefulset and the pods.
//
// If the runner is already marked for deletion (=has a non-zero deletion timestamp) by the runner controller (can be caused by an ephemeral runner completion)
// or by this controller (in case it was deleted in the previous reconciliation loop),
// we don't need to bother calling GitHub API to re-mark the runner for deletion.
// Just hold on, and runners will disappear as long as the runner controller is up and running.
if !res.owner.GetDeletionTimestamp().IsZero() {
continue
}
// Statefulset termination process 3/4: Set the deletionTimestamp to let Kubernetes start a cascade deletion of the statefulset and the pods.
if _, ok := getAnnotation(res.owner, AnnotationKeyUnregistrationCompleteTimestamp); ok {
if err := c.Delete(ctx, res.object); err != nil {
log.Error(err, "Failed to delete owner")
return nil, err
}
continue
}
// Statefulset termination process 2/4: Set unregistrationCompleteTimestamp only if all the pods managed by the statefulset
// have either unregistered or being deleted.
if _, ok := getAnnotation(res.owner, AnnotationKeyUnregistrationRequestTimestamp); ok {
var deletionSafe int
for _, po := range res.pods {
if _, ok := getAnnotation(&po, AnnotationKeyUnregistrationCompleteTimestamp); ok {
deletionSafe++
} else if !po.DeletionTimestamp.IsZero() {
deletionSafe++
}
}
log.V(2).Info("Marking owner for unregistration completion", "deletionSafe", deletionSafe, "total", res.total)
if deletionSafe == res.total {
if _, ok := getAnnotation(res.owner, AnnotationKeyUnregistrationCompleteTimestamp); !ok {
updated := res.owner.withAnnotation(AnnotationKeyUnregistrationCompleteTimestamp, time.Now().Format(time.RFC3339))
if err := c.Patch(ctx, updated, client.MergeFrom(res.object)); err != nil {
log.Error(err, fmt.Sprintf("Failed to patch owner to have %s annotation", AnnotationKeyUnregistrationCompleteTimestamp))
return nil, err
}
log.V(2).Info("Redundant owner has been annotated to start the deletion")
} else {
log.V(2).Info("BUG: Redundant owner was already annotated to start the deletion")
}
}
continue
}
if annotations := res.owner.GetAnnotations(); annotations != nil {
if a, ok := annotations[SyncTimeAnnotationKey]; ok {
t, err := time.Parse(time.RFC3339, a)
if err == nil {
if lastSyncTime == nil || lastSyncTime.Before(t) {
lastSyncTime = &t
}
}
}
}
// A completed owner and a completed runner pod can safely be deleted without
// a race condition, so delete them here,
// so that the later process can be a bit simpler.
if res.total > 0 && res.total == res.completed {
if err := c.Delete(ctx, ss); err != nil {
log.Error(err, "Unable to delete owner")
return nil, err
}
log.V(2).Info("Deleted completed owner")
return nil, nil
}
if !res.synced {
log.V(1).Info("Skipped reconcilation because owner is not synced yet", "pods", res.pods)
return nil, nil
}
podsForOwnerPerTemplateHash[res.templateHash] = append(podsForOwnerPerTemplateHash[res.templateHash], res)
}
return &state{podsForOwnerPerTemplateHash, lastSyncTime}, nil
}
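The comment above syncRunnerPodsOwners explains why `create` must be a factory function rather than a single client.Object. A self-contained toy sketch of that failure mode, using a simulated Create that mutates its argument the way the Kubernetes client does (the `object` type here is made up for illustration):

package main

import "fmt"

// object stands in for a Kubernetes API object.
type object struct{ resourceVersion string }

// create simulates client.Create: it mutates the passed object on success
// and rejects objects that already carry a resourceVersion.
func create(o *object) error {
    if o.resourceVersion != "" {
        return fmt.Errorf("resourceVersion should not be set on objects to be created")
    }
    o.resourceVersion = "1"
    return nil
}

func main() {
    // Reusing a single object fails on the second create...
    shared := &object{}
    fmt.Println(create(shared)) // <nil>
    fmt.Println(create(shared)) // error
    // ...while a factory mints a fresh object per call, so N creates succeed.
    newObject := func() *object { return &object{} }
    for i := 0; i < 2; i++ {
        fmt.Println(create(newObject())) // <nil>
    }
}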

View File

@ -18,13 +18,10 @@ package controllers
import (
"context"
"errors"
"fmt"
"reflect"
"time"
"github.com/go-logr/logr"
gogithub "github.com/google/go-github/v39/github"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
@ -32,7 +29,6 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/actions-runner-controller/actions-runner-controller/api/v1alpha1"
@ -72,15 +68,35 @@ func (r *RunnerReplicaSetReconciler) Reconcile(ctx context.Context, req ctrl.Req
return ctrl.Result{}, nil
}
if rs.ObjectMeta.Labels == nil {
rs.ObjectMeta.Labels = map[string]string{}
}
// Template hash is usually set by the upstream controller (the RunnerDeployment controller) when authoring the
// RunnerReplicaSet resource, but it may be missing when the user created the RunnerReplicaSet directly.
// As a template hash is required by the runner replica management, we dynamically add it here without ever persisting it.
if rs.ObjectMeta.Labels[LabelKeyRunnerTemplateHash] == "" {
template := rs.Spec.DeepCopy()
template.Replicas = nil
template.EffectiveTime = nil
templateHash := ComputeHash(template)
log.Info("Using auto-generated template hash", "value", templateHash)
rs.ObjectMeta.Labels = CloneAndAddLabel(rs.ObjectMeta.Labels, LabelKeyRunnerTemplateHash, templateHash)
rs.Spec.Template.ObjectMeta.Labels = CloneAndAddLabel(rs.Spec.Template.ObjectMeta.Labels, LabelKeyRunnerTemplateHash, templateHash)
}
selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil {
return ctrl.Result{}, err
}
// Get the Runners managed by the target RunnerReplicaSet
var allRunners v1alpha1.RunnerList
var runnerList v1alpha1.RunnerList
if err := r.List(
ctx,
&allRunners,
&runnerList,
client.InNamespace(req.Namespace),
client.MatchingLabelsSelector{Selector: selector},
); err != nil {
@ -89,218 +105,43 @@ func (r *RunnerReplicaSetReconciler) Reconcile(ctx context.Context, req ctrl.Req
}
}
var (
current int
ready int
available int
lastSyncTime *time.Time
)
for _, r := range allRunners.Items {
// This guard is required so that a RunnerReplicaSet created by controller v0.17.0 or earlier
// doesn't treat all the runners in the namespace as its children.
if metav1.IsControlledBy(&r, &rs) && !metav1.HasAnnotation(r.ObjectMeta, annotationKeyRegistrationOnly) {
// If the runner is already marked for deletion (=has a non-zero deletion timestamp) by the runner controller (can be caused by an ephemeral runner completion)
// or by the runnerreplicaset controller (in case it was deleted in the previous reconciliation loop),
// we don't need to bother calling GitHub API to re-mark the runner for deletion.
// Just hold on, and runners will disappear as long as the runner controller is up and running.
if !r.DeletionTimestamp.IsZero() {
continue
}
if r.Annotations != nil {
if a, ok := r.Annotations[SyncTimeAnnotationKey]; ok {
t, err := time.Parse(time.RFC3339, a)
if err == nil {
if lastSyncTime == nil || lastSyncTime.Before(t) {
lastSyncTime = &t
}
}
}
}
current += 1
if r.Status.Phase == string(corev1.PodRunning) {
ready += 1
// available is currently the same as ready, as we don't yet have minReadySeconds for runners
available += 1
}
}
}
var desired int
replicas := 1
if rs.Spec.Replicas != nil {
desired = *rs.Spec.Replicas
} else {
desired = 1
}
// TODO: remove this registration runner cleanup later (v0.23.0 or v0.24.0)
//
// We had to have a registration-only runner to support scale-from-zero before.
// But since the Sep 2021 Actions update on GitHub Cloud and GHES 3.3, it is unnecessary.
// See the below issues for more contexts:
// https://github.com/actions-runner-controller/actions-runner-controller/issues/516
// https://github.com/actions-runner-controller/actions-runner-controller/issues/859
//
// In the below block, we have a logic to remove existing registration-only runners as unnecessary.
// This logic was introduced in actions-runner-controller 0.21.0 and will probably last one or two minor releases,
// so that the actions-runner-controller instance in everyone's cluster won't leave dangling registration-only runners.
registrationOnlyRunnerNsName := req.NamespacedName
registrationOnlyRunnerNsName.Name = registrationOnlyRunnerNameFor(rs.Name)
registrationOnlyRunner := v1alpha1.Runner{}
registrationOnlyRunnerExists := false
if err := r.Get(
ctx,
registrationOnlyRunnerNsName,
&registrationOnlyRunner,
); err != nil {
if !kerrors.IsNotFound(err) {
return ctrl.Result{}, err
}
} else {
registrationOnlyRunnerExists = true
}
if registrationOnlyRunnerExists {
if err := r.Client.Delete(ctx, &registrationOnlyRunner); err != nil {
log.Error(err, "Retrying soon because we failed to delete registration-only runner")
return ctrl.Result{Requeue: true}, nil
}
replicas = *rs.Spec.Replicas
}
effectiveTime := rs.Spec.EffectiveTime
ephemeral := rs.Spec.Template.Spec.Ephemeral == nil || *rs.Spec.Template.Spec.Ephemeral
if current < desired && ephemeral && lastSyncTime != nil && effectiveTime != nil && lastSyncTime.After(effectiveTime.Time) {
log.V(1).Info("Detected that some ephemeral runners have disappeared. Usually this is due to that ephemeral runner completions so ARC does not create new runners until EffectiveTime is updated.", "lastSyncTime", metav1.Time{Time: *lastSyncTime}, "effectiveTime", *effectiveTime, "desired", desired, "available", current, "ready", ready)
} else if current > desired {
// If you use ephemeral runners with webhook-based autoscaler and the runner controller is working normally,
// you're unlikely to fall into this branch.
//
// That's because all the stakeholders work like this:
//
// 1. A runner pod completes with the runner container exiting with code 0
// 2. ARC runner controller detects the pod completion, marks the runner resource on k8s for deletion (=Runner.DeletionTimestamp becomes non-zero)
// 3. GitHub triggers a corresponding workflow_job "complete" webhook event
// 4. ARC github-webhook-server (webhook-based autoscaler) receives the webhook event and updates HRA, removing the oldest capacity reservation
// 5. ARC horizontalrunnerautoscaler updates RunnerDeployment's desired replicas based on capacity reservations
// 6. ARC runnerdeployment controller updates RunnerReplicaSet's desired replicas
// 7. (We're here) ARC runnerreplicaset controller (this controller) starts reconciling the RunnerReplicaSet
//
// In a normally working ARC installation, the runner that was used to run the workflow job should already have been
// marked for deletion by the runner controller.
// This runnerreplicaset controller doesn't count marked runners into the `current` value, hence you're unlikely to
// fall into this branch when you're using ephemeral runners with webhook-based-autoscaler.
desired, err := r.newRunner(rs)
if err != nil {
log.Error(err, "Could not create runner")
n := current - desired
log.V(0).Info(fmt.Sprintf("Deleting %d runners from RunnerReplicaSet %s", n, req.NamespacedName), "desired", desired, "current", current, "ready", ready)
// get runners that are currently offline/not busy/timed-out to register
var deletionCandidates []v1alpha1.Runner
for _, runner := range allRunners.Items {
busy, err := r.GitHubClient.IsRunnerBusy(ctx, runner.Spec.Enterprise, runner.Spec.Organization, runner.Spec.Repository, runner.Name)
if err != nil {
notRegistered := false
offline := false
var notFoundException *github.RunnerNotFound
var offlineException *github.RunnerOffline
if errors.As(err, &notFoundException) {
log.V(1).Info("Failed to check if runner is busy. Either this runner has never been successfully registered to GitHub or it still needs more time.", "runnerName", runner.Name)
notRegistered = true
} else if errors.As(err, &offlineException) {
offline = true
} else {
var e *gogithub.RateLimitError
if errors.As(err, &e) {
// We log the underlying error when we fail calling the GitHub API to list or unregister runners,
// or when the runner is still busy.
log.Error(
err,
fmt.Sprintf(
"Failed to check if runner is busy due to GitHub API rate limit. Retrying in %s to avoid excessive GitHub API calls",
retryDelayOnGitHubAPIRateLimitError,
),
)
return ctrl.Result{RequeueAfter: retryDelayOnGitHubAPIRateLimitError}, err
}
return ctrl.Result{}, err
}
registrationTimeout := 15 * time.Minute
currentTime := time.Now()
registrationDidTimeout := currentTime.Sub(runner.CreationTimestamp.Add(registrationTimeout)) > 0
if notRegistered && registrationDidTimeout {
log.Info(
"Runner failed to register itself to GitHub in timely manner. "+
"Marking the runner for scale down. "+
"CAUTION: If you see this a lot, you should investigate the root cause. "+
"See https://github.com/actions-runner-controller/actions-runner-controller/issues/288",
"runnerCreationTimestamp", runner.CreationTimestamp,
"currentTime", currentTime,
"configuredRegistrationTimeout", registrationTimeout,
)
deletionCandidates = append(deletionCandidates, runner)
}
// offline runners should always be a great target for scale down
if offline {
deletionCandidates = append(deletionCandidates, runner)
}
} else if !busy {
deletionCandidates = append(deletionCandidates, runner)
}
}
if len(deletionCandidates) < n {
n = len(deletionCandidates)
}
log.V(0).Info(fmt.Sprintf("Deleting %d runner(s)", n), "desired", desired, "current", current, "ready", ready)
for i := 0; i < n; i++ {
if err := r.Client.Delete(ctx, &deletionCandidates[i]); client.IgnoreNotFound(err) != nil {
log.Error(err, "Failed to delete runner resource")
return ctrl.Result{}, err
}
r.Recorder.Event(&rs, corev1.EventTypeNormal, "RunnerDeleted", fmt.Sprintf("Deleted runner '%s'", deletionCandidates[i].Name))
log.Info(fmt.Sprintf("Deleted runner %s", deletionCandidates[i].Name))
}
} else if desired > current {
n := desired - current
log.V(0).Info(fmt.Sprintf("Creating %d runner(s)", n), "desired", desired, "available", current, "ready", ready)
for i := 0; i < n; i++ {
newRunner, err := r.newRunner(rs)
if err != nil {
log.Error(err, "Could not create runner")
return ctrl.Result{}, err
}
if err := r.Client.Create(ctx, &newRunner); err != nil {
log.Error(err, "Failed to create runner resource")
return ctrl.Result{}, err
}
}
return ctrl.Result{}, err
}
var status v1alpha1.RunnerReplicaSetStatus
var live []client.Object
for _, r := range runnerList.Items {
r := r
live = append(live, &r)
}
res, err := syncRunnerPodsOwners(ctx, r.Client, log, effectiveTime, replicas, func() client.Object { return desired.DeepCopy() }, ephemeral, live)
if err != nil || res == nil {
return ctrl.Result{}, err
}
var (
status v1alpha1.RunnerReplicaSetStatus
current, available, ready int
)
for _, o := range res.currentObjects {
current += o.total
available += o.running
ready += o.running
}
status.Replicas = &current
status.AvailableReplicas = &available
@ -322,6 +163,8 @@ func (r *RunnerReplicaSetReconciler) Reconcile(ctx context.Context, req ctrl.Req
}
func (r *RunnerReplicaSetReconciler) newRunner(rs v1alpha1.RunnerReplicaSet) (v1alpha1.Runner, error) {
// Note that the upstream controller (runnerdeployment) is expected to add
// the "runner template hash" label to the template.meta which is necessary to make this controller work correctly
objectMeta := rs.Spec.Template.ObjectMeta.DeepCopy()
objectMeta.GenerateName = rs.ObjectMeta.Name + "-"
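As the comment above notes, the template hash is what lets the replica-management library group and replace runners; the reconciler earlier in this diff computes it over the spec with Replicas and EffectiveTime cleared, so that scaling alone never changes the hash and never rolls pods. A toy, self-contained sketch of that idea; the struct and the FNV-based helper are simplified stand-ins, not ARC's actual types:

package main

import (
    "fmt"
    "hash/fnv"
)

// runnerReplicaSetSpec is a simplified stand-in for ARC's RunnerReplicaSetSpec.
type runnerReplicaSetSpec struct {
    Replicas      *int
    EffectiveTime *string // stand-in for *metav1.Time
    Image         string  // stands in for the rest of the pod template
}

// computeTemplateHash zeroes out fields that may change without changing the
// pod template before hashing, mirroring the reconciler change above.
func computeTemplateHash(spec runnerReplicaSetSpec) string {
    spec.Replicas = nil
    spec.EffectiveTime = nil
    h := fnv.New32a()
    fmt.Fprintf(h, "%#v", spec)
    return fmt.Sprintf("%x", h.Sum32())
}

func main() {
    one, two := 1, 2
    a := computeTemplateHash(runnerReplicaSetSpec{Replicas: &one, Image: "runner:v1"})
    b := computeTemplateHash(runnerReplicaSetSpec{Replicas: &two, Image: "runner:v1"})
    fmt.Println(a == b) // true: changing replicas does not change the template hash
}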

View File

@ -7,7 +7,6 @@ import (
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
logf "sigs.k8s.io/controller-runtime/pkg/log"
@ -102,12 +101,40 @@ func intPtr(v int) *int {
var _ = Context("Inside of a new namespace", func() {
ctx := context.TODO()
ns := SetupTest(ctx)
name := "example-runnerreplicaset"
Describe("when no existing resources exist", func() {
getRunnerCount := func() int {
runners := actionsv1alpha1.RunnerList{Items: []actionsv1alpha1.Runner{}}
It("should create a new Runner resource from the specified template, add a another Runner on replicas increased, and removes all the replicas when set to 0", func() {
name := "example-runnerreplicaset"
selector, err := metav1.LabelSelectorAsSelector(
&metav1.LabelSelector{
MatchLabels: map[string]string{
"foo": "bar",
},
},
)
if err != nil {
logf.Log.Error(err, "failed to create labelselector")
return -1
}
err = k8sClient.List(
ctx,
&runners,
client.InNamespace(ns.Name),
client.MatchingLabelsSelector{Selector: selector},
)
if err != nil {
logf.Log.Error(err, "list runners")
}
runnersList.Sync(runners.Items)
return len(runners.Items)
}
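// Returning -1 on a setup error makes a transient failure show up as an
// impossible runner count, which the Eventually/Consistently polls below
// simply observe as a non-match.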
Describe("RunnerReplicaSet", func() {
It("should create a new Runner resource from the specified template", func() {
{
rs := &actionsv1alpha1.RunnerReplicaSet{
ObjectMeta: metav1.ObjectMeta{
@ -146,126 +173,99 @@ var _ = Context("Inside of a new namespace", func() {
Expect(err).NotTo(HaveOccurred(), "failed to create test RunnerReplicaSet resource")
Eventually(
getRunnerCount,
time.Second*5, time.Second).Should(BeEquivalentTo(1))
}
})
It("should create 2 runners when specified 2 replicas", func() {
{
rs := &actionsv1alpha1.RunnerReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns.Name,
},
Spec: actionsv1alpha1.RunnerReplicaSetSpec{
Replicas: intPtr(2),
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"foo": "bar",
},
},
Template: actionsv1alpha1.RunnerTemplate{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"foo": "bar",
},
},
Spec: actionsv1alpha1.RunnerSpec{
RunnerConfig: actionsv1alpha1.RunnerConfig{
Repository: "test/valid",
Image: "bar",
},
RunnerPodSpec: actionsv1alpha1.RunnerPodSpec{
Env: []corev1.EnvVar{
{Name: "FOO", Value: "FOOVALUE"},
},
},
},
},
},
}
err := k8sClient.Create(ctx, rs)
Expect(err).NotTo(HaveOccurred(), "failed to create test RunnerReplicaSet resource")
Eventually(
getRunnerCount,
time.Second*5, time.Second).Should(BeEquivalentTo(2))
}
})
It("should not create any runners when specified 0 replicas", func() {
{
rs := &actionsv1alpha1.RunnerReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns.Name,
},
Spec: actionsv1alpha1.RunnerReplicaSetSpec{
Replicas: intPtr(0),
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"foo": "bar",
},
},
Template: actionsv1alpha1.RunnerTemplate{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"foo": "bar",
},
},
Spec: actionsv1alpha1.RunnerSpec{
RunnerConfig: actionsv1alpha1.RunnerConfig{
Repository: "test/valid",
Image: "bar",
},
RunnerPodSpec: actionsv1alpha1.RunnerPodSpec{
Env: []corev1.EnvVar{
{Name: "FOO", Value: "FOOVALUE"},
},
},
},
},
},
}
err := k8sClient.Create(ctx, rs)
Expect(err).NotTo(HaveOccurred(), "failed to create test RunnerReplicaSet resource")
Consistently(
getRunnerCount,
time.Second*5, time.Second).Should(BeEquivalentTo(0))
}
})
})

View File

@ -18,13 +18,10 @@ package controllers
import (
"context"
"fmt"
"reflect"
"sort"
"time"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
@ -87,15 +84,6 @@ func (r *RunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
metrics.SetRunnerSet(*runnerSet)
desiredStatefulSet, err := r.newStatefulSet(runnerSet)
if err != nil {
r.Recorder.Event(runnerSet, corev1.EventTypeNormal, "RunnerAutoscalingFailure", err.Error())
log.Error(err, "Could not create statefulset")
return ctrl.Result{}, err
}
var statefulsetList appsv1.StatefulSetList
if err := r.List(ctx, &statefulsetList, client.InNamespace(req.Namespace), client.MatchingFields{runnerSetOwnerKey: req.Name}); err != nil {
return ctrl.Result{}, err
@ -108,164 +96,18 @@ func (r *RunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{}, nil
}
desiredStatefulSet, err := r.newStatefulSet(runnerSet)
if err != nil {
r.Recorder.Event(runnerSet, corev1.EventTypeNormal, "RunnerAutoscalingFailure", err.Error())
log.Error(err, "Could not create statefulset")
return ctrl.Result{}, err
}
desiredTemplateHash, ok := getStatefulSetTemplateHash(desiredStatefulSet)
if !ok {
log.Info("Failed to get template hash of desired statefulset. It must be in an invalid state. Please manually delete the statefulset so that it is recreated")
return ctrl.Result{}, nil
}
statefulsetsPerTemplateHash := map[string][]*podsForStatefulset{}
// # Why do we recreate statefulsets instead of updating their desired replicas?
//
// A statefulset cannot add more pods when not all the pods are running.
// Our ephemeral runners' pods that have finished running become Completed (Phase=Succeeded).
// So creating one statefulset per batch of ephemeral runners is the only way for us to add more replicas.
//
// # Why do we recreate statefulsets instead of updating fields other than replicas?
//
// That's because Kubernetes doesn't allow updating anything other than replicas, template, and updateStrategy.
// And the nature of ephemeral runner pods requires you to create a statefulset per batch of new runner pods, so
// we really have no other choice.
//
// If you're curious, the below is the error message you will get when you try to update forbidden StatefulSet field(s):
//
// 2021-06-13T07:19:52.760Z ERROR actions-runner-controller.runnerset Failed to patch statefulset
// {"runnerset": "default/example-runnerset", "error": "StatefulSet.apps \"example-runnerset\" is invalid: s
// pec: Forbidden: updates to statefulset spec for fields other than 'replicas', 'template', and 'updateStrategy'
// are forbidden"}
//
// Even though the error message includes "Forbidden", this error's reason is "Invalid".
// So we used to match these errors by using errors.IsInvalid. But that's another story...
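// In practice, scaling up by `num` therefore means creating `num` fresh
// single-replica statefulsets from the desired template, as the creation loop
// below does:
//
//   one := int32(1)
//   ss := desiredStatefulSet.DeepCopy()
//   ss.Spec.Replicas = &one
//   err := r.Client.Create(ctx, ss)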
// lastSyncTime becomes non-nil only when there are one or more statefulset(s), hence there are the same number of runner pods.
// It's used to prevent runnerset-controller from recreating "completed ephemeral runners".
// This is needed to prevent runners from being terminated prematurely.
// See https://github.com/actions-runner-controller/actions-runner-controller/issues/911 for more context.
//
// This becomes nil when there are zero statefulset(s). That's fine, because then there are zero statefulset(s) to be recreated either, hence
// we don't need to guard with lastSyncTime.
var lastSyncTime *time.Time
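// The sync time is persisted as an RFC3339 string under SyncTimeAnnotationKey,
// so recovering it (as the loop below does) is a plain parse:
//
//   t, err := time.Parse(time.RFC3339, ss.Annotations[SyncTimeAnnotationKey])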
for _, ss := range statefulsets {
ss := ss
log := log.WithValues("statefulset", types.NamespacedName{Namespace: ss.Namespace, Name: ss.Name})
res, err := r.getPodsForStatefulset(ctx, log, &ss)
if err != nil {
return ctrl.Result{}, err
}
// Statefulset termination process 4/4: Let Kubernetes cascade-delete the statefulset and the pods.
if !res.statefulset.DeletionTimestamp.IsZero() {
continue
}
// Statefulset termination process 3/4: Set the deletionTimestamp to let Kubernetes start a cascade deletion of the statefulset and the pods.
if _, ok := getAnnotation(&res.statefulset.ObjectMeta, AnnotationKeyUnregistrationCompleteTimestamp); ok {
if err := r.Client.Delete(ctx, res.statefulset); err != nil {
log.Error(err, "Failed to delete statefulset")
return ctrl.Result{}, err
}
continue
}
// Statefulset termination process 2/4: Set unregistrationCompleteTimestamp only if all the pods managed by the statefulset
// have either unregistered or are being deleted.
if _, ok := getAnnotation(&res.statefulset.ObjectMeta, AnnotationKeyUnregistrationRequestTimestamp); ok {
var deletionSafe int
for _, po := range res.pods {
if _, ok := getAnnotation(&po.ObjectMeta, AnnotationKeyUnregistrationCompleteTimestamp); ok {
deletionSafe++
} else if !po.DeletionTimestamp.IsZero() {
deletionSafe++
}
}
log.V(2).Info("Marking statefulset for unregistration completion", "deletionSafe", deletionSafe, "total", res.total)
if deletionSafe == res.total {
if _, ok := getAnnotation(&res.statefulset.ObjectMeta, AnnotationKeyUnregistrationCompleteTimestamp); !ok {
updated := res.statefulset.DeepCopy()
setAnnotation(&updated.ObjectMeta, AnnotationKeyUnregistrationCompleteTimestamp, time.Now().Format(time.RFC3339))
if err := r.Client.Patch(ctx, updated, client.MergeFrom(res.statefulset)); err != nil {
log.Error(err, fmt.Sprintf("Failed to patch statefulset to have %s annotation", AnnotationKeyUnregistrationCompleteTimestamp))
return ctrl.Result{}, err
}
log.V(2).Info("Redundant statefulset has been annotated to start the deletion")
} else {
log.V(2).Info("BUG: Redundant statefulset was already annotated to start the deletion")
}
}
continue
}
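// Termination flow recap (steps 1/4 through 4/4 annotated here and in the
// scale-down path below): 1) annotate the pods and the statefulset with the
// unregistration-request timestamp, 2) once every pod has unregistered or is
// being deleted, annotate the statefulset with the unregistration-complete
// timestamp, 3) delete the statefulset, 4) let Kubernetes cascade-delete the
// statefulset and its pods.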
if res.statefulset.Annotations != nil {
if a, ok := res.statefulset.Annotations[SyncTimeAnnotationKey]; ok {
t, err := time.Parse(time.RFC3339, a)
if err == nil {
if lastSyncTime == nil || lastSyncTime.Before(t) {
lastSyncTime = &t
}
}
}
}
statefulsetsPerTemplateHash[res.templateHash] = append(statefulsetsPerTemplateHash[res.templateHash], res)
// A completed statefulset or a completed pod can safely be deleted without
// a race condition, so delete it here
// to keep the later process a bit simpler.
if res.total > 0 && res.total == res.completed {
if err := r.Client.Delete(ctx, &ss); err != nil {
log.Error(err, "Unable to delete statefulset")
return ctrl.Result{}, err
}
log.V(2).Info("Deleted completed statefulset")
return ctrl.Result{}, nil
}
var replicas int32 = 1
if ss.Spec.Replicas != nil {
replicas = *ss.Spec.Replicas
}
if ss.Status.Replicas != replicas {
log.V(2).Info("Waiting for statefulset to sync", "desiredReplicas", replicas, "currentReplicas", ss.Status.Replicas)
return ctrl.Result{}, nil
}
}
currentStatefulSets := statefulsetsPerTemplateHash[desiredTemplateHash]
sort.SliceStable(currentStatefulSets, func(i, j int) bool {
return currentStatefulSets[i].statefulset.CreationTimestamp.Before(&currentStatefulSets[j].statefulset.CreationTimestamp)
})
if len(currentStatefulSets) > 0 {
timestampFirst := currentStatefulSets[0].statefulset.CreationTimestamp
timestampLast := currentStatefulSets[len(currentStatefulSets)-1].statefulset.CreationTimestamp
var names []string
for _, ss := range currentStatefulSets {
names = append(names, ss.statefulset.Name)
}
log.V(2).Info("Detected some current statefulsets", "creationTimestampFirst", timestampFirst, "creationTimestampLast", timestampLast, "statefulsets", names)
}
var pending, running, regTimeout int
for _, ss := range currentStatefulSets {
pending += ss.pending
running += ss.running
regTimeout += ss.regTimeout
}
addedReplicas := int32(1)
create := desiredStatefulSet.DeepCopy()
create.Spec.Replicas = &addedReplicas
const defaultReplicas = 1
@ -277,123 +119,28 @@ func (r *RunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
newDesiredReplicas := getIntOrDefault(replicasOfDesiredStatefulSet, defaultReplicas)
log.V(2).Info(
"Found some pods across statefulset(s)",
"pending", pending,
"running", running,
"regTimeout", regTimeout,
"desired", newDesiredReplicas,
"statefulsets", len(statefulsets),
)
effectiveTime := runnerSet.Spec.EffectiveTime
ephemeral := runnerSet.Spec.Ephemeral == nil || *runnerSet.Spec.Ephemeral
maybeRunning := pending + running
if newDesiredReplicas > maybeRunning && ephemeral && lastSyncTime != nil && effectiveTime != nil && lastSyncTime.After(effectiveTime.Time) {
log.V(2).Info("Detected that some ephemeral runners have disappeared. Usually this is due to that ephemeral runner completions so ARC does not create new runners until EffectiveTime is updated.", "lastSyncTime", metav1.Time{Time: *lastSyncTime}, "effectiveTime", *effectiveTime, "desired", newDesiredReplicas, "pending", pending, "running", running)
} else if newDesiredReplicas > maybeRunning {
num := newDesiredReplicas - maybeRunning
var owners []client.Object
for i := 0; i < num; i++ {
// Add more replicas
addedReplicas := int32(1)
create := desiredStatefulSet.DeepCopy()
create.Spec.Replicas = &addedReplicas
if err := r.Client.Create(ctx, create); err != nil {
return ctrl.Result{}, err
}
}
log.V(2).Info("Created statefulset(s) to add more replicas", "num", num)
return ctrl.Result{}, nil
} else if newDesiredReplicas <= running {
var retained int
var delete []*podsForStatefulset
for i := len(currentStatefulSets) - 1; i >= 0; i-- {
ss := currentStatefulSets[i]
if ss.running == 0 || retained >= newDesiredReplicas {
// If the desired replicas count is already satisfied by the statefulsets processed so far, or this statefulset has no running pods,
// this statefulset can be considered safe for deletion.
// Note that we already waited on this statefulset to create pods by waiting for
// `ss.Status.Replicas` (=total number of pods managed by the statefulset, regardless of whether each runner is Running or Completed) to match the desired replicas in a previous step.
// So `ss.running == 0` means "the statefulset has created the desired number of pods before, but all of them are completed now".
delete = append(delete, ss)
} else if retained < newDesiredReplicas {
retained += ss.running
}
}
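// Worked example: with newDesiredReplicas=1 and running counts [1, 1, 0]
// oldest-first, the newest-first walk marks the running==0 statefulset for
// deletion, retains the next one (retained=1), then marks the oldest as well
// since retained >= newDesiredReplicas. retained == newDesiredReplicas, so the
// deletion path below proceeds.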
if retained == newDesiredReplicas {
for _, ss := range delete {
log := log.WithValues("statefulset", types.NamespacedName{Namespace: ss.statefulset.Namespace, Name: ss.statefulset.Name})
// Statefulset termination process 1/4: Set unregistrationRequestTimestamp only after all the pods managed by the statefulset have
// started the unregistration process.
//
// NOTE: We just mark it instead of immediately starting the deletion process.
// Otherwise, the runner pod may hit terminationGracePeriod before the unregistration completes (the max terminationGracePeriod is limited to 1h by K8s, and a job can run for longer than that),
// or actions/runner may potentially misbehave on SIGTERM immediately sent by K8s.
// We'd better unregister first and then start a pod deletion process.
// The annotation works as a mark to start the pod unregistration and deletion process of ours.
for _, po := range ss.pods {
if _, err := annotatePodOnce(ctx, r.Client, log, &po, AnnotationKeyUnregistrationRequestTimestamp, time.Now().Format(time.RFC3339)); err != nil {
return ctrl.Result{}, err
}
}
if _, ok := getAnnotation(&ss.statefulset.ObjectMeta, AnnotationKeyUnregistrationRequestTimestamp); !ok {
updated := ss.statefulset.DeepCopy()
setAnnotation(&updated.ObjectMeta, AnnotationKeyUnregistrationRequestTimestamp, time.Now().Format(time.RFC3339))
if err := r.Client.Patch(ctx, updated, client.MergeFrom(ss.statefulset)); err != nil {
log.Error(err, fmt.Sprintf("Failed to patch statefulset to have %s annotation", AnnotationKeyUnregistrationRequestTimestamp))
return ctrl.Result{}, err
}
log.V(2).Info("Redundant statefulset has been annotated to start the unregistration before deletion")
} else {
log.V(2).Info("BUG: Redundant statefulset was already annotated")
}
}
return ctrl.Result{}, err
} else if retained > newDesiredReplicas {
log.V(2).Info("Waiting sync before scale down", "retained", retained, "newDesiredReplicas", newDesiredReplicas)
return ctrl.Result{}, nil
} else {
log.Info("Invalid state", "retained", retained, "newDesiredReplicas", newDesiredReplicas)
panic("crashed due to invalid state")
}
for _, ss := range statefulsets {
ss := ss
owners = append(owners, &ss)
}
for _, sss := range statefulsetsPerTemplateHash {
for _, ss := range sss {
if ss.templateHash != desiredTemplateHash {
if ss.statefulset.DeletionTimestamp.IsZero() {
if err := r.Client.Delete(ctx, ss.statefulset); err != nil {
log.Error(err, "Unable to delete statefulset")
return ctrl.Result{}, err
}
log.V(2).Info("Deleted redundant and outdated statefulset")
}
return ctrl.Result{}, nil
}
}
res, err := syncRunnerPodsOwners(ctx, r.Client, log, effectiveTime, newDesiredReplicas, func() client.Object { return create.DeepCopy() }, ephemeral, owners)
if err != nil || res == nil {
return ctrl.Result{}, err
}
var statusReplicas, statusReadyReplicas, totalCurrentReplicas, updatedReplicas int
for _, ss := range res.currentObjects {
statusReplicas += int(ss.statefulSet.Status.Replicas)
statusReadyReplicas += int(ss.statefulSet.Status.ReadyReplicas)
totalCurrentReplicas += int(ss.statefulSet.Status.CurrentReplicas)
updatedReplicas += int(ss.statefulSet.Status.UpdatedReplicas)
}
status := runnerSet.Status.DeepCopy()
@ -419,91 +166,6 @@ func (r *RunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{}, nil
}
type podsForStatefulset struct {
total int
completed int
running int
terminating int
regTimeout int
pending int
templateHash string
statefulset *appsv1.StatefulSet
pods []corev1.Pod
}
func (r *RunnerSetReconciler) getPodsForStatefulset(ctx context.Context, log logr.Logger, ss *appsv1.StatefulSet) (*podsForStatefulset, error) {
var podList corev1.PodList
if err := r.Client.List(ctx, &podList, client.MatchingLabels(ss.Spec.Template.ObjectMeta.Labels)); err != nil {
log.Error(err, "Failed to list pods managed by statefulset")
return nil, err
}
var completed, running, terminating, regTimeout, pending, total int
var pods []corev1.Pod
for _, pod := range podList.Items {
if owner := metav1.GetControllerOf(&pod); owner == nil || owner.Kind != "StatefulSet" || owner.Name != ss.Name {
continue
}
pods = append(pods, pod)
total++
if runnerPodOrContainerIsStopped(&pod) {
completed++
} else if pod.Status.Phase == corev1.PodRunning {
if podRunnerID(&pod) == "" && podConditionTransitionTimeAfter(&pod, corev1.PodReady, registrationTimeout) {
log.Info(
"Runner failed to register itself to GitHub in timely manner. "+
"Recreating the pod to see if it resolves the issue. "+
"CAUTION: If you see this a lot, you should investigate the root cause. "+
"See https://github.com/actions-runner-controller/actions-runner-controller/issues/288",
"creationTimestamp", pod.CreationTimestamp,
"readyTransitionTime", podConditionTransitionTime(&pod, corev1.PodReady, corev1.ConditionTrue),
"configuredRegistrationTimeout", registrationTimeout,
)
regTimeout++
} else {
running++
}
} else if !pod.DeletionTimestamp.IsZero() {
terminating++
} else {
// pending includes running-but-timed-out runner pods too
pending++
}
}
templateHash, ok := getStatefulSetTemplateHash(ss)
if !ok {
log.Info("Failed to get template hash of statefulset. It must be in an invalid state. Please manually delete the statefulset so that it is recreated")
return nil, nil
}
return &podsForStatefulset{
total: total,
completed: completed,
running: running,
terminating: terminating,
regTimeout: regTimeout,
pending: pending,
templateHash: templateHash,
statefulset: ss,
pods: pods,
}, nil
}
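// Note: the counters in podsForStatefulset partition the owned pods, i.e.
// total == completed + running + regTimeout + terminating + pending.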
func getStatefulSetTemplateHash(rs *appsv1.StatefulSet) (string, bool) {
hash, ok := rs.Labels[LabelKeyRunnerTemplateHash]
return hash, ok
}
func getRunnerSetSelector(runnerSet *v1alpha1.RunnerSet) *metav1.LabelSelector {
selector := runnerSet.Spec.Selector
if selector == nil {
@ -523,17 +185,12 @@ func (r *RunnerSetReconciler) newStatefulSet(runnerSet *v1alpha1.RunnerSet) (*ap
runnerSetWithOverrides.Labels = append(runnerSetWithOverrides.Labels, l)
}
// This label selector is used by default when runnerSet.Spec.Selector is empty.
runnerSetWithOverrides.Template.ObjectMeta.Labels = CloneAndAddLabel(runnerSetWithOverrides.Template.ObjectMeta.Labels, LabelKeyRunnerSetName, runnerSet.Name)
runnerSetWithOverrides.Template.ObjectMeta.Labels = CloneAndAddLabel(runnerSetWithOverrides.Template.ObjectMeta.Labels, LabelKeyPodMutation, LabelValuePodMutation)
template := corev1.Pod{
ObjectMeta: runnerSetWithOverrides.StatefulSetSpec.Template.ObjectMeta,
Spec: runnerSetWithOverrides.StatefulSetSpec.Template.Spec,
}
pod, err := newRunnerPod(template, runnerSet.Spec.RunnerConfig, r.RunnerImage, r.RunnerImagePullSecrets, r.DockerImage, r.DockerRegistryMirror, r.GitHubBaseURL, false)
pod, err := newRunnerPod(runnerSet.Name, template, runnerSet.Spec.RunnerConfig, r.RunnerImage, r.RunnerImagePullSecrets, r.DockerImage, r.DockerRegistryMirror, r.GitHubBaseURL, false)
if err != nil {
return nil, err
}

View File

@ -265,6 +265,8 @@ func (e *env) installActionsRunnerController(t *testing.T) {
if e.useRunnerSet {
scriptEnv = append(scriptEnv, "USE_RUNNERSET=1")
} else {
scriptEnv = append(scriptEnv, "USE_RUNNERSET=false")
}
varEnv := []string{