Merge pull request #1 from halradaideh/feature/workflow-metadata-labels
Add workflow metadata labels to runner pods for cost tracking
This commit is contained in:
commit
1123f8d286
|
|
@ -31,7 +31,7 @@ metadata:
|
|||
rules:
|
||||
- apiGroups: [""]
|
||||
resources: ["pods"]
|
||||
verbs: ["get", "list", "create", "delete"]
|
||||
verbs: ["get", "list", "create", "delete", "update", "patch"]
|
||||
- apiGroups: [""]
|
||||
resources: ["pods/exec"]
|
||||
verbs: ["get", "create"]
|
||||
|
|
|
|||
|
|
@ -35,6 +35,8 @@ rules:
|
|||
- create
|
||||
- delete
|
||||
- get
|
||||
- update
|
||||
- patch
|
||||
- apiGroups:
|
||||
- ""
|
||||
resources:
|
||||
|
|
|
|||
|
|
@ -4,6 +4,9 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/actions/actions-runner-controller/apis/actions.github.com/v1alpha1"
|
||||
"github.com/actions/actions-runner-controller/cmd/ghalistener/listener"
|
||||
|
|
@ -12,6 +15,7 @@ import (
|
|||
jsonpatch "github.com/evanphx/json-patch"
|
||||
"github.com/go-logr/logr"
|
||||
kerrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
|
|
@ -19,6 +23,38 @@ import (
|
|||
|
||||
const workerName = "kubernetesworker"
|
||||
|
||||
// sanitizeLabelValue sanitizes a string to be a valid Kubernetes label value
|
||||
// Kubernetes label values must be alphanumeric characters, '-', '_' or '.', and must start and end with an alphanumeric character
|
||||
func sanitizeLabelValue(value string) string {
|
||||
if value == "" {
|
||||
return ""
|
||||
}
|
||||
|
||||
// Replace invalid characters with hyphens
|
||||
reg := regexp.MustCompile(`[^a-zA-Z0-9._-]`)
|
||||
sanitized := reg.ReplaceAllString(value, "-")
|
||||
|
||||
// Remove consecutive hyphens
|
||||
reg = regexp.MustCompile(`-+`)
|
||||
sanitized = reg.ReplaceAllString(sanitized, "-")
|
||||
|
||||
// Ensure it starts and ends with alphanumeric character
|
||||
sanitized = strings.Trim(sanitized, "-._")
|
||||
|
||||
// If empty after sanitization, use a default value
|
||||
if sanitized == "" {
|
||||
sanitized = "unknown"
|
||||
}
|
||||
|
||||
// Truncate to max length (63 characters for Kubernetes labels)
|
||||
if len(sanitized) > 63 {
|
||||
sanitized = sanitized[:63]
|
||||
sanitized = strings.Trim(sanitized, "-._")
|
||||
}
|
||||
|
||||
return sanitized
|
||||
}
|
||||
|
||||
type Option func(*Worker)
|
||||
|
||||
func WithLogger(logger logr.Logger) Option {
|
||||
|
|
@ -28,6 +64,12 @@ func WithLogger(logger logr.Logger) Option {
|
|||
}
|
||||
}
|
||||
|
||||
func WithClientset(clientset *kubernetes.Clientset) Option {
|
||||
return func(w *Worker) {
|
||||
w.clientset = clientset
|
||||
}
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
EphemeralRunnerSetNamespace string
|
||||
EphemeralRunnerSetName string
|
||||
|
|
@ -54,22 +96,26 @@ func New(config Config, options ...Option) (*Worker, error) {
|
|||
patchSeq: -1,
|
||||
}
|
||||
|
||||
conf, err := rest.InClusterConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
clientset, err := kubernetes.NewForConfig(conf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
w.clientset = clientset
|
||||
|
||||
// Apply options first to see if clientset is provided
|
||||
for _, option := range options {
|
||||
option(w)
|
||||
}
|
||||
|
||||
// Only create clientset if not already provided
|
||||
if w.clientset == nil {
|
||||
conf, err := rest.InClusterConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
clientset, err := kubernetes.NewForConfig(conf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
w.clientset = clientset
|
||||
}
|
||||
|
||||
if err := w.applyDefaults(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -148,12 +194,82 @@ func (w *Worker) HandleJobStarted(ctx context.Context, jobInfo *actions.JobStart
|
|||
if err != nil {
|
||||
if kerrors.IsNotFound(err) {
|
||||
w.logger.Info("Ephemeral runner not found, skipping patching of ephemeral runner status", "runnerName", jobInfo.RunnerName)
|
||||
return nil
|
||||
} else {
|
||||
return fmt.Errorf("could not patch ephemeral runner status, patch JSON: %s, error: %w", string(mergePatch), err)
|
||||
}
|
||||
return fmt.Errorf("could not patch ephemeral runner status, patch JSON: %s, error: %w", string(mergePatch), err)
|
||||
} else {
|
||||
w.logger.Info("Ephemeral runner status updated with the merge patch successfully.")
|
||||
}
|
||||
|
||||
w.logger.Info("Ephemeral runner status updated with the merge patch successfully.")
|
||||
// Update pod labels with workflow metadata for cost tracking
|
||||
w.logger.Info("Attempting to update pod labels with workflow metadata",
|
||||
"runnerName", jobInfo.RunnerName,
|
||||
"jobID", jobInfo.JobID,
|
||||
"repository", fmt.Sprintf("%s/%s", jobInfo.OwnerName, jobInfo.RepositoryName))
|
||||
|
||||
if err := w.updatePodLabelsWithWorkflowMetadata(ctx, jobInfo); err != nil {
|
||||
w.logger.Error(err, "Failed to update pod labels with workflow metadata", "runnerName", jobInfo.RunnerName)
|
||||
// Don't return error as this is not critical for job execution
|
||||
} else {
|
||||
w.logger.Info("Successfully updated pod labels with workflow metadata", "runnerName", jobInfo.RunnerName)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// updatePodLabelsWithWorkflowMetadata updates pod labels with workflow metadata for cost tracking
|
||||
func (w *Worker) updatePodLabelsWithWorkflowMetadata(ctx context.Context, jobInfo *actions.JobStarted) error {
|
||||
w.logger.Info("Starting pod label update",
|
||||
"runnerName", jobInfo.RunnerName,
|
||||
"namespace", w.config.EphemeralRunnerSetNamespace)
|
||||
|
||||
// Create labels map for workflow metadata
|
||||
workflowLabels := make(map[string]string)
|
||||
workflowLabels["github.com/repository"] = sanitizeLabelValue(fmt.Sprintf("%s-%s", jobInfo.OwnerName, jobInfo.RepositoryName))
|
||||
workflowLabels["github.com/workflow"] = sanitizeLabelValue(jobInfo.JobWorkflowRef)
|
||||
workflowLabels["github.com/run-id"] = strconv.FormatInt(jobInfo.WorkflowRunID, 10)
|
||||
workflowLabels["github.com/job"] = sanitizeLabelValue(jobInfo.JobDisplayName)
|
||||
workflowLabels["github.com/job-id"] = sanitizeLabelValue(jobInfo.JobID)
|
||||
|
||||
w.logger.Info("Created workflow labels", "labels", workflowLabels)
|
||||
|
||||
// Get the pod
|
||||
w.logger.Info("Retrieving pod", "podName", jobInfo.RunnerName, "namespace", w.config.EphemeralRunnerSetNamespace)
|
||||
pod, err := w.clientset.CoreV1().Pods(w.config.EphemeralRunnerSetNamespace).Get(ctx, jobInfo.RunnerName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
if kerrors.IsNotFound(err) {
|
||||
w.logger.Info("Pod not found, skipping pod label update", "runnerName", jobInfo.RunnerName)
|
||||
return nil
|
||||
}
|
||||
w.logger.Error(err, "Failed to get pod", "runnerName", jobInfo.RunnerName)
|
||||
return fmt.Errorf("failed to get pod: %w", err)
|
||||
}
|
||||
|
||||
w.logger.Info("Successfully retrieved pod", "podName", pod.Name, "currentLabels", pod.Labels)
|
||||
|
||||
// Update pod labels
|
||||
if pod.Labels == nil {
|
||||
pod.Labels = make(map[string]string)
|
||||
}
|
||||
|
||||
labelsUpdated := false
|
||||
for key, value := range workflowLabels {
|
||||
if pod.Labels[key] != value {
|
||||
pod.Labels[key] = value
|
||||
labelsUpdated = true
|
||||
}
|
||||
}
|
||||
|
||||
if labelsUpdated {
|
||||
w.logger.Info("Updating pod labels with workflow metadata",
|
||||
"pod", pod.Name,
|
||||
"labels", workflowLabels)
|
||||
|
||||
_, err = w.clientset.CoreV1().Pods(w.config.EphemeralRunnerSetNamespace).Update(ctx, pod, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update pod labels: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue