refactor: Extract runner pod owner management out of runnerset controller

so that it can potentially be reused by the runnerreplicaset controller
Yusuke Kuoka 2022-03-05 10:41:52 +00:00
parent 95a5770d55
commit c95e84a528
4 changed files with 536 additions and 363 deletions
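The helpers extracted into the new file below are deliberately generic over the owner kind: getPodsForOwner accepts either a *v1alpha1.Runner or an *appsv1.StatefulSet, and syncRunnerPodsOwners only deals in client.Object owners. A rough sketch of the intended reuse (hypothetical: syncRunnersAsOwners, desiredRunner, and the surrounding arguments are illustrative assumptions, not part of this commit):

// syncRunnersAsOwners shows how a runnerreplicaset controller could feed Runner
// objects into the extracted sync logic, mirroring what the runnerset controller
// does with statefulsets further below in this commit.
func syncRunnersAsOwners(ctx context.Context, c client.Client, log logr.Logger, effectiveTime *metav1.Time, desiredReplicas int, desiredTemplateHash string, desiredRunner *v1alpha1.Runner, ephemeral bool, runners []v1alpha1.Runner) (*result, error) {
    var owners []client.Object
    for i := range runners {
        owners = append(owners, &runners[i])
    }
    // desiredRunner plays the role of the "create" template object, just like the
    // one-replica statefulset copy passed by the runnerset controller.
    return syncRunnerPodsOwners(ctx, c, log, effectiveTime, desiredReplicas, desiredTemplateHash, desiredRunner, ephemeral, owners)
}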

View File

@@ -52,7 +52,7 @@ func annotatePodOnce(ctx context.Context, c client.Client, log logr.Logger, pod
 return nil, nil
 }
-if _, ok := getAnnotation(&pod.ObjectMeta, k); ok {
+if _, ok := getAnnotation(pod, k); ok {
 return pod, nil
 }
@@ -72,7 +72,7 @@ func annotatePodOnce(ctx context.Context, c client.Client, log logr.Logger, pod
 func ensureRunnerUnregistration(ctx context.Context, unregistrationTimeout time.Duration, retryDelay time.Duration, log logr.Logger, ghClient *github.Client, enterprise, organization, repository, runner string, pod *corev1.Pod) (*ctrl.Result, error) {
 var runnerID *int64
-if id, ok := getAnnotation(&pod.ObjectMeta, AnnotationKeyRunnerID); ok {
+if id, ok := getAnnotation(pod, AnnotationKeyRunnerID); ok {
 v, err := strconv.ParseInt(id, 10, 64)
 if err != nil {
 return &ctrl.Result{}, err
@@ -175,7 +175,7 @@ func ensureRunnerUnregistration(ctx context.Context, unregistrationTimeout time.
 }
 func ensureRunnerPodRegistered(ctx context.Context, log logr.Logger, ghClient *github.Client, c client.Client, enterprise, organization, repository, runner string, pod *corev1.Pod) (*corev1.Pod, *ctrl.Result, error) {
-_, hasRunnerID := getAnnotation(&pod.ObjectMeta, AnnotationKeyRunnerID)
+_, hasRunnerID := getAnnotation(pod, AnnotationKeyRunnerID)
 if runnerPodOrContainerIsStopped(pod) || hasRunnerID {
 return pod, nil, nil
 }
@@ -199,12 +199,12 @@ func ensureRunnerPodRegistered(ctx context.Context, log logr.Logger, ghClient *g
 return updated, nil, nil
 }
-func getAnnotation(meta *metav1.ObjectMeta, key string) (string, bool) {
-if meta.Annotations == nil {
+func getAnnotation(obj client.Object, key string) (string, bool) {
+if obj.GetAnnotations() == nil {
 return "", false
 }
-v, ok := meta.Annotations[key]
+v, ok := obj.GetAnnotations()[key]
 return v, ok
 }
@@ -237,7 +237,7 @@ func podConditionTransitionTimeAfter(pod *corev1.Pod, tpe corev1.PodConditionTyp
 }
 func podRunnerID(pod *corev1.Pod) string {
-id, _ := getAnnotation(&pod.ObjectMeta, AnnotationKeyRunnerID)
+id, _ := getAnnotation(pod, AnnotationKeyRunnerID)
 return id
 }
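getAnnotation now takes any client.Object instead of a *metav1.ObjectMeta, so the same helper serves pods, statefulsets, and runners without callers reaching into ObjectMeta. A minimal sketch of the new call style (illustrative only; unregistrationState is a hypothetical helper that is not part of this commit and assumes the usual imports of the controllers package):

// unregistrationState reads the same annotations the controllers use, passing
// the objects themselves rather than &obj.ObjectMeta.
func unregistrationState(log logr.Logger, pod *corev1.Pod, ss *appsv1.StatefulSet) (requested, complete bool) {
    _, requested = getAnnotation(pod, AnnotationKeyUnregistrationRequestTimestamp)
    _, complete = getAnnotation(ss, AnnotationKeyUnregistrationCompleteTimestamp)
    log.V(2).Info("Unregistration state", "requested", requested, "complete", complete)
    return requested, complete
}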

View File

@@ -165,7 +165,7 @@ func (r *RunnerPodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
 runnerPod = *po
-if _, unregistrationRequested := getAnnotation(&runnerPod.ObjectMeta, AnnotationKeyUnregistrationRequestTimestamp); unregistrationRequested {
+if _, unregistrationRequested := getAnnotation(&runnerPod, AnnotationKeyUnregistrationRequestTimestamp); unregistrationRequested {
 log.V(2).Info("Progressing unregistration because unregistration-request timestamp is set")
 // At this point we're sure that DeletionTimestamp is not set yet, but the unregistration process is triggered by an upstream controller like runnerset-controller.

View File

@@ -0,0 +1,504 @@
package controllers
import (
"context"
"fmt"
"sort"
"time"
"github.com/actions-runner-controller/actions-runner-controller/api/v1alpha1"
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
type podsForOwner struct {
total int
completed int
running int
terminating int
regTimeout int
pending int
templateHash string
runner *v1alpha1.Runner
statefulSet *appsv1.StatefulSet
owner owner
synced bool
pods []corev1.Pod
}
type owner interface {
client.Object
pods(context.Context, client.Client) ([]corev1.Pod, error)
templateHash() (string, bool)
withAnnotation(k, v string) client.Object
synced() bool
}
type ownerRunner struct {
client.Object
Log logr.Logger
Runner *v1alpha1.Runner
}
var _ owner = (*ownerRunner)(nil)
func (r *ownerRunner) pods(ctx context.Context, c client.Client) ([]corev1.Pod, error) {
var pod corev1.Pod
if err := c.Get(ctx, types.NamespacedName{Namespace: r.Runner.Namespace, Name: r.Runner.Name}, &pod); err != nil {
r.Log.Error(err, "Failed to get pod managed by runner")
return nil, err
}
return []corev1.Pod{pod}, nil
}
func (r *ownerRunner) templateHash() (string, bool) {
return getRunnerTemplateHash(r.Runner)
}
func (r *ownerRunner) withAnnotation(k, v string) client.Object {
copy := r.Runner.DeepCopy()
setAnnotation(&copy.ObjectMeta, k, v)
return copy
}
func (r *ownerRunner) synced() bool {
return true
}
type ownerStatefulSet struct {
client.Object
Log logr.Logger
StatefulSet *appsv1.StatefulSet
}
var _ owner = (*ownerStatefulSet)(nil)
func (s *ownerStatefulSet) pods(ctx context.Context, c client.Client) ([]corev1.Pod, error) {
var podList corev1.PodList
if err := c.List(ctx, &podList, client.MatchingLabels(s.StatefulSet.Spec.Template.ObjectMeta.Labels)); err != nil {
s.Log.Error(err, "Failed to list pods managed by statefulset")
return nil, err
}
var pods []corev1.Pod
for _, pod := range podList.Items {
if owner := metav1.GetControllerOf(&pod); owner == nil || owner.Kind != "StatefulSet" || owner.Name != s.StatefulSet.Name {
continue
}
pods = append(pods, pod)
}
return pods, nil
}
func (s *ownerStatefulSet) templateHash() (string, bool) {
return getStatefulSetTemplateHash(s.StatefulSet)
}
func (s *ownerStatefulSet) withAnnotation(k, v string) client.Object {
copy := s.StatefulSet.DeepCopy()
setAnnotation(&copy.ObjectMeta, k, v)
return copy
}
func (s *ownerStatefulSet) synced() bool {
var replicas int32 = 1
if s.StatefulSet.Spec.Replicas != nil {
replicas = *s.StatefulSet.Spec.Replicas
}
if s.StatefulSet.Status.Replicas != replicas {
s.Log.V(2).Info("Waiting for statefulset to sync", "desiredReplicas", replicas, "currentReplicas", s.StatefulSet.Status.Replicas)
return false
}
return true
}
func getPodsForOwner(ctx context.Context, c client.Client, log logr.Logger, o client.Object) (*podsForOwner, error) {
var (
owner owner
runner *v1alpha1.Runner
statefulSet *appsv1.StatefulSet
)
switch v := o.(type) {
case *v1alpha1.Runner:
owner = &ownerRunner{
Object: v,
Log: log,
Runner: v,
}
runner = v
case *appsv1.StatefulSet:
owner = &ownerStatefulSet{
Object: v,
Log: log,
StatefulSet: v,
}
statefulSet = v
default:
return nil, fmt.Errorf("BUG: Unsupported runner pods owner %v(%T)", v, v)
}
pods, err := owner.pods(ctx, c)
if err != nil {
return nil, err
}
var completed, running, terminating, regTimeout, pending, total int
for _, pod := range pods {
total++
if runnerPodOrContainerIsStopped(&pod) {
completed++
} else if pod.Status.Phase == corev1.PodRunning {
if podRunnerID(&pod) == "" && podConditionTransitionTimeAfter(&pod, corev1.PodReady, registrationTimeout) {
log.Info(
"Runner failed to register itself to GitHub in timely manner. "+
"Recreating the pod to see if it resolves the issue. "+
"CAUTION: If you see this a lot, you should investigate the root cause. "+
"See https://github.com/actions-runner-controller/actions-runner-controller/issues/288",
"creationTimestamp", pod.CreationTimestamp,
"readyTransitionTime", podConditionTransitionTime(&pod, corev1.PodReady, corev1.ConditionTrue),
"configuredRegistrationTimeout", registrationTimeout,
)
regTimeout++
} else {
running++
}
} else if !pod.DeletionTimestamp.IsZero() {
terminating++
} else {
// pending includes running-but-timed-out runner pods too
pending++
}
}
templateHash, ok := owner.templateHash()
if !ok {
log.Info("Failed to get template hash of statefulset. It must be in an invalid state. Please manually delete the statefulset so that it is recreated")
return nil, nil
}
synced := owner.synced()
return &podsForOwner{
total: total,
completed: completed,
running: running,
terminating: terminating,
regTimeout: regTimeout,
pending: pending,
templateHash: templateHash,
runner: runner,
statefulSet: statefulSet,
owner: owner,
synced: synced,
pods: pods,
}, nil
}
func getRunnerTemplateHash(r *v1alpha1.Runner) (string, bool) {
hash, ok := r.Labels[LabelKeyRunnerTemplateHash]
return hash, ok
}
func getStatefulSetTemplateHash(rs *appsv1.StatefulSet) (string, bool) {
hash, ok := rs.Labels[LabelKeyRunnerTemplateHash]
return hash, ok
}
type state struct {
podsForOwners map[string][]*podsForOwner
lastSyncTime *time.Time
}
type result struct {
currentObjects []*podsForOwner
}
func syncRunnerPodsOwners(ctx context.Context, c client.Client, log logr.Logger, effectiveTime *metav1.Time, newDesiredReplicas int, desiredTemplateHash string, create client.Object, ephemeral bool, owners []client.Object) (*result, error) {
state, err := collectPodsForOwners(ctx, c, log, owners)
if err != nil || state == nil {
return nil, err
}
podsForOwnersPerTemplateHash, lastSyncTime := state.podsForOwners, state.lastSyncTime
// # Why do we recreate statefulsets instead of updating their desired replicas?
//
// A statefulset cannot add more pods when not all the pods are running.
// Our ephemeral runners' pods that have finished running become Completed(Phase=Succeeded).
// So creating one statefulset per a batch of ephemeral runners is the only way for us to add more replicas.
//
// # Why do we recreate statefulsets instead of updating fields other than replicas?
//
// That's because Kubernetes doesn't allow updating anything other than replicas, template, and updateStrategy.
// And the nature of ephemeral runner pods requires you to create a statefulset per a batch of new runner pods so
// we have really no other choice.
//
// If you're curious, the below is the error message you will get when you tried to update forbidden StatefulSet field(s):
//
// 2021-06-13T07:19:52.760Z ERROR actions-runner-controller.runnerset Failed to patch statefulset
// {"runnerset": "default/example-runnerset", "error": "StatefulSet.apps \"example-runnerset\" is invalid: s
// pec: Forbidden: updates to statefulset spec for fields other than 'replicas', 'template', and 'updateStrategy'
// are forbidden"}
//
// Even though the error message includes "Forbidden", this error's reason is "Invalid".
// So we used to match these errors by using errors.IsInvalid. But that's another story...
currentObjects := podsForOwnersPerTemplateHash[desiredTemplateHash]
sort.SliceStable(currentObjects, func(i, j int) bool {
return currentObjects[i].owner.GetCreationTimestamp().Time.Before(currentObjects[j].owner.GetCreationTimestamp().Time)
})
if len(currentObjects) > 0 {
timestampFirst := currentObjects[0].owner.GetCreationTimestamp()
timestampLast := currentObjects[len(currentObjects)-1].owner.GetCreationTimestamp()
var names []string
for _, ss := range currentObjects {
names = append(names, ss.owner.GetName())
}
log.V(2).Info("Detected some current object(s)", "creationTimestampFirst", timestampFirst, "creationTimestampLast", timestampLast, "names", names)
}
var pending, running, regTimeout int
for _, ss := range currentObjects {
pending += ss.pending
running += ss.running
regTimeout += ss.regTimeout
}
log.V(2).Info(
"Found some pods across owner(s)",
"pending", pending,
"running", running,
"regTimeout", regTimeout,
"desired", newDesiredReplicas,
"owners", len(owners),
)
maybeRunning := pending + running
if newDesiredReplicas > maybeRunning && ephemeral && lastSyncTime != nil && effectiveTime != nil && lastSyncTime.After(effectiveTime.Time) {
log.V(2).Info("Detected that some ephemeral runners have disappeared. Usually this is due to that ephemeral runner completions so ARC does not create new runners until EffectiveTime is updated.", "lastSyncTime", metav1.Time{Time: *lastSyncTime}, "effectiveTime", *effectiveTime, "desired", newDesiredReplicas, "pending", pending, "running", running)
} else if newDesiredReplicas > maybeRunning {
num := newDesiredReplicas - maybeRunning
for i := 0; i < num; i++ {
// Add more replicas
if err := c.Create(ctx, create); err != nil {
return nil, err
}
}
log.V(2).Info("Created object(s) to add more replicas", "num", num)
return nil, nil
} else if newDesiredReplicas <= running {
var retained int
var delete []*podsForOwner
for i := len(currentObjects) - 1; i >= 0; i-- {
ss := currentObjects[i]
if ss.running == 0 || retained >= newDesiredReplicas {
// In case the desired replicas is satisfied until i-1, or this owner has no running pods,
// this owner can be considered safe for deletion.
// Note that we already waited on this owner to create pods by waiting for
// `.Status.Replicas` (=total number of pods managed by the owner, regardless of whether the runner is Running or Completed) to match the desired replicas in a previous step.
// So `.running == 0` means "the owner has created the desired number of pods before, and all of them are completed now".
delete = append(delete, ss)
} else if retained < newDesiredReplicas {
retained += ss.running
}
}
if retained == newDesiredReplicas {
for _, ss := range delete {
log := log.WithValues("owner", types.NamespacedName{Namespace: ss.owner.GetNamespace(), Name: ss.owner.GetName()})
// Statefulset termination process 1/4: Set unregistrationRequestTimestamp only after all the pods managed by the statefulset have
// started the unregistration process.
//
// NOTE: We just mark it instead of immediately starting the deletion process.
// Otherwise, the runner pod may hit terminationGracePeriod before the unregistration completes (the max terminationGracePeriod is limited to 1h by K8s and a job can run for longer than that),
// or actions/runner may potentially misbehave on SIGTERM immediately sent by K8s.
// We'd better unregister first and then start a pod deletion process.
// The annotation works as a mark to start the pod unregistration and deletion process of ours.
for _, po := range ss.pods {
if _, err := annotatePodOnce(ctx, c, log, &po, AnnotationKeyUnregistrationRequestTimestamp, time.Now().Format(time.RFC3339)); err != nil {
return nil, err
}
}
if _, ok := getAnnotation(ss.owner, AnnotationKeyUnregistrationRequestTimestamp); !ok {
updated := ss.owner.withAnnotation(AnnotationKeyUnregistrationRequestTimestamp, time.Now().Format(time.RFC3339))
if err := c.Patch(ctx, updated, client.MergeFrom(ss.owner)); err != nil {
log.Error(err, fmt.Sprintf("Failed to patch object to have %s annotation", AnnotationKeyUnregistrationRequestTimestamp))
return nil, err
}
log.V(2).Info("Redundant object has been annotated to start the unregistration before deletion")
} else {
log.V(2).Info("BUG: Redundant object was already annotated")
}
}
return nil, err
} else if retained > newDesiredReplicas {
log.V(2).Info("Waiting sync before scale down", "retained", retained, "newDesiredReplicas", newDesiredReplicas)
return nil, nil
} else {
log.Info("Invalid state", "retained", retained, "newDesiredReplicas", newDesiredReplicas)
panic("crashed due to invalid state")
}
}
for _, sss := range podsForOwnersPerTemplateHash {
for _, ss := range sss {
if ss.templateHash != desiredTemplateHash {
if ss.owner.GetDeletionTimestamp().IsZero() {
if err := c.Delete(ctx, ss.owner); err != nil {
log.Error(err, "Unable to delete object")
return nil, err
}
log.V(2).Info("Deleted redundant and outdated object")
}
return nil, nil
}
}
}
return &result{
currentObjects: currentObjects,
}, nil
}
func collectPodsForOwners(ctx context.Context, c client.Client, log logr.Logger, owners []client.Object) (*state, error) {
podsForOwnerPerTemplateHash := map[string][]*podsForOwner{}
// lastSyncTime becomes non-nil only when there are one or more owner(s), hence the same number of runner pods.
// It's used to prevent runnerset-controller from recreating "completed ephemeral runners".
// This is needed to prevent runners from being terminated prematurely.
// See https://github.com/actions-runner-controller/actions-runner-controller/issues/911 for more context.
//
// This becomes nil when there are zero statefulset(s). That's fine because then there should be zero statefulset(s) to be recreated either, hence
// we don't need to guard with lastSyncTime.
var lastSyncTime *time.Time
for _, ss := range owners {
log := log.WithValues("owner", types.NamespacedName{Namespace: ss.GetNamespace(), Name: ss.GetName()})
res, err := getPodsForOwner(ctx, c, log, ss)
if err != nil {
return nil, err
}
// Statefulset termination process 4/4: Let Kubernetes cascade-delete the statefulset and the pods.
//
// If the runner is already marked for deletion(=has a non-zero deletion timestamp) by the runner controller (can be caused by an ephemeral runner completion)
// or by this controller (in case it was deleted in the previous reconcilation loop),
// we don't need to bother calling GitHub API to re-mark the runner for deletion.
// Just hold on, and runners will disappear as long as the runner controller is up and running.
if !res.owner.GetDeletionTimestamp().IsZero() {
continue
}
// Statefulset termination process 3/4: Set the deletionTimestamp to let Kubernetes start a cascade deletion of the statefulset and the pods.
if _, ok := getAnnotation(res.owner, AnnotationKeyUnregistrationCompleteTimestamp); ok {
if err := c.Delete(ctx, res.owner); err != nil {
log.Error(err, "Failed to delete owner")
return nil, err
}
continue
}
// Statefulset termination process 2/4: Set unregistrationCompleteTimestamp only if all the pods managed by the statefulset
// have either unregistered or being deleted.
if _, ok := getAnnotation(res.owner, AnnotationKeyUnregistrationRequestTimestamp); ok {
var deletionSafe int
for _, po := range res.pods {
if _, ok := getAnnotation(&po, AnnotationKeyUnregistrationCompleteTimestamp); ok {
deletionSafe++
} else if !po.DeletionTimestamp.IsZero() {
deletionSafe++
}
}
log.V(2).Info("Marking owner for unregistration completion", "deletionSafe", deletionSafe, "total", res.total)
if deletionSafe == res.total {
if _, ok := getAnnotation(res.owner, AnnotationKeyUnregistrationCompleteTimestamp); !ok {
updated := res.owner.withAnnotation(AnnotationKeyUnregistrationCompleteTimestamp, time.Now().Format(time.RFC3339))
if err := c.Patch(ctx, updated, client.MergeFrom(res.owner)); err != nil {
log.Error(err, fmt.Sprintf("Failed to patch owner to have %s annotation", AnnotationKeyUnregistrationCompleteTimestamp))
return nil, err
}
log.V(2).Info("Redundant owner has been annotated to start the deletion")
} else {
log.V(2).Info("BUG: Redundant owner was already annotated to start the deletion")
}
}
continue
}
if annotations := res.owner.GetAnnotations(); annotations != nil {
if a, ok := annotations[SyncTimeAnnotationKey]; ok {
t, err := time.Parse(time.RFC3339, a)
if err == nil {
if lastSyncTime == nil || lastSyncTime.Before(t) {
lastSyncTime = &t
}
}
}
}
// A completed owner and a completed runner pod can safely be deleted without
// a race condition so delete it here,
// so that the later process can be a bit simpler.
if res.total > 0 && res.total == res.completed {
if err := c.Delete(ctx, ss); err != nil {
log.Error(err, "Unable to delete owner")
return nil, err
}
log.V(2).Info("Deleted completed owner")
return nil, nil
}
if !res.synced {
return nil, nil
}
podsForOwnerPerTemplateHash[res.templateHash] = append(podsForOwnerPerTemplateHash[res.templateHash], res)
}
return &state{podsForOwnerPerTemplateHash, lastSyncTime}, nil
}
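The scale-down branch in syncRunnerPodsOwners above walks currentObjects from newest to oldest, retaining running pods until the desired count is met and deleting only owners that are redundant or fully completed. A standalone restatement of that rule (illustrative only; this helper is not used by the controller):

// ownersSafeToDelete returns the indexes of owners the scale-down rule would
// mark for deletion: walking from newest to oldest, an owner is deletable once
// the desired count is already retained, or when it has no running pods (it
// created its pods earlier and they have all completed). A nil result means the
// retained count did not land exactly on the desired count, so the caller
// either waits for another sync or treats the state as invalid.
func ownersSafeToDelete(runningPerOwner []int, desired int) []int {
    var retained int
    var deletable []int
    for i := len(runningPerOwner) - 1; i >= 0; i-- {
        if runningPerOwner[i] == 0 || retained >= desired {
            deletable = append(deletable, i)
        } else {
            retained += runningPerOwner[i]
        }
    }
    if retained != desired {
        return nil
    }
    return deletable
}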

View File

@@ -18,13 +18,10 @@ package controllers
 import (
 "context"
-"fmt"
 "reflect"
-"sort"
 "time"
 appsv1 "k8s.io/api/apps/v1"
-"k8s.io/apimachinery/pkg/types"
 "k8s.io/apimachinery/pkg/runtime"
 "k8s.io/client-go/tools/record"
@@ -87,15 +84,6 @@ func (r *RunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
 metrics.SetRunnerSet(*runnerSet)
-desiredStatefulSet, err := r.newStatefulSet(runnerSet)
-if err != nil {
-r.Recorder.Event(runnerSet, corev1.EventTypeNormal, "RunnerAutoscalingFailure", err.Error())
-log.Error(err, "Could not create statefulset")
-return ctrl.Result{}, err
-}
 var statefulsetList appsv1.StatefulSetList
 if err := r.List(ctx, &statefulsetList, client.InNamespace(req.Namespace), client.MatchingFields{runnerSetOwnerKey: req.Name}); err != nil {
 return ctrl.Result{}, err
@@ -108,6 +96,15 @@ func (r *RunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
 return ctrl.Result{}, nil
 }
+desiredStatefulSet, err := r.newStatefulSet(runnerSet)
+if err != nil {
+r.Recorder.Event(runnerSet, corev1.EventTypeNormal, "RunnerAutoscalingFailure", err.Error())
+log.Error(err, "Could not create statefulset")
+return ctrl.Result{}, err
+}
 desiredTemplateHash, ok := getStatefulSetTemplateHash(desiredStatefulSet)
 if !ok {
 log.Info("Failed to get template hash of desired statefulset. It must be in an invalid state. Please manually delete the statefulset so that it is recreated")
@@ -115,157 +112,9 @@ func (r *RunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
 return ctrl.Result{}, nil
 }
-statefulsetsPerTemplateHash := map[string][]*podsForStatefulset{}
-// # Why do we recreate statefulsets instead of updating their desired replicas?
-//
-// A statefulset cannot add more pods when not all the pods are running.
-// Our ephemeral runners' pods that have finished running become Completed(Phase=Succeeded).
-// So creating one statefulset per a batch of ephemeral runners is the only way for us to add more replicas.
-//
-// # Why do we recreate statefulsets instead of updating fields other than replicas?
-//
-// That's because Kubernetes doesn't allow updating anything other than replicas, template, and updateStrategy.
-// And the nature of ephemeral runner pods requires you to create a statefulset per a batch of new runner pods so
-// we have really no other choice.
-//
-// If you're curious, the below is the error message you will get when you tried to update forbidden StatefulSet field(s):
-//
-// 2021-06-13T07:19:52.760Z ERROR actions-runner-controller.runnerset Failed to patch statefulset
-// {"runnerset": "default/example-runnerset", "error": "StatefulSet.apps \"example-runnerset\" is invalid: s
-// pec: Forbidden: updates to statefulset spec for fields other than 'replicas', 'template', and 'updateStrategy'
-// are forbidden"}
-//
-// Even though the error message includes "Forbidden", this error's reason is "Invalid".
-// So we used to match these errors by using errors.IsInvalid. But that's another story...
-// lastSyncTime becomes non-nil only when there are one or more statefulset(s) hence there are same number of runner pods.
-// It's used to prevent runnerset-controller from recreating "completed ephemeral runners".
-// This is needed to prevent runners from being terminated prematurely.
-// See https://github.com/actions-runner-controller/actions-runner-controller/issues/911 for more context.
-//
-// This becomes nil when there are zero statefulset(s). That's fine because then there should be zero stateful(s) to be recreated either hence
-// we don't need to guard with lastSyncTime.
-var lastSyncTime *time.Time
-for _, ss := range statefulsets {
-ss := ss
-log := log.WithValues("statefulset", types.NamespacedName{Namespace: ss.Namespace, Name: ss.Name})
-res, err := r.getPodsForStatefulset(ctx, log, &ss)
-if err != nil {
-return ctrl.Result{}, err
-}
-// Statefulset termination process 4/4: Let Kubernetes cascade-delete the statefulset and the pods.
-if !res.statefulset.DeletionTimestamp.IsZero() {
-continue
-}
-// Statefulset termination process 3/4: Set the deletionTimestamp to let Kubernetes start a cascade deletion of the statefulset and the pods.
-if _, ok := getAnnotation(&res.statefulset.ObjectMeta, AnnotationKeyUnregistrationCompleteTimestamp); ok {
-if err := r.Client.Delete(ctx, res.statefulset); err != nil {
-log.Error(err, "Failed to delete statefulset")
-return ctrl.Result{}, err
-}
-continue
-}
-// Statefulset termination process 2/4: Set unregistrationCompleteTimestamp only if all the pods managed by the statefulset
-// have either unregistered or being deleted.
-if _, ok := getAnnotation(&res.statefulset.ObjectMeta, AnnotationKeyUnregistrationRequestTimestamp); ok {
-var deletionSafe int
-for _, po := range res.pods {
-if _, ok := getAnnotation(&po.ObjectMeta, AnnotationKeyUnregistrationCompleteTimestamp); ok {
-deletionSafe++
-} else if !po.DeletionTimestamp.IsZero() {
-deletionSafe++
-}
-}
-log.V(2).Info("Marking statefulset for unregistration completion", "deletionSafe", deletionSafe, "total", res.total)
-if deletionSafe == res.total {
-if _, ok := getAnnotation(&res.statefulset.ObjectMeta, AnnotationKeyUnregistrationCompleteTimestamp); !ok {
-updated := res.statefulset.DeepCopy()
-setAnnotation(&updated.ObjectMeta, AnnotationKeyUnregistrationCompleteTimestamp, time.Now().Format(time.RFC3339))
-if err := r.Client.Patch(ctx, updated, client.MergeFrom(res.statefulset)); err != nil {
-log.Error(err, fmt.Sprintf("Failed to patch statefulset to have %s annotation", AnnotationKeyUnregistrationCompleteTimestamp))
-return ctrl.Result{}, err
-}
-log.V(2).Info("Redundant statefulset has been annotated to start the deletion")
-} else {
-log.V(2).Info("BUG: Redundant statefulset was already annotated to start the deletion")
-}
-}
-continue
-}
-if res.statefulset.Annotations != nil {
-if a, ok := res.statefulset.Annotations[SyncTimeAnnotationKey]; ok {
-t, err := time.Parse(time.RFC3339, a)
-if err == nil {
-if lastSyncTime == nil || lastSyncTime.Before(t) {
-lastSyncTime = &t
-}
-}
-}
-}
-statefulsetsPerTemplateHash[res.templateHash] = append(statefulsetsPerTemplateHash[res.templateHash], res)
-// A completed statefulset or a completed pod can safely be deleted without
-// a race condition so delete it here,
-// so that the later process can be a bit simpler.
-if res.total > 0 && res.total == res.completed {
-if err := r.Client.Delete(ctx, &ss); err != nil {
-log.Error(err, "Unable to delete statefulset")
-return ctrl.Result{}, err
-}
-log.V(2).Info("Deleted completed statefulset")
-return ctrl.Result{}, nil
-}
-var replicas int32 = 1
-if ss.Spec.Replicas != nil {
-replicas = *ss.Spec.Replicas
-}
-if ss.Status.Replicas != replicas {
-log.V(2).Info("Waiting for statefulset to sync", "desiredReplicas", replicas, "currentReplicas", ss.Status.Replicas)
-return ctrl.Result{}, nil
-}
-}
-currentStatefulSets := statefulsetsPerTemplateHash[desiredTemplateHash]
-sort.SliceStable(currentStatefulSets, func(i, j int) bool {
-return currentStatefulSets[i].statefulset.CreationTimestamp.Before(&currentStatefulSets[j].statefulset.CreationTimestamp)
-})
-if len(currentStatefulSets) > 0 {
-timestampFirst := currentStatefulSets[0].statefulset.CreationTimestamp
-timestampLast := currentStatefulSets[len(currentStatefulSets)-1].statefulset.CreationTimestamp
-var names []string
-for _, ss := range currentStatefulSets {
-names = append(names, ss.statefulset.Name)
-}
-log.V(2).Info("Detected some current statefulsets", "creationTimestampFirst", timestampFirst, "creationTimestampLast", timestampLast, "statefulsets", names)
-}
-var pending, running, regTimeout int
-for _, ss := range currentStatefulSets {
-pending += ss.pending
-running += ss.running
-regTimeout += ss.regTimeout
-}
+addedReplicas := int32(1)
+create := desiredStatefulSet.DeepCopy()
+create.Spec.Replicas = &addedReplicas
 const defaultReplicas = 1
@@ -277,123 +126,28 @@ func (r *RunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
 newDesiredReplicas := getIntOrDefault(replicasOfDesiredStatefulSet, defaultReplicas)
-log.V(2).Info(
-"Found some pods across statefulset(s)",
-"pending", pending,
-"running", running,
-"regTimeout", regTimeout,
-"desired", newDesiredReplicas,
-"statefulsets", len(statefulsets),
-)
 effectiveTime := runnerSet.Spec.EffectiveTime
 ephemeral := runnerSet.Spec.Ephemeral == nil || *runnerSet.Spec.Ephemeral
-maybeRunning := pending + running
-if newDesiredReplicas > maybeRunning && ephemeral && lastSyncTime != nil && effectiveTime != nil && lastSyncTime.After(effectiveTime.Time) {
-log.V(2).Info("Detected that some ephemeral runners have disappeared. Usually this is due to that ephemeral runner completions so ARC does not create new runners until EffectiveTime is updated.", "lastSyncTime", metav1.Time{Time: *lastSyncTime}, "effectiveTime", *effectiveTime, "desired", newDesiredReplicas, "pending", pending, "running", running)
-} else if newDesiredReplicas > maybeRunning {
-num := newDesiredReplicas - maybeRunning
-for i := 0; i < num; i++ {
-// Add more replicas
-addedReplicas := int32(1)
-create := desiredStatefulSet.DeepCopy()
-create.Spec.Replicas = &addedReplicas
-if err := r.Client.Create(ctx, create); err != nil {
+var owners []client.Object
+for _, ss := range statefulsets {
+ss := ss
+owners = append(owners, &ss)
+}
+res, err := syncRunnerPodsOwners(ctx, r.Client, log, effectiveTime, newDesiredReplicas, desiredTemplateHash, create, ephemeral, owners)
+if err != nil || res == nil {
 return ctrl.Result{}, err
 }
-}
-log.V(2).Info("Created statefulset(s) to add more replicas", "num", num)
-return ctrl.Result{}, nil
-} else if newDesiredReplicas <= running {
-var retained int
-var delete []*podsForStatefulset
-for i := len(currentStatefulSets) - 1; i >= 0; i-- {
-ss := currentStatefulSets[i]
-if ss.running == 0 || retained >= newDesiredReplicas {
-// In case the desired replicas is satisfied until i-1, or this statefulset has no running pods,
-// this statefulset can be considered safe for deletion.
-// Note that we already waited on this statefulset to create pods by waiting for
-// `ss.Status.Replicas`(=total number of pods managed by statefulset, regarldess of the runner is Running or Completed) to match the desired replicas in a previous step.
-// So `ss.running == 0` means "the statefulset has created the desired number of pods before but all of them are completed now".
-delete = append(delete, ss)
-} else if retained < newDesiredReplicas {
-retained += ss.running
-}
-}
-if retained == newDesiredReplicas {
-for _, ss := range delete {
-log := log.WithValues("statefulset", types.NamespacedName{Namespace: ss.statefulset.Namespace, Name: ss.statefulset.Name})
-// Statefulset termination process 1/4: Set unregistrationRequestTimestamp only after all the pods managed by the statefulset have
-// started unregistreation process.
-//
-// NOTE: We just mark it instead of immediately starting the deletion process.
-// Otherwise, the runner pod may hit termiationGracePeriod before the unregistration completes(the max terminationGracePeriod is limited to 1h by K8s and a job can be run for more than that),
-// or actions/runner may potentially misbehave on SIGTERM immediately sent by K8s.
-// We'd better unregister first and then start a pod deletion process.
-// The annotation works as a mark to start the pod unregistration and deletion process of ours.
-for _, po := range ss.pods {
-if _, err := annotatePodOnce(ctx, r.Client, log, &po, AnnotationKeyUnregistrationRequestTimestamp, time.Now().Format(time.RFC3339)); err != nil {
-return ctrl.Result{}, err
-}
-}
-if _, ok := getAnnotation(&ss.statefulset.ObjectMeta, AnnotationKeyUnregistrationRequestTimestamp); !ok {
-updated := ss.statefulset.DeepCopy()
-setAnnotation(&updated.ObjectMeta, AnnotationKeyUnregistrationRequestTimestamp, time.Now().Format(time.RFC3339))
-if err := r.Client.Patch(ctx, updated, client.MergeFrom(ss.statefulset)); err != nil {
-log.Error(err, fmt.Sprintf("Failed to patch statefulset to have %s annotation", AnnotationKeyUnregistrationRequestTimestamp))
-return ctrl.Result{}, err
-}
-log.V(2).Info("Redundant statefulset has been annotated to start the unregistration before deletion")
-} else {
-log.V(2).Info("BUG: Redundant statefulset was already annotated")
-}
-}
-return ctrl.Result{}, err
-} else if retained > newDesiredReplicas {
-log.V(2).Info("Waiting sync before scale down", "retained", retained, "newDesiredReplicas", newDesiredReplicas)
-return ctrl.Result{}, nil
-} else {
-log.Info("Invalid state", "retained", retained, "newDesiredReplicas", newDesiredReplicas)
-panic("crashed due to invalid state")
-}
-}
-for _, sss := range statefulsetsPerTemplateHash {
-for _, ss := range sss {
-if ss.templateHash != desiredTemplateHash {
-if ss.statefulset.DeletionTimestamp.IsZero() {
-if err := r.Client.Delete(ctx, ss.statefulset); err != nil {
-log.Error(err, "Unable to delete statefulset")
-return ctrl.Result{}, err
-}
-log.V(2).Info("Deleted redundant and outdated statefulset")
-}
-return ctrl.Result{}, nil
-}
-}
-}
 var statusReplicas, statusReadyReplicas, totalCurrentReplicas, updatedReplicas int
-for _, ss := range currentStatefulSets {
-statusReplicas += int(ss.statefulset.Status.Replicas)
-statusReadyReplicas += int(ss.statefulset.Status.ReadyReplicas)
-totalCurrentReplicas += int(ss.statefulset.Status.CurrentReplicas)
-updatedReplicas += int(ss.statefulset.Status.UpdatedReplicas)
+for _, ss := range res.currentObjects {
+statusReplicas += int(ss.statefulSet.Status.Replicas)
+statusReadyReplicas += int(ss.statefulSet.Status.ReadyReplicas)
+totalCurrentReplicas += int(ss.statefulSet.Status.CurrentReplicas)
+updatedReplicas += int(ss.statefulSet.Status.UpdatedReplicas)
 }
 status := runnerSet.Status.DeepCopy()
@@ -419,91 +173,6 @@ func (r *RunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
 return ctrl.Result{}, nil
 }
-type podsForStatefulset struct {
-total int
-completed int
-running int
-terminating int
-regTimeout int
-pending int
-templateHash string
-statefulset *appsv1.StatefulSet
-pods []corev1.Pod
-}
-func (r *RunnerSetReconciler) getPodsForStatefulset(ctx context.Context, log logr.Logger, ss *appsv1.StatefulSet) (*podsForStatefulset, error) {
-var podList corev1.PodList
-if err := r.Client.List(ctx, &podList, client.MatchingLabels(ss.Spec.Template.ObjectMeta.Labels)); err != nil {
-log.Error(err, "Failed to list pods managed by statefulset")
-return nil, err
-}
-var completed, running, terminating, regTimeout, pending, total int
-var pods []corev1.Pod
-for _, pod := range podList.Items {
-if owner := metav1.GetControllerOf(&pod); owner == nil || owner.Kind != "StatefulSet" || owner.Name != ss.Name {
-continue
-}
-pods = append(pods, pod)
-total++
-if runnerPodOrContainerIsStopped(&pod) {
-completed++
-} else if pod.Status.Phase == corev1.PodRunning {
-if podRunnerID(&pod) == "" && podConditionTransitionTimeAfter(&pod, corev1.PodReady, registrationTimeout) {
-log.Info(
-"Runner failed to register itself to GitHub in timely manner. "+
-"Recreating the pod to see if it resolves the issue. "+
-"CAUTION: If you see this a lot, you should investigate the root cause. "+
-"See https://github.com/actions-runner-controller/actions-runner-controller/issues/288",
-"creationTimestamp", pod.CreationTimestamp,
-"readyTransitionTime", podConditionTransitionTime(&pod, corev1.PodReady, corev1.ConditionTrue),
-"configuredRegistrationTimeout", registrationTimeout,
-)
-regTimeout++
-} else {
-running++
-}
-} else if !pod.DeletionTimestamp.IsZero() {
-terminating++
-} else {
-// pending includes running but timedout runner's pod too
-pending++
-}
-}
-templateHash, ok := getStatefulSetTemplateHash(ss)
-if !ok {
-log.Info("Failed to get template hash of statefulset. It must be in an invalid state. Please manually delete the statefulset so that it is recreated")
-return nil, nil
-}
-return &podsForStatefulset{
-total: total,
-completed: completed,
-running: running,
-terminating: terminating,
-regTimeout: regTimeout,
-pending: pending,
-templateHash: templateHash,
-statefulset: ss,
-pods: pods,
-}, nil
-}
-func getStatefulSetTemplateHash(rs *appsv1.StatefulSet) (string, bool) {
-hash, ok := rs.Labels[LabelKeyRunnerTemplateHash]
-return hash, ok
-}
 func getRunnerSetSelector(runnerSet *v1alpha1.RunnerSet) *metav1.LabelSelector {
 selector := runnerSet.Spec.Selector
 if selector == nil {