Add JobID and actually rely on JobID

This commit is contained in:
Nikola Jokic 2025-09-12 18:16:50 +02:00
parent ca5f25b388
commit 9f0c174c6d
No known key found for this signature in database
GPG Key ID: E4104494F9B8DDF6
10 changed files with 70 additions and 42 deletions

View File

@ -34,6 +34,7 @@ const EphemeralRunnerContainerName = "runner"
// +kubebuilder:printcolumn:JSONPath=".status.jobWorkflowRef",name=JobWorkflowRef,type=string // +kubebuilder:printcolumn:JSONPath=".status.jobWorkflowRef",name=JobWorkflowRef,type=string
// +kubebuilder:printcolumn:JSONPath=".status.workflowRunId",name=WorkflowRunId,type=number // +kubebuilder:printcolumn:JSONPath=".status.workflowRunId",name=WorkflowRunId,type=number
// +kubebuilder:printcolumn:JSONPath=".status.jobDisplayName",name=JobDisplayName,type=string // +kubebuilder:printcolumn:JSONPath=".status.jobDisplayName",name=JobDisplayName,type=string
// +kubebuilder:printcolumn:JSONPath=".status.jobId",name=JobId,type=string
// +kubebuilder:printcolumn:JSONPath=".status.message",name=Message,type=string // +kubebuilder:printcolumn:JSONPath=".status.message",name=Message,type=string
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" // +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
@ -51,7 +52,7 @@ func (er *EphemeralRunner) IsDone() bool {
} }
func (er *EphemeralRunner) HasJob() bool { func (er *EphemeralRunner) HasJob() bool {
return er.Status.WorkflowRunId != 0 return len(er.Status.JobID) > 0
} }
func (er *EphemeralRunner) HasContainerHookConfigured() bool { func (er *EphemeralRunner) HasContainerHookConfigured() bool {
@ -156,6 +157,9 @@ type EphemeralRunnerStatus struct {
// +optional // +optional
JobRequestId int64 `json:"jobRequestId,omitempty"` JobRequestId int64 `json:"jobRequestId,omitempty"`
// +optional
JobID string `json:"jobId,omitempty"`
// +optional // +optional
JobRepositoryName string `json:"jobRepositoryName,omitempty"` JobRepositoryName string `json:"jobRepositoryName,omitempty"`

View File

@ -36,6 +36,9 @@ spec:
- jsonPath: .status.jobDisplayName - jsonPath: .status.jobDisplayName
name: JobDisplayName name: JobDisplayName
type: string type: string
- jsonPath: .status.jobId
name: JobId
type: string
- jsonPath: .status.message - jsonPath: .status.message
name: Message name: Message
type: string type: string
@ -7846,6 +7849,8 @@ spec:
type: object type: object
jobDisplayName: jobDisplayName:
type: string type: string
jobId:
type: string
jobRepositoryName: jobRepositoryName:
type: string type: string
jobRequestId: jobRequestId:

View File

@ -361,7 +361,7 @@ func (l *Listener) parseMessage(ctx context.Context, msg *actions.RunnerScaleSet
return nil, fmt.Errorf("failed to decode job available: %w", err) return nil, fmt.Errorf("failed to decode job available: %w", err)
} }
l.logger.Info("Job available message received", "jobId", jobAvailable.RunnerRequestId) l.logger.Info("Job available message received", "jobId", jobAvailable.JobID)
parsedMsg.jobsAvailable = append(parsedMsg.jobsAvailable, &jobAvailable) parsedMsg.jobsAvailable = append(parsedMsg.jobsAvailable, &jobAvailable)
case messageTypeJobAssigned: case messageTypeJobAssigned:
@ -370,14 +370,14 @@ func (l *Listener) parseMessage(ctx context.Context, msg *actions.RunnerScaleSet
return nil, fmt.Errorf("failed to decode job assigned: %w", err) return nil, fmt.Errorf("failed to decode job assigned: %w", err)
} }
l.logger.Info("Job assigned message received", "jobId", jobAssigned.RunnerRequestId) l.logger.Info("Job assigned message received", "jobId", jobAssigned.JobID)
case messageTypeJobStarted: case messageTypeJobStarted:
var jobStarted actions.JobStarted var jobStarted actions.JobStarted
if err := json.Unmarshal(msg, &jobStarted); err != nil { if err := json.Unmarshal(msg, &jobStarted); err != nil {
return nil, fmt.Errorf("could not decode job started message. %w", err) return nil, fmt.Errorf("could not decode job started message. %w", err)
} }
l.logger.Info("Job started message received.", "RequestId", jobStarted.RunnerRequestId, "RunnerId", jobStarted.RunnerId) l.logger.Info("Job started message received.", "JobID", jobStarted.JobID, "RunnerId", jobStarted.RunnerID)
parsedMsg.jobsStarted = append(parsedMsg.jobsStarted, &jobStarted) parsedMsg.jobsStarted = append(parsedMsg.jobsStarted, &jobStarted)
case messageTypeJobCompleted: case messageTypeJobCompleted:
@ -386,7 +386,13 @@ func (l *Listener) parseMessage(ctx context.Context, msg *actions.RunnerScaleSet
return nil, fmt.Errorf("failed to decode job completed: %w", err) return nil, fmt.Errorf("failed to decode job completed: %w", err)
} }
l.logger.Info("Job completed message received.", "RequestId", jobCompleted.RunnerRequestId, "Result", jobCompleted.Result, "RunnerId", jobCompleted.RunnerId, "RunnerName", jobCompleted.RunnerName) l.logger.Info(
"Job completed message received.",
"JobID", jobCompleted.JobID,
"Result", jobCompleted.Result,
"RunnerId", jobCompleted.RunnerId,
"RunnerName", jobCompleted.RunnerName,
)
parsedMsg.jobsCompleted = append(parsedMsg.jobsCompleted, &jobCompleted) parsedMsg.jobsCompleted = append(parsedMsg.jobsCompleted, &jobCompleted)
default: default:
@ -400,7 +406,7 @@ func (l *Listener) parseMessage(ctx context.Context, msg *actions.RunnerScaleSet
func (l *Listener) acquireAvailableJobs(ctx context.Context, jobsAvailable []*actions.JobAvailable) ([]int64, error) { func (l *Listener) acquireAvailableJobs(ctx context.Context, jobsAvailable []*actions.JobAvailable) ([]int64, error) {
ids := make([]int64, 0, len(jobsAvailable)) ids := make([]int64, 0, len(jobsAvailable))
for _, job := range jobsAvailable { for _, job := range jobsAvailable {
ids = append(ids, job.RunnerRequestId) ids = append(ids, job.RunnerRequestID)
} }
l.logger.Info("Acquiring jobs", "count", len(ids), "requestIds", fmt.Sprint(ids)) l.logger.Info("Acquiring jobs", "count", len(ids), "requestIds", fmt.Sprint(ids))

View File

@ -627,17 +627,17 @@ func TestListener_acquireAvailableJobs(t *testing.T) {
availableJobs := []*actions.JobAvailable{ availableJobs := []*actions.JobAvailable{
{ {
JobMessageBase: actions.JobMessageBase{ JobMessageBase: actions.JobMessageBase{
RunnerRequestId: 1, RunnerRequestID: 1,
}, },
}, },
{ {
JobMessageBase: actions.JobMessageBase{ JobMessageBase: actions.JobMessageBase{
RunnerRequestId: 2, RunnerRequestID: 2,
}, },
}, },
{ {
JobMessageBase: actions.JobMessageBase{ JobMessageBase: actions.JobMessageBase{
RunnerRequestId: 3, RunnerRequestID: 3,
}, },
}, },
} }
@ -678,17 +678,17 @@ func TestListener_acquireAvailableJobs(t *testing.T) {
availableJobs := []*actions.JobAvailable{ availableJobs := []*actions.JobAvailable{
{ {
JobMessageBase: actions.JobMessageBase{ JobMessageBase: actions.JobMessageBase{
RunnerRequestId: 1, RunnerRequestID: 1,
}, },
}, },
{ {
JobMessageBase: actions.JobMessageBase{ JobMessageBase: actions.JobMessageBase{
RunnerRequestId: 2, RunnerRequestID: 2,
}, },
}, },
{ {
JobMessageBase: actions.JobMessageBase{ JobMessageBase: actions.JobMessageBase{
RunnerRequestId: 3, RunnerRequestID: 3,
}, },
}, },
} }
@ -724,17 +724,17 @@ func TestListener_acquireAvailableJobs(t *testing.T) {
availableJobs := []*actions.JobAvailable{ availableJobs := []*actions.JobAvailable{
{ {
JobMessageBase: actions.JobMessageBase{ JobMessageBase: actions.JobMessageBase{
RunnerRequestId: 1, RunnerRequestID: 1,
}, },
}, },
{ {
JobMessageBase: actions.JobMessageBase{ JobMessageBase: actions.JobMessageBase{
RunnerRequestId: 2, RunnerRequestID: 2,
}, },
}, },
{ {
JobMessageBase: actions.JobMessageBase{ JobMessageBase: actions.JobMessageBase{
RunnerRequestId: 3, RunnerRequestID: 3,
}, },
}, },
} }
@ -809,17 +809,17 @@ func TestListener_acquireAvailableJobs(t *testing.T) {
availableJobs := []*actions.JobAvailable{ availableJobs := []*actions.JobAvailable{
{ {
JobMessageBase: actions.JobMessageBase{ JobMessageBase: actions.JobMessageBase{
RunnerRequestId: 1, RunnerRequestID: 1,
}, },
}, },
{ {
JobMessageBase: actions.JobMessageBase{ JobMessageBase: actions.JobMessageBase{
RunnerRequestId: 2, RunnerRequestID: 2,
}, },
}, },
{ {
JobMessageBase: actions.JobMessageBase{ JobMessageBase: actions.JobMessageBase{
RunnerRequestId: 3, RunnerRequestID: 3,
}, },
}, },
} }
@ -881,7 +881,7 @@ func TestListener_parseMessage(t *testing.T) {
JobMessageType: actions.JobMessageType{ JobMessageType: actions.JobMessageType{
MessageType: messageTypeJobAvailable, MessageType: messageTypeJobAvailable,
}, },
RunnerRequestId: 1, RunnerRequestID: 1,
}, },
}, },
{ {
@ -890,7 +890,7 @@ func TestListener_parseMessage(t *testing.T) {
JobMessageType: actions.JobMessageType{ JobMessageType: actions.JobMessageType{
MessageType: messageTypeJobAvailable, MessageType: messageTypeJobAvailable,
}, },
RunnerRequestId: 2, RunnerRequestID: 2,
}, },
}, },
} }
@ -904,7 +904,7 @@ func TestListener_parseMessage(t *testing.T) {
JobMessageType: actions.JobMessageType{ JobMessageType: actions.JobMessageType{
MessageType: messageTypeJobAssigned, MessageType: messageTypeJobAssigned,
}, },
RunnerRequestId: 3, RunnerRequestID: 3,
}, },
}, },
{ {
@ -912,7 +912,7 @@ func TestListener_parseMessage(t *testing.T) {
JobMessageType: actions.JobMessageType{ JobMessageType: actions.JobMessageType{
MessageType: messageTypeJobAssigned, MessageType: messageTypeJobAssigned,
}, },
RunnerRequestId: 4, RunnerRequestID: 4,
}, },
}, },
} }
@ -926,9 +926,9 @@ func TestListener_parseMessage(t *testing.T) {
JobMessageType: actions.JobMessageType{ JobMessageType: actions.JobMessageType{
MessageType: messageTypeJobStarted, MessageType: messageTypeJobStarted,
}, },
RunnerRequestId: 5, RunnerRequestID: 5,
}, },
RunnerId: 2, RunnerID: 2,
RunnerName: "runner2", RunnerName: "runner2",
}, },
} }
@ -942,7 +942,7 @@ func TestListener_parseMessage(t *testing.T) {
JobMessageType: actions.JobMessageType{ JobMessageType: actions.JobMessageType{
MessageType: messageTypeJobCompleted, MessageType: messageTypeJobCompleted,
}, },
RunnerRequestId: 6, RunnerRequestID: 6,
}, },
Result: "success", Result: "success",
RunnerId: 1, RunnerId: 1,

View File

@ -123,9 +123,9 @@ func TestHandleMessageMetrics(t *testing.T) {
JobMessageType: actions.JobMessageType{ JobMessageType: actions.JobMessageType{
MessageType: messageTypeJobStarted, MessageType: messageTypeJobStarted,
}, },
RunnerRequestId: 8, RunnerRequestID: 8,
}, },
RunnerId: 3, RunnerID: 3,
RunnerName: "runner3", RunnerName: "runner3",
}, },
} }
@ -139,7 +139,7 @@ func TestHandleMessageMetrics(t *testing.T) {
JobMessageType: actions.JobMessageType{ JobMessageType: actions.JobMessageType{
MessageType: messageTypeJobCompleted, MessageType: messageTypeJobCompleted,
}, },
RunnerRequestId: 6, RunnerRequestID: 6,
}, },
Result: "success", Result: "success",
RunnerId: 1, RunnerId: 1,
@ -150,7 +150,7 @@ func TestHandleMessageMetrics(t *testing.T) {
JobMessageType: actions.JobMessageType{ JobMessageType: actions.JobMessageType{
MessageType: messageTypeJobCompleted, MessageType: messageTypeJobCompleted,
}, },
RunnerRequestId: 7, RunnerRequestID: 7,
}, },
Result: "success", Result: "success",
RunnerId: 2, RunnerId: 2,

View File

@ -100,10 +100,11 @@ func (w *Worker) HandleJobStarted(ctx context.Context, jobInfo *actions.JobStart
"runnerName", jobInfo.RunnerName, "runnerName", jobInfo.RunnerName,
"ownerName", jobInfo.OwnerName, "ownerName", jobInfo.OwnerName,
"repoName", jobInfo.RepositoryName, "repoName", jobInfo.RepositoryName,
"jobId", jobInfo.JobID,
"workflowRef", jobInfo.JobWorkflowRef, "workflowRef", jobInfo.JobWorkflowRef,
"workflowRunId", jobInfo.WorkflowRunId, "workflowRunId", jobInfo.WorkflowRunID,
"jobDisplayName", jobInfo.JobDisplayName, "jobDisplayName", jobInfo.JobDisplayName,
"requestId", jobInfo.RunnerRequestId) "requestId", jobInfo.RunnerRequestID)
original, err := json.Marshal(&v1alpha1.EphemeralRunner{}) original, err := json.Marshal(&v1alpha1.EphemeralRunner{})
if err != nil { if err != nil {
@ -113,9 +114,10 @@ func (w *Worker) HandleJobStarted(ctx context.Context, jobInfo *actions.JobStart
patch, err := json.Marshal( patch, err := json.Marshal(
&v1alpha1.EphemeralRunner{ &v1alpha1.EphemeralRunner{
Status: v1alpha1.EphemeralRunnerStatus{ Status: v1alpha1.EphemeralRunnerStatus{
JobRequestId: jobInfo.RunnerRequestId, JobRequestId: jobInfo.RunnerRequestID,
JobRepositoryName: fmt.Sprintf("%s/%s", jobInfo.OwnerName, jobInfo.RepositoryName), JobRepositoryName: fmt.Sprintf("%s/%s", jobInfo.OwnerName, jobInfo.RepositoryName),
WorkflowRunId: jobInfo.WorkflowRunId, JobID: jobInfo.JobID,
WorkflowRunId: jobInfo.WorkflowRunID,
JobWorkflowRef: jobInfo.JobWorkflowRef, JobWorkflowRef: jobInfo.JobWorkflowRef,
JobDisplayName: jobInfo.JobDisplayName, JobDisplayName: jobInfo.JobDisplayName,
}, },

View File

@ -36,6 +36,9 @@ spec:
- jsonPath: .status.jobDisplayName - jsonPath: .status.jobDisplayName
name: JobDisplayName name: JobDisplayName
type: string type: string
- jsonPath: .status.jobId
name: JobId
type: string
- jsonPath: .status.message - jsonPath: .status.message
name: Message name: Message
type: string type: string
@ -7846,6 +7849,8 @@ spec:
type: object type: object
jobDisplayName: jobDisplayName:
type: string type: string
jobId:
type: string
jobRepositoryName: jobRepositoryName:
type: string type: string
jobRequestId: jobRequestId:

View File

@ -213,22 +213,22 @@ var _ = Describe("EphemeralRunner", func() {
).Should(Succeed(), "failed to get ephemeral runner") ).Should(Succeed(), "failed to get ephemeral runner")
// update job id to simulate job assigned // update job id to simulate job assigned
er.Status.WorkflowRunId = 1 er.Status.JobID = "1"
err := k8sClient.Status().Update(ctx, er) err := k8sClient.Status().Update(ctx, er)
Expect(err).To(BeNil(), "failed to update ephemeral runner status") Expect(err).To(BeNil(), "failed to update ephemeral runner status")
er = new(v1alpha1.EphemeralRunner) er = new(v1alpha1.EphemeralRunner)
Eventually( Eventually(
func() (int64, error) { func() (string, error) {
err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunner.Name, Namespace: ephemeralRunner.Namespace}, er) err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunner.Name, Namespace: ephemeralRunner.Namespace}, er)
if err != nil { if err != nil {
return 0, err return "", err
} }
return er.Status.WorkflowRunId, nil return er.Status.JobID, nil
}, },
ephemeralRunnerTimeout, ephemeralRunnerTimeout,
ephemeralRunnerInterval, ephemeralRunnerInterval,
).Should(BeEquivalentTo(int64(1))) ).Should(BeEquivalentTo("1"))
pod := new(corev1.Pod) pod := new(corev1.Pod)
Eventually(func() (bool, error) { Eventually(func() (bool, error) {

View File

@ -454,7 +454,12 @@ func (r *EphemeralRunnerSetReconciler) deleteIdleEphemeralRunners(ctx context.Co
} }
if !isDone && ephemeralRunner.HasJob() { if !isDone && ephemeralRunner.HasJob() {
log.Info("Skipping ephemeral runner since it is running a job", "name", ephemeralRunner.Name, "workflowRunId", ephemeralRunner.Status.WorkflowRunId) log.Info(
"Skipping ephemeral runner since it is running a job",
"name", ephemeralRunner.Name,
"workflowRunId", ephemeralRunner.Status.WorkflowRunId,
"jobId", ephemeralRunner.Status.JobID,
)
continue continue
} }

View File

@ -37,7 +37,7 @@ type JobAssigned struct {
} }
type JobStarted struct { type JobStarted struct {
RunnerId int `json:"runnerId"` RunnerID int `json:"runnerId"`
RunnerName string `json:"runnerName"` RunnerName string `json:"runnerName"`
JobMessageBase JobMessageBase
} }
@ -55,12 +55,13 @@ type JobMessageType struct {
type JobMessageBase struct { type JobMessageBase struct {
JobMessageType JobMessageType
RunnerRequestId int64 `json:"runnerRequestId"` RunnerRequestID int64 `json:"runnerRequestId"`
RepositoryName string `json:"repositoryName"` RepositoryName string `json:"repositoryName"`
OwnerName string `json:"ownerName"` OwnerName string `json:"ownerName"`
JobID string `json:"jobId"`
JobWorkflowRef string `json:"jobWorkflowRef"` JobWorkflowRef string `json:"jobWorkflowRef"`
JobDisplayName string `json:"jobDisplayName"` JobDisplayName string `json:"jobDisplayName"`
WorkflowRunId int64 `json:"workflowRunId"` WorkflowRunID int64 `json:"workflowRunId"`
EventName string `json:"eventName"` EventName string `json:"eventName"`
RequestLabels []string `json:"requestLabels"` RequestLabels []string `json:"requestLabels"`
QueueTime time.Time `json:"queueTime"` QueueTime time.Time `json:"queueTime"`