Fix overscaling when the controller is much faster than the listener (#3371)
Co-authored-by: Francesco Renzi <rentziass@gmail.com>
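Background for the fix: when the controller reconciles much faster than the listener, it could delete finished runners and immediately create replacements before the listener issued its next count, briefly over-provisioning the set. This change threads a jobsCompleted count through the listener and stamps every listener patch with a monotonically increasing PatchID; the EphemeralRunnerSet reconciler only scales (and cleans up finished runners) when it sees a patch ID it has not processed yet. A minimal, self-contained sketch of that gate (hypothetical names; the real check lives in the reconciler diff below):

package main

import "fmt"

// shouldScale mirrors the reconciler's new guard: PatchID == 0 means no
// listener patch has been observed yet, so reconcile unconditionally;
// otherwise act only when the spec carries a patch ID different from the
// highest patch-id annotation found on the existing runners.
func shouldScale(specPatchID, latestRunnerPatchID int) bool {
	return specPatchID == 0 || specPatchID != latestRunnerPatchID
}

func main() {
	fmt.Println(shouldScale(2, 2)) // false: patch 2 already produced these runners
	fmt.Println(shouldScale(3, 2)) // true: a fresh patch from the listener
}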
parent 46cfbb6ec7
commit 7a643a5107
@@ -42,6 +42,10 @@ type EphemeralRunner struct {
 	Status EphemeralRunnerStatus `json:"status,omitempty"`
 }
 
+func (er *EphemeralRunner) IsDone() bool {
+	return er.Status.Phase == corev1.PodSucceeded || er.Status.Phase == corev1.PodFailed
+}
+
 // EphemeralRunnerSpec defines the desired state of EphemeralRunner
 type EphemeralRunnerSpec struct {
 	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster

@@ -24,6 +24,8 @@ import (
 type EphemeralRunnerSetSpec struct {
 	// Replicas is the number of desired EphemeralRunner resources in the k8s namespace.
 	Replicas int `json:"replicas,omitempty"`
+	// PatchID is the unique identifier for the patch issued by the listener app
+	PatchID int `json:"patchID"`
 
 	EphemeralRunnerSpec EphemeralRunnerSpec `json:"ephemeralRunnerSpec,omitempty"`
 }

@@ -6957,9 +6957,14 @@ spec:
                         - containers
                       type: object
                   type: object
+                patchID:
+                  description: PatchID is the unique identifier for the patch issued by the listener app
+                  type: integer
                 replicas:
                   description: Replicas is the number of desired EphemeralRunner resources in the k8s namespace.
                   type: integer
+              required:
+                - patchID
               type: object
             status:
               description: EphemeralRunnerSetStatus defines the observed state of EphemeralRunnerSet

@@ -34,7 +34,7 @@ type Listener interface {
 //go:generate mockery --name Worker --output ./mocks --outpkg mocks --case underscore
 type Worker interface {
 	HandleJobStarted(ctx context.Context, jobInfo *actions.JobStarted) error
-	HandleDesiredRunnerCount(ctx context.Context, count int) (int, error)
+	HandleDesiredRunnerCount(ctx context.Context, count int, jobsCompleted int) (int, error)
 }
 
 func New(config config.Config) (*App, error) {

@@ -15,23 +15,23 @@ type Worker struct {
 	mock.Mock
 }
 
-// HandleDesiredRunnerCount provides a mock function with given fields: ctx, count
-func (_m *Worker) HandleDesiredRunnerCount(ctx context.Context, count int) (int, error) {
-	ret := _m.Called(ctx, count)
+// HandleDesiredRunnerCount provides a mock function with given fields: ctx, count, acquireCount
+func (_m *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, acquireCount int) (int, error) {
+	ret := _m.Called(ctx, count, acquireCount)
 
 	var r0 int
 	var r1 error
-	if rf, ok := ret.Get(0).(func(context.Context, int) (int, error)); ok {
-		return rf(ctx, count)
+	if rf, ok := ret.Get(0).(func(context.Context, int, int) (int, error)); ok {
+		return rf(ctx, count, acquireCount)
 	}
-	if rf, ok := ret.Get(0).(func(context.Context, int) int); ok {
-		r0 = rf(ctx, count)
+	if rf, ok := ret.Get(0).(func(context.Context, int, int) int); ok {
+		r0 = rf(ctx, count, acquireCount)
 	} else {
 		r0 = ret.Get(0).(int)
 	}
 
-	if rf, ok := ret.Get(1).(func(context.Context, int) error); ok {
-		r1 = rf(ctx, count)
+	if rf, ok := ret.Get(1).(func(context.Context, int, int) error); ok {
+		r1 = rf(ctx, count, acquireCount)
 	} else {
 		r1 = ret.Error(1)
 	}

@@ -114,7 +114,7 @@ func New(config Config) (*Listener, error) {
 //go:generate mockery --name Handler --output ./mocks --outpkg mocks --case underscore
 type Handler interface {
 	HandleJobStarted(ctx context.Context, jobInfo *actions.JobStarted) error
-	HandleDesiredRunnerCount(ctx context.Context, count int) (int, error)
+	HandleDesiredRunnerCount(ctx context.Context, count, jobsCompleted int) (int, error)
 }
 
 // Listen listens for incoming messages and handles them using the provided handler.
@@ -145,7 +145,7 @@ func (l *Listener) Listen(ctx context.Context, handler Handler) error {
 	}
 	l.metrics.PublishStatistics(initialMessage.Statistics)
 
-	desiredRunners, err := handler.HandleDesiredRunnerCount(ctx, initialMessage.Statistics.TotalAssignedJobs)
+	desiredRunners, err := handler.HandleDesiredRunnerCount(ctx, initialMessage.Statistics.TotalAssignedJobs, 0)
 	if err != nil {
 		return fmt.Errorf("handling initial message failed: %w", err)
 	}
@@ -207,7 +207,7 @@ func (l *Listener) handleMessage(ctx context.Context, handler Handler, msg *acti
 		l.metrics.PublishJobStarted(jobStarted)
 	}
 
-	desiredRunners, err := handler.HandleDesiredRunnerCount(ctx, parsedMsg.statistics.TotalAssignedJobs)
+	desiredRunners, err := handler.HandleDesiredRunnerCount(ctx, parsedMsg.statistics.TotalAssignedJobs, len(parsedMsg.jobsCompleted))
 	if err != nil {
 		return fmt.Errorf("failed to handle desired runner count: %w", err)
 	}
@@ -284,7 +284,6 @@ func (l *Listener) getMessage(ctx context.Context) (*actions.RunnerScaleSetMessa
 	}
 
 	return msg, nil
-
 }
 
 func (l *Listener) deleteLastMessage(ctx context.Context) error {

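Why thread jobsCompleted through the listener? Even when TotalAssignedJobs leaves the target runner count unchanged, a completed job means a finished runner needs replacing, so the worker must still issue a patch (with a fresh PatchID) instead of skipping. A small, self-contained sketch of that skip rule, mirroring the worker change further down (simplified, hypothetical helper):

package main

import "fmt"

// skipPatch mirrors the worker's new condition: skip only when the target
// count is unchanged AND no jobs completed since the last message.
func skipPatch(lastPatch, targetRunnerCount, jobsCompleted int) bool {
	return lastPatch == targetRunnerCount && jobsCompleted == 0
}

func main() {
	fmt.Println(skipPatch(5, 5, 0)) // true: nothing changed, skip the patch
	fmt.Println(skipPatch(5, 5, 2)) // false: same target, but two finished runners must be replaced
}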
@@ -427,7 +427,7 @@ func TestListener_Listen(t *testing.T) {
 
 		var called bool
 		handler := listenermocks.NewHandler(t)
-		handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything).
+		handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything, 0).
 			Return(0, nil).
 			Run(
 				func(mock.Arguments) {
@@ -485,11 +485,11 @@ func TestListener_Listen(t *testing.T) {
 		config.Client = client
 
 		handler := listenermocks.NewHandler(t)
-		handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything).
+		handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything, 0).
 			Return(0, nil).
 			Once()
 
-		handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything).
+		handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything, 0).
 			Return(0, nil).
 			Once()
 

@@ -86,7 +86,7 @@ func TestInitialMetrics(t *testing.T) {
 		config.Client = client
 
 		handler := listenermocks.NewHandler(t)
-		handler.On("HandleDesiredRunnerCount", mock.Anything, sessionStatistics.TotalAssignedJobs).
+		handler.On("HandleDesiredRunnerCount", mock.Anything, sessionStatistics.TotalAssignedJobs, 0).
 			Return(sessionStatistics.TotalAssignedJobs, nil).
 			Once()
 
@@ -178,7 +178,7 @@ func TestHandleMessageMetrics(t *testing.T) {
 
 	handler := listenermocks.NewHandler(t)
 	handler.On("HandleJobStarted", mock.Anything, jobsStarted[0]).Return(nil).Once()
-	handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything).Return(desiredResult, nil).Once()
+	handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything, 2).Return(desiredResult, nil).Once()
 
 	client := listenermocks.NewClient(t)
 	client.On("DeleteMessage", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()

@@ -15,23 +15,23 @@ type Handler struct {
 	mock.Mock
 }
 
-// HandleDesiredRunnerCount provides a mock function with given fields: ctx, count
-func (_m *Handler) HandleDesiredRunnerCount(ctx context.Context, count int) (int, error) {
-	ret := _m.Called(ctx, count)
+// HandleDesiredRunnerCount provides a mock function with given fields: ctx, count, jobsCompleted
+func (_m *Handler) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCompleted int) (int, error) {
+	ret := _m.Called(ctx, count, jobsCompleted)
 
 	var r0 int
 	var r1 error
-	if rf, ok := ret.Get(0).(func(context.Context, int) (int, error)); ok {
-		return rf(ctx, count)
+	if rf, ok := ret.Get(0).(func(context.Context, int, int) (int, error)); ok {
+		return rf(ctx, count, jobsCompleted)
 	}
-	if rf, ok := ret.Get(0).(func(context.Context, int) int); ok {
-		r0 = rf(ctx, count)
+	if rf, ok := ret.Get(0).(func(context.Context, int, int) int); ok {
+		r0 = rf(ctx, count, jobsCompleted)
 	} else {
 		r0 = ret.Get(0).(int)
 	}
 
-	if rf, ok := ret.Get(1).(func(context.Context, int) error); ok {
-		r1 = rf(ctx, count)
+	if rf, ok := ret.Get(1).(func(context.Context, int, int) error); ok {
+		r1 = rf(ctx, count, jobsCompleted)
 	} else {
 		r1 = ret.Error(1)
 	}

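With the regenerated mocks, test expectations must supply the third argument; a two-argument handler.On(...) no longer matches. A hedged sketch of the new expectation shape (testify; the mocks import path is an assumption based on the repo layout, not shown in this diff):

package listener_test

import (
	"context"
	"testing"

	"github.com/stretchr/testify/mock"
	// Assumed path for the generated mocks package shown above.
	listenermocks "github.com/actions/actions-runner-controller/cmd/ghalistener/listener/mocks"
)

func TestHandlerMockSignature(t *testing.T) {
	handler := listenermocks.NewHandler(t)
	// The third argument is the jobsCompleted count parsed from the message;
	// a two-argument expectation no longer matches the regenerated mock.
	handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything, 2).
		Return(5, nil).
		Once()

	// Satisfies the Once() expectation registered above.
	_, _ = handler.HandleDesiredRunnerCount(context.TODO(), 5, 2)
}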
@@ -41,6 +41,7 @@ type Worker struct {
 	clientset   *kubernetes.Clientset
 	config      Config
 	lastPatch   int
+	lastPatchID int
 	logger      *logr.Logger
 }
 
@@ -50,6 +51,7 @@ func New(config Config, options ...Option) (*Worker, error) {
 	w := &Worker{
 		config:      config,
 		lastPatch:   -1,
+		lastPatchID: -1,
 	}
 
 	conf, err := rest.InClusterConfig()
@@ -161,7 +163,7 @@ func (w *Worker) HandleJobStarted(ctx context.Context, jobInfo *actions.JobStart
 // The function then scales the ephemeral runner set by applying the merge patch.
 // Finally, it logs the scaled ephemeral runner set details and returns nil if successful.
 // If any error occurs during the process, it returns an error with a descriptive message.
-func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int) (int, error) {
+func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCompleted int) (int, error) {
 	// Max runners should always be set by the resource builder either to the configured value,
 	// or the maximum int32 (resourcebuilder.newAutoScalingListener()).
 	targetRunnerCount := min(w.config.MinRunners+count, w.config.MaxRunners)
@@ -172,17 +174,22 @@ func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int,
 		"min", w.config.MinRunners,
 		"max", w.config.MaxRunners,
 		"currentRunnerCount", w.lastPatch,
+		"jobsCompleted", jobsCompleted,
 	}
 
-	if targetRunnerCount == w.lastPatch {
-		w.logger.Info("Skipping patching of EphemeralRunnerSet as the desired count has not changed", logValues...)
+	if w.lastPatch == targetRunnerCount && jobsCompleted == 0 {
+		w.logger.Info("Skipping patch", logValues...)
 		return targetRunnerCount, nil
 	}
 
+	w.lastPatchID++
+	w.lastPatch = targetRunnerCount
+
 	original, err := json.Marshal(
 		&v1alpha1.EphemeralRunnerSet{
 			Spec: v1alpha1.EphemeralRunnerSetSpec{
 				Replicas: -1,
+				PatchID:  -1,
 			},
 		},
 	)
@@ -194,6 +201,7 @@ func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int,
 		&v1alpha1.EphemeralRunnerSet{
 			Spec: v1alpha1.EphemeralRunnerSetSpec{
 				Replicas: targetRunnerCount,
+				PatchID:  w.lastPatchID,
 			},
 		},
 	)

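The -1 sentinels above matter: both fields of the "original" object are guaranteed to differ from the target values, so both survive into the merge patch sent to the cluster. A self-contained sketch of that step, under the assumption that the worker feeds the two marshaled objects to evanphx/json-patch's CreateMergePatch (the call itself falls outside this hunk):

package main

import (
	"fmt"

	jsonpatch "github.com/evanphx/json-patch"
)

func main() {
	// Mirrors the two json.Marshal calls in HandleDesiredRunnerCount above.
	original := []byte(`{"spec":{"replicas":-1,"patchID":-1}}`)
	patched := []byte(`{"spec":{"replicas":3,"patchID":7}}`)

	mergePatch, err := jsonpatch.CreateMergePatch(original, patched)
	if err != nil {
		panic(err)
	}
	// Both fields differ from the sentinels, so both appear in the patch:
	// {"spec":{"patchID":7,"replicas":3}}
	fmt.Println(string(mergePatch))
}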
@@ -6957,9 +6957,14 @@ spec:
                         - containers
                       type: object
                   type: object
+                patchID:
+                  description: PatchID is the unique identifier for the patch issued by the listener app
+                  type: integer
                 replicas:
                   description: Replicas is the number of desired EphemeralRunner resources in the k8s namespace.
                   type: integer
+              required:
+                - patchID
               type: object
             status:
               description: EphemeralRunnerSetStatus defines the observed state of EphemeralRunnerSet

@@ -42,6 +42,7 @@ const AutoscalingRunnerSetCleanupFinalizerName = "actions.github.com/cleanup-pro
 const (
 	AnnotationKeyGitHubRunnerGroupName    = "actions.github.com/runner-group-name"
 	AnnotationKeyGitHubRunnerScaleSetName = "actions.github.com/runner-scale-set-name"
+	AnnotationKeyPatchID                  = "actions.github.com/patch-id"
 )
 
 // Labels applied to listener roles

@@ -133,6 +133,23 @@ func (r *EphemeralRunnerReconciler) Reconcile(ctx context.Context, req ctrl.Requ
 		return ctrl.Result{}, nil
 	}
 
+	if ephemeralRunner.IsDone() {
+		log.Info("Cleaning up resources after ephemeral runner termination", "phase", ephemeralRunner.Status.Phase)
+		done, err := r.cleanupResources(ctx, ephemeralRunner, log)
+		if err != nil {
+			log.Error(err, "Failed to clean up ephemeral runner owned resources")
+			return ctrl.Result{}, err
+		}
+		if !done {
+			log.Info("Waiting for ephemeral runner owned resources to be deleted")
+			return ctrl.Result{Requeue: true}, nil
+		}
+		// Stop reconciling on this object.
+		// The EphemeralRunnerSet is responsible for cleaning it up.
+		log.Info("EphemeralRunner has already finished. Stopping reconciliation and waiting for EphemeralRunnerSet to clean it up", "phase", ephemeralRunner.Status.Phase)
+		return ctrl.Result{}, nil
+	}
+
 	if !controllerutil.ContainsFinalizer(ephemeralRunner, ephemeralRunnerActionsFinalizerName) {
 		log.Info("Adding runner registration finalizer")
 		err := patch(ctx, r.Client, ephemeralRunner, func(obj *v1alpha1.EphemeralRunner) {
@@ -159,13 +176,6 @@ func (r *EphemeralRunnerReconciler) Reconcile(ctx context.Context, req ctrl.Requ
 		return ctrl.Result{}, nil
 	}
 
-	if ephemeralRunner.Status.Phase == corev1.PodSucceeded || ephemeralRunner.Status.Phase == corev1.PodFailed {
-		// Stop reconciling on this object.
-		// The EphemeralRunnerSet is responsible for cleaning it up.
-		log.Info("EphemeralRunner has already finished. Stopping reconciliation and waiting for EphemeralRunnerSet to clean it up", "phase", ephemeralRunner.Status.Phase)
-		return ctrl.Result{}, nil
-	}
-
 	if ephemeralRunner.Status.RunnerId == 0 {
 		log.Info("Creating new ephemeral runner registration and updating status with runner config")
 		return r.updateStatusWithRunnerConfig(ctx, ephemeralRunner, log)
@@ -324,7 +334,7 @@ func (r *EphemeralRunnerReconciler) cleanupResources(ctx context.Context, epheme
 			}
 		}
 		return false, nil
-	case err != nil && !kerrors.IsNotFound(err):
+	case !kerrors.IsNotFound(err):
 		return false, err
 	}
 	log.Info("Pod is deleted")
@@ -341,7 +351,7 @@ func (r *EphemeralRunnerReconciler) cleanupResources(ctx context.Context, epheme
 			}
 		}
 		return false, nil
-	case err != nil && !kerrors.IsNotFound(err):
+	case !kerrors.IsNotFound(err):
 		return false, err
 	}
 	log.Info("Secret is deleted")

@@ -22,6 +22,7 @@ import (
 	"fmt"
 	"net/http"
 	"sort"
+	"strconv"
 	"strings"
 
 	"github.com/actions/actions-runner-controller/apis/actions.github.com/v1alpha1"
@@ -156,14 +157,14 @@ func (r *EphemeralRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.R
 		return ctrl.Result{}, err
 	}
 
-	pendingEphemeralRunners, runningEphemeralRunners, finishedEphemeralRunners, failedEphemeralRunners, deletingEphemeralRunners := categorizeEphemeralRunners(ephemeralRunnerList)
+	ephemeralRunnerState := newEphemeralRunnerState(ephemeralRunnerList)
 
 	log.Info("Ephemeral runner counts",
-		"pending", len(pendingEphemeralRunners),
-		"running", len(runningEphemeralRunners),
-		"finished", len(finishedEphemeralRunners),
-		"failed", len(failedEphemeralRunners),
-		"deleting", len(deletingEphemeralRunners),
+		"pending", len(ephemeralRunnerState.pending),
+		"running", len(ephemeralRunnerState.running),
+		"finished", len(ephemeralRunnerState.finished),
+		"failed", len(ephemeralRunnerState.failed),
+		"deleting", len(ephemeralRunnerState.deleting),
 	)
 
 	if r.PublishMetrics {
@@ -183,30 +184,20 @@ func (r *EphemeralRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.R
 				Organization: parsedURL.Organization,
 				Enterprise:   parsedURL.Enterprise,
 			},
-			len(pendingEphemeralRunners),
-			len(runningEphemeralRunners),
-			len(failedEphemeralRunners),
+			len(ephemeralRunnerState.pending),
+			len(ephemeralRunnerState.running),
+			len(ephemeralRunnerState.failed),
 		)
 	}
 
-	// cleanup finished runners and proceed
-	var errs []error
-	for i := range finishedEphemeralRunners {
-		log.Info("Deleting finished ephemeral runner", "name", finishedEphemeralRunners[i].Name)
-		if err := r.Delete(ctx, finishedEphemeralRunners[i]); err != nil {
-			if !kerrors.IsNotFound(err) {
-				errs = append(errs, err)
-			}
-		}
-	}
-
-	if len(errs) > 0 {
-		mergedErrs := multierr.Combine(errs...)
-		log.Error(mergedErrs, "Failed to delete finished ephemeral runners")
-		return ctrl.Result{}, mergedErrs
-	}
-
-	total := len(pendingEphemeralRunners) + len(runningEphemeralRunners) + len(failedEphemeralRunners)
+	total := ephemeralRunnerState.scaleTotal()
+	if ephemeralRunnerSet.Spec.PatchID == 0 || ephemeralRunnerSet.Spec.PatchID != ephemeralRunnerState.latestPatchID {
+		defer func() {
+			if err := r.cleanupFinishedEphemeralRunners(ctx, ephemeralRunnerState.finished, log); err != nil {
+				log.Error(err, "failed to cleanup finished ephemeral runners")
+			}
+		}()
+
 		log.Info("Scaling comparison", "current", total, "desired", ephemeralRunnerSet.Spec.Replicas)
 		switch {
 		case total < ephemeralRunnerSet.Spec.Replicas: // Handle scale up
@@ -220,17 +211,25 @@ func (r *EphemeralRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.R
 		case total > ephemeralRunnerSet.Spec.Replicas: // Handle scale down scenario.
 			count := total - ephemeralRunnerSet.Spec.Replicas
 			log.Info("Deleting ephemeral runners (scale down)", "count", count)
-		if err := r.deleteIdleEphemeralRunners(ctx, ephemeralRunnerSet, pendingEphemeralRunners, runningEphemeralRunners, count, log); err != nil {
+			if err := r.deleteIdleEphemeralRunners(
+				ctx,
+				ephemeralRunnerSet,
+				ephemeralRunnerState.pending,
+				ephemeralRunnerState.running,
+				count,
+				log,
+			); err != nil {
 				log.Error(err, "failed to delete idle runners")
 				return ctrl.Result{}, err
 			}
 		}
+	}
 
 	desiredStatus := v1alpha1.EphemeralRunnerSetStatus{
 		CurrentReplicas:         total,
-		PendingEphemeralRunners: len(pendingEphemeralRunners),
-		RunningEphemeralRunners: len(runningEphemeralRunners),
-		FailedEphemeralRunners:  len(failedEphemeralRunners),
+		PendingEphemeralRunners: len(ephemeralRunnerState.pending),
+		RunningEphemeralRunners: len(ephemeralRunnerState.running),
+		FailedEphemeralRunners:  len(ephemeralRunnerState.failed),
 	}
 
 	// Update the status if needed.
@@ -247,6 +246,21 @@ func (r *EphemeralRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.R
 	return ctrl.Result{}, nil
 }
 
+func (r *EphemeralRunnerSetReconciler) cleanupFinishedEphemeralRunners(ctx context.Context, finishedEphemeralRunners []*v1alpha1.EphemeralRunner, log logr.Logger) error {
+	// cleanup finished runners and proceed
+	var errs []error
+	for i := range finishedEphemeralRunners {
+		log.Info("Deleting finished ephemeral runner", "name", finishedEphemeralRunners[i].Name)
+		if err := r.Delete(ctx, finishedEphemeralRunners[i]); err != nil {
+			if !kerrors.IsNotFound(err) {
+				errs = append(errs, err)
+			}
+		}
+	}
+
+	return multierr.Combine(errs...)
+}
+
 func (r *EphemeralRunnerSetReconciler) cleanUpProxySecret(ctx context.Context, ephemeralRunnerSet *v1alpha1.EphemeralRunnerSet, log logr.Logger) error {
 	if ephemeralRunnerSet.Spec.EphemeralRunnerSpec.Proxy == nil {
 		return nil
@@ -284,19 +298,19 @@ func (r *EphemeralRunnerSetReconciler) cleanUpEphemeralRunners(ctx context.Conte
 		return true, nil
 	}
 
-	pendingEphemeralRunners, runningEphemeralRunners, finishedEphemeralRunners, failedEphemeralRunners, deletingEphemeralRunners := categorizeEphemeralRunners(ephemeralRunnerList)
+	ephemeralRunnerState := newEphemeralRunnerState(ephemeralRunnerList)
 
 	log.Info("Clean up runner counts",
-		"pending", len(pendingEphemeralRunners),
-		"running", len(runningEphemeralRunners),
-		"finished", len(finishedEphemeralRunners),
-		"failed", len(failedEphemeralRunners),
-		"deleting", len(deletingEphemeralRunners),
+		"pending", len(ephemeralRunnerState.pending),
+		"running", len(ephemeralRunnerState.running),
+		"finished", len(ephemeralRunnerState.finished),
+		"failed", len(ephemeralRunnerState.failed),
+		"deleting", len(ephemeralRunnerState.deleting),
 	)
 
 	log.Info("Cleanup finished or failed ephemeral runners")
 	var errs []error
-	for _, ephemeralRunner := range append(finishedEphemeralRunners, failedEphemeralRunners...) {
+	for _, ephemeralRunner := range append(ephemeralRunnerState.finished, ephemeralRunnerState.failed...) {
 		log.Info("Deleting ephemeral runner", "name", ephemeralRunner.Name)
 		if err := r.Delete(ctx, ephemeralRunner); err != nil && !kerrors.IsNotFound(err) {
 			errs = append(errs, err)
@@ -310,7 +324,7 @@ func (r *EphemeralRunnerSetReconciler) cleanUpEphemeralRunners(ctx context.Conte
 	}
 
 	// avoid fetching the client if we have nothing left to do
-	if len(runningEphemeralRunners) == 0 && len(pendingEphemeralRunners) == 0 {
+	if len(ephemeralRunnerState.running) == 0 && len(ephemeralRunnerState.pending) == 0 {
 		return false, nil
 	}
 
@@ -321,7 +335,7 @@ func (r *EphemeralRunnerSetReconciler) cleanUpEphemeralRunners(ctx context.Conte
 	log.Info("Cleanup pending or running ephemeral runners")
 	errs = errs[0:0]
-	for _, ephemeralRunner := range append(pendingEphemeralRunners, runningEphemeralRunners...) {
+	for _, ephemeralRunner := range append(ephemeralRunnerState.pending, ephemeralRunnerState.running...) {
 		log.Info("Removing the ephemeral runner from the service", "name", ephemeralRunner.Name)
 		_, err := r.deleteEphemeralRunnerWithActionsClient(ctx, ephemeralRunner, actionsClient, log)
 		if err != nil {
@@ -427,12 +441,13 @@ func (r *EphemeralRunnerSetReconciler) deleteIdleEphemeralRunners(ctx context.Co
 	deletedCount := 0
 	for runners.next() {
 		ephemeralRunner := runners.object()
-		if ephemeralRunner.Status.RunnerId == 0 {
+		isDone := ephemeralRunner.IsDone()
+		if !isDone && ephemeralRunner.Status.RunnerId == 0 {
 			log.Info("Skipping ephemeral runner since it is not registered yet", "name", ephemeralRunner.Name)
 			continue
 		}
 
-		if ephemeralRunner.Status.JobRequestId > 0 {
+		if !isDone && ephemeralRunner.Status.JobRequestId > 0 {
 			log.Info("Skipping ephemeral runner since it is running a job", "name", ephemeralRunner.Name, "jobRequestId", ephemeralRunner.Status.JobRequestId)
 			continue
 		}
@@ -580,16 +595,22 @@ type ephemeralRunnerStepper struct {
 	index int
 }
 
-func newEphemeralRunnerStepper(pending, running []*v1alpha1.EphemeralRunner) *ephemeralRunnerStepper {
-	sort.Slice(pending, func(i, j int) bool {
-		return pending[i].GetCreationTimestamp().Time.Before(pending[j].GetCreationTimestamp().Time)
-	})
-	sort.Slice(running, func(i, j int) bool {
-		return running[i].GetCreationTimestamp().Time.Before(running[j].GetCreationTimestamp().Time)
-	})
+func newEphemeralRunnerStepper(primary []*v1alpha1.EphemeralRunner, othersOrdered ...[]*v1alpha1.EphemeralRunner) *ephemeralRunnerStepper {
+	sort.Slice(primary, func(i, j int) bool {
+		return primary[i].GetCreationTimestamp().Time.Before(primary[j].GetCreationTimestamp().Time)
+	})
+	for _, bucket := range othersOrdered {
+		sort.Slice(bucket, func(i, j int) bool {
+			return bucket[i].GetCreationTimestamp().Time.Before(bucket[j].GetCreationTimestamp().Time)
+		})
+	}
+
+	for _, bucket := range othersOrdered {
+		primary = append(primary, bucket...)
+	}
 
 	return &ephemeralRunnerStepper{
-		items: append(pending, running...),
+		items: primary,
 		index: -1,
 	}
 }
@@ -613,28 +634,48 @@ func (s *ephemeralRunnerStepper) len() int {
 	return len(s.items)
 }
 
-func categorizeEphemeralRunners(ephemeralRunnerList *v1alpha1.EphemeralRunnerList) (pendingEphemeralRunners, runningEphemeralRunners, finishedEphemeralRunners, failedEphemeralRunners, deletingEphemeralRunners []*v1alpha1.EphemeralRunner) {
+type ephemeralRunnerState struct {
+	pending  []*v1alpha1.EphemeralRunner
+	running  []*v1alpha1.EphemeralRunner
+	finished []*v1alpha1.EphemeralRunner
+	failed   []*v1alpha1.EphemeralRunner
+	deleting []*v1alpha1.EphemeralRunner
+
+	latestPatchID int
+}
+
+func newEphemeralRunnerState(ephemeralRunnerList *v1alpha1.EphemeralRunnerList) *ephemeralRunnerState {
+	var ephemeralRunnerState ephemeralRunnerState
+
 	for i := range ephemeralRunnerList.Items {
 		r := &ephemeralRunnerList.Items[i]
+		patchID, err := strconv.Atoi(r.Annotations[AnnotationKeyPatchID])
+		if err == nil && patchID > ephemeralRunnerState.latestPatchID {
+			ephemeralRunnerState.latestPatchID = patchID
+		}
 		if !r.ObjectMeta.DeletionTimestamp.IsZero() {
-			deletingEphemeralRunners = append(deletingEphemeralRunners, r)
+			ephemeralRunnerState.deleting = append(ephemeralRunnerState.deleting, r)
 			continue
 		}
 
 		switch r.Status.Phase {
 		case corev1.PodRunning:
-			runningEphemeralRunners = append(runningEphemeralRunners, r)
+			ephemeralRunnerState.running = append(ephemeralRunnerState.running, r)
 		case corev1.PodSucceeded:
-			finishedEphemeralRunners = append(finishedEphemeralRunners, r)
+			ephemeralRunnerState.finished = append(ephemeralRunnerState.finished, r)
 		case corev1.PodFailed:
-			failedEphemeralRunners = append(failedEphemeralRunners, r)
+			ephemeralRunnerState.failed = append(ephemeralRunnerState.failed, r)
 		default:
 			// Pending or no phase should be considered as pending.
 			//
 			// If field is not set, that means that the EphemeralRunner
 			// did not yet have chance to update the Status.Phase field.
-			pendingEphemeralRunners = append(pendingEphemeralRunners, r)
+			ephemeralRunnerState.pending = append(ephemeralRunnerState.pending, r)
 		}
 	}
-	return
+	return &ephemeralRunnerState
+}
+
+func (s *ephemeralRunnerState) scaleTotal() int {
+	return len(s.pending) + len(s.running) + len(s.failed)
 }

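The patch-ID gate above depends on every EphemeralRunner carrying the PatchID that created it: the resource builder change at the end of this diff writes it as an annotation, and newEphemeralRunnerState reads the highest one back. A self-contained sketch of that round-trip (hypothetical helper; same strconv calls as the diff):

package main

import (
	"fmt"
	"strconv"
)

// Mirrors AnnotationKeyPatchID from the constants hunk above.
const annotationKeyPatchID = "actions.github.com/patch-id"

// latestPatchID scans runner annotations the way newEphemeralRunnerState
// does: missing or unparsable values are simply skipped.
func latestPatchID(runnerAnnotations []map[string]string) int {
	latest := 0
	for _, annotations := range runnerAnnotations {
		if id, err := strconv.Atoi(annotations[annotationKeyPatchID]); err == nil && id > latest {
			latest = id
		}
	}
	return latest
}

func main() {
	runners := []map[string]string{
		{annotationKeyPatchID: strconv.Itoa(2)}, // written by the resource builder
		{annotationKeyPatchID: strconv.Itoa(3)},
		{}, // runner created before this change: no annotation, skipped
	}
	fmt.Println(latestPatchID(runners)) // 3
}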
@@ -4,6 +4,7 @@ import (
 	"context"
 	"crypto/tls"
 	"encoding/base64"
+	"errors"
 	"fmt"
 	"net/http"
 	"net/http/httptest"
@@ -274,14 +275,17 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() {
 	})
 
 	Context("When a new EphemeralRunnerSet scale up and down", func() {
-		It("It should delete finished EphemeralRunner and create new EphemeralRunner", func() {
+		It("Should scale only on patch ID change", func() {
 			created := new(actionsv1alpha1.EphemeralRunnerSet)
 			err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, created)
 			Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet")
 
+			patchID := 1
+
 			// Scale up the EphemeralRunnerSet
 			updated := created.DeepCopy()
 			updated.Spec.Replicas = 5
+			updated.Spec.PatchID = patchID
 			err = k8sClient.Update(ctx, updated)
 			Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet")
 
@@ -317,7 +321,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() {
 					return len(runnerList.Items), nil
 				},
 				ephemeralRunnerSetTestTimeout,
-				ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(5), "5 EphemeralRunner should be created")
+				ephemeralRunnerSetTestInterval,
+			).Should(BeEquivalentTo(5), "5 EphemeralRunner should be created")
 
 			// Mark one of the EphemeralRunner as finished
 			finishedRunner := runnerList.Items[4].DeepCopy()
@@ -325,7 +330,7 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() {
 			err = k8sClient.Status().Patch(ctx, finishedRunner, client.MergeFrom(&runnerList.Items[4]))
 			Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
 
-			// Wait for the finished EphemeralRunner to be deleted
+			// Wait for the finished EphemeralRunner to be set to succeeded
 			Eventually(
 				func() error {
 					runnerList := new(actionsv1alpha1.EphemeralRunnerList)
@@ -335,17 +340,35 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() {
 					}
 
 					for _, runner := range runnerList.Items {
-						if runner.Name == finishedRunner.Name {
-							return fmt.Errorf("EphemeralRunner is not deleted")
+						if runner.Name != finishedRunner.Name {
+							continue
 						}
-					}
 
+						if runner.Status.Phase != corev1.PodSucceeded {
+							return fmt.Errorf("EphemeralRunner is not finished")
+						}
+						// found pod succeeded
 						return nil
+					}
+
+					return errors.New("Finished ephemeral runner is not found")
 				},
 				ephemeralRunnerSetTestTimeout,
-				ephemeralRunnerSetTestInterval).Should(Succeed(), "Finished EphemeralRunner should be deleted")
+				ephemeralRunnerSetTestInterval,
+			).Should(Succeed(), "Finished EphemeralRunner should be deleted")
 
-			// We should still have the EphemeralRunnerSet scale up
+			// After one ephemeral runner is finished, simulate job done patch
+			patchID++
+			original := new(actionsv1alpha1.EphemeralRunnerSet)
+			err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, original)
+			Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet")
+			updated = original.DeepCopy()
+			updated.Spec.PatchID = patchID
+			updated.Spec.Replicas = 4
+			err = k8sClient.Patch(ctx, updated, client.MergeFrom(original))
+			Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet")
+
+			// Only finished ephemeral runner should be deleted
 			runnerList = new(actionsv1alpha1.EphemeralRunnerList)
 			Eventually(
 				func() (int, error) {
@@ -354,35 +377,27 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() {
 						return -1, err
 					}
 
-					// Set status to simulate a configured EphemeralRunner
-					refetch := false
-					for i, runner := range runnerList.Items {
-						if runner.Status.RunnerId == 0 {
-							updatedRunner := runner.DeepCopy()
-							updatedRunner.Status.Phase = corev1.PodRunning
-							updatedRunner.Status.RunnerId = i + 100
-							err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runner))
-							Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
-							refetch = true
-						}
-					}
-
-					if refetch {
-						err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
-						if err != nil {
-							return -1, err
+					for _, runner := range runnerList.Items {
+						if runner.Status.Phase == corev1.PodSucceeded {
+							return -1, fmt.Errorf("Finished EphemeralRunner should be deleted")
 						}
 					}
 
 					return len(runnerList.Items), nil
 				},
 				ephemeralRunnerSetTestTimeout,
-				ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(5), "5 EphemeralRunner should be created")
+				ephemeralRunnerSetTestInterval,
+			).Should(BeEquivalentTo(4), "4 EphemeralRunner should be created")
 
-			// Scale down the EphemeralRunnerSet
-			updated = created.DeepCopy()
+			// Scaling down the EphemeralRunnerSet
+			patchID++
+			original = new(actionsv1alpha1.EphemeralRunnerSet)
+			err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, original)
+			Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet")
+			updated = original.DeepCopy()
+			updated.Spec.PatchID = patchID
 			updated.Spec.Replicas = 3
-			err = k8sClient.Patch(ctx, updated, client.MergeFrom(created))
+			err = k8sClient.Patch(ctx, updated, client.MergeFrom(original))
 			Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet")
 
 			// Wait for the EphemeralRunnerSet to be scaled down
@@ -417,7 +432,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() {
 					return len(runnerList.Items), nil
 				},
 				ephemeralRunnerSetTestTimeout,
-				ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(3), "3 EphemeralRunner should be created")
+				ephemeralRunnerSetTestInterval,
+			).Should(BeEquivalentTo(3), "3 EphemeralRunner should be created")
 
 			// We will not scale down runner that is running jobs
 			runningRunner := runnerList.Items[0].DeepCopy()
@@ -430,10 +446,15 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() {
 			err = k8sClient.Status().Patch(ctx, runningRunner, client.MergeFrom(&runnerList.Items[1]))
 			Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
 
-			// Scale down to 1
-			updated = created.DeepCopy()
+			// Scale down to 1 while 2 are running
+			patchID++
+			original = new(actionsv1alpha1.EphemeralRunnerSet)
+			err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, original)
+			Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet")
+			updated = original.DeepCopy()
+			updated.Spec.PatchID = patchID
 			updated.Spec.Replicas = 1
-			err = k8sClient.Patch(ctx, updated, client.MergeFrom(created))
+			err = k8sClient.Patch(ctx, updated, client.MergeFrom(original))
 			Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet")
 
 			// Wait for the EphemeralRunnerSet to be scaled down to 2 since we still have 2 runner running jobs
@@ -468,7 +489,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() {
 					return len(runnerList.Items), nil
 				},
 				ephemeralRunnerSetTestTimeout,
-				ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created")
+				ephemeralRunnerSetTestInterval,
+			).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created")
 
 			// We will not scale down failed runner
 			failedRunner := runnerList.Items[0].DeepCopy()
@@ -476,15 +498,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() {
 			err = k8sClient.Status().Patch(ctx, failedRunner, client.MergeFrom(&runnerList.Items[0]))
 			Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
 
-			// Scale down to 0
-			updated = created.DeepCopy()
-			updated.Spec.Replicas = 0
-			err = k8sClient.Patch(ctx, updated, client.MergeFrom(created))
-			Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet")
-
-			// We should not scale down the EphemeralRunnerSet since we still have 1 runner running job and 1 failed runner
 			runnerList = new(actionsv1alpha1.EphemeralRunnerList)
-			Consistently(
+			Eventually(
 				func() (int, error) {
 					err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
 					if err != nil {
@@ -514,7 +529,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() {
 					return len(runnerList.Items), nil
 				},
 				ephemeralRunnerSetTestTimeout,
-				ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created")
+				ephemeralRunnerSetTestInterval,
+			).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created")
 
 			// We will scale down to 0 when the running job is completed and the failed runner is deleted
 			runningRunner = runnerList.Items[1].DeepCopy()
@@ -525,6 +541,17 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() {
 			err = k8sClient.Delete(ctx, &runnerList.Items[0])
 			Expect(err).NotTo(HaveOccurred(), "failed to delete EphemeralRunner")
 
+			// Scale down to 0 while 1 ephemeral runner is failed
+			patchID++
+			original = new(actionsv1alpha1.EphemeralRunnerSet)
+			err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, original)
+			Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet")
+			updated = original.DeepCopy()
+			updated.Spec.PatchID = patchID
+			updated.Spec.Replicas = 0
+			err = k8sClient.Patch(ctx, updated, client.MergeFrom(original))
+			Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet")
+
 			// Wait for the EphemeralRunnerSet to be scaled down to 0
 			runnerList = new(actionsv1alpha1.EphemeralRunnerList)
 			Eventually(
@@ -557,7 +584,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() {
 					return len(runnerList.Items), nil
 				},
 				ephemeralRunnerSetTestTimeout,
-				ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(0), "0 EphemeralRunner should be created")
+				ephemeralRunnerSetTestInterval,
+			).Should(BeEquivalentTo(0), "0 EphemeralRunner should be created")
 		})
 
 		It("Should update status on Ephemeral Runner state changes", func() {

@@ -563,6 +563,7 @@ func (b *resourceBuilder) newEphemeralRunner(ephemeralRunnerSet *v1alpha1.Epheme
 	for key, val := range ephemeralRunnerSet.Annotations {
 		annotations[key] = val
 	}
+	annotations[AnnotationKeyPatchID] = strconv.Itoa(ephemeralRunnerSet.Spec.PatchID)
 	return &v1alpha1.EphemeralRunner{
 		TypeMeta: metav1.TypeMeta{},
 		ObjectMeta: metav1.ObjectMeta{