package controllers import ( "context" "errors" "fmt" "math" "strconv" "strings" "github.com/actions-runner-controller/actions-runner-controller/api/v1alpha1" "github.com/google/go-github/v39/github" ) const ( defaultScaleUpThreshold = 0.8 defaultScaleDownThreshold = 0.3 defaultScaleUpFactor = 1.3 defaultScaleDownFactor = 0.7 ) func (r *HorizontalRunnerAutoscalerReconciler) suggestDesiredReplicas(st scaleTarget, hra v1alpha1.HorizontalRunnerAutoscaler) (*int, error) { if hra.Spec.MinReplicas == nil { return nil, fmt.Errorf("horizontalrunnerautoscaler %s/%s is missing minReplicas", hra.Namespace, hra.Name) } else if hra.Spec.MaxReplicas == nil { return nil, fmt.Errorf("horizontalrunnerautoscaler %s/%s is missing maxReplicas", hra.Namespace, hra.Name) } metrics := hra.Spec.Metrics numMetrics := len(metrics) if numMetrics == 0 { // We don't default to anything since ARC 0.23.0 // See https://github.com/actions-runner-controller/actions-runner-controller/issues/728 return nil, nil } else if numMetrics > 2 { return nil, fmt.Errorf("too many autoscaling metrics configured: It must be 0 to 2, but got %d", numMetrics) } primaryMetric := metrics[0] primaryMetricType := primaryMetric.Type var ( suggested *int err error ) switch primaryMetricType { case v1alpha1.AutoscalingMetricTypeTotalNumberOfQueuedAndInProgressWorkflowRuns: suggested, err = r.suggestReplicasByQueuedAndInProgressWorkflowRuns(st, hra, &primaryMetric) case v1alpha1.AutoscalingMetricTypePercentageRunnersBusy: suggested, err = r.suggestReplicasByPercentageRunnersBusy(st, hra, primaryMetric) default: return nil, fmt.Errorf("validating autoscaling metrics: unsupported metric type %q", primaryMetric) } if err != nil { return nil, err } if suggested != nil && *suggested > 0 { return suggested, nil } if len(metrics) == 1 { // This is never supposed to happen but anyway- // Fall-back to `minReplicas + capacityReservedThroughWebhook`. return nil, nil } // At this point, we are sure that there are exactly 2 Metrics entries. fallbackMetric := metrics[1] fallbackMetricType := fallbackMetric.Type if primaryMetricType != v1alpha1.AutoscalingMetricTypePercentageRunnersBusy || fallbackMetricType != v1alpha1.AutoscalingMetricTypeTotalNumberOfQueuedAndInProgressWorkflowRuns { return nil, fmt.Errorf( "invalid HRA Spec: Metrics[0] of %s cannot be combined with Metrics[1] of %s: The only allowed combination is 0=PercentageRunnersBusy and 1=TotalNumberOfQueuedAndInProgressWorkflowRuns", primaryMetricType, fallbackMetricType, ) } return r.suggestReplicasByQueuedAndInProgressWorkflowRuns(st, hra, &fallbackMetric) } func (r *HorizontalRunnerAutoscalerReconciler) suggestReplicasByQueuedAndInProgressWorkflowRuns(st scaleTarget, hra v1alpha1.HorizontalRunnerAutoscaler, metrics *v1alpha1.MetricSpec) (*int, error) { var repos [][]string repoID := st.repo if repoID == "" { orgName := st.org if orgName == "" { return nil, fmt.Errorf("asserting runner deployment spec to detect bug: spec.template.organization should not be empty on this code path") } // In case it's an organizational runners deployment without any scaling metrics defined, // we assume that the desired replicas should always be `minReplicas + capacityReservedThroughWebhook`. // See https://github.com/actions-runner-controller/actions-runner-controller/issues/377#issuecomment-793372693 if metrics == nil { return nil, nil } if len(metrics.RepositoryNames) == 0 { return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].repositoryNames is required and must have one more more entries for organizational runner deployment") } for _, repoName := range metrics.RepositoryNames { repos = append(repos, []string{orgName, repoName}) } } else { repo := strings.Split(repoID, "/") repos = append(repos, repo) } var total, inProgress, queued, completed, unknown int type callback func() listWorkflowJobs := func(user string, repoName string, runID int64, fallback_cb callback) { if runID == 0 { fallback_cb() return } opt := github.ListWorkflowJobsOptions{ListOptions: github.ListOptions{PerPage: 50}} var allJobs []*github.WorkflowJob for { jobs, resp, err := r.GitHubClient.Actions.ListWorkflowJobs(context.TODO(), user, repoName, runID, &opt) if err != nil { r.Log.Error(err, "Error listing workflow jobs") return //err } allJobs = append(allJobs, jobs.Jobs...) if resp.NextPage == 0 { break } opt.Page = resp.NextPage } if len(allJobs) == 0 { fallback_cb() } else { JOB: for _, job := range allJobs { runnerLabels := make(map[string]struct{}, len(st.labels)) for _, l := range st.labels { runnerLabels[l] = struct{}{} } if len(job.Labels) == 0 { // This shouldn't usually happen r.Log.Info("Detected job with no labels, which is not supported by ARC. Skipping anyway.", "labels", job.Labels, "run_id", job.GetRunID(), "job_id", job.GetID()) continue JOB } for _, l := range job.Labels { if l == "self-hosted" { continue } if _, ok := runnerLabels[l]; !ok { continue JOB } } switch job.GetStatus() { case "completed": // We add a case for `completed` so it is not counted in `unknown`. // And we do not increment the counter for completed because // that counter only refers to workflows. The reason for // this is because we do not get a list of jobs for // completed workflows in order to keep the number of API // calls to a minimum. case "in_progress": inProgress++ case "queued": queued++ default: unknown++ } } } } for _, repo := range repos { user, repoName := repo[0], repo[1] workflowRuns, err := r.GitHubClient.ListRepositoryWorkflowRuns(context.TODO(), user, repoName) if err != nil { return nil, err } for _, run := range workflowRuns { total++ // In May 2020, there are only 3 statuses. // Follow the below links for more details: // - https://developer.github.com/v3/actions/workflow-runs/#list-repository-workflow-runs // - https://developer.github.com/v3/checks/runs/#create-a-check-run switch run.GetStatus() { case "completed": completed++ case "in_progress": listWorkflowJobs(user, repoName, run.GetID(), func() { inProgress++ }) case "queued": listWorkflowJobs(user, repoName, run.GetID(), func() { queued++ }) default: unknown++ } } } necessaryReplicas := queued + inProgress r.Log.V(1).Info( fmt.Sprintf("Suggested desired replicas of %d by TotalNumberOfQueuedAndInProgressWorkflowRuns", necessaryReplicas), "workflow_runs_completed", completed, "workflow_runs_in_progress", inProgress, "workflow_runs_queued", queued, "workflow_runs_unknown", unknown, "namespace", hra.Namespace, "kind", st.kind, "name", st.st, "horizontal_runner_autoscaler", hra.Name, ) return &necessaryReplicas, nil } func (r *HorizontalRunnerAutoscalerReconciler) suggestReplicasByPercentageRunnersBusy(st scaleTarget, hra v1alpha1.HorizontalRunnerAutoscaler, metrics v1alpha1.MetricSpec) (*int, error) { ctx := context.Background() scaleUpThreshold := defaultScaleUpThreshold scaleDownThreshold := defaultScaleDownThreshold scaleUpFactor := defaultScaleUpFactor scaleDownFactor := defaultScaleDownFactor if metrics.ScaleUpThreshold != "" { sut, err := strconv.ParseFloat(metrics.ScaleUpThreshold, 64) if err != nil { return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleUpThreshold cannot be parsed into a float64") } scaleUpThreshold = sut } if metrics.ScaleDownThreshold != "" { sdt, err := strconv.ParseFloat(metrics.ScaleDownThreshold, 64) if err != nil { return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleDownThreshold cannot be parsed into a float64") } scaleDownThreshold = sdt } scaleUpAdjustment := metrics.ScaleUpAdjustment if scaleUpAdjustment != 0 { if metrics.ScaleUpAdjustment < 0 { return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleUpAdjustment cannot be lower than 0") } if metrics.ScaleUpFactor != "" { return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[]: scaleUpAdjustment and scaleUpFactor cannot be specified together") } } else if metrics.ScaleUpFactor != "" { suf, err := strconv.ParseFloat(metrics.ScaleUpFactor, 64) if err != nil { return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleUpFactor cannot be parsed into a float64") } scaleUpFactor = suf } scaleDownAdjustment := metrics.ScaleDownAdjustment if scaleDownAdjustment != 0 { if metrics.ScaleDownAdjustment < 0 { return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleDownAdjustment cannot be lower than 0") } if metrics.ScaleDownFactor != "" { return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[]: scaleDownAdjustment and scaleDownFactor cannot be specified together") } } else if metrics.ScaleDownFactor != "" { sdf, err := strconv.ParseFloat(metrics.ScaleDownFactor, 64) if err != nil { return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].scaleDownFactor cannot be parsed into a float64") } scaleDownFactor = sdf } runnerMap, err := st.getRunnerMap() if err != nil { return nil, err } var ( enterprise = st.enterprise organization = st.org repository = st.repo ) // ListRunners will return all runners managed by GitHub - not restricted to ns runners, err := r.GitHubClient.ListRunners( ctx, enterprise, organization, repository) if err != nil { return nil, err } var desiredReplicasBefore int if v := st.replicas; v == nil { desiredReplicasBefore = 1 } else { desiredReplicasBefore = *v } var ( numRunners int numRunnersRegistered int numRunnersBusy int ) numRunners = len(runnerMap) for _, runner := range runners { if _, ok := runnerMap[*runner.Name]; ok { numRunnersRegistered++ if runner.GetBusy() { numRunnersBusy++ } } } var desiredReplicas int fractionBusy := float64(numRunnersBusy) / float64(desiredReplicasBefore) if fractionBusy >= scaleUpThreshold { if scaleUpAdjustment > 0 { desiredReplicas = desiredReplicasBefore + scaleUpAdjustment } else { desiredReplicas = int(math.Ceil(float64(desiredReplicasBefore) * scaleUpFactor)) } } else if fractionBusy < scaleDownThreshold { if scaleDownAdjustment > 0 { desiredReplicas = desiredReplicasBefore - scaleDownAdjustment } else { desiredReplicas = int(float64(desiredReplicasBefore) * scaleDownFactor) } } else { desiredReplicas = *st.replicas } // NOTES for operators: // // - num_runners can be as twice as large as replicas_desired_before while // the runnerdeployment controller is replacing RunnerReplicaSet for runner update. r.Log.V(1).Info( fmt.Sprintf("Suggested desired replicas of %d by PercentageRunnersBusy", desiredReplicas), "replicas_desired_before", desiredReplicasBefore, "replicas_desired", desiredReplicas, "num_runners", numRunners, "num_runners_registered", numRunnersRegistered, "num_runners_busy", numRunnersBusy, "namespace", hra.Namespace, "kind", st.kind, "name", st.st, "horizontal_runner_autoscaler", hra.Name, "enterprise", enterprise, "organization", organization, "repository", repository, ) return &desiredReplicas, nil }