Merge branch 'master' into probes-and-minReadySeconds
This commit is contained in:
		
						commit
						4306ef3031
					
				|  | @ -42,6 +42,10 @@ type EphemeralRunner struct { | |||
| 	Status EphemeralRunnerStatus `json:"status,omitempty"` | ||||
| } | ||||
| 
 | ||||
| func (er *EphemeralRunner) IsDone() bool { | ||||
| 	return er.Status.Phase == corev1.PodSucceeded || er.Status.Phase == corev1.PodFailed | ||||
| } | ||||
| 
 | ||||
| // EphemeralRunnerSpec defines the desired state of EphemeralRunner
 | ||||
| type EphemeralRunnerSpec struct { | ||||
| 	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
 | ||||
|  |  | |||
|  | @ -24,6 +24,8 @@ import ( | |||
| type EphemeralRunnerSetSpec struct { | ||||
| 	// Replicas is the number of desired EphemeralRunner resources in the k8s namespace.
 | ||||
| 	Replicas int `json:"replicas,omitempty"` | ||||
| 	// PatchID is the unique identifier for the patch issued by the listener app
 | ||||
| 	PatchID int `json:"patchID"` | ||||
| 
 | ||||
| 	EphemeralRunnerSpec EphemeralRunnerSpec `json:"ephemeralRunnerSpec,omitempty"` | ||||
| } | ||||
|  |  | |||
|  | @ -6957,9 +6957,14 @@ spec: | |||
|                         - containers | ||||
|                       type: object | ||||
|                   type: object | ||||
|                 patchID: | ||||
|                   description: PatchID is the unique identifier for the patch issued by the listener app | ||||
|                   type: integer | ||||
|                 replicas: | ||||
|                   description: Replicas is the number of desired EphemeralRunner resources in the k8s namespace. | ||||
|                   type: integer | ||||
|               required: | ||||
|                 - patchID | ||||
|               type: object | ||||
|             status: | ||||
|               description: EphemeralRunnerSetStatus defines the observed state of EphemeralRunnerSet | ||||
|  |  | |||
|  | @ -13,6 +13,7 @@ metadata: | |||
|     app.kubernetes.io/component: "autoscaling-runner-set" | ||||
|     {{- include "gha-runner-scale-set.labels" . | nindent 4 }} | ||||
|   annotations: | ||||
|     actions.github.com/values-hash: {{ toJson .Values | sha256sum | trunc 63 }} | ||||
|     {{- $containerMode := .Values.containerMode }} | ||||
|     {{- if not (kindIs "string" .Values.githubConfigSecret) }} | ||||
|     actions.github.com/cleanup-github-secret-name: {{ include "gha-runner-scale-set.githubsecret" . }} | ||||
|  |  | |||
|  | @ -900,7 +900,7 @@ func TestTemplateRenderedAutoScalingRunnerSet_EnableDinD(t *testing.T) { | |||
| 	assert.Equal(t, "ghcr.io/actions/actions-runner:latest", ars.Spec.Template.Spec.Containers[0].Image) | ||||
| 	assert.Len(t, ars.Spec.Template.Spec.Containers[0].Env, 2, "The runner container should have 2 env vars, DOCKER_HOST and RUNNER_WAIT_FOR_DOCKER_IN_SECONDS") | ||||
| 	assert.Equal(t, "DOCKER_HOST", ars.Spec.Template.Spec.Containers[0].Env[0].Name) | ||||
| 	assert.Equal(t, "unix:///run/docker/docker.sock", ars.Spec.Template.Spec.Containers[0].Env[0].Value) | ||||
| 	assert.Equal(t, "unix:///var/run/docker.sock", ars.Spec.Template.Spec.Containers[0].Env[0].Value) | ||||
| 	assert.Equal(t, "RUNNER_WAIT_FOR_DOCKER_IN_SECONDS", ars.Spec.Template.Spec.Containers[0].Env[1].Name) | ||||
| 	assert.Equal(t, "120", ars.Spec.Template.Spec.Containers[0].Env[1].Value) | ||||
| 
 | ||||
|  | @ -910,8 +910,7 @@ func TestTemplateRenderedAutoScalingRunnerSet_EnableDinD(t *testing.T) { | |||
| 	assert.False(t, ars.Spec.Template.Spec.Containers[0].VolumeMounts[0].ReadOnly) | ||||
| 
 | ||||
| 	assert.Equal(t, "dind-sock", ars.Spec.Template.Spec.Containers[0].VolumeMounts[1].Name) | ||||
| 	assert.Equal(t, "/run/docker", ars.Spec.Template.Spec.Containers[0].VolumeMounts[1].MountPath) | ||||
| 	assert.True(t, ars.Spec.Template.Spec.Containers[0].VolumeMounts[1].ReadOnly) | ||||
| 	assert.Equal(t, "/var/run", ars.Spec.Template.Spec.Containers[0].VolumeMounts[1].MountPath) | ||||
| 
 | ||||
| 	assert.Equal(t, "dind", ars.Spec.Template.Spec.Containers[1].Name) | ||||
| 	assert.Equal(t, "docker:dind", ars.Spec.Template.Spec.Containers[1].Image) | ||||
|  | @ -921,7 +920,7 @@ func TestTemplateRenderedAutoScalingRunnerSet_EnableDinD(t *testing.T) { | |||
| 	assert.Equal(t, "/home/runner/_work", ars.Spec.Template.Spec.Containers[1].VolumeMounts[0].MountPath) | ||||
| 
 | ||||
| 	assert.Equal(t, "dind-sock", ars.Spec.Template.Spec.Containers[1].VolumeMounts[1].Name) | ||||
| 	assert.Equal(t, "/run/docker", ars.Spec.Template.Spec.Containers[1].VolumeMounts[1].MountPath) | ||||
| 	assert.Equal(t, "/var/run", ars.Spec.Template.Spec.Containers[1].VolumeMounts[1].MountPath) | ||||
| 
 | ||||
| 	assert.Equal(t, "dind-externals", ars.Spec.Template.Spec.Containers[1].VolumeMounts[2].Name) | ||||
| 	assert.Equal(t, "/home/runner/externals", ars.Spec.Template.Spec.Containers[1].VolumeMounts[2].MountPath) | ||||
|  | @ -2089,3 +2088,58 @@ func TestRunnerContainerVolumeNotEmptyMap(t *testing.T) { | |||
| 	_, ok := m.Spec.Template.Spec.Containers[0]["volumeMounts"] | ||||
| 	assert.False(t, ok, "volumeMounts should not be set") | ||||
| } | ||||
| 
 | ||||
| func TestAutoscalingRunnerSetAnnotationValuesHash(t *testing.T) { | ||||
| 	t.Parallel() | ||||
| 
 | ||||
| 	const valuesHash = "actions.github.com/values-hash" | ||||
| 
 | ||||
| 	// Path to the helm chart we will test
 | ||||
| 	helmChartPath, err := filepath.Abs("../../gha-runner-scale-set") | ||||
| 	require.NoError(t, err) | ||||
| 
 | ||||
| 	releaseName := "test-runners" | ||||
| 	namespaceName := "test-" + strings.ToLower(random.UniqueId()) | ||||
| 
 | ||||
| 	options := &helm.Options{ | ||||
| 		Logger: logger.Discard, | ||||
| 		SetValues: map[string]string{ | ||||
| 			"githubConfigUrl":                    "https://github.com/actions", | ||||
| 			"githubConfigSecret.github_token":    "gh_token12345", | ||||
| 			"controllerServiceAccount.name":      "arc", | ||||
| 			"controllerServiceAccount.namespace": "arc-system", | ||||
| 		}, | ||||
| 		KubectlOptions: k8s.NewKubectlOptions("", "", namespaceName), | ||||
| 	} | ||||
| 
 | ||||
| 	output := helm.RenderTemplate(t, options, helmChartPath, releaseName, []string{"templates/autoscalingrunnerset.yaml"}) | ||||
| 
 | ||||
| 	var autoscalingRunnerSet v1alpha1.AutoscalingRunnerSet | ||||
| 	helm.UnmarshalK8SYaml(t, output, &autoscalingRunnerSet) | ||||
| 
 | ||||
| 	firstHash := autoscalingRunnerSet.Annotations["actions.github.com/values-hash"] | ||||
| 	assert.NotEmpty(t, firstHash) | ||||
| 	assert.LessOrEqual(t, len(firstHash), 63) | ||||
| 
 | ||||
| 	helmChartPath, err = filepath.Abs("../../gha-runner-scale-set") | ||||
| 	require.NoError(t, err) | ||||
| 
 | ||||
| 	options = &helm.Options{ | ||||
| 		Logger: logger.Discard, | ||||
| 		SetValues: map[string]string{ | ||||
| 			"githubConfigUrl":                    "https://github.com/actions", | ||||
| 			"githubConfigSecret.github_token":    "gh_token1234567890", | ||||
| 			"controllerServiceAccount.name":      "arc", | ||||
| 			"controllerServiceAccount.namespace": "arc-system", | ||||
| 		}, | ||||
| 		KubectlOptions: k8s.NewKubectlOptions("", "", namespaceName), | ||||
| 	} | ||||
| 
 | ||||
| 	output = helm.RenderTemplate(t, options, helmChartPath, releaseName, []string{"templates/autoscalingrunnerset.yaml"}) | ||||
| 
 | ||||
| 	helm.UnmarshalK8SYaml(t, output, &autoscalingRunnerSet) | ||||
| 	secondHash := autoscalingRunnerSet.Annotations[valuesHash] | ||||
| 	assert.NotEmpty(t, secondHash) | ||||
| 	assert.NotEqual(t, firstHash, secondHash) | ||||
| 	assert.LessOrEqual(t, len(secondHash), 63) | ||||
| } | ||||
|  |  | |||
|  | @ -125,18 +125,17 @@ template: | |||
|   ##       command: ["/home/runner/run.sh"] | ||||
|   ##       env: | ||||
|   ##         - name: DOCKER_HOST | ||||
|   ##           value: unix:///run/docker/docker.sock | ||||
|   ##           value: unix:///var/run/docker.sock | ||||
|   ##       volumeMounts: | ||||
|   ##         - name: work | ||||
|   ##           mountPath: /home/runner/_work | ||||
|   ##         - name: dind-sock | ||||
|   ##           mountPath: /run/docker | ||||
|   ##           readOnly: true | ||||
|   ##           mountPath: /var/run | ||||
|   ##     - name: dind | ||||
|   ##       image: docker:dind | ||||
|   ##       args: | ||||
|   ##         - dockerd | ||||
|   ##         - --host=unix:///run/docker/docker.sock | ||||
|   ##         - --host=unix:///var/run/docker.sock | ||||
|   ##         - --group=$(DOCKER_GROUP_GID) | ||||
|   ##       env: | ||||
|   ##         - name: DOCKER_GROUP_GID | ||||
|  | @ -147,7 +146,7 @@ template: | |||
|   ##         - name: work | ||||
|   ##           mountPath: /home/runner/_work | ||||
|   ##         - name: dind-sock | ||||
|   ##           mountPath: /run/docker | ||||
|   ##           mountPath: /var/run | ||||
|   ##         - name: dind-externals | ||||
|   ##           mountPath: /home/runner/externals | ||||
|   ##     volumes: | ||||
|  |  | |||
|  | @ -34,7 +34,7 @@ type Listener interface { | |||
| //go:generate mockery --name Worker --output ./mocks --outpkg mocks --case underscore
 | ||||
| type Worker interface { | ||||
| 	HandleJobStarted(ctx context.Context, jobInfo *actions.JobStarted) error | ||||
| 	HandleDesiredRunnerCount(ctx context.Context, count int) (int, error) | ||||
| 	HandleDesiredRunnerCount(ctx context.Context, count int, jobsCompleted int) (int, error) | ||||
| } | ||||
| 
 | ||||
| func New(config config.Config) (*App, error) { | ||||
|  |  | |||
|  | @ -15,23 +15,23 @@ type Worker struct { | |||
| 	mock.Mock | ||||
| } | ||||
| 
 | ||||
| // HandleDesiredRunnerCount provides a mock function with given fields: ctx, count
 | ||||
| func (_m *Worker) HandleDesiredRunnerCount(ctx context.Context, count int) (int, error) { | ||||
| 	ret := _m.Called(ctx, count) | ||||
| // HandleDesiredRunnerCount provides a mock function with given fields: ctx, count, acquireCount
 | ||||
| func (_m *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, acquireCount int) (int, error) { | ||||
| 	ret := _m.Called(ctx, count, acquireCount) | ||||
| 
 | ||||
| 	var r0 int | ||||
| 	var r1 error | ||||
| 	if rf, ok := ret.Get(0).(func(context.Context, int) (int, error)); ok { | ||||
| 		return rf(ctx, count) | ||||
| 	if rf, ok := ret.Get(0).(func(context.Context, int, int) (int, error)); ok { | ||||
| 		return rf(ctx, count, acquireCount) | ||||
| 	} | ||||
| 	if rf, ok := ret.Get(0).(func(context.Context, int) int); ok { | ||||
| 		r0 = rf(ctx, count) | ||||
| 	if rf, ok := ret.Get(0).(func(context.Context, int, int) int); ok { | ||||
| 		r0 = rf(ctx, count, acquireCount) | ||||
| 	} else { | ||||
| 		r0 = ret.Get(0).(int) | ||||
| 	} | ||||
| 
 | ||||
| 	if rf, ok := ret.Get(1).(func(context.Context, int) error); ok { | ||||
| 		r1 = rf(ctx, count) | ||||
| 	if rf, ok := ret.Get(1).(func(context.Context, int, int) error); ok { | ||||
| 		r1 = rf(ctx, count, acquireCount) | ||||
| 	} else { | ||||
| 		r1 = ret.Error(1) | ||||
| 	} | ||||
|  |  | |||
|  | @ -114,7 +114,7 @@ func New(config Config) (*Listener, error) { | |||
| //go:generate mockery --name Handler --output ./mocks --outpkg mocks --case underscore
 | ||||
| type Handler interface { | ||||
| 	HandleJobStarted(ctx context.Context, jobInfo *actions.JobStarted) error | ||||
| 	HandleDesiredRunnerCount(ctx context.Context, count int) (int, error) | ||||
| 	HandleDesiredRunnerCount(ctx context.Context, count, jobsCompleted int) (int, error) | ||||
| } | ||||
| 
 | ||||
| // Listen listens for incoming messages and handles them using the provided handler.
 | ||||
|  | @ -145,7 +145,7 @@ func (l *Listener) Listen(ctx context.Context, handler Handler) error { | |||
| 	} | ||||
| 	l.metrics.PublishStatistics(initialMessage.Statistics) | ||||
| 
 | ||||
| 	desiredRunners, err := handler.HandleDesiredRunnerCount(ctx, initialMessage.Statistics.TotalAssignedJobs) | ||||
| 	desiredRunners, err := handler.HandleDesiredRunnerCount(ctx, initialMessage.Statistics.TotalAssignedJobs, 0) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("handling initial message failed: %w", err) | ||||
| 	} | ||||
|  | @ -207,7 +207,7 @@ func (l *Listener) handleMessage(ctx context.Context, handler Handler, msg *acti | |||
| 		l.metrics.PublishJobStarted(jobStarted) | ||||
| 	} | ||||
| 
 | ||||
| 	desiredRunners, err := handler.HandleDesiredRunnerCount(ctx, parsedMsg.statistics.TotalAssignedJobs) | ||||
| 	desiredRunners, err := handler.HandleDesiredRunnerCount(ctx, parsedMsg.statistics.TotalAssignedJobs, len(parsedMsg.jobsCompleted)) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to handle desired runner count: %w", err) | ||||
| 	} | ||||
|  | @ -284,7 +284,6 @@ func (l *Listener) getMessage(ctx context.Context) (*actions.RunnerScaleSetMessa | |||
| 	} | ||||
| 
 | ||||
| 	return msg, nil | ||||
| 
 | ||||
| } | ||||
| 
 | ||||
| func (l *Listener) deleteLastMessage(ctx context.Context) error { | ||||
|  |  | |||
|  | @ -427,7 +427,7 @@ func TestListener_Listen(t *testing.T) { | |||
| 
 | ||||
| 		var called bool | ||||
| 		handler := listenermocks.NewHandler(t) | ||||
| 		handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything). | ||||
| 		handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything, 0). | ||||
| 			Return(0, nil). | ||||
| 			Run( | ||||
| 				func(mock.Arguments) { | ||||
|  | @ -485,11 +485,11 @@ func TestListener_Listen(t *testing.T) { | |||
| 		config.Client = client | ||||
| 
 | ||||
| 		handler := listenermocks.NewHandler(t) | ||||
| 		handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything). | ||||
| 		handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything, 0). | ||||
| 			Return(0, nil). | ||||
| 			Once() | ||||
| 
 | ||||
| 		handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything). | ||||
| 		handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything, 0). | ||||
| 			Return(0, nil). | ||||
| 			Once() | ||||
| 
 | ||||
|  |  | |||
|  | @ -86,7 +86,7 @@ func TestInitialMetrics(t *testing.T) { | |||
| 		config.Client = client | ||||
| 
 | ||||
| 		handler := listenermocks.NewHandler(t) | ||||
| 		handler.On("HandleDesiredRunnerCount", mock.Anything, sessionStatistics.TotalAssignedJobs). | ||||
| 		handler.On("HandleDesiredRunnerCount", mock.Anything, sessionStatistics.TotalAssignedJobs, 0). | ||||
| 			Return(sessionStatistics.TotalAssignedJobs, nil). | ||||
| 			Once() | ||||
| 
 | ||||
|  | @ -178,7 +178,7 @@ func TestHandleMessageMetrics(t *testing.T) { | |||
| 
 | ||||
| 	handler := listenermocks.NewHandler(t) | ||||
| 	handler.On("HandleJobStarted", mock.Anything, jobsStarted[0]).Return(nil).Once() | ||||
| 	handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything).Return(desiredResult, nil).Once() | ||||
| 	handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything, 2).Return(desiredResult, nil).Once() | ||||
| 
 | ||||
| 	client := listenermocks.NewClient(t) | ||||
| 	client.On("DeleteMessage", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() | ||||
|  |  | |||
|  | @ -15,23 +15,23 @@ type Handler struct { | |||
| 	mock.Mock | ||||
| } | ||||
| 
 | ||||
| // HandleDesiredRunnerCount provides a mock function with given fields: ctx, count
 | ||||
| func (_m *Handler) HandleDesiredRunnerCount(ctx context.Context, count int) (int, error) { | ||||
| 	ret := _m.Called(ctx, count) | ||||
| // HandleDesiredRunnerCount provides a mock function with given fields: ctx, count, jobsCompleted
 | ||||
| func (_m *Handler) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCompleted int) (int, error) { | ||||
| 	ret := _m.Called(ctx, count, jobsCompleted) | ||||
| 
 | ||||
| 	var r0 int | ||||
| 	var r1 error | ||||
| 	if rf, ok := ret.Get(0).(func(context.Context, int) (int, error)); ok { | ||||
| 		return rf(ctx, count) | ||||
| 	if rf, ok := ret.Get(0).(func(context.Context, int, int) (int, error)); ok { | ||||
| 		return rf(ctx, count, jobsCompleted) | ||||
| 	} | ||||
| 	if rf, ok := ret.Get(0).(func(context.Context, int) int); ok { | ||||
| 		r0 = rf(ctx, count) | ||||
| 	if rf, ok := ret.Get(0).(func(context.Context, int, int) int); ok { | ||||
| 		r0 = rf(ctx, count, jobsCompleted) | ||||
| 	} else { | ||||
| 		r0 = ret.Get(0).(int) | ||||
| 	} | ||||
| 
 | ||||
| 	if rf, ok := ret.Get(1).(func(context.Context, int) error); ok { | ||||
| 		r1 = rf(ctx, count) | ||||
| 	if rf, ok := ret.Get(1).(func(context.Context, int, int) error); ok { | ||||
| 		r1 = rf(ctx, count, jobsCompleted) | ||||
| 	} else { | ||||
| 		r1 = ret.Error(1) | ||||
| 	} | ||||
|  |  | |||
|  | @ -38,18 +38,20 @@ type Config struct { | |||
| // The Worker's role is to process the messages it receives from the listener.
 | ||||
| // It then initiates Kubernetes API requests to carry out the necessary actions.
 | ||||
| type Worker struct { | ||||
| 	clientset *kubernetes.Clientset | ||||
| 	config    Config | ||||
| 	lastPatch int | ||||
| 	logger    *logr.Logger | ||||
| 	clientset   *kubernetes.Clientset | ||||
| 	config      Config | ||||
| 	lastPatch   int | ||||
| 	lastPatchID int | ||||
| 	logger      *logr.Logger | ||||
| } | ||||
| 
 | ||||
| var _ listener.Handler = (*Worker)(nil) | ||||
| 
 | ||||
| func New(config Config, options ...Option) (*Worker, error) { | ||||
| 	w := &Worker{ | ||||
| 		config:    config, | ||||
| 		lastPatch: -1, | ||||
| 		config:      config, | ||||
| 		lastPatch:   -1, | ||||
| 		lastPatchID: -1, | ||||
| 	} | ||||
| 
 | ||||
| 	conf, err := rest.InClusterConfig() | ||||
|  | @ -161,7 +163,7 @@ func (w *Worker) HandleJobStarted(ctx context.Context, jobInfo *actions.JobStart | |||
| // The function then scales the ephemeral runner set by applying the merge patch.
 | ||||
| // Finally, it logs the scaled ephemeral runner set details and returns nil if successful.
 | ||||
| // If any error occurs during the process, it returns an error with a descriptive message.
 | ||||
| func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int) (int, error) { | ||||
| func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCompleted int) (int, error) { | ||||
| 	// Max runners should always be set by the resource builder either to the configured value,
 | ||||
| 	// or the maximum int32 (resourcebuilder.newAutoScalingListener()).
 | ||||
| 	targetRunnerCount := min(w.config.MinRunners+count, w.config.MaxRunners) | ||||
|  | @ -172,17 +174,22 @@ func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int) (int, | |||
| 		"min", w.config.MinRunners, | ||||
| 		"max", w.config.MaxRunners, | ||||
| 		"currentRunnerCount", w.lastPatch, | ||||
| 		"jobsCompleted", jobsCompleted, | ||||
| 	} | ||||
| 
 | ||||
| 	if targetRunnerCount == w.lastPatch { | ||||
| 		w.logger.Info("Skipping patching of EphemeralRunnerSet as the desired count has not changed", logValues...) | ||||
| 	if w.lastPatch == targetRunnerCount && jobsCompleted == 0 { | ||||
| 		w.logger.Info("Skipping patch", logValues...) | ||||
| 		return targetRunnerCount, nil | ||||
| 	} | ||||
| 
 | ||||
| 	w.lastPatchID++ | ||||
| 	w.lastPatch = targetRunnerCount | ||||
| 
 | ||||
| 	original, err := json.Marshal( | ||||
| 		&v1alpha1.EphemeralRunnerSet{ | ||||
| 			Spec: v1alpha1.EphemeralRunnerSetSpec{ | ||||
| 				Replicas: -1, | ||||
| 				PatchID:  -1, | ||||
| 			}, | ||||
| 		}, | ||||
| 	) | ||||
|  | @ -194,6 +201,7 @@ func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int) (int, | |||
| 		&v1alpha1.EphemeralRunnerSet{ | ||||
| 			Spec: v1alpha1.EphemeralRunnerSetSpec{ | ||||
| 				Replicas: targetRunnerCount, | ||||
| 				PatchID:  w.lastPatchID, | ||||
| 			}, | ||||
| 		}, | ||||
| 	) | ||||
|  |  | |||
|  | @ -6957,9 +6957,14 @@ spec: | |||
|                         - containers | ||||
|                       type: object | ||||
|                   type: object | ||||
|                 patchID: | ||||
|                   description: PatchID is the unique identifier for the patch issued by the listener app | ||||
|                   type: integer | ||||
|                 replicas: | ||||
|                   description: Replicas is the number of desired EphemeralRunner resources in the k8s namespace. | ||||
|                   type: integer | ||||
|               required: | ||||
|                 - patchID | ||||
|               type: object | ||||
|             status: | ||||
|               description: EphemeralRunnerSetStatus defines the observed state of EphemeralRunnerSet | ||||
|  |  | |||
|  | @ -42,10 +42,14 @@ import ( | |||
| ) | ||||
| 
 | ||||
| const ( | ||||
| 	labelKeyRunnerSpecHash            = "runner-spec-hash" | ||||
| 	annotationKeyRunnerSpecHash = "actions.github.com/runner-spec-hash" | ||||
| 	// annotationKeyValuesHash is hash of the entire values json.
 | ||||
| 	// This is used to determine if the values have changed, so we can
 | ||||
| 	// re-create listener.
 | ||||
| 	annotationKeyValuesHash = "actions.github.com/values-hash" | ||||
| 
 | ||||
| 	autoscalingRunnerSetFinalizerName = "autoscalingrunnerset.actions.github.com/finalizer" | ||||
| 	runnerScaleSetIdAnnotationKey     = "runner-scale-set-id" | ||||
| 	runnerScaleSetNameAnnotationKey   = "runner-scale-set-name" | ||||
| ) | ||||
| 
 | ||||
| type UpdateStrategy string | ||||
|  | @ -205,7 +209,7 @@ func (r *AutoscalingRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl | |||
| 	} | ||||
| 
 | ||||
| 	// Make sure the runner scale set name is up to date
 | ||||
| 	currentRunnerScaleSetName, ok := autoscalingRunnerSet.Annotations[runnerScaleSetNameAnnotationKey] | ||||
| 	currentRunnerScaleSetName, ok := autoscalingRunnerSet.Annotations[AnnotationKeyGitHubRunnerScaleSetName] | ||||
| 	if !ok || (len(autoscalingRunnerSet.Spec.RunnerScaleSetName) > 0 && !strings.EqualFold(currentRunnerScaleSetName, autoscalingRunnerSet.Spec.RunnerScaleSetName)) { | ||||
| 		log.Info("AutoScalingRunnerSet runner scale set name changed. Updating the runner scale set.") | ||||
| 		return r.updateRunnerScaleSetName(ctx, autoscalingRunnerSet, log) | ||||
|  | @ -231,9 +235,8 @@ func (r *AutoscalingRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl | |||
| 		return r.createEphemeralRunnerSet(ctx, autoscalingRunnerSet, log) | ||||
| 	} | ||||
| 
 | ||||
| 	desiredSpecHash := autoscalingRunnerSet.RunnerSetSpecHash() | ||||
| 	for _, runnerSet := range existingRunnerSets.all() { | ||||
| 		log.Info("Find existing ephemeral runner set", "name", runnerSet.Name, "specHash", runnerSet.Labels[labelKeyRunnerSpecHash]) | ||||
| 		log.Info("Find existing ephemeral runner set", "name", runnerSet.Name, "specHash", runnerSet.Annotations[annotationKeyRunnerSpecHash]) | ||||
| 	} | ||||
| 
 | ||||
| 	// Make sure the AutoscalingListener is up and running in the controller namespace
 | ||||
|  | @ -250,7 +253,9 @@ func (r *AutoscalingRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl | |||
| 	} | ||||
| 
 | ||||
| 	// Our listener pod is out of date, so we need to delete it to get a new recreate.
 | ||||
| 	if listenerFound && (listener.Labels[labelKeyRunnerSpecHash] != autoscalingRunnerSet.ListenerSpecHash()) { | ||||
| 	listenerValuesHashChanged := listener.Annotations[annotationKeyValuesHash] != autoscalingRunnerSet.Annotations[annotationKeyValuesHash] | ||||
| 	listenerSpecHashChanged := listener.Annotations[annotationKeyRunnerSpecHash] != autoscalingRunnerSet.ListenerSpecHash() | ||||
| 	if listenerFound && (listenerValuesHashChanged || listenerSpecHashChanged) { | ||||
| 		log.Info("RunnerScaleSetListener is out of date. Deleting it so that it is recreated", "name", listener.Name) | ||||
| 		if err := r.Delete(ctx, listener); err != nil { | ||||
| 			if kerrors.IsNotFound(err) { | ||||
|  | @ -264,7 +269,7 @@ func (r *AutoscalingRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl | |||
| 		return ctrl.Result{}, nil | ||||
| 	} | ||||
| 
 | ||||
| 	if desiredSpecHash != latestRunnerSet.Labels[labelKeyRunnerSpecHash] { | ||||
| 	if latestRunnerSet.Annotations[annotationKeyRunnerSpecHash] != autoscalingRunnerSet.RunnerSetSpecHash() { | ||||
| 		if r.drainingJobs(&latestRunnerSet.Status) { | ||||
| 			log.Info("Latest runner set spec hash does not match the current autoscaling runner set. Waiting for the running and pending runners to finish:", "running", latestRunnerSet.Status.RunningEphemeralRunners, "pending", latestRunnerSet.Status.PendingEphemeralRunners) | ||||
| 			log.Info("Scaling down the number of desired replicas to 0") | ||||
|  | @ -480,7 +485,7 @@ func (r *AutoscalingRunnerSetReconciler) createRunnerScaleSet(ctx context.Contex | |||
| 
 | ||||
| 	logger.Info("Adding runner scale set ID, name and runner group name as an annotation and url labels") | ||||
| 	if err = patch(ctx, r.Client, autoscalingRunnerSet, func(obj *v1alpha1.AutoscalingRunnerSet) { | ||||
| 		obj.Annotations[runnerScaleSetNameAnnotationKey] = runnerScaleSet.Name | ||||
| 		obj.Annotations[AnnotationKeyGitHubRunnerScaleSetName] = runnerScaleSet.Name | ||||
| 		obj.Annotations[runnerScaleSetIdAnnotationKey] = strconv.Itoa(runnerScaleSet.Id) | ||||
| 		obj.Annotations[AnnotationKeyGitHubRunnerGroupName] = runnerScaleSet.RunnerGroupName | ||||
| 		if err := applyGitHubURLLabels(obj.Spec.GitHubConfigUrl, obj.Labels); err != nil { // should never happen
 | ||||
|  | @ -528,9 +533,10 @@ func (r *AutoscalingRunnerSetReconciler) updateRunnerScaleSetRunnerGroup(ctx con | |||
| 		return ctrl.Result{}, err | ||||
| 	} | ||||
| 
 | ||||
| 	logger.Info("Updating runner scale set runner group name as an annotation") | ||||
| 	logger.Info("Updating runner scale set name and runner group name as annotations") | ||||
| 	if err := patch(ctx, r.Client, autoscalingRunnerSet, func(obj *v1alpha1.AutoscalingRunnerSet) { | ||||
| 		obj.Annotations[AnnotationKeyGitHubRunnerGroupName] = updatedRunnerScaleSet.RunnerGroupName | ||||
| 		obj.Annotations[AnnotationKeyGitHubRunnerScaleSetName] = updatedRunnerScaleSet.Name | ||||
| 	}); err != nil { | ||||
| 		logger.Error(err, "Failed to update runner group name annotation") | ||||
| 		return ctrl.Result{}, err | ||||
|  | @ -566,7 +572,7 @@ func (r *AutoscalingRunnerSetReconciler) updateRunnerScaleSetName(ctx context.Co | |||
| 
 | ||||
| 	logger.Info("Updating runner scale set name as an annotation") | ||||
| 	if err := patch(ctx, r.Client, autoscalingRunnerSet, func(obj *v1alpha1.AutoscalingRunnerSet) { | ||||
| 		obj.Annotations[runnerScaleSetNameAnnotationKey] = updatedRunnerScaleSet.Name | ||||
| 		obj.Annotations[AnnotationKeyGitHubRunnerScaleSetName] = updatedRunnerScaleSet.Name | ||||
| 	}); err != nil { | ||||
| 		logger.Error(err, "Failed to update runner scale set name annotation") | ||||
| 		return ctrl.Result{}, err | ||||
|  |  | |||
|  | @ -280,6 +280,10 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() { | |||
| 			// This should trigger re-creation of EphemeralRunnerSet and Listener
 | ||||
| 			patched := autoscalingRunnerSet.DeepCopy() | ||||
| 			patched.Spec.Template.Spec.PriorityClassName = "test-priority-class" | ||||
| 			if patched.ObjectMeta.Annotations == nil { | ||||
| 				patched.ObjectMeta.Annotations = make(map[string]string) | ||||
| 			} | ||||
| 			patched.ObjectMeta.Annotations[annotationKeyValuesHash] = "test-hash" | ||||
| 			err = k8sClient.Patch(ctx, patched, client.MergeFrom(autoscalingRunnerSet)) | ||||
| 			Expect(err).NotTo(HaveOccurred(), "failed to patch AutoScalingRunnerSet") | ||||
| 			autoscalingRunnerSet = patched.DeepCopy() | ||||
|  | @ -297,10 +301,10 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() { | |||
| 						return "", fmt.Errorf("We should have only 1 EphemeralRunnerSet, but got %v", len(runnerSetList.Items)) | ||||
| 					} | ||||
| 
 | ||||
| 					return runnerSetList.Items[0].Labels[labelKeyRunnerSpecHash], nil | ||||
| 					return runnerSetList.Items[0].Annotations[annotationKeyRunnerSpecHash], nil | ||||
| 				}, | ||||
| 				autoscalingRunnerSetTestTimeout, | ||||
| 				autoscalingRunnerSetTestInterval).ShouldNot(BeEquivalentTo(runnerSet.Labels[labelKeyRunnerSpecHash]), "New EphemeralRunnerSet should be created") | ||||
| 				autoscalingRunnerSetTestInterval).ShouldNot(BeEquivalentTo(runnerSet.Annotations[annotationKeyRunnerSpecHash]), "New EphemeralRunnerSet should be created") | ||||
| 
 | ||||
| 			// We should create a new listener
 | ||||
| 			Eventually( | ||||
|  | @ -334,6 +338,55 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() { | |||
| 			err = k8sClient.Patch(ctx, patched, client.MergeFrom(autoscalingRunnerSet)) | ||||
| 			Expect(err).NotTo(HaveOccurred(), "failed to patch AutoScalingRunnerSet") | ||||
| 
 | ||||
| 			// We should not re-create a new EphemeralRunnerSet
 | ||||
| 			Consistently( | ||||
| 				func() (string, error) { | ||||
| 					runnerSetList := new(v1alpha1.EphemeralRunnerSetList) | ||||
| 					err := k8sClient.List(ctx, runnerSetList, client.InNamespace(autoscalingRunnerSet.Namespace)) | ||||
| 					if err != nil { | ||||
| 						return "", err | ||||
| 					} | ||||
| 
 | ||||
| 					if len(runnerSetList.Items) != 1 { | ||||
| 						return "", fmt.Errorf("We should have only 1 EphemeralRunnerSet, but got %v", len(runnerSetList.Items)) | ||||
| 					} | ||||
| 
 | ||||
| 					return string(runnerSetList.Items[0].UID), nil | ||||
| 				}, | ||||
| 				autoscalingRunnerSetTestTimeout, | ||||
| 				autoscalingRunnerSetTestInterval).Should(BeEquivalentTo(string(runnerSet.UID)), "New EphemeralRunnerSet should not be created") | ||||
| 
 | ||||
| 			// We should only re-create a new listener
 | ||||
| 			Eventually( | ||||
| 				func() (string, error) { | ||||
| 					listener := new(v1alpha1.AutoscalingListener) | ||||
| 					err := k8sClient.Get(ctx, client.ObjectKey{Name: scaleSetListenerName(autoscalingRunnerSet), Namespace: autoscalingRunnerSet.Namespace}, listener) | ||||
| 					if err != nil { | ||||
| 						return "", err | ||||
| 					} | ||||
| 
 | ||||
| 					return string(listener.UID), nil | ||||
| 				}, | ||||
| 				autoscalingRunnerSetTestTimeout, | ||||
| 				autoscalingRunnerSetTestInterval).ShouldNot(BeEquivalentTo(string(listener.UID)), "New Listener should be created") | ||||
| 
 | ||||
| 			// Only update the values hash for the autoscaling runner set
 | ||||
| 			// This should trigger re-creation of the Listener only
 | ||||
| 			runnerSetList = new(v1alpha1.EphemeralRunnerSetList) | ||||
| 			err = k8sClient.List(ctx, runnerSetList, client.InNamespace(autoscalingRunnerSet.Namespace)) | ||||
| 			Expect(err).NotTo(HaveOccurred(), "failed to list EphemeralRunnerSet") | ||||
| 			Expect(len(runnerSetList.Items)).To(Equal(1), "There should be 1 EphemeralRunnerSet") | ||||
| 			runnerSet = runnerSetList.Items[0] | ||||
| 
 | ||||
| 			listener = new(v1alpha1.AutoscalingListener) | ||||
| 			err = k8sClient.Get(ctx, client.ObjectKey{Name: scaleSetListenerName(autoscalingRunnerSet), Namespace: autoscalingRunnerSet.Namespace}, listener) | ||||
| 			Expect(err).NotTo(HaveOccurred(), "failed to get Listener") | ||||
| 
 | ||||
| 			patched = autoscalingRunnerSet.DeepCopy() | ||||
| 			patched.ObjectMeta.Annotations[annotationKeyValuesHash] = "hash-changes" | ||||
| 			err = k8sClient.Patch(ctx, patched, client.MergeFrom(autoscalingRunnerSet)) | ||||
| 			Expect(err).NotTo(HaveOccurred(), "failed to patch AutoScalingRunnerSet") | ||||
| 
 | ||||
| 			// We should not re-create a new EphemeralRunnerSet
 | ||||
| 			Consistently( | ||||
| 				func() (string, error) { | ||||
|  | @ -493,6 +546,10 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() { | |||
| 			// Patch the AutoScalingRunnerSet image which should trigger
 | ||||
| 			// the recreation of the Listener and EphemeralRunnerSet
 | ||||
| 			patched := autoscalingRunnerSet.DeepCopy() | ||||
| 			if patched.ObjectMeta.Annotations == nil { | ||||
| 				patched.ObjectMeta.Annotations = make(map[string]string) | ||||
| 			} | ||||
| 			patched.ObjectMeta.Annotations[annotationKeyValuesHash] = "testgroup2" | ||||
| 			patched.Spec.Template.Spec = corev1.PodSpec{ | ||||
| 				Containers: []corev1.Container{ | ||||
| 					{ | ||||
|  | @ -501,7 +558,6 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() { | |||
| 					}, | ||||
| 				}, | ||||
| 			} | ||||
| 			// patched.Spec.Template.Spec.PriorityClassName = "test-priority-class"
 | ||||
| 			err = k8sClient.Patch(ctx, patched, client.MergeFrom(autoscalingRunnerSet)) | ||||
| 			Expect(err).NotTo(HaveOccurred(), "failed to patch AutoScalingRunnerSet") | ||||
| 			autoscalingRunnerSet = patched.DeepCopy() | ||||
|  | @ -698,7 +754,7 @@ var _ = Describe("Test AutoScalingController updates", Ordered, func() { | |||
| 						return "", err | ||||
| 					} | ||||
| 
 | ||||
| 					if val, ok := ars.Annotations[runnerScaleSetNameAnnotationKey]; ok { | ||||
| 					if val, ok := ars.Annotations[AnnotationKeyGitHubRunnerScaleSetName]; ok { | ||||
| 						return val, nil | ||||
| 					} | ||||
| 
 | ||||
|  | @ -722,7 +778,7 @@ var _ = Describe("Test AutoScalingController updates", Ordered, func() { | |||
| 						return "", err | ||||
| 					} | ||||
| 
 | ||||
| 					if val, ok := ars.Annotations[runnerScaleSetNameAnnotationKey]; ok { | ||||
| 					if val, ok := ars.Annotations[AnnotationKeyGitHubRunnerScaleSetName]; ok { | ||||
| 						return val, nil | ||||
| 					} | ||||
| 
 | ||||
|  |  | |||
|  | @ -39,7 +39,11 @@ const ( | |||
| // Finalizer used to protect resources from deletion while AutoscalingRunnerSet is running
 | ||||
| const AutoscalingRunnerSetCleanupFinalizerName = "actions.github.com/cleanup-protection" | ||||
| 
 | ||||
| const AnnotationKeyGitHubRunnerGroupName = "actions.github.com/runner-group-name" | ||||
| const ( | ||||
| 	AnnotationKeyGitHubRunnerGroupName    = "actions.github.com/runner-group-name" | ||||
| 	AnnotationKeyGitHubRunnerScaleSetName = "actions.github.com/runner-scale-set-name" | ||||
| 	AnnotationKeyPatchID                  = "actions.github.com/patch-id" | ||||
| ) | ||||
| 
 | ||||
| // Labels applied to listener roles
 | ||||
| const ( | ||||
|  |  | |||
|  | @ -133,6 +133,23 @@ func (r *EphemeralRunnerReconciler) Reconcile(ctx context.Context, req ctrl.Requ | |||
| 		return ctrl.Result{}, nil | ||||
| 	} | ||||
| 
 | ||||
| 	if ephemeralRunner.IsDone() { | ||||
| 		log.Info("Cleaning up resources after after ephemeral runner termination", "phase", ephemeralRunner.Status.Phase) | ||||
| 		done, err := r.cleanupResources(ctx, ephemeralRunner, log) | ||||
| 		if err != nil { | ||||
| 			log.Error(err, "Failed to clean up ephemeral runner owned resources") | ||||
| 			return ctrl.Result{}, err | ||||
| 		} | ||||
| 		if !done { | ||||
| 			log.Info("Waiting for ephemeral runner owned resources to be deleted") | ||||
| 			return ctrl.Result{Requeue: true}, nil | ||||
| 		} | ||||
| 		// Stop reconciling on this object.
 | ||||
| 		// The EphemeralRunnerSet is responsible for cleaning it up.
 | ||||
| 		log.Info("EphemeralRunner has already finished. Stopping reconciliation and waiting for EphemeralRunnerSet to clean it up", "phase", ephemeralRunner.Status.Phase) | ||||
| 		return ctrl.Result{}, nil | ||||
| 	} | ||||
| 
 | ||||
| 	if !controllerutil.ContainsFinalizer(ephemeralRunner, ephemeralRunnerActionsFinalizerName) { | ||||
| 		log.Info("Adding runner registration finalizer") | ||||
| 		err := patch(ctx, r.Client, ephemeralRunner, func(obj *v1alpha1.EphemeralRunner) { | ||||
|  | @ -159,13 +176,6 @@ func (r *EphemeralRunnerReconciler) Reconcile(ctx context.Context, req ctrl.Requ | |||
| 		return ctrl.Result{}, nil | ||||
| 	} | ||||
| 
 | ||||
| 	if ephemeralRunner.Status.Phase == corev1.PodSucceeded || ephemeralRunner.Status.Phase == corev1.PodFailed { | ||||
| 		// Stop reconciling on this object.
 | ||||
| 		// The EphemeralRunnerSet is responsible for cleaning it up.
 | ||||
| 		log.Info("EphemeralRunner has already finished. Stopping reconciliation and waiting for EphemeralRunnerSet to clean it up", "phase", ephemeralRunner.Status.Phase) | ||||
| 		return ctrl.Result{}, nil | ||||
| 	} | ||||
| 
 | ||||
| 	if ephemeralRunner.Status.RunnerId == 0 { | ||||
| 		log.Info("Creating new ephemeral runner registration and updating status with runner config") | ||||
| 		return r.updateStatusWithRunnerConfig(ctx, ephemeralRunner, log) | ||||
|  | @ -324,7 +334,7 @@ func (r *EphemeralRunnerReconciler) cleanupResources(ctx context.Context, epheme | |||
| 			} | ||||
| 		} | ||||
| 		return false, nil | ||||
| 	case err != nil && !kerrors.IsNotFound(err): | ||||
| 	case !kerrors.IsNotFound(err): | ||||
| 		return false, err | ||||
| 	} | ||||
| 	log.Info("Pod is deleted") | ||||
|  | @ -341,7 +351,7 @@ func (r *EphemeralRunnerReconciler) cleanupResources(ctx context.Context, epheme | |||
| 			} | ||||
| 		} | ||||
| 		return false, nil | ||||
| 	case err != nil && !kerrors.IsNotFound(err): | ||||
| 	case !kerrors.IsNotFound(err): | ||||
| 		return false, err | ||||
| 	} | ||||
| 	log.Info("Secret is deleted") | ||||
|  |  | |||
|  | @ -22,6 +22,7 @@ import ( | |||
| 	"fmt" | ||||
| 	"net/http" | ||||
| 	"sort" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 
 | ||||
| 	"github.com/actions/actions-runner-controller/apis/actions.github.com/v1alpha1" | ||||
|  | @ -156,14 +157,14 @@ func (r *EphemeralRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.R | |||
| 		return ctrl.Result{}, err | ||||
| 	} | ||||
| 
 | ||||
| 	pendingEphemeralRunners, runningEphemeralRunners, finishedEphemeralRunners, failedEphemeralRunners, deletingEphemeralRunners := categorizeEphemeralRunners(ephemeralRunnerList) | ||||
| 	ephemeralRunnerState := newEphemeralRunnerState(ephemeralRunnerList) | ||||
| 
 | ||||
| 	log.Info("Ephemeral runner counts", | ||||
| 		"pending", len(pendingEphemeralRunners), | ||||
| 		"running", len(runningEphemeralRunners), | ||||
| 		"finished", len(finishedEphemeralRunners), | ||||
| 		"failed", len(failedEphemeralRunners), | ||||
| 		"deleting", len(deletingEphemeralRunners), | ||||
| 		"pending", len(ephemeralRunnerState.pending), | ||||
| 		"running", len(ephemeralRunnerState.running), | ||||
| 		"finished", len(ephemeralRunnerState.finished), | ||||
| 		"failed", len(ephemeralRunnerState.failed), | ||||
| 		"deleting", len(ephemeralRunnerState.deleting), | ||||
| 	) | ||||
| 
 | ||||
| 	if r.PublishMetrics { | ||||
|  | @ -183,54 +184,52 @@ func (r *EphemeralRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.R | |||
| 				Organization: parsedURL.Organization, | ||||
| 				Enterprise:   parsedURL.Enterprise, | ||||
| 			}, | ||||
| 			len(pendingEphemeralRunners), | ||||
| 			len(runningEphemeralRunners), | ||||
| 			len(failedEphemeralRunners), | ||||
| 			len(ephemeralRunnerState.pending), | ||||
| 			len(ephemeralRunnerState.running), | ||||
| 			len(ephemeralRunnerState.failed), | ||||
| 		) | ||||
| 	} | ||||
| 
 | ||||
| 	// cleanup finished runners and proceed
 | ||||
| 	var errs []error | ||||
| 	for i := range finishedEphemeralRunners { | ||||
| 		log.Info("Deleting finished ephemeral runner", "name", finishedEphemeralRunners[i].Name) | ||||
| 		if err := r.Delete(ctx, finishedEphemeralRunners[i]); err != nil { | ||||
| 			if !kerrors.IsNotFound(err) { | ||||
| 				errs = append(errs, err) | ||||
| 	total := ephemeralRunnerState.scaleTotal() | ||||
| 	if ephemeralRunnerSet.Spec.PatchID == 0 || ephemeralRunnerSet.Spec.PatchID != ephemeralRunnerState.latestPatchID { | ||||
| 		defer func() { | ||||
| 			if err := r.cleanupFinishedEphemeralRunners(ctx, ephemeralRunnerState.finished, log); err != nil { | ||||
| 				log.Error(err, "failed to cleanup finished ephemeral runners") | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 		}() | ||||
| 
 | ||||
| 	if len(errs) > 0 { | ||||
| 		mergedErrs := multierr.Combine(errs...) | ||||
| 		log.Error(mergedErrs, "Failed to delete finished ephemeral runners") | ||||
| 		return ctrl.Result{}, mergedErrs | ||||
| 	} | ||||
| 		log.Info("Scaling comparison", "current", total, "desired", ephemeralRunnerSet.Spec.Replicas) | ||||
| 		switch { | ||||
| 		case total < ephemeralRunnerSet.Spec.Replicas: // Handle scale up
 | ||||
| 			count := ephemeralRunnerSet.Spec.Replicas - total | ||||
| 			log.Info("Creating new ephemeral runners (scale up)", "count", count) | ||||
| 			if err := r.createEphemeralRunners(ctx, ephemeralRunnerSet, count, log); err != nil { | ||||
| 				log.Error(err, "failed to make ephemeral runner") | ||||
| 				return ctrl.Result{}, err | ||||
| 			} | ||||
| 
 | ||||
| 	total := len(pendingEphemeralRunners) + len(runningEphemeralRunners) + len(failedEphemeralRunners) | ||||
| 	log.Info("Scaling comparison", "current", total, "desired", ephemeralRunnerSet.Spec.Replicas) | ||||
| 	switch { | ||||
| 	case total < ephemeralRunnerSet.Spec.Replicas: // Handle scale up
 | ||||
| 		count := ephemeralRunnerSet.Spec.Replicas - total | ||||
| 		log.Info("Creating new ephemeral runners (scale up)", "count", count) | ||||
| 		if err := r.createEphemeralRunners(ctx, ephemeralRunnerSet, count, log); err != nil { | ||||
| 			log.Error(err, "failed to make ephemeral runner") | ||||
| 			return ctrl.Result{}, err | ||||
| 		} | ||||
| 
 | ||||
| 	case total > ephemeralRunnerSet.Spec.Replicas: // Handle scale down scenario.
 | ||||
| 		count := total - ephemeralRunnerSet.Spec.Replicas | ||||
| 		log.Info("Deleting ephemeral runners (scale down)", "count", count) | ||||
| 		if err := r.deleteIdleEphemeralRunners(ctx, ephemeralRunnerSet, pendingEphemeralRunners, runningEphemeralRunners, count, log); err != nil { | ||||
| 			log.Error(err, "failed to delete idle runners") | ||||
| 			return ctrl.Result{}, err | ||||
| 		case total > ephemeralRunnerSet.Spec.Replicas: // Handle scale down scenario.
 | ||||
| 			count := total - ephemeralRunnerSet.Spec.Replicas | ||||
| 			log.Info("Deleting ephemeral runners (scale down)", "count", count) | ||||
| 			if err := r.deleteIdleEphemeralRunners( | ||||
| 				ctx, | ||||
| 				ephemeralRunnerSet, | ||||
| 				ephemeralRunnerState.pending, | ||||
| 				ephemeralRunnerState.running, | ||||
| 				count, | ||||
| 				log, | ||||
| 			); err != nil { | ||||
| 				log.Error(err, "failed to delete idle runners") | ||||
| 				return ctrl.Result{}, err | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	desiredStatus := v1alpha1.EphemeralRunnerSetStatus{ | ||||
| 		CurrentReplicas:         total, | ||||
| 		PendingEphemeralRunners: len(pendingEphemeralRunners), | ||||
| 		RunningEphemeralRunners: len(runningEphemeralRunners), | ||||
| 		FailedEphemeralRunners:  len(failedEphemeralRunners), | ||||
| 		PendingEphemeralRunners: len(ephemeralRunnerState.pending), | ||||
| 		RunningEphemeralRunners: len(ephemeralRunnerState.running), | ||||
| 		FailedEphemeralRunners:  len(ephemeralRunnerState.failed), | ||||
| 	} | ||||
| 
 | ||||
| 	// Update the status if needed.
 | ||||
|  | @ -247,6 +246,21 @@ func (r *EphemeralRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.R | |||
| 	return ctrl.Result{}, nil | ||||
| } | ||||
| 
 | ||||
| func (r *EphemeralRunnerSetReconciler) cleanupFinishedEphemeralRunners(ctx context.Context, finishedEphemeralRunners []*v1alpha1.EphemeralRunner, log logr.Logger) error { | ||||
| 	// cleanup finished runners and proceed
 | ||||
| 	var errs []error | ||||
| 	for i := range finishedEphemeralRunners { | ||||
| 		log.Info("Deleting finished ephemeral runner", "name", finishedEphemeralRunners[i].Name) | ||||
| 		if err := r.Delete(ctx, finishedEphemeralRunners[i]); err != nil { | ||||
| 			if !kerrors.IsNotFound(err) { | ||||
| 				errs = append(errs, err) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return multierr.Combine(errs...) | ||||
| } | ||||
| 
 | ||||
| func (r *EphemeralRunnerSetReconciler) cleanUpProxySecret(ctx context.Context, ephemeralRunnerSet *v1alpha1.EphemeralRunnerSet, log logr.Logger) error { | ||||
| 	if ephemeralRunnerSet.Spec.EphemeralRunnerSpec.Proxy == nil { | ||||
| 		return nil | ||||
|  | @ -284,19 +298,19 @@ func (r *EphemeralRunnerSetReconciler) cleanUpEphemeralRunners(ctx context.Conte | |||
| 		return true, nil | ||||
| 	} | ||||
| 
 | ||||
| 	pendingEphemeralRunners, runningEphemeralRunners, finishedEphemeralRunners, failedEphemeralRunners, deletingEphemeralRunners := categorizeEphemeralRunners(ephemeralRunnerList) | ||||
| 	ephemeralRunnerState := newEphemeralRunnerState(ephemeralRunnerList) | ||||
| 
 | ||||
| 	log.Info("Clean up runner counts", | ||||
| 		"pending", len(pendingEphemeralRunners), | ||||
| 		"running", len(runningEphemeralRunners), | ||||
| 		"finished", len(finishedEphemeralRunners), | ||||
| 		"failed", len(failedEphemeralRunners), | ||||
| 		"deleting", len(deletingEphemeralRunners), | ||||
| 		"pending", len(ephemeralRunnerState.pending), | ||||
| 		"running", len(ephemeralRunnerState.running), | ||||
| 		"finished", len(ephemeralRunnerState.finished), | ||||
| 		"failed", len(ephemeralRunnerState.failed), | ||||
| 		"deleting", len(ephemeralRunnerState.deleting), | ||||
| 	) | ||||
| 
 | ||||
| 	log.Info("Cleanup finished or failed ephemeral runners") | ||||
| 	var errs []error | ||||
| 	for _, ephemeralRunner := range append(finishedEphemeralRunners, failedEphemeralRunners...) { | ||||
| 	for _, ephemeralRunner := range append(ephemeralRunnerState.finished, ephemeralRunnerState.failed...) { | ||||
| 		log.Info("Deleting ephemeral runner", "name", ephemeralRunner.Name) | ||||
| 		if err := r.Delete(ctx, ephemeralRunner); err != nil && !kerrors.IsNotFound(err) { | ||||
| 			errs = append(errs, err) | ||||
|  | @ -310,7 +324,7 @@ func (r *EphemeralRunnerSetReconciler) cleanUpEphemeralRunners(ctx context.Conte | |||
| 	} | ||||
| 
 | ||||
| 	// avoid fetching the client if we have nothing left to do
 | ||||
| 	if len(runningEphemeralRunners) == 0 && len(pendingEphemeralRunners) == 0 { | ||||
| 	if len(ephemeralRunnerState.running) == 0 && len(ephemeralRunnerState.pending) == 0 { | ||||
| 		return false, nil | ||||
| 	} | ||||
| 
 | ||||
|  | @ -321,7 +335,7 @@ func (r *EphemeralRunnerSetReconciler) cleanUpEphemeralRunners(ctx context.Conte | |||
| 
 | ||||
| 	log.Info("Cleanup pending or running ephemeral runners") | ||||
| 	errs = errs[0:0] | ||||
| 	for _, ephemeralRunner := range append(pendingEphemeralRunners, runningEphemeralRunners...) { | ||||
| 	for _, ephemeralRunner := range append(ephemeralRunnerState.pending, ephemeralRunnerState.running...) { | ||||
| 		log.Info("Removing the ephemeral runner from the service", "name", ephemeralRunner.Name) | ||||
| 		_, err := r.deleteEphemeralRunnerWithActionsClient(ctx, ephemeralRunner, actionsClient, log) | ||||
| 		if err != nil { | ||||
|  | @ -427,12 +441,13 @@ func (r *EphemeralRunnerSetReconciler) deleteIdleEphemeralRunners(ctx context.Co | |||
| 	deletedCount := 0 | ||||
| 	for runners.next() { | ||||
| 		ephemeralRunner := runners.object() | ||||
| 		if ephemeralRunner.Status.RunnerId == 0 { | ||||
| 		isDone := ephemeralRunner.IsDone() | ||||
| 		if !isDone && ephemeralRunner.Status.RunnerId == 0 { | ||||
| 			log.Info("Skipping ephemeral runner since it is not registered yet", "name", ephemeralRunner.Name) | ||||
| 			continue | ||||
| 		} | ||||
| 
 | ||||
| 		if ephemeralRunner.Status.JobRequestId > 0 { | ||||
| 		if !isDone && ephemeralRunner.Status.JobRequestId > 0 { | ||||
| 			log.Info("Skipping ephemeral runner since it is running a job", "name", ephemeralRunner.Name, "jobRequestId", ephemeralRunner.Status.JobRequestId) | ||||
| 			continue | ||||
| 		} | ||||
|  | @ -580,16 +595,22 @@ type ephemeralRunnerStepper struct { | |||
| 	index int | ||||
| } | ||||
| 
 | ||||
| func newEphemeralRunnerStepper(pending, running []*v1alpha1.EphemeralRunner) *ephemeralRunnerStepper { | ||||
| 	sort.Slice(pending, func(i, j int) bool { | ||||
| 		return pending[i].GetCreationTimestamp().Time.Before(pending[j].GetCreationTimestamp().Time) | ||||
| 	}) | ||||
| 	sort.Slice(running, func(i, j int) bool { | ||||
| 		return running[i].GetCreationTimestamp().Time.Before(running[j].GetCreationTimestamp().Time) | ||||
| func newEphemeralRunnerStepper(primary []*v1alpha1.EphemeralRunner, othersOrdered ...[]*v1alpha1.EphemeralRunner) *ephemeralRunnerStepper { | ||||
| 	sort.Slice(primary, func(i, j int) bool { | ||||
| 		return primary[i].GetCreationTimestamp().Time.Before(primary[j].GetCreationTimestamp().Time) | ||||
| 	}) | ||||
| 	for _, bucket := range othersOrdered { | ||||
| 		sort.Slice(bucket, func(i, j int) bool { | ||||
| 			return bucket[i].GetCreationTimestamp().Time.Before(bucket[j].GetCreationTimestamp().Time) | ||||
| 		}) | ||||
| 	} | ||||
| 
 | ||||
| 	for _, bucket := range othersOrdered { | ||||
| 		primary = append(primary, bucket...) | ||||
| 	} | ||||
| 
 | ||||
| 	return &ephemeralRunnerStepper{ | ||||
| 		items: append(pending, running...), | ||||
| 		items: primary, | ||||
| 		index: -1, | ||||
| 	} | ||||
| } | ||||
|  | @ -613,28 +634,48 @@ func (s *ephemeralRunnerStepper) len() int { | |||
| 	return len(s.items) | ||||
| } | ||||
| 
 | ||||
| func categorizeEphemeralRunners(ephemeralRunnerList *v1alpha1.EphemeralRunnerList) (pendingEphemeralRunners, runningEphemeralRunners, finishedEphemeralRunners, failedEphemeralRunners, deletingEphemeralRunners []*v1alpha1.EphemeralRunner) { | ||||
| type ephemeralRunnerState struct { | ||||
| 	pending  []*v1alpha1.EphemeralRunner | ||||
| 	running  []*v1alpha1.EphemeralRunner | ||||
| 	finished []*v1alpha1.EphemeralRunner | ||||
| 	failed   []*v1alpha1.EphemeralRunner | ||||
| 	deleting []*v1alpha1.EphemeralRunner | ||||
| 
 | ||||
| 	latestPatchID int | ||||
| } | ||||
| 
 | ||||
| func newEphemeralRunnerState(ephemeralRunnerList *v1alpha1.EphemeralRunnerList) *ephemeralRunnerState { | ||||
| 	var ephemeralRunnerState ephemeralRunnerState | ||||
| 
 | ||||
| 	for i := range ephemeralRunnerList.Items { | ||||
| 		r := &ephemeralRunnerList.Items[i] | ||||
| 		patchID, err := strconv.Atoi(r.Annotations[AnnotationKeyPatchID]) | ||||
| 		if err == nil && patchID > ephemeralRunnerState.latestPatchID { | ||||
| 			ephemeralRunnerState.latestPatchID = patchID | ||||
| 		} | ||||
| 		if !r.ObjectMeta.DeletionTimestamp.IsZero() { | ||||
| 			deletingEphemeralRunners = append(deletingEphemeralRunners, r) | ||||
| 			ephemeralRunnerState.deleting = append(ephemeralRunnerState.deleting, r) | ||||
| 			continue | ||||
| 		} | ||||
| 
 | ||||
| 		switch r.Status.Phase { | ||||
| 		case corev1.PodRunning: | ||||
| 			runningEphemeralRunners = append(runningEphemeralRunners, r) | ||||
| 			ephemeralRunnerState.running = append(ephemeralRunnerState.running, r) | ||||
| 		case corev1.PodSucceeded: | ||||
| 			finishedEphemeralRunners = append(finishedEphemeralRunners, r) | ||||
| 			ephemeralRunnerState.finished = append(ephemeralRunnerState.finished, r) | ||||
| 		case corev1.PodFailed: | ||||
| 			failedEphemeralRunners = append(failedEphemeralRunners, r) | ||||
| 			ephemeralRunnerState.failed = append(ephemeralRunnerState.failed, r) | ||||
| 		default: | ||||
| 			// Pending or no phase should be considered as pending.
 | ||||
| 			//
 | ||||
| 			// If field is not set, that means that the EphemeralRunner
 | ||||
| 			// did not yet have chance to update the Status.Phase field.
 | ||||
| 			pendingEphemeralRunners = append(pendingEphemeralRunners, r) | ||||
| 			ephemeralRunnerState.pending = append(ephemeralRunnerState.pending, r) | ||||
| 		} | ||||
| 	} | ||||
| 	return | ||||
| 	return &ephemeralRunnerState | ||||
| } | ||||
| 
 | ||||
| func (s *ephemeralRunnerState) scaleTotal() int { | ||||
| 	return len(s.pending) + len(s.running) + len(s.failed) | ||||
| } | ||||
|  |  | |||
|  | @ -4,6 +4,7 @@ import ( | |||
| 	"context" | ||||
| 	"crypto/tls" | ||||
| 	"encoding/base64" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"net/http" | ||||
| 	"net/http/httptest" | ||||
|  | @ -274,14 +275,17 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { | |||
| 	}) | ||||
| 
 | ||||
| 	Context("When a new EphemeralRunnerSet scale up and down", func() { | ||||
| 		It("It should delete finished EphemeralRunner and create new EphemeralRunner", func() { | ||||
| 		It("Should scale only on patch ID change", func() { | ||||
| 			created := new(actionsv1alpha1.EphemeralRunnerSet) | ||||
| 			err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, created) | ||||
| 			Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") | ||||
| 
 | ||||
| 			patchID := 1 | ||||
| 
 | ||||
| 			// Scale up the EphemeralRunnerSet
 | ||||
| 			updated := created.DeepCopy() | ||||
| 			updated.Spec.Replicas = 5 | ||||
| 			updated.Spec.PatchID = patchID | ||||
| 			err = k8sClient.Update(ctx, updated) | ||||
| 			Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") | ||||
| 
 | ||||
|  | @ -317,7 +321,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { | |||
| 					return len(runnerList.Items), nil | ||||
| 				}, | ||||
| 				ephemeralRunnerSetTestTimeout, | ||||
| 				ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(5), "5 EphemeralRunner should be created") | ||||
| 				ephemeralRunnerSetTestInterval, | ||||
| 			).Should(BeEquivalentTo(5), "5 EphemeralRunner should be created") | ||||
| 
 | ||||
| 			// Mark one of the EphemeralRunner as finished
 | ||||
| 			finishedRunner := runnerList.Items[4].DeepCopy() | ||||
|  | @ -325,7 +330,7 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { | |||
| 			err = k8sClient.Status().Patch(ctx, finishedRunner, client.MergeFrom(&runnerList.Items[4])) | ||||
| 			Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") | ||||
| 
 | ||||
| 			// Wait for the finished EphemeralRunner to be deleted
 | ||||
| 			// Wait for the finished EphemeralRunner to be set to succeeded
 | ||||
| 			Eventually( | ||||
| 				func() error { | ||||
| 					runnerList := new(actionsv1alpha1.EphemeralRunnerList) | ||||
|  | @ -335,17 +340,35 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { | |||
| 					} | ||||
| 
 | ||||
| 					for _, runner := range runnerList.Items { | ||||
| 						if runner.Name == finishedRunner.Name { | ||||
| 							return fmt.Errorf("EphemeralRunner is not deleted") | ||||
| 						if runner.Name != finishedRunner.Name { | ||||
| 							continue | ||||
| 						} | ||||
| 
 | ||||
| 						if runner.Status.Phase != corev1.PodSucceeded { | ||||
| 							return fmt.Errorf("EphemeralRunner is not finished") | ||||
| 						} | ||||
| 						// found pod succeeded
 | ||||
| 						return nil | ||||
| 					} | ||||
| 
 | ||||
| 					return nil | ||||
| 					return errors.New("Finished ephemeral runner is not found") | ||||
| 				}, | ||||
| 				ephemeralRunnerSetTestTimeout, | ||||
| 				ephemeralRunnerSetTestInterval).Should(Succeed(), "Finished EphemeralRunner should be deleted") | ||||
| 				ephemeralRunnerSetTestInterval, | ||||
| 			).Should(Succeed(), "Finished EphemeralRunner should be deleted") | ||||
| 
 | ||||
| 			// We should still have the EphemeralRunnerSet scale up
 | ||||
| 			// After one ephemeral runner is finished, simulate job done patch
 | ||||
| 			patchID++ | ||||
| 			original := new(actionsv1alpha1.EphemeralRunnerSet) | ||||
| 			err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, original) | ||||
| 			Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") | ||||
| 			updated = original.DeepCopy() | ||||
| 			updated.Spec.PatchID = patchID | ||||
| 			updated.Spec.Replicas = 4 | ||||
| 			err = k8sClient.Patch(ctx, updated, client.MergeFrom(original)) | ||||
| 			Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") | ||||
| 
 | ||||
| 			// Only finished ephemeral runner should be deleted
 | ||||
| 			runnerList = new(actionsv1alpha1.EphemeralRunnerList) | ||||
| 			Eventually( | ||||
| 				func() (int, error) { | ||||
|  | @ -354,35 +377,27 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { | |||
| 						return -1, err | ||||
| 					} | ||||
| 
 | ||||
| 					// Set status to simulate a configured EphemeralRunner
 | ||||
| 					refetch := false | ||||
| 					for i, runner := range runnerList.Items { | ||||
| 						if runner.Status.RunnerId == 0 { | ||||
| 							updatedRunner := runner.DeepCopy() | ||||
| 							updatedRunner.Status.Phase = corev1.PodRunning | ||||
| 							updatedRunner.Status.RunnerId = i + 100 | ||||
| 							err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runner)) | ||||
| 							Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") | ||||
| 							refetch = true | ||||
| 						} | ||||
| 					} | ||||
| 
 | ||||
| 					if refetch { | ||||
| 						err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) | ||||
| 						if err != nil { | ||||
| 							return -1, err | ||||
| 					for _, runner := range runnerList.Items { | ||||
| 						if runner.Status.Phase == corev1.PodSucceeded { | ||||
| 							return -1, fmt.Errorf("Finished EphemeralRunner should be deleted") | ||||
| 						} | ||||
| 					} | ||||
| 
 | ||||
| 					return len(runnerList.Items), nil | ||||
| 				}, | ||||
| 				ephemeralRunnerSetTestTimeout, | ||||
| 				ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(5), "5 EphemeralRunner should be created") | ||||
| 				ephemeralRunnerSetTestInterval, | ||||
| 			).Should(BeEquivalentTo(4), "4 EphemeralRunner should be created") | ||||
| 
 | ||||
| 			// Scale down the EphemeralRunnerSet
 | ||||
| 			updated = created.DeepCopy() | ||||
| 			// Scaling down the EphemeralRunnerSet
 | ||||
| 			patchID++ | ||||
| 			original = new(actionsv1alpha1.EphemeralRunnerSet) | ||||
| 			err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, original) | ||||
| 			Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") | ||||
| 			updated = original.DeepCopy() | ||||
| 			updated.Spec.PatchID = patchID | ||||
| 			updated.Spec.Replicas = 3 | ||||
| 			err = k8sClient.Patch(ctx, updated, client.MergeFrom(created)) | ||||
| 			err = k8sClient.Patch(ctx, updated, client.MergeFrom(original)) | ||||
| 			Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") | ||||
| 
 | ||||
| 			// Wait for the EphemeralRunnerSet to be scaled down
 | ||||
|  | @ -417,7 +432,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { | |||
| 					return len(runnerList.Items), nil | ||||
| 				}, | ||||
| 				ephemeralRunnerSetTestTimeout, | ||||
| 				ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(3), "3 EphemeralRunner should be created") | ||||
| 				ephemeralRunnerSetTestInterval, | ||||
| 			).Should(BeEquivalentTo(3), "3 EphemeralRunner should be created") | ||||
| 
 | ||||
| 			// We will not scale down runner that is running jobs
 | ||||
| 			runningRunner := runnerList.Items[0].DeepCopy() | ||||
|  | @ -430,10 +446,15 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { | |||
| 			err = k8sClient.Status().Patch(ctx, runningRunner, client.MergeFrom(&runnerList.Items[1])) | ||||
| 			Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") | ||||
| 
 | ||||
| 			// Scale down to 1
 | ||||
| 			updated = created.DeepCopy() | ||||
| 			// Scale down to 1 while 2 are running
 | ||||
| 			patchID++ | ||||
| 			original = new(actionsv1alpha1.EphemeralRunnerSet) | ||||
| 			err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, original) | ||||
| 			Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") | ||||
| 			updated = original.DeepCopy() | ||||
| 			updated.Spec.PatchID = patchID | ||||
| 			updated.Spec.Replicas = 1 | ||||
| 			err = k8sClient.Patch(ctx, updated, client.MergeFrom(created)) | ||||
| 			err = k8sClient.Patch(ctx, updated, client.MergeFrom(original)) | ||||
| 			Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") | ||||
| 
 | ||||
| 			// Wait for the EphemeralRunnerSet to be scaled down to 2 since we still have 2 runner running jobs
 | ||||
|  | @ -468,7 +489,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { | |||
| 					return len(runnerList.Items), nil | ||||
| 				}, | ||||
| 				ephemeralRunnerSetTestTimeout, | ||||
| 				ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created") | ||||
| 				ephemeralRunnerSetTestInterval, | ||||
| 			).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created") | ||||
| 
 | ||||
| 			// We will not scale down failed runner
 | ||||
| 			failedRunner := runnerList.Items[0].DeepCopy() | ||||
|  | @ -476,15 +498,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { | |||
| 			err = k8sClient.Status().Patch(ctx, failedRunner, client.MergeFrom(&runnerList.Items[0])) | ||||
| 			Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") | ||||
| 
 | ||||
| 			// Scale down to 0
 | ||||
| 			updated = created.DeepCopy() | ||||
| 			updated.Spec.Replicas = 0 | ||||
| 			err = k8sClient.Patch(ctx, updated, client.MergeFrom(created)) | ||||
| 			Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") | ||||
| 
 | ||||
| 			// We should not scale down the EphemeralRunnerSet since we still have 1 runner running job and 1 failed runner
 | ||||
| 			runnerList = new(actionsv1alpha1.EphemeralRunnerList) | ||||
| 			Consistently( | ||||
| 			Eventually( | ||||
| 				func() (int, error) { | ||||
| 					err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) | ||||
| 					if err != nil { | ||||
|  | @ -514,7 +529,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { | |||
| 					return len(runnerList.Items), nil | ||||
| 				}, | ||||
| 				ephemeralRunnerSetTestTimeout, | ||||
| 				ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created") | ||||
| 				ephemeralRunnerSetTestInterval, | ||||
| 			).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created") | ||||
| 
 | ||||
| 			// We will scale down to 0 when the running job is completed and the failed runner is deleted
 | ||||
| 			runningRunner = runnerList.Items[1].DeepCopy() | ||||
|  | @ -525,6 +541,17 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { | |||
| 			err = k8sClient.Delete(ctx, &runnerList.Items[0]) | ||||
| 			Expect(err).NotTo(HaveOccurred(), "failed to delete EphemeralRunner") | ||||
| 
 | ||||
| 			// Scale down to 0 while 1 ephemeral runner is failed
 | ||||
| 			patchID++ | ||||
| 			original = new(actionsv1alpha1.EphemeralRunnerSet) | ||||
| 			err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, original) | ||||
| 			Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") | ||||
| 			updated = original.DeepCopy() | ||||
| 			updated.Spec.PatchID = patchID | ||||
| 			updated.Spec.Replicas = 0 | ||||
| 			err = k8sClient.Patch(ctx, updated, client.MergeFrom(original)) | ||||
| 			Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") | ||||
| 
 | ||||
| 			// Wait for the EphemeralRunnerSet to be scaled down to 0
 | ||||
| 			runnerList = new(actionsv1alpha1.EphemeralRunnerList) | ||||
| 			Eventually( | ||||
|  | @ -557,7 +584,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { | |||
| 					return len(runnerList.Items), nil | ||||
| 				}, | ||||
| 				ephemeralRunnerSetTestTimeout, | ||||
| 				ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(0), "0 EphemeralRunner should be created") | ||||
| 				ephemeralRunnerSetTestInterval, | ||||
| 			).Should(BeEquivalentTo(0), "0 EphemeralRunner should be created") | ||||
| 		}) | ||||
| 
 | ||||
| 		It("Should update status on Ephemeral Runner state changes", func() { | ||||
|  |  | |||
|  | @ -91,7 +91,11 @@ func (b *resourceBuilder) newAutoScalingListener(autoscalingRunnerSet *v1alpha1. | |||
| 		LabelKeyKubernetesPartOf:        labelValueKubernetesPartOf, | ||||
| 		LabelKeyKubernetesComponent:     "runner-scale-set-listener", | ||||
| 		LabelKeyKubernetesVersion:       autoscalingRunnerSet.Labels[LabelKeyKubernetesVersion], | ||||
| 		labelKeyRunnerSpecHash:          autoscalingRunnerSet.ListenerSpecHash(), | ||||
| 	} | ||||
| 
 | ||||
| 	annotations := map[string]string{ | ||||
| 		annotationKeyRunnerSpecHash: autoscalingRunnerSet.ListenerSpecHash(), | ||||
| 		annotationKeyValuesHash:     autoscalingRunnerSet.Annotations[annotationKeyValuesHash], | ||||
| 	} | ||||
| 
 | ||||
| 	if err := applyGitHubURLLabels(autoscalingRunnerSet.Spec.GitHubConfigUrl, labels); err != nil { | ||||
|  | @ -100,9 +104,10 @@ func (b *resourceBuilder) newAutoScalingListener(autoscalingRunnerSet *v1alpha1. | |||
| 
 | ||||
| 	autoscalingListener := &v1alpha1.AutoscalingListener{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| 			Name:      scaleSetListenerName(autoscalingRunnerSet), | ||||
| 			Namespace: namespace, | ||||
| 			Labels:    labels, | ||||
| 			Name:        scaleSetListenerName(autoscalingRunnerSet), | ||||
| 			Namespace:   namespace, | ||||
| 			Labels:      labels, | ||||
| 			Annotations: annotations, | ||||
| 		}, | ||||
| 		Spec: v1alpha1.AutoscalingListenerSpec{ | ||||
| 			GitHubConfigUrl:               autoscalingRunnerSet.Spec.GitHubConfigUrl, | ||||
|  | @ -498,7 +503,6 @@ func (b *resourceBuilder) newEphemeralRunnerSet(autoscalingRunnerSet *v1alpha1.A | |||
| 	runnerSpecHash := autoscalingRunnerSet.RunnerSetSpecHash() | ||||
| 
 | ||||
| 	labels := map[string]string{ | ||||
| 		labelKeyRunnerSpecHash:          runnerSpecHash, | ||||
| 		LabelKeyKubernetesPartOf:        labelValueKubernetesPartOf, | ||||
| 		LabelKeyKubernetesComponent:     "runner-set", | ||||
| 		LabelKeyKubernetesVersion:       autoscalingRunnerSet.Labels[LabelKeyKubernetesVersion], | ||||
|  | @ -511,7 +515,10 @@ func (b *resourceBuilder) newEphemeralRunnerSet(autoscalingRunnerSet *v1alpha1.A | |||
| 	} | ||||
| 
 | ||||
| 	newAnnotations := map[string]string{ | ||||
| 		AnnotationKeyGitHubRunnerGroupName: autoscalingRunnerSet.Annotations[AnnotationKeyGitHubRunnerGroupName], | ||||
| 
 | ||||
| 		AnnotationKeyGitHubRunnerGroupName:    autoscalingRunnerSet.Annotations[AnnotationKeyGitHubRunnerGroupName], | ||||
| 		AnnotationKeyGitHubRunnerScaleSetName: autoscalingRunnerSet.Annotations[AnnotationKeyGitHubRunnerScaleSetName], | ||||
| 		annotationKeyRunnerSpecHash:           runnerSpecHash, | ||||
| 	} | ||||
| 
 | ||||
| 	newEphemeralRunnerSet := &v1alpha1.EphemeralRunnerSet{ | ||||
|  | @ -556,6 +563,7 @@ func (b *resourceBuilder) newEphemeralRunner(ephemeralRunnerSet *v1alpha1.Epheme | |||
| 	for key, val := range ephemeralRunnerSet.Annotations { | ||||
| 		annotations[key] = val | ||||
| 	} | ||||
| 	annotations[AnnotationKeyPatchID] = strconv.Itoa(ephemeralRunnerSet.Spec.PatchID) | ||||
| 	return &v1alpha1.EphemeralRunner{ | ||||
| 		TypeMeta: metav1.TypeMeta{}, | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
|  |  | |||
|  | @ -23,8 +23,9 @@ func TestLabelPropagation(t *testing.T) { | |||
| 				LabelKeyKubernetesVersion: "0.2.0", | ||||
| 			}, | ||||
| 			Annotations: map[string]string{ | ||||
| 				runnerScaleSetIdAnnotationKey:      "1", | ||||
| 				AnnotationKeyGitHubRunnerGroupName: "test-group", | ||||
| 				runnerScaleSetIdAnnotationKey:         "1", | ||||
| 				AnnotationKeyGitHubRunnerGroupName:    "test-group", | ||||
| 				AnnotationKeyGitHubRunnerScaleSetName: "test-scale-set", | ||||
| 			}, | ||||
| 		}, | ||||
| 		Spec: v1alpha1.AutoscalingRunnerSetSpec{ | ||||
|  | @ -38,20 +39,21 @@ func TestLabelPropagation(t *testing.T) { | |||
| 	assert.Equal(t, labelValueKubernetesPartOf, ephemeralRunnerSet.Labels[LabelKeyKubernetesPartOf]) | ||||
| 	assert.Equal(t, "runner-set", ephemeralRunnerSet.Labels[LabelKeyKubernetesComponent]) | ||||
| 	assert.Equal(t, autoscalingRunnerSet.Labels[LabelKeyKubernetesVersion], ephemeralRunnerSet.Labels[LabelKeyKubernetesVersion]) | ||||
| 	assert.NotEmpty(t, ephemeralRunnerSet.Labels[labelKeyRunnerSpecHash]) | ||||
| 	assert.NotEmpty(t, ephemeralRunnerSet.Annotations[annotationKeyRunnerSpecHash]) | ||||
| 	assert.Equal(t, autoscalingRunnerSet.Name, ephemeralRunnerSet.Labels[LabelKeyGitHubScaleSetName]) | ||||
| 	assert.Equal(t, autoscalingRunnerSet.Namespace, ephemeralRunnerSet.Labels[LabelKeyGitHubScaleSetNamespace]) | ||||
| 	assert.Equal(t, "", ephemeralRunnerSet.Labels[LabelKeyGitHubEnterprise]) | ||||
| 	assert.Equal(t, "org", ephemeralRunnerSet.Labels[LabelKeyGitHubOrganization]) | ||||
| 	assert.Equal(t, "repo", ephemeralRunnerSet.Labels[LabelKeyGitHubRepository]) | ||||
| 	assert.Equal(t, autoscalingRunnerSet.Annotations[AnnotationKeyGitHubRunnerGroupName], ephemeralRunnerSet.Annotations[AnnotationKeyGitHubRunnerGroupName]) | ||||
| 	assert.Equal(t, autoscalingRunnerSet.Annotations[AnnotationKeyGitHubRunnerScaleSetName], ephemeralRunnerSet.Annotations[AnnotationKeyGitHubRunnerScaleSetName]) | ||||
| 
 | ||||
| 	listener, err := b.newAutoScalingListener(&autoscalingRunnerSet, ephemeralRunnerSet, autoscalingRunnerSet.Namespace, "test:latest", nil) | ||||
| 	require.NoError(t, err) | ||||
| 	assert.Equal(t, labelValueKubernetesPartOf, listener.Labels[LabelKeyKubernetesPartOf]) | ||||
| 	assert.Equal(t, "runner-scale-set-listener", listener.Labels[LabelKeyKubernetesComponent]) | ||||
| 	assert.Equal(t, autoscalingRunnerSet.Labels[LabelKeyKubernetesVersion], listener.Labels[LabelKeyKubernetesVersion]) | ||||
| 	assert.NotEmpty(t, ephemeralRunnerSet.Labels[labelKeyRunnerSpecHash]) | ||||
| 	assert.NotEmpty(t, ephemeralRunnerSet.Annotations[annotationKeyRunnerSpecHash]) | ||||
| 	assert.Equal(t, autoscalingRunnerSet.Name, listener.Labels[LabelKeyGitHubScaleSetName]) | ||||
| 	assert.Equal(t, autoscalingRunnerSet.Namespace, listener.Labels[LabelKeyGitHubScaleSetNamespace]) | ||||
| 	assert.Equal(t, "", listener.Labels[LabelKeyGitHubEnterprise]) | ||||
|  | @ -83,6 +85,7 @@ func TestLabelPropagation(t *testing.T) { | |||
| 	} | ||||
| 	assert.Equal(t, "runner", ephemeralRunner.Labels[LabelKeyKubernetesComponent]) | ||||
| 	assert.Equal(t, autoscalingRunnerSet.Annotations[AnnotationKeyGitHubRunnerGroupName], ephemeralRunner.Annotations[AnnotationKeyGitHubRunnerGroupName]) | ||||
| 	assert.Equal(t, autoscalingRunnerSet.Annotations[AnnotationKeyGitHubRunnerScaleSetName], ephemeralRunnerSet.Annotations[AnnotationKeyGitHubRunnerScaleSetName]) | ||||
| 
 | ||||
| 	runnerSecret := &corev1.Secret{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
|  | @ -109,8 +112,9 @@ func TestGitHubURLTrimLabelValues(t *testing.T) { | |||
| 				LabelKeyKubernetesVersion: "0.2.0", | ||||
| 			}, | ||||
| 			Annotations: map[string]string{ | ||||
| 				runnerScaleSetIdAnnotationKey:      "1", | ||||
| 				AnnotationKeyGitHubRunnerGroupName: "test-group", | ||||
| 				runnerScaleSetIdAnnotationKey:         "1", | ||||
| 				AnnotationKeyGitHubRunnerGroupName:    "test-group", | ||||
| 				AnnotationKeyGitHubRunnerScaleSetName: "test-scale-set", | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue