diff --git a/cmd/ghalistener/listener/listener.go b/cmd/ghalistener/listener/listener.go
index 56b1e237..d97d7149 100644
--- a/cmd/ghalistener/listener/listener.go
+++ b/cmd/ghalistener/listener/listener.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"net/http"
 	"os"
+	"sync"
 	"time"
 
 	"github.com/actions/actions-runner-controller/cmd/ghalistener/metrics"
@@ -15,6 +16,7 @@ import (
 	"github.com/google/uuid"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/trace"
+	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
 )
 
 const (
@@ -186,6 +188,7 @@ func (l *Listener) Listen(ctx context.Context, handler Handler) error {
 
 		// Remove cancellation from the context to avoid cancelling the message handling.
 		if err := l.handleMessage(context.WithoutCancel(ctx), handler, msg); err != nil {
+			span.End()
 			return fmt.Errorf("failed to handle message: %w", err)
 		}
 
@@ -193,6 +196,108 @@ func (l *Listener) Listen(ctx context.Context, handler Handler) error {
 	}
 }
 
+// tracedJob holds the open Datadog spans for a single workflow job as it
+// moves through the scale set: queued, assigned to a runner, and running.
+type tracedJob struct {
+	jobSpan             tracer.Span
+	runnerSetAssignSpan tracer.Span
+	runnerAssignSpan    tracer.Span
+	runnerRunJobSpan    tracer.Span
+}
+
+// mu guards tracedJobs, which maps runner request IDs to their in-flight spans.
+var mu sync.Mutex
+var tracedJobs map[string]*tracedJob
+
+func (l *Listener) progressTraces(parsedMsg *parsedMessage) {
+	mu.Lock()
+	defer mu.Unlock()
+
+	if tracedJobs == nil {
+		tracedJobs = make(map[string]*tracedJob)
+	}
+
+	for _, j := range parsedMsg.jobsAvailable {
+		jobSpan := tracer.StartSpan(
+			"GitHub Actions Workflow Run",
+			tracer.StartTime(j.QueueTime),
+			tracer.Tag("runner_request_id", fmt.Sprintf("%d", j.RunnerRequestId)),
+			tracer.Tag("repository_name", j.RepositoryName),
+			tracer.Tag("owner_name", j.OwnerName),
+			tracer.Tag("workflow_ref", j.JobWorkflowRef),
+			tracer.Tag("workflow_run_id", fmt.Sprintf("%d", j.WorkflowRunId)),
+		)
+
+		runnerSetAssignSpan := tracer.StartSpan(
+			"runnerSetAssign",
+			tracer.ChildOf(jobSpan.Context()),
+			tracer.StartTime(j.QueueTime),
+			tracer.Tag("runner_request_id", fmt.Sprintf("%d", j.RunnerRequestId)),
+			tracer.Tag("repository_name", j.RepositoryName),
+			tracer.Tag("owner_name", j.OwnerName),
+		)
+
+		reqID := fmt.Sprintf("%d", j.RunnerRequestId)
+		tracedJobs[reqID] = &tracedJob{
+			jobSpan:             jobSpan,
+			runnerSetAssignSpan: runnerSetAssignSpan,
+		}
+
+		l.logger.Info("Listener.progressTraces: Job available", "queueTime", j.QueueTime, "scaleSetAssignTime", j.ScaleSetAssignTime, "requestLabels", j.RequestLabels, "now", time.Now())
+	}
+
+	for _, j := range parsedMsg.jobsStarted {
+		reqID := fmt.Sprintf("%d", j.RunnerRequestId)
+		t := tracedJobs[reqID]
+		if t == nil {
+			// The job was never seen as available; open a bare job span so the
+			// rest of its lifecycle can still be recorded.
+			s := tracer.StartSpan(j.JobWorkflowRef, tracer.StartTime(j.QueueTime))
+			tracedJobs[reqID] = &tracedJob{jobSpan: s}
+
+			l.logger.Error(errors.New("job and runnerSetAssign spans have not started yet"), "Listener.progressTraces", "runnerRequestId", j.RunnerRequestId)
+		} else {
+			if t.runnerSetAssignSpan == nil {
+				l.logger.Error(errors.New("runnerSetAssignSpan has not started yet"), "Listener.progressTraces", "runnerRequestId", j.RunnerRequestId)
+			} else {
+				t.runnerSetAssignSpan.Finish(tracer.FinishTime(j.RunnerAssignTime))
+			}
+
+			t.runnerAssignSpan = tracer.StartSpan(
+				"runnerAssign",
+				tracer.ChildOf(t.jobSpan.Context()),
+				tracer.StartTime(j.RunnerAssignTime),
+			)
+			now := time.Now()
+			t.runnerAssignSpan.Finish(tracer.FinishTime(now))
+
+			t.runnerRunJobSpan = tracer.StartSpan(
+				"runnerRunJob",
+				tracer.ChildOf(t.jobSpan.Context()),
+				tracer.StartTime(now),
+			)
+
+			l.logger.Info("Listener.progressTraces: Job started", "queueTime", j.QueueTime, "runnerAssignTime", j.RunnerAssignTime, "requestLabels", j.RequestLabels, "now", now)
+		}
+	}
+
+	for _, j := range parsedMsg.jobsCompleted {
+		reqID := fmt.Sprintf("%d", j.RunnerRequestId)
+		t := tracedJobs[reqID]
+		if t == nil {
+			s := tracer.StartSpan(j.JobWorkflowRef, tracer.StartTime(j.QueueTime))
+			t = &tracedJob{jobSpan: s}
+			tracedJobs[reqID] = t
+
+			l.logger.Error(errors.New("job, runnerSetAssign and runnerAssign spans have not started yet"), "Listener.progressTraces", "runnerRequestId", j.RunnerRequestId)
+		} else {
+			if t.runnerRunJobSpan == nil {
+				l.logger.Error(errors.New("runnerRunJobSpan has not started yet"), "Listener.progressTraces", "runnerRequestId", j.RunnerRequestId)
+			} else {
+				t.runnerRunJobSpan.Finish(tracer.FinishTime(j.FinishTime))
+			}
+		}
+		t.jobSpan.Finish(tracer.FinishTime(j.FinishTime))
+		delete(tracedJobs, reqID)
+	}
+}
+
 func (l *Listener) handleMessage(ctx context.Context, handler Handler, msg *actions.RunnerScaleSetMessage) error {
 	ctx, span := otel.Tracer("arc").Start(ctx, "Listener.handleMessage")
 	defer span.End()
@@ -203,6 +308,8 @@ func (l *Listener) handleMessage(ctx context.Context, handler Handler, msg *acti
 	}
 
 	l.metrics.PublishStatistics(parsedMsg.statistics)
+	l.progressTraces(parsedMsg)
+
 	if len(parsedMsg.jobsAvailable) > 0 {
 		acquiredJobIDs, err := l.acquireAvailableJobs(ctx, parsedMsg.jobsAvailable)
 		if err != nil {
@@ -425,7 +532,7 @@ func (l *Listener) parseMessage(ctx context.Context, msg *actions.RunnerScaleSet
 }
 
 func (l *Listener) acquireAvailableJobs(ctx context.Context, jobsAvailable []*actions.JobAvailable) ([]int64, error) {
-	ctx, span := otel.Tracer("arc").Start(ctx, "Listener.acquireAvailableJobs")
+	ctx, span := otel.Tracer("arc").Start(ctx, "Listener.acquireAvailableJobs", trace.WithLinks())
 	defer span.End()
 
 	ids := make([]int64, 0, len(jobsAvailable))
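Note that dd-trace-go spans are silent no-ops unless the global tracer has been started, and this patch never calls tracer.Start. A minimal sketch of the wiring the listener's entrypoint would need is below; the service name, environment, and agent address are illustrative assumptions, not values taken from this patch:

package main

import "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

func main() {
	// Start the global Datadog tracer before the listener processes any
	// messages; otherwise the spans built in progressTraces go nowhere.
	tracer.Start(
		tracer.WithService("gha-listener"),     // hypothetical service name
		tracer.WithEnv("prod"),                 // hypothetical environment
		tracer.WithAgentAddr("localhost:8126"), // hypothetical agent address
	)
	// Stop flushes buffered spans on shutdown.
	defer tracer.Stop()

	// ... construct and run the Listener as the existing entrypoint does ...
}

Since tracedJobs lives only in listener memory, spans for jobs in flight across a pod restart are lost; the t == nil fallbacks in progressTraces exist so the remainder of such a job's lifecycle is still recorded.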