From 7fe0414981ea35af66d196a9c5d0a439c6e90e52 Mon Sep 17 00:00:00 2001 From: Nikolay Edigaryev Date: Thu, 28 Nov 2024 20:07:46 +0400 Subject: [PATCH] "--scheduler-profile" option to allow different orchestration patterns (#224) * "--scheduler-profile" option to allow different orchestration patterns * API(cluster settings): provide a default value for scheduler profile --- internal/command/get/cluster_settings.go | 2 + internal/command/set/cluster_settings.go | 68 +++++---- internal/controller/api_cluster_settings.go | 8 ++ internal/controller/controller.go | 4 +- internal/controller/scheduler/scheduler.go | 38 ++++- internal/controller/scheduler/workerinfo.go | 37 +++++ .../controller/scheduler/workerinfo_test.go | 29 ++++ .../controller/scheduler/workertoresources.go | 26 ---- .../scheduler/workertoresources_test.go | 24 ---- internal/tests/devcontroller/devcontroller.go | 35 +++-- internal/tests/integration_test.go | 4 +- internal/tests/scheduler_profile_test.go | 130 ++++++++++++++++++ internal/tests/sshserver_test.go | 9 +- pkg/resource/v1/cluster_settings.go | 23 +++- 14 files changed, 338 insertions(+), 99 deletions(-) create mode 100644 internal/controller/scheduler/workerinfo.go create mode 100644 internal/controller/scheduler/workerinfo_test.go delete mode 100644 internal/controller/scheduler/workertoresources.go delete mode 100644 internal/controller/scheduler/workertoresources_test.go create mode 100644 internal/tests/scheduler_profile_test.go diff --git a/internal/command/get/cluster_settings.go b/internal/command/get/cluster_settings.go index 2877cc1..d25503e 100644 --- a/internal/command/get/cluster_settings.go +++ b/internal/command/get/cluster_settings.go @@ -41,6 +41,8 @@ func runGetClusterSettings(cmd *cobra.Command, args []string) error { hostDirPoliciesDescription := strings.Join(hostDirPoliciesAsStrings, ",") table.AddRow("hostDir policies", nonEmptyOrNone(hostDirPoliciesDescription)) + table.AddRow("Scheduler profile", 
clusterSettings.SchedulerProfile) + + fmt.Println(table) return nil diff --git a/internal/command/set/cluster_settings.go b/internal/command/set/cluster_settings.go index a521862..4d4dcfc 100644 --- a/internal/command/set/cluster_settings.go +++ b/internal/command/set/cluster_settings.go @@ -11,56 +11,78 @@ import ( var ErrClusterSettingsFailed = errors.New("failed to set cluster settings") var hostDirPoliciesRaw []string +var schedulerProfileRaw string -const hostDirPoliciesFlag = "host-dir-policies" +const ( + hostDirPoliciesFlag = "host-dir-policies" + schedulerProfileFlag = "scheduler-profile" +) func newSetClusterSettingsCommand() *cobra.Command { - command := &cobra.Command{ + cmd := &cobra.Command{ Use: "cluster-settings", Short: "Set cluster settings", RunE: runSetClusterSettings, } - command.PersistentFlags().StringSliceVar(&hostDirPoliciesRaw, hostDirPoliciesFlag, []string{}, + cmd.Flags().StringSliceVar(&hostDirPoliciesRaw, hostDirPoliciesFlag, []string{}, fmt.Sprintf("comma-separated list of hostDir policies containing an allowed path prefix "+ "and an optional \":ro\" modifier to only allow read-only mounts for that path prefix "+ "(for example, --%s=/Users/ci/sources:ro,/tmp)", hostDirPoliciesFlag)) + cmd.Flags().StringVar(&schedulerProfileRaw, schedulerProfileFlag, "", fmt.Sprintf( + `scheduler profile to use: - return command +* --%s=%s — when scheduling a pending VM to a worker, pick the busiest worker that can fit a VM first, +falling back to less busy workers (this is the default behavior when no explicit scheduler profile is set) + +* --%s=%s — when scheduling a pending VM to a worker, pick the least occupied worker that can fit a VM first, +falling back to busier workers + +`, schedulerProfileFlag, v1.SchedulerProfileOptimizeUtilization, schedulerProfileFlag, + v1.SchedulerProfileDistributeLoad)) + + return cmd } func runSetClusterSettings(cmd *cobra.Command, args []string) error { - // Convert arguments - var hostDirPolicies 
[]v1.HostDirPolicy - - for _, hostDirPolicyRaw := range hostDirPoliciesRaw { - hostDirPolicy, err := v1.NewHostDirPolicyFromString(hostDirPolicyRaw) - if err != nil { - return err - } - - hostDirPolicies = append(hostDirPolicies, hostDirPolicy) - } - - // Check if we need to update anything in the cluster settings - if !cmd.Flag(hostDirPoliciesFlag).Changed { - return fmt.Errorf("%w: you need to specify at least one setting to update", - ErrClusterSettingsFailed) - } - // Update cluster settings client, err := client.New() if err != nil { return err } + var needUpdate bool + clusterSettings, err := client.ClusterSettings().Get(cmd.Context()) if err != nil { return err } if cmd.Flag(hostDirPoliciesFlag).Changed { - clusterSettings.HostDirPolicies = hostDirPolicies + for _, hostDirPolicyRaw := range hostDirPoliciesRaw { + hostDirPolicy, err := v1.NewHostDirPolicyFromString(hostDirPolicyRaw) + if err != nil { + return err + } + + clusterSettings.HostDirPolicies = append(clusterSettings.HostDirPolicies, hostDirPolicy) + } + + needUpdate = true + } + + if cmd.Flag(schedulerProfileFlag).Changed { + clusterSettings.SchedulerProfile, err = v1.NewSchedulerProfile(schedulerProfileRaw) + if err != nil { + return err + } + + needUpdate = true + } + + // Check if we need to update anything in the cluster settings + if !needUpdate { + return fmt.Errorf("%w: you need to specify at least one setting to update", ErrClusterSettingsFailed) } return client.ClusterSettings().Set(cmd.Context(), clusterSettings) diff --git a/internal/controller/api_cluster_settings.go b/internal/controller/api_cluster_settings.go index 129da23..3649b7b 100644 --- a/internal/controller/api_cluster_settings.go +++ b/internal/controller/api_cluster_settings.go @@ -44,6 +44,14 @@ func (controller *Controller) updateClusterSettings(ctx *gin.Context) responder. 
} } + if clusterSettings.SchedulerProfile == "" { + clusterSettings.SchedulerProfile = v1.SchedulerProfileOptimizeUtilization + } else { + if _, err := v1.NewSchedulerProfile(string(clusterSettings.SchedulerProfile)); err != nil { + return responder.JSON(http.StatusBadRequest, NewErrorResponse("%v", err)) + } + } + return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder { if err := txn.SetClusterSettings(clusterSettings); err != nil { controller.logger.Errorf("failed to set cluster settings in the DB: %v", err) diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 631427b..c62c509 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -350,10 +350,10 @@ func (controller *Controller) initializeMetrics() error { return err } - _, workerToResources := scheduler.ProcessVMs(vms) + _, workerInfos := scheduler.ProcessVMs(vms) for _, worker := range workers { - resourcesUsed := workerToResources.Get(worker.Name) + resourcesUsed := workerInfos.Get(worker.Name).ResourcesUsed for key, value := range resourcesUsed { observer.Observe(int64(value), metric.WithAttributes( diff --git a/internal/controller/scheduler/scheduler.go b/internal/controller/scheduler/scheduler.go index 01caa34..1748872 100644 --- a/internal/controller/scheduler/scheduler.go +++ b/internal/controller/scheduler/scheduler.go @@ -1,6 +1,7 @@ package scheduler import ( + "cmp" "context" "github.com/cirruslabs/orchard/internal/controller/lifecycle" "github.com/cirruslabs/orchard/internal/controller/notifier" @@ -12,6 +13,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "go.opentelemetry.io/otel/metric" "go.uber.org/zap" + "slices" "sort" "time" ) @@ -121,17 +123,41 @@ func (scheduler *Scheduler) schedulingLoopIteration() error { if err != nil { return err } - unscheduledVMs, workerToResources := ProcessVMs(vms) + unscheduledVMs, workerInfos := ProcessVMs(vms) workers, err := txn.ListWorkers() if 
err != nil { return err } + // Retrieve cluster settings to figure out which scheduler profile to use + clusterSettings, err := txn.GetClusterSettings() + if err != nil { + return err + } + for _, unscheduledVM := range unscheduledVMs { + // Order workers depending on the scheduler profile + switch clusterSettings.SchedulerProfile { + case v1.SchedulerProfileDistributeLoad: + slices.SortFunc(workers, func(a, b v1.Worker) int { + // Sort by the number of running VMs, ascending order + return cmp.Compare(workerInfos[a.Name].NumRunningVMs, + workerInfos[b.Name].NumRunningVMs) + }) + case v1.SchedulerProfileOptimizeUtilization: + fallthrough + default: + slices.SortFunc(workers, func(a, b v1.Worker) int { + // Sort by the number of running VMs, descending order + return cmp.Compare(workerInfos[b.Name].NumRunningVMs, + workerInfos[a.Name].NumRunningVMs) + }) + } + // Find a worker that can run this VM for _, worker := range workers { - resourcesUsed := workerToResources.Get(worker.Name) + resourcesUsed := workerInfos.Get(worker.Name).ResourcesUsed resourcesRemaining := worker.Resources.Subtracted(resourcesUsed) if resourcesRemaining.CanFit(unscheduledVM.Resources) && @@ -149,7 +175,7 @@ func (scheduler *Scheduler) schedulingLoopIteration() error { } affectedWorkers[worker.Name] = true - workerToResources.Add(worker.Name, unscheduledVM.Resources) + workerInfos.AddVM(worker.Name, unscheduledVM.Resources) break } @@ -173,15 +199,15 @@ func (scheduler *Scheduler) schedulingLoopIteration() error { return err } -func ProcessVMs(vms []v1.VM) ([]v1.VM, WorkerToResources) { +func ProcessVMs(vms []v1.VM) ([]v1.VM, WorkerInfos) { var unscheduledVMs []v1.VM - workerToResources := make(WorkerToResources) + workerToResources := make(WorkerInfos) for _, vm := range vms { if vm.Worker == "" { unscheduledVMs = append(unscheduledVMs, vm) } else if !vm.TerminalState() { - workerToResources.Add(vm.Worker, vm.Resources) + workerToResources.AddVM(vm.Worker, vm.Resources) } } diff --git 
a/internal/controller/scheduler/workerinfo.go b/internal/controller/scheduler/workerinfo.go new file mode 100644 index 0000000..00c608a --- /dev/null +++ b/internal/controller/scheduler/workerinfo.go @@ -0,0 +1,37 @@ +package scheduler + +import v1 "github.com/cirruslabs/orchard/pkg/resource/v1" + +type WorkerInfo struct { + ResourcesUsed v1.Resources + NumRunningVMs int +} + +type WorkerInfos map[string]WorkerInfo + +func (workerInfos WorkerInfos) AddVM(name string, resourcesUsed v1.Resources) { + workerInfo, ok := workerInfos[name] + if !ok { + workerInfo = WorkerInfo{ + ResourcesUsed: v1.Resources{}, + } + } + + workerInfo.ResourcesUsed.Add(resourcesUsed) + workerInfo.NumRunningVMs++ + + workerInfos[name] = workerInfo +} + +func (workerInfos WorkerInfos) Get(name string) WorkerInfo { + workerInfo, ok := workerInfos[name] + if !ok { + workerInfo = WorkerInfo{ + ResourcesUsed: v1.Resources{}, + } + + workerInfos[name] = workerInfo + } + + return workerInfo +} diff --git a/internal/controller/scheduler/workerinfo_test.go b/internal/controller/scheduler/workerinfo_test.go new file mode 100644 index 0000000..4539f1f --- /dev/null +++ b/internal/controller/scheduler/workerinfo_test.go @@ -0,0 +1,29 @@ +package scheduler_test + +import ( + "github.com/cirruslabs/orchard/internal/controller/scheduler" + v1 "github.com/cirruslabs/orchard/pkg/resource/v1" + "github.com/stretchr/testify/require" + "testing" +) + +func TestWorkerInfos(t *testing.T) { + workerInfos := make(scheduler.WorkerInfos) + require.Len(t, workerInfos, 0) + + workerInfos.AddVM("worker-name", v1.Resources{ + "tart-vms": 1, + }) + require.Len(t, workerInfos, 1) + + workerInfos.AddVM("worker-name", v1.Resources{ + "tart-vms": 1, + }) + require.Len(t, workerInfos, 1) + require.Equal(t, scheduler.WorkerInfo{ + ResourcesUsed: map[string]uint64{ + "tart-vms": 2, + }, + NumRunningVMs: 2, + }, workerInfos.Get("worker-name")) +} diff --git a/internal/controller/scheduler/workertoresources.go 
b/internal/controller/scheduler/workertoresources.go deleted file mode 100644 index f794945..0000000 --- a/internal/controller/scheduler/workertoresources.go +++ /dev/null @@ -1,26 +0,0 @@ -package scheduler - -import v1 "github.com/cirruslabs/orchard/pkg/resource/v1" - -type WorkerToResources map[string]v1.Resources - -func (workerToResources WorkerToResources) Add(name string, other v1.Resources) { - workerResources, ok := workerToResources[name] - if !ok { - workerResources = make(v1.Resources) - } - - workerResources.Add(other) - - workerToResources[name] = workerResources -} - -func (workerToResources WorkerToResources) Get(name string) v1.Resources { - workerResources, ok := workerToResources[name] - if !ok { - workerResources = make(v1.Resources) - workerToResources[name] = workerResources - } - - return workerResources -} diff --git a/internal/controller/scheduler/workertoresources_test.go b/internal/controller/scheduler/workertoresources_test.go deleted file mode 100644 index eb04715..0000000 --- a/internal/controller/scheduler/workertoresources_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package scheduler_test - -import ( - "github.com/cirruslabs/orchard/internal/controller/scheduler" - v1 "github.com/cirruslabs/orchard/pkg/resource/v1" - "github.com/stretchr/testify/require" - "testing" -) - -func TestWorkerToResources(t *testing.T) { - workerToResources := make(scheduler.WorkerToResources) - require.Len(t, workerToResources, 0) - - workerToResources.Add("worker-name", v1.Resources{ - "tart-vms": 1, - }) - require.Len(t, workerToResources, 1) - - workerToResources.Add("worker-name", v1.Resources{ - "tart-vms": 1, - }) - require.Len(t, workerToResources, 1) - require.Equal(t, v1.Resources{"tart-vms": 2}, workerToResources.Get("worker-name")) -} diff --git a/internal/tests/devcontroller/devcontroller.go b/internal/tests/devcontroller/devcontroller.go index 37f534f..22f0f2b 100644 --- a/internal/tests/devcontroller/devcontroller.go +++ 
b/internal/tests/devcontroller/devcontroller.go @@ -14,12 +14,17 @@ import ( ) func StartIntegrationTestEnvironment(t *testing.T) (*client.Client, *controller.Controller, *worker.Worker) { - return StartIntegrationTestEnvironmentWithAdditionalOpts(t, nil, nil) + return StartIntegrationTestEnvironmentWithAdditionalOpts(t, + false, nil, + false, nil, + ) } func StartIntegrationTestEnvironmentWithAdditionalOpts( t *testing.T, + noController bool, additionalControllerOpts []controller.Option, + noWorker bool, additionalWorkerOpts []worker.Option, ) (*client.Client, *controller.Controller, *worker.Worker) { t.Setenv("ORCHARD_HOME", t.TempDir()) @@ -34,19 +39,23 @@ func StartIntegrationTestEnvironmentWithAdditionalOpts( devContext, cancelDevFunc := context.WithCancel(context.Background()) t.Cleanup(cancelDevFunc) - go func() { - err := devController.Run(devContext) - if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, http.ErrServerClosed) { - t.Errorf("dev controller failed: %v", err) - } - }() + if !noController { + go func() { + err := devController.Run(devContext) + if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, http.ErrServerClosed) { + t.Errorf("dev controller failed: %v", err) + } + }() + } - go func() { - err := devWorker.Run(devContext) - if err != nil && !errors.Is(err, context.Canceled) { - t.Errorf("dev worker failed: %v", err) - } - }() + if !noWorker { + go func() { + err := devWorker.Run(devContext) + if err != nil && !errors.Is(err, context.Canceled) { + t.Errorf("dev worker failed: %v", err) + } + }() + } time.Sleep(5 * time.Second) diff --git a/internal/tests/integration_test.go b/internal/tests/integration_test.go index c41a881..0c5874a 100644 --- a/internal/tests/integration_test.go +++ b/internal/tests/integration_test.go @@ -276,7 +276,9 @@ func TestSchedulerHealthCheckingOfflineWorker(t *testing.T) { ctx := context.Background() devClient, _, _ := 
devcontroller.StartIntegrationTestEnvironmentWithAdditionalOpts(t, - []controller.Option{controller.WithWorkerOfflineTimeout(1 * time.Minute)}, nil) + false, []controller.Option{controller.WithWorkerOfflineTimeout(1 * time.Minute)}, + false, nil, + ) const ( dummyWorkerName = "dummy-worker" diff --git a/internal/tests/scheduler_profile_test.go b/internal/tests/scheduler_profile_test.go new file mode 100644 index 0000000..9bf70ad --- /dev/null +++ b/internal/tests/scheduler_profile_test.go @@ -0,0 +1,130 @@ +package tests_test + +import ( + "context" + "github.com/cirruslabs/orchard/internal/tests/devcontroller" + "github.com/cirruslabs/orchard/internal/tests/wait" + "github.com/cirruslabs/orchard/pkg/client" + v1 "github.com/cirruslabs/orchard/pkg/resource/v1" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +func TestSchedulerProfileOptimizeUtilization(t *testing.T) { + ctx := context.Background() + + // Create a development environment + devClient, _, _ := devcontroller.StartIntegrationTestEnvironmentWithAdditionalOpts(t, + false, nil, + true, nil, + ) + + // Change the scheduler to pack as many VMs as possible on each worker + clusterSettings, err := devClient.ClusterSettings().Get(ctx) + require.NoError(t, err) + clusterSettings.SchedulerProfile = v1.SchedulerProfileOptimizeUtilization + require.NoError(t, devClient.ClusterSettings().Set(ctx, clusterSettings)) + + // Create three workers and three VMs + threeWorkersThreeVMsScenario(t, devClient) + + ensureAssignment(t, devClient, "test-vm-1", "worker-a") + ensureAssignment(t, devClient, "test-vm-2", "worker-a") + ensureAssignment(t, devClient, "test-vm-3", "worker-a") +} + +func TestSchedulerProfileDistributeLoad(t *testing.T) { + ctx := context.Background() + + // Create a development environment + devClient, _, _ := devcontroller.StartIntegrationTestEnvironmentWithAdditionalOpts(t, + false, nil, + true, nil, + ) + + // Change the scheduler to spread VMs as much as possible between workers + 
clusterSettings, err := devClient.ClusterSettings().Get(ctx) + require.NoError(t, err) + clusterSettings.SchedulerProfile = v1.SchedulerProfileDistributeLoad + require.NoError(t, devClient.ClusterSettings().Set(ctx, clusterSettings)) + + // Create three workers and three VMs + threeWorkersThreeVMsScenario(t, devClient) + + ensureAssignment(t, devClient, "test-vm-1", "worker-a") + ensureAssignment(t, devClient, "test-vm-2", "worker-b") + ensureAssignment(t, devClient, "test-vm-3", "worker-c") +} + +func threeWorkersThreeVMsScenario(t *testing.T, devClient *client.Client) { + ctx := context.Background() + + _, err := devClient.Workers().Create(ctx, v1.Worker{ + Meta: v1.Meta{ + Name: "worker-a", + }, + Resources: map[string]uint64{ + v1.ResourceTartVMs: 3, + }, + }) + require.NoError(t, err) + _, err = devClient.Workers().Create(ctx, v1.Worker{ + Meta: v1.Meta{ + Name: "worker-b", + }, + Resources: map[string]uint64{ + v1.ResourceTartVMs: 3, + }, + }) + require.NoError(t, err) + _, err = devClient.Workers().Create(ctx, v1.Worker{ + Meta: v1.Meta{ + Name: "worker-c", + }, + Resources: map[string]uint64{ + v1.ResourceTartVMs: 3, + }, + }) + require.NoError(t, err) + + require.NoError(t, devClient.VMs().Create(ctx, &v1.VM{ + Meta: v1.Meta{ + Name: "test-vm-1", + }, + Image: "example.com/doesnt/matter:latest", + CPU: 4, + Memory: 8 * 1024, + Status: v1.VMStatusPending, + })) + require.NoError(t, devClient.VMs().Create(ctx, &v1.VM{ + Meta: v1.Meta{ + Name: "test-vm-2", + }, + Image: "example.com/doesnt/matter:latest", + CPU: 4, + Memory: 8 * 1024, + Status: v1.VMStatusPending, + })) + require.NoError(t, devClient.VMs().Create(ctx, &v1.VM{ + Meta: v1.Meta{ + Name: "test-vm-3", + }, + Image: "example.com/doesnt/matter:latest", + CPU: 4, + Memory: 8 * 1024, + Status: v1.VMStatusPending, + })) +} + +func ensureAssignment(t *testing.T, devClient *client.Client, vmName string, workerName string) { + require.True(t, wait.Wait(2*time.Minute, func() bool { + vm, err := 
devClient.VMs().Get(context.Background(), vmName) + require.NoError(t, err) + + t.Logf("Waiting for the VM %s to be assigned to a worker", vmName) + + return vm.Worker == workerName + }), "VM %s was expected to be assigned to the worker %q", + vmName, workerName) +} diff --git a/internal/tests/sshserver_test.go b/internal/tests/sshserver_test.go index 9e6bb45..41c3a01 100644 --- a/internal/tests/sshserver_test.go +++ b/internal/tests/sshserver_test.go @@ -29,9 +29,12 @@ func TestSSHServer(t *testing.T) { require.NoError(t, err) // Run the Controller - devClient, devController, _ := devcontroller.StartIntegrationTestEnvironmentWithAdditionalOpts(t, []controller.Option{ - controller.WithSSHServer(":0", signer, false), - }, nil) + devClient, devController, _ := devcontroller.StartIntegrationTestEnvironmentWithAdditionalOpts(t, + false, []controller.Option{ + controller.WithSSHServer(":0", signer, false), + }, + false, nil, + ) // Create a VM to which we'll connect via Controller's SSH server err = devClient.VMs().Create(context.Background(), &v1.VM{ diff --git a/pkg/resource/v1/cluster_settings.go b/pkg/resource/v1/cluster_settings.go index 47b011f..ddde2c8 100644 --- a/pkg/resource/v1/cluster_settings.go +++ b/pkg/resource/v1/cluster_settings.go @@ -1,5 +1,26 @@ package v1 +import "fmt" + +type SchedulerProfile string + +const ( + SchedulerProfileOptimizeUtilization SchedulerProfile = "optimize-utilization" + SchedulerProfileDistributeLoad SchedulerProfile = "distribute-load" +) + type ClusterSettings struct { - HostDirPolicies []HostDirPolicy `json:"hostDirPolicies,omitempty"` + HostDirPolicies []HostDirPolicy `json:"hostDirPolicies,omitempty"` + SchedulerProfile SchedulerProfile `json:"schedulerProfile,omitempty"` +} + +func NewSchedulerProfile(value string) (SchedulerProfile, error) { + switch value { + case string(SchedulerProfileOptimizeUtilization): + return SchedulerProfileOptimizeUtilization, nil + case 
string(SchedulerProfileDistributeLoad): + return SchedulerProfileDistributeLoad, nil + default: + return "", fmt.Errorf("unsupported scheduler profile: %q", value) + } }