338 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			338 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
package controllers
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"math"
 | 
						|
	"strconv"
 | 
						|
	"strings"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/summerwind/actions-runner-controller/api/v1alpha1"
 | 
						|
	"sigs.k8s.io/controller-runtime/pkg/client"
 | 
						|
)
 | 
						|
 | 
						|
const (
 | 
						|
	defaultScaleUpThreshold   = 0.8
 | 
						|
	defaultScaleDownThreshold = 0.3
 | 
						|
	defaultScaleUpFactor      = 1.3
 | 
						|
	defaultScaleDownFactor    = 0.7
 | 
						|
)
 | 
						|
 | 
						|
func getValueAvailableAt(now time.Time, from, to *time.Time, reservedValue int) *int {
 | 
						|
	if to != nil && now.After(*to) {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	if from != nil && now.Before(*from) {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	return &reservedValue
 | 
						|
}
 | 
						|
 | 
						|
func (r *HorizontalRunnerAutoscalerReconciler) getDesiredReplicasFromCache(hra v1alpha1.HorizontalRunnerAutoscaler) *int {
 | 
						|
	var entry *v1alpha1.CacheEntry
 | 
						|
 | 
						|
	for i := range hra.Status.CacheEntries {
 | 
						|
		ent := hra.Status.CacheEntries[i]
 | 
						|
 | 
						|
		if ent.Key != v1alpha1.CacheEntryKeyDesiredReplicas {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		if !time.Now().Before(ent.ExpirationTime.Time) {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		entry = &ent
 | 
						|
 | 
						|
		break
 | 
						|
	}
 | 
						|
 | 
						|
	if entry != nil {
 | 
						|
		v := getValueAvailableAt(time.Now(), nil, &entry.ExpirationTime.Time, entry.Value)
 | 
						|
		if v != nil {
 | 
						|
			return v
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (r *HorizontalRunnerAutoscalerReconciler) determineDesiredReplicas(rd v1alpha1.RunnerDeployment, hra v1alpha1.HorizontalRunnerAutoscaler) (*int, error) {
 | 
						|
	if hra.Spec.MinReplicas == nil {
 | 
						|
		return nil, fmt.Errorf("horizontalrunnerautoscaler %s/%s is missing minReplicas", hra.Namespace, hra.Name)
 | 
						|
	} else if hra.Spec.MaxReplicas == nil {
 | 
						|
		return nil, fmt.Errorf("horizontalrunnerautoscaler %s/%s is missing maxReplicas", hra.Namespace, hra.Name)
 | 
						|
	}
 | 
						|
 | 
						|
	metrics := hra.Spec.Metrics
 | 
						|
	if len(metrics) == 0 || metrics[0].Type == v1alpha1.AutoscalingMetricTypeTotalNumberOfQueuedAndInProgressWorkflowRuns {
 | 
						|
		return r.calculateReplicasByQueuedAndInProgressWorkflowRuns(rd, hra)
 | 
						|
	} else if metrics[0].Type == v1alpha1.AutoscalingMetricTypePercentageRunnersBusy {
 | 
						|
		return r.calculateReplicasByPercentageRunnersBusy(rd, hra)
 | 
						|
	} else {
 | 
						|
		return nil, fmt.Errorf("validting autoscaling metrics: unsupported metric type %q", metrics[0].Type)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (r *HorizontalRunnerAutoscalerReconciler) calculateReplicasByQueuedAndInProgressWorkflowRuns(rd v1alpha1.RunnerDeployment, hra v1alpha1.HorizontalRunnerAutoscaler) (*int, error) {
 | 
						|
 | 
						|
	var repos [][]string
 | 
						|
	metrics := hra.Spec.Metrics
 | 
						|
	repoID := rd.Spec.Template.Spec.Repository
 | 
						|
	if repoID == "" {
 | 
						|
		orgName := rd.Spec.Template.Spec.Organization
 | 
						|
		if orgName == "" {
 | 
						|
			return nil, fmt.Errorf("asserting runner deployment spec to detect bug: spec.template.organization should not be empty on this code path")
 | 
						|
		}
 | 
						|
 | 
						|
		if len(metrics[0].RepositoryNames) == 0 {
 | 
						|
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].repositoryNames is required and must have one more more entries for organizational runner deployment")
 | 
						|
		}
 | 
						|
 | 
						|
		for _, repoName := range metrics[0].RepositoryNames {
 | 
						|
			repos = append(repos, []string{orgName, repoName})
 | 
						|
		}
 | 
						|
	} else {
 | 
						|
		repo := strings.Split(repoID, "/")
 | 
						|
 | 
						|
		repos = append(repos, repo)
 | 
						|
	}
 | 
						|
 | 
						|
	var total, inProgress, queued, completed, unknown int
 | 
						|
	type callback func()
 | 
						|
	listWorkflowJobs := func(user string, repoName string, runID int64, fallback_cb callback) {
 | 
						|
		if runID == 0 {
 | 
						|
			fallback_cb()
 | 
						|
			return
 | 
						|
		}
 | 
						|
		jobs, _, err := r.GitHubClient.Actions.ListWorkflowJobs(context.TODO(), user, repoName, runID, nil)
 | 
						|
		if err != nil {
 | 
						|
			r.Log.Error(err, "Error listing workflow jobs")
 | 
						|
			fallback_cb()
 | 
						|
		} else if len(jobs.Jobs) == 0 {
 | 
						|
			fallback_cb()
 | 
						|
		} else {
 | 
						|
			for _, job := range jobs.Jobs {
 | 
						|
				switch job.GetStatus() {
 | 
						|
				case "completed":
 | 
						|
					// We add a case for `completed` so it is not counted in `unknown`.
 | 
						|
					// And we do not increment the counter for completed because
 | 
						|
					// that counter only refers to workflows. The reason for
 | 
						|
					// this is because we do not get a list of jobs for
 | 
						|
					// completed workflows in order to keep the number of API
 | 
						|
					// calls to a minimum.
 | 
						|
				case "in_progress":
 | 
						|
					inProgress++
 | 
						|
				case "queued":
 | 
						|
					queued++
 | 
						|
				default:
 | 
						|
					unknown++
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	for _, repo := range repos {
 | 
						|
		user, repoName := repo[0], repo[1]
 | 
						|
		workflowRuns, err := r.GitHubClient.ListRepositoryWorkflowRuns(context.TODO(), user, repoName)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
 | 
						|
		for _, run := range workflowRuns {
 | 
						|
			total++
 | 
						|
 | 
						|
			// In May 2020, there are only 3 statuses.
 | 
						|
			// Follow the below links for more details:
 | 
						|
			// - https://developer.github.com/v3/actions/workflow-runs/#list-repository-workflow-runs
 | 
						|
			// - https://developer.github.com/v3/checks/runs/#create-a-check-run
 | 
						|
			switch run.GetStatus() {
 | 
						|
			case "completed":
 | 
						|
				completed++
 | 
						|
			case "in_progress":
 | 
						|
				listWorkflowJobs(user, repoName, run.GetID(), func() { inProgress++ })
 | 
						|
			case "queued":
 | 
						|
				listWorkflowJobs(user, repoName, run.GetID(), func() { queued++ })
 | 
						|
			default:
 | 
						|
				unknown++
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	minReplicas := *hra.Spec.MinReplicas
 | 
						|
	maxReplicas := *hra.Spec.MaxReplicas
 | 
						|
	necessaryReplicas := queued + inProgress
 | 
						|
 | 
						|
	var desiredReplicas int
 | 
						|
 | 
						|
	if necessaryReplicas < minReplicas {
 | 
						|
		desiredReplicas = minReplicas
 | 
						|
	} else if necessaryReplicas > maxReplicas {
 | 
						|
		desiredReplicas = maxReplicas
 | 
						|
	} else {
 | 
						|
		desiredReplicas = necessaryReplicas
 | 
						|
	}
 | 
						|
 | 
						|
	rd.Status.Replicas = &desiredReplicas
 | 
						|
	replicas := desiredReplicas
 | 
						|
 | 
						|
	r.Log.V(1).Info(
 | 
						|
		"Calculated desired replicas",
 | 
						|
		"computed_replicas_desired", desiredReplicas,
 | 
						|
		"spec_replicas_min", minReplicas,
 | 
						|
		"spec_replicas_max", maxReplicas,
 | 
						|
		"workflow_runs_completed", completed,
 | 
						|
		"workflow_runs_in_progress", inProgress,
 | 
						|
		"workflow_runs_queued", queued,
 | 
						|
		"workflow_runs_unknown", unknown,
 | 
						|
		"namespace", hra.Namespace,
 | 
						|
		"runner_deployment", rd.Name,
 | 
						|
		"horizontal_runner_autoscaler", hra.Name,
 | 
						|
	)
 | 
						|
 | 
						|
	return &replicas, nil
 | 
						|
}
 | 
						|
 | 
						|
func (r *HorizontalRunnerAutoscalerReconciler) calculateReplicasByPercentageRunnersBusy(rd v1alpha1.RunnerDeployment, hra v1alpha1.HorizontalRunnerAutoscaler) (*int, error) {
 | 
						|
	ctx := context.Background()
 | 
						|
	minReplicas := *hra.Spec.MinReplicas
 | 
						|
	maxReplicas := *hra.Spec.MaxReplicas
 | 
						|
	metrics := hra.Spec.Metrics[0]
 | 
						|
	scaleUpThreshold := defaultScaleUpThreshold
 | 
						|
	scaleDownThreshold := defaultScaleDownThreshold
 | 
						|
	scaleUpFactor := defaultScaleUpFactor
 | 
						|
	scaleDownFactor := defaultScaleDownFactor
 | 
						|
 | 
						|
	if metrics.ScaleUpThreshold != "" {
 | 
						|
		sut, err := strconv.ParseFloat(metrics.ScaleUpThreshold, 64)
 | 
						|
		if err != nil {
 | 
						|
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleUpThreshold cannot be parsed into a float64")
 | 
						|
		}
 | 
						|
		scaleUpThreshold = sut
 | 
						|
	}
 | 
						|
	if metrics.ScaleDownThreshold != "" {
 | 
						|
		sdt, err := strconv.ParseFloat(metrics.ScaleDownThreshold, 64)
 | 
						|
		if err != nil {
 | 
						|
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleDownThreshold cannot be parsed into a float64")
 | 
						|
		}
 | 
						|
 | 
						|
		scaleDownThreshold = sdt
 | 
						|
	}
 | 
						|
 | 
						|
	scaleUpAdjustment := metrics.ScaleUpAdjustment
 | 
						|
	if scaleUpAdjustment != 0 {
 | 
						|
		if metrics.ScaleUpAdjustment < 0 {
 | 
						|
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleUpAdjustment cannot be lower than 0")
 | 
						|
		}
 | 
						|
 | 
						|
		if metrics.ScaleUpFactor != "" {
 | 
						|
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[]: scaleUpAdjustment and scaleUpFactor cannot be specified together")
 | 
						|
		}
 | 
						|
	} else if metrics.ScaleUpFactor != "" {
 | 
						|
		suf, err := strconv.ParseFloat(metrics.ScaleUpFactor, 64)
 | 
						|
		if err != nil {
 | 
						|
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleUpFactor cannot be parsed into a float64")
 | 
						|
		}
 | 
						|
		scaleUpFactor = suf
 | 
						|
	}
 | 
						|
 | 
						|
	scaleDownAdjustment := metrics.ScaleDownAdjustment
 | 
						|
	if scaleDownAdjustment != 0 {
 | 
						|
		if metrics.ScaleDownAdjustment < 0 {
 | 
						|
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleDownAdjustment cannot be lower than 0")
 | 
						|
		}
 | 
						|
 | 
						|
		if metrics.ScaleDownFactor != "" {
 | 
						|
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[]: scaleDownAdjustment and scaleDownFactor cannot be specified together")
 | 
						|
		}
 | 
						|
	} else if metrics.ScaleDownFactor != "" {
 | 
						|
		sdf, err := strconv.ParseFloat(metrics.ScaleDownFactor, 64)
 | 
						|
		if err != nil {
 | 
						|
			return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleDownFactor cannot be parsed into a float64")
 | 
						|
		}
 | 
						|
		scaleDownFactor = sdf
 | 
						|
	}
 | 
						|
 | 
						|
	// return the list of runners in namespace. Horizontal Runner Autoscaler should only be responsible for scaling resources in its own ns.
 | 
						|
	var runnerList v1alpha1.RunnerList
 | 
						|
	if err := r.List(ctx, &runnerList, client.InNamespace(rd.Namespace)); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	runnerMap := make(map[string]struct{})
 | 
						|
	for _, items := range runnerList.Items {
 | 
						|
		runnerMap[items.Name] = struct{}{}
 | 
						|
	}
 | 
						|
 | 
						|
	var (
 | 
						|
		enterprise   = rd.Spec.Template.Spec.Enterprise
 | 
						|
		organization = rd.Spec.Template.Spec.Organization
 | 
						|
		repository   = rd.Spec.Template.Spec.Repository
 | 
						|
	)
 | 
						|
 | 
						|
	// ListRunners will return all runners managed by GitHub - not restricted to ns
 | 
						|
	runners, err := r.GitHubClient.ListRunners(
 | 
						|
		ctx,
 | 
						|
		enterprise,
 | 
						|
		organization,
 | 
						|
		repository)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	numRunners := len(runnerList.Items)
 | 
						|
	numRunnersBusy := 0
 | 
						|
	for _, runner := range runners {
 | 
						|
		if _, ok := runnerMap[*runner.Name]; ok && runner.GetBusy() {
 | 
						|
			numRunnersBusy++
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	var desiredReplicas int
 | 
						|
	fractionBusy := float64(numRunnersBusy) / float64(numRunners)
 | 
						|
	if fractionBusy >= scaleUpThreshold {
 | 
						|
		if scaleUpAdjustment > 0 {
 | 
						|
			desiredReplicas = numRunners + scaleUpAdjustment
 | 
						|
		} else {
 | 
						|
			desiredReplicas = int(math.Ceil(float64(numRunners) * scaleUpFactor))
 | 
						|
		}
 | 
						|
	} else if fractionBusy < scaleDownThreshold {
 | 
						|
		if scaleDownAdjustment > 0 {
 | 
						|
			desiredReplicas = numRunners - scaleDownAdjustment
 | 
						|
		} else {
 | 
						|
			desiredReplicas = int(float64(numRunners) * scaleDownFactor)
 | 
						|
		}
 | 
						|
	} else {
 | 
						|
		desiredReplicas = *rd.Spec.Replicas
 | 
						|
	}
 | 
						|
 | 
						|
	if desiredReplicas < minReplicas {
 | 
						|
		desiredReplicas = minReplicas
 | 
						|
	} else if desiredReplicas > maxReplicas {
 | 
						|
		desiredReplicas = maxReplicas
 | 
						|
	}
 | 
						|
 | 
						|
	r.Log.V(1).Info(
 | 
						|
		"Calculated desired replicas",
 | 
						|
		"computed_replicas_desired", desiredReplicas,
 | 
						|
		"spec_replicas_min", minReplicas,
 | 
						|
		"spec_replicas_max", maxReplicas,
 | 
						|
		"current_replicas", rd.Spec.Replicas,
 | 
						|
		"num_runners", numRunners,
 | 
						|
		"num_runners_busy", numRunnersBusy,
 | 
						|
		"namespace", hra.Namespace,
 | 
						|
		"runner_deployment", rd.Name,
 | 
						|
		"horizontal_runner_autoscaler", hra.Name,
 | 
						|
		"enterprise", enterprise,
 | 
						|
		"organization", organization,
 | 
						|
		"repository", repository,
 | 
						|
	)
 | 
						|
 | 
						|
	rd.Status.Replicas = &desiredReplicas
 | 
						|
	replicas := desiredReplicas
 | 
						|
 | 
						|
	return &replicas, nil
 | 
						|
}
 |