Prefer autoscaling based on jobs rather than workflows if available (#114)

Adds the ability to autoscale on jobs in addition to workflows. We fall back to using workflow metrics if job details are not present.

Resolves #89
This commit is contained in:
Dominic LoBue 2020-10-07 17:00:44 -07:00 committed by GitHub
parent 1bc6809c1b
commit a63860029a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 113 additions and 11 deletions

View File

@ -4,8 +4,9 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/summerwind/actions-runner-controller/api/v1alpha1"
"strings" "strings"
"github.com/summerwind/actions-runner-controller/api/v1alpha1"
) )
func (r *HorizontalRunnerAutoscalerReconciler) determineDesiredReplicas(rd v1alpha1.RunnerDeployment, hra v1alpha1.HorizontalRunnerAutoscaler) (*int, error) { func (r *HorizontalRunnerAutoscalerReconciler) determineDesiredReplicas(rd v1alpha1.RunnerDeployment, hra v1alpha1.HorizontalRunnerAutoscaler) (*int, error) {
@ -44,6 +45,38 @@ func (r *HorizontalRunnerAutoscalerReconciler) determineDesiredReplicas(rd v1alp
} }
var total, inProgress, queued, completed, unknown int var total, inProgress, queued, completed, unknown int
type callback func()
listWorkflowJobs := func(user string, repoName string, runID int64, fallback_cb callback) {
if runID == 0 {
fallback_cb()
return
}
jobs, _, err := r.GitHubClient.Actions.ListWorkflowJobs(context.TODO(), user, repoName, runID, nil)
if err != nil {
r.Log.Error(err, "Error listing workflow jobs")
fallback_cb()
} else if len(jobs.Jobs) == 0 {
fallback_cb()
} else {
for _, job := range jobs.Jobs {
switch job.GetStatus() {
case "completed":
// We add a case for `completed` so it is not counted in `unknown`.
// And we do not increment the counter for completed because
// that counter only refers to workflows. The reason for
// this is because we do not get a list of jobs for
// completed workflows in order to keep the number of API
// calls to a minimum.
case "in_progress":
inProgress++
case "queued":
queued++
default:
unknown++
}
}
}
}
for _, repo := range repos { for _, repo := range repos {
user, repoName := repo[0], repo[1] user, repoName := repo[0], repo[1]
@ -52,20 +85,20 @@ func (r *HorizontalRunnerAutoscalerReconciler) determineDesiredReplicas(rd v1alp
return nil, err return nil, err
} }
for _, r := range list.WorkflowRuns { for _, run := range list.WorkflowRuns {
total++ total++
// In May 2020, there are only 3 statuses. // In May 2020, there are only 3 statuses.
// Follow the below links for more details: // Follow the below links for more details:
// - https://developer.github.com/v3/actions/workflow-runs/#list-repository-workflow-runs // - https://developer.github.com/v3/actions/workflow-runs/#list-repository-workflow-runs
// - https://developer.github.com/v3/checks/runs/#create-a-check-run // - https://developer.github.com/v3/checks/runs/#create-a-check-run
switch r.GetStatus() { switch run.GetStatus() {
case "completed": case "completed":
completed++ completed++
case "in_progress": case "in_progress":
inProgress++ listWorkflowJobs(user, repoName, run.GetID(), func() { inProgress++ })
case "queued": case "queued":
queued++ listWorkflowJobs(user, repoName, run.GetID(), func() { queued++ })
default: default:
unknown++ unknown++
} }

View File

@ -2,16 +2,17 @@ package controllers
import ( import (
"fmt" "fmt"
"net/http/httptest"
"net/url"
"testing"
"github.com/summerwind/actions-runner-controller/api/v1alpha1" "github.com/summerwind/actions-runner-controller/api/v1alpha1"
"github.com/summerwind/actions-runner-controller/github" "github.com/summerwind/actions-runner-controller/github"
"github.com/summerwind/actions-runner-controller/github/fake" "github.com/summerwind/actions-runner-controller/github/fake"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme" clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"net/http/httptest"
"net/url"
"sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/log/zap"
"testing"
) )
func newGithubClient(server *httptest.Server) *github.Client { func newGithubClient(server *httptest.Server) *github.Client {
@ -44,9 +45,11 @@ func TestDetermineDesiredReplicas_RepositoryRunner(t *testing.T) {
sReplicas *int sReplicas *int
sTime *metav1.Time sTime *metav1.Time
workflowRuns string workflowRuns string
workflowJobs map[int]string
want int want int
err string err string
}{ }{
// Legacy functionality
// 3 demanded, max at 3 // 3 demanded, max at 3
{ {
repo: "test/valid", repo: "test/valid",
@ -122,6 +125,21 @@ func TestDetermineDesiredReplicas_RepositoryRunner(t *testing.T) {
workflowRuns: `{"total_count": 4, "workflow_runs":[{"status":"in_progress"}, {"status":"in_progress"}, {"status":"in_progress"}, {"status":"completed"}]}"`, workflowRuns: `{"total_count": 4, "workflow_runs":[{"status":"in_progress"}, {"status":"in_progress"}, {"status":"in_progress"}, {"status":"completed"}]}"`,
want: 3, want: 3,
}, },
// Job-level autoscaling
// 5 requested from 3 workflows
{
repo: "test/valid",
min: intPtr(2),
max: intPtr(10),
workflowRuns: `{"total_count": 4, "workflow_runs":[{"id": 1, "status":"queued"}, {"id": 2, "status":"in_progress"}, {"id": 3, "status":"in_progress"}, {"status":"completed"}]}"`,
workflowJobs: map[int]string{
1: `{"jobs": [{"status":"queued"}, {"status":"queued"}]}`,
2: `{"jobs": [{"status": "in_progress"}, {"status":"completed"}]}`,
3: `{"jobs": [{"status": "in_progress"}, {"status":"queued"}]}`,
},
want: 5,
},
} }
for i := range testcases { for i := range testcases {
@ -136,7 +154,7 @@ func TestDetermineDesiredReplicas_RepositoryRunner(t *testing.T) {
_ = v1alpha1.AddToScheme(scheme) _ = v1alpha1.AddToScheme(scheme)
t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
server := fake.NewServer(fake.WithListRepositoryWorkflowRunsResponse(200, tc.workflowRuns)) server := fake.NewServer(fake.WithListRepositoryWorkflowRunsResponse(200, tc.workflowRuns), fake.WithListWorkflowJobsResponse(200, tc.workflowJobs))
defer server.Close() defer server.Close()
client := newGithubClient(server) client := newGithubClient(server)
@ -211,6 +229,7 @@ func TestDetermineDesiredReplicas_OrganizationalRunner(t *testing.T) {
sReplicas *int sReplicas *int
sTime *metav1.Time sTime *metav1.Time
workflowRuns string workflowRuns string
workflowJobs map[int]string
want int want int
err string err string
}{ }{
@ -316,6 +335,22 @@ func TestDetermineDesiredReplicas_OrganizationalRunner(t *testing.T) {
workflowRuns: `{"total_count": 2, "workflow_runs":[{"status":"in_progress"}, {"status":"completed"}]}"`, workflowRuns: `{"total_count": 2, "workflow_runs":[{"status":"in_progress"}, {"status":"completed"}]}"`,
err: "validating autoscaling metrics: spec.autoscaling.metrics[].repositoryNames is required and must have one more more entries for organizational runner deployment", err: "validating autoscaling metrics: spec.autoscaling.metrics[].repositoryNames is required and must have one more more entries for organizational runner deployment",
}, },
// Job-level autoscaling
// 5 requested from 3 workflows
{
org: "test",
repos: []string{"valid"},
min: intPtr(2),
max: intPtr(10),
workflowRuns: `{"total_count": 4, "workflow_runs":[{"id": 1, "status":"queued"}, {"id": 2, "status":"in_progress"}, {"id": 3, "status":"in_progress"}, {"status":"completed"}]}"`,
workflowJobs: map[int]string{
1: `{"jobs": [{"status":"queued"}, {"status":"queued"}]}`,
2: `{"jobs": [{"status": "in_progress"}, {"status":"completed"}]}`,
3: `{"jobs": [{"status": "in_progress"}, {"status":"queued"}]}`,
},
want: 5,
},
} }
for i := range testcases { for i := range testcases {
@ -330,7 +365,7 @@ func TestDetermineDesiredReplicas_OrganizationalRunner(t *testing.T) {
_ = v1alpha1.AddToScheme(scheme) _ = v1alpha1.AddToScheme(scheme)
t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
server := fake.NewServer(fake.WithListRepositoryWorkflowRunsResponse(200, tc.workflowRuns)) server := fake.NewServer(fake.WithListRepositoryWorkflowRunsResponse(200, tc.workflowRuns), fake.WithListWorkflowJobsResponse(200, tc.workflowJobs))
defer server.Close() defer server.Close()
client := newGithubClient(server) client := newGithubClient(server)

View File

@ -4,7 +4,10 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"strconv"
"strings"
"time" "time"
"unicode"
) )
const ( const (
@ -31,6 +34,24 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
fmt.Fprintf(w, h.Body) fmt.Fprintf(w, h.Body)
} }
type MapHandler struct {
Status int
Bodies map[int]string
}
func (h *MapHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// Parse out int key from URL path
key, err := strconv.Atoi(strings.TrimFunc(req.URL.Path, func(r rune) bool { return !unicode.IsNumber(r) }))
if err != nil {
w.WriteHeader(400)
} else if body := h.Bodies[key]; len(body) == 0 {
w.WriteHeader(404)
} else {
w.WriteHeader(h.Status)
fmt.Fprintf(w, body)
}
}
type ServerConfig struct { type ServerConfig struct {
*FixedResponses *FixedResponses
} }
@ -45,7 +66,7 @@ func NewServer(opts ...Option) *httptest.Server {
o(&config) o(&config)
} }
routes := map[string]*Handler{ routes := map[string]http.Handler{
// For CreateRegistrationToken // For CreateRegistrationToken
"/repos/test/valid/actions/runners/registration-token": &Handler{ "/repos/test/valid/actions/runners/registration-token": &Handler{
Status: http.StatusCreated, Status: http.StatusCreated,
@ -126,6 +147,9 @@ func NewServer(opts ...Option) *httptest.Server {
// For auto-scaling based on the number of queued(pending) workflow runs // For auto-scaling based on the number of queued(pending) workflow runs
"/repos/test/valid/actions/runs": config.FixedResponses.ListRepositoryWorkflowRuns, "/repos/test/valid/actions/runs": config.FixedResponses.ListRepositoryWorkflowRuns,
// For auto-scaling based on the number of queued(pending) workflow jobs
"/repos/test/valid/actions/runs/": config.FixedResponses.ListWorkflowJobs,
} }
mux := http.NewServeMux() mux := http.NewServeMux()

View File

@ -2,6 +2,7 @@ package fake
type FixedResponses struct { type FixedResponses struct {
ListRepositoryWorkflowRuns *Handler ListRepositoryWorkflowRuns *Handler
ListWorkflowJobs *MapHandler
} }
type Option func(*ServerConfig) type Option func(*ServerConfig)
@ -15,6 +16,15 @@ func WithListRepositoryWorkflowRunsResponse(status int, body string) Option {
} }
} }
func WithListWorkflowJobsResponse(status int, bodies map[int]string) Option {
return func(c *ServerConfig) {
c.FixedResponses.ListWorkflowJobs = &MapHandler{
Status: status,
Bodies: bodies,
}
}
}
func WithFixedResponses(responses *FixedResponses) Option { func WithFixedResponses(responses *FixedResponses) Option {
return func(c *ServerConfig) { return func(c *ServerConfig) {
c.FixedResponses = responses c.FixedResponses = responses