feat: Organizational RunnerDeployment Autoscaling

Enhances #57 to add support for organizational runners.

As GitHub Actions does not have an appropriate API for this, this is the spec you need:

```
apiVersion: actions.summerwind.dev/v1alpha1
kind: RunnerDeployment
metadata:
  name: myrunners
spec:
  minReplicas: 1
  maxReplicas: 3
  autoscaling:
    metrics:
    - type: TotalNumberOfQueuedAndProgressingWorkflowRuns
      repositories:
      # Assumes that you have `github.com/myorg/myrepo1` repo
      - myrepo1
      - myrepo2
  template:
    spec:
      organization: myorg
```

It works by collecting "in_progress" and "queued" workflow runs for the repositories `myrepo1` and `myrepo2` to autoscale the number of replicas, assuming you have this organizational runner deployment only for those two repositories.

For example, if `myrepo1` had 1 `in_progress` and 2 `queued` workflow runs, and `myrepo2` had 4 `in_progress` and 8 `queued` workflow runs at the time of running the reconcilation loop on the runner deployment, it will scale replicas to 1 + 2 + 4 + 8 = 15.

Perhaps we might be better add a kind of "ratio" setting so that you can configure the controller to create e.g. 2x runners than demanded. But that's another story.

Ref #10
This commit is contained in:
Yusuke Kuoka 2020-07-03 09:05:46 +09:00
parent f1556ff060
commit eca6917c6a
6 changed files with 321 additions and 50 deletions

View File

@ -20,6 +20,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
const (
AutoscalingMetricTypeTotalNumberOfQueuedAndProgressingWorkflowRuns = "TotalNumberOfQueuedAndProgressingWorkflowRuns"
)
// RunnerReplicaSetSpec defines the desired state of RunnerDeployment
type RunnerDeploymentSpec struct {
// +optional
@ -38,9 +42,28 @@ type RunnerDeploymentSpec struct {
// +optional
ScaleDownDelaySecondsAfterScaleUp *int `json:"scaleDownDelaySecondsAfterScaleOut,omitempty"`
// Autoscaling is set various configuration options for autoscaling this runner deployment.
// +optional
Autoscaling AutoscalingSpec `json:"autoscaling,omitempty"`
Template RunnerTemplate `json:"template"`
}
type AutoscalingSpec struct {
Metrics []MetricSpec `json:"metrics,omitempty"`
}
type MetricSpec struct {
// Type is the type of metric to be used for autoscaling.
// The only supported Type is TotalNumberOfQueuedAndProgressingWorkflowRuns
Type string `json:"type,omitempty"`
// RepositoryNames is the list of repository names to be used for calculating the metric.
// For example, a repository name is the REPO part of `github.com/USER/REPO`.
// +optional
RepositoryNames []string `json:"repositoryNames,omitempty"`
}
type RunnerDeploymentStatus struct {
AvailableReplicas int `json:"availableReplicas"`
ReadyReplicas int `json:"readyReplicas"`

View File

@ -25,6 +25,48 @@ import (
"k8s.io/apimachinery/pkg/runtime"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *AutoscalingSpec) DeepCopyInto(out *AutoscalingSpec) {
*out = *in
if in.Metrics != nil {
in, out := &in.Metrics, &out.Metrics
*out = make([]MetricSpec, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AutoscalingSpec.
func (in *AutoscalingSpec) DeepCopy() *AutoscalingSpec {
if in == nil {
return nil
}
out := new(AutoscalingSpec)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *MetricSpec) DeepCopyInto(out *MetricSpec) {
*out = *in
if in.RepositoryNames != nil {
in, out := &in.RepositoryNames, &out.RepositoryNames
*out = make([]string, len(*in))
copy(*out, *in)
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MetricSpec.
func (in *MetricSpec) DeepCopy() *MetricSpec {
if in == nil {
return nil
}
out := new(MetricSpec)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Runner) DeepCopyInto(out *Runner) {
*out = *in
@ -134,6 +176,7 @@ func (in *RunnerDeploymentSpec) DeepCopyInto(out *RunnerDeploymentSpec) {
*out = new(int)
**out = **in
}
in.Autoscaling.DeepCopyInto(&out.Autoscaling)
in.Template.DeepCopyInto(&out.Template)
}

View File

@ -46,6 +46,27 @@ spec:
spec:
description: RunnerReplicaSetSpec defines the desired state of RunnerDeployment
properties:
autoscaling:
description: Autoscaling is set various configuration options for autoscaling
this runner deployment.
properties:
metrics:
items:
properties:
repositoryNames:
description: RepositoryNames is the list of repository names
to be used for calculating the metric. For example, a repository
name is the REPO part of `github.com/USER/REPO`.
items:
type: string
type: array
type:
description: Type is the type of metric to be used for autoscaling.
The only supported Type is TotalNumberOfQueuedAndProgressingWorkflowRuns
type: string
type: object
type: array
type: object
maxReplicas:
description: MinReplicas is the maximum number of replicas the deployment
is allowed to scale

View File

@ -2,20 +2,12 @@ package controllers
import (
"context"
"errors"
"fmt"
"github.com/summerwind/actions-runner-controller/api/v1alpha1"
"strings"
)
type NotSupported struct {
}
var _ error = NotSupported{}
func (e NotSupported) Error() string {
return "Autoscaling is currently supported only when spec.repository is set"
}
func (r *RunnerDeploymentReconciler) determineDesiredReplicas(rd v1alpha1.RunnerDeployment) (*int, error) {
if rd.Spec.Replicas != nil {
return nil, fmt.Errorf("bug: determineDesiredReplicas should not be called for deplomeny with specific replicas")
@ -25,38 +17,60 @@ func (r *RunnerDeploymentReconciler) determineDesiredReplicas(rd v1alpha1.Runner
return nil, fmt.Errorf("runnerdeployment %s/%s is missing maxReplicas", rd.Namespace, rd.Name)
}
var replicas int
var repos [][]string
repoID := rd.Spec.Template.Spec.Repository
if repoID == "" {
return nil, NotSupported{}
}
orgName := rd.Spec.Template.Spec.Organization
if orgName == "" {
return nil, fmt.Errorf("asserting runner deployment spec to detect bug: spec.template.organization should not be empty on this code path")
}
repo := strings.Split(repoID, "/")
user, repoName := repo[0], repo[1]
list, _, err := r.GitHubClient.Actions.ListRepositoryWorkflowRuns(context.TODO(), user, repoName, nil)
if err != nil {
return nil, err
metrics := rd.Spec.Autoscaling.Metrics
if len(metrics) == 0 {
return nil, fmt.Errorf("validating autoscaling metrics: one or more metrics is required")
} else if tpe := metrics[0].Type; tpe != v1alpha1.AutoscalingMetricTypeTotalNumberOfQueuedAndProgressingWorkflowRuns {
return nil, fmt.Errorf("validting autoscaling metrics: unsupported metric type %q: only supported value is %s", tpe, v1alpha1.AutoscalingMetricTypeTotalNumberOfQueuedAndProgressingWorkflowRuns)
} else if len(metrics[0].RepositoryNames) == 0 {
return nil, errors.New("validating autoscaling metrics: spec.autoscaling.metrics[].repositoryNames is required and must have one more more entries for organizational runner deployment")
}
for _, repoName := range metrics[0].RepositoryNames {
repos = append(repos, []string{orgName, repoName})
}
} else {
repo := strings.Split(repoID, "/")
repos = append(repos, repo)
}
var total, inProgress, queued, completed, unknown int
for _, r := range list.WorkflowRuns {
total++
for _, repo := range repos {
user, repoName := repo[0], repo[1]
list, _, err := r.GitHubClient.Actions.ListRepositoryWorkflowRuns(context.TODO(), user, repoName, nil)
if err != nil {
return nil, err
}
// In May 2020, there are only 3 statuses.
// Follow the below links for more details:
// - https://developer.github.com/v3/actions/workflow-runs/#list-repository-workflow-runs
// - https://developer.github.com/v3/checks/runs/#create-a-check-run
switch r.GetStatus() {
case "completed":
completed++
case "in_progress":
inProgress++
case "queued":
queued++
default:
unknown++
for _, r := range list.WorkflowRuns {
total++
// In May 2020, there are only 3 statuses.
// Follow the below links for more details:
// - https://developer.github.com/v3/actions/workflow-runs/#list-repository-workflow-runs
// - https://developer.github.com/v3/checks/runs/#create-a-check-run
switch r.GetStatus() {
case "completed":
completed++
case "in_progress":
inProgress++
case "queued":
queued++
default:
unknown++
}
}
}
@ -75,7 +89,7 @@ func (r *RunnerDeploymentReconciler) determineDesiredReplicas(rd v1alpha1.Runner
}
rd.Status.Replicas = &desiredReplicas
replicas = desiredReplicas
replicas := desiredReplicas
r.Log.V(1).Info(
"Calculated desired replicas",

View File

@ -119,20 +119,6 @@ func TestDetermineDesiredReplicas_RepositoryRunner(t *testing.T) {
fixed: intPtr(3),
want: 3,
},
// org runner, fixed at 3
{
org: "test",
fixed: intPtr(3),
want: 3,
},
// org runner, 1 demanded, min at 1
{
org: "test",
min: intPtr(1),
max: intPtr(3),
workflowRuns: `{"total_count": 2, "workflow_runs":[{"status":"in_progress"}, {"status":"completed"}]}"`,
err: "Autoscaling is currently supported only when spec.repository is set",
},
}
for i := range testcases {
@ -197,3 +183,189 @@ func TestDetermineDesiredReplicas_RepositoryRunner(t *testing.T) {
})
}
}
func TestDetermineDesiredReplicas_OrganizationalRunner(t *testing.T) {
intPtr := func(v int) *int {
return &v
}
metav1Now := metav1.Now()
testcases := []struct {
repos []string
org string
fixed *int
max *int
min *int
sReplicas *int
sTime *metav1.Time
workflowRuns string
want int
err string
}{
// 3 demanded, max at 3
{
org: "test",
repos: []string{"valid"},
min: intPtr(2),
max: intPtr(3),
workflowRuns: `{"total_count": 4, "workflow_runs":[{"status":"queued"}, {"status":"in_progress"}, {"status":"in_progress"}, {"status":"completed"}]}"`,
want: 3,
},
// 2 demanded, max at 3, currently 3, delay scaling down due to grace period
{
org: "test",
repos: []string{"valid"},
min: intPtr(2),
max: intPtr(3),
sReplicas: intPtr(3),
sTime: &metav1Now,
workflowRuns: `{"total_count": 4, "workflow_runs":[{"status":"queued"}, {"status":"in_progress"}, {"status":"completed"}]}"`,
want: 3,
},
// 3 demanded, max at 2
{
org: "test",
repos: []string{"valid"},
min: intPtr(2),
max: intPtr(2),
workflowRuns: `{"total_count": 4, "workflow_runs":[{"status":"queued"}, {"status":"in_progress"}, {"status":"in_progress"}, {"status":"completed"}]}"`,
want: 2,
},
// 2 demanded, min at 2
{
org: "test",
repos: []string{"valid"},
min: intPtr(2),
max: intPtr(3),
workflowRuns: `{"total_count": 3, "workflow_runs":[{"status":"queued"}, {"status":"in_progress"}, {"status":"completed"}]}"`,
want: 2,
},
// 1 demanded, min at 2
{
org: "test",
repos: []string{"valid"},
min: intPtr(2),
max: intPtr(3),
workflowRuns: `{"total_count": 2, "workflow_runs":[{"status":"queued"}, {"status":"completed"}]}"`,
want: 2,
},
// 1 demanded, min at 2
{
org: "test",
repos: []string{"valid"},
min: intPtr(2),
max: intPtr(3),
workflowRuns: `{"total_count": 2, "workflow_runs":[{"status":"in_progress"}, {"status":"completed"}]}"`,
want: 2,
},
// 1 demanded, min at 1
{
org: "test",
repos: []string{"valid"},
min: intPtr(1),
max: intPtr(3),
workflowRuns: `{"total_count": 2, "workflow_runs":[{"status":"queued"}, {"status":"completed"}]}"`,
want: 1,
},
// 1 demanded, min at 1
{
org: "test",
repos: []string{"valid"},
min: intPtr(1),
max: intPtr(3),
workflowRuns: `{"total_count": 2, "workflow_runs":[{"status":"in_progress"}, {"status":"completed"}]}"`,
want: 1,
},
// fixed at 3
{
org: "test",
repos: []string{"valid"},
fixed: intPtr(3),
want: 3,
},
// org runner, fixed at 3
{
org: "test",
fixed: intPtr(3),
want: 3,
},
// org runner, 1 demanded, min at 1, no repos
{
org: "test",
min: intPtr(1),
max: intPtr(3),
workflowRuns: `{"total_count": 2, "workflow_runs":[{"status":"in_progress"}, {"status":"completed"}]}"`,
err: "validating autoscaling metrics: spec.autoscaling.metrics[].repositoryNames is required and must have one more more entries for organizational runner deployment",
},
}
for i := range testcases {
tc := testcases[i]
log := zap.New(func(o *zap.Options) {
o.Development = true
})
scheme := runtime.NewScheme()
_ = clientgoscheme.AddToScheme(scheme)
_ = v1alpha1.AddToScheme(scheme)
t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
server := fake.NewServer(fake.WithListRepositoryWorkflowRunsResponse(200, tc.workflowRuns))
defer server.Close()
client := newGithubClient(server)
r := &RunnerDeploymentReconciler{
Log: log,
GitHubClient: client,
Scheme: scheme,
}
rd := v1alpha1.RunnerDeployment{
TypeMeta: metav1.TypeMeta{},
Spec: v1alpha1.RunnerDeploymentSpec{
Template: v1alpha1.RunnerTemplate{
Spec: v1alpha1.RunnerSpec{
Organization: tc.org,
},
},
Autoscaling: v1alpha1.AutoscalingSpec{
Metrics: []v1alpha1.MetricSpec{
{
Type: v1alpha1.AutoscalingMetricTypeTotalNumberOfQueuedAndProgressingWorkflowRuns,
RepositoryNames: tc.repos,
},
},
},
Replicas: tc.fixed,
MaxReplicas: tc.max,
MinReplicas: tc.min,
},
Status: v1alpha1.RunnerDeploymentStatus{
Replicas: tc.sReplicas,
LastSuccessfulScaleOutTime: tc.sTime,
},
}
rs, err := r.newRunnerReplicaSetWithAutoscaling(rd)
if err != nil {
if tc.err == "" {
t.Fatalf("unexpected error: expected none, got %v", err)
} else if err.Error() != tc.err {
t.Fatalf("unexpected error: expected %v, got %v", tc.err, err)
}
return
}
got := rs.Spec.Replicas
if got == nil {
t.Fatalf("unexpected value of rs.Spec.Replicas: nil")
}
if *got != tc.want {
t.Errorf("%d: incorrect desired replicas: want %d, got %d", i, tc.want, *got)
}
})
}
}

View File

@ -99,9 +99,7 @@ func (r *RunnerDeploymentReconciler) Reconcile(req ctrl.Request) (ctrl.Result, e
desiredRS, err := r.newRunnerReplicaSetWithAutoscaling(rd)
if err != nil {
if _, ok := err.(NotSupported); ok {
r.Recorder.Event(&rd, corev1.EventTypeNormal, "RunnerReplicaSetAutoScaleNotSupported", err.Error())
}
r.Recorder.Event(&rd, corev1.EventTypeNormal, "RunnerAutoscalingFailure", err.Error())
log.Error(err, "Could not create runnerreplicaset")