feat: Repository-wide RunnerDeployment Autoscaling (#57)
* feat: Repository-wide RunnerDeployment Autoscaling This adds `maxReplicas` and `minReplicas` to the RunnerDeploymentSpec. If and only if both fields are set, the controller computes and sets desired `replicas` automatically depending on the demand. The number of demanded runner replicas is computed by `queued workflow runs + in_progress workflow runs` for the repository. The support for organizational runners is not included. Ref https://github.com/summerwind/actions-runner-controller/issues/10
This commit is contained in:
		
							parent
							
								
									512cae68a1
								
							
						
					
					
						commit
						5bb2694349
					
				
							
								
								
									
										38
									
								
								README.md
								
								
								
								
							
							
						
						
									
										38
									
								
								README.md
								
								
								
								
							|  | @ -189,6 +189,44 @@ example-runnerdeploy2475h595fr   mumoshu/actions-runner-controller-ci   Running | ||||||
| example-runnerdeploy2475ht2qbr   mumoshu/actions-runner-controller-ci   Running | example-runnerdeploy2475ht2qbr   mumoshu/actions-runner-controller-ci   Running | ||||||
| ``` | ``` | ||||||
| 
 | 
 | ||||||
|  | #### Autoscaling | ||||||
|  | 
 | ||||||
|  | `RunnerDeployment` can scale number of runners between `minReplicas` and `maxReplicas` fields, depending on pending workflow runs. | ||||||
|  | 
 | ||||||
|  | In the below example, `actions-runner` checks for pending workflow runs for each sync period, and scale to e.g. 3 if there're 3 pending jobs at sync time. | ||||||
|  | 
 | ||||||
|  | ``` | ||||||
|  | apiVersion: actions.summerwind.dev/v1alpha1 | ||||||
|  | kind: RunnerDeployment | ||||||
|  | metadata: | ||||||
|  |   name: summerwind-actions-runner-controller | ||||||
|  | spec: | ||||||
|  |   minReplicas: 1 | ||||||
|  |   maxReplicas: 3 | ||||||
|  |   template: | ||||||
|  |     spec: | ||||||
|  |       repository: summerwind/actions-runner-controller | ||||||
|  | ``` | ||||||
|  | 
 | ||||||
|  | Please also note that the sync period is set to 10 minutes by default and it's configurable via `--sync-period` flag. | ||||||
|  | 
 | ||||||
|  | Additionally, the autoscaling feature has an anti-flapping option that prevents periodic loop of scaling up and down. | ||||||
|  | By default, it doesn't scale down until the grace period of 10 minutes passes after a scale up. The grace period can be configured by setting `scaleDownDelaySecondsAfterScaleUp`: | ||||||
|  | 
 | ||||||
|  | ``` | ||||||
|  | apiVersion: actions.summerwind.dev/v1alpha1 | ||||||
|  | kind: RunnerDeployment | ||||||
|  | metadata: | ||||||
|  |   name: summerwind-actions-runner-controller | ||||||
|  | spec: | ||||||
|  |   minReplicas: 1 | ||||||
|  |   maxReplicas: 3 | ||||||
|  |   scaleDownDelaySecondsAfterScaleUp: 1m | ||||||
|  |   template: | ||||||
|  |     spec: | ||||||
|  |       repository: summerwind/actions-runner-controller | ||||||
|  | ``` | ||||||
|  | 
 | ||||||
| ## Additional tweaks | ## Additional tweaks | ||||||
| 
 | 
 | ||||||
| You can pass details through the spec selector. Here's an eg. of what you may like to do: | You can pass details through the spec selector. Here's an eg. of what you may like to do: | ||||||
|  |  | ||||||
|  | @ -22,7 +22,21 @@ import ( | ||||||
| 
 | 
 | ||||||
| // RunnerReplicaSetSpec defines the desired state of RunnerDeployment
 | // RunnerReplicaSetSpec defines the desired state of RunnerDeployment
 | ||||||
| type RunnerDeploymentSpec struct { | type RunnerDeploymentSpec struct { | ||||||
| 	Replicas *int `json:"replicas"` | 	// +optional
 | ||||||
|  | 	Replicas *int `json:"replicas,omitempty"` | ||||||
|  | 
 | ||||||
|  | 	// MinReplicas is the minimum number of replicas the deployment is allowed to scale
 | ||||||
|  | 	// +optional
 | ||||||
|  | 	MinReplicas *int `json:"minReplicas,omitempty"` | ||||||
|  | 
 | ||||||
|  | 	// MinReplicas is the maximum number of replicas the deployment is allowed to scale
 | ||||||
|  | 	// +optional
 | ||||||
|  | 	MaxReplicas *int `json:"maxReplicas,omitempty"` | ||||||
|  | 
 | ||||||
|  | 	// ScaleDownDelaySecondsAfterScaleUp is the approximate delay for a scale down followed by a scale up
 | ||||||
|  | 	// Used to prevent flapping (down->up->down->... loop)
 | ||||||
|  | 	// +optional
 | ||||||
|  | 	ScaleDownDelaySecondsAfterScaleUp *int `json:"scaleDownDelaySecondsAfterScaleOut,omitempty"` | ||||||
| 
 | 
 | ||||||
| 	Template RunnerTemplate `json:"template"` | 	Template RunnerTemplate `json:"template"` | ||||||
| } | } | ||||||
|  | @ -30,6 +44,14 @@ type RunnerDeploymentSpec struct { | ||||||
| type RunnerDeploymentStatus struct { | type RunnerDeploymentStatus struct { | ||||||
| 	AvailableReplicas int `json:"availableReplicas"` | 	AvailableReplicas int `json:"availableReplicas"` | ||||||
| 	ReadyReplicas     int `json:"readyReplicas"` | 	ReadyReplicas     int `json:"readyReplicas"` | ||||||
|  | 
 | ||||||
|  | 	// Replicas is the total number of desired, non-terminated and latest pods to be set for the primary RunnerSet
 | ||||||
|  | 	// This doesn't include outdated pods while upgrading the deployment and replacing the runnerset.
 | ||||||
|  | 	// +optional
 | ||||||
|  | 	Replicas *int `json:"desiredReplicas,omitempty"` | ||||||
|  | 
 | ||||||
|  | 	// +optional
 | ||||||
|  | 	LastSuccessfulScaleOutTime *metav1.Time `json:"lastSuccessfulScaleOutTime,omitempty"` | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // +kubebuilder:object:root=true
 | // +kubebuilder:object:root=true
 | ||||||
|  |  | ||||||
|  | @ -58,7 +58,7 @@ func (in *RunnerDeployment) DeepCopyInto(out *RunnerDeployment) { | ||||||
| 	out.TypeMeta = in.TypeMeta | 	out.TypeMeta = in.TypeMeta | ||||||
| 	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) | 	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) | ||||||
| 	in.Spec.DeepCopyInto(&out.Spec) | 	in.Spec.DeepCopyInto(&out.Spec) | ||||||
| 	out.Status = in.Status | 	in.Status.DeepCopyInto(&out.Status) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RunnerDeployment.
 | // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RunnerDeployment.
 | ||||||
|  | @ -119,6 +119,21 @@ func (in *RunnerDeploymentSpec) DeepCopyInto(out *RunnerDeploymentSpec) { | ||||||
| 		*out = new(int) | 		*out = new(int) | ||||||
| 		**out = **in | 		**out = **in | ||||||
| 	} | 	} | ||||||
|  | 	if in.MinReplicas != nil { | ||||||
|  | 		in, out := &in.MinReplicas, &out.MinReplicas | ||||||
|  | 		*out = new(int) | ||||||
|  | 		**out = **in | ||||||
|  | 	} | ||||||
|  | 	if in.MaxReplicas != nil { | ||||||
|  | 		in, out := &in.MaxReplicas, &out.MaxReplicas | ||||||
|  | 		*out = new(int) | ||||||
|  | 		**out = **in | ||||||
|  | 	} | ||||||
|  | 	if in.ScaleDownDelaySecondsAfterScaleUp != nil { | ||||||
|  | 		in, out := &in.ScaleDownDelaySecondsAfterScaleUp, &out.ScaleDownDelaySecondsAfterScaleUp | ||||||
|  | 		*out = new(int) | ||||||
|  | 		**out = **in | ||||||
|  | 	} | ||||||
| 	in.Template.DeepCopyInto(&out.Template) | 	in.Template.DeepCopyInto(&out.Template) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -135,6 +150,15 @@ func (in *RunnerDeploymentSpec) DeepCopy() *RunnerDeploymentSpec { | ||||||
| // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 | // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 | ||||||
| func (in *RunnerDeploymentStatus) DeepCopyInto(out *RunnerDeploymentStatus) { | func (in *RunnerDeploymentStatus) DeepCopyInto(out *RunnerDeploymentStatus) { | ||||||
| 	*out = *in | 	*out = *in | ||||||
|  | 	if in.Replicas != nil { | ||||||
|  | 		in, out := &in.Replicas, &out.Replicas | ||||||
|  | 		*out = new(int) | ||||||
|  | 		**out = **in | ||||||
|  | 	} | ||||||
|  | 	if in.LastSuccessfulScaleOutTime != nil { | ||||||
|  | 		in, out := &in.LastSuccessfulScaleOutTime, &out.LastSuccessfulScaleOutTime | ||||||
|  | 		*out = (*in).DeepCopy() | ||||||
|  | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RunnerDeploymentStatus.
 | // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RunnerDeploymentStatus.
 | ||||||
|  |  | ||||||
|  | @ -46,8 +46,21 @@ spec: | ||||||
|         spec: |         spec: | ||||||
|           description: RunnerReplicaSetSpec defines the desired state of RunnerDeployment |           description: RunnerReplicaSetSpec defines the desired state of RunnerDeployment | ||||||
|           properties: |           properties: | ||||||
|  |             maxReplicas: | ||||||
|  |               description: MinReplicas is the maximum number of replicas the deployment | ||||||
|  |                 is allowed to scale | ||||||
|  |               type: integer | ||||||
|  |             minReplicas: | ||||||
|  |               description: MinReplicas is the minimum number of replicas the deployment | ||||||
|  |                 is allowed to scale | ||||||
|  |               type: integer | ||||||
|             replicas: |             replicas: | ||||||
|               type: integer |               type: integer | ||||||
|  |             scaleDownDelaySecondsAfterScaleOut: | ||||||
|  |               description: ScaleDownDelaySecondsAfterScaleUp is the approximate delay | ||||||
|  |                 for a scale down followed by a scale up Used to prevent flapping (down->up->down->... | ||||||
|  |                 loop) | ||||||
|  |               type: integer | ||||||
|             template: |             template: | ||||||
|               properties: |               properties: | ||||||
|                 metadata: |                 metadata: | ||||||
|  | @ -6717,13 +6730,20 @@ spec: | ||||||
|                   type: object |                   type: object | ||||||
|               type: object |               type: object | ||||||
|           required: |           required: | ||||||
|           - replicas |  | ||||||
|           - template |           - template | ||||||
|           type: object |           type: object | ||||||
|         status: |         status: | ||||||
|           properties: |           properties: | ||||||
|             availableReplicas: |             availableReplicas: | ||||||
|               type: integer |               type: integer | ||||||
|  |             desiredReplicas: | ||||||
|  |               description: Replicas is the total number of desired, non-terminated | ||||||
|  |                 and latest pods to be set for the primary RunnerSet This doesn't include | ||||||
|  |                 outdated pods while upgrading the deployment and replacing the runnerset. | ||||||
|  |               type: integer | ||||||
|  |             lastSuccessfulScaleOutTime: | ||||||
|  |               format: date-time | ||||||
|  |               type: string | ||||||
|             readyReplicas: |             readyReplicas: | ||||||
|               type: integer |               type: integer | ||||||
|           required: |           required: | ||||||
|  |  | ||||||
|  | @ -0,0 +1,92 @@ | ||||||
|  | package controllers | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"fmt" | ||||||
|  | 	"github.com/summerwind/actions-runner-controller/api/v1alpha1" | ||||||
|  | 	"strings" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | type NotSupported struct { | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | var _ error = NotSupported{} | ||||||
|  | 
 | ||||||
|  | func (e NotSupported) Error() string { | ||||||
|  | 	return "Autoscaling is currently supported only when spec.repository is set" | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (r *RunnerDeploymentReconciler) determineDesiredReplicas(rd v1alpha1.RunnerDeployment) (*int, error) { | ||||||
|  | 	if rd.Spec.Replicas != nil { | ||||||
|  | 		return nil, fmt.Errorf("bug: determineDesiredReplicas should not be called for deplomeny with specific replicas") | ||||||
|  | 	} else if rd.Spec.MinReplicas == nil { | ||||||
|  | 		return nil, fmt.Errorf("runnerdeployment %s/%s is missing minReplicas", rd.Namespace, rd.Name) | ||||||
|  | 	} else if rd.Spec.MaxReplicas == nil { | ||||||
|  | 		return nil, fmt.Errorf("runnerdeployment %s/%s is missing maxReplicas", rd.Namespace, rd.Name) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	var replicas int | ||||||
|  | 
 | ||||||
|  | 	repoID := rd.Spec.Template.Spec.Repository | ||||||
|  | 	if repoID == "" { | ||||||
|  | 		return nil, NotSupported{} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	repo := strings.Split(repoID, "/") | ||||||
|  | 	user, repoName := repo[0], repo[1] | ||||||
|  | 	list, _, err := r.GitHubClient.Actions.ListRepositoryWorkflowRuns(context.TODO(), user, repoName, nil) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	var total, inProgress, queued, completed, unknown int | ||||||
|  | 
 | ||||||
|  | 	for _, r := range list.WorkflowRuns { | ||||||
|  | 		total++ | ||||||
|  | 
 | ||||||
|  | 		// In May 2020, there are only 3 statuses.
 | ||||||
|  | 		// Follow the below links for more details:
 | ||||||
|  | 		// - https://developer.github.com/v3/actions/workflow-runs/#list-repository-workflow-runs
 | ||||||
|  | 		// - https://developer.github.com/v3/checks/runs/#create-a-check-run
 | ||||||
|  | 		switch r.GetStatus() { | ||||||
|  | 		case "completed": | ||||||
|  | 			completed++ | ||||||
|  | 		case "in_progress": | ||||||
|  | 			inProgress++ | ||||||
|  | 		case "queued": | ||||||
|  | 			queued++ | ||||||
|  | 		default: | ||||||
|  | 			unknown++ | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	minReplicas := *rd.Spec.MinReplicas | ||||||
|  | 	maxReplicas := *rd.Spec.MaxReplicas | ||||||
|  | 	necessaryReplicas := queued + inProgress | ||||||
|  | 
 | ||||||
|  | 	var desiredReplicas int | ||||||
|  | 
 | ||||||
|  | 	if necessaryReplicas < minReplicas { | ||||||
|  | 		desiredReplicas = minReplicas | ||||||
|  | 	} else if necessaryReplicas > maxReplicas { | ||||||
|  | 		desiredReplicas = maxReplicas | ||||||
|  | 	} else { | ||||||
|  | 		desiredReplicas = necessaryReplicas | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	rd.Status.Replicas = &desiredReplicas | ||||||
|  | 	replicas = desiredReplicas | ||||||
|  | 
 | ||||||
|  | 	r.Log.V(1).Info( | ||||||
|  | 		"Calculated desired replicas", | ||||||
|  | 		"computed_replicas_desired", desiredReplicas, | ||||||
|  | 		"spec_replicas_min", minReplicas, | ||||||
|  | 		"spec_replicas_max", maxReplicas, | ||||||
|  | 		"workflow_runs_completed", completed, | ||||||
|  | 		"workflow_runs_in_progress", inProgress, | ||||||
|  | 		"workflow_runs_queued", queued, | ||||||
|  | 		"workflow_runs_unknown", unknown, | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	return &replicas, nil | ||||||
|  | } | ||||||
|  | @ -0,0 +1,199 @@ | ||||||
|  | package controllers | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	"github.com/summerwind/actions-runner-controller/api/v1alpha1" | ||||||
|  | 	"github.com/summerwind/actions-runner-controller/github" | ||||||
|  | 	"github.com/summerwind/actions-runner-controller/github/fake" | ||||||
|  | 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||||
|  | 	"k8s.io/apimachinery/pkg/runtime" | ||||||
|  | 	clientgoscheme "k8s.io/client-go/kubernetes/scheme" | ||||||
|  | 	"net/http/httptest" | ||||||
|  | 	"net/url" | ||||||
|  | 	"sigs.k8s.io/controller-runtime/pkg/log/zap" | ||||||
|  | 	"testing" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | func newGithubClient(server *httptest.Server) *github.Client { | ||||||
|  | 	client, err := github.NewClientWithAccessToken("token") | ||||||
|  | 	if err != nil { | ||||||
|  | 		panic(err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	baseURL, err := url.Parse(server.URL + "/") | ||||||
|  | 	if err != nil { | ||||||
|  | 		panic(err) | ||||||
|  | 	} | ||||||
|  | 	client.Client.BaseURL = baseURL | ||||||
|  | 
 | ||||||
|  | 	return client | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestDetermineDesiredReplicas_RepositoryRunner(t *testing.T) { | ||||||
|  | 	intPtr := func(v int) *int { | ||||||
|  | 		return &v | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	metav1Now := metav1.Now() | ||||||
|  | 	testcases := []struct { | ||||||
|  | 		repo         string | ||||||
|  | 		org          string | ||||||
|  | 		fixed        *int | ||||||
|  | 		max          *int | ||||||
|  | 		min          *int | ||||||
|  | 		sReplicas    *int | ||||||
|  | 		sTime        *metav1.Time | ||||||
|  | 		workflowRuns string | ||||||
|  | 		want         int | ||||||
|  | 		err          string | ||||||
|  | 	}{ | ||||||
|  | 		// 3 demanded, max at 3
 | ||||||
|  | 		{ | ||||||
|  | 			repo:         "test/valid", | ||||||
|  | 			min:          intPtr(2), | ||||||
|  | 			max:          intPtr(3), | ||||||
|  | 			workflowRuns: `{"total_count": 4, "workflow_runs":[{"status":"queued"}, {"status":"in_progress"}, {"status":"in_progress"}, {"status":"completed"}]}"`, | ||||||
|  | 			want:         3, | ||||||
|  | 		}, | ||||||
|  | 		// 2 demanded, max at 3, currently 3, delay scaling down due to grace period
 | ||||||
|  | 		{ | ||||||
|  | 			repo:         "test/valid", | ||||||
|  | 			min:          intPtr(2), | ||||||
|  | 			max:          intPtr(3), | ||||||
|  | 			sReplicas:    intPtr(3), | ||||||
|  | 			sTime:        &metav1Now, | ||||||
|  | 			workflowRuns: `{"total_count": 4, "workflow_runs":[{"status":"queued"}, {"status":"in_progress"}, {"status":"completed"}]}"`, | ||||||
|  | 			want:         3, | ||||||
|  | 		}, | ||||||
|  | 		// 3 demanded, max at 2
 | ||||||
|  | 		{ | ||||||
|  | 			repo:         "test/valid", | ||||||
|  | 			min:          intPtr(2), | ||||||
|  | 			max:          intPtr(2), | ||||||
|  | 			workflowRuns: `{"total_count": 4, "workflow_runs":[{"status":"queued"}, {"status":"in_progress"}, {"status":"in_progress"}, {"status":"completed"}]}"`, | ||||||
|  | 			want:         2, | ||||||
|  | 		}, | ||||||
|  | 		// 2 demanded, min at 2
 | ||||||
|  | 		{ | ||||||
|  | 			repo:         "test/valid", | ||||||
|  | 			min:          intPtr(2), | ||||||
|  | 			max:          intPtr(3), | ||||||
|  | 			workflowRuns: `{"total_count": 3, "workflow_runs":[{"status":"queued"}, {"status":"in_progress"}, {"status":"completed"}]}"`, | ||||||
|  | 			want:         2, | ||||||
|  | 		}, | ||||||
|  | 		// 1 demanded, min at 2
 | ||||||
|  | 		{ | ||||||
|  | 			repo:         "test/valid", | ||||||
|  | 			min:          intPtr(2), | ||||||
|  | 			max:          intPtr(3), | ||||||
|  | 			workflowRuns: `{"total_count": 2, "workflow_runs":[{"status":"queued"}, {"status":"completed"}]}"`, | ||||||
|  | 			want:         2, | ||||||
|  | 		}, | ||||||
|  | 		// 1 demanded, min at 2
 | ||||||
|  | 		{ | ||||||
|  | 			repo:         "test/valid", | ||||||
|  | 			min:          intPtr(2), | ||||||
|  | 			max:          intPtr(3), | ||||||
|  | 			workflowRuns: `{"total_count": 2, "workflow_runs":[{"status":"in_progress"}, {"status":"completed"}]}"`, | ||||||
|  | 			want:         2, | ||||||
|  | 		}, | ||||||
|  | 		// 1 demanded, min at 1
 | ||||||
|  | 		{ | ||||||
|  | 			repo:         "test/valid", | ||||||
|  | 			min:          intPtr(1), | ||||||
|  | 			max:          intPtr(3), | ||||||
|  | 			workflowRuns: `{"total_count": 2, "workflow_runs":[{"status":"queued"}, {"status":"completed"}]}"`, | ||||||
|  | 			want:         1, | ||||||
|  | 		}, | ||||||
|  | 		// 1 demanded, min at 1
 | ||||||
|  | 		{ | ||||||
|  | 			repo:         "test/valid", | ||||||
|  | 			min:          intPtr(1), | ||||||
|  | 			max:          intPtr(3), | ||||||
|  | 			workflowRuns: `{"total_count": 2, "workflow_runs":[{"status":"in_progress"}, {"status":"completed"}]}"`, | ||||||
|  | 			want:         1, | ||||||
|  | 		}, | ||||||
|  | 		// fixed at 3
 | ||||||
|  | 		{ | ||||||
|  | 			repo:  "test/valid", | ||||||
|  | 			fixed: intPtr(3), | ||||||
|  | 			want:  3, | ||||||
|  | 		}, | ||||||
|  | 		// org runner, fixed at 3
 | ||||||
|  | 		{ | ||||||
|  | 			org:   "test", | ||||||
|  | 			fixed: intPtr(3), | ||||||
|  | 			want:  3, | ||||||
|  | 		}, | ||||||
|  | 		// org runner, 1 demanded, min at 1
 | ||||||
|  | 		{ | ||||||
|  | 			org:          "test", | ||||||
|  | 			min:          intPtr(1), | ||||||
|  | 			max:          intPtr(3), | ||||||
|  | 			workflowRuns: `{"total_count": 2, "workflow_runs":[{"status":"in_progress"}, {"status":"completed"}]}"`, | ||||||
|  | 			err:          "Autoscaling is currently supported only when spec.repository is set", | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	for i := range testcases { | ||||||
|  | 		tc := testcases[i] | ||||||
|  | 
 | ||||||
|  | 		log := zap.New(func(o *zap.Options) { | ||||||
|  | 			o.Development = true | ||||||
|  | 		}) | ||||||
|  | 
 | ||||||
|  | 		scheme := runtime.NewScheme() | ||||||
|  | 		_ = clientgoscheme.AddToScheme(scheme) | ||||||
|  | 		_ = v1alpha1.AddToScheme(scheme) | ||||||
|  | 
 | ||||||
|  | 		t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { | ||||||
|  | 			server := fake.NewServer(fake.WithListRepositoryWorkflowRunsResponse(200, tc.workflowRuns)) | ||||||
|  | 			defer server.Close() | ||||||
|  | 			client := newGithubClient(server) | ||||||
|  | 
 | ||||||
|  | 			r := &RunnerDeploymentReconciler{ | ||||||
|  | 				Log:          log, | ||||||
|  | 				GitHubClient: client, | ||||||
|  | 				Scheme:       scheme, | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			rd := v1alpha1.RunnerDeployment{ | ||||||
|  | 				TypeMeta: metav1.TypeMeta{}, | ||||||
|  | 				Spec: v1alpha1.RunnerDeploymentSpec{ | ||||||
|  | 					Template: v1alpha1.RunnerTemplate{ | ||||||
|  | 						Spec: v1alpha1.RunnerSpec{ | ||||||
|  | 							Repository: tc.repo, | ||||||
|  | 						}, | ||||||
|  | 					}, | ||||||
|  | 					Replicas:    tc.fixed, | ||||||
|  | 					MaxReplicas: tc.max, | ||||||
|  | 					MinReplicas: tc.min, | ||||||
|  | 				}, | ||||||
|  | 				Status: v1alpha1.RunnerDeploymentStatus{ | ||||||
|  | 					Replicas:                   tc.sReplicas, | ||||||
|  | 					LastSuccessfulScaleOutTime: tc.sTime, | ||||||
|  | 				}, | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			rs, err := r.newRunnerReplicaSetWithAutoscaling(rd) | ||||||
|  | 			if err != nil { | ||||||
|  | 				if tc.err == "" { | ||||||
|  | 					t.Fatalf("unexpected error: expected none, got %v", err) | ||||||
|  | 				} else if err.Error() != tc.err { | ||||||
|  | 					t.Fatalf("unexpected error: expected %v, got %v", tc.err, err) | ||||||
|  | 				} | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			got := rs.Spec.Replicas | ||||||
|  | 
 | ||||||
|  | 			if got == nil { | ||||||
|  | 				t.Fatalf("unexpected value of rs.Spec.Replicas: nil") | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			if *got != tc.want { | ||||||
|  | 				t.Errorf("%d: incorrect desired replicas: want %d, got %d", i, tc.want, *got) | ||||||
|  | 			} | ||||||
|  | 		}) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | @ -20,10 +20,12 @@ import ( | ||||||
| 	"context" | 	"context" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"hash/fnv" | 	"hash/fnv" | ||||||
| 	"k8s.io/apimachinery/pkg/types" |  | ||||||
| 	"sort" | 	"sort" | ||||||
| 	"time" | 	"time" | ||||||
| 
 | 
 | ||||||
|  | 	"github.com/summerwind/actions-runner-controller/github" | ||||||
|  | 	"k8s.io/apimachinery/pkg/types" | ||||||
|  | 
 | ||||||
| 	"github.com/davecgh/go-spew/spew" | 	"github.com/davecgh/go-spew/spew" | ||||||
| 	"github.com/go-logr/logr" | 	"github.com/go-logr/logr" | ||||||
| 	"k8s.io/apimachinery/pkg/runtime" | 	"k8s.io/apimachinery/pkg/runtime" | ||||||
|  | @ -47,6 +49,7 @@ const ( | ||||||
| // RunnerDeploymentReconciler reconciles a Runner object
 | // RunnerDeploymentReconciler reconciles a Runner object
 | ||||||
| type RunnerDeploymentReconciler struct { | type RunnerDeploymentReconciler struct { | ||||||
| 	client.Client | 	client.Client | ||||||
|  | 	GitHubClient *github.Client | ||||||
| 	Log          logr.Logger | 	Log          logr.Logger | ||||||
| 	Recorder     record.EventRecorder | 	Recorder     record.EventRecorder | ||||||
| 	Scheme       *runtime.Scheme | 	Scheme       *runtime.Scheme | ||||||
|  | @ -94,15 +97,19 @@ func (r *RunnerDeploymentReconciler) Reconcile(req ctrl.Request) (ctrl.Result, e | ||||||
| 		oldSets = myRunnerReplicaSets[1:] | 		oldSets = myRunnerReplicaSets[1:] | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	desiredRS, err := r.newRunnerReplicaSet(rd) | 	desiredRS, err := r.newRunnerReplicaSetWithAutoscaling(rd) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
|  | 		if _, ok := err.(NotSupported); ok { | ||||||
|  | 			r.Recorder.Event(&rd, corev1.EventTypeNormal, "RunnerReplicaSetAutoScaleNotSupported", err.Error()) | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
| 		log.Error(err, "Could not create runnerreplicaset") | 		log.Error(err, "Could not create runnerreplicaset") | ||||||
| 
 | 
 | ||||||
| 		return ctrl.Result{}, err | 		return ctrl.Result{}, err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if newestSet == nil { | 	if newestSet == nil { | ||||||
| 		if err := r.Client.Create(ctx, &desiredRS); err != nil { | 		if err := r.Client.Create(ctx, desiredRS); err != nil { | ||||||
| 			log.Error(err, "Failed to create runnerreplicaset resource") | 			log.Error(err, "Failed to create runnerreplicaset resource") | ||||||
| 
 | 
 | ||||||
| 			return ctrl.Result{}, err | 			return ctrl.Result{}, err | ||||||
|  | @ -118,7 +125,7 @@ func (r *RunnerDeploymentReconciler) Reconcile(req ctrl.Request) (ctrl.Result, e | ||||||
| 		return ctrl.Result{}, nil | 		return ctrl.Result{}, nil | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	desiredTemplateHash, ok := getTemplateHash(&desiredRS) | 	desiredTemplateHash, ok := getTemplateHash(desiredRS) | ||||||
| 	if !ok { | 	if !ok { | ||||||
| 		log.Info("Failed to get template hash of desired runnerreplicaset resource. It must be in an invalid state. Please manually delete the runnerreplicaset so that it is recreated") | 		log.Info("Failed to get template hash of desired runnerreplicaset resource. It must be in an invalid state. Please manually delete the runnerreplicaset so that it is recreated") | ||||||
| 
 | 
 | ||||||
|  | @ -126,7 +133,7 @@ func (r *RunnerDeploymentReconciler) Reconcile(req ctrl.Request) (ctrl.Result, e | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if newestTemplateHash != desiredTemplateHash { | 	if newestTemplateHash != desiredTemplateHash { | ||||||
| 		if err := r.Client.Create(ctx, &desiredRS); err != nil { | 		if err := r.Client.Create(ctx, desiredRS); err != nil { | ||||||
| 			log.Error(err, "Failed to create runnerreplicaset resource") | 			log.Error(err, "Failed to create runnerreplicaset resource") | ||||||
| 
 | 
 | ||||||
| 			return ctrl.Result{}, err | 			return ctrl.Result{}, err | ||||||
|  | @ -184,6 +191,23 @@ func (r *RunnerDeploymentReconciler) Reconcile(req ctrl.Request) (ctrl.Result, e | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	if rd.Spec.Replicas == nil && desiredRS.Spec.Replicas != nil { | ||||||
|  | 		updated := rd.DeepCopy() | ||||||
|  | 		updated.Status.Replicas = desiredRS.Spec.Replicas | ||||||
|  | 
 | ||||||
|  | 		if (rd.Status.Replicas == nil && *desiredRS.Spec.Replicas > 1) || | ||||||
|  | 			(rd.Status.Replicas != nil && *desiredRS.Spec.Replicas > *rd.Status.Replicas) { | ||||||
|  | 
 | ||||||
|  | 			updated.Status.LastSuccessfulScaleOutTime = &metav1.Time{Time: time.Now()} | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if err := r.Status().Update(ctx, updated); err != nil { | ||||||
|  | 			log.Error(err, "Failed to update runnerdeployment status") | ||||||
|  | 
 | ||||||
|  | 			return ctrl.Result{}, err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	return ctrl.Result{}, nil | 	return ctrl.Result{}, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -241,7 +265,7 @@ func CloneAndAddLabel(labels map[string]string, labelKey, labelValue string) map | ||||||
| 	return newLabels | 	return newLabels | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (r *RunnerDeploymentReconciler) newRunnerReplicaSet(rd v1alpha1.RunnerDeployment) (v1alpha1.RunnerReplicaSet, error) { | func (r *RunnerDeploymentReconciler) newRunnerReplicaSet(rd v1alpha1.RunnerDeployment, computedReplicas *int) (*v1alpha1.RunnerReplicaSet, error) { | ||||||
| 	newRSTemplate := *rd.Spec.Template.DeepCopy() | 	newRSTemplate := *rd.Spec.Template.DeepCopy() | ||||||
| 	templateHash := ComputeHash(&newRSTemplate) | 	templateHash := ComputeHash(&newRSTemplate) | ||||||
| 	// Add template hash label to selector.
 | 	// Add template hash label to selector.
 | ||||||
|  | @ -262,11 +286,15 @@ func (r *RunnerDeploymentReconciler) newRunnerReplicaSet(rd v1alpha1.RunnerDeplo | ||||||
| 		}, | 		}, | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if err := ctrl.SetControllerReference(&rd, &rs, r.Scheme); err != nil { | 	if computedReplicas != nil { | ||||||
| 		return rs, err | 		rs.Spec.Replicas = computedReplicas | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return rs, nil | 	if err := ctrl.SetControllerReference(&rd, &rs, r.Scheme); err != nil { | ||||||
|  | 		return &rs, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return &rs, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (r *RunnerDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { | func (r *RunnerDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { | ||||||
|  | @ -293,3 +321,36 @@ func (r *RunnerDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { | ||||||
| 		Owns(&v1alpha1.RunnerReplicaSet{}). | 		Owns(&v1alpha1.RunnerReplicaSet{}). | ||||||
| 		Complete(r) | 		Complete(r) | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func (r *RunnerDeploymentReconciler) newRunnerReplicaSetWithAutoscaling(rd v1alpha1.RunnerDeployment) (*v1alpha1.RunnerReplicaSet, error) { | ||||||
|  | 	var computedReplicas *int | ||||||
|  | 
 | ||||||
|  | 	if rd.Spec.Replicas == nil { | ||||||
|  | 		replicas, err := r.determineDesiredReplicas(rd) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		var scaleDownDelay time.Duration | ||||||
|  | 
 | ||||||
|  | 		if rd.Spec.ScaleDownDelaySecondsAfterScaleUp != nil { | ||||||
|  | 			scaleDownDelay = time.Duration(*rd.Spec.ScaleDownDelaySecondsAfterScaleUp) * time.Second | ||||||
|  | 		} else { | ||||||
|  | 			scaleDownDelay = 10 * time.Minute | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		now := time.Now() | ||||||
|  | 
 | ||||||
|  | 		if rd.Status.Replicas == nil || | ||||||
|  | 			*rd.Status.Replicas < *replicas || | ||||||
|  | 			rd.Status.LastSuccessfulScaleOutTime == nil || | ||||||
|  | 			rd.Status.LastSuccessfulScaleOutTime.Add(scaleDownDelay).Before(now) { | ||||||
|  | 
 | ||||||
|  | 			computedReplicas = replicas | ||||||
|  | 		} else { | ||||||
|  | 			computedReplicas = rd.Status.Replicas | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return r.newRunnerReplicaSet(rd, computedReplicas) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | @ -32,7 +32,13 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // NewServer creates a fake server for running unit tests
 | // NewServer creates a fake server for running unit tests
 | ||||||
| func NewServer() *httptest.Server { | func NewServer(opts ...Option) *httptest.Server { | ||||||
|  | 	var responses FixedResponses | ||||||
|  | 
 | ||||||
|  | 	for _, o := range opts { | ||||||
|  | 		o(&responses) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	routes := map[string]handler{ | 	routes := map[string]handler{ | ||||||
| 		// For CreateRegistrationToken
 | 		// For CreateRegistrationToken
 | ||||||
| 		"/repos/test/valid/actions/runners/registration-token": handler{ | 		"/repos/test/valid/actions/runners/registration-token": handler{ | ||||||
|  | @ -111,6 +117,9 @@ func NewServer() *httptest.Server { | ||||||
| 			Status: http.StatusBadRequest, | 			Status: http.StatusBadRequest, | ||||||
| 			Body:   "", | 			Body:   "", | ||||||
| 		}, | 		}, | ||||||
|  | 
 | ||||||
|  | 		// For auto-scaling based on the number of queued(pending) workflow runs
 | ||||||
|  | 		"/repos/test/valid/actions/runs": responses.listRepositoryWorkflowRuns.handler(), | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	mux := http.NewServeMux() | 	mux := http.NewServeMux() | ||||||
|  |  | ||||||
|  | @ -0,0 +1,28 @@ | ||||||
|  | package fake | ||||||
|  | 
 | ||||||
|  | type FixedResponses struct { | ||||||
|  | 	listRepositoryWorkflowRuns FixedResponse | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type FixedResponse struct { | ||||||
|  | 	Status int | ||||||
|  | 	Body   string | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (r FixedResponse) handler() handler { | ||||||
|  | 	return handler{ | ||||||
|  | 		Status: r.Status, | ||||||
|  | 		Body:   r.Body, | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type Option func(responses *FixedResponses) | ||||||
|  | 
 | ||||||
|  | func WithListRepositoryWorkflowRunsResponse(status int, body string) Option { | ||||||
|  | 	return func(r *FixedResponses) { | ||||||
|  | 		r.listRepositoryWorkflowRuns = FixedResponse{ | ||||||
|  | 			Status: status, | ||||||
|  | 			Body:   body, | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | @ -27,8 +27,10 @@ func NewClient(appID, installationID int64, privateKeyPath string) (*Client, err | ||||||
| 		return nil, fmt.Errorf("authentication failed: %v", err) | 		return nil, fmt.Errorf("authentication failed: %v", err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	gh := github.NewClient(&http.Client{Transport: tr}) | ||||||
|  | 
 | ||||||
| 	return &Client{ | 	return &Client{ | ||||||
| 		Client:    github.NewClient(&http.Client{Transport: tr}), | 		Client:    gh, | ||||||
| 		regTokens: map[string]*github.RegistrationToken{}, | 		regTokens: map[string]*github.RegistrationToken{}, | ||||||
| 		mu:        sync.Mutex{}, | 		mu:        sync.Mutex{}, | ||||||
| 	}, nil | 	}, nil | ||||||
|  |  | ||||||
							
								
								
									
										1
									
								
								go.mod
								
								
								
								
							
							
						
						
									
										1
									
								
								go.mod
								
								
								
								
							|  | @ -15,5 +15,6 @@ require ( | ||||||
| 	k8s.io/api v0.0.0-20190918155943-95b840bb6a1f | 	k8s.io/api v0.0.0-20190918155943-95b840bb6a1f | ||||||
| 	k8s.io/apimachinery v0.0.0-20190913080033-27d36303b655 | 	k8s.io/apimachinery v0.0.0-20190913080033-27d36303b655 | ||||||
| 	k8s.io/client-go v0.0.0-20190918160344-1fbdaa4c8d90 | 	k8s.io/client-go v0.0.0-20190918160344-1fbdaa4c8d90 | ||||||
|  | 	k8s.io/klog v0.4.0 | ||||||
| 	sigs.k8s.io/controller-runtime v0.4.0 | 	sigs.k8s.io/controller-runtime v0.4.0 | ||||||
| ) | ) | ||||||
|  |  | ||||||
							
								
								
									
										5
									
								
								main.go
								
								
								
								
							
							
						
						
									
										5
									
								
								main.go
								
								
								
								
							|  | @ -21,6 +21,7 @@ import ( | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"os" | 	"os" | ||||||
| 	"strconv" | 	"strconv" | ||||||
|  | 	"time" | ||||||
| 
 | 
 | ||||||
| 	actionsv1alpha1 "github.com/summerwind/actions-runner-controller/api/v1alpha1" | 	actionsv1alpha1 "github.com/summerwind/actions-runner-controller/api/v1alpha1" | ||||||
| 	"github.com/summerwind/actions-runner-controller/controllers" | 	"github.com/summerwind/actions-runner-controller/controllers" | ||||||
|  | @ -57,6 +58,7 @@ func main() { | ||||||
| 
 | 
 | ||||||
| 		metricsAddr          string | 		metricsAddr          string | ||||||
| 		enableLeaderElection bool | 		enableLeaderElection bool | ||||||
|  | 		syncPeriod           time.Duration | ||||||
| 
 | 
 | ||||||
| 		runnerImage string | 		runnerImage string | ||||||
| 		dockerImage string | 		dockerImage string | ||||||
|  | @ -76,6 +78,7 @@ func main() { | ||||||
| 	flag.Int64Var(&ghAppID, "github-app-id", 0, "The application ID of GitHub App.") | 	flag.Int64Var(&ghAppID, "github-app-id", 0, "The application ID of GitHub App.") | ||||||
| 	flag.Int64Var(&ghAppInstallationID, "github-app-installation-id", 0, "The installation ID of GitHub App.") | 	flag.Int64Var(&ghAppInstallationID, "github-app-installation-id", 0, "The installation ID of GitHub App.") | ||||||
| 	flag.StringVar(&ghAppPrivateKey, "github-app-private-key", "", "The path of a private key file to authenticate as a GitHub App") | 	flag.StringVar(&ghAppPrivateKey, "github-app-private-key", "", "The path of a private key file to authenticate as a GitHub App") | ||||||
|  | 	flag.DurationVar(&syncPeriod, "sync-period", 10*time.Minute, "Determines the minimum frequency at which K8s resources managed by this controller are reconciled. When you use autoscaling, set to a lower value like 10 minute, because this corresponds to the minimum time to react on demand change") | ||||||
| 	flag.Parse() | 	flag.Parse() | ||||||
| 
 | 
 | ||||||
| 	if ghToken == "" { | 	if ghToken == "" { | ||||||
|  | @ -133,6 +136,7 @@ func main() { | ||||||
| 		MetricsBindAddress: metricsAddr, | 		MetricsBindAddress: metricsAddr, | ||||||
| 		LeaderElection:     enableLeaderElection, | 		LeaderElection:     enableLeaderElection, | ||||||
| 		Port:               9443, | 		Port:               9443, | ||||||
|  | 		SyncPeriod:         &syncPeriod, | ||||||
| 	}) | 	}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		setupLog.Error(err, "unable to start manager") | 		setupLog.Error(err, "unable to start manager") | ||||||
|  | @ -168,6 +172,7 @@ func main() { | ||||||
| 		Client:       mgr.GetClient(), | 		Client:       mgr.GetClient(), | ||||||
| 		Log:          ctrl.Log.WithName("controllers").WithName("RunnerDeployment"), | 		Log:          ctrl.Log.WithName("controllers").WithName("RunnerDeployment"), | ||||||
| 		Scheme:       mgr.GetScheme(), | 		Scheme:       mgr.GetScheme(), | ||||||
|  | 		GitHubClient: ghClient, | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if err = runnerDeploymentReconciler.SetupWithManager(mgr); err != nil { | 	if err = runnerDeploymentReconciler.SetupWithManager(mgr); err != nil { | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue