Make RunnerSet much more reliable with or without webhook

Yusuke Kuoka 2022-02-27 12:01:01 +00:00
parent 11be6c1fb6
commit 15b402bb32
7 changed files with 301 additions and 65 deletions


@@ -25,6 +25,14 @@ import (
type RunnerSetSpec struct {
RunnerConfig `json:",inline"`
// EffectiveTime is the time the upstream controller requested to sync Replicas.
// It is usually populated by the webhook-based autoscaler via HRA.
// It is used to prevent ephemeral runners from being unnecessarily recreated.
//
// +optional
// +nullable
EffectiveTime *metav1.Time `json:"effectiveTime,omitempty"`
appsv1.StatefulSetSpec `json:",inline"`
}
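As context for this new field, here is a minimal sketch (not code from this commit) of the gating rule it enables downstream: a controller that records when it last created a batch of runners can skip recreating ephemeral runners whose pods simply completed, unless EffectiveTime has moved forward since that sync. The helper name and signature are hypothetical; the real check lives in the RunnerSet controller changes later in this diff.

package sketch

import (
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// shouldCreateReplacements is a hypothetical helper mirroring the intent of
// EffectiveTime: only create replacement ephemeral runners when the scale
// request is newer than the runners we already created.
func shouldCreateReplacements(ephemeral bool, effectiveTime *metav1.Time, lastSyncTime *time.Time, desired, alive int) bool {
    if desired <= alive {
        return false // nothing is missing
    }
    if ephemeral && effectiveTime != nil && lastSyncTime != nil && lastSyncTime.After(effectiveTime.Time) {
        // Runners created after the last scale request have since completed;
        // recreating them now would only re-run already-finished work.
        return false
    }
    return true
}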


@@ -932,6 +932,10 @@ func (in *RunnerSetList) DeepCopyObject() runtime.Object {
func (in *RunnerSetSpec) DeepCopyInto(out *RunnerSetSpec) {
*out = *in
in.RunnerConfig.DeepCopyInto(&out.RunnerConfig)
if in.EffectiveTime != nil {
in, out := &in.EffectiveTime, &out.EffectiveTime
*out = (*in).DeepCopy()
}
in.StatefulSetSpec.DeepCopyInto(&out.StatefulSetSpec)
}


@@ -55,6 +55,11 @@ spec:
type: string
dockerdWithinRunnerContainer:
type: boolean
effectiveTime:
description: EffectiveTime is the time the upstream controller requested to sync Replicas. It is usually populated by the webhook-based autoscaler via HRA. It is used to prevent ephemeral runners from being unnecessarily recreated.
format: date-time
nullable: true
type: string
enterprise:
pattern: ^[^/]+$
type: string


@@ -55,6 +55,11 @@ spec:
type: string
dockerdWithinRunnerContainer:
type: boolean
effectiveTime:
description: EffectiveTime is the time the upstream controller requested to sync Replicas. It is usually populated by the webhook-based autoscaler via HRA. It is used to prevent ephemeral runners from being unnecessarily recreated.
format: date-time
nullable: true
type: string
enterprise:
pattern: ^[^/]+$
type: string


@@ -202,15 +202,38 @@ func (r *HorizontalRunnerAutoscalerReconciler) Reconcile(ctx context.Context, re
}
currentDesiredReplicas := getIntOrDefault(replicas, defaultReplicas)
ephemeral := rs.Spec.Ephemeral == nil || *rs.Spec.Ephemeral
var effectiveTime *time.Time
for _, r := range hra.Spec.CapacityReservations {
t := r.EffectiveTime
if effectiveTime == nil || effectiveTime.Before(t.Time) {
effectiveTime = &t.Time
}
}
if currentDesiredReplicas != newDesiredReplicas {
copy := rs.DeepCopy()
v := int32(newDesiredReplicas)
copy.Spec.Replicas = &v
if ephemeral && effectiveTime != nil {
copy.Spec.EffectiveTime = &metav1.Time{Time: *effectiveTime}
}
if err := r.Client.Patch(ctx, copy, client.MergeFrom(&rs)); err != nil {
return fmt.Errorf("patching runnerset to have %d replicas: %w", newDesiredReplicas, err)
}
} else if ephemeral && effectiveTime != nil {
copy := rs.DeepCopy()
copy.Spec.EffectiveTime = &metav1.Time{Time: *effectiveTime}
if err := r.Client.Patch(ctx, copy, client.MergeFrom(&rs)); err != nil {
return fmt.Errorf("patching runnerset to have %d replicas: %w", newDesiredReplicas, err)
}
}
return nil
})
}
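For context, the EffectiveTime propagated above originates from the HRA's capacity reservations, which the webhook-based autoscaler appends as workflow_job events arrive. The sketch below illustrates that producer side under the assumption that CapacityReservation exposes EffectiveTime, ExpirationTime and Replicas fields as used in the loop above; the helper itself is hypothetical, and the module import path is assumed, not taken from this commit.

package sketch

import (
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    "github.com/actions-runner-controller/actions-runner-controller/api/v1alpha1"
)

// reserveCapacityForJob appends one capacity reservation per queued workflow job.
// The HRA reconciler above then takes the newest EffectiveTime across all
// reservations and copies it onto RunnerSetSpec.EffectiveTime.
func reserveCapacityForJob(hra *v1alpha1.HorizontalRunnerAutoscaler, ttl time.Duration) {
    now := metav1.Now()
    hra.Spec.CapacityReservations = append(hra.Spec.CapacityReservations, v1alpha1.CapacityReservation{
        EffectiveTime:  now,
        ExpirationTime: metav1.NewTime(now.Add(ttl)),
        Replicas:       1,
    })
}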


@@ -356,6 +356,13 @@ func (r *RunnerPodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return *res, err
}
if runnerPodOrContainerIsStopped(updated) {
log.Info("Detected runner to have successfully stopped", "name", runnerPod.Name)
return ctrl.Result{}, nil
} else {
log.Info("Runner can be safely deleted", "name", runnerPod.Name)
}
// Delete current pod if recreation is needed
if err := r.Delete(ctx, updated); err != nil {
log.Error(err, "Failed to delete pod resource")


@@ -19,10 +19,10 @@ package controllers
import (
"context"
"reflect"
"sort"
"time"
appsv1 "k8s.io/api/apps/v1"
- "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/runtime"
@@ -99,27 +99,15 @@ func (r *RunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{}, err
}
- liveStatefulSet := &appsv1.StatefulSet{}
- if err := r.Get(ctx, types.NamespacedName{Namespace: runnerSet.Namespace, Name: runnerSet.Name}, liveStatefulSet); err != nil {
- if !errors.IsNotFound(err) {
- log.Error(err, "Failed to get live statefulset")
- return ctrl.Result{}, err
- }
- if err := r.Client.Create(ctx, desiredStatefulSet); err != nil {
- log.Error(err, "Failed to create statefulset resource")
- return ctrl.Result{}, err
- }
- return ctrl.Result{}, nil
var statefulsetList appsv1.StatefulSetList
if err := r.List(ctx, &statefulsetList, client.InNamespace(req.Namespace), client.MatchingFields{runnerSetOwnerKey: req.Name}); err != nil {
return ctrl.Result{}, err
}
- liveTemplateHash, ok := getStatefulSetTemplateHash(liveStatefulSet)
- if !ok {
- log.Info("Failed to get template hash of newest statefulset resource. It must be in an invalid state. Please manually delete the statefulset so that it is recreated")
statefulsets := statefulsetList.Items
if len(statefulsets) > 1000 {
log.Info("Postponed reconciliation to prevent potential infinite loop. If you're really scaling more than 1000 statefulsets, do change this hard-coded threshold!")
return ctrl.Result{}, nil
}
@@ -130,76 +118,195 @@ func (r *RunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{}, nil
}
- if liveTemplateHash != desiredTemplateHash {
- copy := liveStatefulSet.DeepCopy()
- copy.Spec = desiredStatefulSet.Spec
- if err := r.Client.Patch(ctx, copy, client.MergeFrom(liveStatefulSet)); err != nil {
- log.Error(err, "Failed to patch statefulset", "reason", errors.ReasonForError(err))
statefulsetsPerTemplateHash := map[string][]*podsForStatefulset{}
// # Why do we recreate statefulsets instead of updating their desired replicas?
//
// A statefulset cannot add more pods when not all of its pods are running.
// Our ephemeral runners' pods that have finished running become Completed (Phase=Succeeded).
// So creating one statefulset per batch of ephemeral runners is the only way for us to add more replicas.
//
// # Why do we recreate statefulsets instead of updating fields other than replicas?
//
// That's because Kubernetes doesn't allow updating anything other than replicas, template, and updateStrategy.
// And the nature of ephemeral runner pods requires you to create a statefulset per batch of new runner pods, so
// we really have no other choice.
//
// If you're curious, below is the error message you will get when you try to update forbidden StatefulSet field(s):
//
// 2021-06-13T07:19:52.760Z ERROR actions-runner-controller.runnerset Failed to patch statefulset
// {"runnerset": "default/example-runnerset", "error": "StatefulSet.apps \"example-runnerset\" is invalid: s
// pec: Forbidden: updates to statefulset spec for fields other than 'replicas', 'template', and 'updateStrategy'
// are forbidden"}
//
// Even though the error message includes "Forbidden", this error's reason is "Invalid".
// So we used to match these errors by using errors.IsInvalid. But that's another story...
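To make the constraint in the comment above concrete, here is a small hedged sketch (not part of this commit) of what a forbidden in-place update looks like with controller-runtime; the helper is hypothetical and serviceName is just one example of a StatefulSet spec field that cannot be changed after creation.

package sketch

import (
    "context"

    appsv1 "k8s.io/api/apps/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

// tryForbiddenUpdate demonstrates why the controller creates new StatefulSets
// instead of mutating old ones: the API server rejects updates to fields other
// than replicas, template, and updateStrategy with reason "Invalid".
func tryForbiddenUpdate(ctx context.Context, c client.Client, ss *appsv1.StatefulSet) error {
    updated := ss.DeepCopy()
    updated.Spec.ServiceName = "another-service" // immutable field

    err := c.Update(ctx, updated)
    if apierrors.IsInvalid(err) {
        // Expected: "... spec: Forbidden: updates to statefulset spec for fields
        // other than 'replicas', 'template', and 'updateStrategy' are forbidden"
        return err
    }
    return err
}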
- if errors.IsInvalid(err) {
- // NOTE: This might not be ideal but is currently required to deal with the forbidden error by recreating the statefulset
- //
- // 2021-06-13T07:19:52.760Z ERROR actions-runner-controller.runnerset Failed to patch statefulset
- // {"runnerset": "default/example-runnerset", "error": "StatefulSet.apps \"example-runnerset\" is invalid: s
- // pec: Forbidden: updates to statefulset spec for fields other than 'replicas', 'template', and 'updateStrategy'
- // are forbidden"}
- //
- // Even though the error message includes "Forbidden", this error's reason is "Invalid".
- // That's why we're using errors.IsInvalid above.
- if err := r.Client.Delete(ctx, liveStatefulSet); err != nil {
- log.Error(err, "Failed to delete statefulset for force-update")
- return ctrl.Result{}, err
- }
- log.Info("Deleted statefulset for force-update")
- }
var lastSyncTime *time.Time
for _, ss := range statefulsets {
ss := ss
log := log.WithValues("statefulset", types.NamespacedName{Namespace: ss.Namespace, Name: ss.Name})
res, err := r.getPodsForStatefulset(ctx, log, &ss)
if err != nil {
return ctrl.Result{}, err
}
- // We requeue in order to clean up old runner replica sets later.
- // Otherwise, they aren't cleaned up until the next re-sync interval.
- return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
if !res.statefulset.DeletionTimestamp.IsZero() {
continue
}
if res.statefulset.Annotations != nil {
if a, ok := res.statefulset.Annotations[SyncTimeAnnotationKey]; ok {
t, err := time.Parse(time.RFC3339, a)
if err == nil {
if lastSyncTime == nil || lastSyncTime.Before(t) {
lastSyncTime = &t
}
}
}
}
statefulsetsPerTemplateHash[res.templateHash] = append(statefulsetsPerTemplateHash[res.templateHash], res)
if res.total > 0 && res.total == res.completed {
if err := r.Client.Delete(ctx, &ss); err != nil {
log.Error(err, "Unable to delete statefulset")
return ctrl.Result{}, err
}
log.V(2).Info("Deleted completed statefulset")
return ctrl.Result{}, nil
}
var replicas int32 = 1
if ss.Spec.Replicas != nil {
replicas = *ss.Spec.Replicas
}
if ss.Status.Replicas != replicas {
log.V(2).Info("Waiting for statefulset to sync", "desiredReplicas", replicas, "currentReplicas", ss.Status.Replicas)
return ctrl.Result{}, nil
}
}
currentStatefulSets := statefulsetsPerTemplateHash[desiredTemplateHash]
sort.SliceStable(currentStatefulSets, func(i, j int) bool {
return currentStatefulSets[i].statefulset.CreationTimestamp.Before(&currentStatefulSets[j].statefulset.CreationTimestamp)
})
if len(currentStatefulSets) > 0 {
timestampFirst := currentStatefulSets[0].statefulset.CreationTimestamp
timestampLast := currentStatefulSets[len(currentStatefulSets)-1].statefulset.CreationTimestamp
var names []string
for _, ss := range currentStatefulSets {
names = append(names, ss.statefulset.Name)
}
log.V(2).Info("Detected some current statefulsets", "creationTimestampFirst", timestampFirst, "creationTimestampLast", timestampLast, "statefulsets", names)
}
var pending, running int
for _, ss := range currentStatefulSets {
pending += ss.pending
running += ss.running
}
const defaultReplicas = 1
- var replicasOfLiveStatefulSet *int
- if liveStatefulSet.Spec.Replicas != nil {
- v := int(*liveStatefulSet.Spec.Replicas)
- replicasOfLiveStatefulSet = &v
- }
var replicasOfDesiredStatefulSet *int
if desiredStatefulSet.Spec.Replicas != nil {
v := int(*desiredStatefulSet.Spec.Replicas)
replicasOfDesiredStatefulSet = &v
}
- currentDesiredReplicas := getIntOrDefault(replicasOfLiveStatefulSet, defaultReplicas)
newDesiredReplicas := getIntOrDefault(replicasOfDesiredStatefulSet, defaultReplicas)
- // Please add more conditions that we can in-place update the newest runnerreplicaset without disruption
- if currentDesiredReplicas != newDesiredReplicas {
- v := int32(newDesiredReplicas)
- updated := liveStatefulSet.DeepCopy()
- updated.Spec.Replicas = &v
- if err := r.Client.Patch(ctx, updated, client.MergeFrom(liveStatefulSet)); err != nil {
- log.Error(err, "Failed to update statefulset")
- return ctrl.Result{}, err
log.V(2).Info("Found some pods across statefulset(s)", "pending", pending, "running", running, "desired", newDesiredReplicas, "statefulsets", len(statefulsets))
effectiveTime := runnerSet.Spec.EffectiveTime
ephemeral := runnerSet.Spec.Ephemeral == nil || *runnerSet.Spec.Ephemeral
if newDesiredReplicas > pending+running && ephemeral && lastSyncTime != nil && effectiveTime != nil && lastSyncTime.After(effectiveTime.Time) {
log.V(2).Info("Detected that some ephemeral runners have disappeared. Usually this is due to ephemeral runner completions, so ARC does not create new runners until EffectiveTime is updated.", "lastSyncTime", metav1.Time{Time: *lastSyncTime}, "effectiveTime", *effectiveTime, "desired", newDesiredReplicas, "pending", pending, "running", running)
} else if newDesiredReplicas > pending+running {
num := newDesiredReplicas - (pending + running)
for i := 0; i < num; i++ {
// Add more replicas
addedReplicas := int32(1)
create := desiredStatefulSet.DeepCopy()
create.Spec.Replicas = &addedReplicas
if err := r.Client.Create(ctx, create); err != nil {
return ctrl.Result{}, err
}
}
log.V(2).Info("Created statefulset(s) to add more replicas", "num", num)
return ctrl.Result{}, nil
} else if newDesiredReplicas < running {
var retained int
var lastIndex int
for i := len(currentStatefulSets) - 1; i >= 0; i-- {
ss := currentStatefulSets[i]
retained += ss.running
if retained >= newDesiredReplicas {
lastIndex = i
break
}
}
if retained == newDesiredReplicas {
for i := 0; i < lastIndex; i++ {
ss := currentStatefulSets[i]
log := log.WithValues("statefulset", types.NamespacedName{Namespace: ss.statefulset.Namespace, Name: ss.statefulset.Name})
if err := r.Client.Delete(ctx, ss.statefulset); err != nil {
return ctrl.Result{}, err
}
log.V(2).Info("Deleted redundant statefulset", "i", i, "lastIndex", lastIndex)
}
return ctrl.Result{}, err
} else if retained > newDesiredReplicas {
log.V(2).Info("Waiting sync before scale down", "retained", retained, "newDesiredReplicas", newDesiredReplicas, "lastIndex", lastIndex)
return ctrl.Result{}, nil
} else {
log.Info("Invalid state", "retained", retained, "newDesiredReplicas", newDesiredReplicas, "lastIndex", lastIndex)
panic("crashed due to invalid state")
}
}
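As a worked illustration of the retention loop above (a standalone sketch, not code from this commit): suppose three current statefulsets, oldest to newest, hold 2, 3, and 1 running runners and the desired count drops to 4. Walking from the newest, retained reaches 1, then 4 at index 1, so lastIndex is 1 and the single older statefulset at index 0 is deleted; had retained overshot the target, the controller would instead wait for another sync.

package main

import "fmt"

// planScaleDown mirrors the selection logic: keep the newest statefulsets whose
// running pods add up to exactly the desired count, and report which older ones
// can be deleted. -1 means "wait for sync" (overshoot) and -2 is the invalid
// state the controller panics on.
func planScaleDown(runningPerStatefulSet []int, desired int) int {
    retained := 0
    for i := len(runningPerStatefulSet) - 1; i >= 0; i-- {
        retained += runningPerStatefulSet[i]
        if retained >= desired {
            if retained == desired {
                return i // delete statefulsets [0, i)
            }
            return -1 // overshoot: wait before scaling down
        }
    }
    return -2 // fewer running than desired: not a scale-down situation
}

func main() {
    fmt.Println(planScaleDown([]int{2, 3, 1}, 4)) // 1: delete the statefulset at index 0
}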
- statusReplicas := int(liveStatefulSet.Status.Replicas)
- statusReadyReplicas := int(liveStatefulSet.Status.ReadyReplicas)
- totalCurrentReplicas := int(liveStatefulSet.Status.CurrentReplicas)
- updatedReplicas := int(liveStatefulSet.Status.UpdatedReplicas)
for _, sss := range statefulsetsPerTemplateHash {
for _, ss := range sss {
if ss.templateHash != desiredTemplateHash {
if ss.statefulset.DeletionTimestamp.IsZero() {
if err := r.Client.Delete(ctx, ss.statefulset); err != nil {
log.Error(err, "Unable to delete statefulset")
return ctrl.Result{}, err
}
log.V(2).Info("Deleted redundant and outdated statefulset")
}
return ctrl.Result{}, nil
}
}
}
var statusReplicas, statusReadyReplicas, totalCurrentReplicas, updatedReplicas int
for _, ss := range currentStatefulSets {
statusReplicas += int(ss.statefulset.Status.Replicas)
statusReadyReplicas += int(ss.statefulset.Status.ReadyReplicas)
totalCurrentReplicas += int(ss.statefulset.Status.CurrentReplicas)
updatedReplicas += int(ss.statefulset.Status.UpdatedReplicas)
}
status := runnerSet.Status.DeepCopy()
@@ -224,6 +331,64 @@ func (r *RunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{}, nil
}
type podsForStatefulset struct {
total int
completed int
running int
terminating int
pending int
templateHash string
statefulset *appsv1.StatefulSet
pods []corev1.Pod
}
func (r *RunnerSetReconciler) getPodsForStatefulset(ctx context.Context, log logr.Logger, ss *appsv1.StatefulSet) (*podsForStatefulset, error) {
var podList corev1.PodList
if err := r.Client.List(ctx, &podList, client.MatchingLabels(ss.Spec.Template.ObjectMeta.Labels)); err != nil {
log.Error(err, "Failed to list pods managed by statefulset")
return nil, err
}
var completed, running, terminating, pending, total int
for _, pod := range podList.Items {
if owner := metav1.GetControllerOf(&pod); owner == nil || owner.Kind != "StatefulSet" || owner.Name != ss.Name {
continue
}
total++
if runnerPodOrContainerIsStopped(&pod) {
completed++
} else if pod.Status.Phase == corev1.PodRunning {
running++
} else if !pod.DeletionTimestamp.IsZero() {
terminating++
} else {
pending++
}
}
templateHash, ok := getStatefulSetTemplateHash(ss)
if !ok {
log.Info("Failed to get template hash of statefulset. It must be in an invalid state. Please manually delete the statefulset so that it is recreated")
return nil, nil
}
return &podsForStatefulset{
total: total,
completed: completed,
running: running,
terminating: terminating,
pending: pending,
templateHash: templateHash,
statefulset: ss,
pods: podList.Items,
}, nil
}
func getStatefulSetTemplateHash(rs *appsv1.StatefulSet) (string, bool) {
hash, ok := rs.Labels[LabelKeyRunnerTemplateHash]
@@ -288,9 +453,12 @@ func (r *RunnerSetReconciler) newStatefulSet(runnerSet *v1alpha1.RunnerSet) (*ap
rs := appsv1.StatefulSet{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
- Name: runnerSet.ObjectMeta.Name,
GenerateName: runnerSet.ObjectMeta.Name + "-",
Namespace: runnerSet.ObjectMeta.Namespace,
Labels: CloneAndAddLabel(runnerSet.ObjectMeta.Labels, LabelKeyRunnerTemplateHash, templateHash),
Annotations: map[string]string{
SyncTimeAnnotationKey: time.Now().Format(time.RFC3339),
},
},
Spec: runnerSetWithOverrides.StatefulSetSpec,
}
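The SyncTimeAnnotationKey written above is the other half of the lastSyncTime computation earlier in the Reconcile loop: the creation time is stored as RFC3339 text and parsed back on every reconciliation. Below is a tiny hedged sketch of that round trip; the constant's actual value is whatever the repository defines, only the format matters here.

package sketch

import "time"

// syncTimeRoundTrip shows the annotation contract: whatever is written with
// time.RFC3339 at StatefulSet creation must parse back into the lastSyncTime
// that is compared against RunnerSetSpec.EffectiveTime.
func syncTimeRoundTrip(created time.Time) (time.Time, error) {
    annotation := created.Format(time.RFC3339)  // written in newStatefulSet
    return time.Parse(time.RFC3339, annotation) // read back in Reconcile
}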
@@ -310,6 +478,22 @@ func (r *RunnerSetReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.Recorder = mgr.GetEventRecorderFor(name)
if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &appsv1.StatefulSet{}, runnerSetOwnerKey, func(rawObj client.Object) []string {
set := rawObj.(*appsv1.StatefulSet)
owner := metav1.GetControllerOf(set)
if owner == nil {
return nil
}
if owner.APIVersion != v1alpha1.GroupVersion.String() || owner.Kind != "RunnerSet" {
return nil
}
return []string{owner.Name}
}); err != nil {
return err
}
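This field index is what makes the owner-scoped listing at the top of Reconcile cheap: instead of listing every StatefulSet in the namespace and filtering by owner reference, the controller asks the cache directly. A short usage sketch follows; the helper name is made up, but the calls mirror the List invocation earlier in this diff.

package sketch

import (
    "context"

    appsv1 "k8s.io/api/apps/v1"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

// listChildStatefulSets lists only the StatefulSets owned by the named RunnerSet,
// relying on the owner-key index registered in SetupWithManager.
func listChildStatefulSets(ctx context.Context, c client.Client, ownerKey, namespace, runnerSetName string) ([]appsv1.StatefulSet, error) {
    var list appsv1.StatefulSetList
    if err := c.List(ctx, &list, client.InNamespace(namespace), client.MatchingFields{ownerKey: runnerSetName}); err != nil {
        return nil, err
    }
    return list.Items, nil
}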
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.RunnerSet{}).
Owns(&appsv1.StatefulSet{}).