405 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			405 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
package controllers
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"math"
 | 
						|
	"strconv"
 | 
						|
	"strings"
 | 
						|
 | 
						|
	"github.com/actions-runner-controller/actions-runner-controller/api/v1alpha1"
 | 
						|
	"github.com/google/go-github/v45/github"
 | 
						|
	corev1 "k8s.io/api/core/v1"
 | 
						|
	"sigs.k8s.io/controller-runtime/pkg/client"
 | 
						|
)
 | 
						|
 | 
						|
const (
 | 
						|
	defaultScaleUpThreshold   = 0.8
 | 
						|
	defaultScaleDownThreshold = 0.3
 | 
						|
	defaultScaleUpFactor      = 1.3
 | 
						|
	defaultScaleDownFactor    = 0.7
 | 
						|
)
 | 
						|
 | 
						|
func (r *HorizontalRunnerAutoscalerReconciler) suggestDesiredReplicas(st scaleTarget, hra v1alpha1.HorizontalRunnerAutoscaler) (*int, error) {
 | 
						|
	if hra.Spec.MinReplicas == nil {
 | 
						|
		return nil, fmt.Errorf("horizontalrunnerautoscaler %s/%s is missing minReplicas", hra.Namespace, hra.Name)
 | 
						|
	} else if hra.Spec.MaxReplicas == nil {
 | 
						|
		return nil, fmt.Errorf("horizontalrunnerautoscaler %s/%s is missing maxReplicas", hra.Namespace, hra.Name)
 | 
						|
	}
 | 
						|
 | 
						|
	metrics := hra.Spec.Metrics
 | 
						|
	numMetrics := len(metrics)
 | 
						|
	if numMetrics == 0 {
 | 
						|
		// We don't default to anything since ARC 0.23.0
 | 
						|
		// See https://github.com/actions-runner-controller/actions-runner-controller/issues/728
 | 
						|
		return nil, nil
 | 
						|
	} else if numMetrics > 2 {
 | 
						|
		return nil, fmt.Errorf("too many autoscaling metrics configured: It must be 0 to 2, but got %d", numMetrics)
 | 
						|
	}
 | 
						|
 | 
						|
	primaryMetric := metrics[0]
 | 
						|
	primaryMetricType := primaryMetric.Type
 | 
						|
 | 
						|
	var (
 | 
						|
		suggested *int
 | 
						|
		err       error
 | 
						|
	)
 | 
						|
 | 
						|
	switch primaryMetricType {
 | 
						|
	case v1alpha1.AutoscalingMetricTypeTotalNumberOfQueuedAndInProgressWorkflowRuns:
 | 
						|
		suggested, err = r.suggestReplicasByQueuedAndInProgressWorkflowRuns(st, hra, &primaryMetric)
 | 
						|
	case v1alpha1.AutoscalingMetricTypePercentageRunnersBusy:
 | 
						|
		suggested, err = r.suggestReplicasByPercentageRunnersBusy(st, hra, primaryMetric)
 | 
						|
	default:
 | 
						|
		return nil, fmt.Errorf("validating autoscaling metrics: unsupported metric type %q", primaryMetric)
 | 
						|
	}
 | 
						|
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	if suggested != nil && *suggested > 0 {
 | 
						|
		return suggested, nil
 | 
						|
	}
 | 
						|
 | 
						|
	if len(metrics) == 1 {
 | 
						|
		// This is never supposed to happen but anyway-
 | 
						|
		// Fall-back to `minReplicas + capacityReservedThroughWebhook`.
 | 
						|
		return nil, nil
 | 
						|
	}
 | 
						|
 | 
						|
	// At this point, we are sure that there are exactly 2 Metrics entries.
 | 
						|
 | 
						|
	fallbackMetric := metrics[1]
 | 
						|
	fallbackMetricType := fallbackMetric.Type
 | 
						|
 | 
						|
	if primaryMetricType != v1alpha1.AutoscalingMetricTypePercentageRunnersBusy ||
 | 
						|
		fallbackMetricType != v1alpha1.AutoscalingMetricTypeTotalNumberOfQueuedAndInProgressWorkflowRuns {
 | 
						|
 | 
						|
		return nil, fmt.Errorf(
 | 
						|
			"invalid HRA Spec: Metrics[0] of %s cannot be combined with Metrics[1] of %s: The only allowed combination is 0=PercentageRunnersBusy and 1=TotalNumberOfQueuedAndInProgressWorkflowRuns",
 | 
						|
			primaryMetricType, fallbackMetricType,
 | 
						|
		)
 | 
						|
	}
 | 
						|
 | 
						|
	return r.suggestReplicasByQueuedAndInProgressWorkflowRuns(st, hra, &fallbackMetric)
 | 
						|
}
 | 
						|
 | 
						|
func (r *HorizontalRunnerAutoscalerReconciler) suggestReplicasByQueuedAndInProgressWorkflowRuns(st scaleTarget, hra v1alpha1.HorizontalRunnerAutoscaler, metrics *v1alpha1.MetricSpec) (*int, error) {
 | 
						|
 | 
						|
	var repos [][]string
 | 
						|
	repoID := st.repo
 | 
						|
	if repoID == "" {
 | 
						|
		orgName := st.org
 | 
						|
		if orgName == "" {
 | 
						|
			return nil, fmt.Errorf("asserting runner deployment spec to detect bug: spec.template.organization should not be empty on this code path")
 | 
						|
		}
 | 
						|
 | 
						|
		// In case it's an organizational runners deployment without any scaling metrics defined,
 | 
						|
		// we assume that the desired replicas should always be `minReplicas + capacityReservedThroughWebhook`.
 | 
						|
		// See https://github.com/actions-runner-controller/actions-runner-controller/issues/377#issuecomment-793372693
 | 
						|
		if metrics == nil {
 | 
						|
			return nil, nil
 | 
						|
		}
 | 
						|
 | 
						|
		if len(metrics.RepositoryNames) == 0 {
 | 
						|
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].repositoryNames is required and must have one more more entries for organizational runner deployment")
 | 
						|
		}
 | 
						|
 | 
						|
		for _, repoName := range metrics.RepositoryNames {
 | 
						|
			repos = append(repos, []string{orgName, repoName})
 | 
						|
		}
 | 
						|
	} else {
 | 
						|
		repo := strings.Split(repoID, "/")
 | 
						|
 | 
						|
		repos = append(repos, repo)
 | 
						|
	}
 | 
						|
 | 
						|
	var total, inProgress, queued, completed, unknown int
 | 
						|
	type callback func()
 | 
						|
	listWorkflowJobs := func(user string, repoName string, runID int64, fallback_cb callback) {
 | 
						|
		if runID == 0 {
 | 
						|
			fallback_cb()
 | 
						|
			return
 | 
						|
		}
 | 
						|
		opt := github.ListWorkflowJobsOptions{ListOptions: github.ListOptions{PerPage: 50}}
 | 
						|
		var allJobs []*github.WorkflowJob
 | 
						|
		for {
 | 
						|
			jobs, resp, err := r.GitHubClient.Actions.ListWorkflowJobs(context.TODO(), user, repoName, runID, &opt)
 | 
						|
			if err != nil {
 | 
						|
				r.Log.Error(err, "Error listing workflow jobs")
 | 
						|
				return //err
 | 
						|
			}
 | 
						|
			allJobs = append(allJobs, jobs.Jobs...)
 | 
						|
			if resp.NextPage == 0 {
 | 
						|
				break
 | 
						|
			}
 | 
						|
			opt.Page = resp.NextPage
 | 
						|
		}
 | 
						|
		if len(allJobs) == 0 {
 | 
						|
			fallback_cb()
 | 
						|
		} else {
 | 
						|
		JOB:
 | 
						|
			for _, job := range allJobs {
 | 
						|
				runnerLabels := make(map[string]struct{}, len(st.labels))
 | 
						|
				for _, l := range st.labels {
 | 
						|
					runnerLabels[l] = struct{}{}
 | 
						|
				}
 | 
						|
 | 
						|
				if len(job.Labels) == 0 {
 | 
						|
					// This shouldn't usually happen
 | 
						|
					r.Log.Info("Detected job with no labels, which is not supported by ARC. Skipping anyway.", "labels", job.Labels, "run_id", job.GetRunID(), "job_id", job.GetID())
 | 
						|
					continue JOB
 | 
						|
				}
 | 
						|
 | 
						|
				for _, l := range job.Labels {
 | 
						|
					if l == "self-hosted" {
 | 
						|
						continue
 | 
						|
					}
 | 
						|
 | 
						|
					if _, ok := runnerLabels[l]; !ok {
 | 
						|
						continue JOB
 | 
						|
					}
 | 
						|
				}
 | 
						|
 | 
						|
				switch job.GetStatus() {
 | 
						|
				case "completed":
 | 
						|
					// We add a case for `completed` so it is not counted in `unknown`.
 | 
						|
					// And we do not increment the counter for completed because
 | 
						|
					// that counter only refers to workflows. The reason for
 | 
						|
					// this is because we do not get a list of jobs for
 | 
						|
					// completed workflows in order to keep the number of API
 | 
						|
					// calls to a minimum.
 | 
						|
				case "in_progress":
 | 
						|
					inProgress++
 | 
						|
				case "queued":
 | 
						|
					queued++
 | 
						|
				default:
 | 
						|
					unknown++
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	for _, repo := range repos {
 | 
						|
		user, repoName := repo[0], repo[1]
 | 
						|
		workflowRuns, err := r.GitHubClient.ListRepositoryWorkflowRuns(context.TODO(), user, repoName)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
 | 
						|
		for _, run := range workflowRuns {
 | 
						|
			total++
 | 
						|
 | 
						|
			// In May 2020, there are only 3 statuses.
 | 
						|
			// Follow the below links for more details:
 | 
						|
			// - https://developer.github.com/v3/actions/workflow-runs/#list-repository-workflow-runs
 | 
						|
			// - https://developer.github.com/v3/checks/runs/#create-a-check-run
 | 
						|
			switch run.GetStatus() {
 | 
						|
			case "completed":
 | 
						|
				completed++
 | 
						|
			case "in_progress":
 | 
						|
				listWorkflowJobs(user, repoName, run.GetID(), func() { inProgress++ })
 | 
						|
			case "queued":
 | 
						|
				listWorkflowJobs(user, repoName, run.GetID(), func() { queued++ })
 | 
						|
			default:
 | 
						|
				unknown++
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	necessaryReplicas := queued + inProgress
 | 
						|
 | 
						|
	r.Log.V(1).Info(
 | 
						|
		fmt.Sprintf("Suggested desired replicas of %d by TotalNumberOfQueuedAndInProgressWorkflowRuns", necessaryReplicas),
 | 
						|
		"workflow_runs_completed", completed,
 | 
						|
		"workflow_runs_in_progress", inProgress,
 | 
						|
		"workflow_runs_queued", queued,
 | 
						|
		"workflow_runs_unknown", unknown,
 | 
						|
		"namespace", hra.Namespace,
 | 
						|
		"kind", st.kind,
 | 
						|
		"name", st.st,
 | 
						|
		"horizontal_runner_autoscaler", hra.Name,
 | 
						|
	)
 | 
						|
 | 
						|
	return &necessaryReplicas, nil
 | 
						|
}
 | 
						|
 | 
						|
func (r *HorizontalRunnerAutoscalerReconciler) suggestReplicasByPercentageRunnersBusy(st scaleTarget, hra v1alpha1.HorizontalRunnerAutoscaler, metrics v1alpha1.MetricSpec) (*int, error) {
 | 
						|
	ctx := context.Background()
 | 
						|
	scaleUpThreshold := defaultScaleUpThreshold
 | 
						|
	scaleDownThreshold := defaultScaleDownThreshold
 | 
						|
	scaleUpFactor := defaultScaleUpFactor
 | 
						|
	scaleDownFactor := defaultScaleDownFactor
 | 
						|
 | 
						|
	if metrics.ScaleUpThreshold != "" {
 | 
						|
		sut, err := strconv.ParseFloat(metrics.ScaleUpThreshold, 64)
 | 
						|
		if err != nil {
 | 
						|
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleUpThreshold cannot be parsed into a float64")
 | 
						|
		}
 | 
						|
		scaleUpThreshold = sut
 | 
						|
	}
 | 
						|
	if metrics.ScaleDownThreshold != "" {
 | 
						|
		sdt, err := strconv.ParseFloat(metrics.ScaleDownThreshold, 64)
 | 
						|
		if err != nil {
 | 
						|
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleDownThreshold cannot be parsed into a float64")
 | 
						|
		}
 | 
						|
 | 
						|
		scaleDownThreshold = sdt
 | 
						|
	}
 | 
						|
 | 
						|
	scaleUpAdjustment := metrics.ScaleUpAdjustment
 | 
						|
	if scaleUpAdjustment != 0 {
 | 
						|
		if metrics.ScaleUpAdjustment < 0 {
 | 
						|
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleUpAdjustment cannot be lower than 0")
 | 
						|
		}
 | 
						|
 | 
						|
		if metrics.ScaleUpFactor != "" {
 | 
						|
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[]: scaleUpAdjustment and scaleUpFactor cannot be specified together")
 | 
						|
		}
 | 
						|
	} else if metrics.ScaleUpFactor != "" {
 | 
						|
		suf, err := strconv.ParseFloat(metrics.ScaleUpFactor, 64)
 | 
						|
		if err != nil {
 | 
						|
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleUpFactor cannot be parsed into a float64")
 | 
						|
		}
 | 
						|
		scaleUpFactor = suf
 | 
						|
	}
 | 
						|
 | 
						|
	scaleDownAdjustment := metrics.ScaleDownAdjustment
 | 
						|
	if scaleDownAdjustment != 0 {
 | 
						|
		if metrics.ScaleDownAdjustment < 0 {
 | 
						|
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleDownAdjustment cannot be lower than 0")
 | 
						|
		}
 | 
						|
 | 
						|
		if metrics.ScaleDownFactor != "" {
 | 
						|
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[]: scaleDownAdjustment and scaleDownFactor cannot be specified together")
 | 
						|
		}
 | 
						|
	} else if metrics.ScaleDownFactor != "" {
 | 
						|
		sdf, err := strconv.ParseFloat(metrics.ScaleDownFactor, 64)
 | 
						|
		if err != nil {
 | 
						|
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleDownFactor cannot be parsed into a float64")
 | 
						|
		}
 | 
						|
		scaleDownFactor = sdf
 | 
						|
	}
 | 
						|
 | 
						|
	runnerMap, err := st.getRunnerMap()
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	var (
 | 
						|
		enterprise   = st.enterprise
 | 
						|
		organization = st.org
 | 
						|
		repository   = st.repo
 | 
						|
	)
 | 
						|
 | 
						|
	// ListRunners will return all runners managed by GitHub - not restricted to ns
 | 
						|
	runners, err := r.GitHubClient.ListRunners(
 | 
						|
		ctx,
 | 
						|
		enterprise,
 | 
						|
		organization,
 | 
						|
		repository)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	var desiredReplicasBefore int
 | 
						|
 | 
						|
	if v := st.replicas; v == nil {
 | 
						|
		desiredReplicasBefore = 1
 | 
						|
	} else {
 | 
						|
		desiredReplicasBefore = *v
 | 
						|
	}
 | 
						|
 | 
						|
	var (
 | 
						|
		numRunners           int
 | 
						|
		numRunnersRegistered int
 | 
						|
		numRunnersBusy       int
 | 
						|
		numTerminatingBusy   int
 | 
						|
	)
 | 
						|
 | 
						|
	numRunners = len(runnerMap)
 | 
						|
 | 
						|
	busyTerminatingRunnerPods := map[string]struct{}{}
 | 
						|
 | 
						|
	kindLabel := LabelKeyRunnerDeploymentName
 | 
						|
	if hra.Spec.ScaleTargetRef.Kind == "RunnerSet" {
 | 
						|
		kindLabel = LabelKeyRunnerSetName
 | 
						|
	}
 | 
						|
 | 
						|
	var runnerPodList corev1.PodList
 | 
						|
	if err := r.Client.List(ctx, &runnerPodList, client.InNamespace(hra.Namespace), client.MatchingLabels(map[string]string{
 | 
						|
		kindLabel: hra.Spec.ScaleTargetRef.Name,
 | 
						|
	})); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	for _, p := range runnerPodList.Items {
 | 
						|
		if p.Annotations[AnnotationKeyUnregistrationFailureMessage] != "" {
 | 
						|
			busyTerminatingRunnerPods[p.Name] = struct{}{}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	for _, runner := range runners {
 | 
						|
		if _, ok := runnerMap[*runner.Name]; ok {
 | 
						|
			numRunnersRegistered++
 | 
						|
 | 
						|
			if runner.GetBusy() {
 | 
						|
				numRunnersBusy++
 | 
						|
			} else if _, ok := busyTerminatingRunnerPods[*runner.Name]; ok {
 | 
						|
				numTerminatingBusy++
 | 
						|
			}
 | 
						|
 | 
						|
			delete(busyTerminatingRunnerPods, *runner.Name)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// Remaining busyTerminatingRunnerPods are runners that were not on the ListRunners API response yet
 | 
						|
	for range busyTerminatingRunnerPods {
 | 
						|
		numTerminatingBusy++
 | 
						|
	}
 | 
						|
 | 
						|
	var desiredReplicas int
 | 
						|
	fractionBusy := float64(numRunnersBusy+numTerminatingBusy) / float64(desiredReplicasBefore)
 | 
						|
	if fractionBusy >= scaleUpThreshold {
 | 
						|
		if scaleUpAdjustment > 0 {
 | 
						|
			desiredReplicas = desiredReplicasBefore + scaleUpAdjustment
 | 
						|
		} else {
 | 
						|
			desiredReplicas = int(math.Ceil(float64(desiredReplicasBefore) * scaleUpFactor))
 | 
						|
		}
 | 
						|
	} else if fractionBusy < scaleDownThreshold {
 | 
						|
		if scaleDownAdjustment > 0 {
 | 
						|
			desiredReplicas = desiredReplicasBefore - scaleDownAdjustment
 | 
						|
		} else {
 | 
						|
			desiredReplicas = int(float64(desiredReplicasBefore) * scaleDownFactor)
 | 
						|
		}
 | 
						|
	} else {
 | 
						|
		desiredReplicas = *st.replicas
 | 
						|
	}
 | 
						|
 | 
						|
	// NOTES for operators:
 | 
						|
	//
 | 
						|
	// - num_runners can be as twice as large as replicas_desired_before while
 | 
						|
	//   the runnerdeployment controller is replacing RunnerReplicaSet for runner update.
 | 
						|
 | 
						|
	r.Log.V(1).Info(
 | 
						|
		fmt.Sprintf("Suggested desired replicas of %d by PercentageRunnersBusy", desiredReplicas),
 | 
						|
		"replicas_desired_before", desiredReplicasBefore,
 | 
						|
		"replicas_desired", desiredReplicas,
 | 
						|
		"num_runners", numRunners,
 | 
						|
		"num_runners_registered", numRunnersRegistered,
 | 
						|
		"num_runners_busy", numRunnersBusy,
 | 
						|
		"num_terminating_busy", numTerminatingBusy,
 | 
						|
		"namespace", hra.Namespace,
 | 
						|
		"kind", st.kind,
 | 
						|
		"name", st.st,
 | 
						|
		"horizontal_runner_autoscaler", hra.Name,
 | 
						|
		"enterprise", enterprise,
 | 
						|
		"organization", organization,
 | 
						|
		"repository", repository,
 | 
						|
	)
 | 
						|
 | 
						|
	return &desiredReplicas, nil
 | 
						|
}
 |