diff --git a/controllers/autoscaling.go b/controllers/autoscaling.go index 661534df..95e9dfed 100644 --- a/controllers/autoscaling.go +++ b/controllers/autoscaling.go @@ -4,8 +4,9 @@ import ( "context" "errors" "fmt" - "github.com/summerwind/actions-runner-controller/api/v1alpha1" "strings" + + "github.com/summerwind/actions-runner-controller/api/v1alpha1" ) func (r *HorizontalRunnerAutoscalerReconciler) determineDesiredReplicas(rd v1alpha1.RunnerDeployment, hra v1alpha1.HorizontalRunnerAutoscaler) (*int, error) { @@ -44,6 +45,38 @@ func (r *HorizontalRunnerAutoscalerReconciler) determineDesiredReplicas(rd v1alp } var total, inProgress, queued, completed, unknown int + type callback func() + listWorkflowJobs := func(user string, repoName string, runID int64, fallback_cb callback) { + if runID == 0 { + fallback_cb() + return + } + jobs, _, err := r.GitHubClient.Actions.ListWorkflowJobs(context.TODO(), user, repoName, runID, nil) + if err != nil { + r.Log.Error(err, "Error listing workflow jobs") + fallback_cb() + } else if len(jobs.Jobs) == 0 { + fallback_cb() + } else { + for _, job := range jobs.Jobs { + switch job.GetStatus() { + case "completed": + // We add a case for `completed` so it is not counted in `unknown`. + // And we do not increment the counter for completed because + // that counter only refers to workflows. The reason for + // this is because we do not get a list of jobs for + // completed workflows in order to keep the number of API + // calls to a minimum. + case "in_progress": + inProgress++ + case "queued": + queued++ + default: + unknown++ + } + } + } + } for _, repo := range repos { user, repoName := repo[0], repo[1] @@ -52,20 +85,20 @@ func (r *HorizontalRunnerAutoscalerReconciler) determineDesiredReplicas(rd v1alp return nil, err } - for _, r := range list.WorkflowRuns { + for _, run := range list.WorkflowRuns { total++ // In May 2020, there are only 3 statuses. // Follow the below links for more details: // - https://developer.github.com/v3/actions/workflow-runs/#list-repository-workflow-runs // - https://developer.github.com/v3/checks/runs/#create-a-check-run - switch r.GetStatus() { + switch run.GetStatus() { case "completed": completed++ case "in_progress": - inProgress++ + listWorkflowJobs(user, repoName, run.GetID(), func() { inProgress++ }) case "queued": - queued++ + listWorkflowJobs(user, repoName, run.GetID(), func() { queued++ }) default: unknown++ } diff --git a/controllers/autoscaling_test.go b/controllers/autoscaling_test.go index 56c65b16..aac3ee0d 100644 --- a/controllers/autoscaling_test.go +++ b/controllers/autoscaling_test.go @@ -2,16 +2,17 @@ package controllers import ( "fmt" + "net/http/httptest" + "net/url" + "testing" + "github.com/summerwind/actions-runner-controller/api/v1alpha1" "github.com/summerwind/actions-runner-controller/github" "github.com/summerwind/actions-runner-controller/github/fake" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" - "net/http/httptest" - "net/url" "sigs.k8s.io/controller-runtime/pkg/log/zap" - "testing" ) func newGithubClient(server *httptest.Server) *github.Client { @@ -44,9 +45,11 @@ func TestDetermineDesiredReplicas_RepositoryRunner(t *testing.T) { sReplicas *int sTime *metav1.Time workflowRuns string + workflowJobs map[int]string want int err string }{ + // Legacy functionality // 3 demanded, max at 3 { repo: "test/valid", @@ -122,6 +125,21 @@ func TestDetermineDesiredReplicas_RepositoryRunner(t *testing.T) { workflowRuns: `{"total_count": 4, "workflow_runs":[{"status":"in_progress"}, {"status":"in_progress"}, {"status":"in_progress"}, {"status":"completed"}]}"`, want: 3, }, + + // Job-level autoscaling + // 5 requested from 3 workflows + { + repo: "test/valid", + min: intPtr(2), + max: intPtr(10), + workflowRuns: `{"total_count": 4, "workflow_runs":[{"id": 1, "status":"queued"}, {"id": 2, "status":"in_progress"}, {"id": 3, "status":"in_progress"}, {"status":"completed"}]}"`, + workflowJobs: map[int]string{ + 1: `{"jobs": [{"status":"queued"}, {"status":"queued"}]}`, + 2: `{"jobs": [{"status": "in_progress"}, {"status":"completed"}]}`, + 3: `{"jobs": [{"status": "in_progress"}, {"status":"queued"}]}`, + }, + want: 5, + }, } for i := range testcases { @@ -136,7 +154,7 @@ func TestDetermineDesiredReplicas_RepositoryRunner(t *testing.T) { _ = v1alpha1.AddToScheme(scheme) t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { - server := fake.NewServer(fake.WithListRepositoryWorkflowRunsResponse(200, tc.workflowRuns)) + server := fake.NewServer(fake.WithListRepositoryWorkflowRunsResponse(200, tc.workflowRuns), fake.WithListWorkflowJobsResponse(200, tc.workflowJobs)) defer server.Close() client := newGithubClient(server) @@ -211,6 +229,7 @@ func TestDetermineDesiredReplicas_OrganizationalRunner(t *testing.T) { sReplicas *int sTime *metav1.Time workflowRuns string + workflowJobs map[int]string want int err string }{ @@ -316,6 +335,22 @@ func TestDetermineDesiredReplicas_OrganizationalRunner(t *testing.T) { workflowRuns: `{"total_count": 2, "workflow_runs":[{"status":"in_progress"}, {"status":"completed"}]}"`, err: "validating autoscaling metrics: spec.autoscaling.metrics[].repositoryNames is required and must have one more more entries for organizational runner deployment", }, + + // Job-level autoscaling + // 5 requested from 3 workflows + { + org: "test", + repos: []string{"valid"}, + min: intPtr(2), + max: intPtr(10), + workflowRuns: `{"total_count": 4, "workflow_runs":[{"id": 1, "status":"queued"}, {"id": 2, "status":"in_progress"}, {"id": 3, "status":"in_progress"}, {"status":"completed"}]}"`, + workflowJobs: map[int]string{ + 1: `{"jobs": [{"status":"queued"}, {"status":"queued"}]}`, + 2: `{"jobs": [{"status": "in_progress"}, {"status":"completed"}]}`, + 3: `{"jobs": [{"status": "in_progress"}, {"status":"queued"}]}`, + }, + want: 5, + }, } for i := range testcases { @@ -330,7 +365,7 @@ func TestDetermineDesiredReplicas_OrganizationalRunner(t *testing.T) { _ = v1alpha1.AddToScheme(scheme) t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { - server := fake.NewServer(fake.WithListRepositoryWorkflowRunsResponse(200, tc.workflowRuns)) + server := fake.NewServer(fake.WithListRepositoryWorkflowRunsResponse(200, tc.workflowRuns), fake.WithListWorkflowJobsResponse(200, tc.workflowJobs)) defer server.Close() client := newGithubClient(server) diff --git a/github/fake/fake.go b/github/fake/fake.go index 4d7cca05..49cb2305 100644 --- a/github/fake/fake.go +++ b/github/fake/fake.go @@ -4,7 +4,10 @@ import ( "fmt" "net/http" "net/http/httptest" + "strconv" + "strings" "time" + "unicode" ) const ( @@ -31,6 +34,24 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) { fmt.Fprintf(w, h.Body) } +type MapHandler struct { + Status int + Bodies map[int]string +} + +func (h *MapHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + // Parse out int key from URL path + key, err := strconv.Atoi(strings.TrimFunc(req.URL.Path, func(r rune) bool { return !unicode.IsNumber(r) })) + if err != nil { + w.WriteHeader(400) + } else if body := h.Bodies[key]; len(body) == 0 { + w.WriteHeader(404) + } else { + w.WriteHeader(h.Status) + fmt.Fprintf(w, body) + } +} + type ServerConfig struct { *FixedResponses } @@ -45,7 +66,7 @@ func NewServer(opts ...Option) *httptest.Server { o(&config) } - routes := map[string]*Handler{ + routes := map[string]http.Handler{ // For CreateRegistrationToken "/repos/test/valid/actions/runners/registration-token": &Handler{ Status: http.StatusCreated, @@ -126,6 +147,9 @@ func NewServer(opts ...Option) *httptest.Server { // For auto-scaling based on the number of queued(pending) workflow runs "/repos/test/valid/actions/runs": config.FixedResponses.ListRepositoryWorkflowRuns, + + // For auto-scaling based on the number of queued(pending) workflow jobs + "/repos/test/valid/actions/runs/": config.FixedResponses.ListWorkflowJobs, } mux := http.NewServeMux() diff --git a/github/fake/options.go b/github/fake/options.go index b50b04e9..2b78c888 100644 --- a/github/fake/options.go +++ b/github/fake/options.go @@ -2,6 +2,7 @@ package fake type FixedResponses struct { ListRepositoryWorkflowRuns *Handler + ListWorkflowJobs *MapHandler } type Option func(*ServerConfig) @@ -15,6 +16,15 @@ func WithListRepositoryWorkflowRunsResponse(status int, body string) Option { } } +func WithListWorkflowJobsResponse(status int, bodies map[int]string) Option { + return func(c *ServerConfig) { + c.FixedResponses.ListWorkflowJobs = &MapHandler{ + Status: status, + Bodies: bodies, + } + } +} + func WithFixedResponses(responses *FixedResponses) Option { return func(c *ServerConfig) { c.FixedResponses = responses