Add HRA support for RunnerSet (#647)
`HRA.Spec.ScaleTargetRef.Kind` is added to denote that the scale target is a RunnerSet. It defaults to `RunnerDeployment` for backward compatibility.
```
apiVersion: actions.summerwind.dev/v1alpha1
kind: HorizontalRunnerAutoscaler
metadata:
  name: myhra
spec:
  scaleTargetRef:
    kind: RunnerSet
    name: myrunnerset
```
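For comparison, a minimal sketch of the backward-compatible form (names are hypothetical): an HRA that omits `kind` entirely, which the controller treats as targeting a `RunnerDeployment`:

```
apiVersion: actions.summerwind.dev/v1alpha1
kind: HorizontalRunnerAutoscaler
metadata:
  name: myhra
spec:
  scaleTargetRef:
    # kind omitted; defaults to RunnerDeployment
    name: myrunnerdeployment
```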
Ref #629
Ref #613
Ref #612
Parent: 9e1c28fcff
Commit: 98da4c2adb
```diff
@@ -50,6 +50,7 @@ sleep 20
 if [ -n "${TEST_REPO}" ]; then
   if [ -n "USE_RUNNERSET" ]; then
     cat acceptance/testdata/repo.runnerset.yaml | envsubst | kubectl apply -f -
+    cat acceptance/testdata/repo.runnerset.hra.yaml | envsubst | kubectl apply -f -
   else
     echo 'Deploying runnerdeployment and hra. Set USE_RUNNERSET if you want to deploy runnerset instead.'
     cat acceptance/testdata/repo.runnerdeploy.yaml | envsubst | kubectl apply -f -
```
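The added line deploys the new HRA manifest alongside the RunnerSet. A sketch of running that step by hand, assuming `TEST_REPO` points at your test repository (the value below is hypothetical); `envsubst` substitutes `${TEST_REPO}` inside the manifest:

```
export TEST_REPO=myorg/myrepo
cat acceptance/testdata/repo.runnerset.hra.yaml | envsubst | kubectl apply -f -
```

Note that `[ -n "USE_RUNNERSET" ]` tests a non-empty string literal and is therefore always true; `[ -n "${USE_RUNNERSET}" ]` was presumably intended.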
```diff
@@ -0,0 +1,27 @@
+apiVersion: actions.summerwind.dev/v1alpha1
+kind: HorizontalRunnerAutoscaler
+metadata:
+  name: example-runnerset
+spec:
+  scaleTargetRef:
+    kind: RunnerSet
+    name: example-runnerset
+  scaleUpTriggers:
+  - githubEvent:
+      checkRun:
+        types: ["created"]
+        status: "queued"
+    amount: 1
+    duration: "1m"
+  # RunnerSet doesn't support scale from/to zero yet
+  minReplicas: 1
+  maxReplicas: 5
+  metrics:
+  - type: PercentageRunnersBusy
+    scaleUpThreshold: '0.75'
+    scaleDownThreshold: '0.3'
+    scaleUpFactor: '2'
+    scaleDownFactor: '0.5'
+  - type: TotalNumberOfQueuedAndInProgressWorkflowRuns
+    repositoryNames:
+    - ${TEST_REPO}
```
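Once applied, the autoscaler's decisions can be inspected on the HRA object itself; a minimal check, assuming the manifest above was applied as-is:

```
kubectl get horizontalrunnerautoscaler example-runnerset -o yaml
```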
```diff
@@ -106,6 +106,12 @@ type CapacityReservation struct {
 }
 
 type ScaleTargetRef struct {
+	// Kind is the type of resource being referenced
+	// +optional
+	// +kubebuilder:validation:Enum=RunnerDeployment;RunnerSet
+	Kind string `json:"kind,omitempty"`
+
+	// Name is the name of resource being referenced
 	Name string `json:"name,omitempty"`
 }
 
```
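The `+kubebuilder:validation:Enum` and `+optional` markers are what produce the `enum` entries and the optional `kind` property in the regenerated CRD shown in the next hunk; an empty or omitted `kind` passes validation and is treated by the controller as `RunnerDeployment`.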
```diff
@@ -129,7 +129,14 @@ spec:
             description: ScaleTargetRef sis the reference to scaled resource like
               RunnerDeployment
             properties:
+              kind:
+                description: Kind is the type of resource being referenced
+                enum:
+                - RunnerDeployment
+                - RunnerSet
+                type: string
               name:
+                description: Name is the name of resource being referenced
                 type: string
             type: object
           scaleUpTriggers:
```
```diff
@@ -10,9 +10,6 @@ import (
 	"time"
 
 	"github.com/actions-runner-controller/actions-runner-controller/api/v1alpha1"
-	kerrors "k8s.io/apimachinery/pkg/api/errors"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
 const (
```
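These three imports become unused here because the runner-listing code below moves out of this file: the listing goes behind the `scaleTarget.getRunnerMap` closures in the controller file, which gains the `kerrors` import in a corresponding hunk later in this commit.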
```diff
@@ -63,7 +60,7 @@ func (r *HorizontalRunnerAutoscalerReconciler) fetchSuggestedReplicasFromCache(h
 	return nil
 }
 
-func (r *HorizontalRunnerAutoscalerReconciler) suggestDesiredReplicas(rd v1alpha1.RunnerDeployment, hra v1alpha1.HorizontalRunnerAutoscaler) (*int, error) {
+func (r *HorizontalRunnerAutoscalerReconciler) suggestDesiredReplicas(st scaleTarget, hra v1alpha1.HorizontalRunnerAutoscaler) (*int, error) {
 	if hra.Spec.MinReplicas == nil {
 		return nil, fmt.Errorf("horizontalrunnerautoscaler %s/%s is missing minReplicas", hra.Namespace, hra.Name)
 	} else if hra.Spec.MaxReplicas == nil {
```
```diff
@@ -74,7 +71,7 @@ func (r *HorizontalRunnerAutoscalerReconciler) suggestDesiredReplicas(rd v1alpha
 	numMetrics := len(metrics)
 	if numMetrics == 0 {
 		if len(hra.Spec.ScaleUpTriggers) == 0 {
-			return r.suggestReplicasByQueuedAndInProgressWorkflowRuns(rd, hra, nil)
+			return r.suggestReplicasByQueuedAndInProgressWorkflowRuns(st, hra, nil)
 		}
 
 		return nil, nil
```
```diff
@@ -92,9 +89,9 @@ func (r *HorizontalRunnerAutoscalerReconciler) suggestDesiredReplicas(rd v1alpha
 
 	switch primaryMetricType {
 	case v1alpha1.AutoscalingMetricTypeTotalNumberOfQueuedAndInProgressWorkflowRuns:
-		suggested, err = r.suggestReplicasByQueuedAndInProgressWorkflowRuns(rd, hra, &primaryMetric)
+		suggested, err = r.suggestReplicasByQueuedAndInProgressWorkflowRuns(st, hra, &primaryMetric)
 	case v1alpha1.AutoscalingMetricTypePercentageRunnersBusy:
-		suggested, err = r.suggestReplicasByPercentageRunnersBusy(rd, hra, primaryMetric)
+		suggested, err = r.suggestReplicasByPercentageRunnersBusy(st, hra, primaryMetric)
 	default:
 		return nil, fmt.Errorf("validting autoscaling metrics: unsupported metric type %q", primaryMetric)
 	}
```
```diff
@@ -127,15 +124,15 @@ func (r *HorizontalRunnerAutoscalerReconciler) suggestDesiredReplicas(rd v1alpha
 		)
 	}
 
-	return r.suggestReplicasByQueuedAndInProgressWorkflowRuns(rd, hra, &fallbackMetric)
+	return r.suggestReplicasByQueuedAndInProgressWorkflowRuns(st, hra, &fallbackMetric)
 }
 
-func (r *HorizontalRunnerAutoscalerReconciler) suggestReplicasByQueuedAndInProgressWorkflowRuns(rd v1alpha1.RunnerDeployment, hra v1alpha1.HorizontalRunnerAutoscaler, metrics *v1alpha1.MetricSpec) (*int, error) {
+func (r *HorizontalRunnerAutoscalerReconciler) suggestReplicasByQueuedAndInProgressWorkflowRuns(st scaleTarget, hra v1alpha1.HorizontalRunnerAutoscaler, metrics *v1alpha1.MetricSpec) (*int, error) {
 
 	var repos [][]string
-	repoID := rd.Spec.Template.Spec.Repository
+	repoID := st.repo
 	if repoID == "" {
-		orgName := rd.Spec.Template.Spec.Organization
+		orgName := st.org
 		if orgName == "" {
 			return nil, fmt.Errorf("asserting runner deployment spec to detect bug: spec.template.organization should not be empty on this code path")
 		}
```
```diff
@@ -230,14 +227,15 @@ func (r *HorizontalRunnerAutoscalerReconciler) suggestReplicasByQueuedAndInProgr
 		"workflow_runs_queued", queued,
 		"workflow_runs_unknown", unknown,
 		"namespace", hra.Namespace,
-		"runner_deployment", rd.Name,
+		"kind", st.kind,
+		"name", st.st,
 		"horizontal_runner_autoscaler", hra.Name,
 	)
 
 	return &necessaryReplicas, nil
 }
 
-func (r *HorizontalRunnerAutoscalerReconciler) suggestReplicasByPercentageRunnersBusy(rd v1alpha1.RunnerDeployment, hra v1alpha1.HorizontalRunnerAutoscaler, metrics v1alpha1.MetricSpec) (*int, error) {
+func (r *HorizontalRunnerAutoscalerReconciler) suggestReplicasByPercentageRunnersBusy(st scaleTarget, hra v1alpha1.HorizontalRunnerAutoscaler, metrics v1alpha1.MetricSpec) (*int, error) {
 	ctx := context.Background()
 	scaleUpThreshold := defaultScaleUpThreshold
 	scaleDownThreshold := defaultScaleDownThreshold
```
```diff
@@ -294,41 +292,15 @@ func (r *HorizontalRunnerAutoscalerReconciler) suggestReplicasByPercentageRunner
 		scaleDownFactor = sdf
 	}
 
-	// return the list of runners in namespace. Horizontal Runner Autoscaler should only be responsible for scaling resources in its own ns.
-	var runnerList v1alpha1.RunnerList
-
-	var opts []client.ListOption
-
-	opts = append(opts, client.InNamespace(rd.Namespace))
-
-	selector, err := metav1.LabelSelectorAsSelector(getSelector(&rd))
+	runnerMap, err := st.getRunnerMap()
 	if err != nil {
 		return nil, err
 	}
-
-	opts = append(opts, client.MatchingLabelsSelector{Selector: selector})
-
-	r.Log.V(2).Info("Finding runners with selector", "ns", rd.Namespace)
-
-	if err := r.List(
-		ctx,
-		&runnerList,
-		opts...,
-	); err != nil {
-		if !kerrors.IsNotFound(err) {
-			return nil, err
-		}
-	}
-
-	runnerMap := make(map[string]struct{})
-	for _, items := range runnerList.Items {
-		runnerMap[items.Name] = struct{}{}
-	}
 
 	var (
-		enterprise   = rd.Spec.Template.Spec.Enterprise
-		organization = rd.Spec.Template.Spec.Organization
-		repository   = rd.Spec.Template.Spec.Repository
+		enterprise   = st.enterprise
+		organization = st.org
+		repository   = st.repo
 	)
 
 	// ListRunners will return all runners managed by GitHub - not restricted to ns
```
```diff
@@ -343,7 +315,7 @@ func (r *HorizontalRunnerAutoscalerReconciler) suggestReplicasByPercentageRunner
 
 	var desiredReplicasBefore int
 
-	if v := rd.Spec.Replicas; v == nil {
+	if v := st.replicas; v == nil {
 		desiredReplicasBefore = 1
 	} else {
 		desiredReplicasBefore = *v
```
```diff
@@ -355,7 +327,7 @@ func (r *HorizontalRunnerAutoscalerReconciler) suggestReplicasByPercentageRunner
 		numRunnersBusy int
 	)
 
-	numRunners = len(runnerList.Items)
+	numRunners = len(runnerMap)
 
 	for _, runner := range runners {
 		if _, ok := runnerMap[*runner.Name]; ok {
```
```diff
@@ -382,7 +354,7 @@ func (r *HorizontalRunnerAutoscalerReconciler) suggestReplicasByPercentageRunner
 			desiredReplicas = int(float64(desiredReplicasBefore) * scaleDownFactor)
 		}
 	} else {
-		desiredReplicas = *rd.Spec.Replicas
+		desiredReplicas = *st.replicas
 	}
 
 	// NOTES for operators:
```
```diff
@@ -398,7 +370,8 @@ func (r *HorizontalRunnerAutoscalerReconciler) suggestReplicasByPercentageRunner
 		"num_runners_registered", numRunnersRegistered,
 		"num_runners_busy", numRunnersBusy,
 		"namespace", hra.Namespace,
-		"runner_deployment", rd.Name,
+		"kind", st.kind,
+		"name", st.st,
 		"horizontal_runner_autoscaler", hra.Name,
 		"enterprise", enterprise,
 		"organization", organization,
```
```diff
@@ -1,6 +1,7 @@
 package controllers
 
 import (
+	"context"
 	"fmt"
 	"net/http/httptest"
 	"net/url"
```
```diff
@@ -231,7 +232,9 @@ func TestDetermineDesiredReplicas_RepositoryRunner(t *testing.T) {
 		t.Fatalf("unexpected error: %v", err)
 	}
 
-	got, _, _, err := h.computeReplicasWithCache(log, metav1Now.Time, rd, hra, minReplicas)
+	st := h.scaleTargetFromRD(context.Background(), rd)
+
+	got, _, _, err := h.computeReplicasWithCache(log, metav1Now.Time, st, hra, minReplicas)
 	if err != nil {
 		if tc.err == "" {
 			t.Fatalf("unexpected error: expected none, got %v", err)
```
```diff
@@ -497,7 +500,9 @@ func TestDetermineDesiredReplicas_OrganizationalRunner(t *testing.T) {
 		t.Fatalf("unexpected error: %v", err)
 	}
 
-	got, _, _, err := h.computeReplicasWithCache(log, metav1Now.Time, rd, hra, minReplicas)
+	st := h.scaleTargetFromRD(context.Background(), rd)
+
+	got, _, _, err := h.computeReplicasWithCache(log, metav1Now.Time, st, hra, minReplicas)
 	if err != nil {
 		if tc.err == "" {
 			t.Fatalf("unexpected error: expected none, got %v", err)
```
```diff
@@ -25,6 +25,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 
 	"github.com/actions-runner-controller/actions-runner-controller/github"
+	kerrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/types"
 
 	"github.com/go-logr/logr"
```
```diff
@@ -77,18 +78,181 @@ func (r *HorizontalRunnerAutoscalerReconciler) Reconcile(ctx context.Context, re
 
 	metrics.SetHorizontalRunnerAutoscalerSpec(hra.ObjectMeta, hra.Spec)
 
-	var rd v1alpha1.RunnerDeployment
-	if err := r.Get(ctx, types.NamespacedName{
-		Namespace: req.Namespace,
-		Name:      hra.Spec.ScaleTargetRef.Name,
-	}, &rd); err != nil {
-		return ctrl.Result{}, client.IgnoreNotFound(err)
-	}
+	kind := hra.Spec.ScaleTargetRef.Kind
 
-	if !rd.ObjectMeta.DeletionTimestamp.IsZero() {
-		return ctrl.Result{}, nil
-	}
+	switch kind {
+	case "", "RunnerDeployment":
+		var rd v1alpha1.RunnerDeployment
+		if err := r.Get(ctx, types.NamespacedName{
+			Namespace: req.Namespace,
+			Name:      hra.Spec.ScaleTargetRef.Name,
+		}, &rd); err != nil {
+			return ctrl.Result{}, client.IgnoreNotFound(err)
+		}
+
+		if !rd.ObjectMeta.DeletionTimestamp.IsZero() {
+			return ctrl.Result{}, nil
+		}
+
+		st := r.scaleTargetFromRD(ctx, rd)
+
+		return r.reconcile(ctx, req, log, hra, st, func(newDesiredReplicas int) error {
+			currentDesiredReplicas := getIntOrDefault(rd.Spec.Replicas, defaultReplicas)
+
+			// Please add more conditions that we can in-place update the newest runnerreplicaset without disruption
+			if currentDesiredReplicas != newDesiredReplicas {
+				copy := rd.DeepCopy()
+				copy.Spec.Replicas = &newDesiredReplicas
+
+				if err := r.Client.Patch(ctx, copy, client.MergeFrom(&rd)); err != nil {
+					return fmt.Errorf("patching runnerdeployment to have %d replicas: %w", newDesiredReplicas, err)
+				}
+			}
+			return nil
+		})
+	case "RunnerSet":
+		var rs v1alpha1.RunnerSet
+		if err := r.Get(ctx, types.NamespacedName{
+			Namespace: req.Namespace,
+			Name:      hra.Spec.ScaleTargetRef.Name,
+		}, &rs); err != nil {
+			return ctrl.Result{}, client.IgnoreNotFound(err)
+		}
+
+		if !rs.ObjectMeta.DeletionTimestamp.IsZero() {
+			return ctrl.Result{}, nil
+		}
+
+		var replicas *int
+
+		if rs.Spec.Replicas != nil {
+			v := int(*rs.Spec.Replicas)
+			replicas = &v
+		}
+
+		st := scaleTarget{
+			st:         rs.Name,
+			kind:       "runnerset",
+			enterprise: rs.Spec.Enterprise,
+			org:        rs.Spec.Organization,
+			repo:       rs.Spec.Repository,
+			replicas:   replicas,
+			getRunnerMap: func() (map[string]struct{}, error) {
+				// return the list of runners in namespace. Horizontal Runner Autoscaler should only be responsible for scaling resources in its own ns.
+				var runnerPodList corev1.PodList
+
+				var opts []client.ListOption
+
+				opts = append(opts, client.InNamespace(rs.Namespace))
+
+				selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
+				if err != nil {
+					return nil, err
+				}
+
+				opts = append(opts, client.MatchingLabelsSelector{Selector: selector})
+
+				r.Log.V(2).Info("Finding runnerset's runner pods with selector", "ns", rs.Namespace)
+
+				if err := r.List(
+					ctx,
+					&runnerPodList,
+					opts...,
+				); err != nil {
+					if !kerrors.IsNotFound(err) {
+						return nil, err
+					}
+				}
+				runnerMap := make(map[string]struct{})
+				for _, items := range runnerPodList.Items {
+					runnerMap[items.Name] = struct{}{}
+				}
+
+				return runnerMap, nil
+			},
+		}
+
+		return r.reconcile(ctx, req, log, hra, st, func(newDesiredReplicas int) error {
+			var replicas *int
+			if rs.Spec.Replicas != nil {
+				v := int(*rs.Spec.Replicas)
+				replicas = &v
+			}
+			currentDesiredReplicas := getIntOrDefault(replicas, defaultReplicas)
+
+			if currentDesiredReplicas != newDesiredReplicas {
+				copy := rs.DeepCopy()
+				v := int32(newDesiredReplicas)
+				copy.Spec.Replicas = &v
+
+				if err := r.Client.Patch(ctx, copy, client.MergeFrom(&rs)); err != nil {
+					return fmt.Errorf("patching runnerset to have %d replicas: %w", newDesiredReplicas, err)
+				}
+			}
+			return nil
+		})
+	}
+
+	log.Info(fmt.Sprintf("Unsupported scale target %s %s: kind %s is not supported. valid kinds are %s and %s", kind, hra.Spec.ScaleTargetRef.Name, kind, "RunnerDeployment", "RunnerSet"))
+
+	return ctrl.Result{}, nil
+}
+
+func (r *HorizontalRunnerAutoscalerReconciler) scaleTargetFromRD(ctx context.Context, rd v1alpha1.RunnerDeployment) scaleTarget {
+	st := scaleTarget{
+		st:         rd.Name,
+		kind:       "runnerdeployment",
+		enterprise: rd.Spec.Template.Spec.Enterprise,
+		org:        rd.Spec.Template.Spec.Organization,
+		repo:       rd.Spec.Template.Spec.Repository,
+		replicas:   rd.Spec.Replicas,
+		getRunnerMap: func() (map[string]struct{}, error) {
+			// return the list of runners in namespace. Horizontal Runner Autoscaler should only be responsible for scaling resources in its own ns.
+			var runnerList v1alpha1.RunnerList
+
+			var opts []client.ListOption
+
+			opts = append(opts, client.InNamespace(rd.Namespace))
+
+			selector, err := metav1.LabelSelectorAsSelector(getSelector(&rd))
+			if err != nil {
+				return nil, err
+			}
+
+			opts = append(opts, client.MatchingLabelsSelector{Selector: selector})
+
+			r.Log.V(2).Info("Finding runners with selector", "ns", rd.Namespace)
+
+			if err := r.List(
+				ctx,
+				&runnerList,
+				opts...,
+			); err != nil {
+				if !kerrors.IsNotFound(err) {
+					return nil, err
+				}
+			}
+			runnerMap := make(map[string]struct{})
+			for _, items := range runnerList.Items {
+				runnerMap[items.Name] = struct{}{}
+			}
+
+			return runnerMap, nil
+		},
+	}
+
+	return st
+}
+
+type scaleTarget struct {
+	st, kind              string
+	enterprise, repo, org string
+	replicas              *int
+
+	getRunnerMap func() (map[string]struct{}, error)
+}
+
+func (r *HorizontalRunnerAutoscalerReconciler) reconcile(ctx context.Context, req ctrl.Request, log logr.Logger, hra v1alpha1.HorizontalRunnerAutoscaler, st scaleTarget, updatedDesiredReplicas func(int) error) (ctrl.Result, error) {
 	now := time.Now()
 
 	minReplicas, active, upcoming, err := r.getMinReplicas(log, now, hra)
```
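With `reconcile` now taking a `scaleTarget` plus a replicas-patching callback, the metric code is decoupled from any concrete target type; supporting a further kind would mean one more `case` in the `switch` above. A hypothetical sketch, not part of this commit (all names invented for illustration):

```
// Hypothetical only: how another scalable kind could plug into the same seam.
st := scaleTarget{
	st:   obj.Name,   // target name, logged as "name"
	kind: "somekind", // logged as "kind"
	getRunnerMap: func() (map[string]struct{}, error) {
		// list this kind's runners in the HRA's own namespace
		return map[string]struct{}{}, nil
	},
}
return r.reconcile(ctx, req, log, hra, st, func(newDesiredReplicas int) error {
	// patch the target's replicas here
	return nil
})
```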
```diff
@@ -98,7 +262,7 @@ func (r *HorizontalRunnerAutoscalerReconciler) Reconcile(ctx context.Context, re
 		return ctrl.Result{}, err
 	}
 
-	newDesiredReplicas, computedReplicas, computedReplicasFromCache, err := r.computeReplicasWithCache(log, now, rd, hra, minReplicas)
+	newDesiredReplicas, computedReplicas, computedReplicasFromCache, err := r.computeReplicasWithCache(log, now, st, hra, minReplicas)
 	if err != nil {
 		r.Recorder.Event(&hra, corev1.EventTypeNormal, "RunnerAutoscalingFailure", err.Error())
 
```
```diff
@@ -107,16 +271,8 @@ func (r *HorizontalRunnerAutoscalerReconciler) Reconcile(ctx context.Context, re
 		return ctrl.Result{}, err
 	}
 
-	currentDesiredReplicas := getIntOrDefault(rd.Spec.Replicas, defaultReplicas)
-
-	// Please add more conditions that we can in-place update the newest runnerreplicaset without disruption
-	if currentDesiredReplicas != newDesiredReplicas {
-		copy := rd.DeepCopy()
-		copy.Spec.Replicas = &newDesiredReplicas
-
-		if err := r.Client.Patch(ctx, copy, client.MergeFrom(&rd)); err != nil {
-			return ctrl.Result{}, fmt.Errorf("patching runnerdeployment to have %d replicas: %w", newDesiredReplicas, err)
-		}
+	if err := updatedDesiredReplicas(newDesiredReplicas); err != nil {
+		return ctrl.Result{}, err
 	}
 
 	updated := hra.DeepCopy()
```
```diff
@@ -287,7 +443,7 @@ func (r *HorizontalRunnerAutoscalerReconciler) getMinReplicas(log logr.Logger, n
 	return minReplicas, active, upcoming, nil
 }
 
-func (r *HorizontalRunnerAutoscalerReconciler) computeReplicasWithCache(log logr.Logger, now time.Time, rd v1alpha1.RunnerDeployment, hra v1alpha1.HorizontalRunnerAutoscaler, minReplicas int) (int, int, *int, error) {
+func (r *HorizontalRunnerAutoscalerReconciler) computeReplicasWithCache(log logr.Logger, now time.Time, st scaleTarget, hra v1alpha1.HorizontalRunnerAutoscaler, minReplicas int) (int, int, *int, error) {
 	var suggestedReplicas int
 
 	suggestedReplicasFromCache := r.fetchSuggestedReplicasFromCache(hra)
```
```diff
@@ -303,7 +459,7 @@ func (r *HorizontalRunnerAutoscalerReconciler) computeReplicasWithCache(log logr
 			suggestedReplicas = *cached
 		}
 	} else {
-		v, err := r.suggestDesiredReplicas(rd, hra)
+		v, err := r.suggestDesiredReplicas(st, hra)
 		if err != nil {
 			return 0, 0, nil, err
 		}
```