From a7a0dea41bb51f8de655effc570c0cd7372fa152 Mon Sep 17 00:00:00 2001
From: Fedor Korotkov
Date: Wed, 29 Oct 2025 06:36:07 -0700
Subject: [PATCH] Codex implementation of restart

---
Review notes (this section is ignored when the patch is applied):
* VM.Restart() now sets the "VM restarted" status before spawning the run
  goroutine, so syncVMs no longer races with it when reading vm.Status().
* restartVM no longer notifies the worker when storing the request failed.
* Doc comments were added to the new functions; the post-image blob IDs on
  the "index" lines were not regenerated and are therefore stale.
* Context-line indentation was reconstructed from a whitespace-collapsed
  copy; use "git apply --ignore-whitespace" if strict matching fails.

 api/openapi.yaml                       | 26 ++++++++
 internal/controller/api.go             |  3 +
 internal/controller/api_vms_restart.go | 84 ++++++++++++++++++++++++++
 internal/tests/integration_test.go     | 39 ++++++++++++
 internal/worker/vmmanager/vm.go        | 47 ++++++++++++++
 internal/worker/worker.go              | 10 ++-
 pkg/client/vms.go                      | 14 +++++
 pkg/resource/v1/v1.go                  |  2 +
 8 files changed, 224 insertions(+), 1 deletion(-)
 create mode 100644 internal/controller/api_vms_restart.go

diff --git a/api/openapi.yaml b/api/openapi.yaml
index 17bc97a..0fc24c4 100644
--- a/api/openapi.yaml
+++ b/api/openapi.yaml
@@ -422,6 +422,28 @@ paths:
           description: VM resource with the given name doesn't exist
         '503':
           description: Failed to resolve the IP address on the worker responsible for the specified VM
+  /vms/{name}/restart:
+    parameters:
+      - in: path
+        name: name
+        required: true
+        schema:
+          type: string
+    post:
+      summary: "Restart a VM"
+      tags:
+        - vms
+      responses:
+        '202':
+          description: VM restart was requested
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/VM'
+        '404':
+          description: VM resource with the given name doesn't exist
+        '412':
+          description: VM is not assigned to a worker and cannot be restarted
 components:
   schemas:
     Worker:
@@ -565,6 +587,10 @@ components:
             VM restart policy: specify "Never" to never restart or "OnFailure" to only restart when the VM fails
           default: Never
           enum: [Never, OnFailure]
+        restart_requested:
+          type: boolean
+          description: Indicates that the worker should restart this VM in-place
+          readOnly: true
         resources:
           type: object
           description: Resources required by this VM on the worker
diff --git a/internal/controller/api.go b/internal/controller/api.go
index 99070e8..6cdf0a4 100644
--- a/internal/controller/api.go
+++ b/internal/controller/api.go
@@ -159,6 +159,9 @@ func (controller *Controller) initAPI() *gin.Engine {
 	v1.DELETE("/vms/:name", func(c *gin.Context) {
 		controller.deleteVM(c).Respond(c)
 	})
+	v1.POST("/vms/:name/restart", func(c *gin.Context) {
+		controller.restartVM(c).Respond(c)
+	})
 	v1.GET("/vms/:name/events", func(c *gin.Context) {
 		controller.listVMEvents(c).Respond(c)
 	})
diff --git a/internal/controller/api_vms_restart.go b/internal/controller/api_vms_restart.go
new file mode 100644
index 0000000..493e709
--- /dev/null
+++ b/internal/controller/api_vms_restart.go
@@ -0,0 +1,84 @@
+package controller
+
+import (
+	"context"
+	"net/http"
+	"time"
+
+	"github.com/cirruslabs/orchard/internal/controller/lifecycle"
+	storepkg "github.com/cirruslabs/orchard/internal/controller/store"
+	"github.com/cirruslabs/orchard/internal/responder"
+	v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
+	"github.com/cirruslabs/orchard/rpc"
+	"github.com/gin-gonic/gin"
+)
+
+// restartVM handles POST /vms/{name}/restart by flagging the VM for an
+// in-place restart and asking the worker it is assigned to to re-sync.
+//
+// It responds with 202 Accepted and the updated VM, or with 412
+// Precondition Failed when the VM has no worker assigned. A request made
+// while a restart is already pending does not bump the restart counter
+// and does not re-notify the worker.
+func (controller *Controller) restartVM(ctx *gin.Context) responder.Responder {
+	if responder := controller.authorize(ctx, v1.ServiceAccountRoleComputeWrite); responder != nil {
+		return responder
+	}
+
+	name := ctx.Param("name")
+
+	var updatedVM v1.VM
+	var workerName string
+	var shouldNotify bool
+
+	response := controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder {
+		vm, err := txn.GetVM(name)
+		if err != nil {
+			return responder.Error(err)
+		}
+
+		if vm.Worker == "" {
+			return responder.JSON(http.StatusPreconditionFailed,
+				NewErrorResponse("VM is not assigned to a worker, cannot restart"))
+		}
+
+		if !vm.RestartRequested {
+			vm.RestartRequested = true
+			vm.RestartedAt = time.Now()
+			vm.RestartCount++
+			shouldNotify = true
+			workerName = vm.Worker
+
+			lifecycle.Report(vm, "VM restart requested", controller.logger)
+		}
+
+		if err := txn.SetVM(*vm); err != nil {
+			controller.logger.Errorf("failed to store VM restart request in the DB: %v", err)
+
+			// The request was not stored, so there is nothing for the worker to pick up
+			shouldNotify = false
+
+			return responder.Code(http.StatusInternalServerError)
+		}
+
+		updatedVM = *vm
+
+		return responder.JSON(http.StatusAccepted, &updatedVM)
+	})
+
+	if shouldNotify && workerName != "" {
+		notifyCtx, notifyCtxCancel := context.WithTimeout(ctx, time.Second)
+		defer notifyCtxCancel()
+
+		if err := controller.workerNotifier.Notify(notifyCtx, workerName, &rpc.WatchInstruction{
+			Action: &rpc.WatchInstruction_SyncVmsAction{
+				SyncVmsAction: &rpc.WatchInstruction_SyncVMs{},
+			},
+		}); err != nil {
+			controller.logger.Warnf("failed to notify worker %s about VM restart: %v",
+				workerName, err)
+		}
+	}
+
+	return response
+}
diff --git a/internal/tests/integration_test.go b/internal/tests/integration_test.go
index 5b62e42..bba3688 100644
--- a/internal/tests/integration_test.go
+++ b/internal/tests/integration_test.go
@@ -99,6 +99,45 @@ func TestSingleVM(t *testing.T) {
 	}), "VM was not garbage collected in a timely manner")
 }
 
+func TestVMRestart(t *testing.T) {
+	devClient, _, _ := devcontroller.StartIntegrationTestEnvironment(t)
+
+	err := devClient.VMs().Create(context.Background(), &v1.VM{
+		Meta: v1.Meta{
+			Name: "restart-vm",
+		},
+		Image:    imageconstant.DefaultMacosImage,
+		CPU:      2,
+		Memory:   4 * 1024,
+		Headless: true,
+		Status:   v1.VMStatusPending,
+	})
+	require.NoError(t, err)
+
+	require.True(t, wait.Wait(2*time.Minute, func() bool {
+		vm, getErr := devClient.VMs().Get(context.Background(), "restart-vm")
+		require.NoError(t, getErr)
+
+		return vm.Status == v1.VMStatusRunning
+	}), "failed to wait for the VM to start")
+
+	vmAfterRestartRequest, err := devClient.VMs().Restart(context.Background(), "restart-vm")
+	require.NoError(t, err)
+	require.True(t, vmAfterRestartRequest.RestartRequested, "restart flag should be set")
+	require.EqualValues(t, 1, vmAfterRestartRequest.RestartCount, "restart count should increment")
+	require.False(t, vmAfterRestartRequest.RestartedAt.IsZero(), "restart timestamp should be set")
+
+	require.True(t, wait.Wait(2*time.Minute, func() bool {
+		vm, getErr := devClient.VMs().Get(context.Background(), "restart-vm")
+		require.NoError(t, getErr)
+
+		return !vm.RestartRequested &&
+			vm.RestartCount == 1 &&
+			vm.Status == v1.VMStatusRunning &&
+			vm.StatusMessage == "VM restarted"
+	}), "VM wasn't restarted in-place by the worker")
+}
+
 func TestFailedStartupScript(t *testing.T) {
 	devClient, _, _ := devcontroller.StartIntegrationTestEnvironment(t)
 
diff --git a/internal/worker/vmmanager/vm.go b/internal/worker/vmmanager/vm.go
index 0f1aa15..313f8c1 100644
--- a/internal/worker/vmmanager/vm.go
+++ b/internal/worker/vmmanager/vm.go
@@ -415,6 +415,53 @@ func (vm *VM) Stop() {
 	vm.logger.Debugf("VM stopped")
 }
 
+// Restart stops the VM, resets its stopping flag and last error, and then
+// runs it again in a background goroutine under a fresh context.
+//
+// It returns as soon as the goroutine is launched. An error from the new
+// run, or an unexpected exit, is recorded through setErr unless the context
+// was cancelled in the meantime.
+func (vm *VM) Restart() {
+	vm.logger.Debugf("restarting VM")
+
+	vm.Stop()
+
+	vm.stopping.Store(false)
+	vm.err.Store(nil)
+
+	vm.ctx, vm.cancel = context.WithCancel(context.Background())
+
+	// Set the status before spawning the goroutine so that callers reading
+	// Status() right after Restart() returns are guaranteed to observe it
+	vm.setStatus("VM restarted")
+
+	vm.wg.Add(1)
+
+	go func() {
+		defer vm.wg.Done()
+
+		if err := vm.run(vm.ctx); err != nil {
+			select {
+			case <-vm.ctx.Done():
+				// Do not report error if restart was interrupted via context cancellation
+			default:
+				vm.setErr(fmt.Errorf("%w: %v", ErrVMFailed, err))
+			}
+
+			return
+		}
+
+		select {
+		case <-vm.ctx.Done():
+			// Do not report error if restart was interrupted via context cancellation
+		default:
+			if !vm.stopping.Load() {
+				vm.setErr(fmt.Errorf("%w: VM exited unexpectedly", ErrVMFailed))
+			}
+		}
+	}()
+}
+
 func (vm *VM) Delete() error {
 	if !vm.cloned.Load() {
 		return nil
diff --git a/internal/worker/worker.go b/internal/worker/worker.go
index f019920..f5a17e7 100644
--- a/internal/worker/worker.go
+++ b/internal/worker/worker.go
@@ -288,7 +288,15 @@ func (worker *Worker) syncVMs(ctx context.Context) error {
 				return err
 			}
 		} else {
-			if remoteVM.Status == v1.VMStatusFailed {
+			if remoteVM.RestartRequested {
+				vm.Restart()
+
+				remoteVM.RestartRequested = false
+				remoteVM.StatusMessage = vm.Status()
+				if _, err := worker.client.VMs().Update(ctx, remoteVM); err != nil {
+					return err
+				}
+			} else if remoteVM.Status == v1.VMStatusFailed {
 				// VM has failed on the remote side, stop it locally to prevent incorrect
 				// worker's resources calculation in the Controller's scheduler
 				vm.Stop()
diff --git a/pkg/client/vms.go b/pkg/client/vms.go
index 372ae4c..bae2fdf 100644
--- a/pkg/client/vms.go
+++ b/pkg/client/vms.go
@@ -88,6 +88,20 @@ func (service *VMsService) Delete(ctx context.Context, name string) error {
 	return nil
 }
 
+// Restart sends a POST request to the restart endpoint of the VM with the
+// given name and returns the VM decoded from the response.
+func (service *VMsService) Restart(ctx context.Context, name string) (*v1.VM, error) {
+	var vm v1.VM
+
+	err := service.client.request(ctx, http.MethodPost, fmt.Sprintf("vms/%s/restart", url.PathEscape(name)),
+		nil, &vm, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	return &vm, nil
+}
+
 func (service *VMsService) PortForward(
 	ctx context.Context,
 	name string,
diff --git a/pkg/resource/v1/v1.go b/pkg/resource/v1/v1.go
index 0c80eb3..a1409a3 100644
--- a/pkg/resource/v1/v1.go
+++ b/pkg/resource/v1/v1.go
@@ -63,6 +63,8 @@ type VM struct {
 	RestartPolicy RestartPolicy `json:"restart_policy,omitempty"`
 	RestartedAt   time.Time     `json:"restarted_at,omitempty"`
 	RestartCount  uint64        `json:"restart_count,omitempty"`
+	// RestartRequested indicates that the worker should restart the VM in-place.
+	RestartRequested bool `json:"restart_requested,omitempty"`
 
 	// RandomSerial controls whether the worker will run the
 	// "tart set --random-serial" when instantiating this VM.