Schedule opportunistically and more granularly (#225)

* Schedule opportunistically and more granularly

To avoid transaction conflicts.

* Measure scheduling loop iteration duration and log it at the debug level

* Use "continue NextWorker" instead of just "continue" for clarity
Nikolay Edigaryev 2024-12-03 18:11:48 +04:00 committed by GitHub
parent 7fe0414981
commit d94690176e
3 changed files with 216 additions and 66 deletions
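
Before the diffs, the gist of the change: previously the scheduler made all of its placement decisions inside one large read-write transaction, so any concurrent write to a VM or worker could conflict with the entire scheduling pass. The new code takes a read-only snapshot first and then commits each placement in its own small read-write transaction, re-validating the VM and the worker before writing. Below is a minimal, self-contained sketch of that pattern; the toy Store, VM, errSkip, and "worker1" are invented for illustration, and only the View/Update shape mirrors Orchard's BadgerDB-backed store:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// errSkip plays the role of ErrVMSchedulingSkipped/ErrWorkerSchedulingSkipped.
var errSkip = errors.New("skip: state changed since the snapshot")

// VM is a toy stand-in for v1.VM; UID changes whenever the object is replaced.
type VM struct {
	Name   string
	Worker string
	UID    int
}

// Store is a toy stand-in for the BadgerDB-backed store.
type Store struct {
	mu  sync.Mutex
	vms map[string]VM
}

// View runs fn against a consistent snapshot (a copy) of the store.
func (s *Store) View(fn func(map[string]VM) error) error {
	s.mu.Lock()
	snapshot := make(map[string]VM, len(s.vms))
	for name, vm := range s.vms {
		snapshot[name] = vm
	}
	s.mu.Unlock()

	return fn(snapshot)
}

// Update runs fn with exclusive access to the current state, which may
// have diverged from any snapshot taken earlier.
func (s *Store) Update(fn func(map[string]VM) error) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	return fn(s.vms)
}

func main() {
	store := &Store{vms: map[string]VM{"vm1": {Name: "vm1", UID: 1}}}

	// Snapshot the pending VMs outside of any read-write transaction.
	var pending []VM
	_ = store.View(func(vms map[string]VM) error {
		for _, vm := range vms {
			if vm.Worker == "" {
				pending = append(pending, vm)
			}
		}

		return nil
	})

	// One small read-write transaction per placement, re-validating first.
	for _, vm := range pending {
		err := store.Update(func(vms map[string]VM) error {
			current, ok := vms[vm.Name]
			if !ok || current.UID != vm.UID || current.Worker != "" {
				// Our snapshot is stale: skip instead of committing
				// a decision based on outdated information.
				return errSkip
			}

			current.Worker = "worker1"
			vms[vm.Name] = current

			return nil
		})
		if errors.Is(err, errSkip) {
			continue // the VM will be re-evaluated on the next iteration
		}
	}

	fmt.Println(store.vms["vm1"].Worker) // prints "worker1"
}

The conflict surface of each write thus shrinks from "everything the scheduler read" to a single VM plus the worker it lands on, at the cost of re-validation boilerplate and an occasionally stale, overestimated view of resource usage, which the comment block in the first diff below spells out.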


@@ -3,12 +3,14 @@ package scheduler
 import (
 	"cmp"
 	"context"
+	"errors"
 	"github.com/cirruslabs/orchard/internal/controller/lifecycle"
 	"github.com/cirruslabs/orchard/internal/controller/notifier"
 	storepkg "github.com/cirruslabs/orchard/internal/controller/store"
 	"github.com/cirruslabs/orchard/internal/opentelemetry"
 	"github.com/cirruslabs/orchard/pkg/resource/v1"
 	"github.com/cirruslabs/orchard/rpc"
+	mapset "github.com/deckarep/golang-set/v2"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"go.opentelemetry.io/otel/metric"
@@ -24,6 +26,11 @@ const (
 	schedulerVMRestartDelay = 15 * time.Second
 )
 
+var (
+	ErrVMSchedulingSkipped     = errors.New("scheduling skipped for VM")
+	ErrWorkerSchedulingSkipped = errors.New("scheduling skipped for worker")
+)
+
 var (
 	schedulerLoopIterationStat = promauto.NewCounter(prometheus.CounterOpts{
 		Name: "orchard_scheduler_loop_iteration_total",
@@ -83,7 +90,15 @@ func (scheduler *Scheduler) Run() {
 		if err := scheduler.healthCheckingLoopIteration(); err != nil {
 			scheduler.logger.Errorf("Failed to health-check VMs: %v", err)
 		}
-		if err := scheduler.schedulingLoopIteration(); err != nil {
+		schedulingLoopIterationStart := time.Now()
+
+		err := scheduler.schedulingLoopIteration()
+
+		schedulingLoopIterationEnd := time.Now()
+		scheduler.logger.Debugf("Scheduling loop iteration took %v",
+			schedulingLoopIterationEnd.Sub(schedulingLoopIterationStart))
+
+		if err != nil {
 			scheduler.logger.Errorf("Failed to schedule VMs: %v", err)
 		} else {
 			schedulerLoopIterationStat.Inc()
@@ -115,88 +130,205 @@ func (scheduler *Scheduler) RequestScheduling() {
 	}
 }
 
+//nolint:gocognit // this logic could be considered even more complex if split into multiple functions
 func (scheduler *Scheduler) schedulingLoopIteration() error {
-	affectedWorkers := map[string]bool{}
+	affectedWorkers := mapset.NewSet[string]()
 
-	err := scheduler.store.Update(func(txn storepkg.Transaction) error {
-		vms, err := txn.ListVMs()
-		if err != nil {
-			return err
-		}
-
-		unscheduledVMs, workerInfos := ProcessVMs(vms)
+	// Scheduler consistency model is based on the following:
+	//
+	// EXCLUSIVENESS:
+	//
+	// Only one scheduler might operate in a cluster at any given time.
+	//
+	// Currently, this is achieved automatically since BadgerDB runs
+	// in-process in the Orchard Controller, and the Orchard Controller
+	// in turn runs a single scheduler.
+	//
+	// In the future, we might support etcd, and in that case leader
+	// election can be implemented to ensure this property, thanks to
+	// etcd leases[1].
+	//
+	// [1]: https://medium.com/@ahadrana/understanding-etcd3-8784c4f61755
+	//
+	// OVERESTIMATION OF USED RESOURCES:
+	//
+	// The scheduler acts opportunistically on a lagging view of the
+	// resource usage in the cluster.
+	//
+	// This means that we won't assign more VMs than a worker can handle,
+	// but we might exclude a worker from consideration even though in
+	// reality it can already handle the VM we're scheduling, and assign
+	// the VM to the next worker, thus slightly violating the scheduler
+	// profile at times.
+	//
+	// LAGGING SCHEDULER PROFILE:
+	//
+	// In case the scheduler profile is changed amidst a scheduling loop
+	// iteration, we'll act on the previously set scheduler profile.
+	//
+	// This seems totally fine, assuming that (1) the scheduler profile
+	// is not something that's changed frequently and that (2) at the
+	// same time the scheduler profile is changed, a user won't schedule
+	// a bunch of VMs in the hope that they'll be serviced using that
+	// new scheduling profile.
-		workers, err := txn.ListWorkers()
+	var vms []v1.VM
+	var workers []v1.Worker
+	var schedulerProfile v1.SchedulerProfile
+
+	if err := scheduler.store.View(func(txn storepkg.Transaction) error {
+		var err error
+
+		vms, err = txn.ListVMs()
 		if err != nil {
 			return err
 		}
 
+		workers, err = txn.ListWorkers()
+		if err != nil {
+			return err
+		}
+
 		// Retrieve cluster settings to figure out which scheduler profile to use
 		clusterSettings, err := txn.GetClusterSettings()
 		if err != nil {
 			return err
 		}
 
-		for _, unscheduledVM := range unscheduledVMs {
-			// Order workers depending on the scheduler profile
-			switch clusterSettings.SchedulerProfile {
-			case v1.SchedulerProfileDistributeLoad:
-				slices.SortFunc(workers, func(a, b v1.Worker) int {
-					// Sort by the number of running VMs, ascending order
-					return cmp.Compare(workerInfos[a.Name].NumRunningVMs,
-						workerInfos[b.Name].NumRunningVMs)
-				})
-			case v1.SchedulerProfileOptimizeUtilization:
-				fallthrough
-			default:
-				slices.SortFunc(workers, func(a, b v1.Worker) int {
-					// Sort by the number of running VMs, descending order
-					return cmp.Compare(workerInfos[b.Name].NumRunningVMs,
-						workerInfos[a.Name].NumRunningVMs)
-				})
-			}
-
-			// Find a worker that can run this VM
-			for _, worker := range workers {
-				resourcesUsed := workerInfos.Get(worker.Name).ResourcesUsed
-				resourcesRemaining := worker.Resources.Subtracted(resourcesUsed)
-
-				if resourcesRemaining.CanFit(unscheduledVM.Resources) &&
-					!worker.Offline(scheduler.workerOfflineTimeout) &&
-					!worker.SchedulingPaused {
-					// Metrics
-					scheduler.schedulingTimeHistogram.Record(context.Background(),
-						time.Since(unscheduledVM.CreatedAt).Seconds())
-
-					unscheduledVM.Worker = worker.Name
-					unscheduledVM.ScheduledAt = time.Now()
-
-					if err := txn.SetVM(unscheduledVM); err != nil {
-						return err
-					}
-
-					affectedWorkers[worker.Name] = true
-					workerInfos.AddVM(worker.Name, unscheduledVM.Resources)
-
-					break
-				}
-			}
-		}
+		schedulerProfile = clusterSettings.SchedulerProfile
 
 		return nil
-	})
-
-	syncVMsInstruction := rpc.WatchInstruction{
-		Action: &rpc.WatchInstruction_SyncVmsAction{},
+	}); err != nil {
+		return err
 	}
-
-	for workerToPoke := range affectedWorkers {
-		// it's fine to ignore the error here, since the worker will sync the VMs on the next cycle
-		notifyErr := scheduler.notifier.Notify(context.Background(), workerToPoke, &syncVMsInstruction)
-		if notifyErr != nil {
-			scheduler.logger.Errorf("Failed to reactively sync VMs on worker %s: %v", workerToPoke, notifyErr)
-		}
-	}
+	unscheduledVMs, workerInfos := ProcessVMs(vms)
+
+NextVM:
+	for _, unscheduledVM := range unscheduledVMs {
+		// Order workers depending on the scheduler profile and
+		// our updated lagging resource usage for each worker
+		switch schedulerProfile {
+		case v1.SchedulerProfileDistributeLoad:
+			slices.SortFunc(workers, func(a, b v1.Worker) int {
+				// Sort by the number of running VMs, ascending order
+				return cmp.Compare(workerInfos[a.Name].NumRunningVMs,
+					workerInfos[b.Name].NumRunningVMs)
+			})
+		case v1.SchedulerProfileOptimizeUtilization:
+			fallthrough
+		default:
+			slices.SortFunc(workers, func(a, b v1.Worker) int {
+				// Sort by the number of running VMs, descending order
+				return cmp.Compare(workerInfos[b.Name].NumRunningVMs,
+					workerInfos[a.Name].NumRunningVMs)
+			})
+		}
+
+		// Iterate through sorted workers and find a worker that can run this VM
+	NextWorker:
+		for _, worker := range workers {
+			resourcesUsed := workerInfos.Get(worker.Name).ResourcesUsed
+			resourcesRemaining := worker.Resources.Subtracted(resourcesUsed)
+
+			if worker.Offline(scheduler.workerOfflineTimeout) ||
+				worker.SchedulingPaused ||
+				!resourcesRemaining.CanFit(unscheduledVM.Resources) {
+				continue NextWorker
+			}
+
+			err := scheduler.store.Update(func(txn storepkg.Transaction) error {
+				currentUnscheduledVM, err := txn.GetVM(unscheduledVM.Name)
+				if err != nil {
+					if errors.Is(err, storepkg.ErrNotFound) {
+						// The unscheduled VM ceased to exist,
+						// so nothing to schedule
+						return ErrVMSchedulingSkipped
+					}
+
+					return err
+				}
+
+				if currentUnscheduledVM.UID != unscheduledVM.UID {
+					// The unscheduled VM had changed, so we'll re-evaluate a new
+					// version of it in the next scheduling loop iteration
+					return ErrVMSchedulingSkipped
+				}
+
+				if currentUnscheduledVM.Status != v1.VMStatusPending ||
+					currentUnscheduledVM.Worker != "" {
+					// Unscheduled VM is not unscheduled anymore,
+					// so there's nothing to do
+					return ErrVMSchedulingSkipped
+				}
+
+				currentWorker, err := txn.GetWorker(worker.Name)
+				if err != nil {
+					if errors.Is(err, storepkg.ErrNotFound) {
+						// The worker that we were planning to schedule
+						// this VM on has ceased to exist, so move on
+						return ErrWorkerSchedulingSkipped
+					}
+
+					return err
+				}
+
+				if currentWorker.Offline(scheduler.workerOfflineTimeout) ||
+					currentWorker.SchedulingPaused {
+					return ErrWorkerSchedulingSkipped
+				}
+
+				if currentWorker.MachineID != worker.MachineID ||
+					!currentWorker.Resources.Equal(worker.Resources) {
+					// Worker has changed
+					return ErrWorkerSchedulingSkipped
+				}
+
+				unscheduledVM.Worker = worker.Name
+				unscheduledVM.ScheduledAt = time.Now()
+
+				if err := txn.SetVM(unscheduledVM); err != nil {
+					return err
+				}
+
+				return nil
+			})
+			if err != nil {
+				if errors.Is(err, ErrVMSchedulingSkipped) {
+					continue NextVM
+				}
+
+				if errors.Is(err, ErrWorkerSchedulingSkipped) {
+					continue NextWorker
+				}
+
+				return err
+			}
+
+			// Update lagging resource usage
+			workerInfos.AddVM(worker.Name, unscheduledVM.Resources)
+
+			// Ping the worker afterward for faster VM execution
+			affectedWorkers.Add(worker.Name)
+
+			// Update metrics
+			scheduler.schedulingTimeHistogram.Record(context.Background(),
+				time.Since(unscheduledVM.CreatedAt).Seconds())
+
+			break
+		}
+	}
 
-	return err
+	for affectedWorker := range affectedWorkers.Iter() {
+		// It's fine to not treat the error as fatal here,
+		// since the worker will sync the VMs on the next
+		// scheduling iteration
+		if err := scheduler.notifier.Notify(context.Background(), affectedWorker, &rpc.WatchInstruction{
+			Action: &rpc.WatchInstruction_SyncVmsAction{},
+		}); err != nil {
+			scheduler.logger.Errorf("Failed to reactively sync VMs on worker %s: %v", affectedWorker, err)
+		}
+	}
+
+	return nil
 }
 
 func ProcessVMs(vms []v1.VM) ([]v1.VM, WorkerInfos) {
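
A note on the commit's third bullet: inside the NextWorker loop above, a bare continue would behave exactly like continue NextWorker, so that label is pure documentation, whereas continue NextVM genuinely changes control flow by abandoning the remaining workers for the current VM. A standalone illustration of Go's labeled continue (the loop bounds are made up):

package main

import "fmt"

func main() {
NextVM:
	for vm := 1; vm <= 2; vm++ {
	NextWorker:
		for worker := 1; worker <= 3; worker++ {
			if worker == 1 {
				// Same effect as a bare "continue", but explicit
				// about which loop is being continued.
				continue NextWorker
			}

			if worker == 3 {
				// Jumps out of the worker loop entirely and
				// advances the outer VM loop.
				continue NextVM
			}

			fmt.Printf("vm %d considered worker %d\n", vm, worker)
		}
	}
}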


@@ -3,6 +3,7 @@ package v1
 import (
 	"errors"
 	"fmt"
+	"maps"
 	"strconv"
 )
@@ -98,3 +99,7 @@ func (resources Resources) CanFit(other Resources) bool {
 
 	return true
 }
+
+func (resources Resources) Equal(other Resources) bool {
+	return maps.Equal(resources, other)
+}


@@ -104,3 +104,16 @@ func TestResourcesMerged(t *testing.T) {
 		v1.ResourceTartVMs: 4,
 	}))
 }
+
+func TestEqual(t *testing.T) {
+	//nolint:gocritic // "dupArg: suspicious method call with the same argument and receiver" // it's not suspicious at all
+	require.True(t, v1.Resources{}.Equal(v1.Resources{}))
+	require.True(t, v1.Resources{"a": 10.0}.Equal(v1.Resources{"a": 10.0}))
+
+	require.False(t, v1.Resources{"a": 10.0}.Equal(v1.Resources{"a": 10.0, "b": 15.0}))
+	require.False(t, v1.Resources{"a": 10.0, "b": 15.0}.Equal(v1.Resources{"a": 10.0}))
+	require.False(t, v1.Resources{"a": 0.0}.Equal(v1.Resources{"b": 0.0}))
+}