From bafcf6fac2e00a98c76a9aef71e35ece55751a73 Mon Sep 17 00:00:00 2001 From: Nikolay Edigaryev Date: Thu, 6 Nov 2025 20:56:31 +0400 Subject: [PATCH] Simplify state reconciliation and support changing Softnet settings (#364) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Simplify state reconciliation and support changing Softnet settings * Remove unused "updateFunc" parameter from syncOnDiskVMs() * Don't take an address of a loop variable * ensure → ensures * updateVMState(): don't forget to update VMState * Introduce TestSpecUpdateSoftnet integration test * Update OpenAPI specification to include generation/observedGeneration --- api/openapi.yaml | 45 +++-- go.mod | 2 + go.sum | 2 + internal/command/create/vm.go | 34 ++-- internal/controller/api.go | 19 +- internal/controller/api_controller.go | 4 +- internal/controller/api_vms.go | 60 ++++++- internal/tests/spec_update_test.go | 140 +++++++++++++++ internal/worker/fsm.go | 58 ++++++ internal/worker/fsm_test.go | 27 +++ internal/worker/vmmanager/vm.go | 105 ++++++----- internal/worker/worker.go | 247 ++++++++++++++++++-------- internal/worker/worker_test.go | 37 ++++ pkg/client/vms.go | 14 +- pkg/resource/v1/cmp_test.go | 14 ++ pkg/resource/v1/v1.go | 48 +++-- 16 files changed, 690 insertions(+), 166 deletions(-) create mode 100644 internal/tests/spec_update_test.go create mode 100644 internal/worker/fsm.go create mode 100644 internal/worker/fsm_test.go create mode 100644 internal/worker/worker_test.go create mode 100644 pkg/resource/v1/cmp_test.go diff --git a/api/openapi.yaml b/api/openapi.yaml index 64c9b97..765b3c4 100644 --- a/api/openapi.yaml +++ b/api/openapi.yaml @@ -442,12 +442,27 @@ components: VM: title: Virtual Machine type: object + allOf: + - $ref: '#/components/schemas/VMMeta' + - $ref: '#/components/schemas/VMSpec' + - $ref: '#/components/schemas/VMState' + VMMeta: + title: Virtual Machine Metadata + type: object properties: name: type: string 
description: VM name example: macos-tahoe-base readOnly: true + generation: + type: number + description: Incremented by the controller each time a VM's specification changes + readOnly: true + VMSpec: + title: Virtual Machine Specification + type: object + properties: image: type: string description: VM image for this VM @@ -517,19 +532,6 @@ components: type: boolean description: Enable nested virtualization default: false - status: - type: string - description: VM status - enum: [pending, running, failed] - readOnly: true - status_message: - type: string - description: VM status message - readOnly: true - worker: - type: string - description: Worker on which the VM was assigned to - readOnly: true username: type: string description: SSH username to use when connecting to a VM @@ -592,6 +594,23 @@ components: - path: /path/on/host/to/sources ro: true - path: /path/on/host/to/builds + VMState: + title: Virtual Machine State + type: object + properties: + status: + type: string + description: VM status + enum: [ pending, running, failed ] + status_message: + type: string + description: VM status message + worker: + type: string + description: Worker on which the VM was assigned to + observedGeneration: + type: number + description: Corresponds to the `Generation` value on which the worker had acted upon Events: title: Events type: object diff --git a/go.mod b/go.mod index 67ddcf1..e93f97c 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/go-openapi/runtime v0.29.0 github.com/gofrs/flock v0.13.0 github.com/golang/protobuf v1.5.4 + github.com/google/go-cmp v0.7.0 github.com/google/uuid v1.6.0 github.com/gosuri/uitable v0.0.4 github.com/hashicorp/go-multierror v1.1.1 @@ -26,6 +27,7 @@ require ( github.com/prometheus/client_golang v1.23.2 github.com/pterm/pterm v0.12.82 github.com/samber/lo v1.52.0 + github.com/samber/mo v1.16.0 github.com/sethvargo/go-password v0.3.1 github.com/shirou/gopsutil/v4 v4.25.9 github.com/skratchdot/open-golang 
v0.0.0-20200116055534-eef842397966 diff --git a/go.sum b/go.sum index ba9541a..91f2ee0 100644 --- a/go.sum +++ b/go.sum @@ -298,6 +298,8 @@ github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/samber/lo v1.52.0 h1:Rvi+3BFHES3A8meP33VPAxiBZX/Aws5RxrschYGjomw= github.com/samber/lo v1.52.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0= +github.com/samber/mo v1.16.0 h1:qpEPCI63ou6wXlsNDMLE0IIN8A+devbGX/K1xdgr4b4= +github.com/samber/mo v1.16.0/go.mod h1:DlgzJ4SYhOh41nP1L9kh9rDNERuf8IqWSAs+gj2Vxag= github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ= github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/sethvargo/go-password v0.3.1 h1:WqrLTjo7X6AcVYfC6R7GtSyuUQR9hGyAj/f1PYQZCJU= diff --git a/internal/command/create/vm.go b/internal/command/create/vm.go index 49eff4e..c35126a 100644 --- a/internal/command/create/vm.go +++ b/internal/command/create/vm.go @@ -109,22 +109,24 @@ func runCreateVM(cmd *cobra.Command, args []string) error { Meta: v1.Meta{ Name: name, }, - Image: image, - CPU: cpu, - Memory: memory, - DiskSize: diskSize, - NetSoftnetDeprecated: netSoftnet, - NetSoftnet: netSoftnet, - NetSoftnetAllow: netSoftnetAllow, - NetSoftnetBlock: netSoftnetBlock, - NetBridged: netBridged, - Headless: headless, - Nested: nested, - Username: username, - Password: password, - RandomSerial: randomSerial, - Labels: labels, - HostDirs: hostDirs, + Image: image, + CPU: cpu, + Memory: memory, + DiskSize: diskSize, + VMSpec: v1.VMSpec{ + NetSoftnetDeprecated: netSoftnet, + NetSoftnet: netSoftnet, + NetSoftnetAllow: netSoftnetAllow, + NetSoftnetBlock: netSoftnetBlock, + }, + NetBridged: netBridged, + Headless: headless, + Nested: nested, + Username: username, + Password: password, + RandomSerial: randomSerial, + Labels: labels, + HostDirs: hostDirs, } // Convert resources 
diff --git a/internal/controller/api.go b/internal/controller/api.go index 99070e8..b8697ca 100644 --- a/internal/controller/api.go +++ b/internal/controller/api.go @@ -142,7 +142,24 @@ func (controller *Controller) initAPI() *gin.Engine { controller.createVM(c).Respond(c) }) v1.PUT("/vms/:name", func(c *gin.Context) { - controller.updateVM(c).Respond(c) + if strings.HasPrefix(c.GetHeader("User-Agent"), "Orchard/0") { + // Backward compatibility for older Orchard Workers that still + // use the PUT /vms/{name} API endpoint to update a VM status + // + // Note that we include the "0" here to avoid targeting users + // of the github.com/cirruslabs/orchard/pkg/client package. For + // them, the UA string should normally be "Orchard/unknown-unknown". + // + // After some months/years we can remove this workaround and at + // the very worst the workers simply won't progress with the VMs + // assigned to them. An upgrade to a newer version will fix that. + controller.updateVMState(c).Respond(c) + } else { + controller.updateVMSpec(c).Respond(c) + } + }) + v1.PUT("/vms/:name/state", func(c *gin.Context) { + controller.updateVMState(c).Respond(c) }) v1.GET("/vms/:name", func(c *gin.Context) { controller.getVM(c).Respond(c) diff --git a/internal/controller/api_controller.go b/internal/controller/api_controller.go index 1fdc2cc..b67ca2a 100644 --- a/internal/controller/api_controller.go +++ b/internal/controller/api_controller.go @@ -1,11 +1,12 @@ package controller import ( + "net/http" + "github.com/cirruslabs/orchard/internal/responder" "github.com/cirruslabs/orchard/internal/version" v1pkg "github.com/cirruslabs/orchard/pkg/resource/v1" "github.com/gin-gonic/gin" - "net/http" ) func (controller *Controller) controllerInfo(ctx *gin.Context) responder.Responder { @@ -17,6 +18,7 @@ func (controller *Controller) controllerInfo(ctx *gin.Context) responder.Respond capabilities := []v1pkg.ControllerCapability{ v1pkg.ControllerCapabilityRPCV1, + 
v1pkg.ControllerCapabilityVMStateEndpoint, } if controller.experimentalRPCV2 { diff --git a/internal/controller/api_vms.go b/internal/controller/api_vms.go index c277d41..bc46bce 100644 --- a/internal/controller/api_vms.go +++ b/internal/controller/api_vms.go @@ -12,6 +12,7 @@ import ( "github.com/cirruslabs/orchard/internal/simplename" "github.com/cirruslabs/orchard/pkg/resource/v1" "github.com/gin-gonic/gin" + "github.com/google/go-cmp/cmp" "github.com/google/uuid" "github.com/samber/lo" ) @@ -108,7 +109,7 @@ func (controller *Controller) createVM(ctx *gin.Context) responder.Responder { return response } -func (controller *Controller) updateVM(ctx *gin.Context) responder.Responder { +func (controller *Controller) updateVMSpec(ctx *gin.Context) responder.Responder { if responder := controller.authorize(ctx, v1.ServiceAccountRoleComputeWrite); responder != nil { return responder } @@ -119,12 +120,60 @@ func (controller *Controller) updateVM(ctx *gin.Context) responder.Responder { return responder.JSON(http.StatusBadRequest, NewErrorResponse("invalid JSON was provided")) } - if userVM.Name == "" { - return responder.JSON(http.StatusPreconditionFailed, NewErrorResponse("VM name is empty")) - } + name := ctx.Param("name") return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder { - dbVM, err := txn.GetVM(userVM.Name) + dbVM, err := txn.GetVM(name) + if err != nil { + return responder.Error(err) + } + + if dbVM.TerminalState() { + return responder.JSON(http.StatusPreconditionFailed, + NewErrorResponse("cannot update VM in a terminal state")) + } + + // Softnet-specific logic: automatically enable Softnet when NetSoftnetAllow or NetSoftnetBlock are set + // and propagate deprecated and non-deprecated boolean fields into each other + if userVM.NetSoftnetDeprecated || userVM.NetSoftnet || len(userVM.NetSoftnetAllow) != 0 || len(userVM.NetSoftnetBlock) != 0 { + userVM.NetSoftnetDeprecated = true + userVM.NetSoftnet = true + } + + if 
cmp.Equal(dbVM.VMSpec, userVM.VMSpec) { + // Nothing was changed + return responder.JSON(http.StatusOK, dbVM) + } + + // VM specification was changed + dbVM.VMSpec = userVM.VMSpec + dbVM.Generation++ + + if err := txn.SetVM(*dbVM); err != nil { + controller.logger.Errorf("failed to update VM in the DB: %v", err) + + return responder.Code(http.StatusInternalServerError) + } + + return responder.JSON(http.StatusOK, dbVM) + }) +} + +func (controller *Controller) updateVMState(ctx *gin.Context) responder.Responder { + if responder := controller.authorize(ctx, v1.ServiceAccountRoleComputeWrite); responder != nil { + return responder + } + + var userVM v1.VM + + if err := ctx.ShouldBindJSON(&userVM); err != nil { + return responder.JSON(http.StatusBadRequest, NewErrorResponse("invalid JSON was provided")) + } + + name := ctx.Param("name") + + return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder { + dbVM, err := txn.GetVM(name) if err != nil { return responder.Error(err) } @@ -141,6 +190,7 @@ func (controller *Controller) updateVM(ctx *gin.Context) responder.Responder { dbVM.Status = userVM.Status dbVM.StatusMessage = userVM.StatusMessage dbVM.ImageFQN = userVM.ImageFQN + dbVM.VMState = userVM.VMState if err := txn.SetVM(*dbVM); err != nil { controller.logger.Errorf("failed to update VM in the DB: %v", err) diff --git a/internal/tests/spec_update_test.go b/internal/tests/spec_update_test.go new file mode 100644 index 0000000..ebee72f --- /dev/null +++ b/internal/tests/spec_update_test.go @@ -0,0 +1,140 @@ +package tests + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/cirruslabs/orchard/internal/imageconstant" + "github.com/cirruslabs/orchard/internal/tests/devcontroller" + "github.com/cirruslabs/orchard/internal/tests/wait" + "github.com/cirruslabs/orchard/internal/worker/ondiskname" + v1 "github.com/cirruslabs/orchard/pkg/resource/v1" + "github.com/samber/lo" + "github.com/shirou/gopsutil/v4/process" + 
"github.com/stretchr/testify/require" +) + +func TestSpecUpdateSoftnet(t *testing.T) { + devClient, _, _ := devcontroller.StartIntegrationTestEnvironment(t) + + // Create a VM + vmName := "test" + + err := devClient.VMs().Create(t.Context(), &v1.VM{ + Meta: v1.Meta{ + Name: vmName, + }, + Image: imageconstant.DefaultMacosImage, + CPU: 4, + Memory: 8 * 1024, + Headless: true, + Status: v1.VMStatusPending, + }) + require.NoError(t, err) + + // Wait for the VM to start + var vm *v1.VM + + require.True(t, wait.Wait(2*time.Minute, func() bool { + vm, err = devClient.VMs().Get(context.Background(), vmName) + require.NoError(t, err) + + t.Logf("Waiting for the VM to start. Current status: %s", vm.Status) + + return vm.Status == v1.VMStatusRunning + }), "failed to start a VM") + + // Ensure that Softnet is not enabled for a VM + tartVMName := ondiskname.New(vmName, vm.UID, vm.RestartCount).String() + + tartRunCmdline, err := tartRunProcessCmdline(tartVMName) + require.NoError(t, err) + require.NotContains(t, tartRunCmdline, "--net-softnet") + require.NotContains(t, tartRunCmdline, "--net-softnet-allow") + require.NotContains(t, tartRunCmdline, "--net-softnet-block") + + // Update the VM's specification and enable Softnet + vm.NetSoftnetAllow = []string{"10.0.0.0/16"} + vm.NetSoftnetBlock = []string{"0.0.0.0/0"} + + vm, err = devClient.VMs().Update(t.Context(), *vm) + require.NoError(t, err) + require.EqualValues(t, 1, vm.Generation) + require.EqualValues(t, 0, vm.ObservedGeneration) + + require.True(t, wait.Wait(30*time.Second, func() bool { + vm, err = devClient.VMs().Get(context.Background(), vmName) + require.NoError(t, err) + + t.Logf("Waiting for the VM's observed generation to be updated...") + + return vm.ObservedGeneration == 1 + }), "failed to update a VM") + + tartRunCmdline, err = tartRunProcessCmdline(tartVMName) + require.NoError(t, err) + require.Contains(t, tartRunCmdline, "--net-softnet") + require.True(t, sliceContainsAnotherSlice(tartRunCmdline, 
[]string{"--net-softnet-allow", "10.0.0.0/16"})) + require.True(t, sliceContainsAnotherSlice(tartRunCmdline, []string{"--net-softnet-block", "0.0.0.0/0"})) +} + +func tartRunProcessCmdline(vmName string) ([]string, error) { + processes, err := process.Processes() + if err != nil { + return nil, err + } + + for _, process := range processes { + name, err := process.Name() + if err != nil { + // On macOS, process.Name() returns "invalid argument" for most + // of the processes likely due to permissions, so just ignore it + continue + } + + if name != "tart" { + continue + } + + cmdline, err := process.CmdlineSlice() + if err != nil { + return nil, err + } + + if len(cmdline) < 3 { + continue + } + + if cmdline[1] != "run" { + continue + } + + if lo.Contains(cmdline[2:], vmName) { + return cmdline, nil + } + } + + return nil, fmt.Errorf("failed to find a \"tart run\" process for VM %q", vmName) +} + +func sliceContainsAnotherSlice(haystack []string, needle []string) bool { + if len(needle) == 0 { + return true + } + + var needleIdx int + + for _, haystackItem := range haystack { + if haystackItem == needle[needleIdx] { + needleIdx++ + + if needleIdx == len(needle) { + return true + } + } + } + + return false +} diff --git a/internal/worker/fsm.go b/internal/worker/fsm.go new file mode 100644 index 0000000..34f90ab --- /dev/null +++ b/internal/worker/fsm.go @@ -0,0 +1,58 @@ +package worker + +import ( + "fmt" + + v1 "github.com/cirruslabs/orchard/pkg/resource/v1" + "github.com/samber/mo" +) + +type Action string + +const ( + ActionIgnore Action = "ignore" + ActionCreate Action = "create" + ActionMonitorPending Action = "monitor-pending" + ActionReportRunning Action = "report-running" + ActionMonitorRunning Action = "monitor-running" + ActionStop Action = "stop" + ActionFail Action = "fail" + ActionLostTrack Action = "lost-track" + ActionImpossible Action = "impossible" + ActionDelete Action = "delete" +) + +var transitions = 
map[mo.Option[v1.VMStatus]]map[mo.Option[v1.VMStatus]]Action{ + mo.None[v1.VMStatus](): { + mo.None[v1.VMStatus](): ActionIgnore, + mo.Some(v1.VMStatusPending): ActionDelete, + mo.Some(v1.VMStatusRunning): ActionDelete, + mo.Some(v1.VMStatusFailed): ActionDelete, + }, + mo.Some(v1.VMStatusPending): { + mo.None[v1.VMStatus](): ActionCreate, + mo.Some(v1.VMStatusPending): ActionMonitorPending, + mo.Some(v1.VMStatusRunning): ActionReportRunning, + mo.Some(v1.VMStatusFailed): ActionFail, + }, + mo.Some(v1.VMStatusRunning): { + mo.None[v1.VMStatus](): ActionLostTrack, + mo.Some(v1.VMStatusPending): ActionImpossible, + mo.Some(v1.VMStatusRunning): ActionMonitorRunning, + mo.Some(v1.VMStatusFailed): ActionFail, + }, + mo.Some(v1.VMStatusFailed): { + mo.None[v1.VMStatus](): ActionIgnore, + mo.Some(v1.VMStatusPending): ActionStop, + mo.Some(v1.VMStatusRunning): ActionStop, + mo.Some(v1.VMStatusFailed): ActionIgnore, + }, +} + +func optionToString[T any](option mo.Option[T]) string { + if option.IsNone() { + return "None" + } + + return fmt.Sprintf("Some(%v)", option.MustGet()) +} diff --git a/internal/worker/fsm_test.go b/internal/worker/fsm_test.go new file mode 100644 index 0000000..6c5415f --- /dev/null +++ b/internal/worker/fsm_test.go @@ -0,0 +1,27 @@ +package worker + +import ( + "testing" + + v1 "github.com/cirruslabs/orchard/pkg/resource/v1" + "github.com/samber/mo" + "github.com/stretchr/testify/require" +) + +// TestExplicitStateTransitions ensures that all state transitions +// yield a defined action (something other than ActionUndefined). 
+func TestExplicitStateTransitions(t *testing.T) { + possibleStates := []mo.Option[v1.VMStatus]{ + mo.None[v1.VMStatus](), + mo.Some(v1.VMStatusPending), + mo.Some(v1.VMStatusRunning), + mo.Some(v1.VMStatusFailed), + } + + for _, remote := range possibleStates { + for _, local := range possibleStates { + require.Positivef(t, transitions[remote][local], "state transition %s -> %s is not defined", + optionToString(remote), optionToString(local)) + } + } +} diff --git a/internal/worker/vmmanager/vm.go b/internal/worker/vmmanager/vm.go index e0b936b..37ffc3d 100644 --- a/internal/worker/vmmanager/vm.go +++ b/internal/worker/vmmanager/vm.go @@ -46,8 +46,8 @@ type VM struct { // Image FQN feature, see https://github.com/cirruslabs/orchard/issues/164 imageFQN atomic.Pointer[string] - status atomic.Pointer[string] - err atomic.Pointer[error] + statusMessage atomic.Pointer[string] + err atomic.Pointer[error] ctx context.Context cancel context.CancelFunc @@ -89,7 +89,7 @@ func NewVM( defer vm.wg.Done() if vmResource.ImagePullPolicy == v1.ImagePullPolicyAlways { - vm.setStatus("pulling VM image...") + vm.setStatusMessage("pulling VM image...") pullStartedAt := time.Now() @@ -123,38 +123,9 @@ func NewVM( } vm.cloned.Store(true) - - // Launch the startup script goroutine as close as possible - // to the VM startup (below) to avoid "tart ip" timing out - if vm.Resource.StartupScript != nil { - vm.setStatus("VM started, running startup script...") - - go vm.runScript(vm.Resource.StartupScript, eventStreamer) - } else { - vm.setStatus("VM started") - } - vm.started.Store(true) - if err := vm.run(vm.ctx); err != nil { - select { - case <-vm.ctx.Done(): - // Do not return an error because it's the user's intent to cancel this VM - default: - vm.setErr(fmt.Errorf("%w: %v", ErrVMFailed, err)) - } - - return - } - - select { - case <-vm.ctx.Done(): - // Do not return an error because it's the user's intent to cancel this VM - default: - if !vm.stopping.Load() { - 
vm.setErr(fmt.Errorf("%w: VM exited unexpectedly", ErrVMFailed)) - } - } + vm.run(vm.ctx, eventStreamer) }() return vm @@ -176,8 +147,20 @@ func (vm *VM) id() string { return vm.onDiskName.String() } -func (vm *VM) Status() string { - status := vm.status.Load() +func (vm *VM) Status() v1.VMStatus { + if vm.Err() != nil { + return v1.VMStatusFailed + } + + if vm.Started() { + return v1.VMStatusRunning + } + + return v1.VMStatusPending +} + +func (vm *VM) StatusMessage() string { + status := vm.statusMessage.Load() if status != nil { return *status @@ -186,9 +169,9 @@ func (vm *VM) Status() string { return "" } -func (vm *VM) setStatus(status string) { +func (vm *VM) setStatusMessage(status string) { vm.logger.Debugf(status) - vm.status.Store(&status) + vm.statusMessage.Store(&status) } func (vm *VM) Err() error { @@ -206,7 +189,7 @@ func (vm *VM) setErr(err error) { } func (vm *VM) cloneAndConfigure(ctx context.Context) error { - vm.setStatus("cloning VM...") + vm.setStatusMessage("cloning VM...") _, _, err := tart.Tart(ctx, vm.logger, "clone", vm.Resource.Image, vm.id()) if err != nil { @@ -221,7 +204,7 @@ func (vm *VM) cloneAndConfigure(ctx context.Context) error { } // Set memory - vm.setStatus("configuring VM...") + vm.setStatusMessage("configuring VM...") memory := vm.Resource.AssignedMemory @@ -333,7 +316,17 @@ func (vm *VM) cloneAndConfigure(ctx context.Context) error { return nil } -func (vm *VM) run(ctx context.Context) error { +func (vm *VM) run(ctx context.Context, eventStreamer *client.EventStreamer) { + // Launch the startup script goroutine as close as possible + // to the VM startup (below) to avoid "tart ip" timing out + if vm.Resource.StartupScript != nil { + vm.setStatusMessage("VM started, running startup script...") + + go vm.runScript(vm.Resource.StartupScript, eventStreamer) + } else { + vm.setStatusMessage("VM started") + } + var runArgs = []string{"run"} if vm.Resource.NetSoftnetDeprecated || vm.Resource.NetSoftnet { @@ -364,10 +357,24 @@ 
func (vm *VM) run(ctx context.Context) error { runArgs = append(runArgs, vm.id()) _, _, err := tart.Tart(ctx, vm.logger, runArgs...) if err != nil { - return err + select { + case <-vm.ctx.Done(): + // Do not return an error because it's the user's intent to cancel this VM + default: + vm.setErr(fmt.Errorf("%w: %v", ErrVMFailed, err)) + } + + return } - return nil + select { + case <-vm.ctx.Done(): + // Do not return an error because it's the user's intent to cancel this VM + default: + if !vm.stopping.Load() { + vm.setErr(fmt.Errorf("%w: VM exited unexpectedly", ErrVMFailed)) + } + } } func (vm *VM) IP(ctx context.Context) (string, error) { @@ -401,6 +408,7 @@ func (vm *VM) Stop() { vm.logger.Debugf("stopping VM") vm.stopping.Store(true) + defer vm.stopping.Store(false) // Try to gracefully terminate the VM _, _, _ = tart.Tart(context.Background(), zap.NewNop().Sugar(), "stop", "--timeout", "5", vm.id()) @@ -412,6 +420,19 @@ func (vm *VM) Stop() { vm.logger.Debugf("VM stopped") } +func (vm *VM) Reboot(eventStreamer *client.EventStreamer) { + vm.Stop() + + vm.ctx, vm.cancel = context.WithCancel(context.Background()) + vm.wg.Add(1) + + go func() { + defer vm.wg.Done() + + vm.run(vm.ctx, eventStreamer) + }() +} + func (vm *VM) Delete() error { if !vm.cloned.Load() { return nil diff --git a/internal/worker/worker.go b/internal/worker/worker.go index f019920..138f08e 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "os" + "slices" "time" "github.com/avast/retry-go/v4" @@ -18,8 +19,11 @@ import ( "github.com/cirruslabs/orchard/pkg/client" v1 "github.com/cirruslabs/orchard/pkg/resource/v1" "github.com/cirruslabs/orchard/rpc" + mapset "github.com/deckarep/golang-set/v2" "github.com/dustin/go-humanize" "github.com/hashicorp/go-multierror" + "github.com/samber/lo" + "github.com/samber/mo" "github.com/shirou/gopsutil/v4/cpu" "github.com/shirou/gopsutil/v4/mem" "go.opentelemetry.io/otel/metric" @@ -191,6 +195,13 @@ 
func (worker *Worker) runNewSession(ctx context.Context) error { return nil } + // Backward compatibility with for older Orchard Controllers + updateFunc := worker.client.VMs().UpdateState + + if !info.Capabilities.Has(v1.ControllerCapabilityVMStateEndpoint) { + updateFunc = worker.client.VMs().Update + } + for { if err := worker.updateWorker(ctx); err != nil { worker.logger.Errorf("failed to update worker resource: %v", err) @@ -198,7 +209,7 @@ func (worker *Worker) runNewSession(ctx context.Context) error { return nil } - if err := worker.syncVMs(subCtx); err != nil { + if err := worker.syncVMs(subCtx, updateFunc); err != nil { worker.logger.Warnf("failed to sync VMs: %v", err) return nil @@ -260,26 +271,144 @@ func (worker *Worker) updateWorker(ctx context.Context) error { } //nolint:nestif,gocognit // nested "if" and cognitive complexity is tolerable for now -func (worker *Worker) syncVMs(ctx context.Context) error { +func (worker *Worker) syncVMs(ctx context.Context, updateVM func(context.Context, v1.VM) (*v1.VM, error)) error { + allKeys := mapset.NewSet[ondiskname.OnDiskName]() + remoteVMs, err := worker.client.VMs().FindForWorker(ctx, worker.name) if err != nil { return err } - remoteVMsIndex := map[ondiskname.OnDiskName]v1.VM{} + remoteVMsIndex := map[ondiskname.OnDiskName]*v1.VM{} for _, remoteVM := range remoteVMs { - remoteVMsIndex[ondiskname.NewFromResource(remoteVM)] = remoteVM + onDiskName := ondiskname.NewFromResource(remoteVM) + allKeys.Add(onDiskName) + // Can't take an address of a loop variable + remoteVMCopy := remoteVM + remoteVMsIndex[onDiskName] = &remoteVMCopy + } + + localVMsIndex := map[ondiskname.OnDiskName]*vmmanager.VM{} + for _, vm := range worker.vmm.List() { + onDiskName := vm.OnDiskName() + allKeys.Add(onDiskName) + localVMsIndex[onDiskName] = vm } worker.logger.Infof("syncing %d local VMs against %d remote VMs...", - worker.vmm.Len(), len(remoteVMsIndex)) + len(localVMsIndex), len(remoteVMsIndex)) - // It's important to check the 
remote VMs against local ones first - // to stop the failed VMs before we start the new VMs, otherwise we - // risk violating the resource constraints (e.g. a maximum of 2 VMs - // per host) - for _, vm := range worker.vmm.List() { - remoteVM, ok := remoteVMsIndex[vm.OnDiskName()] - if !ok { + var pairs []lo.Tuple3[ondiskname.OnDiskName, *v1.VM, *vmmanager.VM] + + for onDiskName := range allKeys.Iter() { + vmResource := remoteVMsIndex[onDiskName] + vm := localVMsIndex[onDiskName] + + pairs = append(pairs, lo.T3(onDiskName, vmResource, vm)) + } + + // It's important to process the remote VMs in failed state + // and local VMs that ceased to exist remotely first, otherwise + // we risk violating the scheduler resource assumptions + sortNonExistentAndFailedFirst(pairs) + + for _, tuple := range pairs { + onDiskName, vmResource, vm := lo.Unpack3(tuple) + + remoteState := mo.None[v1.VMStatus]() + if vmResource != nil { + remoteState = mo.Some(vmResource.Status) + } + + localState := mo.None[v1.VMStatus]() + if vm != nil { + localState = mo.Some(vm.Status()) + } + + action := transitions[remoteState][localState] + + worker.logger.Debugf("processing VM: %s, remote: %v, local: %v, action: %v\n", onDiskName, + optionToString(remoteState), optionToString(localState), action) + + switch action { + case ActionCreate: + // Remote VM was created, but not the local VM + worker.createVM(onDiskName, *vmResource) + case ActionMonitorPending: + if vmResource.StatusMessage != vm.StatusMessage() { + vmResource.StatusMessage = vm.StatusMessage() + + if _, err := updateVM(ctx, *vmResource); err != nil { + return err + } + } + case ActionReportRunning: + // Remote VM was created, and the local VM too, + // check if the local VM had already started + // and update the remote VM as accordingly + + // Image FQN feature, see https://github.com/cirruslabs/orchard/issues/164 + if imageFQN := vm.ImageFQN(); imageFQN != nil { + vmResource.ImageFQN = *imageFQN + } + + // Mark the remote VM as 
started + vmResource.Status = v1.VMStatusRunning + vmResource.StatusMessage = vm.StatusMessage() + + if _, err := updateVM(ctx, *vmResource); err != nil { + return err + } + case ActionMonitorRunning: + if vmResource.StatusMessage != vm.StatusMessage() { + vmResource.StatusMessage = vm.StatusMessage() + + if _, err := updateVM(ctx, *vmResource); err != nil { + return err + } + } + + if vmResource.Generation != vm.Resource.Generation { + // Something changed, reboot the VM for the changes to take effect + vm.Resource = *vmResource + + eventStreamer := worker.client.VMs().StreamEvents(vmResource.Name) + + vm.Reboot(eventStreamer) + + vmResource.ObservedGeneration = vm.Resource.Generation + + if _, err := updateVM(ctx, *vmResource); err != nil { + return err + } + } + case ActionStop: + // VM has failed on the remote side, stop it locally to prevent incorrect + // worker's resources calculation in the Controller's scheduler + vm.Stop() + case ActionFail, ActionLostTrack, ActionImpossible: + // VM has failed on the local side, stop it before reporting as failed to prevent incorrect + // worker's resources calculation in the Controller's scheduler + if vm != nil { + vm.Stop() + } + + var statusMessage string + + switch action { + case ActionFail: + statusMessage = vm.Err().Error() + case ActionLostTrack: + statusMessage = "Worker lost track of VM" + case ActionImpossible: + statusMessage = "Encountered an impossible transition" + } + + vmResource.Status = v1.VMStatusFailed + vmResource.StatusMessage = statusMessage + if _, err := updateVM(ctx, *vmResource); err != nil { + return err + } + case ActionDelete: // Remote VM was deleted, delete local VM // // Note: this check needs to run for each VM @@ -287,58 +416,6 @@ func (worker *Worker) syncVMs(ctx context.Context) error { if err := worker.deleteVM(vm); err != nil { return err } - } else { - if remoteVM.Status == v1.VMStatusFailed { - // VM has failed on the remote side, stop it locally to prevent incorrect - // 
worker's resources calculation in the Controller's scheduler - vm.Stop() - } else if vm.Err() != nil { - // VM has failed on the local side, stop it before reporting as failed to prevent incorrect - // worker's resources calculation in the Controller's scheduler - vm.Stop() - - // Report the VM as failed - remoteVM.Status = v1.VMStatusFailed - remoteVM.StatusMessage = vm.Err().Error() - if _, err := worker.client.VMs().Update(ctx, remoteVM); err != nil { - return err - } - } else if vm.Status() != remoteVM.StatusMessage { - // Report the new VM status message - remoteVM.StatusMessage = vm.Status() - if _, err := worker.client.VMs().Update(ctx, remoteVM); err != nil { - return err - } - } - } - } - - for _, vmResource := range remoteVMsIndex { - odn := ondiskname.NewFromResource(vmResource) - - if vmResource.Status != v1.VMStatusPending { - continue - } - - if vm, ok := worker.vmm.Get(odn); ok { - // Remote VM was created, and the local VM too, - // check if the local VM had already started - // and update the remote VM as accordingly - if vm.Started() { - // Image FQN feature, see https://github.com/cirruslabs/orchard/issues/164 - if imageFQN := vm.ImageFQN(); imageFQN != nil { - vmResource.ImageFQN = *imageFQN - } - - // Mark the remote VM as started - vmResource.Status = v1.VMStatusRunning - if _, err := worker.client.VMs().Update(ctx, vmResource); err != nil { - return err - } - } - } else { - // Remote VM was created, but not the local VM - worker.createVM(odn, vmResource) } } @@ -403,15 +480,6 @@ func (worker *Worker) syncOnDiskVMs(ctx context.Context) error { worker.logger.Warnf("failed to stop") } } - - if remoteVM.Status != v1.VMStatusFailed { - remoteVM.Status = v1.VMStatusFailed - remoteVM.StatusMessage = "Worker lost track of VM" - _, err := worker.client.VMs().Update(ctx, remoteVM) - if err != nil { - return err - } - } } } @@ -454,3 +522,36 @@ func (worker *Worker) requestVMSyncing() { worker.logger.Debugf("There's already a syncing request in the 
queue, skipping") } } + +func sortNonExistentAndFailedFirst(input []lo.Tuple3[ondiskname.OnDiskName, *v1.VM, *vmmanager.VM]) { + slices.SortStableFunc(input, func(left, right lo.Tuple3[ondiskname.OnDiskName, *v1.VM, *vmmanager.VM]) int { + _, leftVM, _ := lo.Unpack3(left) + _, rightVM, _ := lo.Unpack3(right) + + leftNonExistent := leftVM == nil + rightNonExistent := rightVM == nil + + switch { + case leftNonExistent && rightNonExistent: + return 0 + case leftNonExistent: + return -1 + case rightNonExistent: + return 1 + } + + leftFailed := leftVM != nil && leftVM.Status == v1.VMStatusFailed + rightFailed := rightVM != nil && rightVM.Status == v1.VMStatusFailed + + switch { + case leftFailed && rightFailed: + return 0 + case leftFailed: + return -1 + case rightFailed: + return 1 + default: + return 0 + } + }) +} diff --git a/internal/worker/worker_test.go b/internal/worker/worker_test.go new file mode 100644 index 0000000..1176486 --- /dev/null +++ b/internal/worker/worker_test.go @@ -0,0 +1,37 @@ +package worker + +import ( + "testing" + + "github.com/cirruslabs/orchard/internal/worker/ondiskname" + "github.com/cirruslabs/orchard/internal/worker/vmmanager" + v1 "github.com/cirruslabs/orchard/pkg/resource/v1" + "github.com/samber/lo" + "github.com/stretchr/testify/require" +) + +func TestSortNonExistentAndFailedFirst(t *testing.T) { + newVMTuple := func(name string, vmResource *v1.VM) lo.Tuple3[ondiskname.OnDiskName, *v1.VM, *vmmanager.VM] { + return lo.T3(ondiskname.New(name, name, 0), vmResource, &vmmanager.VM{}) + } + + target := []lo.Tuple3[ondiskname.OnDiskName, *v1.VM, *vmmanager.VM]{ + newVMTuple("test1", &v1.VM{Status: v1.VMStatusFailed}), + newVMTuple("test2", &v1.VM{Status: v1.VMStatusPending}), + newVMTuple("test3", &v1.VM{Status: v1.VMStatusRunning}), + newVMTuple("test5", nil), + newVMTuple("test4", &v1.VM{Status: v1.VMStatusFailed}), + } + + sortNonExistentAndFailedFirst(target) + + expected := []lo.Tuple3[ondiskname.OnDiskName, *v1.VM, *vmmanager.VM]{ 
+ newVMTuple("test5", nil), + newVMTuple("test1", &v1.VM{Status: v1.VMStatusFailed}), + newVMTuple("test4", &v1.VM{Status: v1.VMStatusFailed}), + newVMTuple("test2", &v1.VM{Status: v1.VMStatusPending}), + newVMTuple("test3", &v1.VM{Status: v1.VMStatusRunning}), + } + + require.Equal(t, expected, target) +} diff --git a/pkg/client/vms.go b/pkg/client/vms.go index 372ae4c..2acaa55 100644 --- a/pkg/client/vms.go +++ b/pkg/client/vms.go @@ -3,11 +3,12 @@ package client import ( "context" "fmt" - "github.com/cirruslabs/orchard/pkg/resource/v1" "net" "net/http" "net/url" "strconv" + + "github.com/cirruslabs/orchard/pkg/resource/v1" ) type VMsService struct { @@ -78,6 +79,17 @@ func (service *VMsService) Update(ctx context.Context, vm v1.VM) (*v1.VM, error) return &updatedVM, nil } +func (service *VMsService) UpdateState(ctx context.Context, vm v1.VM) (*v1.VM, error) { + var updatedVM v1.VM + err := service.client.request(ctx, http.MethodPut, fmt.Sprintf("vms/%s/state", url.PathEscape(vm.Name)), + vm, &updatedVM, nil) + if err != nil { + return &updatedVM, err + } + + return &updatedVM, nil +} + func (service *VMsService) Delete(ctx context.Context, name string) error { err := service.client.request(ctx, http.MethodDelete, fmt.Sprintf("vms/%s", url.PathEscape(name)), nil, nil, nil) diff --git a/pkg/resource/v1/cmp_test.go b/pkg/resource/v1/cmp_test.go new file mode 100644 index 0000000..e8094df --- /dev/null +++ b/pkg/resource/v1/cmp_test.go @@ -0,0 +1,14 @@ +package v1_test + +import ( + "testing" + + v1 "github.com/cirruslabs/orchard/pkg/resource/v1" + "github.com/google/go-cmp/cmp" +) + +// TestVM ensures that v1.VM and its embedded structs can be compared +// using github.com/google/go-cmp/cmp without causing panics. 
+func TestVM(t *testing.T) { + cmp.Equal(v1.VM{}, v1.VM{}) +} diff --git a/pkg/resource/v1/v1.go b/pkg/resource/v1/v1.go index b8b43b4..38ae7ac 100644 --- a/pkg/resource/v1/v1.go +++ b/pkg/resource/v1/v1.go @@ -22,18 +22,17 @@ type Meta struct { } type VM struct { - Image string `json:"image,omitempty"` - ImagePullPolicy ImagePullPolicy `json:"imagePullPolicy,omitempty"` - CPU uint64 `json:"cpu,omitempty"` - Memory uint64 `json:"memory,omitempty"` - DiskSize uint64 `json:"diskSize,omitempty"` - NetSoftnetDeprecated bool `json:"net-softnet,omitempty"` - NetSoftnet bool `json:"netSoftnet,omitempty"` - NetSoftnetAllow []string `json:"netSoftnetAllow,omitempty"` - NetSoftnetBlock []string `json:"netSoftnetBlock,omitempty"` - NetBridged string `json:"net-bridged,omitempty"` - Headless bool `json:"headless,omitempty"` - Nested bool `json:"nested,omitempty"` + Image string `json:"image,omitempty"` + ImagePullPolicy ImagePullPolicy `json:"imagePullPolicy,omitempty"` + CPU uint64 `json:"cpu,omitempty"` + Memory uint64 `json:"memory,omitempty"` + DiskSize uint64 `json:"diskSize,omitempty"` + NetBridged string `json:"net-bridged,omitempty"` + Headless bool `json:"headless,omitempty"` + Nested bool `json:"nested,omitempty"` + + VMSpec + VMState // Status field is used to track the lifecycle of the VM associated with this resource. Status VMStatus `json:"status,omitempty"` @@ -88,9 +87,29 @@ type VM struct { ScheduledAt time.Time `json:"scheduled_at,omitempty"` StartedAt time.Time `json:"started_at,omitempty"` + // Generation is incremented by the controller each time + // the resource's specification is changed. + // + // At some point we'll move Generation field to the Metadata + // structure as it can be useful for other resources too. 
// VMSpec groups the VM specification fields that may change after the VM
// was created and that the worker reconciles — currently the Softnet
// networking settings (see the Generation/ObservedGeneration pair for how
// spec changes are tracked).
type VMSpec struct {
	// NetSoftnetDeprecated is the legacy "net-softnet" field kept for
	// backward compatibility; NetSoftnet is its replacement.
	NetSoftnetDeprecated bool `json:"net-softnet,omitempty"`
	NetSoftnet bool `json:"netSoftnet,omitempty"`
	NetSoftnetAllow []string `json:"netSoftnetAllow,omitempty"`
	NetSoftnetBlock []string `json:"netSoftnetBlock,omitempty"`
}

// VMState groups the VM fields that are reported by the worker rather
// than requested by the user.
type VMState struct {
	// ObservedGeneration corresponds to the Generation of the VM
	// specification on which the worker has acted.
	ObservedGeneration uint64 `json:"observedGeneration"`
}