Merge pull request #3 from jeanschmidt/jeanschmidt/proactive_capacity_max_runners
Add MaxBurstCapacity cap and fix MaxRunners headroom calculation
This commit is contained in:
commit
a3c294ab14
|
|
@ -15,13 +15,13 @@ type: application
|
|||
# This is the chart version. This version number should be incremented each time you make changes
|
||||
# to the chart and its templates, including the app version.
|
||||
# Versions are expected to follow Semantic Versioning (https://semver.org/)
|
||||
version: 0.14.1-jeanschmidt.6
|
||||
version: 0.14.1-jeanschmidt.7
|
||||
|
||||
# This is the version number of the application being deployed. This version number should be
|
||||
# incremented each time you make changes to the application. Versions are not expected to
|
||||
# follow Semantic Versioning. They should reflect the version the application is using.
|
||||
# It is recommended to use it with quotes.
|
||||
appVersion: "0.14.1-jeanschmidt.6"
|
||||
appVersion: "0.14.1-jeanschmidt.7"
|
||||
|
||||
home: https://github.com/actions/actions-runner-controller
|
||||
|
||||
|
|
|
|||
|
|
@ -15,13 +15,13 @@ type: application
|
|||
# This is the chart version. This version number should be incremented each time you make changes
|
||||
# to the chart and its templates, including the app version.
|
||||
# Versions are expected to follow Semantic Versioning (https://semver.org/)
|
||||
version: 0.14.1-jeanschmidt.6
|
||||
version: 0.14.1-jeanschmidt.7
|
||||
|
||||
# This is the version number of the application being deployed. This version number should be
|
||||
# incremented each time you make changes to the application. Versions are not expected to
|
||||
# follow Semantic Versioning. They should reflect the version the application is using.
|
||||
# It is recommended to use it with quotes.
|
||||
appVersion: "0.14.1-jeanschmidt.6"
|
||||
appVersion: "0.14.1-jeanschmidt.7"
|
||||
|
||||
home: https://github.com/actions/actions-runner-controller
|
||||
|
||||
|
|
|
|||
|
|
@ -28,6 +28,12 @@ type Config struct {
|
|||
PlaceholderTimeout time.Duration
|
||||
MaxRunners int
|
||||
|
||||
// MaxBurstCapacity caps the maximum number of placeholder pairs (running + pending)
|
||||
// the provisioner will maintain. It is applied as a ceiling on the desired pair
// count in every reconcile cycle, not as a per-cycle creation rate. 0 means no cap.
|
||||
// Used to prevent burst node provisioning from overloading downstream services
|
||||
// (git-cache rsync connection pool, Harbor manifest fetches, pypi-cache).
|
||||
MaxBurstCapacity int
|
||||
|
||||
// Workflow pod resources (for placeholder-workflow sizing)
|
||||
WorkflowCPU string
|
||||
WorkflowMemory string
|
||||
|
|
@ -68,6 +74,7 @@ func ConfigFromEnv() Config {
|
|||
c := Config{
|
||||
Enabled: envBool("CAPACITY_AWARE_ENABLED", false),
|
||||
ProactiveCapacity: envInt("CAPACITY_AWARE_PROACTIVE_CAPACITY", 0),
|
||||
MaxBurstCapacity: envInt("CAPACITY_AWARE_MAX_BURST_CAPACITY", 0),
|
||||
RecalculateInterval: envDuration("CAPACITY_AWARE_RECALCULATE_INTERVAL", 30*time.Second),
|
||||
ReportInterval: envDuration("CAPACITY_AWARE_REPORT_INTERVAL", 5*time.Second),
|
||||
PlaceholderTimeout: envDuration("CAPACITY_AWARE_PLACEHOLDER_TIMEOUT", 5*time.Minute),
|
||||
|
|
@ -112,6 +119,10 @@ func (c *Config) Validate() error {
|
|||
"original", c.MaxRunners)
|
||||
c.MaxRunners = 0
|
||||
}
|
||||
if c.MaxBurstCapacity < 0 {
|
||||
slog.Warn("MaxBurstCapacity is negative, clamping to 0", "original", c.MaxBurstCapacity)
|
||||
c.MaxBurstCapacity = 0
|
||||
}
|
||||
|
||||
if c.Enabled && c.RunnerNodeFleet == "" {
|
||||
// Hard requirement: the runner-pool fleet drives placeholder-runner
|
||||
|
|
|
|||
|
|
@ -35,10 +35,11 @@ const (
|
|||
deleteReasonTimeout = "timeout"
|
||||
deleteReasonExcess = "excess"
|
||||
|
||||
skipReasonProvisionerListPairs = "provisioner_list_pairs"
|
||||
skipReasonReporterListPairs = "reporter_list_pairs"
|
||||
skipReasonReporterCountRunners = "reporter_count_runners"
|
||||
skipReasonHUDAPIFailed = "hud_api_failed"
|
||||
skipReasonProvisionerListPairs = "provisioner_list_pairs"
|
||||
skipReasonProvisionerListRunners = "provisioner_list_runners"
|
||||
skipReasonReporterListPairs = "reporter_list_pairs"
|
||||
skipReasonReporterCountRunners = "reporter_count_runners"
|
||||
skipReasonHUDAPIFailed = "hud_api_failed"
|
||||
|
||||
rolePlaceholderRunnerLabel = "runner"
|
||||
rolePlaceholderWorkflowLabel = "workflow"
|
||||
|
|
@ -134,6 +135,7 @@ func New(
|
|||
// Static gauges set once at construction so they appear in Prometheus
|
||||
// scrapes even before the first reconcile cycle runs.
|
||||
m.recorder.SetProactiveCapacity(m.config.ProactiveCapacity)
|
||||
m.recorder.SetMaxBurstCapacity(m.config.MaxBurstCapacity)
|
||||
m.recorder.SetHUDEnabled(m.hudClient != nil && m.config.HUDAPIToken != "")
|
||||
// Seed the reconcile-last-success gauges to listener-startup time so any
|
||||
// `time() - metric` wedge alert has a sane floor (small at startup,
|
||||
|
|
@ -184,23 +186,34 @@ func (m *Monitor) listPairsWithRetry(ctx context.Context, maxRetries int) (map[s
|
|||
return pairs, err
|
||||
}
|
||||
|
||||
func (m *Monitor) countRunningRunnersWithRetry(ctx context.Context, maxRetries int) (int, error) {
|
||||
var count int
|
||||
// countRunnersByPhaseWithRetry returns counts of real EphemeralRunner pods for
|
||||
// this scale set, keyed by PodPhase. Used by the reporter (Running for advertised
|
||||
// capacity) and the provisioner (Running+Pending for MaxRunners headroom).
|
||||
//
|
||||
// Performs a single List with the label selector and groups in code — no
|
||||
// FieldSelector. Phases not present in the result map have count 0.
|
||||
func (m *Monitor) countRunnersByPhaseWithRetry(ctx context.Context, maxRetries int) (map[corev1.PodPhase]int, error) {
|
||||
counts := make(map[corev1.PodPhase]int)
|
||||
err := retryWithBackoff(ctx, m.logger, "count-runners", maxRetries, func() error {
|
||||
sel := fmt.Sprintf("actions-ephemeral-runner=True,%s=%s",
|
||||
labelScaleSet, m.config.ScaleSetName,
|
||||
)
|
||||
pods, e := m.clientset.CoreV1().Pods(m.config.Namespace).List(ctx, metav1.ListOptions{
|
||||
LabelSelector: sel,
|
||||
FieldSelector: "status.phase=Running",
|
||||
})
|
||||
if e != nil {
|
||||
return e
|
||||
}
|
||||
count = len(pods.Items)
|
||||
// Defensive reset: clear the map so the counts reflect only this attempt's List result
|
||||
for k := range counts {
|
||||
delete(counts, k)
|
||||
}
|
||||
for i := range pods.Items {
|
||||
counts[pods.Items[i].Status.Phase]++
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return count, err
|
||||
return counts, err
|
||||
}
|
||||
|
||||
func (m *Monitor) queryHUDWithRetry(ctx context.Context) (int, error) {
|
||||
|
|
@ -235,6 +248,7 @@ func (m *Monitor) Run(ctx context.Context) error {
|
|||
m.logger.Info("starting capacity monitor",
|
||||
"proactiveCapacity", m.config.ProactiveCapacity,
|
||||
"maxRunners", m.config.MaxRunners,
|
||||
"maxBurstCapacity", m.config.MaxBurstCapacity,
|
||||
"labels", m.config.ScaleSetLabels,
|
||||
"workflowNodeFleet", m.config.NodeFleet,
|
||||
"runnerNodeFleet", m.config.RunnerNodeFleet,
|
||||
|
|
@ -370,21 +384,63 @@ func (m *Monitor) reconcileProvisioning(ctx context.Context) {
|
|||
m.recorder.SetPairs(currentPairs)
|
||||
m.emitPlaceholderPodPhases(pairs)
|
||||
|
||||
// 4. Calculate desired placeholder count.
|
||||
desiredPairs := m.config.ProactiveCapacity + queuedJobs
|
||||
// 4. Count real EphemeralRunner pods (Running + Pending) for headroom
|
||||
// calculation. We need both phases so pods that exist but haven't
|
||||
// started yet still consume the MaxRunners budget — otherwise a burst
|
||||
// of Pending runners could double-book the cap.
|
||||
//
|
||||
// On list failure: skip the entire provisioning cycle. Treating a
|
||||
// failed count as 0 here would allow up to MaxRunners placeholders on
|
||||
// top of the actual real runners, doubling the cap during the failure
|
||||
// window. Skipping is the safe posture — the next cycle will reconcile.
|
||||
// The reporter has its own MaxRunners clamp on advertised capacity (belt
|
||||
// and suspenders); the provisioner's headroom clamp here is tighter
|
||||
// when real runners exist.
|
||||
runningRunnerPods := 0
|
||||
pendingRunnerPods := 0
|
||||
if m.config.MaxRunners > 0 {
|
||||
desiredPairs = min(desiredPairs, m.config.MaxRunners)
|
||||
counts, err := m.countRunnersByPhaseWithRetry(ctx, provisionerMaxRetries)
|
||||
if err != nil {
|
||||
m.logger.Error("failed to count runner pods after retries, skipping provisioning cycle", "error", err)
|
||||
m.recorder.IncReconcileSkips(skipReasonProvisionerListRunners)
|
||||
return
|
||||
}
|
||||
runningRunnerPods = counts[corev1.PodRunning]
|
||||
pendingRunnerPods = counts[corev1.PodPending]
|
||||
}
|
||||
|
||||
// 5. Calculate desired placeholder count.
|
||||
desiredPairs := m.config.ProactiveCapacity + queuedJobs
|
||||
|
||||
// Clamp by headroom against the hard runner cap. Real runner pods (running +
|
||||
// pending) consume the cap, so the placeholder pool can only fill what's left.
|
||||
// Without this, MaxRunners=N could allow up to N placeholders on top of N real
|
||||
// runners, doubling the intended cap.
|
||||
if m.config.MaxRunners > 0 {
|
||||
totalRunnerPods := runningRunnerPods + pendingRunnerPods
|
||||
headroom := max(0, m.config.MaxRunners-totalRunnerPods)
|
||||
desiredPairs = min(desiredPairs, headroom)
|
||||
}
|
||||
|
||||
// Clamp burst so we don't spike the cluster (overload git-cache rsync,
|
||||
// Harbor manifest fetches, pypi-cache) when many jobs queue at once.
|
||||
if m.config.MaxBurstCapacity > 0 {
|
||||
desiredPairs = min(desiredPairs, m.config.MaxBurstCapacity)
|
||||
}
|
||||
|
||||
desiredPairs = max(desiredPairs, 0)
|
||||
m.recorder.SetDesiredPairs(desiredPairs)
|
||||
|
||||
// 5. Adjust: create or delete pairs.
|
||||
// 6. Adjust: create or delete pairs.
|
||||
m.adjustPairs(ctx, pairs, currentPairs, desiredPairs)
|
||||
|
||||
m.logger.Info("provisioning reconciled",
|
||||
"queuedJobs", queuedJobs,
|
||||
"desiredPairs", desiredPairs,
|
||||
"currentPairs", currentPairs,
|
||||
"runningRunnerPods", runningRunnerPods,
|
||||
"pendingRunnerPods", pendingRunnerPods,
|
||||
"maxBurstCapacity", m.config.MaxBurstCapacity,
|
||||
)
|
||||
// Mark success only at the end of a fully completed cycle. Early-exit
|
||||
// paths above (list-pairs or count-runners error) do NOT mark success — the whole point
|
||||
|
|
@ -417,12 +473,13 @@ func (m *Monitor) reconcileReporting(ctx context.Context) {
|
|||
m.recorder.SetRunningPairs(runningPairs)
|
||||
|
||||
// 2. Count running runners with retry. On failure, keep previous capacity.
|
||||
runningRunners, err := m.countRunningRunnersWithRetry(ctx, reporterMaxRetries)
|
||||
counts, err := m.countRunnersByPhaseWithRetry(ctx, reporterMaxRetries)
|
||||
if err != nil {
|
||||
m.logger.Warn("failed to count runners, keeping previous capacity", "error", err)
|
||||
m.recorder.IncReconcileSkips(skipReasonReporterCountRunners)
|
||||
return
|
||||
}
|
||||
runningRunners := counts[corev1.PodRunning]
|
||||
|
||||
// 3. Report capacity to GitHub.
|
||||
capacity := runningRunners + runningPairs
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import (
|
|||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
|
@ -472,6 +473,7 @@ type fakeCapacityRecorder struct {
|
|||
mu sync.Mutex
|
||||
|
||||
proactiveCapacity int
|
||||
maxBurstCapacity int
|
||||
hudEnabled bool
|
||||
queuedJobs int
|
||||
desiredPairs int
|
||||
|
|
@ -487,6 +489,7 @@ type fakeCapacityRecorder struct {
|
|||
|
||||
// Counts of method invocations for assertions.
|
||||
setProactiveCapacityCalls int
|
||||
setMaxBurstCapacityCalls int
|
||||
setHUDEnabledCalls int
|
||||
setQueuedJobsCalls int
|
||||
setDesiredPairsCalls int
|
||||
|
|
@ -523,6 +526,12 @@ func (f *fakeCapacityRecorder) SetProactiveCapacity(v int) {
|
|||
f.proactiveCapacity = v
|
||||
f.setProactiveCapacityCalls++
|
||||
}
|
||||
func (f *fakeCapacityRecorder) SetMaxBurstCapacity(v int) {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
f.maxBurstCapacity = v
|
||||
f.setMaxBurstCapacityCalls++
|
||||
}
|
||||
func (f *fakeCapacityRecorder) SetHUDEnabled(b bool) {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
|
@ -718,3 +727,268 @@ func TestProvisioner_ListPairsError_RecordsSkip(t *testing.T) {
|
|||
// Duration is still observed even on the skip path.
|
||||
assert.Equal(t, 1, rec.observeReconcileCalls[reconcilePhaseProvisioner])
|
||||
}
|
||||
|
||||
// TestProvisioner_RunnerCountError_RecordsSkip simulates a failure when
|
||||
// counting real runner pods (Running/Pending phase) and asserts the
|
||||
// provisioner skips the cycle: the skip counter is incremented, the
|
||||
// success timestamp is NOT advanced, and no placeholder pairs are
|
||||
// created (the headroom calculation never ran, so we must not over-
|
||||
// create). This guards the doubling-the-cap regression that would
|
||||
// happen if a list failure was silently treated as 0 runners.
|
||||
func TestProvisioner_RunnerCountError_RecordsSkip(t *testing.T) {
|
||||
cfg := Config{
|
||||
ProactiveCapacity: 3,
|
||||
MaxRunners: 10,
|
||||
PlaceholderTimeout: 5 * time.Minute,
|
||||
}
|
||||
rec := newFakeCapacityRecorder()
|
||||
m, cs, _ := newTestMonitorWithRecorder(t, cfg, rec)
|
||||
|
||||
// Fail ONLY the runner-count list calls (those use the
|
||||
// "actions-ephemeral-runner=True,..." label selector). Placeholder
|
||||
// list calls use a different selector and continue to succeed, so
|
||||
// the cycle reaches the runner-count step before erroring out.
|
||||
cs.PrependReactor("list", "pods",
|
||||
func(action k8stesting.Action) (bool, runtime.Object, error) {
|
||||
la, ok := action.(k8stesting.ListAction)
|
||||
if !ok {
|
||||
return false, nil, nil
|
||||
}
|
||||
sel := la.GetListRestrictions().Labels.String()
|
||||
if strings.Contains(sel, "actions-ephemeral-runner=True") {
|
||||
return true, nil, fmt.Errorf("synthetic count-runners error")
|
||||
}
|
||||
return false, nil, nil
|
||||
},
|
||||
)
|
||||
|
||||
// Short deadline so retry backoffs (1s, 2s, 4s) abort quickly via ctx.Done().
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
|
||||
defer cancel()
|
||||
|
||||
m.reconcileProvisioning(ctx)
|
||||
|
||||
rec.mu.Lock()
|
||||
defer rec.mu.Unlock()
|
||||
|
||||
assert.Equal(t, 1, rec.incReconcileSkipsCalls[skipReasonProvisionerListRunners],
|
||||
"runner-count error must record a provisioner_list_runners skip")
|
||||
assert.Equal(t, 0, rec.setReconcileLastSuccessCalls[reconcilePhaseProvisioner],
|
||||
"success timestamp must NOT be set on the skip path")
|
||||
// Duration is still observed even on the skip path.
|
||||
assert.Equal(t, 1, rec.observeReconcileCalls[reconcilePhaseProvisioner])
|
||||
// SetDesiredPairs must NOT be called — the cycle bailed out before
|
||||
// the headroom calculation.
|
||||
assert.Equal(t, 0, rec.setDesiredPairsCalls,
|
||||
"desiredPairs must not be set when the cycle is skipped")
|
||||
// No placeholder pairs were created (the adjustPairs step never ran).
|
||||
assert.Equal(t, 0, countPods(t, cs, "test-ns"),
|
||||
"no placeholders must be created when the cycle is skipped")
|
||||
}
|
||||
|
||||
// ---- MaxBurstCapacity / MaxRunners headroom tests ----
|
||||
|
||||
// createRealRunnerPods creates n real EphemeralRunner pods for the scale set
|
||||
// in the given phase. Mirrors the label shape used by countRunnersByPhaseWithRetry
|
||||
// so the provisioner and reporter see them as "real".
|
||||
func createRealRunnerPods(
|
||||
t *testing.T,
|
||||
cs *fake.Clientset,
|
||||
ns, scaleSetName string,
|
||||
n int,
|
||||
phase corev1.PodPhase,
|
||||
namePrefix string,
|
||||
) {
|
||||
t.Helper()
|
||||
ctx := context.Background()
|
||||
for i := 0; i < n; i++ {
|
||||
pod := &corev1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("%s-%d", namePrefix, i),
|
||||
Namespace: ns,
|
||||
Labels: map[string]string{
|
||||
"actions-ephemeral-runner": "True",
|
||||
labelScaleSet: scaleSetName,
|
||||
},
|
||||
},
|
||||
Spec: corev1.PodSpec{
|
||||
Containers: []corev1.Container{{Name: "runner", Image: "runner:latest"}},
|
||||
},
|
||||
Status: corev1.PodStatus{Phase: phase},
|
||||
}
|
||||
_, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
// MaxBurstCapacity = 0 means "no cap" — desiredPairs equals the proactive +
|
||||
// queued sum, just like before the cap was introduced.
|
||||
func TestReconcile_MaxBurstCapacity_ZeroIsNoCap(t *testing.T) {
|
||||
hudRows := []QueuedJobsForRunner{
|
||||
{RunnerLabel: "linux.2xlarge", NumQueuedJobs: 8},
|
||||
}
|
||||
cfg := Config{
|
||||
ProactiveCapacity: 3,
|
||||
MaxRunners: 0, // unlimited so headroom can't interfere
|
||||
MaxBurstCapacity: 0, // unlimited
|
||||
ScaleSetLabels: []string{"linux.2xlarge"},
|
||||
PlaceholderTimeout: 5 * time.Minute,
|
||||
}
|
||||
m, cs, _ := newTestMonitor(t, cfg, hudRows)
|
||||
|
||||
m.reconcileProvisioning(context.Background())
|
||||
|
||||
// desired = 3 + 8 = 11 pairs = 22 pods, no cap applied.
|
||||
assert.Equal(t, 22, countPods(t, cs, "test-ns"),
|
||||
"MaxBurstCapacity=0 must NOT cap placeholders")
|
||||
}
|
||||
|
||||
// MaxBurstCapacity > 0 and desired exceeds it -> clamp to MaxBurstCapacity.
|
||||
func TestReconcile_MaxBurstCapacity_ClampsWhenExceeded(t *testing.T) {
|
||||
hudRows := []QueuedJobsForRunner{
|
||||
{RunnerLabel: "linux.2xlarge", NumQueuedJobs: 50},
|
||||
}
|
||||
cfg := Config{
|
||||
ProactiveCapacity: 5,
|
||||
MaxRunners: 0, // unlimited so MaxBurstCapacity is the only cap
|
||||
MaxBurstCapacity: 7,
|
||||
ScaleSetLabels: []string{"linux.2xlarge"},
|
||||
PlaceholderTimeout: 5 * time.Minute,
|
||||
}
|
||||
m, cs, _ := newTestMonitor(t, cfg, hudRows)
|
||||
|
||||
m.reconcileProvisioning(context.Background())
|
||||
|
||||
// desired = min(5+50, MaxBurstCapacity=7) = 7 pairs = 14 pods.
|
||||
assert.Equal(t, 14, countPods(t, cs, "test-ns"),
|
||||
"desired must be clamped to MaxBurstCapacity")
|
||||
}
|
||||
|
||||
// MaxBurstCapacity > 0 but desired stays below the cap -> no clamp.
|
||||
func TestReconcile_MaxBurstCapacity_UnclampedWhenBelow(t *testing.T) {
|
||||
hudRows := []QueuedJobsForRunner{
|
||||
{RunnerLabel: "linux.2xlarge", NumQueuedJobs: 2},
|
||||
}
|
||||
cfg := Config{
|
||||
ProactiveCapacity: 3,
|
||||
MaxRunners: 0,
|
||||
MaxBurstCapacity: 20,
|
||||
ScaleSetLabels: []string{"linux.2xlarge"},
|
||||
PlaceholderTimeout: 5 * time.Minute,
|
||||
}
|
||||
m, cs, _ := newTestMonitor(t, cfg, hudRows)
|
||||
|
||||
m.reconcileProvisioning(context.Background())
|
||||
|
||||
// desired = 3 + 2 = 5 (well below MaxBurstCapacity=20). 5 pairs = 10 pods.
|
||||
assert.Equal(t, 10, countPods(t, cs, "test-ns"),
|
||||
"desired below MaxBurstCapacity must remain unclamped")
|
||||
}
|
||||
|
||||
// Both MaxRunners-headroom and MaxBurstCapacity active; MaxRunners-headroom is
|
||||
// tighter (it leaves only 2 slots while MaxBurstCapacity allows 50) -> headroom wins.
|
||||
func TestReconcile_MaxRunnersHeadroom_TighterThanBurst(t *testing.T) {
|
||||
hudRows := []QueuedJobsForRunner{
|
||||
{RunnerLabel: "linux.2xlarge", NumQueuedJobs: 100},
|
||||
}
|
||||
cfg := Config{
|
||||
ProactiveCapacity: 0,
|
||||
MaxRunners: 10,
|
||||
MaxBurstCapacity: 50,
|
||||
ScaleSetLabels: []string{"linux.2xlarge"},
|
||||
PlaceholderTimeout: 5 * time.Minute,
|
||||
}
|
||||
m, cs, _ := newTestMonitor(t, cfg, hudRows)
|
||||
// 8 real runner pods consume the cap, leaving only 2 slots of headroom.
|
||||
createRealRunnerPods(t, cs, "test-ns", "test-sset", 8, corev1.PodRunning, "runner-r")
|
||||
|
||||
m.reconcileProvisioning(context.Background())
|
||||
|
||||
// MaxRunners-headroom = 10 - 8 = 2; MaxBurstCapacity = 50.
|
||||
// desired = min(100, 2, 50) = 2 pairs = 4 pods (plus the 8 real pods = 12 total).
|
||||
pairs, err := m.placeholders.ListPairs(context.Background())
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, pairs, 2,
|
||||
"MaxRunners-headroom (2) must win over MaxBurstCapacity (50)")
|
||||
}
|
||||
|
||||
// Both MaxRunners-headroom and MaxBurstCapacity active; MaxBurstCapacity is
|
||||
// the tighter cap (allows 4 while headroom would allow 50) -> burst wins.
|
||||
func TestReconcile_MaxBurstCapacity_TighterThanHeadroom(t *testing.T) {
|
||||
hudRows := []QueuedJobsForRunner{
|
||||
{RunnerLabel: "linux.2xlarge", NumQueuedJobs: 100},
|
||||
}
|
||||
cfg := Config{
|
||||
ProactiveCapacity: 0,
|
||||
MaxRunners: 100, // headroom is wide open
|
||||
MaxBurstCapacity: 4, // burst is the tight one
|
||||
ScaleSetLabels: []string{"linux.2xlarge"},
|
||||
PlaceholderTimeout: 5 * time.Minute,
|
||||
}
|
||||
m, cs, _ := newTestMonitor(t, cfg, hudRows)
|
||||
// A few real runners exist but headroom (100-5=95) is still much larger than 4.
|
||||
createRealRunnerPods(t, cs, "test-ns", "test-sset", 5, corev1.PodRunning, "runner-r")
|
||||
|
||||
m.reconcileProvisioning(context.Background())
|
||||
|
||||
// desired = min(100, 100-5=95, 4) = 4 pairs = 8 pods.
|
||||
pairs, err := m.placeholders.ListPairs(context.Background())
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, pairs, 4,
|
||||
"MaxBurstCapacity (4) must win over MaxRunners-headroom (95)")
|
||||
}
|
||||
|
||||
// Headroom-fix verification: with MaxRunners=10, 8 Running and 1 Pending real
|
||||
// runner pods, only 1 placeholder pair may be created (not 10 — that was the
|
||||
// pre-fix bug that allowed up to MaxRunners placeholders ON TOP OF the real
|
||||
// runners, doubling the cap).
|
||||
func TestReconcile_MaxRunnersHeadroom_CountsPendingTowardCap(t *testing.T) {
|
||||
hudRows := []QueuedJobsForRunner{
|
||||
{RunnerLabel: "linux.2xlarge", NumQueuedJobs: 50},
|
||||
}
|
||||
cfg := Config{
|
||||
ProactiveCapacity: 0,
|
||||
MaxRunners: 10,
|
||||
ScaleSetLabels: []string{"linux.2xlarge"},
|
||||
PlaceholderTimeout: 5 * time.Minute,
|
||||
}
|
||||
m, cs, _ := newTestMonitor(t, cfg, hudRows)
|
||||
createRealRunnerPods(t, cs, "test-ns", "test-sset", 8, corev1.PodRunning, "runner-r")
|
||||
createRealRunnerPods(t, cs, "test-ns", "test-sset", 1, corev1.PodPending, "runner-p")
|
||||
|
||||
m.reconcileProvisioning(context.Background())
|
||||
|
||||
// MaxRunners-headroom = 10 - (8 Running + 1 Pending) = 1.
|
||||
// desired = min(50, 1) = 1 pair = 2 pods.
|
||||
pairs, err := m.placeholders.ListPairs(context.Background())
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, pairs, 1,
|
||||
"only 1 placeholder allowed: MaxRunners=10 minus 8 Running minus 1 Pending real runner pods")
|
||||
}
|
||||
|
||||
// Edge case: real runner pods have already exhausted (or exceeded) MaxRunners.
|
||||
// The headroom subtraction goes negative -> max(0) clamps it -> desiredPairs=0,
|
||||
// no placeholders are created. Prevents over-provisioning when the cap is hit.
|
||||
func TestReconcile_MaxRunnersHeadroom_AtOrAboveCapFloorsAtZero(t *testing.T) {
|
||||
hudRows := []QueuedJobsForRunner{
|
||||
{RunnerLabel: "linux.2xlarge", NumQueuedJobs: 50},
|
||||
}
|
||||
cfg := Config{
|
||||
ProactiveCapacity: 5,
|
||||
MaxRunners: 10,
|
||||
ScaleSetLabels: []string{"linux.2xlarge"},
|
||||
PlaceholderTimeout: 5 * time.Minute,
|
||||
}
|
||||
m, cs, _ := newTestMonitor(t, cfg, hudRows)
|
||||
// 12 real runner pods: 6 Running + 6 Pending — already over the cap of 10.
|
||||
createRealRunnerPods(t, cs, "test-ns", "test-sset", 6, corev1.PodRunning, "runner-r")
|
||||
createRealRunnerPods(t, cs, "test-ns", "test-sset", 6, corev1.PodPending, "runner-p")
|
||||
|
||||
m.reconcileProvisioning(context.Background())
|
||||
|
||||
// MaxRunners-headroom = 10 - 12 = -2; max(...,0) -> 0 pairs.
|
||||
pairs, err := m.placeholders.ListPairs(context.Background())
|
||||
require.NoError(t, err)
|
||||
assert.Empty(t, pairs,
|
||||
"total runner pods (12) >= MaxRunners (10) must floor desiredPairs to 0")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -67,6 +67,7 @@ const (
|
|||
// in testdata/listener_metrics.yaml.
|
||||
const (
|
||||
MetricCapacityProactiveCapacity = "gha_capacity_proactive_capacity"
|
||||
MetricCapacityMaxBurstCapacity = "gha_capacity_max_burst_capacity"
|
||||
MetricCapacityHUDEnabled = "gha_capacity_hud_enabled"
|
||||
MetricCapacityQueuedJobs = "gha_capacity_queued_jobs"
|
||||
MetricCapacityDesiredPairs = "gha_capacity_desired_pairs"
|
||||
|
|
@ -119,9 +120,10 @@ var metricsHelp = metricsHelpRegistry{
|
|||
MetricIdleRunners: "Number of registered runners not running a job.",
|
||||
|
||||
MetricCapacityProactiveCapacity: "Configured proactiveCapacity value from listener config — target number of pre-warmed runner+placeholder pairs.",
|
||||
MetricCapacityMaxBurstCapacity: "Configured maxBurstCapacity value from listener config — caps total placeholder pairs (running + pending) the provisioner will create per cycle, preventing burst node provisioning from overloading downstream services (git-cache, Harbor, pypi-cache).",
|
||||
MetricCapacityHUDEnabled: "1 if HUD API client + token are configured at startup, else 0. Distinguishes 'no HUD data' from 'HUD broken'.",
|
||||
MetricCapacityQueuedJobs: "Queued jobs from PyTorch HUD API for this scale set's labels (external queue, distinct from gha_assigned_jobs).",
|
||||
MetricCapacityDesiredPairs: "Number of runner+placeholder pairs desired after applying the maxRunners cap.",
|
||||
MetricCapacityDesiredPairs: "Number of placeholder pairs the provisioner wants to maintain (ProactiveCapacity + queuedJobs, then clamped by MaxRunners headroom and MaxBurstCapacity).",
|
||||
MetricCapacityPairs: "Total existing runner+placeholder pairs (currentPairs).",
|
||||
MetricCapacityRunningPairs: "Pairs where both the runner pod and placeholder pod are in Running phase.",
|
||||
MetricCapacityPlaceholderPods: "Number of placeholder pods by role (runner|workflow) and phase (Pending|Running|Failed|Succeeded|Unknown). The phase values match Kubernetes corev1.PodPhase.",
|
||||
|
|
@ -179,6 +181,7 @@ type Recorder interface {
|
|||
// the hood. Callers only supply the metric-specific extra labels.
|
||||
type CapacityRecorder interface {
|
||||
SetProactiveCapacity(value int)
|
||||
SetMaxBurstCapacity(value int)
|
||||
SetHUDEnabled(enabled bool)
|
||||
SetQueuedJobs(value int)
|
||||
SetDesiredPairs(value int)
|
||||
|
|
@ -384,6 +387,9 @@ var defaultMetrics = v1alpha1.MetricsConfig{
|
|||
MetricCapacityProactiveCapacity: {
|
||||
Labels: withExtraLabels(),
|
||||
},
|
||||
MetricCapacityMaxBurstCapacity: {
|
||||
Labels: withExtraLabels(),
|
||||
},
|
||||
MetricCapacityHUDEnabled: {
|
||||
Labels: withExtraLabels(),
|
||||
},
|
||||
|
|
@ -687,6 +693,10 @@ func (e *exporter) SetProactiveCapacity(value int) {
|
|||
e.setGauge(MetricCapacityProactiveCapacity, e.scaleSetLabels, float64(value))
|
||||
}
|
||||
|
||||
func (e *exporter) SetMaxBurstCapacity(value int) {
|
||||
e.setGauge(MetricCapacityMaxBurstCapacity, e.scaleSetLabels, float64(value))
|
||||
}
|
||||
|
||||
func (e *exporter) SetHUDEnabled(enabled bool) {
|
||||
v := 0.0
|
||||
if enabled {
|
||||
|
|
@ -778,6 +788,7 @@ func (*discard) RecordDesiredRunners(int) {}
|
|||
// callers can hold a CapacityRecorder of `&discard{}` (i.e. DiscardCapacity)
|
||||
// without nil-checking before every call.
|
||||
func (*discard) SetProactiveCapacity(int) {}
|
||||
func (*discard) SetMaxBurstCapacity(int) {}
|
||||
func (*discard) SetHUDEnabled(bool) {}
|
||||
func (*discard) SetQueuedJobs(int) {}
|
||||
func (*discard) SetDesiredPairs(int) {}
|
||||
|
|
|
|||
|
|
@ -344,6 +344,9 @@ func TestExporterCapacityRecorder_Gauges(t *testing.T) {
|
|||
e.SetProactiveCapacity(7)
|
||||
assert.Equal(t, 7.0, gaugeValue(t, e, MetricCapacityProactiveCapacity))
|
||||
|
||||
e.SetMaxBurstCapacity(11)
|
||||
assert.Equal(t, 11.0, gaugeValue(t, e, MetricCapacityMaxBurstCapacity))
|
||||
|
||||
e.SetHUDEnabled(true)
|
||||
assert.Equal(t, 1.0, gaugeValue(t, e, MetricCapacityHUDEnabled))
|
||||
e.SetHUDEnabled(false)
|
||||
|
|
@ -486,6 +489,7 @@ func TestDiscardCapacity_NoOpAndAlive(t *testing.T) {
|
|||
d := DiscardCapacity
|
||||
// Each method must return without panic.
|
||||
assert.NotPanics(t, func() { d.SetProactiveCapacity(1) })
|
||||
assert.NotPanics(t, func() { d.SetMaxBurstCapacity(1) })
|
||||
assert.NotPanics(t, func() { d.SetHUDEnabled(true) })
|
||||
assert.NotPanics(t, func() { d.SetHUDEnabled(false) })
|
||||
assert.NotPanics(t, func() { d.SetQueuedJobs(2) })
|
||||
|
|
|
|||
|
|
@ -742,6 +742,46 @@ func (_c *MockCapacityRecorder_SetPlaceholderPods_Call) RunAndReturn(run func(ro
|
|||
return _c
|
||||
}
|
||||
|
||||
// SetMaxBurstCapacity provides a mock function for the type MockCapacityRecorder
|
||||
func (_mock *MockCapacityRecorder) SetMaxBurstCapacity(value int) {
|
||||
_mock.Called(value)
|
||||
return
|
||||
}
|
||||
|
||||
// MockCapacityRecorder_SetMaxBurstCapacity_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetMaxBurstCapacity'
|
||||
type MockCapacityRecorder_SetMaxBurstCapacity_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// SetMaxBurstCapacity is a helper method to define mock.On call
|
||||
// - value int
|
||||
func (_e *MockCapacityRecorder_Expecter) SetMaxBurstCapacity(value interface{}) *MockCapacityRecorder_SetMaxBurstCapacity_Call {
|
||||
return &MockCapacityRecorder_SetMaxBurstCapacity_Call{Call: _e.mock.On("SetMaxBurstCapacity", value)}
|
||||
}
|
||||
|
||||
func (_c *MockCapacityRecorder_SetMaxBurstCapacity_Call) Run(run func(value int)) *MockCapacityRecorder_SetMaxBurstCapacity_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
var arg0 int
|
||||
if args[0] != nil {
|
||||
arg0 = args[0].(int)
|
||||
}
|
||||
run(
|
||||
arg0,
|
||||
)
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockCapacityRecorder_SetMaxBurstCapacity_Call) Return() *MockCapacityRecorder_SetMaxBurstCapacity_Call {
|
||||
_c.Call.Return()
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockCapacityRecorder_SetMaxBurstCapacity_Call) RunAndReturn(run func(value int)) *MockCapacityRecorder_SetMaxBurstCapacity_Call {
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// SetProactiveCapacity provides a mock function for the type MockCapacityRecorder
|
||||
func (_mock *MockCapacityRecorder) SetProactiveCapacity(value int) {
|
||||
_mock.Called(value)
|
||||
|
|
@ -1662,6 +1702,46 @@ func (_c *MockServerExporter_SetPlaceholderPods_Call) RunAndReturn(run func(role
|
|||
return _c
|
||||
}
|
||||
|
||||
// SetMaxBurstCapacity provides a mock function for the type MockServerExporter
|
||||
func (_mock *MockServerExporter) SetMaxBurstCapacity(value int) {
|
||||
_mock.Called(value)
|
||||
return
|
||||
}
|
||||
|
||||
// MockServerExporter_SetMaxBurstCapacity_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetMaxBurstCapacity'
|
||||
type MockServerExporter_SetMaxBurstCapacity_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// SetMaxBurstCapacity is a helper method to define mock.On call
|
||||
// - value int
|
||||
func (_e *MockServerExporter_Expecter) SetMaxBurstCapacity(value interface{}) *MockServerExporter_SetMaxBurstCapacity_Call {
|
||||
return &MockServerExporter_SetMaxBurstCapacity_Call{Call: _e.mock.On("SetMaxBurstCapacity", value)}
|
||||
}
|
||||
|
||||
func (_c *MockServerExporter_SetMaxBurstCapacity_Call) Run(run func(value int)) *MockServerExporter_SetMaxBurstCapacity_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
var arg0 int
|
||||
if args[0] != nil {
|
||||
arg0 = args[0].(int)
|
||||
}
|
||||
run(
|
||||
arg0,
|
||||
)
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockServerExporter_SetMaxBurstCapacity_Call) Return() *MockServerExporter_SetMaxBurstCapacity_Call {
|
||||
_c.Call.Return()
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockServerExporter_SetMaxBurstCapacity_Call) RunAndReturn(run func(value int)) *MockServerExporter_SetMaxBurstCapacity_Call {
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// SetProactiveCapacity provides a mock function for the type MockServerExporter
|
||||
func (_mock *MockServerExporter) SetProactiveCapacity(value int) {
|
||||
_mock.Called(value)
|
||||
|
|
|
|||
|
|
@ -71,6 +71,8 @@ listenerMetrics:
|
|||
labels: ["name", "namespace", "repository", "organization", "enterprise"]
|
||||
gha_capacity_proactive_capacity:
|
||||
labels: ["name", "namespace", "repository", "organization", "enterprise"]
|
||||
gha_capacity_max_burst_capacity:
|
||||
labels: ["name", "namespace", "repository", "organization", "enterprise"]
|
||||
gha_capacity_hud_enabled:
|
||||
labels: ["name", "namespace", "repository", "organization", "enterprise"]
|
||||
gha_capacity_queued_jobs:
|
||||
|
|
|
|||
Loading…
Reference in New Issue