"--scheduler-profile" option to allow different orchestration patterns (#224)

* "--scheduler-profile" option to allow different orchestration patterns

* API(cluster settings): provide a default value for scheduler profile
Nikolay Edigaryev 2024-11-28 20:07:46 +04:00 committed by GitHub
parent 4c63cea062
commit 7fe0414981
14 changed files with 338 additions and 99 deletions
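Programmatically, flipping the profile is a read-modify-write on the cluster settings. A minimal sketch using only the Go client calls that appear in this diff (assumes a reachable controller and the default client configuration):

package main

import (
	"context"
	"log"

	"github.com/cirruslabs/orchard/pkg/client"
	v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
)

func main() {
	orchardClient, err := client.New()
	if err != nil {
		log.Fatal(err)
	}

	// Fetch the current settings, change only the scheduler profile...
	clusterSettings, err := orchardClient.ClusterSettings().Get(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	clusterSettings.SchedulerProfile = v1.SchedulerProfileDistributeLoad

	// ...and write them back
	if err := orchardClient.ClusterSettings().Set(context.Background(), clusterSettings); err != nil {
		log.Fatal(err)
	}
}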


@@ -41,6 +41,8 @@ func runGetClusterSettings(cmd *cobra.Command, args []string) error {
 	hostDirPoliciesDescription := strings.Join(hostDirPoliciesAsStrings, ",")
 	table.AddRow("hostDir policies", nonEmptyOrNone(hostDirPoliciesDescription))
+	table.AddRow("Scheduler profile", clusterSettings.SchedulerProfile)

 	fmt.Println(table)

 	return nil


@@ -11,56 +11,78 @@ import (
 var ErrClusterSettingsFailed = errors.New("failed to set cluster settings")

 var hostDirPoliciesRaw []string
+var schedulerProfileRaw string

-const hostDirPoliciesFlag = "host-dir-policies"
+const (
+	hostDirPoliciesFlag  = "host-dir-policies"
+	schedulerProfileFlag = "scheduler-profile"
+)

 func newSetClusterSettingsCommand() *cobra.Command {
-	command := &cobra.Command{
+	cmd := &cobra.Command{
 		Use:   "cluster-settings",
 		Short: "Set cluster settings",
 		RunE:  runSetClusterSettings,
 	}

-	command.PersistentFlags().StringSliceVar(&hostDirPoliciesRaw, hostDirPoliciesFlag, []string{},
+	cmd.Flags().StringSliceVar(&hostDirPoliciesRaw, hostDirPoliciesFlag, []string{},
 		fmt.Sprintf("comma-separated list of hostDir policies containing an allowed path prefix "+
 			"and an optional \":ro\" modifier to only allow read-only mounts for that path prefix "+
 			"(for example, --%s=/Users/ci/sources:ro,/tmp)", hostDirPoliciesFlag))
+	cmd.Flags().StringVar(&schedulerProfileRaw, schedulerProfileFlag, "", fmt.Sprintf(
+		`scheduler profile to use:
+* --%s=%s: when scheduling a pending VM to a worker, pick the busiest worker that can fit the VM first,
+falling back to less busy workers (this is the default behavior when no explicit scheduler profile is set)
+* --%s=%s: when scheduling a pending VM to a worker, pick the least occupied worker that can fit the VM first,
+falling back to busier workers
+`, schedulerProfileFlag, v1.SchedulerProfileOptimizeUtilization, schedulerProfileFlag,
+		v1.SchedulerProfileDistributeLoad))

-	return command
+	return cmd
 }

 func runSetClusterSettings(cmd *cobra.Command, args []string) error {
-	// Convert arguments
-	var hostDirPolicies []v1.HostDirPolicy
-
-	for _, hostDirPolicyRaw := range hostDirPoliciesRaw {
-		hostDirPolicy, err := v1.NewHostDirPolicyFromString(hostDirPolicyRaw)
-		if err != nil {
-			return err
-		}
-
-		hostDirPolicies = append(hostDirPolicies, hostDirPolicy)
-	}
-
-	// Check if we need to update anything in the cluster settings
-	if !cmd.Flag(hostDirPoliciesFlag).Changed {
-		return fmt.Errorf("%w: you need to specify at least one setting to update",
-			ErrClusterSettingsFailed)
-	}
-
-	// Update cluster settings
 	client, err := client.New()
 	if err != nil {
 		return err
 	}

+	var needUpdate bool
+
 	clusterSettings, err := client.ClusterSettings().Get(cmd.Context())
 	if err != nil {
 		return err
 	}

-	clusterSettings.HostDirPolicies = hostDirPolicies
+	if cmd.Flag(hostDirPoliciesFlag).Changed {
+		for _, hostDirPolicyRaw := range hostDirPoliciesRaw {
+			hostDirPolicy, err := v1.NewHostDirPolicyFromString(hostDirPolicyRaw)
+			if err != nil {
+				return err
+			}
+
+			clusterSettings.HostDirPolicies = append(clusterSettings.HostDirPolicies, hostDirPolicy)
+		}
+
+		needUpdate = true
+	}
+
+	if cmd.Flag(schedulerProfileFlag).Changed {
+		clusterSettings.SchedulerProfile, err = v1.NewSchedulerProfile(schedulerProfileRaw)
+		if err != nil {
+			return err
+		}
+
+		needUpdate = true
+	}
+
+	// Check if we need to update anything in the cluster settings
+	if !needUpdate {
+		return fmt.Errorf("%w: you need to specify at least one setting to update", ErrClusterSettingsFailed)
+	}

 	return client.ClusterSettings().Set(cmd.Context(), clusterSettings)
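The rewrite above leans on cobra's Flag(...).Changed to tell "flag omitted" apart from "flag explicitly set", so only the settings the user actually passed get touched. A standalone sketch of that pattern, with a hypothetical flag name that is not part of this commit:

package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

var exampleValue string

func main() {
	cmd := &cobra.Command{
		Use: "example",
		RunE: func(cmd *cobra.Command, args []string) error {
			// Changed is true only when the user passed the flag,
			// even if they passed its default value
			if cmd.Flag("example-flag").Changed {
				fmt.Println("updating setting to:", exampleValue)
			} else {
				fmt.Println("flag not given, leaving setting as-is")
			}

			return nil
		},
	}

	cmd.Flags().StringVar(&exampleValue, "example-flag", "", "demonstrates the Changed pattern")

	if err := cmd.Execute(); err != nil {
		fmt.Println(err)
	}
}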


@@ -44,6 +44,14 @@ func (controller *Controller) updateClusterSettings(ctx *gin.Context) responder.
 		}
 	}

+	if clusterSettings.SchedulerProfile == "" {
+		clusterSettings.SchedulerProfile = v1.SchedulerProfileOptimizeUtilization
+	} else {
+		if _, err := v1.NewSchedulerProfile(string(clusterSettings.SchedulerProfile)); err != nil {
+			return responder.JSON(http.StatusBadRequest, NewErrorResponse("%v", err))
+		}
+	}
+
 	return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder {
 		if err := txn.SetClusterSettings(clusterSettings); err != nil {
 			controller.logger.Errorf("failed to set cluster settings in the DB: %v", err)


@@ -350,10 +350,10 @@ func (controller *Controller) initializeMetrics() error {
 			return err
 		}

-		_, workerToResources := scheduler.ProcessVMs(vms)
+		_, workerInfos := scheduler.ProcessVMs(vms)

 		for _, worker := range workers {
-			resourcesUsed := workerToResources.Get(worker.Name)
+			resourcesUsed := workerInfos.Get(worker.Name).ResourcesUsed

 			for key, value := range resourcesUsed {
 				observer.Observe(int64(value), metric.WithAttributes(


@@ -1,6 +1,7 @@
 package scheduler

 import (
+	"cmp"
 	"context"

 	"github.com/cirruslabs/orchard/internal/controller/lifecycle"
 	"github.com/cirruslabs/orchard/internal/controller/notifier"
@@ -12,6 +13,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"go.opentelemetry.io/otel/metric"
 	"go.uber.org/zap"
+	"slices"
 	"sort"
 	"time"
 )
@@ -121,17 +123,41 @@ func (scheduler *Scheduler) schedulingLoopIteration() error {
 	if err != nil {
 		return err
 	}

-	unscheduledVMs, workerToResources := ProcessVMs(vms)
+	unscheduledVMs, workerInfos := ProcessVMs(vms)

 	workers, err := txn.ListWorkers()
 	if err != nil {
 		return err
 	}

+	// Retrieve cluster settings to figure out which scheduler profile to use
+	clusterSettings, err := txn.GetClusterSettings()
+	if err != nil {
+		return err
+	}
+
 	for _, unscheduledVM := range unscheduledVMs {
+		// Order workers depending on the scheduler profile
+		switch clusterSettings.SchedulerProfile {
+		case v1.SchedulerProfileDistributeLoad:
+			slices.SortFunc(workers, func(a, b v1.Worker) int {
+				// Sort by the number of running VMs, ascending order
+				return cmp.Compare(workerInfos[a.Name].NumRunningVMs,
+					workerInfos[b.Name].NumRunningVMs)
+			})
+		case v1.SchedulerProfileOptimizeUtilization:
+			fallthrough
+		default:
+			slices.SortFunc(workers, func(a, b v1.Worker) int {
+				// Sort by the number of running VMs, descending order
+				return cmp.Compare(workerInfos[b.Name].NumRunningVMs,
+					workerInfos[a.Name].NumRunningVMs)
+			})
+		}
+
 		// Find a worker that can run this VM
 		for _, worker := range workers {
-			resourcesUsed := workerToResources.Get(worker.Name)
+			resourcesUsed := workerInfos.Get(worker.Name).ResourcesUsed
 			resourcesRemaining := worker.Resources.Subtracted(resourcesUsed)

 			if resourcesRemaining.CanFit(unscheduledVM.Resources) &&
@@ -149,7 +175,7 @@ func (scheduler *Scheduler) schedulingLoopIteration() error {
 			}

 			affectedWorkers[worker.Name] = true
-			workerToResources.Add(worker.Name, unscheduledVM.Resources)
+			workerInfos.AddVM(worker.Name, unscheduledVM.Resources)

 			break
 		}
@@ -173,15 +199,15 @@ func (scheduler *Scheduler) schedulingLoopIteration() error {
 	return err
 }

-func ProcessVMs(vms []v1.VM) ([]v1.VM, WorkerToResources) {
+func ProcessVMs(vms []v1.VM) ([]v1.VM, WorkerInfos) {
 	var unscheduledVMs []v1.VM
-	workerToResources := make(WorkerToResources)
+	workerToResources := make(WorkerInfos)

 	for _, vm := range vms {
 		if vm.Worker == "" {
 			unscheduledVMs = append(unscheduledVMs, vm)
 		} else if !vm.TerminalState() {
-			workerToResources.Add(vm.Worker, vm.Resources)
+			workerToResources.AddVM(vm.Worker, vm.Resources)
 		}
 	}
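The two profiles differ only in the comparator's argument order: cmp.Compare(a, b) yields ascending order, cmp.Compare(b, a) descending. A self-contained sketch of the two orderings with toy data, not part of the commit:

package main

import (
	"cmp"
	"fmt"
	"slices"
)

func main() {
	// Hypothetical per-worker running-VM counts
	workers := []string{"worker-a", "worker-b", "worker-c"}
	numRunningVMs := map[string]int{"worker-a": 2, "worker-b": 0, "worker-c": 1}

	// distribute-load: least occupied worker first (ascending)
	slices.SortFunc(workers, func(a, b string) int {
		return cmp.Compare(numRunningVMs[a], numRunningVMs[b])
	})
	fmt.Println("distribute-load order:", workers) // [worker-b worker-c worker-a]

	// optimize-utilization: busiest worker first (descending)
	slices.SortFunc(workers, func(a, b string) int {
		return cmp.Compare(numRunningVMs[b], numRunningVMs[a])
	})
	fmt.Println("optimize-utilization order:", workers) // [worker-a worker-c worker-b]
}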


@@ -0,0 +1,37 @@
+package scheduler
+
+import v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
+
+type WorkerInfo struct {
+	ResourcesUsed v1.Resources
+	NumRunningVMs int
+}
+
+type WorkerInfos map[string]WorkerInfo
+
+func (workerInfos WorkerInfos) AddVM(name string, resourcesUsed v1.Resources) {
+	workerInfo, ok := workerInfos[name]
+	if !ok {
+		workerInfo = WorkerInfo{
+			ResourcesUsed: v1.Resources{},
+		}
+	}
+
+	workerInfo.ResourcesUsed.Add(resourcesUsed)
+	workerInfo.NumRunningVMs++
+
+	workerInfos[name] = workerInfo
+}
+
+func (workerInfos WorkerInfos) Get(name string) WorkerInfo {
+	workerInfo, ok := workerInfos[name]
+	if !ok {
+		workerInfo = WorkerInfo{
+			ResourcesUsed: v1.Resources{},
+		}
+
+		workerInfos[name] = workerInfo
+	}
+
+	return workerInfo
+}


@@ -0,0 +1,29 @@
+package scheduler_test
+
+import (
+	"github.com/cirruslabs/orchard/internal/controller/scheduler"
+	v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
+	"github.com/stretchr/testify/require"
+	"testing"
+)
+
+func TestWorkerInfos(t *testing.T) {
+	workerInfos := make(scheduler.WorkerInfos)
+	require.Len(t, workerInfos, 0)
+
+	workerInfos.AddVM("worker-name", v1.Resources{
+		"tart-vms": 1,
+	})
+	require.Len(t, workerInfos, 1)
+
+	workerInfos.AddVM("worker-name", v1.Resources{
+		"tart-vms": 1,
+	})
+	require.Len(t, workerInfos, 1)
+
+	require.Equal(t, scheduler.WorkerInfo{
+		ResourcesUsed: map[string]uint64{
+			"tart-vms": 2,
+		},
+		NumRunningVMs: 2,
+	}, workerInfos.Get("worker-name"))
+}


@@ -1,26 +0,0 @@
-package scheduler
-
-import v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
-
-type WorkerToResources map[string]v1.Resources
-
-func (workerToResources WorkerToResources) Add(name string, other v1.Resources) {
-	workerResources, ok := workerToResources[name]
-	if !ok {
-		workerResources = make(v1.Resources)
-	}
-
-	workerResources.Add(other)
-
-	workerToResources[name] = workerResources
-}
-
-func (workerToResources WorkerToResources) Get(name string) v1.Resources {
-	workerResources, ok := workerToResources[name]
-	if !ok {
-		workerResources = make(v1.Resources)
-
-		workerToResources[name] = workerResources
-	}
-
-	return workerResources
-}


@@ -1,24 +0,0 @@
-package scheduler_test
-
-import (
-	"github.com/cirruslabs/orchard/internal/controller/scheduler"
-	v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
-	"github.com/stretchr/testify/require"
-	"testing"
-)
-
-func TestWorkerToResources(t *testing.T) {
-	workerToResources := make(scheduler.WorkerToResources)
-	require.Len(t, workerToResources, 0)
-
-	workerToResources.Add("worker-name", v1.Resources{
-		"tart-vms": 1,
-	})
-	require.Len(t, workerToResources, 1)
-
-	workerToResources.Add("worker-name", v1.Resources{
-		"tart-vms": 1,
-	})
-	require.Len(t, workerToResources, 1)
-
-	require.Equal(t, v1.Resources{"tart-vms": 2}, workerToResources.Get("worker-name"))
-}


@@ -14,12 +14,17 @@ import (
 )

 func StartIntegrationTestEnvironment(t *testing.T) (*client.Client, *controller.Controller, *worker.Worker) {
-	return StartIntegrationTestEnvironmentWithAdditionalOpts(t, nil, nil)
+	return StartIntegrationTestEnvironmentWithAdditionalOpts(t,
+		false, nil,
+		false, nil,
+	)
 }

 func StartIntegrationTestEnvironmentWithAdditionalOpts(
 	t *testing.T,
+	noController bool,
 	additionalControllerOpts []controller.Option,
+	noWorker bool,
 	additionalWorkerOpts []worker.Option,
 ) (*client.Client, *controller.Controller, *worker.Worker) {
 	t.Setenv("ORCHARD_HOME", t.TempDir())
@@ -34,19 +39,23 @@ func StartIntegrationTestEnvironmentWithAdditionalOpts(
 	devContext, cancelDevFunc := context.WithCancel(context.Background())
 	t.Cleanup(cancelDevFunc)

-	go func() {
-		err := devController.Run(devContext)
-		if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, http.ErrServerClosed) {
-			t.Errorf("dev controller failed: %v", err)
-		}
-	}()
+	if !noController {
+		go func() {
+			err := devController.Run(devContext)
+			if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, http.ErrServerClosed) {
+				t.Errorf("dev controller failed: %v", err)
+			}
+		}()
+	}

-	go func() {
-		err := devWorker.Run(devContext)
-		if err != nil && !errors.Is(err, context.Canceled) {
-			t.Errorf("dev worker failed: %v", err)
-		}
-	}()
+	if !noWorker {
+		go func() {
+			err := devWorker.Run(devContext)
+			if err != nil && !errors.Is(err, context.Canceled) {
+				t.Errorf("dev worker failed: %v", err)
+			}
+		}()
+	}

 	time.Sleep(5 * time.Second)


@@ -276,7 +276,9 @@ func TestSchedulerHealthCheckingOfflineWorker(t *testing.T) {
 	ctx := context.Background()

 	devClient, _, _ := devcontroller.StartIntegrationTestEnvironmentWithAdditionalOpts(t,
-		[]controller.Option{controller.WithWorkerOfflineTimeout(1 * time.Minute)}, nil)
+		false, []controller.Option{controller.WithWorkerOfflineTimeout(1 * time.Minute)},
+		false, nil,
+	)

 	const (
 		dummyWorkerName = "dummy-worker"


@@ -0,0 +1,130 @@
+package tests_test
+
+import (
+	"context"
+	"github.com/cirruslabs/orchard/internal/tests/devcontroller"
+	"github.com/cirruslabs/orchard/internal/tests/wait"
+	"github.com/cirruslabs/orchard/pkg/client"
+	v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
+	"github.com/stretchr/testify/require"
+	"testing"
+	"time"
+)
+
+func TestSchedulerProfileOptimizeUtilization(t *testing.T) {
+	ctx := context.Background()
+
+	// Create a development environment
+	devClient, _, _ := devcontroller.StartIntegrationTestEnvironmentWithAdditionalOpts(t,
+		false, nil,
+		true, nil,
+	)
+
+	// Change the scheduler to pack as many VMs as possible on each worker
+	clusterSettings, err := devClient.ClusterSettings().Get(ctx)
+	require.NoError(t, err)
+	clusterSettings.SchedulerProfile = v1.SchedulerProfileOptimizeUtilization
+	require.NoError(t, devClient.ClusterSettings().Set(ctx, clusterSettings))
+
+	// Create three workers and three VMs
+	threeWorkersThreeVMsScenario(t, devClient)
+
+	ensureAssignment(t, devClient, "test-vm-1", "worker-a")
+	ensureAssignment(t, devClient, "test-vm-2", "worker-a")
+	ensureAssignment(t, devClient, "test-vm-3", "worker-a")
+}
+
+func TestSchedulerProfileDistributeLoad(t *testing.T) {
+	ctx := context.Background()
+
+	// Create a development environment
+	devClient, _, _ := devcontroller.StartIntegrationTestEnvironmentWithAdditionalOpts(t,
+		false, nil,
+		true, nil,
+	)
+
+	// Change the scheduler to spread VMs as much as possible between workers
+	clusterSettings, err := devClient.ClusterSettings().Get(ctx)
+	require.NoError(t, err)
+	clusterSettings.SchedulerProfile = v1.SchedulerProfileDistributeLoad
+	require.NoError(t, devClient.ClusterSettings().Set(ctx, clusterSettings))
+
+	// Create three workers and three VMs
+	threeWorkersThreeVMsScenario(t, devClient)
+
+	ensureAssignment(t, devClient, "test-vm-1", "worker-a")
+	ensureAssignment(t, devClient, "test-vm-2", "worker-b")
+	ensureAssignment(t, devClient, "test-vm-3", "worker-c")
+}
+
+func threeWorkersThreeVMsScenario(t *testing.T, devClient *client.Client) {
+	ctx := context.Background()
+
+	_, err := devClient.Workers().Create(ctx, v1.Worker{
+		Meta: v1.Meta{
+			Name: "worker-a",
+		},
+		Resources: map[string]uint64{
+			v1.ResourceTartVMs: 3,
+		},
+	})
+	require.NoError(t, err)
+
+	_, err = devClient.Workers().Create(ctx, v1.Worker{
+		Meta: v1.Meta{
+			Name: "worker-b",
+		},
+		Resources: map[string]uint64{
+			v1.ResourceTartVMs: 3,
+		},
+	})
+	require.NoError(t, err)
+
+	_, err = devClient.Workers().Create(ctx, v1.Worker{
+		Meta: v1.Meta{
+			Name: "worker-c",
+		},
+		Resources: map[string]uint64{
+			v1.ResourceTartVMs: 3,
+		},
+	})
+	require.NoError(t, err)
+
+	require.NoError(t, devClient.VMs().Create(ctx, &v1.VM{
+		Meta: v1.Meta{
+			Name: "test-vm-1",
+		},
+		Image:  "example.com/doesnt/matter:latest",
+		CPU:    4,
+		Memory: 8 * 1024,
+		Status: v1.VMStatusPending,
+	}))
+
+	require.NoError(t, devClient.VMs().Create(ctx, &v1.VM{
+		Meta: v1.Meta{
+			Name: "test-vm-2",
+		},
+		Image:  "example.com/doesnt/matter:latest",
+		CPU:    4,
+		Memory: 8 * 1024,
+		Status: v1.VMStatusPending,
+	}))
+
+	require.NoError(t, devClient.VMs().Create(ctx, &v1.VM{
+		Meta: v1.Meta{
+			Name: "test-vm-3",
+		},
+		Image:  "example.com/doesnt/matter:latest",
+		CPU:    4,
+		Memory: 8 * 1024,
+		Status: v1.VMStatusPending,
+	}))
+}
+
+func ensureAssignment(t *testing.T, devClient *client.Client, vmName string, workerName string) {
+	require.True(t, wait.Wait(2*time.Minute, func() bool {
+		vm, err := devClient.VMs().Get(context.Background(), vmName)
+		require.NoError(t, err)
+
+		t.Logf("Waiting for the VM %s to be assigned to a worker", vmName)
+
+		return vm.Worker == workerName
+	}), "VM %s was expected to be assigned to the worker %q", vmName, workerName)
+}


@@ -29,9 +29,12 @@ func TestSSHServer(t *testing.T) {
 	require.NoError(t, err)

 	// Run the Controller
-	devClient, devController, _ := devcontroller.StartIntegrationTestEnvironmentWithAdditionalOpts(t, []controller.Option{
-		controller.WithSSHServer(":0", signer, false),
-	}, nil)
+	devClient, devController, _ := devcontroller.StartIntegrationTestEnvironmentWithAdditionalOpts(t,
+		false, []controller.Option{
+			controller.WithSSHServer(":0", signer, false),
+		},
+		false, nil,
+	)

 	// Create a VM to which we'll connect via Controller's SSH server
 	err = devClient.VMs().Create(context.Background(), &v1.VM{


@@ -1,5 +1,26 @@
 package v1

+import "fmt"
+
+type SchedulerProfile string
+
+const (
+	SchedulerProfileOptimizeUtilization SchedulerProfile = "optimize-utilization"
+	SchedulerProfileDistributeLoad      SchedulerProfile = "distribute-load"
+)
+
 type ClusterSettings struct {
-	HostDirPolicies []HostDirPolicy `json:"hostDirPolicies,omitempty"`
+	HostDirPolicies  []HostDirPolicy  `json:"hostDirPolicies,omitempty"`
+	SchedulerProfile SchedulerProfile `json:"schedulerProfile,omitempty"`
 }
+
+func NewSchedulerProfile(value string) (SchedulerProfile, error) {
+	switch value {
+	case string(SchedulerProfileOptimizeUtilization):
+		return SchedulerProfileOptimizeUtilization, nil
+	case string(SchedulerProfileDistributeLoad):
+		return SchedulerProfileDistributeLoad, nil
+	default:
+		return "", fmt.Errorf("unsupported scheduler profile: %q", value)
+	}
+}
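Both the CLI path and the API path funnel validation through NewSchedulerProfile, so unknown values fail fast in one place. A quick usage sketch of the function as it appears in this diff:

package main

import (
	"fmt"

	v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
)

func main() {
	profile, err := v1.NewSchedulerProfile("distribute-load")
	fmt.Println(profile, err) // distribute-load <nil>

	// A value outside the two known profiles is rejected
	_, err = v1.NewSchedulerProfile("round-robin")
	fmt.Println(err) // unsupported scheduler profile: "round-robin"
}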