orchard/internal/controller/scheduler/scheduler.go

package scheduler
import (
"cmp"
"context"
"errors"
"slices"
"sort"
"time"
"github.com/cirruslabs/orchard/internal/controller/lifecycle"
"github.com/cirruslabs/orchard/internal/controller/notifier"
storepkg "github.com/cirruslabs/orchard/internal/controller/store"
"github.com/cirruslabs/orchard/internal/opentelemetry"
"github.com/cirruslabs/orchard/internal/worker/ondiskname"
"github.com/cirruslabs/orchard/pkg/resource/v1"
"github.com/cirruslabs/orchard/rpc"
mapset "github.com/deckarep/golang-set/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
)
const (
schedulerInterval = 5 * time.Second
schedulerVMRestartDelay = 15 * time.Second
)
var (
ErrVMSchedulingSkipped = errors.New("scheduling skipped for VM")
ErrWorkerSchedulingSkipped = errors.New("scheduling skipped for worker")
)
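// Prometheus metrics exposed by the scheduler; they are only
// updated when Prometheus metrics are enabled for the controller.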
var (
schedulerLoopIterationStat = promauto.NewCounter(prometheus.CounterOpts{
Name: "orchard_scheduler_loop_iteration_total",
})
workersStat = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "orchard_workers",
}, []string{"worker_name", "status"})
vmsStat = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "orchard_vms",
}, []string{"status"})
)
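// Scheduler periodically assigns unscheduled VMs to workers and
// health-checks already scheduled VMs, either on a fixed interval
// or when scheduling is explicitly requested via RequestScheduling().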
type Scheduler struct {
store storepkg.Store
notifier *notifier.Notifier
workerOfflineTimeout time.Duration
logger *zap.SugaredLogger
schedulingRequested chan bool
prometheusMetrics bool
schedulingTimeHistogram metric.Float64Histogram
}
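// NewScheduler returns a Scheduler that operates on the given store,
// notifies workers through the given notifier and considers a worker
// offline after workerOfflineTimeout.
//
// A minimal usage sketch (the concrete store, notifier and logger
// wiring below is assumed, not something this package provides):
//
//	scheduler, err := NewScheduler(store, notifier, 30*time.Second, true, logger)
//	if err != nil {
//		return err
//	}
//	go scheduler.Run()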
func NewScheduler(
store storepkg.Store,
notifier *notifier.Notifier,
workerOfflineTimeout time.Duration,
prometheusMetrics bool,
logger *zap.SugaredLogger,
) (*Scheduler, error) {
scheduler := &Scheduler{
store: store,
notifier: notifier,
workerOfflineTimeout: workerOfflineTimeout,
logger: logger,
schedulingRequested: make(chan bool, 1),
prometheusMetrics: prometheusMetrics,
}
// Metrics
var err error
scheduler.schedulingTimeHistogram, err = opentelemetry.DefaultMeter.
Float64Histogram("org.cirruslabs.orchard.controller.scheduling_time")
if err != nil {
return nil, err
}
return scheduler, nil
}
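// Run executes the scheduling loop indefinitely: each iteration first
// health-checks the VMs and then schedules the unscheduled ones, waking
// up either on schedulerInterval or on an explicit scheduling request.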
func (scheduler *Scheduler) Run() {
for {
// wait for either the scheduling interval to elapse or an explicit scheduling request
select {
case <-scheduler.schedulingRequested:
case <-time.After(schedulerInterval):
}
healthCheckingLoopIterationStart := time.Now()
numWorkersHealth, numVMsHealth, err := scheduler.healthCheckingLoopIteration()
healthCheckingLoopIterationEnd := time.Now()
if err != nil {
scheduler.logger.Errorf("Failed to health-check VMs: %v", err)
}
schedulingLoopIterationStart := time.Now()
numWorkersScheduling, numVMsScheduling, err := scheduler.schedulingLoopIteration()
schedulingLoopIterationEnd := time.Now()
scheduler.logger.Debugf("Health checking loop iteration for %d workers and %d VMs took %v, "+
"scheduling loop iteration for %d workers and %d VMs took %v",
numWorkersHealth, numVMsHealth,
healthCheckingLoopIterationEnd.Sub(healthCheckingLoopIterationStart),
numWorkersScheduling, numVMsScheduling,
schedulingLoopIterationEnd.Sub(schedulingLoopIterationStart))
if err != nil {
scheduler.logger.Errorf("Failed to schedule VMs: %v", err)
}
if scheduler.prometheusMetrics {
schedulerLoopIterationStat.Inc()
}
}
}
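// reportStats updates the per-worker online/offline gauges and the
// per-status VM gauge from the given lagging view of the cluster.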
func (scheduler *Scheduler) reportStats(workers []v1.Worker, vms []v1.VM) {
for _, worker := range workers {
if worker.Offline(scheduler.workerOfflineTimeout) {
workersStat.With(map[string]string{"worker_name": worker.Name, "status": "online"}).Set(0)
workersStat.With(map[string]string{"worker_name": worker.Name, "status": "offline"}).Set(1)
} else {
workersStat.With(map[string]string{"worker_name": worker.Name, "status": "online"}).Set(1)
workersStat.With(map[string]string{"worker_name": worker.Name, "status": "offline"}).Set(0)
}
}
// Reset the per-status VM gauge so that it reflects the current
// counts instead of accumulating across loop iterations
vmsStat.Reset()
for _, vm := range vms {
vmsStat.With(map[string]string{"status": string(vm.Status)}).Inc()
}
}
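// RequestScheduling asks the scheduler to run an iteration as soon as
// possible. The request is coalesced: if an iteration is already
// pending, the call is a no-op.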
func (scheduler *Scheduler) RequestScheduling() {
select {
case scheduler.schedulingRequested <- true:
scheduler.logger.Debugf("Successfully requested scheduling")
default:
scheduler.logger.Debugf("There's already a scheduling request in the queue, skipping")
}
}
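// schedulingLoopIteration assigns unscheduled VMs to suitable workers,
// ordering the workers according to the cluster's scheduler profile,
// and returns the number of workers and VMs it observed.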
//nolint:gocognit,gocyclo // this logic could be seen as even more complex if split into multiple functions
func (scheduler *Scheduler) schedulingLoopIteration() (int, int, error) {
affectedWorkers := mapset.NewSet[string]()
// Scheduler consistency model is based on the following:
//
// EXCLUSIVENESS:
//
// Only one scheduler might operate in a cluster at any given time.
//
// Currently, this is achieved automatically since we run an
// in-process BadgerDB inside the Orchard Controller, and the
// Orchard Controller in turn runs a single scheduler.
//
// In the future, we might support etcd, and in that case leader
// election can be implemented to ensure this property, thanks to
// etcd leases[1].
//
// [1]: https://medium.com/@ahadrana/understanding-etcd3-8784c4f61755
//
// OVERESTIMATION OF USED RESOURCES:
//
// Scheduler acts opportunistically on a lagging view of resource
// usage in the cluster.
//
// This means that we won't assign more than a worker can handle,
// but we might exclude a worker from consideration even though in
// reality it could already accommodate the VM we're scheduling, and
// assign the VM to the next worker instead, thus slightly violating
// the scheduler profile at times.
//
// LAGGING SCHEDULER PROFILE:
//
// In case the scheduler profile is changed amidst the scheduling loop
// iteration, we'll act on a previously set scheduler profile.
//
// This should be fine, assuming that (1) the scheduler profile is
// not changed frequently and (2) a user won't schedule a bunch of
// VMs at the very moment the scheduler profile is changed, hoping
// that they'll be serviced using the new profile.
var vms []v1.VM
var workers []v1.Worker
var schedulerProfile v1.SchedulerProfile
if err := scheduler.store.View(func(txn storepkg.Transaction) error {
var err error
vms, err = txn.ListVMs()
if err != nil {
return err
}
workers, err = txn.ListWorkers()
if err != nil {
return err
}
clusterSettings, err := txn.GetClusterSettings()
if err != nil {
return err
}
schedulerProfile = clusterSettings.SchedulerProfile
return nil
}); err != nil {
return 0, 0, err
}
unscheduledVMs, workerInfos := ProcessVMs(vms)
NextVM:
for _, unscheduledVM := range unscheduledVMs {
// Order workers depending on the scheduler profile and
// our updated lagging resource usage for each worker
switch schedulerProfile {
case v1.SchedulerProfileDistributeLoad:
slices.SortFunc(workers, func(a, b v1.Worker) int {
// Sort by the number of running VMs, ascending order
return cmp.Compare(workerInfos[a.Name].NumRunningVMs,
workerInfos[b.Name].NumRunningVMs)
})
case v1.SchedulerProfileOptimizeUtilization:
fallthrough
default:
slices.SortFunc(workers, func(a, b v1.Worker) int {
// Sort by the number of running VMs, descending order
return cmp.Compare(workerInfos[b.Name].NumRunningVMs,
workerInfos[a.Name].NumRunningVMs)
})
}
// Iterate through sorted workers and find a worker that can run this VM
NextWorker:
for _, worker := range workers {
resourcesUsed := workerInfos.Get(worker.Name).ResourcesUsed
resourcesRemaining := worker.Resources.Subtracted(resourcesUsed)
if worker.Offline(scheduler.workerOfflineTimeout) ||
worker.SchedulingPaused ||
!resourcesRemaining.CanFit(unscheduledVM.Resources) ||
!worker.Labels.Contains(unscheduledVM.Labels) {
continue NextWorker
}
err := scheduler.store.Update(func(txn storepkg.Transaction) error {
currentUnscheduledVM, err := txn.GetVM(unscheduledVM.Name)
if err != nil {
if errors.Is(err, storepkg.ErrNotFound) {
// The unscheduled VM ceased to exist,
// so nothing to schedule
return ErrVMSchedulingSkipped
}
return err
}
if currentUnscheduledVM.UID != unscheduledVM.UID {
// The unscheduled VM has changed, so we'll re-evaluate its new
// version in the next scheduling loop iteration
return ErrVMSchedulingSkipped
}
// Re-check the fresh copy retrieved in this transaction
if currentUnscheduledVM.IsScheduled() {
// The VM is not unscheduled anymore,
// so there's nothing to do
return ErrVMSchedulingSkipped
}
if currentUnscheduledVM.TerminalState() {
// We don't support re-scheduling of VMs in terminal state at the moment
return ErrVMSchedulingSkipped
}
if currentUnscheduledVM.PowerState.TerminalState() {
// We don't support re-scheduling of stopped/suspended VMs at the moment
return ErrVMSchedulingSkipped
}
currentWorker, err := txn.GetWorker(worker.Name)
if err != nil {
if errors.Is(err, storepkg.ErrNotFound) {
// The worker that we were planning to schedule
// this VM on has ceased to exist, so move on
return ErrWorkerSchedulingSkipped
}
return err
}
if currentWorker.Offline(scheduler.workerOfflineTimeout) ||
currentWorker.SchedulingPaused {
return ErrWorkerSchedulingSkipped
}
if currentWorker.MachineID != worker.MachineID ||
!currentWorker.Resources.Equal(worker.Resources) {
// Worker has changed
return ErrWorkerSchedulingSkipped
}
unscheduledVM.Worker = worker.Name
unscheduledVM.ScheduledAt = time.Now()
v1.ConditionsSet(&unscheduledVM.Conditions, v1.Condition{
Type: v1.ConditionTypeScheduled,
State: v1.ConditionStateTrue,
})
// Fill out the actual CPU allocation
if unscheduledVM.CPU == 0 {
// Provide defaults for VMs with implicit CPU specification
if worker.DefaultCPU != 0 {
unscheduledVM.AssignedCPU = worker.DefaultCPU
} else {
unscheduledVM.AssignedCPU = 4
}
} else {
unscheduledVM.AssignedCPU = unscheduledVM.CPU
}
// Fill out the actual memory allocation
if unscheduledVM.Memory == 0 {
// Provide defaults for VMs with implicit memory specification
if worker.DefaultMemory != 0 {
unscheduledVM.AssignedMemory = worker.DefaultMemory
} else {
unscheduledVM.AssignedMemory = 8192
}
} else {
unscheduledVM.AssignedMemory = unscheduledVM.Memory
}
if err := txn.SetVM(unscheduledVM); err != nil {
return err
}
return nil
})
if err != nil {
if errors.Is(err, ErrVMSchedulingSkipped) {
continue NextVM
}
if errors.Is(err, ErrWorkerSchedulingSkipped) {
continue NextWorker
}
return 0, 0, err
}
// Update lagging resource usage
workerInfos.AddVM(worker.Name, unscheduledVM.Resources)
// Ping the worker afterward for faster VM execution
affectedWorkers.Add(worker.Name)
// Update metrics
scheduler.schedulingTimeHistogram.Record(context.Background(),
time.Since(unscheduledVM.CreatedAt).Seconds())
break
}
}
for affectedWorker := range affectedWorkers.Iter() {
// It's fine to not treat the error as fatal here,
// since the worker will sync the VMs on the next
// scheduling iteration
notifyContext, notifyContextCancel := context.WithTimeout(context.Background(), time.Second)
if err := scheduler.notifier.Notify(notifyContext, affectedWorker, &rpc.WatchInstruction{
Action: &rpc.WatchInstruction_SyncVmsAction{},
}); err != nil {
scheduler.logger.Errorf("Failed to reactively sync VMs on worker %s: %v", affectedWorker, err)
}
notifyContextCancel()
}
return len(workers), len(vms), nil
}
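// ProcessVMs splits the given VMs into unscheduled ones (sorted by
// creation time, oldest first) and a per-worker summary of the
// resources already used by the scheduled ones.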
func ProcessVMs(vms []v1.VM) ([]v1.VM, WorkerInfos) {
var unscheduledVMs []v1.VM
workerToResources := make(WorkerInfos)
for _, vm := range vms {
if vm.IsScheduled() {
workerToResources.AddVM(vm.Worker, vm.Resources)
} else {
unscheduledVMs = append(unscheduledVMs, vm)
}
}
// Sort unscheduled VMs by the date of creation
sort.Slice(unscheduledVMs, func(i, j int) bool {
return unscheduledVMs[i].CreatedAt.Before(unscheduledVMs[j].CreatedAt)
})
return unscheduledVMs, workerToResources
}
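// healthCheckingLoopIteration health-checks each scheduled VM in its
// own transaction and returns the number of workers and VMs it observed.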
func (scheduler *Scheduler) healthCheckingLoopIteration() (int, int, error) {
// Stats for the caller
var numWorkers, numVMs int
// Get a lagging view of VMs
var vms []v1.VM
if err := scheduler.store.View(func(txn storepkg.Transaction) error {
var err error
vms, err = txn.ListVMs()
if err != nil {
return err
}
numVMs = len(vms)
// Update metrics
if scheduler.prometheusMetrics {
workers, err := txn.ListWorkers()
if err != nil {
return err
}
numWorkers = len(workers)
scheduler.reportStats(workers, vms)
}
return nil
}); err != nil {
return 0, 0, err
}
// Process each VM from the lagging list of VMs in an individual
// transaction, re-checking that the VM still exists and that it
// is still scheduled
for _, vm := range vms {
if !vm.IsScheduled() {
// Not a scheduled VM
//
// We'll re-check this below, but this allows us
// to avoid wasting cycles opening a transaction
// for nothing.
continue
}
if err := scheduler.store.Update(func(txn storepkg.Transaction) error {
currentVM, err := txn.GetVM(vm.Name)
if err != nil {
if errors.Is(err, storepkg.ErrNotFound) {
// VM ceased to exist, nothing to do
return nil
}
return err
}
if !currentVM.IsScheduled() {
// Not a scheduled VM, nothing to do
return nil
}
return scheduler.healthCheckVM(txn, *currentVM)
}); err != nil {
return 0, 0, err
}
}
return numWorkers, numVMs, nil
}
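// healthCheckVM restarts failed VMs whose restart policy allows it,
// fails VMs whose worker has disappeared or gone offline, and
// de-schedules VMs that have reached a terminal state.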
func (scheduler *Scheduler) healthCheckVM(txn storepkg.Transaction, vm v1.VM) error {
logger := scheduler.logger.With("vm_name", vm.Name, "vm_uid", vm.UID, "vm_restart_count", vm.RestartCount)
// Schedule a VM restart if the restart policy mandates it
needsRestart := vm.RestartPolicy == v1.RestartPolicyOnFailure &&
vm.Status == v1.VMStatusFailed &&
time.Since(vm.RestartedAt) > schedulerVMRestartDelay
if needsRestart {
logger.Debugf("restarting VM")
lifecycle.Report(&vm, "VM restarted", scheduler.logger)
vm.Status = v1.VMStatusPending
vm.StatusMessage = ""
vm.Worker = ""
vm.AssignedCPU = 0
vm.AssignedMemory = 0
vm.RestartedAt = time.Now()
vm.RestartCount++
vm.ScheduledAt = time.Time{}
vm.StartedAt = time.Time{}
vm.PowerState = v1.PowerStateRunning
vm.TartName = ondiskname.New(vm.Name, vm.UID, vm.RestartCount).String()
vm.Conditions = []v1.Condition{
{
Type: v1.ConditionTypeScheduled,
State: v1.ConditionStateFalse,
},
}
return txn.SetVM(vm)
}
worker, err := txn.GetWorker(vm.Worker)
if err != nil {
if errors.Is(err, storepkg.ErrNotFound) {
vm.Status = v1.VMStatusFailed
vm.StatusMessage = "VM is assigned to a worker that " +
"doesn't exist anymore"
return txn.SetVM(vm)
}
return err
}
if worker.Offline(scheduler.workerOfflineTimeout) && !vm.TerminalState() {
vm.Status = v1.VMStatusFailed
vm.StatusMessage = "VM is assigned to a worker that " +
"lost connection with the controller"
return txn.SetVM(vm)
}
if vm.PowerState.TerminalState() && v1.ConditionIsFalse(vm.Conditions, v1.ConditionTypeRunning) {
// VM has entered a terminal power state and stopped running,
// de-schedule it to free up resources
v1.ConditionsSet(&vm.Conditions, v1.Condition{
Type: v1.ConditionTypeScheduled,
State: v1.ConditionStateFalse,
})
return txn.SetVM(vm)
}
if vm.TerminalState() {
// VM has entered a terminal state,
// de-schedule it to free up resources
v1.ConditionsSet(&vm.Conditions, v1.Condition{
Type: v1.ConditionTypeScheduled,
State: v1.ConditionStateFalse,
})
// Also clear the Running condition on behalf of the worker
v1.ConditionsSet(&vm.Conditions, v1.Condition{
Type: v1.ConditionTypeRunning,
State: v1.ConditionStateFalse,
})
return txn.SetVM(vm)
}
return nil
}