From d94690176e328ac27dd0ce9d9a4ce49a12de4f2e Mon Sep 17 00:00:00 2001
From: Nikolay Edigaryev
Date: Tue, 3 Dec 2024 18:11:48 +0400
Subject: [PATCH] Schedule opportunistically and more granularly (#225)

* Schedule opportunistically and more granularly

To avoid transaction conflicts.

* Measure scheduling loop iteration duration and log it at debugging level

* Use "continue NextWorker" instead of just "continue" for clarity
---
 internal/controller/scheduler/scheduler.go | 264 +++++++++++++++------
 pkg/resource/v1/resources.go               |   5 +
 pkg/resource/v1/resources_test.go          |  13 +
 3 files changed, 216 insertions(+), 66 deletions(-)

diff --git a/internal/controller/scheduler/scheduler.go b/internal/controller/scheduler/scheduler.go
index 1748872..aaab508 100644
--- a/internal/controller/scheduler/scheduler.go
+++ b/internal/controller/scheduler/scheduler.go
@@ -3,12 +3,14 @@ package scheduler
 import (
 	"cmp"
 	"context"
+	"errors"
 	"github.com/cirruslabs/orchard/internal/controller/lifecycle"
 	"github.com/cirruslabs/orchard/internal/controller/notifier"
 	storepkg "github.com/cirruslabs/orchard/internal/controller/store"
 	"github.com/cirruslabs/orchard/internal/opentelemetry"
 	"github.com/cirruslabs/orchard/pkg/resource/v1"
 	"github.com/cirruslabs/orchard/rpc"
+	mapset "github.com/deckarep/golang-set/v2"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"go.opentelemetry.io/otel/metric"
@@ -24,6 +26,11 @@ const (
 	schedulerVMRestartDelay = 15 * time.Second
 )
 
+var (
+	ErrVMSchedulingSkipped     = errors.New("scheduling skipped for VM")
+	ErrWorkerSchedulingSkipped = errors.New("scheduling skipped for worker")
+)
+
 var (
 	schedulerLoopIterationStat = promauto.NewCounter(prometheus.CounterOpts{
 		Name: "orchard_scheduler_loop_iteration_total",
@@ -83,7 +90,15 @@ func (scheduler *Scheduler) Run() {
 		if err := scheduler.healthCheckingLoopIteration(); err != nil {
 			scheduler.logger.Errorf("Failed to health-check VMs: %v", err)
 		}
-		if err := scheduler.schedulingLoopIteration(); err != nil {
+
+		schedulingLoopIterationStart := time.Now()
+		err := scheduler.schedulingLoopIteration()
+		schedulingLoopIterationEnd := time.Now()
+
+		scheduler.logger.Debugf("Scheduling loop iteration took %v",
+			schedulingLoopIterationEnd.Sub(schedulingLoopIterationStart))
+
+		if err != nil {
 			scheduler.logger.Errorf("Failed to schedule VMs: %v", err)
 		} else {
 			schedulerLoopIterationStat.Inc()
@@ -115,88 +130,205 @@ func (scheduler *Scheduler) RequestScheduling() {
 	}
 }
 
+//nolint:gocognit // this logic could be said to be considered even more complex if split into multiple functions
 func (scheduler *Scheduler) schedulingLoopIteration() error {
-	affectedWorkers := map[string]bool{}
+	affectedWorkers := mapset.NewSet[string]()
 
-	err := scheduler.store.Update(func(txn storepkg.Transaction) error {
-		vms, err := txn.ListVMs()
-		if err != nil {
-			return err
-		}
-		unscheduledVMs, workerInfos := ProcessVMs(vms)
+	// Scheduler consistency model is based on the following:
+	//
+	// EXCLUSIVENESS:
+	//
+	// Only one scheduler might operate in a cluster at any given time.
+	//
+	// Currently, this is achieved automatically since we run in-process
+	// BadgerDB that runs in the Orchard Controller, and Orchard Controller
+	// in turn runs a single scheduler.
+	//
+	// In the future, we might support etcd, and in that case leader
+	// election can be implemented to ensure this property, thanks to
+	// etcd leases[1].
+	//
+	// [1]: https://medium.com/@ahadrana/understanding-etcd3-8784c4f61755
+	//
+	// OVERESTIMATION OF USED RESOURCES:
+	//
+	// Scheduler acts opportunistically on a lagging view of resource
+	// usage in the cluster.
+	//
+	// This means that we won't assign more than what a worker can handle,
+	// but we might skip a worker from consideration even though
+	// in reality it can already handle the VM we're scheduling, and assign
+	// the VM to the next worker, thus slightly violating the scheduler
+	// profile at times.
+	//
+	// LAGGING SCHEDULER PROFILE:
+	//
+	// In case the scheduler profile is changed amidst the scheduling loop
+	// iteration, we'll act on a previously set scheduler profile.
+	//
+	// It feels that this is totally fine, assuming that (1) the scheduler
+	// profile is not something that's changed frequently and that (2) at
+	// the same time when the scheduler profile is changed, a user won't
+	// schedule a bunch of VMs in the hope that they'll be serviced using
+	// that new scheduling profile.
 
-		workers, err := txn.ListWorkers()
+	var vms []v1.VM
+	var workers []v1.Worker
+	var schedulerProfile v1.SchedulerProfile
+
+	if err := scheduler.store.View(func(txn storepkg.Transaction) error {
+		var err error
+
+		vms, err = txn.ListVMs()
+		if err != nil {
+			return err
+		}
+
+		workers, err = txn.ListWorkers()
 		if err != nil {
 			return err
 		}
 
-		// Retrieve cluster settings to figure out which scheduler profile to use
 		clusterSettings, err := txn.GetClusterSettings()
 		if err != nil {
 			return err
 		}
-
-		for _, unscheduledVM := range unscheduledVMs {
-			// Order workers depending on the scheduler profile
-			switch clusterSettings.SchedulerProfile {
-			case v1.SchedulerProfileDistributeLoad:
-				slices.SortFunc(workers, func(a, b v1.Worker) int {
-					// Sort by the number of running VMs, ascending order
-					return cmp.Compare(workerInfos[a.Name].NumRunningVMs,
-						workerInfos[b.Name].NumRunningVMs)
-				})
-			case v1.SchedulerProfileOptimizeUtilization:
-				fallthrough
-			default:
-				slices.SortFunc(workers, func(a, b v1.Worker) int {
-					// Sort by the number of running VMs, descending order
-					return cmp.Compare(workerInfos[b.Name].NumRunningVMs,
-						workerInfos[a.Name].NumRunningVMs)
-				})
-			}
-
-			// Find a worker that can run this VM
-			for _, worker := range workers {
-				resourcesUsed := workerInfos.Get(worker.Name).ResourcesUsed
-				resourcesRemaining := worker.Resources.Subtracted(resourcesUsed)
-
-				if resourcesRemaining.CanFit(unscheduledVM.Resources) &&
-					!worker.Offline(scheduler.workerOfflineTimeout) &&
-					!worker.SchedulingPaused {
-					// Metrics
-					scheduler.schedulingTimeHistogram.Record(context.Background(),
-						time.Since(unscheduledVM.CreatedAt).Seconds())
-
-					unscheduledVM.Worker = worker.Name
-					unscheduledVM.ScheduledAt = time.Now()
-
-					if err := txn.SetVM(unscheduledVM); err != nil {
-						return err
-					}
-					affectedWorkers[worker.Name] = true
-
-					workerInfos.AddVM(worker.Name, unscheduledVM.Resources)
-
-					break
-				}
-			}
-		}
+		schedulerProfile = clusterSettings.SchedulerProfile
 
 		return nil
-	})
-
-	syncVMsInstruction := rpc.WatchInstruction{
-		Action: &rpc.WatchInstruction_SyncVmsAction{},
+	}); err != nil {
+		return err
 	}
-	for workerToPoke := range affectedWorkers {
-		// it's fine to ignore the error here, since the worker will sync the VMs on the next cycle
-		notifyErr := scheduler.notifier.Notify(context.Background(), workerToPoke, &syncVMsInstruction)
-		if notifyErr != nil {
-			scheduler.logger.Errorf("Failed to reactively sync VMs on worker %s: %v", workerToPoke, notifyErr)
+
+	unscheduledVMs, workerInfos := ProcessVMs(vms)
+
+NextVM:
+	for _, unscheduledVM := range unscheduledVMs {
+		// Order workers depending on the scheduler profile and
+		// our updated lagging resource usage for each worker
+		switch schedulerProfile {
+		case v1.SchedulerProfileDistributeLoad:
+			slices.SortFunc(workers, func(a, b v1.Worker) int {
+				// Sort by the number of running VMs, ascending order
+				return cmp.Compare(workerInfos[a.Name].NumRunningVMs,
+					workerInfos[b.Name].NumRunningVMs)
+			})
+		case v1.SchedulerProfileOptimizeUtilization:
+			fallthrough
+		default:
+			slices.SortFunc(workers, func(a, b v1.Worker) int {
+				// Sort by the number of running VMs, descending order
+				return cmp.Compare(workerInfos[b.Name].NumRunningVMs,
+					workerInfos[a.Name].NumRunningVMs)
+			})
+		}
+
+		// Iterate through sorted workers and find a worker that can run this VM
+	NextWorker:
+		for _, worker := range workers {
+			resourcesUsed := workerInfos.Get(worker.Name).ResourcesUsed
+			resourcesRemaining := worker.Resources.Subtracted(resourcesUsed)
+
+			if worker.Offline(scheduler.workerOfflineTimeout) ||
+				worker.SchedulingPaused ||
+				!resourcesRemaining.CanFit(unscheduledVM.Resources) {
+				continue NextWorker
+			}
+
+			err := scheduler.store.Update(func(txn storepkg.Transaction) error {
+				currentUnscheduledVM, err := txn.GetVM(unscheduledVM.Name)
+				if err != nil {
+					if errors.Is(err, storepkg.ErrNotFound) {
+						// The unscheduled VM ceased to exist,
+						// so nothing to schedule
+						return ErrVMSchedulingSkipped
+					}
+
+					return err
+				}
+
+				if currentUnscheduledVM.UID != unscheduledVM.UID {
+					// The unscheduled VM had changed, so we'll re-evaluate a new
+					// version of it in the next scheduling loop iteration
+					return ErrVMSchedulingSkipped
+				}
+
+				if currentUnscheduledVM.Status != v1.VMStatusPending ||
+					currentUnscheduledVM.Worker != "" {
+					// Unscheduled VM is not unscheduled anymore,
+					// so there's nothing to do
+					return ErrVMSchedulingSkipped
+				}
+
+				currentWorker, err := txn.GetWorker(worker.Name)
+				if err != nil {
+					if errors.Is(err, storepkg.ErrNotFound) {
+						// The worker that we were planning to schedule
+						// this VM on has ceased to exist, so move on
+						return ErrWorkerSchedulingSkipped
+					}
+
+					return err
+				}
+
+				if currentWorker.Offline(scheduler.workerOfflineTimeout) ||
+					currentWorker.SchedulingPaused {
+					return ErrWorkerSchedulingSkipped
+				}
+
+				if currentWorker.MachineID != worker.MachineID ||
+					!currentWorker.Resources.Equal(worker.Resources) {
+					// Worker has changed
+					return ErrWorkerSchedulingSkipped
+				}
+
+				unscheduledVM.Worker = worker.Name
+				unscheduledVM.ScheduledAt = time.Now()
+
+				if err := txn.SetVM(unscheduledVM); err != nil {
+					return err
+				}
+
+				return nil
+			})
+			if err != nil {
+				if errors.Is(err, ErrVMSchedulingSkipped) {
+					continue NextVM
+				}
+
+				if errors.Is(err, ErrWorkerSchedulingSkipped) {
+					continue NextWorker
+				}
+
+				return err
+			}
+
+			// Update lagging resource usage
+			workerInfos.AddVM(worker.Name, unscheduledVM.Resources)
+
+			// Ping the worker afterward for faster VM execution
+			affectedWorkers.Add(worker.Name)
+
+			// Update metrics
+			scheduler.schedulingTimeHistogram.Record(context.Background(),
+				time.Since(unscheduledVM.CreatedAt).Seconds())
+
+			break
 		}
 	}
-	return err
+	for affectedWorker := range affectedWorkers.Iter() {
+		// It's fine to not treat the error as fatal here,
+		// since the worker will sync the VMs on the next
+		// scheduling iteration
+		if err := scheduler.notifier.Notify(context.Background(), affectedWorker, &rpc.WatchInstruction{
+			Action: &rpc.WatchInstruction_SyncVmsAction{},
+ }); err != nil { + scheduler.logger.Errorf("Failed to reactively sync VMs on worker %s: %v", affectedWorker, err) + } + } + + return nil } func ProcessVMs(vms []v1.VM) ([]v1.VM, WorkerInfos) { diff --git a/pkg/resource/v1/resources.go b/pkg/resource/v1/resources.go index 28bcaa1..95d18c9 100644 --- a/pkg/resource/v1/resources.go +++ b/pkg/resource/v1/resources.go @@ -3,6 +3,7 @@ package v1 import ( "errors" "fmt" + "maps" "strconv" ) @@ -98,3 +99,7 @@ func (resources Resources) CanFit(other Resources) bool { return true } + +func (resources Resources) Equal(other Resources) bool { + return maps.Equal(resources, other) +} diff --git a/pkg/resource/v1/resources_test.go b/pkg/resource/v1/resources_test.go index 903ccfd..03212ef 100644 --- a/pkg/resource/v1/resources_test.go +++ b/pkg/resource/v1/resources_test.go @@ -104,3 +104,16 @@ func TestResourcesMerged(t *testing.T) { v1.ResourceTartVMs: 4, })) } + +func TestEqual(t *testing.T) { + //nolint:gocritic // "dupArg: suspicious method call with the same argument and receiver" // it's not suspicious at all + require.True(t, v1.Resources{}.Equal(v1.Resources{})) + + require.True(t, v1.Resources{"a": 10.0}.Equal(v1.Resources{"a": 10.0})) + + require.False(t, v1.Resources{"a": 10.0}.Equal(v1.Resources{"a": 10.0, "b": 15.0})) + + require.False(t, v1.Resources{"a": 10.0, "b": 15.0}.Equal(v1.Resources{"a": 10.0})) + + require.False(t, v1.Resources{"a": 0.0}.Equal(v1.Resources{"b": 0.0})) +}
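
Reviewer note (not part of the patch): below is a minimal, self-contained sketch of the control flow this change introduces — two sentinel errors decide whether the scheduler gives up on the current VM or merely moves on to the next candidate worker. The tryAssign helper and the sample data are hypothetical stand-ins for the per-(VM, worker) store.Update transaction; only the sentinel errors, the errors.Is checks, and the labeled continue statements mirror the actual change.

package main

import (
	"errors"
	"fmt"
)

// Sentinel errors mirroring the ones introduced in scheduler.go.
var (
	ErrVMSchedulingSkipped     = errors.New("scheduling skipped for VM")
	ErrWorkerSchedulingSkipped = errors.New("scheduling skipped for worker")
)

// tryAssign stands in for the per-(VM, worker) transaction: it re-validates
// the pair and either succeeds or signals which loop should advance.
func tryAssign(vm, worker string) error {
	if vm == "vm-gone" {
		// The VM disappeared or is no longer pending: give up on this VM entirely.
		return ErrVMSchedulingSkipped
	}
	if worker == "worker-changed" {
		// The worker changed or can no longer fit the VM: try another worker.
		return ErrWorkerSchedulingSkipped
	}
	return nil
}

func main() {
	vms := []string{"vm-gone", "vm-1"}
	workers := []string{"worker-changed", "worker-ok"}

NextVM:
	for _, vm := range vms {
	NextWorker:
		for _, worker := range workers {
			if err := tryAssign(vm, worker); err != nil {
				if errors.Is(err, ErrVMSchedulingSkipped) {
					continue NextVM // re-evaluate this VM on a later scheduling iteration
				}
				if errors.Is(err, ErrWorkerSchedulingSkipped) {
					continue NextWorker // same VM, next candidate worker
				}
				fmt.Println("fatal:", err)
				return
			}
			fmt.Printf("scheduled %s on %s\n", vm, worker) // prints: scheduled vm-1 on worker-ok
			break
		}
	}
}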