Add request labels to progress traces log

This commit is contained in:
Yusuke Kuoka 2024-08-29 04:54:12 +00:00
parent 8237ef2cba
commit ae7ade0191
1 changed files with 108 additions and 1 deletions

View File

@ -7,6 +7,7 @@ import (
"fmt"
"net/http"
"os"
"sync"
"time"
"github.com/actions/actions-runner-controller/cmd/ghalistener/metrics"
@ -15,6 +16,7 @@ import (
"github.com/google/uuid"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)
const (
@ -186,6 +188,7 @@ func (l *Listener) Listen(ctx context.Context, handler Handler) error {
// Remove cancellation from the context to avoid cancelling the message handling.
if err := l.handleMessage(context.WithoutCancel(ctx), handler, msg); err != nil {
span.End()
return fmt.Errorf("failed to handle message: %w", err)
}
@ -193,6 +196,108 @@ func (l *Listener) Listen(ctx context.Context, handler Handler) error {
}
}
type tracedJob struct {
jobSpan tracer.Span
runnerSetAssignSpan tracer.Span
runnerAssignSpan tracer.Span
runnerRunJobSpan tracer.Span
}
var mu sync.Mutex
var tracedJobs map[string]*tracedJob
func (l *Listener) progressTraces(parsedMsg *parsedMessage) {
mu.Lock()
defer mu.Unlock()
if tracedJobs == nil {
tracedJobs = make(map[string]*tracedJob)
}
for _, j := range parsedMsg.jobsAvailable {
jobSpan := tracer.StartSpan(
"GitHub Actions Workflow Run",
tracer.StartTime(j.QueueTime),
tracer.Tag("runner_request_id", fmt.Sprintf("%d", j.RunnerRequestId)),
tracer.Tag("repository_name", j.RepositoryName),
tracer.Tag("owner_name", j.OwnerName),
tracer.Tag("workflow_ref", fmt.Sprintf("%s", j.JobWorkflowRef)),
tracer.Tag("workflow_run_id", fmt.Sprintf("%d", j.WorkflowRunId)),
)
runnerSetAssignSpan := tracer.StartSpan(
"runnerSetAssign",
tracer.ChildOf(jobSpan.Context()),
tracer.StartTime(j.QueueTime),
tracer.Tag("runner_request_id", fmt.Sprintf("%d", j.RunnerRequestId)),
tracer.Tag("repository_name", j.RepositoryName),
tracer.Tag("owner_name", j.OwnerName),
)
reqID := fmt.Sprintf("%d", j.RunnerRequestId)
tracedJobs[reqID] = &tracedJob{
jobSpan: jobSpan,
runnerSetAssignSpan: runnerSetAssignSpan,
}
l.logger.Info("Listener.progressTraces: Job available", "queueTime", j.QueueTime, "runnerAssignTime", j.ScaleSetAssignTime, "requestLabels", j.RequestLabels, "now", time.Now())
}
for _, j := range parsedMsg.jobsStarted {
reqID := fmt.Sprintf("%d", j.RunnerRequestId)
t := tracedJobs[reqID]
if t == nil {
s := tracer.StartSpan(fmt.Sprintf("%s", j.JobWorkflowRef), tracer.StartTime(j.QueueTime))
tracedJobs[reqID] = &tracedJob{jobSpan: s}
l.logger.Error(errors.New("job and runnerSetAssign spans have not started yet"), "runnerRequestId", j.RunnerRequestId)
} else {
if t.runnerSetAssignSpan == nil {
l.logger.Error(errors.New("runnerSetAssignSpan has not started yet"), "runnerRequestId", j.RunnerRequestId)
} else {
t.runnerSetAssignSpan.Finish(tracer.FinishTime(j.RunnerAssignTime))
}
t.runnerAssignSpan = tracer.StartSpan(
"runnerAssign",
tracer.ChildOf(t.jobSpan.Context()),
tracer.StartTime(j.RunnerAssignTime),
)
now := time.Now()
t.runnerAssignSpan.Finish(tracer.FinishTime(now))
t.runnerRunJobSpan = tracer.StartSpan(
"runnerRunJob",
tracer.ChildOf(t.jobSpan.Context()),
tracer.StartTime(now),
)
l.logger.Info("Listener.progressTraces: Job started", "queueTime", j.QueueTime, "runnerAssignTime", j.RunnerAssignTime, "requestLabels", j.RequestLabels, "now", now)
}
}
for _, j := range parsedMsg.jobsCompleted {
reqID := fmt.Sprintf("%d", j.RunnerRequestId)
t := tracedJobs[reqID]
if t == nil {
s := tracer.StartSpan(fmt.Sprintf("%s", j.JobWorkflowRef), tracer.StartTime(j.QueueTime))
t = &tracedJob{jobSpan: s}
tracedJobs[reqID] = t
l.logger.Error(errors.New("job, runnerSetAssign and runnerAssign spans have not started yet"), "runnerRequestId", j.RunnerRequestId)
} else {
if t.runnerRunJobSpan == nil {
l.logger.Error(errors.New("runnerRunJobSPan has not started yet"), "runnerRequestId", j.RunnerRequestId)
} else {
t.runnerRunJobSpan.Finish(tracer.FinishTime(j.FinishTime))
}
}
s := t.jobSpan
s.Finish(tracer.FinishTime(j.FinishTime))
delete(tracedJobs, reqID)
}
}
func (l *Listener) handleMessage(ctx context.Context, handler Handler, msg *actions.RunnerScaleSetMessage) error {
ctx, span := otel.Tracer("arc").Start(ctx, "Listener.handleMessage")
defer span.End()
@ -203,6 +308,8 @@ func (l *Listener) handleMessage(ctx context.Context, handler Handler, msg *acti
}
l.metrics.PublishStatistics(parsedMsg.statistics)
l.progressTraces(parsedMsg)
if len(parsedMsg.jobsAvailable) > 0 {
acquiredJobIDs, err := l.acquireAvailableJobs(ctx, parsedMsg.jobsAvailable)
if err != nil {
@ -425,7 +532,7 @@ func (l *Listener) parseMessage(ctx context.Context, msg *actions.RunnerScaleSet
}
func (l *Listener) acquireAvailableJobs(ctx context.Context, jobsAvailable []*actions.JobAvailable) ([]int64, error) {
ctx, span := otel.Tracer("arc").Start(ctx, "Listener.acquireAvailableJobs")
ctx, span := otel.Tracer("arc").Start(ctx, "Listener.acquireAvailableJobs", trace.WithLinks())
defer span.End()
ids := make([]int64, 0, len(jobsAvailable))