Codex implementation of restart

This commit is contained in:
Fedor Korotkov 2025-10-29 06:36:07 -07:00
parent c744d724dc
commit a7a0dea41b
8 changed files with 204 additions and 1 deletions

View File

@ -422,6 +422,28 @@ paths:
description: VM resource with the given name doesn't exist
'503':
description: Failed to resolve the IP address on the worker responsible for the specified VM
/vms/{name}/restart:
parameters:
- in: path
name: name
required: true
schema:
type: string
post:
summary: "Restart a VM"
tags:
- vms
responses:
'202':
description: VM restart was requested
content:
application/json:
schema:
$ref: '#/components/schemas/VM'
'404':
description: VM resource with the given name doesn't exist
'412':
description: VM is not assigned to a worker and cannot be restarted
components:
schemas:
Worker:
@ -565,6 +587,10 @@ components:
VM restart policy: specify "Never" to never restart or "OnFailure" to only restart when the VM fails
default: Never
enum: [Never, OnFailure]
restart_requested:
type: boolean
description: Indicates that the worker should restart this VM in-place
readOnly: true
resources:
type: object
description: Resources required by this VM on the worker

View File

@ -159,6 +159,9 @@ func (controller *Controller) initAPI() *gin.Engine {
v1.DELETE("/vms/:name", func(c *gin.Context) {
controller.deleteVM(c).Respond(c)
})
v1.POST("/vms/:name/restart", func(c *gin.Context) {
controller.restartVM(c).Respond(c)
})
v1.GET("/vms/:name/events", func(c *gin.Context) {
controller.listVMEvents(c).Respond(c)
})

View File

@ -0,0 +1,74 @@
package controller
import (
"context"
"net/http"
"time"
"github.com/cirruslabs/orchard/internal/controller/lifecycle"
storepkg "github.com/cirruslabs/orchard/internal/controller/store"
"github.com/cirruslabs/orchard/internal/responder"
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
"github.com/cirruslabs/orchard/rpc"
"github.com/gin-gonic/gin"
)
// restartVM handles POST /vms/{name}/restart: it marks the VM for an
// in-place restart inside a store transaction and then nudges the worker
// that owns the VM so it picks the request up promptly.
func (controller *Controller) restartVM(ctx *gin.Context) responder.Responder {
	if authResponder := controller.authorize(ctx, v1.ServiceAccountRoleComputeWrite); authResponder != nil {
		return authResponder
	}

	vmName := ctx.Param("name")

	var (
		updatedVM    v1.VM
		targetWorker string
		needsNotify  bool
	)

	result := controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder {
		vm, err := txn.GetVM(vmName)
		if err != nil {
			return responder.Error(err)
		}

		// A restart is executed by a worker, so refuse when none is assigned yet.
		if vm.Worker == "" {
			return responder.JSON(http.StatusPreconditionFailed,
				NewErrorResponse("VM is not assigned to a worker, cannot restart"))
		}

		// Only bump the bookkeeping on the first request; repeated calls while
		// a restart is already pending are idempotent no-ops.
		if !vm.RestartRequested {
			vm.RestartRequested = true
			vm.RestartedAt = time.Now()
			vm.RestartCount++
			needsNotify = true
			targetWorker = vm.Worker

			lifecycle.Report(vm, "VM restart requested", controller.logger)
		}

		if err := txn.SetVM(*vm); err != nil {
			controller.logger.Errorf("failed to store VM restart request in the DB: %v", err)

			return responder.Code(http.StatusInternalServerError)
		}

		updatedVM = *vm

		return responder.JSON(http.StatusAccepted, &updatedVM)
	})

	// Notify outside of the transaction so a slow RPC cannot hold the store;
	// a failure here is non-fatal — the worker will discover the flag on its
	// next periodic sync anyway.
	if needsNotify && targetWorker != "" {
		notifyCtx, cancelNotify := context.WithTimeout(ctx, time.Second)
		defer cancelNotify()

		notifyErr := controller.workerNotifier.Notify(notifyCtx, targetWorker, &rpc.WatchInstruction{
			Action: &rpc.WatchInstruction_SyncVmsAction{
				SyncVmsAction: &rpc.WatchInstruction_SyncVMs{},
			},
		})
		if notifyErr != nil {
			controller.logger.Warnf("failed to notify worker %s about VM restart: %v",
				targetWorker, notifyErr)
		}
	}

	return result
}

View File

@ -99,6 +99,45 @@ func TestSingleVM(t *testing.T) {
}), "VM was not garbage collected in a timely manner")
}
// TestVMRestart exercises the full restart round-trip: a running VM gets a
// restart request through the API, the response reflects the restart flag and
// counters, and the worker eventually clears the flag after restarting the VM
// in-place.
func TestVMRestart(t *testing.T) {
	devClient, _, _ := devcontroller.StartIntegrationTestEnvironment(t)

	// Create a VM and wait for it to come up.
	require.NoError(t, devClient.VMs().Create(context.Background(), &v1.VM{
		Meta: v1.Meta{
			Name: "restart-vm",
		},
		Image:    imageconstant.DefaultMacosImage,
		CPU:      2,
		Memory:   4 * 1024,
		Headless: true,
		Status:   v1.VMStatusPending,
	}))

	require.True(t, wait.Wait(2*time.Minute, func() bool {
		vm, getErr := devClient.VMs().Get(context.Background(), "restart-vm")
		require.NoError(t, getErr)

		return vm.Status == v1.VMStatusRunning
	}), "failed to wait for the VM to start")

	// Request a restart and verify the controller-side bookkeeping.
	restartedVM, err := devClient.VMs().Restart(context.Background(), "restart-vm")
	require.NoError(t, err)
	require.True(t, restartedVM.RestartRequested, "restart flag should be set")
	require.EqualValues(t, 1, restartedVM.RestartCount, "restart count should increment")
	require.False(t, restartedVM.RestartedAt.IsZero(), "restart timestamp should be set")

	// The worker should clear the flag and report a running, restarted VM.
	require.True(t, wait.Wait(2*time.Minute, func() bool {
		vm, getErr := devClient.VMs().Get(context.Background(), "restart-vm")
		require.NoError(t, getErr)

		return !vm.RestartRequested &&
			vm.RestartCount == 1 &&
			vm.Status == v1.VMStatusRunning &&
			vm.StatusMessage == "VM restarted"
	}), "VM wasn't restarted in-place by the worker")
}
func TestFailedStartupScript(t *testing.T) {
devClient, _, _ := devcontroller.StartIntegrationTestEnvironment(t)

View File

@ -415,6 +415,45 @@ func (vm *VM) Stop() {
vm.logger.Debugf("VM stopped")
}
// Restart performs an in-place restart of the VM: it stops the currently
// running instance, resets the shutdown bookkeeping and launches the VM
// again in a fresh goroutine with a fresh context.
//
// NOTE(review): no locking is visible here — this assumes the caller
// serializes Restart/Stop calls, otherwise vm.ctx/vm.cancel could race;
// confirm against the callers.
func (vm *VM) Restart() {
	vm.logger.Debugf("restarting VM")
	// Tear down the current instance before starting a new one.
	vm.Stop()
	// Clear the "stopping" flag set by Stop() so the new run isn't
	// mistaken for an intentional shutdown.
	vm.stopping.Store(false)
	// NOTE(review): if vm.err is a sync/atomic.Value, Store(nil) panics —
	// presumably this is an atomic.Pointer[error]; verify the field type.
	vm.err.Store(nil)
	// Fresh context for the new run, replacing the one cancelled by Stop().
	vm.ctx, vm.cancel = context.WithCancel(context.Background())
	vm.wg.Add(1)
	go func() {
		defer vm.wg.Done()
		vm.setStatus("VM restarted")
		if err := vm.run(vm.ctx); err != nil {
			select {
			case <-vm.ctx.Done():
				// Do not report error if restart was interrupted via context cancellation
			default:
				vm.setErr(fmt.Errorf("%w: %v", ErrVMFailed, err))
			}
			return
		}
		select {
		case <-vm.ctx.Done():
			// Do not report error if restart was interrupted via context cancellation
		default:
			// run() returned without error although nobody requested a stop:
			// treat the unexpected exit as a failure.
			if !vm.stopping.Load() {
				vm.setErr(fmt.Errorf("%w: VM exited unexpectedly", ErrVMFailed))
			}
		}
	}()
}
func (vm *VM) Delete() error {
if !vm.cloned.Load() {
return nil

View File

@ -288,7 +288,15 @@ func (worker *Worker) syncVMs(ctx context.Context) error {
return err
}
} else {
if remoteVM.Status == v1.VMStatusFailed {
if remoteVM.RestartRequested {
vm.Restart()
remoteVM.RestartRequested = false
remoteVM.StatusMessage = vm.Status()
if _, err := worker.client.VMs().Update(ctx, remoteVM); err != nil {
return err
}
} else if remoteVM.Status == v1.VMStatusFailed {
// VM has failed on the remote side, stop it locally to prevent incorrect
// worker's resources calculation in the Controller's scheduler
vm.Stop()

View File

@ -88,6 +88,18 @@ func (service *VMsService) Delete(ctx context.Context, name string) error {
return nil
}
// Restart asks the controller to restart the named VM in-place and returns
// the VM resource as it looks right after the restart was requested.
func (service *VMsService) Restart(ctx context.Context, name string) (*v1.VM, error) {
	endpoint := fmt.Sprintf("vms/%s/restart", url.PathEscape(name))

	var vm v1.VM

	if err := service.client.request(ctx, http.MethodPost, endpoint, nil, &vm, nil); err != nil {
		return nil, err
	}

	return &vm, nil
}
func (service *VMsService) PortForward(
ctx context.Context,
name string,

View File

@ -63,6 +63,8 @@ type VM struct {
RestartPolicy RestartPolicy `json:"restart_policy,omitempty"`
RestartedAt time.Time `json:"restarted_at,omitempty"`
RestartCount uint64 `json:"restart_count,omitempty"`
// RestartRequested indicates that the worker should restart the VM in-place.
RestartRequested bool `json:"restart_requested,omitempty"`
// RandomSerial controls whether the worker will run the
// "tart set --random-serial" when instantiating this VM.