Fail VMs if the worker had crashed/is unhealthy (#70)

* Fail VMs if the worker had crashed/is unhealthy

* OnDiskName: properly handle cases when VM's name contains hyphens

* Worker: introduce Offline() method and check it before scheduling

* tart.List(): use Tart's JSON output

* OnDiskName: remove empty parts check

* Scheduler: move health-checking logic to a separate function

* Only fail "running" VMs

* Only fail orphaned VMs if they're in terminal state

* Integration tests

* Run healthCheckingLoopIteration() before schedulingLoopIteration()

* Worker: sync on-disk VMs only once at start
Nikolay Edigaryev 2023-04-03 16:47:49 +04:00 committed by GitHub
parent ea1e5c8578
commit 4eafec99a5
14 changed files with 523 additions and 48 deletions

go.mod (1 line changed)

@ -23,6 +23,7 @@ require (
github.com/stretchr/testify v1.8.1
go.uber.org/zap v1.24.0
golang.org/x/crypto v0.1.0
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
golang.org/x/net v0.7.0
golang.org/x/term v0.5.0
google.golang.org/grpc v1.53.0

go.sum (2 lines changed)

@ -320,6 +320,8 @@ golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0
golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU=
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug=
golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=

View File

@ -52,7 +52,8 @@ func runDev(cmd *cobra.Command, args []string) error {
}
devController, devWorker, err := CreateDevControllerAndWorker(devDataDirPath,
fmt.Sprintf(":%d", netconstants.DefaultControllerPort), resources)
fmt.Sprintf(":%d", netconstants.DefaultControllerPort), resources,
nil, nil)
if err != nil {
return err
@ -79,6 +80,8 @@ func CreateDevControllerAndWorker(
devDataDirPath string,
controllerListenAddr string,
resources v1.Resources,
additionalControllerOpts []controller.Option,
additionalWorkerOpts []worker.Option,
) (*controller.Controller, *worker.Worker, error) {
// Initialize the logger
logger, err := zap.NewDevelopment()
@ -96,13 +99,17 @@ func CreateDevControllerAndWorker(
return nil, nil, err
}
devController, err := controller.New(
controllerOpts := []controller.Option{
controller.WithDataDir(dataDir),
controller.WithListenAddr(controllerListenAddr),
controller.WithInsecureAuthDisabled(),
controller.WithSwaggerDocs(),
controller.WithLogger(logger),
)
}
controllerOpts = append(controllerOpts, additionalControllerOpts...)
devController, err := controller.New(controllerOpts...)
if err != nil {
return nil, nil, err
}
@ -111,7 +118,15 @@ func CreateDevControllerAndWorker(
if err != nil {
return nil, nil, err
}
devWorker, err := worker.New(defaultClient, worker.WithResources(resources), worker.WithLogger(logger))
workerOpts := []worker.Option{
worker.WithResources(resources),
worker.WithLogger(logger),
}
workerOpts = append(workerOpts, additionalWorkerOpts...)
devWorker, err := worker.New(defaultClient, workerOpts...)
if err != nil {
return nil, nil, err
}

View File

@ -43,14 +43,16 @@ type Controller struct {
workerNotifier *notifier.Notifier
proxy *proxy.Proxy
enableSwaggerDocs bool
workerOfflineTimeout time.Duration
rpc.UnimplementedControllerServer
}
func New(opts ...Option) (*Controller, error) {
controller := &Controller{
workerNotifier: notifier.NewNotifier(),
proxy: proxy.NewProxy(),
workerNotifier: notifier.NewNotifier(),
proxy: proxy.NewProxy(),
workerOfflineTimeout: 3 * time.Minute,
}
// Apply options
@ -76,7 +78,8 @@ func New(opts ...Option) (*Controller, error) {
return nil, err
}
controller.store = store
controller.scheduler = scheduler.NewScheduler(store, controller.workerNotifier, controller.logger)
controller.scheduler = scheduler.NewScheduler(store, controller.workerNotifier,
controller.workerOfflineTimeout, controller.logger)
listener, err := net.Listen("tcp", controller.listenAddr)
if err != nil {

View File

@ -3,6 +3,7 @@ package controller
import (
"crypto/tls"
"go.uber.org/zap"
"time"
)
type Option func(*Controller)
@ -37,6 +38,12 @@ func WithSwaggerDocs() Option {
}
}
func WithWorkerOfflineTimeout(workerOfflineTimeout time.Duration) Option {
return func(controller *Controller) {
controller.workerOfflineTimeout = workerOfflineTimeout
}
}
func WithLogger(logger *zap.Logger) Option {
return func(controller *Controller) {
controller.logger = logger.Sugar()

View File

@ -14,18 +14,25 @@ import (
const schedulerInterval = 5 * time.Second
type Scheduler struct {
store storepkg.Store
notifier *notifier.Notifier
logger *zap.SugaredLogger
schedulingRequested chan bool
store storepkg.Store
notifier *notifier.Notifier
workerOfflineTimeout time.Duration
logger *zap.SugaredLogger
schedulingRequested chan bool
}
func NewScheduler(store storepkg.Store, notifier *notifier.Notifier, logger *zap.SugaredLogger) *Scheduler {
func NewScheduler(
store storepkg.Store,
notifier *notifier.Notifier,
workerOfflineTimeout time.Duration,
logger *zap.SugaredLogger,
) *Scheduler {
return &Scheduler{
store: store,
notifier: notifier,
logger: logger,
schedulingRequested: make(chan bool, 1),
store: store,
notifier: notifier,
workerOfflineTimeout: workerOfflineTimeout,
logger: logger,
schedulingRequested: make(chan bool, 1),
}
}
@ -36,6 +43,10 @@ func (scheduler *Scheduler) Run() {
case <-scheduler.schedulingRequested:
case <-time.After(schedulerInterval):
}
if err := scheduler.healthCheckingLoopIteration(); err != nil {
scheduler.logger.Errorf("Failed to health-check VMs: %v", err)
}
if err := scheduler.schedulingLoopIteration(); err != nil {
scheduler.logger.Errorf("Failed to schedule VMs: %v", err)
}
@ -53,6 +64,7 @@ func (scheduler *Scheduler) RequestScheduling() {
func (scheduler *Scheduler) schedulingLoopIteration() error {
affectedWorkers := map[string]bool{}
err := scheduler.store.Update(func(txn storepkg.Transaction) error {
vms, err := txn.ListVMs()
if err != nil {
@ -71,7 +83,8 @@ func (scheduler *Scheduler) schedulingLoopIteration() error {
resourcesUsed := workerToResources.Get(worker.Name)
resourcesRemaining := worker.Resources.Subtracted(resourcesUsed)
if resourcesRemaining.CanFit(unscheduledVM.Resources) {
if resourcesRemaining.CanFit(unscheduledVM.Resources) &&
!worker.Offline(scheduler.workerOfflineTimeout) {
unscheduledVM.Worker = worker.Name
if err := txn.SetVM(unscheduledVM); err != nil {
@ -86,6 +99,7 @@ func (scheduler *Scheduler) schedulingLoopIteration() error {
return nil
})
syncVMsInstruction := rpc.WatchInstruction{
Action: &rpc.WatchInstruction_SyncVmsAction{},
}
@ -96,6 +110,7 @@ func (scheduler *Scheduler) schedulingLoopIteration() error {
scheduler.logger.Errorf("Failed to reactively sync VMs on worker %s: %v", workerToPoke, notifyErr)
}
}
return err
}
@ -118,3 +133,62 @@ func processVMs(vms []v1.VM) ([]v1.VM, WorkerToResources) {
return unscheduledVMs, workerToResources
}
func (scheduler *Scheduler) healthCheckingLoopIteration() error {
return scheduler.store.Update(func(txn storepkg.Transaction) error {
// Retrieve scheduled VMs
vms, err := txn.ListVMs()
if err != nil {
return err
}
var scheduledVMs []v1.VM
for _, vm := range vms {
if vm.Worker != "" {
scheduledVMs = append(scheduledVMs, vm)
}
}
// Retrieve and index workers by name
workers, err := txn.ListWorkers()
if err != nil {
return err
}
nameToWorker := map[string]v1.Worker{}
for _, worker := range workers {
nameToWorker[worker.Name] = worker
}
// Process scheduled VMs
for _, scheduledVM := range scheduledVMs {
if err := scheduler.healthCheckVM(txn, nameToWorker, scheduledVM); err != nil {
return err
}
}
return nil
})
}
func (scheduler *Scheduler) healthCheckVM(txn storepkg.Transaction, nameToWorker map[string]v1.Worker, vm v1.VM) error {
worker, ok := nameToWorker[vm.Worker]
if !ok {
vm.Status = v1.VMStatusFailed
vm.StatusMessage = "VM is assigned to a worker that " +
"doesn't exist anymore"
return txn.SetVM(vm)
}
if worker.Offline(scheduler.workerOfflineTimeout) && !vm.TerminalState() {
vm.Status = v1.VMStatusFailed
vm.StatusMessage = "VM is assigned to a worker that " +
"lost connection with the controller"
return txn.SetVM(vm)
}
return nil
}

View File

@ -4,11 +4,18 @@ import (
"context"
"errors"
"github.com/cirruslabs/orchard/internal/command/dev"
"github.com/cirruslabs/orchard/internal/controller"
"github.com/cirruslabs/orchard/internal/worker"
"github.com/cirruslabs/orchard/internal/worker/ondiskname"
"github.com/cirruslabs/orchard/internal/worker/tart"
"github.com/cirruslabs/orchard/pkg/client"
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"golang.org/x/crypto/ssh"
"golang.org/x/exp/slices"
"net"
"net/http"
"testing"
@ -149,9 +156,20 @@ func Wait(duration time.Duration, condition func() bool) bool {
}
}
func StartIntegrationTestEnvironment(t *testing.T) *client.Client {
func StartIntegrationTestEnvironment(
t *testing.T,
) *client.Client {
return StartIntegrationTestEnvironmentWithAdditionalOpts(t, nil, nil)
}
func StartIntegrationTestEnvironmentWithAdditionalOpts(
t *testing.T,
additionalControllerOpts []controller.Option,
additionalWorkerOpts []worker.Option,
) *client.Client {
t.Setenv("ORCHARD_HOME", t.TempDir())
devController, devWorker, err := dev.CreateDevControllerAndWorker(t.TempDir(), ":0", nil)
devController, devWorker, err := dev.CreateDevControllerAndWorker(t.TempDir(),
":0", nil, additionalControllerOpts, additionalWorkerOpts)
if err != nil {
t.Fatal(err)
}
@ -234,3 +252,181 @@ func TestPortForwarding(t *testing.T) {
require.NoError(t, err)
require.Contains(t, string(unameOutput), "Darwin arm64")
}
// TestSchedulerHealthCheckingNonExistentWorker ensures that scheduler
// will eventually fail VMs that are scheduled on a worker that was
// deleted from the API.
func TestSchedulerHealthCheckingNonExistentWorker(t *testing.T) {
ctx := context.Background()
devClient := StartIntegrationTestEnvironment(t)
const (
dummyWorkerName = "dummy-worker"
dummyVMName = "dummy-vm"
)
// Create a dummy worker that won't update its LastSeen
// timestamp, which will result in scheduler failing VMs
// scheduled on that worker.
//
// We use a special resource "unique-resource" to prevent
// our dummy VM (see below) from scheduling on any worker
// other than this one.
_, err := devClient.Workers().Create(ctx, v1.Worker{
Meta: v1.Meta{
Name: dummyWorkerName,
},
LastSeen: time.Now(),
MachineID: uuid.New().String(),
Resources: map[string]uint64{
v1.ResourceTartVMs: 1,
"unique-resource": 1,
},
})
require.NoError(t, err)
// Create a dummy VM
err = devClient.VMs().Create(context.Background(), &v1.VM{
Meta: v1.Meta{
Name: dummyVMName,
},
Image: "ghcr.io/cirruslabs/macos-ventura-base:latest",
CPU: 4,
Memory: 8 * 1024,
Headless: true,
Resources: map[string]uint64{
"unique-resource": 1,
},
})
require.NoError(t, err)
// Wait for the dummy VM to get scheduled to a dummy worker
require.True(t, Wait(2*time.Minute, func() bool {
vm, err := devClient.VMs().Get(context.Background(), dummyVMName)
require.NoError(t, err)
t.Logf("Waiting for the VM to be assigned to a dummy worker, current worker: %q", vm.Worker)
return vm.Worker == dummyWorkerName
}), "failed to wait for the dummy VM to be assigned to a dummy worker")
// Delete the dummy worker
err = devClient.Workers().Delete(ctx, dummyWorkerName)
require.NoError(t, err)
// Wait for the scheduler to change the dummy VM's status to "failed"
require.True(t, Wait(2*time.Minute, func() bool {
vm, err := devClient.VMs().Get(context.Background(), dummyVMName)
require.NoError(t, err)
t.Logf("Waiting for the VM to be failed by the scheduler")
return vm.Status == v1.VMStatusFailed
}), "VM was not marked as failed in time")
// Double check VM's status and status message
vm, err := devClient.VMs().Get(context.Background(), dummyVMName)
require.NoError(t, err)
require.Equal(t, v1.VMStatusFailed, vm.Status)
require.Equal(t, "VM is assigned to a worker that doesn't exist anymore", vm.StatusMessage)
}
// TestSchedulerHealthCheckingOfflineWorker ensures that scheduler
// will eventually fail VMs that are scheduled on a worker that had
// gone offline for a long time.
func TestSchedulerHealthCheckingOfflineWorker(t *testing.T) {
ctx := context.Background()
devClient := StartIntegrationTestEnvironmentWithAdditionalOpts(t,
[]controller.Option{controller.WithWorkerOfflineTimeout(1 * time.Minute)}, nil)
const (
dummyWorkerName = "dummy-worker"
dummyVMName = "dummy-vm"
)
// Create a dummy worker that will be eventually marked as offline
// because we won't update the LastSeen field
_, err := devClient.Workers().Create(ctx, v1.Worker{
Meta: v1.Meta{
Name: dummyWorkerName,
},
LastSeen: time.Now(),
MachineID: uuid.New().String(),
Resources: map[string]uint64{
v1.ResourceTartVMs: 1,
"unique-resource": 1,
},
})
require.NoError(t, err)
// Create a dummy VM that will be assigned to our dummy worker
err = devClient.VMs().Create(context.Background(), &v1.VM{
Meta: v1.Meta{
Name: dummyVMName,
},
Image: "ghcr.io/cirruslabs/macos-ventura-base:latest",
CPU: 4,
Memory: 8 * 1024,
Headless: true,
Resources: map[string]uint64{
"unique-resource": 1,
},
})
require.NoError(t, err)
// Wait for the VM to be marked as failed
assert.True(t, Wait(2*time.Minute, func() bool {
vm, err := devClient.VMs().Get(context.Background(), dummyVMName)
require.NoError(t, err)
t.Logf("Waiting for the VM to be marked as failed, current status: %s", vm.Status)
return vm.Status == v1.VMStatusFailed
}), "VM wasn't marked as failed in a reasonable time")
// Double-check the VM's status message
runningVM, err := devClient.VMs().Get(context.Background(), dummyVMName)
require.NoError(t, err)
require.Equal(t, v1.VMStatusFailed, runningVM.Status)
require.Equal(t, "VM is assigned to a worker that lost connection with the controller",
runningVM.StatusMessage)
}
// TestVMGarbageCollection ensures that on-disk Tart VMs that are managed by Orchard
// and are not present in the API anymore are garbage-collected by the Orchard Worker
// at startup.
func TestVMGarbageCollection(t *testing.T) {
ctx := context.Background()
logger, err := zap.NewDevelopment()
require.NoError(t, err)
// Create on-disk Tart VM that looks like it's managed by Orchard
vmName := ondiskname.New("test", uuid.New().String()).String()
_, _, err = tart.Tart(ctx, logger.Sugar(), "clone",
"ghcr.io/cirruslabs/macos-ventura-base:latest", vmName)
require.NoError(t, err)
// Make sure that this VM exists
hasVM := func(name string) bool {
vmInfos, err := tart.List(ctx, logger.Sugar())
require.NoError(t, err)
return slices.ContainsFunc(vmInfos, func(vmInfo tart.VMInfo) bool {
return vmInfo.Name == name
})
}
require.True(t, hasVM(vmName))
// Start the Orchard Worker
_ = StartIntegrationTestEnvironment(t)
// Wait for the Orchard Worker to garbage-collect this VM
require.True(t, Wait(2*time.Minute, func() bool {
t.Logf("Waiting for the on-disk VM to be cleaned up by the worker")
return !hasVM(vmName)
}), "failed to wait for the VM %s to be garbage-collected", vmName)
}

View File

@ -0,0 +1,58 @@
package ondiskname
import (
"errors"
"fmt"
"strings"
)
var (
ErrNotManagedByOrchard = errors.New("this on-disk VM is not managed by Orchard")
ErrInvalidOnDiskName = errors.New("invalid on-disk VM name")
)
const (
prefix = "orchard"
numHyphensInUUID = 5
)
type OnDiskName struct {
Name string
UID string
}
func New(name string, uid string) OnDiskName {
return OnDiskName{
Name: name,
UID: uid,
}
}
func Parse(s string) (OnDiskName, error) {
splits := strings.Split(s, "-")
if !strings.HasPrefix(s, fmt.Sprintf("%s-", prefix)) {
return OnDiskName{}, ErrNotManagedByOrchard
}
if len(splits) < 7 {
return OnDiskName{}, fmt.Errorf("%w: name should contain at least 7 parts delimited by \"-\"",
ErrInvalidOnDiskName)
}
if splits[0] != prefix {
return OnDiskName{}, fmt.Errorf("%w: name should begin with \"%s\" prefix",
ErrInvalidOnDiskName, prefix)
}
uuidStart := len(splits) - numHyphensInUUID
return OnDiskName{
Name: strings.Join(splits[1:uuidStart], "-"),
UID: strings.Join(splits[uuidStart:], "-"),
}, nil
}
func (odn OnDiskName) String() string {
return fmt.Sprintf("%s-%s-%s", prefix, odn.Name, odn.UID)
}
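
For illustration only (not part of the diff, and assuming the snippet lives inside the orchard module, since the package is internal): a VM name containing hyphens round-trips through this package because the last five "-"-delimited parts are always treated as the UUID, and everything between the "orchard-" prefix and the UUID belongs to the VM name.

package main

import (
	"fmt"

	"github.com/cirruslabs/orchard/internal/worker/ondiskname"
)

func main() {
	// "my-vm" contains a hyphen; Parse() still recovers it because the
	// trailing five "-"-delimited parts are reserved for the UUID.
	odn := ondiskname.New("my-vm", "123e4567-e89b-12d3-a456-426614174000")

	s := odn.String()
	fmt.Println(s) // orchard-my-vm-123e4567-e89b-12d3-a456-426614174000

	parsed, err := ondiskname.Parse(s)
	if err != nil {
		panic(err)
	}
	fmt.Println(parsed.Name, parsed.UID) // my-vm 123e4567-e89b-12d3-a456-426614174000
}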

View File

@ -0,0 +1,29 @@
package ondiskname_test
import (
"github.com/cirruslabs/orchard/internal/worker/ondiskname"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"testing"
)
func TestOnDiskNameUUID(t *testing.T) {
onDiskNameOriginal := ondiskname.New("test-vm--", uuid.New().String())
onDiskNameParsed, err := ondiskname.Parse(onDiskNameOriginal.String())
require.NoError(t, err)
require.Equal(t, onDiskNameOriginal, onDiskNameParsed)
}
func TestOnDiskNameNonUUID(t *testing.T) {
onDiskNameOriginal := ondiskname.New("some-vm", "some-uid")
_, err := ondiskname.Parse(onDiskNameOriginal.String())
require.Error(t, err)
}
func TestOnDiskNameNonOrchard(t *testing.T) {
_, err := ondiskname.Parse("ghcr.io/cirruslabs/macos-ventura-base:latest")
require.Error(t, err)
}

View File

@ -1,10 +1,12 @@
package vmmanager
package tart
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"go.uber.org/zap"
"os/exec"
"strings"
)
@ -16,9 +18,14 @@ var (
ErrTartFailed = errors.New("tart command returned non-zero exit code")
)
//nolint:unparam // might use stderr at some point
func (vm *VM) tart(
type VMInfo struct {
Name string
Running bool
}
func Tart(
ctx context.Context,
logger *zap.SugaredLogger,
args ...string,
) (string, string, error) {
cmd := exec.CommandContext(ctx, tartCommandName, args...)
@ -28,7 +35,7 @@ func (vm *VM) tart(
cmd.Stdout = &stdout
cmd.Stderr = &stderr
vm.logger.Debugf("running '%s %s'", tartCommandName, strings.Join(args, " "))
logger.Debugf("running '%s %s'", tartCommandName, strings.Join(args, " "))
err := cmd.Run()
if err != nil {
if errors.Is(err, exec.ErrNotFound) {
@ -37,7 +44,7 @@ func (vm *VM) tart(
}
if exitErr, ok := err.(*exec.ExitError); ok {
vm.logger.Warnf(
logger.Warnf(
"'%s %s' failed with exit code %d: %s",
tartCommandName, strings.Join(args, " "),
exitErr.ExitCode(), firstNonEmptyLine(stderr.String(), stdout.String()),
@ -51,6 +58,21 @@ func (vm *VM) tart(
return stdout.String(), stderr.String(), err
}
func List(ctx context.Context, logger *zap.SugaredLogger) ([]VMInfo, error) {
output, _, err := Tart(ctx, logger, "list", "--format", "json")
if err != nil {
return nil, err
}
var entries []VMInfo
if err := json.Unmarshal([]byte(output), &entries); err != nil {
return nil, err
}
return entries, nil
}
func firstNonEmptyLine(outputs ...string) string {
for _, output := range outputs {
for _, line := range strings.Split(output, "\n") {
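
As a hedged illustration of what List() above expects (the exact field set emitted by "tart list --format json" is not shown in this diff), decoding only relies on the Name and Running keys mapping onto VMInfo; any additional keys in the JSON are ignored by encoding/json:

package main

import (
	"encoding/json"
	"fmt"
)

// VMInfo mirrors the struct introduced in this commit.
type VMInfo struct {
	Name    string
	Running bool
}

func main() {
	// Hypothetical payload shape, for illustration purposes only.
	payload := `[
		{"Name": "orchard-dummy-vm-123e4567-e89b-12d3-a456-426614174000", "Running": false},
		{"Name": "ventura-base", "Running": true}
	]`

	var entries []VMInfo
	if err := json.Unmarshal([]byte(payload), &entries); err != nil {
		panic(err)
	}
	for _, entry := range entries {
		fmt.Printf("%s running=%v\n", entry.Name, entry.Running)
	}
}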

View File

@ -6,6 +6,8 @@ import (
"errors"
"fmt"
"github.com/avast/retry-go"
"github.com/cirruslabs/orchard/internal/worker/ondiskname"
"github.com/cirruslabs/orchard/internal/worker/tart"
"github.com/cirruslabs/orchard/pkg/resource/v1"
"go.uber.org/zap"
"golang.org/x/crypto/ssh"
@ -34,7 +36,7 @@ func NewVM(ctx context.Context, vmResource v1.VM, logger *zap.SugaredLogger) (*V
vmContext, vmContextCancel := context.WithCancel(context.Background())
vm := &VM{
id: fmt.Sprintf("orchard-%s-%s", vmResource.Name, vmResource.UID),
id: ondiskname.New(vmResource.Name, vmResource.UID).String(),
Resource: vmResource,
logger: logger,
@ -64,20 +66,22 @@ func NewVM(ctx context.Context, vmResource v1.VM, logger *zap.SugaredLogger) (*V
}
func (vm *VM) cloneAndConfigure(ctx context.Context) error {
_, _, err := vm.tart(ctx, "clone", vm.Resource.Image, vm.id)
_, _, err := tart.Tart(ctx, vm.logger, "clone", vm.Resource.Image, vm.id)
if err != nil {
return err
}
if vm.Resource.Memory != 0 {
_, _, err = vm.tart(ctx, "set", "--memory", strconv.FormatUint(vm.Resource.Memory, 10), vm.id)
_, _, err = tart.Tart(ctx, vm.logger, "set", "--memory",
strconv.FormatUint(vm.Resource.Memory, 10), vm.id)
if err != nil {
return err
}
}
if vm.Resource.CPU != 0 {
_, _, err = vm.tart(ctx, "set", "--cpu", strconv.FormatUint(vm.Resource.CPU, 10), vm.id)
_, _, err = tart.Tart(ctx, vm.logger, "set", "--cpu",
strconv.FormatUint(vm.Resource.CPU, 10), vm.id)
if err != nil {
return err
}
@ -97,7 +101,7 @@ func (vm *VM) run(ctx context.Context) error {
}
runArgs = append(runArgs, vm.id)
_, _, err := vm.tart(ctx, runArgs...)
_, _, err := tart.Tart(ctx, vm.logger, runArgs...)
if err != nil {
return err
}
@ -106,7 +110,7 @@ func (vm *VM) run(ctx context.Context) error {
}
func (vm *VM) IP(ctx context.Context) (string, error) {
stdout, _, err := vm.tart(ctx, "ip", "--wait", "60", vm.id)
stdout, _, err := tart.Tart(ctx, vm.logger, "ip", "--wait", "60", vm.id)
if err != nil {
return "", err
}
@ -115,7 +119,7 @@ func (vm *VM) IP(ctx context.Context) (string, error) {
}
func (vm *VM) Stop() error {
_, _, _ = vm.tart(context.Background(), "stop", vm.id)
_, _, _ = tart.Tart(context.Background(), vm.logger, "stop", vm.id)
vm.cancel()
@ -125,7 +129,7 @@ func (vm *VM) Stop() error {
}
func (vm *VM) Delete() error {
_, _, err := vm.tart(context.Background(), "delete", vm.id)
_, _, err := tart.Tart(context.Background(), vm.logger, "delete", vm.id)
if err != nil {
return fmt.Errorf("%w: failed to delete VM %s: %v", ErrFailed, vm.id, err)
}

View File

@ -6,6 +6,8 @@ import (
"fmt"
"github.com/avast/retry-go/v4"
"github.com/cirruslabs/orchard/internal/worker/iokitregistry"
"github.com/cirruslabs/orchard/internal/worker/ondiskname"
"github.com/cirruslabs/orchard/internal/worker/tart"
"github.com/cirruslabs/orchard/internal/worker/vmmanager"
"github.com/cirruslabs/orchard/pkg/client"
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
@ -91,6 +93,11 @@ func (worker *Worker) runNewSession(ctx context.Context) error {
}), retry.Context(subCtx), retry.Attempts(0))
}()
// Sync on-disk VMs
if err := worker.syncOnDiskVMs(ctx); err != nil {
return err
}
for {
if err := worker.updateWorker(ctx); err != nil {
worker.logger.Errorf("failed to update worker resource: %v", err)
@ -164,7 +171,7 @@ func (worker *Worker) syncVMs(ctx context.Context) error {
worker.logger.Infof("syncing %d VMs...", len(remoteVMs))
// check if need to stop any of the VMs
// Check if we need to stop any of the VMs
for _, vmResource := range remoteVMs {
if vmResource.Status == v1.VMStatusStopping && worker.vmm.Exists(vmResource) {
if err := worker.stopVM(vmResource); err != nil {
@ -173,7 +180,7 @@ func (worker *Worker) syncVMs(ctx context.Context) error {
}
}
// then, handle pending VMs first
// Handle pending VMs
for _, vmResource := range remoteVMs {
// handle pending VMs
if vmResource.Status == v1.VMStatusPending && !worker.vmm.Exists(vmResource) {
@ -183,7 +190,7 @@ func (worker *Worker) syncVMs(ctx context.Context) error {
}
}
// lastly, try to sync local VMs with the remote ones
// Sync in-memory VMs
for _, vm := range worker.vmm.List() {
remoteVM, ok := remoteVMs[vm.Resource.UID]
if !ok {
@ -204,6 +211,56 @@ func (worker *Worker) syncVMs(ctx context.Context) error {
return nil
}
func (worker *Worker) syncOnDiskVMs(ctx context.Context) error {
remoteVMs, err := worker.client.VMs().FindForWorker(ctx, worker.name)
if err != nil {
return err
}
worker.logger.Infof("syncing on-disk VMs...")
vmInfos, err := tart.List(ctx, worker.logger)
if err != nil {
return err
}
for _, vmInfo := range vmInfos {
if vmInfo.Running {
continue
}
onDiskName, err := ondiskname.Parse(vmInfo.Name)
if err != nil {
if errors.Is(err, ondiskname.ErrNotManagedByOrchard) {
continue
}
return err
}
remoteVM, ok := remoteVMs[onDiskName.UID]
if !ok {
// On-disk VM doesn't exist on the controller, delete it
_, _, err := tart.Tart(ctx, worker.logger, "delete", vmInfo.Name)
if err != nil {
return err
}
} else if remoteVM.Status == v1.VMStatusRunning && !worker.vmm.Exists(v1.VM{UID: onDiskName.UID}) {
// On-disk VM exists on the controller,
// but we don't know about it, so
// mark it as failed
remoteVM.Status = v1.VMStatusFailed
remoteVM.StatusMessage = "Worker lost track of VM"
_, err := worker.client.VMs().Update(ctx, remoteVM)
if err != nil {
return err
}
}
}
return nil
}
func (worker *Worker) deleteVM(vmResource v1.VM) error {
worker.logger.Debugf("deleting VM %s (%s)", vmResource.Name, vmResource.UID)

View File

@ -18,19 +18,6 @@ type Meta struct {
CreatedAt time.Time `json:"createdAt"`
}
type Worker struct {
// LastSeen is set by the Worker and is used by the Controller
// to track unhealthy Workers.
LastSeen time.Time
MachineID string
// Resources available on this Worker.
Resources Resources `json:"resources"`
Meta
}
type VM struct {
Image string `json:"image"`
CPU uint64 `json:"cpu"`

pkg/resource/v1/worker.go (new file, 20 lines)

@ -0,0 +1,20 @@
package v1
import "time"
type Worker struct {
// LastSeen is set by the Worker and is used by the Controller
// to track unhealthy Workers.
LastSeen time.Time
MachineID string
// Resources available on this Worker.
Resources Resources `json:"resources"`
Meta
}
func (worker Worker) Offline(workerOfflineTimeout time.Duration) bool {
return time.Since(worker.LastSeen) > workerOfflineTimeout
}