383 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			383 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
| package controllers
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"math"
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/actions-runner-controller/actions-runner-controller/api/v1alpha1"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	defaultScaleUpThreshold   = 0.8
 | |
| 	defaultScaleDownThreshold = 0.3
 | |
| 	defaultScaleUpFactor      = 1.3
 | |
| 	defaultScaleDownFactor    = 0.7
 | |
| )
 | |
| 
 | |
| func getValueAvailableAt(now time.Time, from, to *time.Time, reservedValue int) *int {
 | |
| 	if to != nil && now.After(*to) {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	if from != nil && now.Before(*from) {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	return &reservedValue
 | |
| }
 | |
| 
 | |
| func (r *HorizontalRunnerAutoscalerReconciler) fetchSuggestedReplicasFromCache(hra v1alpha1.HorizontalRunnerAutoscaler) *int {
 | |
| 	var entry *v1alpha1.CacheEntry
 | |
| 
 | |
| 	for i := range hra.Status.CacheEntries {
 | |
| 		ent := hra.Status.CacheEntries[i]
 | |
| 
 | |
| 		if ent.Key != v1alpha1.CacheEntryKeyDesiredReplicas {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		if !time.Now().Before(ent.ExpirationTime.Time) {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		entry = &ent
 | |
| 
 | |
| 		break
 | |
| 	}
 | |
| 
 | |
| 	if entry != nil {
 | |
| 		v := getValueAvailableAt(time.Now(), nil, &entry.ExpirationTime.Time, entry.Value)
 | |
| 		if v != nil {
 | |
| 			return v
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (r *HorizontalRunnerAutoscalerReconciler) suggestDesiredReplicas(st scaleTarget, hra v1alpha1.HorizontalRunnerAutoscaler) (*int, error) {
 | |
| 	if hra.Spec.MinReplicas == nil {
 | |
| 		return nil, fmt.Errorf("horizontalrunnerautoscaler %s/%s is missing minReplicas", hra.Namespace, hra.Name)
 | |
| 	} else if hra.Spec.MaxReplicas == nil {
 | |
| 		return nil, fmt.Errorf("horizontalrunnerautoscaler %s/%s is missing maxReplicas", hra.Namespace, hra.Name)
 | |
| 	}
 | |
| 
 | |
| 	metrics := hra.Spec.Metrics
 | |
| 	numMetrics := len(metrics)
 | |
| 	if numMetrics == 0 {
 | |
| 		if len(hra.Spec.ScaleUpTriggers) == 0 {
 | |
| 			return r.suggestReplicasByQueuedAndInProgressWorkflowRuns(st, hra, nil)
 | |
| 		}
 | |
| 
 | |
| 		return nil, nil
 | |
| 	} else if numMetrics > 2 {
 | |
| 		return nil, fmt.Errorf("too many autoscaling metrics configured: It must be 0 to 2, but got %d", numMetrics)
 | |
| 	}
 | |
| 
 | |
| 	primaryMetric := metrics[0]
 | |
| 	primaryMetricType := primaryMetric.Type
 | |
| 
 | |
| 	var (
 | |
| 		suggested *int
 | |
| 		err       error
 | |
| 	)
 | |
| 
 | |
| 	switch primaryMetricType {
 | |
| 	case v1alpha1.AutoscalingMetricTypeTotalNumberOfQueuedAndInProgressWorkflowRuns:
 | |
| 		suggested, err = r.suggestReplicasByQueuedAndInProgressWorkflowRuns(st, hra, &primaryMetric)
 | |
| 	case v1alpha1.AutoscalingMetricTypePercentageRunnersBusy:
 | |
| 		suggested, err = r.suggestReplicasByPercentageRunnersBusy(st, hra, primaryMetric)
 | |
| 	default:
 | |
| 		return nil, fmt.Errorf("validating autoscaling metrics: unsupported metric type %q", primaryMetric)
 | |
| 	}
 | |
| 
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	if suggested != nil && *suggested > 0 {
 | |
| 		return suggested, nil
 | |
| 	}
 | |
| 
 | |
| 	if len(metrics) == 1 {
 | |
| 		// This is never supposed to happen but anyway-
 | |
| 		// Fall-back to `minReplicas + capacityReservedThroughWebhook`.
 | |
| 		return nil, nil
 | |
| 	}
 | |
| 
 | |
| 	// At this point, we are sure that there are exactly 2 Metrics entries.
 | |
| 
 | |
| 	fallbackMetric := metrics[1]
 | |
| 	fallbackMetricType := fallbackMetric.Type
 | |
| 
 | |
| 	if primaryMetricType != v1alpha1.AutoscalingMetricTypePercentageRunnersBusy ||
 | |
| 		fallbackMetricType != v1alpha1.AutoscalingMetricTypeTotalNumberOfQueuedAndInProgressWorkflowRuns {
 | |
| 
 | |
| 		return nil, fmt.Errorf(
 | |
| 			"invalid HRA Spec: Metrics[0] of %s cannot be combined with Metrics[1] of %s: The only allowed combination is 0=PercentageRunnersBusy and 1=TotalNumberOfQueuedAndInProgressWorkflowRuns",
 | |
| 			primaryMetricType, fallbackMetricType,
 | |
| 		)
 | |
| 	}
 | |
| 
 | |
| 	return r.suggestReplicasByQueuedAndInProgressWorkflowRuns(st, hra, &fallbackMetric)
 | |
| }
 | |
| 
 | |
| func (r *HorizontalRunnerAutoscalerReconciler) suggestReplicasByQueuedAndInProgressWorkflowRuns(st scaleTarget, hra v1alpha1.HorizontalRunnerAutoscaler, metrics *v1alpha1.MetricSpec) (*int, error) {
 | |
| 
 | |
| 	var repos [][]string
 | |
| 	repoID := st.repo
 | |
| 	if repoID == "" {
 | |
| 		orgName := st.org
 | |
| 		if orgName == "" {
 | |
| 			return nil, fmt.Errorf("asserting runner deployment spec to detect bug: spec.template.organization should not be empty on this code path")
 | |
| 		}
 | |
| 
 | |
| 		// In case it's an organizational runners deployment without any scaling metrics defined,
 | |
| 		// we assume that the desired replicas should always be `minReplicas + capacityReservedThroughWebhook`.
 | |
| 		// See https://github.com/actions-runner-controller/actions-runner-controller/issues/377#issuecomment-793372693
 | |
| 		if metrics == nil {
 | |
| 			return nil, nil
 | |
| 		}
 | |
| 
 | |
| 		if len(metrics.RepositoryNames) == 0 {
 | |
| 			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].repositoryNames is required and must have one more more entries for organizational runner deployment")
 | |
| 		}
 | |
| 
 | |
| 		for _, repoName := range metrics.RepositoryNames {
 | |
| 			repos = append(repos, []string{orgName, repoName})
 | |
| 		}
 | |
| 	} else {
 | |
| 		repo := strings.Split(repoID, "/")
 | |
| 
 | |
| 		repos = append(repos, repo)
 | |
| 	}
 | |
| 
 | |
| 	var total, inProgress, queued, completed, unknown int
 | |
| 	type callback func()
 | |
| 	listWorkflowJobs := func(user string, repoName string, runID int64, fallback_cb callback) {
 | |
| 		if runID == 0 {
 | |
| 			fallback_cb()
 | |
| 			return
 | |
| 		}
 | |
| 		jobs, _, err := r.GitHubClient.Actions.ListWorkflowJobs(context.TODO(), user, repoName, runID, nil)
 | |
| 		if err != nil {
 | |
| 			r.Log.Error(err, "Error listing workflow jobs")
 | |
| 			fallback_cb()
 | |
| 		} else if len(jobs.Jobs) == 0 {
 | |
| 			fallback_cb()
 | |
| 		} else {
 | |
| 			for _, job := range jobs.Jobs {
 | |
| 				switch job.GetStatus() {
 | |
| 				case "completed":
 | |
| 					// We add a case for `completed` so it is not counted in `unknown`.
 | |
| 					// And we do not increment the counter for completed because
 | |
| 					// that counter only refers to workflows. The reason for
 | |
| 					// this is because we do not get a list of jobs for
 | |
| 					// completed workflows in order to keep the number of API
 | |
| 					// calls to a minimum.
 | |
| 				case "in_progress":
 | |
| 					inProgress++
 | |
| 				case "queued":
 | |
| 					queued++
 | |
| 				default:
 | |
| 					unknown++
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	for _, repo := range repos {
 | |
| 		user, repoName := repo[0], repo[1]
 | |
| 		workflowRuns, err := r.GitHubClient.ListRepositoryWorkflowRuns(context.TODO(), user, repoName)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		for _, run := range workflowRuns {
 | |
| 			total++
 | |
| 
 | |
| 			// In May 2020, there are only 3 statuses.
 | |
| 			// Follow the below links for more details:
 | |
| 			// - https://developer.github.com/v3/actions/workflow-runs/#list-repository-workflow-runs
 | |
| 			// - https://developer.github.com/v3/checks/runs/#create-a-check-run
 | |
| 			switch run.GetStatus() {
 | |
| 			case "completed":
 | |
| 				completed++
 | |
| 			case "in_progress":
 | |
| 				listWorkflowJobs(user, repoName, run.GetID(), func() { inProgress++ })
 | |
| 			case "queued":
 | |
| 				listWorkflowJobs(user, repoName, run.GetID(), func() { queued++ })
 | |
| 			default:
 | |
| 				unknown++
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	necessaryReplicas := queued + inProgress
 | |
| 
 | |
| 	r.Log.V(1).Info(
 | |
| 		fmt.Sprintf("Suggested desired replicas of %d by TotalNumberOfQueuedAndInProgressWorkflowRuns", necessaryReplicas),
 | |
| 		"workflow_runs_completed", completed,
 | |
| 		"workflow_runs_in_progress", inProgress,
 | |
| 		"workflow_runs_queued", queued,
 | |
| 		"workflow_runs_unknown", unknown,
 | |
| 		"namespace", hra.Namespace,
 | |
| 		"kind", st.kind,
 | |
| 		"name", st.st,
 | |
| 		"horizontal_runner_autoscaler", hra.Name,
 | |
| 	)
 | |
| 
 | |
| 	return &necessaryReplicas, nil
 | |
| }
 | |
| 
 | |
| func (r *HorizontalRunnerAutoscalerReconciler) suggestReplicasByPercentageRunnersBusy(st scaleTarget, hra v1alpha1.HorizontalRunnerAutoscaler, metrics v1alpha1.MetricSpec) (*int, error) {
 | |
| 	ctx := context.Background()
 | |
| 	scaleUpThreshold := defaultScaleUpThreshold
 | |
| 	scaleDownThreshold := defaultScaleDownThreshold
 | |
| 	scaleUpFactor := defaultScaleUpFactor
 | |
| 	scaleDownFactor := defaultScaleDownFactor
 | |
| 
 | |
| 	if metrics.ScaleUpThreshold != "" {
 | |
| 		sut, err := strconv.ParseFloat(metrics.ScaleUpThreshold, 64)
 | |
| 		if err != nil {
 | |
| 			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleUpThreshold cannot be parsed into a float64")
 | |
| 		}
 | |
| 		scaleUpThreshold = sut
 | |
| 	}
 | |
| 	if metrics.ScaleDownThreshold != "" {
 | |
| 		sdt, err := strconv.ParseFloat(metrics.ScaleDownThreshold, 64)
 | |
| 		if err != nil {
 | |
| 			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleDownThreshold cannot be parsed into a float64")
 | |
| 		}
 | |
| 
 | |
| 		scaleDownThreshold = sdt
 | |
| 	}
 | |
| 
 | |
| 	scaleUpAdjustment := metrics.ScaleUpAdjustment
 | |
| 	if scaleUpAdjustment != 0 {
 | |
| 		if metrics.ScaleUpAdjustment < 0 {
 | |
| 			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleUpAdjustment cannot be lower than 0")
 | |
| 		}
 | |
| 
 | |
| 		if metrics.ScaleUpFactor != "" {
 | |
| 			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[]: scaleUpAdjustment and scaleUpFactor cannot be specified together")
 | |
| 		}
 | |
| 	} else if metrics.ScaleUpFactor != "" {
 | |
| 		suf, err := strconv.ParseFloat(metrics.ScaleUpFactor, 64)
 | |
| 		if err != nil {
 | |
| 			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleUpFactor cannot be parsed into a float64")
 | |
| 		}
 | |
| 		scaleUpFactor = suf
 | |
| 	}
 | |
| 
 | |
| 	scaleDownAdjustment := metrics.ScaleDownAdjustment
 | |
| 	if scaleDownAdjustment != 0 {
 | |
| 		if metrics.ScaleDownAdjustment < 0 {
 | |
| 			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleDownAdjustment cannot be lower than 0")
 | |
| 		}
 | |
| 
 | |
| 		if metrics.ScaleDownFactor != "" {
 | |
| 			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[]: scaleDownAdjustment and scaleDownFactor cannot be specified together")
 | |
| 		}
 | |
| 	} else if metrics.ScaleDownFactor != "" {
 | |
| 		sdf, err := strconv.ParseFloat(metrics.ScaleDownFactor, 64)
 | |
| 		if err != nil {
 | |
| 			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleDownFactor cannot be parsed into a float64")
 | |
| 		}
 | |
| 		scaleDownFactor = sdf
 | |
| 	}
 | |
| 
 | |
| 	runnerMap, err := st.getRunnerMap()
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	var (
 | |
| 		enterprise   = st.enterprise
 | |
| 		organization = st.org
 | |
| 		repository   = st.repo
 | |
| 	)
 | |
| 
 | |
| 	// ListRunners will return all runners managed by GitHub - not restricted to ns
 | |
| 	runners, err := r.GitHubClient.ListRunners(
 | |
| 		ctx,
 | |
| 		enterprise,
 | |
| 		organization,
 | |
| 		repository)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	var desiredReplicasBefore int
 | |
| 
 | |
| 	if v := st.replicas; v == nil {
 | |
| 		desiredReplicasBefore = 1
 | |
| 	} else {
 | |
| 		desiredReplicasBefore = *v
 | |
| 	}
 | |
| 
 | |
| 	var (
 | |
| 		numRunners           int
 | |
| 		numRunnersRegistered int
 | |
| 		numRunnersBusy       int
 | |
| 	)
 | |
| 
 | |
| 	numRunners = len(runnerMap)
 | |
| 
 | |
| 	for _, runner := range runners {
 | |
| 		if _, ok := runnerMap[*runner.Name]; ok {
 | |
| 			numRunnersRegistered++
 | |
| 
 | |
| 			if runner.GetBusy() {
 | |
| 				numRunnersBusy++
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	var desiredReplicas int
 | |
| 	fractionBusy := float64(numRunnersBusy) / float64(desiredReplicasBefore)
 | |
| 	if fractionBusy >= scaleUpThreshold {
 | |
| 		if scaleUpAdjustment > 0 {
 | |
| 			desiredReplicas = desiredReplicasBefore + scaleUpAdjustment
 | |
| 		} else {
 | |
| 			desiredReplicas = int(math.Ceil(float64(desiredReplicasBefore) * scaleUpFactor))
 | |
| 		}
 | |
| 	} else if fractionBusy < scaleDownThreshold {
 | |
| 		if scaleDownAdjustment > 0 {
 | |
| 			desiredReplicas = desiredReplicasBefore - scaleDownAdjustment
 | |
| 		} else {
 | |
| 			desiredReplicas = int(float64(desiredReplicasBefore) * scaleDownFactor)
 | |
| 		}
 | |
| 	} else {
 | |
| 		desiredReplicas = *st.replicas
 | |
| 	}
 | |
| 
 | |
| 	// NOTES for operators:
 | |
| 	//
 | |
| 	// - num_runners can be as twice as large as replicas_desired_before while
 | |
| 	//   the runnerdeployment controller is replacing RunnerReplicaSet for runner update.
 | |
| 
 | |
| 	r.Log.V(1).Info(
 | |
| 		fmt.Sprintf("Suggested desired replicas of %d by PercentageRunnersBusy", desiredReplicas),
 | |
| 		"replicas_desired_before", desiredReplicasBefore,
 | |
| 		"replicas_desired", desiredReplicas,
 | |
| 		"num_runners", numRunners,
 | |
| 		"num_runners_registered", numRunnersRegistered,
 | |
| 		"num_runners_busy", numRunnersBusy,
 | |
| 		"namespace", hra.Namespace,
 | |
| 		"kind", st.kind,
 | |
| 		"name", st.st,
 | |
| 		"horizontal_runner_autoscaler", hra.Name,
 | |
| 		"enterprise", enterprise,
 | |
| 		"organization", organization,
 | |
| 		"repository", repository,
 | |
| 	)
 | |
| 
 | |
| 	return &desiredReplicas, nil
 | |
| }
 |