From 4eafec99a56190b36abb55e1512ba7e0c9d59798 Mon Sep 17 00:00:00 2001
From: Nikolay Edigaryev
Date: Mon, 3 Apr 2023 16:47:49 +0400
Subject: [PATCH] Fail VMs if the worker had crashed/is unhealthy (#70)

* Fail VMs if the worker had crashed/is unhealthy

* OnDiskName: properly handle cases when VM's name contains hyphens

* Worker: introduce Offline() method and check it before scheduling

* tart.List(): use Tart's JSON output

* OnDiskName: remove empty parts check

* Scheduler: move health-checking logic to a separate function

* Only fail "running" VMs

* Only fail orphaned VMs if they're in terminal state

* Integration tests

* Run healthCheckingLoopIteration() before schedulingLoopIteration()

* Worker: sync on-disk VMs only once at start
---
 go.mod                                        |   1 +
 go.sum                                        |   2 +
 internal/command/dev/dev.go                   |  23 +-
 internal/controller/controller.go             |   9 +-
 internal/controller/option.go                 |   7 +
 internal/controller/scheduler/scheduler.go    |  94 +++++++-
 internal/tests/integration_test.go            | 200 +++++++++++++++++-
 internal/worker/ondiskname/ondiskname.go      |  58 +++++
 internal/worker/ondiskname/ondiskname_test.go |  29 +++
 internal/worker/{vmmanager => tart}/tart.go   |  32 ++-
 internal/worker/vmmanager/vm.go               |  20 +-
 internal/worker/worker.go                     |  63 +++++-
 pkg/resource/v1/v1.go                         |  13 --
 pkg/resource/v1/worker.go                     |  20 ++
 14 files changed, 523 insertions(+), 48 deletions(-)
 create mode 100644 internal/worker/ondiskname/ondiskname.go
 create mode 100644 internal/worker/ondiskname/ondiskname_test.go
 rename internal/worker/{vmmanager => tart}/tart.go (70%)
 create mode 100644 pkg/resource/v1/worker.go

diff --git a/go.mod b/go.mod
index d7d1882..9d7ff8d 100644
--- a/go.mod
+++ b/go.mod
@@ -23,6 +23,7 @@ require (
 	github.com/stretchr/testify v1.8.1
 	go.uber.org/zap v1.24.0
 	golang.org/x/crypto v0.1.0
+	golang.org/x/exp v0.0.0-20230321023759-10a507213a29
 	golang.org/x/net v0.7.0
 	golang.org/x/term v0.5.0
 	google.golang.org/grpc v1.53.0
diff --git a/go.sum b/go.sum
index b60d7db..19f796b 100644
--- a/go.sum
+++ b/go.sum
@@ -320,6 +320,8 @@ golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0
 golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU=
 golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
+golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug=
+golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
 golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
 golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
 golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
diff --git a/internal/command/dev/dev.go b/internal/command/dev/dev.go
index b9fdaa1..6698954 100644
--- a/internal/command/dev/dev.go
+++ b/internal/command/dev/dev.go
@@ -52,7 +52,8 @@ func runDev(cmd *cobra.Command, args []string) error {
 	}
 
 	devController, devWorker, err := CreateDevControllerAndWorker(devDataDirPath,
-		fmt.Sprintf(":%d", netconstants.DefaultControllerPort), resources)
+		fmt.Sprintf(":%d", netconstants.DefaultControllerPort), resources,
+		nil, nil)
 	if err != nil {
 		return err
@@ -79,6 +80,8 @@ func CreateDevControllerAndWorker(
 	devDataDirPath string,
 	controllerListenAddr string,
 	resources v1.Resources,
+	additionalControllerOpts []controller.Option,
+	additionalWorkerOpts []worker.Option,
 ) (*controller.Controller, *worker.Worker, error) {
 	// Initialize the logger
 	logger, err := zap.NewDevelopment()
@@ -96,13 +99,17 @@ func CreateDevControllerAndWorker(
 		return nil, nil, err
 	}
 
-	devController, err := controller.New(
+	controllerOpts := []controller.Option{
 		controller.WithDataDir(dataDir),
 		controller.WithListenAddr(controllerListenAddr),
 		controller.WithInsecureAuthDisabled(),
 		controller.WithSwaggerDocs(),
 		controller.WithLogger(logger),
-	)
+	}
+
+	controllerOpts = append(controllerOpts, additionalControllerOpts...)
+
+	devController, err := controller.New(controllerOpts...)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -111,7 +118,15 @@ func CreateDevControllerAndWorker(
 	if err != nil {
 		return nil, nil, err
 	}
-	devWorker, err := worker.New(defaultClient, worker.WithResources(resources), worker.WithLogger(logger))
+
+	workerOpts := []worker.Option{
+		worker.WithResources(resources),
+		worker.WithLogger(logger),
+	}
+
+	workerOpts = append(workerOpts, additionalWorkerOpts...)
+
+	devWorker, err := worker.New(defaultClient, workerOpts...)
 	if err != nil {
 		return nil, nil, err
 	}
diff --git a/internal/controller/controller.go b/internal/controller/controller.go
index 94d6754..235d171 100644
--- a/internal/controller/controller.go
+++ b/internal/controller/controller.go
@@ -43,14 +43,16 @@ type Controller struct {
 	workerNotifier       *notifier.Notifier
 	proxy                *proxy.Proxy
 	enableSwaggerDocs    bool
+	workerOfflineTimeout time.Duration
 
 	rpc.UnimplementedControllerServer
 }
 
 func New(opts ...Option) (*Controller, error) {
 	controller := &Controller{
-		workerNotifier: notifier.NewNotifier(),
-		proxy:          proxy.NewProxy(),
+		workerNotifier:       notifier.NewNotifier(),
+		proxy:                proxy.NewProxy(),
+		workerOfflineTimeout: 3 * time.Minute,
 	}
 
 	// Apply options
@@ -76,7 +78,8 @@ func New(opts ...Option) (*Controller, error) {
 		return nil, err
 	}
 	controller.store = store
-	controller.scheduler = scheduler.NewScheduler(store, controller.workerNotifier, controller.logger)
+	controller.scheduler = scheduler.NewScheduler(store, controller.workerNotifier,
+		controller.workerOfflineTimeout, controller.logger)
 
 	listener, err := net.Listen("tcp", controller.listenAddr)
 	if err != nil {
diff --git a/internal/controller/option.go b/internal/controller/option.go
index 67f084d..61740a8 100644
--- a/internal/controller/option.go
+++ b/internal/controller/option.go
@@ -3,6 +3,7 @@ package controller
 import (
 	"crypto/tls"
 	"go.uber.org/zap"
+	"time"
 )
 
 type Option func(*Controller)
@@ -37,6 +38,12 @@ func WithSwaggerDocs() Option {
 	}
 }
 
+func WithWorkerOfflineTimeout(workerOfflineTimeout time.Duration) Option {
+	return func(controller *Controller) {
+		controller.workerOfflineTimeout = workerOfflineTimeout
+	}
+}
+
 func WithLogger(logger *zap.Logger) Option {
 	return func(controller *Controller) {
 		controller.logger = logger.Sugar()
diff --git a/internal/controller/scheduler/scheduler.go b/internal/controller/scheduler/scheduler.go
index e35a308..63f436a 100644
--- a/internal/controller/scheduler/scheduler.go
+++ b/internal/controller/scheduler/scheduler.go
@@ -14,18 +14,25 @@ import (
 
 const schedulerInterval = 5 * time.Second
 
 type Scheduler struct {
-	store               storepkg.Store
-	notifier            *notifier.Notifier
-	logger              *zap.SugaredLogger
-	schedulingRequested chan bool
+	store                storepkg.Store
+	notifier             *notifier.Notifier
+	workerOfflineTimeout time.Duration
+	logger               *zap.SugaredLogger
+	schedulingRequested  chan bool
 }
 
-func NewScheduler(store storepkg.Store, notifier *notifier.Notifier, logger *zap.SugaredLogger) *Scheduler {
+func NewScheduler(
+	store storepkg.Store,
+	notifier *notifier.Notifier,
+	workerOfflineTimeout time.Duration,
+	logger *zap.SugaredLogger,
+) *Scheduler {
 	return &Scheduler{
-		store:               store,
-		notifier:            notifier,
-		logger:              logger,
-		schedulingRequested: make(chan bool, 1),
+		store:                store,
+		notifier:             notifier,
+		workerOfflineTimeout: workerOfflineTimeout,
+		logger:               logger,
+		schedulingRequested:  make(chan bool, 1),
 	}
 }
@@ -36,6 +43,10 @@ func (scheduler *Scheduler) Run() {
 		case <-scheduler.schedulingRequested:
 		case <-time.After(schedulerInterval):
 		}
+
+		if err := scheduler.healthCheckingLoopIteration(); err != nil {
+			scheduler.logger.Errorf("Failed to health-check VMs: %v", err)
+		}
 		if err := scheduler.schedulingLoopIteration(); err != nil {
 			scheduler.logger.Errorf("Failed to schedule VMs: %v", err)
 		}
@@ -53,6 +64,7 @@ func (scheduler *Scheduler) RequestScheduling() {
 
 func (scheduler *Scheduler) schedulingLoopIteration() error {
 	affectedWorkers := map[string]bool{}
+
 	err := scheduler.store.Update(func(txn storepkg.Transaction) error {
 		vms, err := txn.ListVMs()
 		if err != nil {
@@ -71,7 +83,8 @@ func (scheduler *Scheduler) schedulingLoopIteration() error {
 			resourcesUsed := workerToResources.Get(worker.Name)
 			resourcesRemaining := worker.Resources.Subtracted(resourcesUsed)
 
-			if resourcesRemaining.CanFit(unscheduledVM.Resources) {
+			if resourcesRemaining.CanFit(unscheduledVM.Resources) &&
+				!worker.Offline(scheduler.workerOfflineTimeout) {
 				unscheduledVM.Worker = worker.Name
 
 				if err := txn.SetVM(unscheduledVM); err != nil {
@@ -86,6 +99,7 @@ func (scheduler *Scheduler) schedulingLoopIteration() error {
 		return nil
 	})
+
 	syncVMsInstruction := rpc.WatchInstruction{
 		Action: &rpc.WatchInstruction_SyncVmsAction{},
 	}
@@ -96,6 +110,7 @@ func (scheduler *Scheduler) schedulingLoopIteration() error {
 			scheduler.logger.Errorf("Failed to reactively sync VMs on worker %s: %v", workerToPoke, notifyErr)
 		}
 	}
+
 	return err
 }
@@ -118,3 +133,62 @@ func processVMs(vms []v1.VM) ([]v1.VM, WorkerToResources) {
 
 	return unscheduledVMs, workerToResources
 }
+
+func (scheduler *Scheduler) healthCheckingLoopIteration() error {
+	return scheduler.store.Update(func(txn storepkg.Transaction) error {
+		// Retrieve scheduled VMs
+		vms, err := txn.ListVMs()
+		if err != nil {
+			return err
+		}
+
+		var scheduledVMs []v1.VM
+
+		for _, vm := range vms {
+			if vm.Worker != "" {
+				scheduledVMs = append(scheduledVMs, vm)
+			}
+		}
+
+		// Retrieve and index workers by name
+		workers, err := txn.ListWorkers()
+		if err != nil {
+			return err
+		}
+
+		nameToWorker := map[string]v1.Worker{}
+		for _, worker := range workers {
+			nameToWorker[worker.Name] = worker
+		}
+
+		// Process scheduled VMs
+		for _, scheduledVM := range scheduledVMs {
+			if err := scheduler.healthCheckVM(txn, nameToWorker, scheduledVM); err != nil {
+				return err
+			}
+		}
+
+		return nil
+	})
+}
+
+func (scheduler *Scheduler) healthCheckVM(txn storepkg.Transaction, nameToWorker map[string]v1.Worker, vm v1.VM) error {
+	worker, ok := nameToWorker[vm.Worker]
+	if !ok {
+		vm.Status = v1.VMStatusFailed
+		vm.StatusMessage = "VM is assigned to a worker that " +
+			"doesn't exist anymore"
+
+		return txn.SetVM(vm)
+	}
+
+	if worker.Offline(scheduler.workerOfflineTimeout) && !vm.TerminalState() {
+		vm.Status = v1.VMStatusFailed
+		vm.StatusMessage = "VM is assigned to a worker that " +
+			"lost connection with the controller"
+
+		return txn.SetVM(vm)
+	}
+
+	return nil
+}
diff --git a/internal/tests/integration_test.go b/internal/tests/integration_test.go
index 6e1e3a6..e3fa49e 100644
--- a/internal/tests/integration_test.go
+++ b/internal/tests/integration_test.go
@@ -4,11 +4,18 @@ import (
 	"context"
 	"errors"
 	"github.com/cirruslabs/orchard/internal/command/dev"
+	"github.com/cirruslabs/orchard/internal/controller"
+	"github.com/cirruslabs/orchard/internal/worker"
+	"github.com/cirruslabs/orchard/internal/worker/ondiskname"
+	"github.com/cirruslabs/orchard/internal/worker/tart"
 	"github.com/cirruslabs/orchard/pkg/client"
 	v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
+	"github.com/google/uuid"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
 	"golang.org/x/crypto/ssh"
+	"golang.org/x/exp/slices"
 	"net"
 	"net/http"
 	"testing"
@@ -149,9 +156,20 @@ func Wait(duration time.Duration, condition func() bool) bool {
 	}
 }
 
-func StartIntegrationTestEnvironment(t *testing.T) *client.Client {
+func StartIntegrationTestEnvironment(
+	t *testing.T,
+) *client.Client {
+	return StartIntegrationTestEnvironmentWithAdditionalOpts(t, nil, nil)
+}
+
+func StartIntegrationTestEnvironmentWithAdditionalOpts(
+	t *testing.T,
+	additionalControllerOpts []controller.Option,
+	additionalWorkerOpts []worker.Option,
+) *client.Client {
 	t.Setenv("ORCHARD_HOME", t.TempDir())
-	devController, devWorker, err := dev.CreateDevControllerAndWorker(t.TempDir(), ":0", nil)
+	devController, devWorker, err := dev.CreateDevControllerAndWorker(t.TempDir(),
+		":0", nil, additionalControllerOpts, additionalWorkerOpts)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -234,3 +252,181 @@ func TestPortForwarding(t *testing.T) {
 	require.NoError(t, err)
 	require.Contains(t, string(unameOutput), "Darwin arm64")
 }
+
+// TestSchedulerHealthCheckingNonExistentWorker ensures that the scheduler
+// will eventually fail VMs that are scheduled on a worker that was
+// deleted from the API.
+func TestSchedulerHealthCheckingNonExistentWorker(t *testing.T) {
+	ctx := context.Background()
+
+	devClient := StartIntegrationTestEnvironment(t)
+
+	const (
+		dummyWorkerName = "dummy-worker"
+		dummyVMName     = "dummy-vm"
+	)
+
+	// Create a dummy worker that won't update its LastSeen
+	// timestamp, which will result in the scheduler failing VMs
+	// scheduled on that worker.
+	//
+	// We use a special resource "unique-resource" to prevent
+	// our dummy VM (see below) from scheduling on any worker
+	// other than this one.
+	_, err := devClient.Workers().Create(ctx, v1.Worker{
+		Meta: v1.Meta{
+			Name: dummyWorkerName,
+		},
+		LastSeen:  time.Now(),
+		MachineID: uuid.New().String(),
+		Resources: map[string]uint64{
+			v1.ResourceTartVMs: 1,
+			"unique-resource":  1,
+		},
+	})
+	require.NoError(t, err)
+
+	// Create a dummy VM
+	err = devClient.VMs().Create(context.Background(), &v1.VM{
+		Meta: v1.Meta{
+			Name: dummyVMName,
+		},
+		Image:    "ghcr.io/cirruslabs/macos-ventura-base:latest",
+		CPU:      4,
+		Memory:   8 * 1024,
+		Headless: true,
+		Resources: map[string]uint64{
+			"unique-resource": 1,
+		},
+	})
+	require.NoError(t, err)
+
+	// Wait for the dummy VM to get scheduled to a dummy worker
+	require.True(t, Wait(2*time.Minute, func() bool {
+		vm, err := devClient.VMs().Get(context.Background(), dummyVMName)
+		require.NoError(t, err)
+
+		t.Logf("Waiting for the VM to be assigned to a dummy worker, current worker: %q", vm.Worker)
+
+		return vm.Worker == dummyWorkerName
+	}), "failed to wait for the dummy VM to be assigned to a dummy worker")
+
+	// Delete the dummy worker
+	err = devClient.Workers().Delete(ctx, dummyWorkerName)
+	require.NoError(t, err)
+
+	// Wait for the scheduler to change the dummy VM's status to "failed"
+	require.True(t, Wait(2*time.Minute, func() bool {
+		vm, err := devClient.VMs().Get(context.Background(), dummyVMName)
+		require.NoError(t, err)
+
+		t.Logf("Waiting for the VM to be failed by the scheduler")
+
+		return vm.Status == v1.VMStatusFailed
+	}), "VM was not marked as failed in time")
+
+	// Double-check the VM's status and status message
+	vm, err := devClient.VMs().Get(context.Background(), dummyVMName)
+	require.NoError(t, err)
+	require.Equal(t, v1.VMStatusFailed, vm.Status)
+	require.Equal(t, "VM is assigned to a worker that doesn't exist anymore", vm.StatusMessage)
+}
+
+// TestSchedulerHealthCheckingOfflineWorker ensures that the scheduler
+// will eventually fail VMs that are scheduled on a worker that had
+// gone offline for a long time.
+func TestSchedulerHealthCheckingOfflineWorker(t *testing.T) {
+	ctx := context.Background()
+
+	devClient := StartIntegrationTestEnvironmentWithAdditionalOpts(t,
+		[]controller.Option{controller.WithWorkerOfflineTimeout(1 * time.Minute)}, nil)
+
+	const (
+		dummyWorkerName = "dummy-worker"
+		dummyVMName     = "dummy-vm"
+	)
+
+	// Create a dummy worker that will eventually be marked as offline
+	// because we won't update the LastSeen field
+	_, err := devClient.Workers().Create(ctx, v1.Worker{
+		Meta: v1.Meta{
+			Name: dummyWorkerName,
+		},
+		LastSeen:  time.Now(),
+		MachineID: uuid.New().String(),
+		Resources: map[string]uint64{
+			v1.ResourceTartVMs: 1,
+			"unique-resource":  1,
+		},
+	})
+	require.NoError(t, err)
+
+	// Create a dummy VM that will be assigned to our dummy worker
+	err = devClient.VMs().Create(context.Background(), &v1.VM{
+		Meta: v1.Meta{
+			Name: dummyVMName,
+		},
+		Image:    "ghcr.io/cirruslabs/macos-ventura-base:latest",
+		CPU:      4,
+		Memory:   8 * 1024,
+		Headless: true,
+		Resources: map[string]uint64{
+			"unique-resource": 1,
+		},
+	})
+	require.NoError(t, err)
+
+	// Wait for the VM to be marked as failed
+	assert.True(t, Wait(2*time.Minute, func() bool {
+		vm, err := devClient.VMs().Get(context.Background(), dummyVMName)
+		require.NoError(t, err)
+
+		t.Logf("Waiting for the VM to be marked as failed, current status: %s", vm.Status)
+
+		return vm.Status == v1.VMStatusFailed
+	}), "VM wasn't marked as failed in a reasonable time")
+
+	// Double-check the VM's status message
+	runningVM, err := devClient.VMs().Get(context.Background(), dummyVMName)
+	require.NoError(t, err)
+	require.Equal(t, v1.VMStatusFailed, runningVM.Status)
+	require.Equal(t, "VM is assigned to a worker that lost connection with the controller",
+		runningVM.StatusMessage)
+}
+
+// TestVMGarbageCollection ensures that on-disk Tart VMs that are managed by Orchard
+// and are not present in the API anymore are garbage-collected by the Orchard Worker
+// at startup.
+func TestVMGarbageCollection(t *testing.T) {
+	ctx := context.Background()
+
+	logger, err := zap.NewDevelopment()
+	require.NoError(t, err)
+
+	// Create an on-disk Tart VM that looks like it's managed by Orchard
+	vmName := ondiskname.New("test", uuid.New().String()).String()
+	_, _, err = tart.Tart(ctx, logger.Sugar(), "clone",
+		"ghcr.io/cirruslabs/macos-ventura-base:latest", vmName)
+	require.NoError(t, err)
+
+	// Make sure that this VM exists
+	hasVM := func(name string) bool {
+		vmInfos, err := tart.List(ctx, logger.Sugar())
+		require.NoError(t, err)
+
+		return slices.ContainsFunc(vmInfos, func(vmInfo tart.VMInfo) bool {
+			return vmInfo.Name == name
+		})
+	}
+	require.True(t, hasVM(vmName))
+
+	// Start the Orchard Worker
+	_ = StartIntegrationTestEnvironment(t)
+
+	// Wait for the Orchard Worker to garbage-collect this VM
+	require.True(t, Wait(2*time.Minute, func() bool {
+		t.Logf("Waiting for the on-disk VM to be cleaned up by the worker")
+
+		return !hasVM(vmName)
+	}), "failed to wait for the VM %s to be garbage-collected", vmName)
+}
diff --git a/internal/worker/ondiskname/ondiskname.go b/internal/worker/ondiskname/ondiskname.go
new file mode 100644
index 0000000..c7b602a
--- /dev/null
+++ b/internal/worker/ondiskname/ondiskname.go
@@ -0,0 +1,58 @@
+package ondiskname
+
+import (
+	"errors"
+	"fmt"
+	"strings"
+)
+
+var (
+	ErrNotManagedByOrchard = errors.New("this on-disk VM is not managed by Orchard")
+	ErrInvalidOnDiskName   = errors.New("invalid on-disk VM name")
+)
+
+const (
+	prefix           = "orchard"
+	numHyphensInUUID = 5
+)
+
+type OnDiskName struct {
+	Name string
+	UID  string
+}
+
+func New(name string, uid string) OnDiskName {
+	return OnDiskName{
+		Name: name,
+		UID:  uid,
+	}
+}
+
+func Parse(s string) (OnDiskName, error) {
+	splits := strings.Split(s, "-")
+
+	if !strings.HasPrefix(s, fmt.Sprintf("%s-", prefix)) {
+		return OnDiskName{}, ErrNotManagedByOrchard
+	}
+
+	if len(splits) < 7 {
+		return OnDiskName{}, fmt.Errorf("%w: name should contain at least 7 parts delimited by \"-\"",
+			ErrInvalidOnDiskName)
+	}
+
+	if splits[0] != prefix {
+		return OnDiskName{}, fmt.Errorf("%w: name should begin with \"%s\" prefix",
+			ErrInvalidOnDiskName, prefix)
+	}
+
+	uuidStart := len(splits) - numHyphensInUUID
+
+	return OnDiskName{
+		Name: strings.Join(splits[1:uuidStart], "-"),
+		UID:  strings.Join(splits[uuidStart:], "-"),
+	}, nil
+}
+
+func (odn OnDiskName) String() string {
+	return fmt.Sprintf("%s-%s-%s", prefix, odn.Name, odn.UID)
+}
diff --git a/internal/worker/ondiskname/ondiskname_test.go b/internal/worker/ondiskname/ondiskname_test.go
new file mode 100644
index 0000000..1e2f5ba
--- /dev/null
+++ b/internal/worker/ondiskname/ondiskname_test.go
@@ -0,0 +1,29 @@
+package ondiskname_test
+
+import (
+	"github.com/cirruslabs/orchard/internal/worker/ondiskname"
+	"github.com/google/uuid"
+	"github.com/stretchr/testify/require"
+	"testing"
+)
+
+func TestOnDiskNameUUID(t *testing.T) {
+	onDiskNameOriginal := ondiskname.New("test-vm--", uuid.New().String())
+
+	onDiskNameParsed, err := ondiskname.Parse(onDiskNameOriginal.String())
+	require.NoError(t, err)
+
+	require.Equal(t, onDiskNameOriginal, onDiskNameParsed)
+}
+
+func TestOnDiskNameNonUUID(t *testing.T) {
+	onDiskNameOriginal := ondiskname.New("some-vm", "some-uid")
+
+	_, err := ondiskname.Parse(onDiskNameOriginal.String())
+	require.Error(t, err)
+}
+
+func TestOnDiskNameNonOrchard(t *testing.T) {
+	_, err := ondiskname.Parse("ghcr.io/cirruslabs/macos-ventura-base:latest")
+	require.Error(t, err)
+}
diff --git a/internal/worker/vmmanager/tart.go b/internal/worker/tart/tart.go
similarity index 70%
rename from internal/worker/vmmanager/tart.go
rename to internal/worker/tart/tart.go
index 558b6ae..fe88d8d 100644
--- a/internal/worker/vmmanager/tart.go
+++ b/internal/worker/tart/tart.go
@@ -1,10 +1,12 @@
-package vmmanager
+package tart
 
 import (
 	"bytes"
 	"context"
+	"encoding/json"
 	"errors"
 	"fmt"
+	"go.uber.org/zap"
 	"os/exec"
 	"strings"
 )
@@ -16,9 +18,14 @@ var (
 	ErrTartFailed   = errors.New("tart command returned non-zero exit code")
 )
 
-//nolint:unparam // might use stderr at some point
-func (vm *VM) tart(
+type VMInfo struct {
+	Name    string
+	Running bool
+}
+
+func Tart(
 	ctx context.Context,
+	logger *zap.SugaredLogger,
 	args ...string,
 ) (string, string, error) {
 	cmd := exec.CommandContext(ctx, tartCommandName, args...)
@@ -28,7 +35,7 @@ func (vm *VM) tart(
 	cmd.Stdout = &stdout
 	cmd.Stderr = &stderr
 
-	vm.logger.Debugf("running '%s %s'", tartCommandName, strings.Join(args, " "))
+	logger.Debugf("running '%s %s'", tartCommandName, strings.Join(args, " "))
 
 	err := cmd.Run()
 	if err != nil {
 		if errors.Is(err, exec.ErrNotFound) {
@@ -37,7 +44,7 @@ func (vm *VM) tart(
 		}
 
 		if exitErr, ok := err.(*exec.ExitError); ok {
-			vm.logger.Warnf(
+			logger.Warnf(
 				"'%s %s' failed with exit code %d: %s", tartCommandName,
 				strings.Join(args, " "), exitErr.ExitCode(),
 				firstNonEmptyLine(stderr.String(), stdout.String()),
@@ -51,6 +58,21 @@ func (vm *VM) tart(
 	return stdout.String(), stderr.String(), err
 }
 
+func List(ctx context.Context, logger *zap.SugaredLogger) ([]VMInfo, error) {
+	output, _, err := Tart(ctx, logger, "list", "--format", "json")
+	if err != nil {
+		return nil, err
+	}
+
+	var entries []VMInfo
+
+	if err := json.Unmarshal([]byte(output), &entries); err != nil {
+		return nil, err
+	}
+
+	return entries, nil
+}
+
 func firstNonEmptyLine(outputs ...string) string {
 	for _, output := range outputs {
 		for _, line := range strings.Split(output, "\n") {
diff --git a/internal/worker/vmmanager/vm.go b/internal/worker/vmmanager/vm.go
index b358894..804587b 100644
--- a/internal/worker/vmmanager/vm.go
+++ b/internal/worker/vmmanager/vm.go
@@ -6,6 +6,8 @@ import (
 	"errors"
 	"fmt"
 	"github.com/avast/retry-go"
+	"github.com/cirruslabs/orchard/internal/worker/ondiskname"
+	"github.com/cirruslabs/orchard/internal/worker/tart"
 	"github.com/cirruslabs/orchard/pkg/resource/v1"
 	"go.uber.org/zap"
 	"golang.org/x/crypto/ssh"
@@ -34,7 +36,7 @@ func NewVM(ctx context.Context, vmResource v1.VM, logger *zap.SugaredLogger) (*V
 	vmContext, vmContextCancel := context.WithCancel(context.Background())
 
 	vm := &VM{
-		id: fmt.Sprintf("orchard-%s-%s", vmResource.Name, vmResource.UID),
+		id: ondiskname.New(vmResource.Name, vmResource.UID).String(),
 
 		Resource: vmResource,
 		logger:   logger,
@@ -64,20 +66,22 @@ func NewVM(ctx context.Context, vmResource v1.VM, logger *zap.SugaredLogger) (*V
 }
 
 func (vm *VM) cloneAndConfigure(ctx context.Context) error {
-	_, _, err := vm.tart(ctx, "clone", vm.Resource.Image, vm.id)
+	_, _, err := tart.Tart(ctx, vm.logger, "clone", vm.Resource.Image, vm.id)
 	if err != nil {
 		return err
 	}
 
 	if vm.Resource.Memory != 0 {
-		_, _, err = vm.tart(ctx, "set", "--memory", strconv.FormatUint(vm.Resource.Memory, 10), vm.id)
+		_, _, err = tart.Tart(ctx, vm.logger, "set", "--memory",
+			strconv.FormatUint(vm.Resource.Memory, 10), vm.id)
 		if err != nil {
 			return err
 		}
 	}
 
 	if vm.Resource.CPU != 0 {
-		_, _, err = vm.tart(ctx, "set", "--cpu", strconv.FormatUint(vm.Resource.CPU, 10), vm.id)
+		_, _, err = tart.Tart(ctx, vm.logger, "set", "--cpu",
+			strconv.FormatUint(vm.Resource.CPU, 10), vm.id)
 		if err != nil {
 			return err
 		}
@@ -97,7 +101,7 @@ func (vm *VM) run(ctx context.Context) error {
 	}
 	runArgs = append(runArgs, vm.id)
 
-	_, _, err := vm.tart(ctx, runArgs...)
+	_, _, err := tart.Tart(ctx, vm.logger, runArgs...)
 	if err != nil {
 		return err
 	}
@@ -106,7 +110,7 @@ func (vm *VM) run(ctx context.Context) error {
 }
 
 func (vm *VM) IP(ctx context.Context) (string, error) {
-	stdout, _, err := vm.tart(ctx, "ip", "--wait", "60", vm.id)
+	stdout, _, err := tart.Tart(ctx, vm.logger, "ip", "--wait", "60", vm.id)
 	if err != nil {
 		return "", err
 	}
@@ -115,7 +119,7 @@ func (vm *VM) IP(ctx context.Context) (string, error) {
 }
 
 func (vm *VM) Stop() error {
-	_, _, _ = vm.tart(context.Background(), "stop", vm.id)
+	_, _, _ = tart.Tart(context.Background(), vm.logger, "stop", vm.id)
 
 	vm.cancel()
 
@@ -125,7 +129,7 @@ func (vm *VM) Stop() error {
 }
 
 func (vm *VM) Delete() error {
-	_, _, err := vm.tart(context.Background(), "delete", vm.id)
+	_, _, err := tart.Tart(context.Background(), vm.logger, "delete", vm.id)
 	if err != nil {
 		return fmt.Errorf("%w: failed to delete VM %s: %v", ErrFailed, vm.id, err)
 	}
diff --git a/internal/worker/worker.go b/internal/worker/worker.go
index 82df581..5de29ef 100644
--- a/internal/worker/worker.go
+++ b/internal/worker/worker.go
@@ -6,6 +6,8 @@ import (
 	"fmt"
 	"github.com/avast/retry-go/v4"
 	"github.com/cirruslabs/orchard/internal/worker/iokitregistry"
+	"github.com/cirruslabs/orchard/internal/worker/ondiskname"
+	"github.com/cirruslabs/orchard/internal/worker/tart"
 	"github.com/cirruslabs/orchard/internal/worker/vmmanager"
 	"github.com/cirruslabs/orchard/pkg/client"
 	v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
@@ -91,6 +93,11 @@ func (worker *Worker) runNewSession(ctx context.Context) error {
 		}), retry.Context(subCtx), retry.Attempts(0))
 	}()
 
+	// Sync on-disk VMs
+	if err := worker.syncOnDiskVMs(ctx); err != nil {
+		return err
+	}
+
 	for {
 		if err := worker.updateWorker(ctx); err != nil {
 			worker.logger.Errorf("failed to update worker resource: %v", err)
@@ -164,7 +171,7 @@ func (worker *Worker) syncVMs(ctx context.Context) error {
 
 	worker.logger.Infof("syncing %d VMs...", len(remoteVMs))
 
-	// check if need to stop any of the VMs
+	// Check if we need to stop any of the VMs
 	for _, vmResource := range remoteVMs {
 		if vmResource.Status == v1.VMStatusStopping && worker.vmm.Exists(vmResource) {
 			if err := worker.stopVM(vmResource); err != nil {
@@ -173,7 +180,7 @@ func (worker *Worker) syncVMs(ctx context.Context) error {
 		}
 	}
 
-	// then, handle pending VMs first
+	// Handle pending VMs
 	for _, vmResource := range remoteVMs {
 		// handle pending VMs
 		if vmResource.Status == v1.VMStatusPending && !worker.vmm.Exists(vmResource) {
@@ -183,7 +190,7 @@ func (worker *Worker) syncVMs(ctx context.Context) error {
 		}
 	}
 
-	// lastly, try to sync local VMs with the remote ones
+	// Sync in-memory VMs
 	for _, vm := range worker.vmm.List() {
 		remoteVM, ok := remoteVMs[vm.Resource.UID]
 		if !ok {
@@ -204,6 +211,56 @@ func (worker *Worker) syncVMs(ctx context.Context) error {
 
 	return nil
 }
 
+func (worker *Worker) syncOnDiskVMs(ctx context.Context) error {
+	remoteVMs, err := worker.client.VMs().FindForWorker(ctx, worker.name)
+	if err != nil {
+		return err
+	}
+
+	worker.logger.Infof("syncing on-disk VMs...")
+
+	vmInfos, err := tart.List(ctx, worker.logger)
+	if err != nil {
+		return err
+	}
+
+	for _, vmInfo := range vmInfos {
+		if vmInfo.Running {
+			continue
+		}
+
+		onDiskName, err := ondiskname.Parse(vmInfo.Name)
+		if err != nil {
+			if errors.Is(err, ondiskname.ErrNotManagedByOrchard) {
+				continue
+			}
+
+			return err
+		}
+
+		remoteVM, ok := remoteVMs[onDiskName.UID]
+		if !ok {
+			// On-disk VM doesn't exist on the controller, delete it
+			_, _, err := tart.Tart(ctx, worker.logger, "delete", vmInfo.Name)
+			if err != nil {
+				return err
+			}
+		} else if remoteVM.Status == v1.VMStatusRunning && !worker.vmm.Exists(v1.VM{UID: onDiskName.UID}) {
+			// On-disk VM exists on the controller,
+			// but we don't know about it, so
+			// mark it as failed
+			remoteVM.Status = v1.VMStatusFailed
+			remoteVM.StatusMessage = "Worker lost track of VM"
+			_, err := worker.client.VMs().Update(ctx, remoteVM)
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
 func (worker *Worker) deleteVM(vmResource v1.VM) error {
 	worker.logger.Debugf("deleting VM %s (%s)", vmResource.Name, vmResource.UID)
diff --git a/pkg/resource/v1/v1.go b/pkg/resource/v1/v1.go
index e2b3eb0..ba89305 100644
--- a/pkg/resource/v1/v1.go
+++ b/pkg/resource/v1/v1.go
@@ -18,19 +18,6 @@ type Meta struct {
 	CreatedAt time.Time `json:"createdAt"`
 }
 
-type Worker struct {
-	// LastSeen is set by the Worker and is used by the Controller
-	// to track unhealthy Workers.
-	LastSeen time.Time
-
-	MachineID string
-
-	// Resources available on this Worker.
-	Resources Resources `json:"resources"`
-
-	Meta
-}
-
 type VM struct {
 	Image string `json:"image"`
 	CPU   uint64 `json:"cpu"`
diff --git a/pkg/resource/v1/worker.go b/pkg/resource/v1/worker.go
new file mode 100644
index 0000000..c3ef845
--- /dev/null
+++ b/pkg/resource/v1/worker.go
@@ -0,0 +1,20 @@
+package v1
+
+import "time"
+
+type Worker struct {
+	// LastSeen is set by the Worker and is used by the Controller
+	// to track unhealthy Workers.
+	LastSeen time.Time
+
+	MachineID string
+
+	// Resources available on this Worker.
+	Resources Resources `json:"resources"`
+
+	Meta
+}
+
+func (worker Worker) Offline(workerOfflineTimeout time.Duration) bool {
+	return time.Since(worker.LastSeen) > workerOfflineTimeout
+}