actions-runner-controller/controllers/autoscaling.go

372 lines
11 KiB
Go

package controllers
import (
"context"
"errors"
"fmt"
"math"
"strconv"
"strings"
"github.com/actions-runner-controller/actions-runner-controller/api/v1alpha1"
"github.com/google/go-github/v45/github"
)
// Defaults applied by suggestReplicasByPercentageRunnersBusy when the
// corresponding MetricSpec field is left empty.
const (
// Scale up once the fraction of busy runners is at or above this value.
defaultScaleUpThreshold = 0.8
// Scale down once the fraction of busy runners is below this value.
defaultScaleDownThreshold = 0.3
// Multiplier applied to the current desired replicas when scaling up
// (the result is rounded up).
defaultScaleUpFactor = 1.3
// Multiplier applied to the current desired replicas when scaling down
// (the result is truncated toward zero).
defaultScaleDownFactor = 0.7
)
// suggestDesiredReplicas returns the replica count suggested by the metrics
// configured on the given HorizontalRunnerAutoscaler for the scale target st.
//
// The first metric is the primary one. If it yields a positive suggestion,
// that value is returned. Otherwise, when a second metric is configured, it is
// used as a fallback; the only accepted combination is PercentageRunnersBusy
// followed by TotalNumberOfQueuedAndInProgressWorkflowRuns.
//
// It returns (nil, nil) when no suggestion can be made: either no metrics are
// configured, or the sole configured metric did not suggest a positive count.
//
// An error is returned when minReplicas or maxReplicas is unset, when more
// than two metrics are configured, when the primary metric type is
// unsupported, when the primary metric itself fails, or when the fallback is
// needed but the two metrics form a disallowed combination.
func (r *HorizontalRunnerAutoscalerReconciler) suggestDesiredReplicas(st scaleTarget, hra v1alpha1.HorizontalRunnerAutoscaler) (*int, error) {
	if hra.Spec.MinReplicas == nil {
		return nil, fmt.Errorf("horizontalrunnerautoscaler %s/%s is missing minReplicas", hra.Namespace, hra.Name)
	}
	if hra.Spec.MaxReplicas == nil {
		return nil, fmt.Errorf("horizontalrunnerautoscaler %s/%s is missing maxReplicas", hra.Namespace, hra.Name)
	}

	metrics := hra.Spec.Metrics
	numMetrics := len(metrics)
	switch {
	case numMetrics == 0:
		// We don't default to anything since ARC 0.23.0
		// See https://github.com/actions-runner-controller/actions-runner-controller/issues/728
		return nil, nil
	case numMetrics > 2:
		return nil, fmt.Errorf("too many autoscaling metrics configured: It must be 0 to 2, but got %d", numMetrics)
	}

	primaryMetric := metrics[0]
	primaryMetricType := primaryMetric.Type

	var (
		suggested *int
		err       error
	)

	switch primaryMetricType {
	case v1alpha1.AutoscalingMetricTypeTotalNumberOfQueuedAndInProgressWorkflowRuns:
		suggested, err = r.suggestReplicasByQueuedAndInProgressWorkflowRuns(st, hra, &primaryMetric)
	case v1alpha1.AutoscalingMetricTypePercentageRunnersBusy:
		suggested, err = r.suggestReplicasByPercentageRunnersBusy(st, hra, primaryMetric)
	default:
		// Report the offending type name, not the whole metric struct.
		return nil, fmt.Errorf("validating autoscaling metrics: unsupported metric type %q", primaryMetricType)
	}
	if err != nil {
		return nil, err
	}

	if suggested != nil && *suggested > 0 {
		return suggested, nil
	}

	if numMetrics == 1 {
		// This is never supposed to happen but anyway-
		// Fall-back to `minReplicas + capacityReservedThroughWebhook`.
		return nil, nil
	}

	// At this point, we are sure that there are exactly 2 Metrics entries.
	fallbackMetric := metrics[1]
	fallbackMetricType := fallbackMetric.Type

	if primaryMetricType != v1alpha1.AutoscalingMetricTypePercentageRunnersBusy ||
		fallbackMetricType != v1alpha1.AutoscalingMetricTypeTotalNumberOfQueuedAndInProgressWorkflowRuns {
		return nil, fmt.Errorf(
			"invalid HRA Spec: Metrics[0] of %s cannot be combined with Metrics[1] of %s: The only allowed combination is 0=PercentageRunnersBusy and 1=TotalNumberOfQueuedAndInProgressWorkflowRuns",
			primaryMetricType, fallbackMetricType,
		)
	}

	return r.suggestReplicasByQueuedAndInProgressWorkflowRuns(st, hra, &fallbackMetric)
}
// suggestReplicasByQueuedAndInProgressWorkflowRuns suggests a replica count
// equal to the number of queued plus in-progress units of work found on
// GitHub for the scale target.
//
// Which repositories are inspected:
//   - If st.repo is set, it must be of the form "owner/name" and only that
//     repository is inspected.
//   - Otherwise st.org must be set and every name in metrics.RepositoryNames
//     is inspected under that organization.
//
// For each queued or in-progress workflow run, the run's jobs are listed and
// only the jobs whose labels (ignoring "self-hosted") are all present in
// st.labels are counted. When a run has no ID or no jobs, the run itself is
// counted once instead.
//
// It returns (nil, nil) for an organizational target when metrics is nil.
// It returns an error when neither repo nor org is set, when an
// organizational target has no repository names, when st.repo is malformed,
// or when listing workflow runs fails. A failure to list the jobs of a single
// run is logged and that run is skipped (best effort).
func (r *HorizontalRunnerAutoscalerReconciler) suggestReplicasByQueuedAndInProgressWorkflowRuns(st scaleTarget, hra v1alpha1.HorizontalRunnerAutoscaler, metrics *v1alpha1.MetricSpec) (*int, error) {
	var repos [][]string

	repoID := st.repo
	if repoID == "" {
		orgName := st.org
		if orgName == "" {
			return nil, errors.New("asserting runner deployment spec to detect bug: spec.template.organization should not be empty on this code path")
		}

		// In case it's an organizational runners deployment without any scaling metrics defined,
		// we assume that the desired replicas should always be `minReplicas + capacityReservedThroughWebhook`.
		// See https://github.com/actions-runner-controller/actions-runner-controller/issues/377#issuecomment-793372693
		if metrics == nil {
			return nil, nil
		}

		if len(metrics.RepositoryNames) == 0 {
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].repositoryNames is required and must have one more more entries for organizational runner deployment")
		}

		for _, repoName := range metrics.RepositoryNames {
			repos = append(repos, []string{orgName, repoName})
		}
	} else {
		repo := strings.Split(repoID, "/")
		// Both the owner and the name are indexed below; reject a value
		// without a separator instead of panicking on repo[1].
		if len(repo) < 2 {
			return nil, fmt.Errorf("validating scale target: repository %q must be in the form of owner/name", repoID)
		}
		repos = append(repos, repo)
	}

	// The set of labels this scale target serves. It does not depend on the
	// job being inspected, so build it once.
	runnerLabels := make(map[string]struct{}, len(st.labels))
	for _, l := range st.labels {
		runnerLabels[l] = struct{}{}
	}

	var inProgress, queued, completed, unknown int

	// listWorkflowJobs counts the matching jobs of a single workflow run.
	// fallback is invoked instead when the run cannot be broken down into
	// jobs (no run ID, or the run has no jobs).
	listWorkflowJobs := func(user string, repoName string, runID int64, fallback func()) {
		if runID == 0 {
			fallback()
			return
		}

		opt := github.ListWorkflowJobsOptions{ListOptions: github.ListOptions{PerPage: 50}}
		var allJobs []*github.WorkflowJob
		for {
			jobs, resp, err := r.GitHubClient.Actions.ListWorkflowJobs(context.TODO(), user, repoName, runID, &opt)
			if err != nil {
				// Best effort: a run whose jobs cannot be listed is
				// logged and left out of the count.
				r.Log.Error(err, "Error listing workflow jobs")
				return
			}
			allJobs = append(allJobs, jobs.Jobs...)
			if resp.NextPage == 0 {
				break
			}
			opt.Page = resp.NextPage
		}

		if len(allJobs) == 0 {
			fallback()
			return
		}

	JOB:
		for _, job := range allJobs {
			if len(job.Labels) == 0 {
				// This shouldn't usually happen
				r.Log.Info("Detected job with no labels, which is not supported by ARC. Skipping anyway.", "labels", job.Labels, "run_id", job.GetRunID(), "job_id", job.GetID())
				continue JOB
			}

			// Every job label except "self-hosted" must be served by
			// this scale target, otherwise the job is not ours.
			for _, l := range job.Labels {
				if l == "self-hosted" {
					continue
				}
				if _, ok := runnerLabels[l]; !ok {
					continue JOB
				}
			}

			switch job.GetStatus() {
			case "completed":
				// We add a case for `completed` so it is not counted in `unknown`.
				// And we do not increment the counter for completed because
				// that counter only refers to workflows. The reason for
				// this is because we do not get a list of jobs for
				// completed workflows in order to keep the number of API
				// calls to a minimum.
			case "in_progress":
				inProgress++
			case "queued":
				queued++
			default:
				unknown++
			}
		}
	}

	for _, repo := range repos {
		user, repoName := repo[0], repo[1]
		workflowRuns, err := r.GitHubClient.ListRepositoryWorkflowRuns(context.TODO(), user, repoName)
		if err != nil {
			return nil, err
		}

		for _, run := range workflowRuns {
			// In May 2020, there are only 3 statuses.
			// Follow the below links for more details:
			// - https://developer.github.com/v3/actions/workflow-runs/#list-repository-workflow-runs
			// - https://developer.github.com/v3/checks/runs/#create-a-check-run
			switch run.GetStatus() {
			case "completed":
				completed++
			case "in_progress":
				listWorkflowJobs(user, repoName, run.GetID(), func() { inProgress++ })
			case "queued":
				listWorkflowJobs(user, repoName, run.GetID(), func() { queued++ })
			default:
				unknown++
			}
		}
	}

	necessaryReplicas := queued + inProgress

	r.Log.V(1).Info(
		fmt.Sprintf("Suggested desired replicas of %d by TotalNumberOfQueuedAndInProgressWorkflowRuns", necessaryReplicas),
		"workflow_runs_completed", completed,
		"workflow_runs_in_progress", inProgress,
		"workflow_runs_queued", queued,
		"workflow_runs_unknown", unknown,
		"namespace", hra.Namespace,
		"kind", st.kind,
		"name", st.st,
		"horizontal_runner_autoscaler", hra.Name,
	)

	return &necessaryReplicas, nil
}
// suggestReplicasByPercentageRunnersBusy suggests a replica count based on the
// fraction of the scale target's runners that GitHub reports as busy.
//
// The fraction is numRunnersBusy / desiredReplicasBefore, where
// desiredReplicasBefore is *st.replicas, or 1 when st.replicas is nil.
//   - fraction >= scaleUpThreshold: scale up, either by adding
//     ScaleUpAdjustment or by multiplying by the scale-up factor (rounded up).
//   - fraction < scaleDownThreshold: scale down, either by subtracting
//     ScaleDownAdjustment or by multiplying by the scale-down factor
//     (truncated).
//   - otherwise: keep desiredReplicasBefore.
//
// Thresholds and factors come from metrics when set and from the package
// defaults otherwise. An adjustment and its corresponding factor are mutually
// exclusive. The result is not clamped here.
//
// An error is returned when a threshold or factor cannot be parsed as a
// float64, when an adjustment is negative, when an adjustment is combined
// with its factor, or when the runner map or the GitHub runner list cannot be
// obtained.
func (r *HorizontalRunnerAutoscalerReconciler) suggestReplicasByPercentageRunnersBusy(st scaleTarget, hra v1alpha1.HorizontalRunnerAutoscaler, metrics v1alpha1.MetricSpec) (*int, error) {
	ctx := context.Background()

	scaleUpThreshold := defaultScaleUpThreshold
	scaleDownThreshold := defaultScaleDownThreshold
	scaleUpFactor := defaultScaleUpFactor
	scaleDownFactor := defaultScaleDownFactor

	if metrics.ScaleUpThreshold != "" {
		sut, err := strconv.ParseFloat(metrics.ScaleUpThreshold, 64)
		if err != nil {
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleUpThreshold cannot be parsed into a float64")
		}
		scaleUpThreshold = sut
	}

	if metrics.ScaleDownThreshold != "" {
		sdt, err := strconv.ParseFloat(metrics.ScaleDownThreshold, 64)
		if err != nil {
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleDownThreshold cannot be parsed into a float64")
		}
		scaleDownThreshold = sdt
	}

	scaleUpAdjustment := metrics.ScaleUpAdjustment
	if scaleUpAdjustment != 0 {
		if scaleUpAdjustment < 0 {
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleUpAdjustment cannot be lower than 0")
		}
		if metrics.ScaleUpFactor != "" {
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[]: scaleUpAdjustment and scaleUpFactor cannot be specified together")
		}
	} else if metrics.ScaleUpFactor != "" {
		suf, err := strconv.ParseFloat(metrics.ScaleUpFactor, 64)
		if err != nil {
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleUpFactor cannot be parsed into a float64")
		}
		scaleUpFactor = suf
	}

	scaleDownAdjustment := metrics.ScaleDownAdjustment
	if scaleDownAdjustment != 0 {
		if scaleDownAdjustment < 0 {
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleDownAdjustment cannot be lower than 0")
		}
		if metrics.ScaleDownFactor != "" {
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[]: scaleDownAdjustment and scaleDownFactor cannot be specified together")
		}
	} else if metrics.ScaleDownFactor != "" {
		sdf, err := strconv.ParseFloat(metrics.ScaleDownFactor, 64)
		if err != nil {
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleDownFactor cannot be parsed into a float64")
		}
		scaleDownFactor = sdf
	}

	runnerMap, err := st.getRunnerMap()
	if err != nil {
		return nil, err
	}

	var (
		enterprise   = st.enterprise
		organization = st.org
		repository   = st.repo
	)

	// ListRunners will return all runners managed by GitHub - not restricted to ns
	runners, err := r.GitHubClient.ListRunners(
		ctx,
		enterprise,
		organization,
		repository)
	if err != nil {
		return nil, err
	}

	// A scale target without an explicit replica count is treated as
	// having one replica.
	desiredReplicasBefore := 1
	if v := st.replicas; v != nil {
		desiredReplicasBefore = *v
	}

	var (
		numRunners           = len(runnerMap)
		numRunnersRegistered int
		numRunnersBusy       int
	)

	// Only runners that belong to this scale target are counted.
	for _, runner := range runners {
		// GetName is nil-safe, unlike dereferencing runner.Name.
		if _, ok := runnerMap[runner.GetName()]; !ok {
			continue
		}
		numRunnersRegistered++
		if runner.GetBusy() {
			numRunnersBusy++
		}
	}

	var desiredReplicas int
	fractionBusy := float64(numRunnersBusy) / float64(desiredReplicasBefore)
	switch {
	case fractionBusy >= scaleUpThreshold:
		if scaleUpAdjustment > 0 {
			desiredReplicas = desiredReplicasBefore + scaleUpAdjustment
		} else {
			desiredReplicas = int(math.Ceil(float64(desiredReplicasBefore) * scaleUpFactor))
		}
	case fractionBusy < scaleDownThreshold:
		if scaleDownAdjustment > 0 {
			desiredReplicas = desiredReplicasBefore - scaleDownAdjustment
		} else {
			desiredReplicas = int(float64(desiredReplicasBefore) * scaleDownFactor)
		}
	default:
		// No change. Use the nil-safe value computed above rather than
		// dereferencing st.replicas, which may be nil.
		desiredReplicas = desiredReplicasBefore
	}

	// NOTES for operators:
	//
	// - num_runners can be as twice as large as replicas_desired_before while
	//   the runnerdeployment controller is replacing RunnerReplicaSet for runner update.
	r.Log.V(1).Info(
		fmt.Sprintf("Suggested desired replicas of %d by PercentageRunnersBusy", desiredReplicas),
		"replicas_desired_before", desiredReplicasBefore,
		"replicas_desired", desiredReplicas,
		"num_runners", numRunners,
		"num_runners_registered", numRunnersRegistered,
		"num_runners_busy", numRunnersBusy,
		"namespace", hra.Namespace,
		"kind", st.kind,
		"name", st.st,
		"horizontal_runner_autoscaler", hra.Name,
		"enterprise", enterprise,
		"organization", organization,
		"repository", repository,
	)

	return &desiredReplicas, nil
}