Remove ephemeral runner when exit code != 0 and is patched with the job (#4239)

This commit is contained in:
Nikola Jokic 2025-09-17 21:40:37 +02:00 committed by GitHub
parent 2035e13724
commit 088e2a3a90
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 186 additions and 74 deletions

View File

@ -34,6 +34,7 @@ const EphemeralRunnerContainerName = "runner"
// +kubebuilder:printcolumn:JSONPath=".status.jobWorkflowRef",name=JobWorkflowRef,type=string // +kubebuilder:printcolumn:JSONPath=".status.jobWorkflowRef",name=JobWorkflowRef,type=string
// +kubebuilder:printcolumn:JSONPath=".status.workflowRunId",name=WorkflowRunId,type=number // +kubebuilder:printcolumn:JSONPath=".status.workflowRunId",name=WorkflowRunId,type=number
// +kubebuilder:printcolumn:JSONPath=".status.jobDisplayName",name=JobDisplayName,type=string // +kubebuilder:printcolumn:JSONPath=".status.jobDisplayName",name=JobDisplayName,type=string
// +kubebuilder:printcolumn:JSONPath=".status.jobId",name=JobId,type=string
// +kubebuilder:printcolumn:JSONPath=".status.message",name=Message,type=string // +kubebuilder:printcolumn:JSONPath=".status.message",name=Message,type=string
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" // +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
@ -50,6 +51,10 @@ func (er *EphemeralRunner) IsDone() bool {
return er.Status.Phase == corev1.PodSucceeded || er.Status.Phase == corev1.PodFailed return er.Status.Phase == corev1.PodSucceeded || er.Status.Phase == corev1.PodFailed
} }
// HasJob reports whether a job ID has been recorded in the runner's status,
// i.e. whether Status.JobID is non-empty.
func (er *EphemeralRunner) HasJob() bool {
	return er.Status.JobID != ""
}
func (er *EphemeralRunner) HasContainerHookConfigured() bool { func (er *EphemeralRunner) HasContainerHookConfigured() bool {
for i := range er.Spec.Spec.Containers { for i := range er.Spec.Spec.Containers {
if er.Spec.Spec.Containers[i].Name != EphemeralRunnerContainerName { if er.Spec.Spec.Containers[i].Name != EphemeralRunnerContainerName {
@ -152,6 +157,9 @@ type EphemeralRunnerStatus struct {
// +optional // +optional
JobRequestId int64 `json:"jobRequestId,omitempty"` JobRequestId int64 `json:"jobRequestId,omitempty"`
// +optional
JobID string `json:"jobId,omitempty"`
// +optional // +optional
JobRepositoryName string `json:"jobRepositoryName,omitempty"` JobRepositoryName string `json:"jobRepositoryName,omitempty"`

View File

@ -36,6 +36,9 @@ spec:
- jsonPath: .status.jobDisplayName - jsonPath: .status.jobDisplayName
name: JobDisplayName name: JobDisplayName
type: string type: string
- jsonPath: .status.jobId
name: JobId
type: string
- jsonPath: .status.message - jsonPath: .status.message
name: Message name: Message
type: string type: string
@ -7846,6 +7849,8 @@ spec:
type: object type: object
jobDisplayName: jobDisplayName:
type: string type: string
jobId:
type: string
jobRepositoryName: jobRepositoryName:
type: string type: string
jobRequestId: jobRequestId:

View File

@ -361,7 +361,7 @@ func (l *Listener) parseMessage(ctx context.Context, msg *actions.RunnerScaleSet
return nil, fmt.Errorf("failed to decode job available: %w", err) return nil, fmt.Errorf("failed to decode job available: %w", err)
} }
l.logger.Info("Job available message received", "jobId", jobAvailable.RunnerRequestId) l.logger.Info("Job available message received", "jobId", jobAvailable.JobID)
parsedMsg.jobsAvailable = append(parsedMsg.jobsAvailable, &jobAvailable) parsedMsg.jobsAvailable = append(parsedMsg.jobsAvailable, &jobAvailable)
case messageTypeJobAssigned: case messageTypeJobAssigned:
@ -370,14 +370,14 @@ func (l *Listener) parseMessage(ctx context.Context, msg *actions.RunnerScaleSet
return nil, fmt.Errorf("failed to decode job assigned: %w", err) return nil, fmt.Errorf("failed to decode job assigned: %w", err)
} }
l.logger.Info("Job assigned message received", "jobId", jobAssigned.RunnerRequestId) l.logger.Info("Job assigned message received", "jobId", jobAssigned.JobID)
case messageTypeJobStarted: case messageTypeJobStarted:
var jobStarted actions.JobStarted var jobStarted actions.JobStarted
if err := json.Unmarshal(msg, &jobStarted); err != nil { if err := json.Unmarshal(msg, &jobStarted); err != nil {
return nil, fmt.Errorf("could not decode job started message. %w", err) return nil, fmt.Errorf("could not decode job started message. %w", err)
} }
l.logger.Info("Job started message received.", "RequestId", jobStarted.RunnerRequestId, "RunnerId", jobStarted.RunnerId) l.logger.Info("Job started message received.", "JobID", jobStarted.JobID, "RunnerId", jobStarted.RunnerID)
parsedMsg.jobsStarted = append(parsedMsg.jobsStarted, &jobStarted) parsedMsg.jobsStarted = append(parsedMsg.jobsStarted, &jobStarted)
case messageTypeJobCompleted: case messageTypeJobCompleted:
@ -386,7 +386,13 @@ func (l *Listener) parseMessage(ctx context.Context, msg *actions.RunnerScaleSet
return nil, fmt.Errorf("failed to decode job completed: %w", err) return nil, fmt.Errorf("failed to decode job completed: %w", err)
} }
l.logger.Info("Job completed message received.", "RequestId", jobCompleted.RunnerRequestId, "Result", jobCompleted.Result, "RunnerId", jobCompleted.RunnerId, "RunnerName", jobCompleted.RunnerName) l.logger.Info(
"Job completed message received.",
"JobID", jobCompleted.JobID,
"Result", jobCompleted.Result,
"RunnerId", jobCompleted.RunnerId,
"RunnerName", jobCompleted.RunnerName,
)
parsedMsg.jobsCompleted = append(parsedMsg.jobsCompleted, &jobCompleted) parsedMsg.jobsCompleted = append(parsedMsg.jobsCompleted, &jobCompleted)
default: default:
@ -400,7 +406,7 @@ func (l *Listener) parseMessage(ctx context.Context, msg *actions.RunnerScaleSet
func (l *Listener) acquireAvailableJobs(ctx context.Context, jobsAvailable []*actions.JobAvailable) ([]int64, error) { func (l *Listener) acquireAvailableJobs(ctx context.Context, jobsAvailable []*actions.JobAvailable) ([]int64, error) {
ids := make([]int64, 0, len(jobsAvailable)) ids := make([]int64, 0, len(jobsAvailable))
for _, job := range jobsAvailable { for _, job := range jobsAvailable {
ids = append(ids, job.RunnerRequestId) ids = append(ids, job.RunnerRequestID)
} }
l.logger.Info("Acquiring jobs", "count", len(ids), "requestIds", fmt.Sprint(ids)) l.logger.Info("Acquiring jobs", "count", len(ids), "requestIds", fmt.Sprint(ids))

View File

@ -627,17 +627,17 @@ func TestListener_acquireAvailableJobs(t *testing.T) {
availableJobs := []*actions.JobAvailable{ availableJobs := []*actions.JobAvailable{
{ {
JobMessageBase: actions.JobMessageBase{ JobMessageBase: actions.JobMessageBase{
RunnerRequestId: 1, RunnerRequestID: 1,
}, },
}, },
{ {
JobMessageBase: actions.JobMessageBase{ JobMessageBase: actions.JobMessageBase{
RunnerRequestId: 2, RunnerRequestID: 2,
}, },
}, },
{ {
JobMessageBase: actions.JobMessageBase{ JobMessageBase: actions.JobMessageBase{
RunnerRequestId: 3, RunnerRequestID: 3,
}, },
}, },
} }
@ -678,17 +678,17 @@ func TestListener_acquireAvailableJobs(t *testing.T) {
availableJobs := []*actions.JobAvailable{ availableJobs := []*actions.JobAvailable{
{ {
JobMessageBase: actions.JobMessageBase{ JobMessageBase: actions.JobMessageBase{
RunnerRequestId: 1, RunnerRequestID: 1,
}, },
}, },
{ {
JobMessageBase: actions.JobMessageBase{ JobMessageBase: actions.JobMessageBase{
RunnerRequestId: 2, RunnerRequestID: 2,
}, },
}, },
{ {
JobMessageBase: actions.JobMessageBase{ JobMessageBase: actions.JobMessageBase{
RunnerRequestId: 3, RunnerRequestID: 3,
}, },
}, },
} }
@ -724,17 +724,17 @@ func TestListener_acquireAvailableJobs(t *testing.T) {
availableJobs := []*actions.JobAvailable{ availableJobs := []*actions.JobAvailable{
{ {
JobMessageBase: actions.JobMessageBase{ JobMessageBase: actions.JobMessageBase{
RunnerRequestId: 1, RunnerRequestID: 1,
}, },
}, },
{ {
JobMessageBase: actions.JobMessageBase{ JobMessageBase: actions.JobMessageBase{
RunnerRequestId: 2, RunnerRequestID: 2,
}, },
}, },
{ {
JobMessageBase: actions.JobMessageBase{ JobMessageBase: actions.JobMessageBase{
RunnerRequestId: 3, RunnerRequestID: 3,
}, },
}, },
} }
@ -809,17 +809,17 @@ func TestListener_acquireAvailableJobs(t *testing.T) {
availableJobs := []*actions.JobAvailable{ availableJobs := []*actions.JobAvailable{
{ {
JobMessageBase: actions.JobMessageBase{ JobMessageBase: actions.JobMessageBase{
RunnerRequestId: 1, RunnerRequestID: 1,
}, },
}, },
{ {
JobMessageBase: actions.JobMessageBase{ JobMessageBase: actions.JobMessageBase{
RunnerRequestId: 2, RunnerRequestID: 2,
}, },
}, },
{ {
JobMessageBase: actions.JobMessageBase{ JobMessageBase: actions.JobMessageBase{
RunnerRequestId: 3, RunnerRequestID: 3,
}, },
}, },
} }
@ -881,7 +881,7 @@ func TestListener_parseMessage(t *testing.T) {
JobMessageType: actions.JobMessageType{ JobMessageType: actions.JobMessageType{
MessageType: messageTypeJobAvailable, MessageType: messageTypeJobAvailable,
}, },
RunnerRequestId: 1, RunnerRequestID: 1,
}, },
}, },
{ {
@ -890,7 +890,7 @@ func TestListener_parseMessage(t *testing.T) {
JobMessageType: actions.JobMessageType{ JobMessageType: actions.JobMessageType{
MessageType: messageTypeJobAvailable, MessageType: messageTypeJobAvailable,
}, },
RunnerRequestId: 2, RunnerRequestID: 2,
}, },
}, },
} }
@ -904,7 +904,7 @@ func TestListener_parseMessage(t *testing.T) {
JobMessageType: actions.JobMessageType{ JobMessageType: actions.JobMessageType{
MessageType: messageTypeJobAssigned, MessageType: messageTypeJobAssigned,
}, },
RunnerRequestId: 3, RunnerRequestID: 3,
}, },
}, },
{ {
@ -912,7 +912,7 @@ func TestListener_parseMessage(t *testing.T) {
JobMessageType: actions.JobMessageType{ JobMessageType: actions.JobMessageType{
MessageType: messageTypeJobAssigned, MessageType: messageTypeJobAssigned,
}, },
RunnerRequestId: 4, RunnerRequestID: 4,
}, },
}, },
} }
@ -926,9 +926,9 @@ func TestListener_parseMessage(t *testing.T) {
JobMessageType: actions.JobMessageType{ JobMessageType: actions.JobMessageType{
MessageType: messageTypeJobStarted, MessageType: messageTypeJobStarted,
}, },
RunnerRequestId: 5, RunnerRequestID: 5,
}, },
RunnerId: 2, RunnerID: 2,
RunnerName: "runner2", RunnerName: "runner2",
}, },
} }
@ -942,7 +942,7 @@ func TestListener_parseMessage(t *testing.T) {
JobMessageType: actions.JobMessageType{ JobMessageType: actions.JobMessageType{
MessageType: messageTypeJobCompleted, MessageType: messageTypeJobCompleted,
}, },
RunnerRequestId: 6, RunnerRequestID: 6,
}, },
Result: "success", Result: "success",
RunnerId: 1, RunnerId: 1,

View File

@ -123,9 +123,9 @@ func TestHandleMessageMetrics(t *testing.T) {
JobMessageType: actions.JobMessageType{ JobMessageType: actions.JobMessageType{
MessageType: messageTypeJobStarted, MessageType: messageTypeJobStarted,
}, },
RunnerRequestId: 8, RunnerRequestID: 8,
}, },
RunnerId: 3, RunnerID: 3,
RunnerName: "runner3", RunnerName: "runner3",
}, },
} }
@ -139,7 +139,7 @@ func TestHandleMessageMetrics(t *testing.T) {
JobMessageType: actions.JobMessageType{ JobMessageType: actions.JobMessageType{
MessageType: messageTypeJobCompleted, MessageType: messageTypeJobCompleted,
}, },
RunnerRequestId: 6, RunnerRequestID: 6,
}, },
Result: "success", Result: "success",
RunnerId: 1, RunnerId: 1,
@ -150,7 +150,7 @@ func TestHandleMessageMetrics(t *testing.T) {
JobMessageType: actions.JobMessageType{ JobMessageType: actions.JobMessageType{
MessageType: messageTypeJobCompleted, MessageType: messageTypeJobCompleted,
}, },
RunnerRequestId: 7, RunnerRequestID: 7,
}, },
Result: "success", Result: "success",
RunnerId: 2, RunnerId: 2,

View File

@ -100,10 +100,11 @@ func (w *Worker) HandleJobStarted(ctx context.Context, jobInfo *actions.JobStart
"runnerName", jobInfo.RunnerName, "runnerName", jobInfo.RunnerName,
"ownerName", jobInfo.OwnerName, "ownerName", jobInfo.OwnerName,
"repoName", jobInfo.RepositoryName, "repoName", jobInfo.RepositoryName,
"jobId", jobInfo.JobID,
"workflowRef", jobInfo.JobWorkflowRef, "workflowRef", jobInfo.JobWorkflowRef,
"workflowRunId", jobInfo.WorkflowRunId, "workflowRunId", jobInfo.WorkflowRunID,
"jobDisplayName", jobInfo.JobDisplayName, "jobDisplayName", jobInfo.JobDisplayName,
"requestId", jobInfo.RunnerRequestId) "requestId", jobInfo.RunnerRequestID)
original, err := json.Marshal(&v1alpha1.EphemeralRunner{}) original, err := json.Marshal(&v1alpha1.EphemeralRunner{})
if err != nil { if err != nil {
@ -113,9 +114,10 @@ func (w *Worker) HandleJobStarted(ctx context.Context, jobInfo *actions.JobStart
patch, err := json.Marshal( patch, err := json.Marshal(
&v1alpha1.EphemeralRunner{ &v1alpha1.EphemeralRunner{
Status: v1alpha1.EphemeralRunnerStatus{ Status: v1alpha1.EphemeralRunnerStatus{
JobRequestId: jobInfo.RunnerRequestId, JobRequestId: jobInfo.RunnerRequestID,
JobRepositoryName: fmt.Sprintf("%s/%s", jobInfo.OwnerName, jobInfo.RepositoryName), JobRepositoryName: fmt.Sprintf("%s/%s", jobInfo.OwnerName, jobInfo.RepositoryName),
WorkflowRunId: jobInfo.WorkflowRunId, JobID: jobInfo.JobID,
WorkflowRunId: jobInfo.WorkflowRunID,
JobWorkflowRef: jobInfo.JobWorkflowRef, JobWorkflowRef: jobInfo.JobWorkflowRef,
JobDisplayName: jobInfo.JobDisplayName, JobDisplayName: jobInfo.JobDisplayName,
}, },

View File

@ -36,6 +36,9 @@ spec:
- jsonPath: .status.jobDisplayName - jsonPath: .status.jobDisplayName
name: JobDisplayName name: JobDisplayName
type: string type: string
- jsonPath: .status.jobId
name: JobId
type: string
- jsonPath: .status.message - jsonPath: .status.message
name: Message name: Message
type: string type: string
@ -7846,6 +7849,8 @@ spec:
type: object type: object
jobDisplayName: jobDisplayName:
type: string type: string
jobId:
type: string
jobRepositoryName: jobRepositoryName:
type: string type: string
jobRequestId: jobRequestId:

View File

@ -192,10 +192,12 @@ func (r *EphemeralRunnerReconciler) Reconcile(ctx context.Context, req ctrl.Requ
case err == nil: case err == nil:
// create secret if not created // create secret if not created
log.Info("Creating new ephemeral runner secret for jitconfig.") log.Info("Creating new ephemeral runner secret for jitconfig.")
if err := r.createSecret(ctx, ephemeralRunner, jitConfig, log); err != nil { jitSecret, err := r.createSecret(ctx, ephemeralRunner, jitConfig, log)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to create secret: %w", err) return ctrl.Result{}, fmt.Errorf("failed to create secret: %w", err)
} }
log.Info("Created new ephemeral runner secret for jitconfig.") log.Info("Created new ephemeral runner secret for jitconfig.")
secret = jitSecret
case errors.Is(err, retryableError): case errors.Is(err, retryableError):
log.Info("Encountered retryable error, requeueing", "error", err.Error()) log.Info("Encountered retryable error, requeueing", "error", err.Error())
@ -226,12 +228,15 @@ func (r *EphemeralRunnerReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{Requeue: true}, nil return ctrl.Result{Requeue: true}, nil
} }
runnerName := string(secret.Data["runnerName"])
if err := patchSubResource(ctx, r.Status(), ephemeralRunner, func(obj *v1alpha1.EphemeralRunner) { if err := patchSubResource(ctx, r.Status(), ephemeralRunner, func(obj *v1alpha1.EphemeralRunner) {
obj.Status.RunnerId = runnerID obj.Status.RunnerId = runnerID
obj.Status.RunnerName = string(secret.Data["runnerName"]) obj.Status.RunnerName = runnerName
}); err != nil { }); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to update runner status for RunnerId/RunnerName/RunnerJITConfig: %w", err) return ctrl.Result{}, fmt.Errorf("failed to update runner status for RunnerId/RunnerName/RunnerJITConfig: %w", err)
} }
ephemeralRunner.Status.RunnerId = runnerID
ephemeralRunner.Status.RunnerName = runnerName
log.Info("Updated ephemeral runner status with runnerId and runnerName") log.Info("Updated ephemeral runner status with runnerId and runnerName")
} }
@ -321,6 +326,18 @@ func (r *EphemeralRunnerReconciler) Reconcile(ctx context.Context, req ctrl.Requ
case cs.State.Terminated.ExitCode != 0: // failed case cs.State.Terminated.ExitCode != 0: // failed
log.Info("Ephemeral runner container failed", "exitCode", cs.State.Terminated.ExitCode) log.Info("Ephemeral runner container failed", "exitCode", cs.State.Terminated.ExitCode)
if ephemeralRunner.HasJob() {
log.Error(
errors.New("ephemeral runner has a job assigned, but the pod has failed"),
"Ephemeral runner either has faulty entrypoint or something external killing the runner",
)
log.Info("Deleting the ephemeral runner that has a job assigned but the pod has failed")
if err := r.Delete(ctx, ephemeralRunner); err != nil {
log.Error(err, "Failed to delete the ephemeral runner that has a job assigned but the pod has failed")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
if err := r.deletePodAsFailed(ctx, ephemeralRunner, pod, log); err != nil { if err := r.deletePodAsFailed(ctx, ephemeralRunner, pod, log); err != nil {
log.Error(err, "Failed to delete runner pod on failure") log.Error(err, "Failed to delete runner pod on failure")
return ctrl.Result{}, err return ctrl.Result{}, err
@ -328,9 +345,9 @@ func (r *EphemeralRunnerReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{}, nil return ctrl.Result{}, nil
default: // succeeded default: // succeeded
log.Info("Ephemeral runner has finished successfully") log.Info("Ephemeral runner has finished successfully, deleting ephemeral runner", "exitCode", cs.State.Terminated.ExitCode)
if err := r.markAsFinished(ctx, ephemeralRunner, log); err != nil { if err := r.Delete(ctx, ephemeralRunner); err != nil {
log.Error(err, "Failed to mark ephemeral runner as finished") log.Error(err, "Failed to delete ephemeral runner after successful completion")
return ctrl.Result{}, err return ctrl.Result{}, err
} }
return ctrl.Result{}, nil return ctrl.Result{}, nil
@ -500,18 +517,6 @@ func (r *EphemeralRunnerReconciler) markAsFailed(ctx context.Context, ephemeralR
return nil return nil
} }
// markAsFinished patches the status sub-resource of ephemeralRunner so that
// its Phase becomes corev1.PodSucceeded, logging before and after the patch.
// A patch failure is returned wrapped with additional context.
func (r *EphemeralRunnerReconciler) markAsFinished(ctx context.Context, ephemeralRunner *v1alpha1.EphemeralRunner, log logr.Logger) error {
	log.Info("Updating ephemeral runner status to Finished")

	err := patchSubResource(ctx, r.Status(), ephemeralRunner, func(obj *v1alpha1.EphemeralRunner) {
		obj.Status.Phase = corev1.PodSucceeded
	})
	if err != nil {
		return fmt.Errorf("failed to update ephemeral runner with status finished: %w", err)
	}

	log.Info("EphemeralRunner status is marked as Finished")
	return nil
}
// deletePodAsFailed is responsible for deleting the pod and updating the .Status.Failures for tracking failure count. // deletePodAsFailed is responsible for deleting the pod and updating the .Status.Failures for tracking failure count.
// It should not be responsible for setting the status to Failed. // It should not be responsible for setting the status to Failed.
func (r *EphemeralRunnerReconciler) deletePodAsFailed(ctx context.Context, ephemeralRunner *v1alpha1.EphemeralRunner, pod *corev1.Pod, log logr.Logger) error { func (r *EphemeralRunnerReconciler) deletePodAsFailed(ctx context.Context, ephemeralRunner *v1alpha1.EphemeralRunner, pod *corev1.Pod, log logr.Logger) error {
@ -680,21 +685,21 @@ func (r *EphemeralRunnerReconciler) createPod(ctx context.Context, runner *v1alp
return ctrl.Result{}, nil return ctrl.Result{}, nil
} }
func (r *EphemeralRunnerReconciler) createSecret(ctx context.Context, runner *v1alpha1.EphemeralRunner, jitConfig *actions.RunnerScaleSetJitRunnerConfig, log logr.Logger) error { func (r *EphemeralRunnerReconciler) createSecret(ctx context.Context, runner *v1alpha1.EphemeralRunner, jitConfig *actions.RunnerScaleSetJitRunnerConfig, log logr.Logger) (*corev1.Secret, error) {
log.Info("Creating new secret for ephemeral runner") log.Info("Creating new secret for ephemeral runner")
jitSecret := r.newEphemeralRunnerJitSecret(runner, jitConfig) jitSecret := r.newEphemeralRunnerJitSecret(runner, jitConfig)
if err := ctrl.SetControllerReference(runner, jitSecret, r.Scheme); err != nil { if err := ctrl.SetControllerReference(runner, jitSecret, r.Scheme); err != nil {
return fmt.Errorf("failed to set controller reference: %w", err) return nil, fmt.Errorf("failed to set controller reference: %w", err)
} }
log.Info("Created new secret spec for ephemeral runner") log.Info("Created new secret spec for ephemeral runner")
if err := r.Create(ctx, jitSecret); err != nil { if err := r.Create(ctx, jitSecret); err != nil {
return fmt.Errorf("failed to create jit secret: %w", err) return nil, fmt.Errorf("failed to create jit secret: %w", err)
} }
log.Info("Created ephemeral runner secret", "secretName", jitSecret.Name) log.Info("Created ephemeral runner secret", "secretName", jitSecret.Name)
return nil return jitSecret, nil
} }
// updateRunStatusFromPod is responsible for updating non-exiting statuses. // updateRunStatusFromPod is responsible for updating non-exiting statuses.

View File

@ -176,7 +176,7 @@ var _ = Describe("EphemeralRunner", func() {
).Should(BeEquivalentTo(ephemeralRunner.Name)) ).Should(BeEquivalentTo(ephemeralRunner.Name))
}) })
It("It should re-create pod on failure", func() { It("It should re-create pod on failure and no job assigned", func() {
pod := new(corev1.Pod) pod := new(corev1.Pod)
Eventually(func() (bool, error) { Eventually(func() (bool, error) {
if err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunner.Name, Namespace: ephemeralRunner.Namespace}, pod); err != nil { if err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunner.Name, Namespace: ephemeralRunner.Namespace}, pod); err != nil {
@ -200,6 +200,67 @@ var _ = Describe("EphemeralRunner", func() {
).Should(BeEquivalentTo(true)) ).Should(BeEquivalentTo(true))
}) })
// Verifies that an EphemeralRunner whose status carries a JobID is deleted
// (rather than kept around) once its runner container is reported as
// terminated with a non-zero exit code.
It("It should delete ephemeral runner on failure and job assigned", func() {
er := new(v1alpha1.EphemeralRunner)
// Wait until the ephemeral runner can be fetched from the API server.
Eventually(
func() error {
err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunner.Name, Namespace: ephemeralRunner.Namespace}, er)
return err
},
ephemeralRunnerTimeout,
ephemeralRunnerInterval,
).Should(Succeed(), "failed to get ephemeral runner")
// Set a job ID on the status sub-resource to simulate a job being assigned.
er.Status.JobID = "1"
err := k8sClient.Status().Update(ctx, er)
Expect(err).To(BeNil(), "failed to update ephemeral runner status")
// Re-read into a fresh object until the stored status reflects the job ID.
er = new(v1alpha1.EphemeralRunner)
Eventually(
func() (string, error) {
err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunner.Name, Namespace: ephemeralRunner.Namespace}, er)
if err != nil {
return "", err
}
return er.Status.JobID, nil
},
ephemeralRunnerTimeout,
ephemeralRunnerInterval,
).Should(BeEquivalentTo("1"))
// Wait for the pod that shares the ephemeral runner's name and namespace to exist.
pod := new(corev1.Pod)
Eventually(func() (bool, error) {
if err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunner.Name, Namespace: ephemeralRunner.Namespace}, pod); err != nil {
return false, err
}
return true, nil
}).Should(BeEquivalentTo(true))
// Simulate a failure: report the runner container as terminated with exit
// code 1 through the pod's status sub-resource (the pod itself is not deleted here).
pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, corev1.ContainerStatus{
Name: v1alpha1.EphemeralRunnerContainerName,
State: corev1.ContainerState{
Terminated: &corev1.ContainerStateTerminated{
ExitCode: 1,
},
},
})
err = k8sClient.Status().Update(ctx, pod)
Expect(err).To(BeNil(), "Failed to update pod status")
// The ephemeral runner should eventually be gone: Get must return NotFound.
er = new(v1alpha1.EphemeralRunner)
Eventually(
func() bool {
err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunner.Name, Namespace: ephemeralRunner.Namespace}, er)
return kerrors.IsNotFound(err)
},
ephemeralRunnerTimeout,
ephemeralRunnerInterval,
).Should(BeTrue(), "Ephemeral runner should eventually be deleted")
})
It("It should failed if a pod template is invalid", func() { It("It should failed if a pod template is invalid", func() {
invalideEphemeralRunner := newExampleRunner("invalid-ephemeral-runner", autoscalingNS.Name, configSecret.Name) invalideEphemeralRunner := newExampleRunner("invalid-ephemeral-runner", autoscalingNS.Name, configSecret.Name)
invalideEphemeralRunner.Spec.Spec.PriorityClassName = "notexist" invalideEphemeralRunner.Spec.Spec.PriorityClassName = "notexist"
@ -208,13 +269,22 @@ var _ = Describe("EphemeralRunner", func() {
Expect(err).To(BeNil()) Expect(err).To(BeNil())
updated := new(v1alpha1.EphemeralRunner) updated := new(v1alpha1.EphemeralRunner)
Eventually(func() (corev1.PodPhase, error) { Eventually(
err := k8sClient.Get(ctx, client.ObjectKey{Name: invalideEphemeralRunner.Name, Namespace: invalideEphemeralRunner.Namespace}, updated) func() (corev1.PodPhase, error) {
err := k8sClient.Get(
ctx,
client.ObjectKey{Name: invalideEphemeralRunner.Name, Namespace: invalideEphemeralRunner.Namespace},
updated,
)
if err != nil { if err != nil {
return "", nil return "", nil
} }
return updated.Status.Phase, nil return updated.Status.Phase, nil
}, ephemeralRunnerTimeout, ephemeralRunnerInterval).Should(BeEquivalentTo(corev1.PodFailed)) },
ephemeralRunnerTimeout,
ephemeralRunnerInterval,
).Should(BeEquivalentTo(corev1.PodFailed))
Expect(updated.Status.Reason).Should(Equal("InvalidPod")) Expect(updated.Status.Reason).Should(Equal("InvalidPod"))
Expect(updated.Status.Message).Should(Equal("Failed to create the pod: pods \"invalid-ephemeral-runner\" is forbidden: no PriorityClass with name notexist was found")) Expect(updated.Status.Message).Should(Equal("Failed to create the pod: pods \"invalid-ephemeral-runner\" is forbidden: no PriorityClass with name notexist was found"))
}) })
@ -775,7 +845,7 @@ var _ = Describe("EphemeralRunner", func() {
startManagers(GinkgoT(), mgr) startManagers(GinkgoT(), mgr)
}) })
It("It should set the Phase to Succeeded", func() { It("It should delete EphemeralRunner when pod exits successfully", func() {
ephemeralRunner := newExampleRunner("test-runner", autoscalingNS.Name, configSecret.Name) ephemeralRunner := newExampleRunner("test-runner", autoscalingNS.Name, configSecret.Name)
err := k8sClient.Create(ctx, ephemeralRunner) err := k8sClient.Create(ctx, ephemeralRunner)
@ -801,13 +871,18 @@ var _ = Describe("EphemeralRunner", func() {
Expect(err).To(BeNil(), "failed to update pod status") Expect(err).To(BeNil(), "failed to update pod status")
updated := new(v1alpha1.EphemeralRunner) updated := new(v1alpha1.EphemeralRunner)
Eventually(func() (corev1.PodPhase, error) { Eventually(
err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunner.Name, Namespace: ephemeralRunner.Namespace}, updated) func() bool {
if err != nil { err := k8sClient.Get(
return "", nil ctx,
} client.ObjectKey{Name: ephemeralRunner.Name, Namespace: ephemeralRunner.Namespace},
return updated.Status.Phase, nil updated,
}, ephemeralRunnerTimeout, ephemeralRunnerInterval).Should(BeEquivalentTo(corev1.PodSucceeded)) )
return kerrors.IsNotFound(err)
},
ephemeralRunnerTimeout,
ephemeralRunnerInterval,
).Should(BeTrue())
}) })
}) })

View File

@ -453,8 +453,13 @@ func (r *EphemeralRunnerSetReconciler) deleteIdleEphemeralRunners(ctx context.Co
continue continue
} }
if !isDone && ephemeralRunner.Status.JobRequestId > 0 { if !isDone && ephemeralRunner.HasJob() {
log.Info("Skipping ephemeral runner since it is running a job", "name", ephemeralRunner.Name, "jobRequestId", ephemeralRunner.Status.JobRequestId) log.Info(
"Skipping ephemeral runner since it is running a job",
"name", ephemeralRunner.Name,
"workflowRunId", ephemeralRunner.Status.WorkflowRunId,
"jobId", ephemeralRunner.Status.JobID,
)
continue continue
} }

View File

@ -37,7 +37,7 @@ type JobAssigned struct {
} }
type JobStarted struct { type JobStarted struct {
RunnerId int `json:"runnerId"` RunnerID int `json:"runnerId"`
RunnerName string `json:"runnerName"` RunnerName string `json:"runnerName"`
JobMessageBase JobMessageBase
} }
@ -55,12 +55,13 @@ type JobMessageType struct {
type JobMessageBase struct { type JobMessageBase struct {
JobMessageType JobMessageType
RunnerRequestId int64 `json:"runnerRequestId"` RunnerRequestID int64 `json:"runnerRequestId"`
RepositoryName string `json:"repositoryName"` RepositoryName string `json:"repositoryName"`
OwnerName string `json:"ownerName"` OwnerName string `json:"ownerName"`
JobID string `json:"jobId"`
JobWorkflowRef string `json:"jobWorkflowRef"` JobWorkflowRef string `json:"jobWorkflowRef"`
JobDisplayName string `json:"jobDisplayName"` JobDisplayName string `json:"jobDisplayName"`
WorkflowRunId int64 `json:"workflowRunId"` WorkflowRunID int64 `json:"workflowRunId"`
EventName string `json:"eventName"` EventName string `json:"eventName"`
RequestLabels []string `json:"requestLabels"` RequestLabels []string `json:"requestLabels"`
QueueTime time.Time `json:"queueTime"` QueueTime time.Time `json:"queueTime"`