package scheduler

import (
	"cmp"
	"context"
	"errors"
	"slices"
	"sort"
	"time"

	"github.com/cirruslabs/orchard/internal/controller/lifecycle"
	"github.com/cirruslabs/orchard/internal/controller/notifier"
	storepkg "github.com/cirruslabs/orchard/internal/controller/store"
	"github.com/cirruslabs/orchard/internal/opentelemetry"
	"github.com/cirruslabs/orchard/internal/worker/ondiskname"
	"github.com/cirruslabs/orchard/pkg/resource/v1"
	"github.com/cirruslabs/orchard/rpc"
	mapset "github.com/deckarep/golang-set/v2"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"go.opentelemetry.io/otel/metric"
	"go.uber.org/zap"
)

const (
	schedulerInterval = 5 * time.Second

	schedulerVMRestartDelay = 15 * time.Second
)
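
// Sentinel errors used for control flow within a scheduling transaction:
// the scheduling loop treats them as a signal to move on to the next VM
// or to the next worker, respectively, rather than as a fatal error.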
var (
	ErrVMSchedulingSkipped     = errors.New("scheduling skipped for VM")
	ErrWorkerSchedulingSkipped = errors.New("scheduling skipped for worker")
)
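
// Prometheus metrics, only updated when the Prometheus metrics support
// is enabled in the Scheduler.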
var (
	schedulerLoopIterationStat = promauto.NewCounter(prometheus.CounterOpts{
		Name: "orchard_scheduler_loop_iteration_total",
	})
	workersStat = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Name: "orchard_workers",
	}, []string{"worker_name", "status"})
	vmsStat = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Name: "orchard_vms",
	}, []string{"status"})
)
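
// Scheduler periodically (and on demand, see RequestScheduling) assigns
// unscheduled VMs to eligible workers and health-checks the cluster.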
type Scheduler struct {
	store                storepkg.Store
	notifier             *notifier.Notifier
	workerOfflineTimeout time.Duration
	logger               *zap.SugaredLogger
	schedulingRequested  chan bool
	prometheusMetrics    bool

	schedulingTimeHistogram metric.Float64Histogram
}
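
// NewScheduler creates a Scheduler that reads cluster state from store,
// pings workers through notifier and considers a worker to be offline
// once it hasn't been seen for workerOfflineTimeout.
//
// A minimal wiring sketch (the store, notifier and logger values and the
// timeout below are illustrative assumptions, not prescribed by this
// package):
//
//	scheduler, err := NewScheduler(store, notifier, 3*time.Minute, true, logger)
//	if err != nil {
//		return err
//	}
//	go scheduler.Run()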
func NewScheduler(
	store storepkg.Store,
	notifier *notifier.Notifier,
	workerOfflineTimeout time.Duration,
	prometheusMetrics bool,
	logger *zap.SugaredLogger,
) (*Scheduler, error) {
	scheduler := &Scheduler{
		store:                store,
		notifier:             notifier,
		workerOfflineTimeout: workerOfflineTimeout,
		logger:               logger,
		schedulingRequested:  make(chan bool, 1),
		prometheusMetrics:    prometheusMetrics,
	}

	// Metrics
	var err error

	scheduler.schedulingTimeHistogram, err = opentelemetry.DefaultMeter.
		Float64Histogram("org.cirruslabs.orchard.controller.scheduling_time")
	if err != nil {
		return nil, err
	}

	return scheduler, nil
}
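
// Run is the scheduler's main loop: it never returns and is normally
// started in a dedicated goroutine.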
func (scheduler *Scheduler) Run() {
	for {
		// Wait for either the scheduling interval to elapse
		// or an explicit scheduling request to arrive
		select {
		case <-scheduler.schedulingRequested:
		case <-time.After(schedulerInterval):
		}

		healthCheckingLoopIterationStart := time.Now()
		numWorkersHealth, numVMsHealth, err := scheduler.healthCheckingLoopIteration()
		healthCheckingLoopIterationEnd := time.Now()
		if err != nil {
			scheduler.logger.Errorf("Failed to health-check VMs: %v", err)
		}

		schedulingLoopIterationStart := time.Now()
		numWorkersScheduling, numVMsScheduling, err := scheduler.schedulingLoopIteration()
		schedulingLoopIterationEnd := time.Now()

		scheduler.logger.Debugf("Health checking loop iteration for %d workers and %d VMs took %v, "+
			"scheduling loop iteration for %d workers and %d VMs took %v",
			numWorkersHealth, numVMsHealth,
			healthCheckingLoopIterationEnd.Sub(healthCheckingLoopIterationStart),
			numWorkersScheduling, numVMsScheduling,
			schedulingLoopIterationEnd.Sub(schedulingLoopIterationStart))

		if err != nil {
			scheduler.logger.Errorf("Failed to schedule VMs: %v", err)
		}

		if scheduler.prometheusMetrics {
			schedulerLoopIterationStat.Inc()
		}
	}
}
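
// reportStats refreshes the per-worker online/offline gauges and the
// per-status VM gauge from a snapshot of the cluster state.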
func (scheduler *Scheduler) reportStats(workers []v1.Worker, vms []v1.VM) {
	for _, worker := range workers {
		if worker.Offline(scheduler.workerOfflineTimeout) {
			workersStat.With(map[string]string{"worker_name": worker.Name, "status": "online"}).Set(0)
			workersStat.With(map[string]string{"worker_name": worker.Name, "status": "offline"}).Set(1)
		} else {
			workersStat.With(map[string]string{"worker_name": worker.Name, "status": "online"}).Set(1)
			workersStat.With(map[string]string{"worker_name": worker.Name, "status": "offline"}).Set(0)
		}
	}

	// Reset the per-status VM gauge before re-counting, otherwise the
	// increments below would accumulate across loop iterations
	vmsStat.Reset()

	for _, vm := range vms {
		vmsStat.With(map[string]string{"status": string(vm.Status)}).Inc()
	}
}
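
// RequestScheduling asks the scheduler to run an iteration as soon as
// possible instead of waiting for the next tick. It never blocks: the
// requests channel has a capacity of one, and a single pending request
// is enough to trigger an iteration.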
func (scheduler *Scheduler) RequestScheduling() {
	select {
	case scheduler.schedulingRequested <- true:
		scheduler.logger.Debugf("Successfully requested scheduling")
	default:
		scheduler.logger.Debugf("There's already a scheduling request in the queue, skipping")
	}
}
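
// schedulingLoopIteration performs a single scheduling pass: it takes a
// snapshot of the VMs, workers and the scheduler profile, assigns each
// unscheduled VM to the first eligible worker in profile order, and then
// pings the affected workers. It returns the number of workers and VMs
// observed in the snapshot.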
//nolint:gocognit,gocyclo // this logic could be seen as even more complex if split into multiple functions
func (scheduler *Scheduler) schedulingLoopIteration() (int, int, error) {
	affectedWorkers := mapset.NewSet[string]()

	// The scheduler's consistency model is based on the following:
	//
	// EXCLUSIVENESS:
	//
	// Only one scheduler may operate in a cluster at any given time.
	//
	// Currently, this is achieved automatically since the BadgerDB we
	// use runs in-process in the Orchard Controller, and the Orchard
	// Controller in turn runs a single scheduler.
	//
	// In the future, we might support etcd, and in that case leader
	// election can be implemented to ensure this property, thanks to
	// etcd leases[1].
	//
	// [1]: https://medium.com/@ahadrana/understanding-etcd3-8784c4f61755
	//
	// OVERESTIMATION OF USED RESOURCES:
	//
	// The scheduler acts opportunistically on a lagging view of resource
	// usage in the cluster.
	//
	// This means that we won't assign more than what a worker can handle,
	// but we might exclude a worker from consideration even though
	// in reality it can already handle the VM we're scheduling, and assign
	// the VM to the next worker, thus slightly violating the scheduler
	// profile at times.
	//
	// LAGGING SCHEDULER PROFILE:
	//
	// In case the scheduler profile is changed amidst a scheduling loop
	// iteration, we'll keep acting on the previously set scheduler profile.
	//
	// This seems totally fine, assuming that (1) the scheduler profile
	// is not something that's changed frequently and (2) when the
	// scheduler profile is changed, a user won't simultaneously schedule
	// a bunch of VMs in the hope that they'll be serviced using the new
	// scheduling profile.

	var vms []v1.VM
	var workers []v1.Worker
	var schedulerProfile v1.SchedulerProfile

	if err := scheduler.store.View(func(txn storepkg.Transaction) error {
		var err error

		vms, err = txn.ListVMs()
		if err != nil {
			return err
		}

		workers, err = txn.ListWorkers()
		if err != nil {
			return err
		}

		clusterSettings, err := txn.GetClusterSettings()
		if err != nil {
			return err
		}
		schedulerProfile = clusterSettings.SchedulerProfile

		return nil
	}); err != nil {
		return 0, 0, err
	}

	unscheduledVMs, workerInfos := ProcessVMs(vms)

NextVM:
	for _, unscheduledVM := range unscheduledVMs {
		// Order workers depending on the scheduler profile and
		// our updated lagging resource usage for each worker
		switch schedulerProfile {
		case v1.SchedulerProfileDistributeLoad:
			slices.SortFunc(workers, func(a, b v1.Worker) int {
				// Sort by the number of running VMs, ascending order
				return cmp.Compare(workerInfos[a.Name].NumRunningVMs,
					workerInfos[b.Name].NumRunningVMs)
			})
		case v1.SchedulerProfileOptimizeUtilization:
			fallthrough
		default:
			slices.SortFunc(workers, func(a, b v1.Worker) int {
				// Sort by the number of running VMs, descending order
				return cmp.Compare(workerInfos[b.Name].NumRunningVMs,
					workerInfos[a.Name].NumRunningVMs)
			})
		}

		// Iterate through the sorted workers and find a worker that can run this VM
	NextWorker:
		for _, worker := range workers {
			resourcesUsed := workerInfos.Get(worker.Name).ResourcesUsed
			resourcesRemaining := worker.Resources.Subtracted(resourcesUsed)

			if worker.Offline(scheduler.workerOfflineTimeout) ||
				worker.SchedulingPaused ||
				!resourcesRemaining.CanFit(unscheduledVM.Resources) ||
				!worker.Labels.Contains(unscheduledVM.Labels) {
				continue NextWorker
			}

			err := scheduler.store.Update(func(txn storepkg.Transaction) error {
				currentUnscheduledVM, err := txn.GetVM(unscheduledVM.Name)
				if err != nil {
					if errors.Is(err, storepkg.ErrNotFound) {
						// The unscheduled VM ceased to exist,
						// so there's nothing to schedule
						return ErrVMSchedulingSkipped
					}

					return err
				}

				if currentUnscheduledVM.UID != unscheduledVM.UID {
					// The unscheduled VM had changed, so we'll re-evaluate the new
					// version of it in the next scheduling loop iteration
					return ErrVMSchedulingSkipped
				}

				// Re-check the freshly read copy, not the lagging one
				if currentUnscheduledVM.IsScheduled() {
					// The unscheduled VM is not unscheduled anymore,
					// so there's nothing to do
					return ErrVMSchedulingSkipped
				}

				if currentUnscheduledVM.TerminalState() {
					// We don't support re-scheduling of VMs in a terminal state at the moment
					return ErrVMSchedulingSkipped
				}

				if currentUnscheduledVM.PowerState.TerminalState() {
					// We don't support re-scheduling of stopped/suspended VMs at the moment
					return ErrVMSchedulingSkipped
				}

				currentWorker, err := txn.GetWorker(worker.Name)
				if err != nil {
					if errors.Is(err, storepkg.ErrNotFound) {
						// The worker that we were planning to schedule
						// this VM on has ceased to exist, so move on
						return ErrWorkerSchedulingSkipped
					}

					return err
				}

				if currentWorker.Offline(scheduler.workerOfflineTimeout) ||
					currentWorker.SchedulingPaused {
					return ErrWorkerSchedulingSkipped
				}

				if currentWorker.MachineID != worker.MachineID ||
					!currentWorker.Resources.Equal(worker.Resources) {
					// The worker has changed
					return ErrWorkerSchedulingSkipped
				}

				unscheduledVM.Worker = worker.Name
				unscheduledVM.ScheduledAt = time.Now()
				v1.ConditionsSet(&unscheduledVM.Conditions, v1.Condition{
					Type:  v1.ConditionTypeScheduled,
					State: v1.ConditionStateTrue,
				})

				// Fill out the actual CPU allocation
				if unscheduledVM.CPU == 0 {
					// Provide defaults for VMs with an implicit CPU specification
					if worker.DefaultCPU != 0 {
						unscheduledVM.AssignedCPU = worker.DefaultCPU
					} else {
						unscheduledVM.AssignedCPU = 4
					}
				} else {
					unscheduledVM.AssignedCPU = unscheduledVM.CPU
				}

				// Fill out the actual memory allocation
				if unscheduledVM.Memory == 0 {
					// Provide defaults for VMs with an implicit memory specification
					if worker.DefaultMemory != 0 {
						unscheduledVM.AssignedMemory = worker.DefaultMemory
					} else {
						unscheduledVM.AssignedMemory = 8192
					}
				} else {
					unscheduledVM.AssignedMemory = unscheduledVM.Memory
				}

				return txn.SetVM(unscheduledVM)
			})
			if err != nil {
				if errors.Is(err, ErrVMSchedulingSkipped) {
					continue NextVM
				}

				if errors.Is(err, ErrWorkerSchedulingSkipped) {
					continue NextWorker
				}

				return 0, 0, err
			}

			// Update the lagging resource usage
			workerInfos.AddVM(worker.Name, unscheduledVM.Resources)

			// Ping the worker afterward for faster VM execution
			affectedWorkers.Add(worker.Name)

			// Update the metrics
			scheduler.schedulingTimeHistogram.Record(context.Background(),
				time.Since(unscheduledVM.CreatedAt).Seconds())

			break
		}
	}

	for affectedWorker := range affectedWorkers.Iter() {
		// It's fine to not treat the error as fatal here,
		// since the worker will sync its VMs on the next
		// scheduling iteration
		notifyContext, notifyContextCancel := context.WithTimeout(context.Background(), time.Second)
		if err := scheduler.notifier.Notify(notifyContext, affectedWorker, &rpc.WatchInstruction{
			Action: &rpc.WatchInstruction_SyncVmsAction{},
		}); err != nil {
			scheduler.logger.Errorf("Failed to reactively sync VMs on worker %s: %v", affectedWorker, err)
		}
		notifyContextCancel()
	}

	return len(workers), len(vms), nil
}
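
// ProcessVMs splits the VMs into unscheduled ones (sorted by creation
// time, oldest first) and a per-worker view of the resources already
// committed to scheduled VMs.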
func ProcessVMs(vms []v1.VM) ([]v1.VM, WorkerInfos) {
	var unscheduledVMs []v1.VM
	workerToResources := make(WorkerInfos)

	for _, vm := range vms {
		if vm.IsScheduled() {
			workerToResources.AddVM(vm.Worker, vm.Resources)
		} else {
			unscheduledVMs = append(unscheduledVMs, vm)
		}
	}

	// Sort unscheduled VMs by their date of creation
	sort.Slice(unscheduledVMs, func(i, j int) bool {
		return unscheduledVMs[i].CreatedAt.Before(unscheduledVMs[j].CreatedAt)
	})

	return unscheduledVMs, workerToResources
}
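
// healthCheckingLoopIteration health-checks each scheduled VM in an
// individual transaction. It returns the number of workers (only counted
// when Prometheus metrics are enabled, since workers are listed solely
// for metrics reporting) and the number of VMs observed in the snapshot.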
func (scheduler *Scheduler) healthCheckingLoopIteration() (int, int, error) {
	// Stats for the caller
	var numWorkers, numVMs int

	// Get a lagging view of the VMs
	var vms []v1.VM

	if err := scheduler.store.View(func(txn storepkg.Transaction) error {
		var err error

		vms, err = txn.ListVMs()
		if err != nil {
			return err
		}
		numVMs = len(vms)

		// Update the metrics
		if scheduler.prometheusMetrics {
			workers, err := txn.ListWorkers()
			if err != nil {
				return err
			}
			numWorkers = len(workers)

			scheduler.reportStats(workers, vms)
		}

		return nil
	}); err != nil {
		return 0, 0, err
	}

	// Process each VM in the lagging list of VMs in an individual
	// transaction, re-checking that the VM still exists and that
	// it is still scheduled
	for _, vm := range vms {
		if !vm.IsScheduled() {
			// Not a scheduled VM
			//
			// We'll re-check this below, but this allows us
			// to avoid wasting cycles opening a transaction
			// for nothing.
			continue
		}

		if err := scheduler.store.Update(func(txn storepkg.Transaction) error {
			currentVM, err := txn.GetVM(vm.Name)
			if err != nil {
				if errors.Is(err, storepkg.ErrNotFound) {
					// The VM ceased to exist, nothing to do
					return nil
				}

				return err
			}

			// Re-check the freshly read copy, not the lagging one
			if !currentVM.IsScheduled() {
				// Not a scheduled VM anymore, nothing to do
				return nil
			}

			return scheduler.healthCheckVM(txn, *currentVM)
		}); err != nil {
			return 0, 0, err
		}
	}

	return numWorkers, numVMs, nil
}
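
// healthCheckVM handles a single scheduled VM: it restarts failed VMs
// whose restart policy mandates it (after a short delay), fails VMs
// whose worker has disappeared or gone offline, and de-schedules VMs
// in a terminal state or power state to free up the worker's resources.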
func (scheduler *Scheduler) healthCheckVM(txn storepkg.Transaction, vm v1.VM) error {
	logger := scheduler.logger.With("vm_name", vm.Name, "vm_uid", vm.UID, "vm_restart_count", vm.RestartCount)

	// Schedule a VM restart if the restart policy mandates it
	needsRestart := vm.RestartPolicy == v1.RestartPolicyOnFailure &&
		vm.Status == v1.VMStatusFailed &&
		time.Since(vm.RestartedAt) > schedulerVMRestartDelay

	if needsRestart {
		logger.Debugf("restarting VM")

		lifecycle.Report(&vm, "VM restarted", scheduler.logger)

		vm.Status = v1.VMStatusPending
		vm.StatusMessage = ""
		vm.Worker = ""
		vm.AssignedCPU = 0
		vm.AssignedMemory = 0
		vm.RestartedAt = time.Now()
		vm.RestartCount++
		vm.ScheduledAt = time.Time{}
		vm.StartedAt = time.Time{}
		vm.PowerState = v1.PowerStateRunning
		vm.TartName = ondiskname.New(vm.Name, vm.UID, vm.RestartCount).String()
		vm.Conditions = []v1.Condition{
			{
				Type:  v1.ConditionTypeScheduled,
				State: v1.ConditionStateFalse,
			},
		}

		return txn.SetVM(vm)
	}

	worker, err := txn.GetWorker(vm.Worker)
	if err != nil {
		if errors.Is(err, storepkg.ErrNotFound) {
			vm.Status = v1.VMStatusFailed
			vm.StatusMessage = "VM is assigned to a worker that " +
				"doesn't exist anymore"

			return txn.SetVM(vm)
		}

		return err
	}

	if worker.Offline(scheduler.workerOfflineTimeout) && !vm.TerminalState() {
		vm.Status = v1.VMStatusFailed
		vm.StatusMessage = "VM is assigned to a worker that " +
			"lost connection with the controller"

		return txn.SetVM(vm)
	}

	if vm.PowerState.TerminalState() && v1.ConditionIsFalse(vm.Conditions, v1.ConditionTypeRunning) {
		// VM has entered a terminal power state and stopped running,
		// de-schedule it to free up resources
		v1.ConditionsSet(&vm.Conditions, v1.Condition{
			Type:  v1.ConditionTypeScheduled,
			State: v1.ConditionStateFalse,
		})

		return txn.SetVM(vm)
	}

	if vm.TerminalState() {
		// VM has entered a terminal state,
		// de-schedule it to free up resources
		v1.ConditionsSet(&vm.Conditions, v1.Condition{
			Type:  v1.ConditionTypeScheduled,
			State: v1.ConditionStateFalse,
		})

		// Also correct the conditions for the worker
		v1.ConditionsSet(&vm.Conditions, v1.Condition{
			Type:  v1.ConditionTypeRunning,
			State: v1.ConditionStateFalse,
		})

		return txn.SetVM(vm)
	}

	return nil
}