Better state syncing and other improvements (#24)
This commit is contained in:
parent
8df31f7c2d
commit
165662bb0a
|
|
@ -1 +1,3 @@
|
|||
.idea
|
||||
|
||||
.dev-data
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ func newCreateVMCommand() *cobra.Command {
|
|||
|
||||
command.PersistentFlags().StringVar(&image, "image", "ghcr.io/cirruslabs/macos-ventura-base:latest", "image to use")
|
||||
command.PersistentFlags().Uint64Var(&cpu, "cpu", 4, "number of CPUs to use")
|
||||
command.PersistentFlags().Uint64Var(&memory, "memory", 8, "gigabytes of memory to use")
|
||||
command.PersistentFlags().Uint64Var(&memory, "memory", 8*1024, "megabytes of memory to use")
|
||||
command.PersistentFlags().BoolVar(&softnet, "softnet", false, "whether to use Softnet network isolation")
|
||||
command.PersistentFlags().BoolVar(&headless, "headless", true, "whether to run without graphics")
|
||||
|
||||
|
|
|
|||
|
|
@ -21,5 +21,5 @@ func runDeleteVM(cmd *cobra.Command, args []string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
return client.VMs().Delete(cmd.Context(), name, false)
|
||||
return client.VMs().Delete(cmd.Context(), name)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,8 +6,12 @@ import (
|
|||
"github.com/spf13/cobra"
|
||||
"go.uber.org/zap"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
var devDataDirPath string
|
||||
|
||||
func NewCommand() *cobra.Command {
|
||||
command := &cobra.Command{
|
||||
Use: "dev",
|
||||
|
|
@ -15,6 +19,9 @@ func NewCommand() *cobra.Command {
|
|||
RunE: runDev,
|
||||
}
|
||||
|
||||
command.PersistentFlags().StringVarP(&devDataDirPath, "data-dir", "d", ".dev-data",
|
||||
"path to persist data between runs")
|
||||
|
||||
return command
|
||||
}
|
||||
|
||||
|
|
@ -30,12 +37,15 @@ func runDev(cmd *cobra.Command, args []string) error {
|
|||
}
|
||||
}()
|
||||
|
||||
tempDir, err := os.MkdirTemp("", "")
|
||||
if err != nil {
|
||||
return err
|
||||
if !filepath.IsAbs(devDataDirPath) {
|
||||
pwd, err := os.Getwd()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
devDataDirPath = path.Join(pwd, devDataDirPath)
|
||||
}
|
||||
|
||||
dataDir, err := controller.NewDataDir(tempDir)
|
||||
dataDir, err := controller.NewDataDir(devDataDirPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -46,7 +56,7 @@ func runDev(cmd *cobra.Command, args []string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
worker, err := worker.New(worker.WithDataDirPath(tempDir), worker.WithLogger(logger))
|
||||
worker, err := worker.New(worker.WithDataDirPath(devDataDirPath), worker.WithLogger(logger))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/cirruslabs/orchard/internal/command/dev"
|
||||
"github.com/cirruslabs/orchard/internal/command/get"
|
||||
"github.com/cirruslabs/orchard/internal/command/list"
|
||||
"github.com/cirruslabs/orchard/internal/command/stop"
|
||||
"github.com/cirruslabs/orchard/internal/command/worker"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
|
@ -24,6 +25,7 @@ func NewRootCmd() *cobra.Command {
|
|||
get.NewCommand(),
|
||||
list.NewCommand(),
|
||||
deletepkg.NewCommand(),
|
||||
stop.NewCommand(),
|
||||
)
|
||||
|
||||
addGroupedCommands(command, "Administrative Tasks:",
|
||||
|
|
|
|||
|
|
@ -0,0 +1,16 @@
|
|||
package stop
|
||||
|
||||
import (
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
func NewCommand() *cobra.Command {
|
||||
command := &cobra.Command{
|
||||
Use: "stop",
|
||||
Short: "Stop resources",
|
||||
}
|
||||
|
||||
command.AddCommand(newStopVMCommand())
|
||||
|
||||
return command
|
||||
}
|
||||
|
|
@ -0,0 +1,26 @@
|
|||
package stop
|
||||
|
||||
import (
|
||||
"github.com/cirruslabs/orchard/pkg/client"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
func newStopVMCommand() *cobra.Command {
|
||||
return &cobra.Command{
|
||||
Use: "vm",
|
||||
Args: cobra.ExactArgs(1),
|
||||
RunE: runStopVM,
|
||||
}
|
||||
}
|
||||
|
||||
func runStopVM(cmd *cobra.Command, args []string) error {
|
||||
name := args[0]
|
||||
|
||||
client, err := client.New()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = client.VMs().Stop(cmd.Context(), name)
|
||||
return err
|
||||
}
|
||||
|
|
@ -31,7 +31,6 @@ func (controller *Controller) createServiceAccount(ctx *gin.Context) responder.R
|
|||
}
|
||||
|
||||
serviceAccount.CreatedAt = time.Now()
|
||||
serviceAccount.DeletedAt = time.Time{}
|
||||
serviceAccount.UID = uuid.New().String()
|
||||
serviceAccount.Generation = 0
|
||||
|
||||
|
|
|
|||
|
|
@ -28,7 +28,6 @@ func (controller *Controller) createVM(ctx *gin.Context) responder.Responder {
|
|||
|
||||
vm.Status = v1.VMStatusPending
|
||||
vm.CreatedAt = time.Now()
|
||||
vm.DeletedAt = time.Time{}
|
||||
vm.UID = uuid.New().String()
|
||||
vm.Generation = 0
|
||||
|
||||
|
|
@ -39,7 +38,7 @@ func (controller *Controller) createVM(ctx *gin.Context) responder.Responder {
|
|||
return responder.Code(http.StatusConflict)
|
||||
}
|
||||
|
||||
if err := txn.SetVM(&vm); err != nil {
|
||||
if err := txn.SetVM(vm); err != nil {
|
||||
return responder.Code(http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
|
|
@ -68,14 +67,18 @@ func (controller *Controller) updateVM(ctx *gin.Context) responder.Responder {
|
|||
return responder.Error(err)
|
||||
}
|
||||
|
||||
if dbVM.TerminalState() && dbVM.Status != userVM.Status {
|
||||
return responder.Code(http.StatusPreconditionFailed)
|
||||
}
|
||||
|
||||
dbVM.Status = userVM.Status
|
||||
dbVM.Generation++
|
||||
|
||||
if err := txn.SetVM(dbVM); err != nil {
|
||||
if err := txn.SetVM(*dbVM); err != nil {
|
||||
return responder.Code(http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
return responder.JSON(http.StatusOK, &dbVM)
|
||||
return responder.JSON(http.StatusOK, dbVM)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -92,7 +95,7 @@ func (controller *Controller) getVM(ctx *gin.Context) responder.Responder {
|
|||
return responder.Error(err)
|
||||
}
|
||||
|
||||
return responder.JSON(http.StatusOK, &vm)
|
||||
return responder.JSON(http.StatusOK, vm)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -107,7 +110,7 @@ func (controller *Controller) listVMs(ctx *gin.Context) responder.Responder {
|
|||
return responder.Error(err)
|
||||
}
|
||||
|
||||
return responder.JSON(http.StatusOK, &vms)
|
||||
return responder.JSON(http.StatusOK, vms)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -118,25 +121,8 @@ func (controller *Controller) deleteVM(ctx *gin.Context) responder.Responder {
|
|||
|
||||
name := ctx.Param("name")
|
||||
|
||||
if ctx.Query("force") != "" {
|
||||
return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder {
|
||||
if err := txn.DeleteVM(name); err != nil {
|
||||
return responder.Error(err)
|
||||
}
|
||||
|
||||
return responder.Code(http.StatusOK)
|
||||
})
|
||||
}
|
||||
|
||||
return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder {
|
||||
vm, err := txn.GetVM(name)
|
||||
if err != nil {
|
||||
return responder.Error(err)
|
||||
}
|
||||
|
||||
vm.DeletedAt = time.Now()
|
||||
|
||||
if err := txn.SetVM(vm); err != nil {
|
||||
if err := txn.DeleteVM(name); err != nil {
|
||||
return responder.Error(err)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -31,7 +31,6 @@ func (controller *Controller) createWorker(ctx *gin.Context) responder.Responder
|
|||
worker.LastSeen = currentTime
|
||||
}
|
||||
worker.CreatedAt = currentTime
|
||||
worker.DeletedAt = time.Time{}
|
||||
worker.UID = uuid.New().String()
|
||||
worker.Generation = 0
|
||||
|
||||
|
|
@ -42,11 +41,11 @@ func (controller *Controller) createWorker(ctx *gin.Context) responder.Responder
|
|||
return responder.Code(http.StatusConflict)
|
||||
}
|
||||
|
||||
if err := txn.SetWorker(&worker); err != nil {
|
||||
if err := txn.SetWorker(worker); err != nil {
|
||||
return responder.Error(err)
|
||||
}
|
||||
|
||||
return responder.JSON(200, &worker)
|
||||
return responder.JSON(200, worker)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -70,7 +69,7 @@ func (controller *Controller) updateWorker(ctx *gin.Context) responder.Responder
|
|||
dbWorker.LastSeen = userWorker.LastSeen
|
||||
dbWorker.Generation++
|
||||
|
||||
if err := txn.SetWorker(dbWorker); err != nil {
|
||||
if err := txn.SetWorker(*dbWorker); err != nil {
|
||||
return responder.Code(http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -92,7 +92,6 @@ func (controller *Controller) EnsureServiceAccount(serviceAccount *v1.ServiceAcc
|
|||
}
|
||||
|
||||
serviceAccount.CreatedAt = time.Now()
|
||||
serviceAccount.DeletedAt = time.Time{}
|
||||
serviceAccount.UID = uuid.New().String()
|
||||
serviceAccount.Generation = 0
|
||||
|
||||
|
|
|
|||
|
|
@ -22,8 +22,8 @@ func runScheduler(store storepkg.Store) error {
|
|||
}
|
||||
|
||||
func runSchedulerInner(store storepkg.Store) error {
|
||||
var vms []*v1.VM
|
||||
var workers []*v1.Worker
|
||||
var vms []v1.VM
|
||||
var workers []v1.Worker
|
||||
var err error
|
||||
|
||||
err = store.View(func(txn storepkg.Transaction) error {
|
||||
|
|
|
|||
|
|
@ -1,4 +1,3 @@
|
|||
//nolint:dupl // maybe we'll figure out how to make DB resource accessors generic in the future
|
||||
package badger
|
||||
|
||||
import (
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ func (txn *Transaction) GetVM(name string) (result *v1.VM, err error) {
|
|||
return &vm, nil
|
||||
}
|
||||
|
||||
func (txn *Transaction) SetVM(vm *v1.VM) (err error) {
|
||||
func (txn *Transaction) SetVM(vm v1.VM) (err error) {
|
||||
defer func() {
|
||||
err = mapErr(err)
|
||||
}()
|
||||
|
|
@ -66,7 +66,7 @@ func (txn *Transaction) DeleteVM(name string) (err error) {
|
|||
return txn.badgerTxn.Delete(key)
|
||||
}
|
||||
|
||||
func (txn *Transaction) ListVMs() (result []*v1.VM, err error) {
|
||||
func (txn *Transaction) ListVMs() (result []v1.VM, err error) {
|
||||
defer func() {
|
||||
err = mapErr(err)
|
||||
}()
|
||||
|
|
@ -90,7 +90,7 @@ func (txn *Transaction) ListVMs() (result []*v1.VM, err error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
result = append(result, &vm)
|
||||
result = append(result, vm)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ func (txn *Transaction) GetWorker(name string) (result *v1.Worker, err error) {
|
|||
return &worker, nil
|
||||
}
|
||||
|
||||
func (txn *Transaction) SetWorker(worker *v1.Worker) (err error) {
|
||||
func (txn *Transaction) SetWorker(worker v1.Worker) (err error) {
|
||||
defer func() {
|
||||
err = mapErr(err)
|
||||
}()
|
||||
|
|
@ -66,7 +66,7 @@ func (txn *Transaction) DeleteWorker(name string) (err error) {
|
|||
return txn.badgerTxn.Delete(key)
|
||||
}
|
||||
|
||||
func (txn *Transaction) ListWorkers() (result []*v1.Worker, err error) {
|
||||
func (txn *Transaction) ListWorkers() (result []v1.Worker, err error) {
|
||||
defer func() {
|
||||
err = mapErr(err)
|
||||
}()
|
||||
|
|
@ -90,7 +90,7 @@ func (txn *Transaction) ListWorkers() (result []*v1.Worker, err error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
result = append(result, &worker)
|
||||
result = append(result, worker)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
|
|
|
|||
|
|
@ -9,14 +9,14 @@ type Store interface {
|
|||
|
||||
type Transaction interface {
|
||||
GetVM(name string) (result *v1.VM, err error)
|
||||
SetVM(vm *v1.VM) (err error)
|
||||
SetVM(vm v1.VM) (err error)
|
||||
DeleteVM(name string) (err error)
|
||||
ListVMs() (result []*v1.VM, err error)
|
||||
ListVMs() (result []v1.VM, err error)
|
||||
|
||||
GetWorker(name string) (result *v1.Worker, err error)
|
||||
SetWorker(worker *v1.Worker) (err error)
|
||||
SetWorker(worker v1.Worker) (err error)
|
||||
DeleteWorker(name string) (err error)
|
||||
ListWorkers() (result []*v1.Worker, err error)
|
||||
ListWorkers() (result []v1.Worker, err error)
|
||||
|
||||
GetServiceAccount(name string) (result *v1.ServiceAccount, err error)
|
||||
SetServiceAccount(serviceAccount *v1.ServiceAccount) (err error)
|
||||
|
|
|
|||
|
|
@ -4,12 +4,14 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"github.com/cirruslabs/orchard/pkg/resource/v1"
|
||||
"go.uber.org/zap"
|
||||
"strconv"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type VM struct {
|
||||
id string
|
||||
vmResource *v1.VM
|
||||
id string
|
||||
Resource v1.VM
|
||||
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
|
@ -17,12 +19,12 @@ type VM struct {
|
|||
wg *sync.WaitGroup
|
||||
}
|
||||
|
||||
func NewVM(vmResource *v1.VM) *VM {
|
||||
func NewVM(vmResource v1.VM, logger *zap.SugaredLogger) *VM {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
vm := &VM{
|
||||
id: fmt.Sprintf("orchard-%s-%s", vmResource.Name, vmResource.UID),
|
||||
vmResource: vmResource,
|
||||
id: fmt.Sprintf("orchard-%s-%s", vmResource.Name, vmResource.UID),
|
||||
Resource: vmResource,
|
||||
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
|
|
@ -35,8 +37,13 @@ func NewVM(vmResource *v1.VM) *VM {
|
|||
go func() {
|
||||
defer vm.wg.Done()
|
||||
|
||||
// Optimistic set the status to running. Will be synced later by the worker loop.
|
||||
vm.Resource.Status = v1.VMStatusRunning
|
||||
if err := vm.run(vm.ctx); err != nil {
|
||||
vmResource.Status = v1.VMStatusFailed
|
||||
logger.Errorf("VM %s failed: %v", vm.id, err)
|
||||
vm.Resource.Status = v1.VMStatusFailed
|
||||
} else {
|
||||
vm.Resource.Status = v1.VMStatusStopped
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
@ -44,12 +51,37 @@ func NewVM(vmResource *v1.VM) *VM {
|
|||
}
|
||||
|
||||
func (vm *VM) run(ctx context.Context) error {
|
||||
_, _, err := Tart(ctx, "clone", vm.vmResource.Image, vm.id)
|
||||
_, _, err := Tart(ctx, "clone", vm.Resource.Image, vm.id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, _, err = Tart(ctx, "run", vm.id)
|
||||
if vm.Resource.Memory != 0 {
|
||||
_, _, err = Tart(ctx, "set", "--memory", strconv.FormatUint(vm.Resource.Memory, 10), vm.id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if vm.Resource.CPU != 0 {
|
||||
_, _, err = Tart(ctx, "set", "--cpu", strconv.FormatUint(vm.Resource.CPU, 10), vm.id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
var runArgs = []string{"run"}
|
||||
|
||||
if vm.Resource.Softnet {
|
||||
runArgs = append(runArgs, "--net-softnet")
|
||||
}
|
||||
|
||||
if vm.Resource.Headless {
|
||||
runArgs = append(runArgs, "--no-graphics")
|
||||
}
|
||||
|
||||
runArgs = append(runArgs, vm.id)
|
||||
_, _, err = Tart(ctx, runArgs...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -57,13 +89,17 @@ func (vm *VM) run(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (vm *VM) Close() error {
|
||||
_, _, _ = Tart(context.Background(), "stop", "--timeout", "5", vm.id)
|
||||
func (vm *VM) Stop() error {
|
||||
_, _, _ = Tart(context.Background(), "stop", vm.id)
|
||||
|
||||
vm.cancel()
|
||||
|
||||
vm.wg.Wait()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (vm *VM) Delete() error {
|
||||
_, _, err := Tart(context.Background(), "delete", vm.id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%w: failed to delete VM %s: %v", ErrFailed, vm.id, err)
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var ErrFailed = errors.New("VM manager failed")
|
||||
|
|
@ -18,13 +19,13 @@ func New() *VMManager {
|
|||
}
|
||||
}
|
||||
|
||||
func (vmm *VMManager) Exists(vmResource *v1.VM) bool {
|
||||
func (vmm *VMManager) Exists(vmResource v1.VM) bool {
|
||||
_, ok := vmm.vms[vmResource.UID]
|
||||
|
||||
return ok
|
||||
}
|
||||
|
||||
func (vmm *VMManager) Get(vmResource *v1.VM) (*VM, error) {
|
||||
func (vmm *VMManager) Get(vmResource v1.VM) (*VM, error) {
|
||||
managedVM, ok := vmm.vms[vmResource.UID]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("%w: VM does not exist", ErrFailed)
|
||||
|
|
@ -33,25 +34,34 @@ func (vmm *VMManager) Get(vmResource *v1.VM) (*VM, error) {
|
|||
return managedVM, nil
|
||||
}
|
||||
|
||||
func (vmm *VMManager) Create(vmResource *v1.VM) (*VM, error) {
|
||||
func (vmm *VMManager) Create(vmResource v1.VM, logger *zap.SugaredLogger) (*VM, error) {
|
||||
if _, ok := vmm.vms[vmResource.UID]; ok {
|
||||
return nil, fmt.Errorf("%w: VM already exists", ErrFailed)
|
||||
}
|
||||
|
||||
managedVM := NewVM(vmResource)
|
||||
managedVM := NewVM(vmResource, logger)
|
||||
|
||||
vmm.vms[vmResource.UID] = managedVM
|
||||
|
||||
return managedVM, nil
|
||||
}
|
||||
|
||||
func (vmm *VMManager) Delete(vmResource *v1.VM) error {
|
||||
func (vmm *VMManager) Stop(vmResource v1.VM) error {
|
||||
managedVM, ok := vmm.vms[vmResource.UID]
|
||||
if !ok {
|
||||
return fmt.Errorf("%w: VM does not exist", ErrFailed)
|
||||
}
|
||||
|
||||
if err := managedVM.Close(); err != nil {
|
||||
return managedVM.Stop()
|
||||
}
|
||||
|
||||
func (vmm *VMManager) Delete(vmResource v1.VM) error {
|
||||
managedVM, ok := vmm.vms[vmResource.UID]
|
||||
if !ok {
|
||||
return fmt.Errorf("%w: VM does not exist", ErrFailed)
|
||||
}
|
||||
|
||||
if err := managedVM.Delete(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@ -59,3 +69,13 @@ func (vmm *VMManager) Delete(vmResource *v1.VM) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (vmm *VMManager) List() []*VM {
|
||||
var vms []*VM
|
||||
|
||||
for _, vm := range vmm.vms {
|
||||
vms = append(vms, vm)
|
||||
}
|
||||
|
||||
return vms
|
||||
}
|
||||
|
|
|
|||
|
|
@ -113,14 +113,12 @@ func (worker *Worker) Run(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (worker *Worker) registerWorker(ctx context.Context) error {
|
||||
workerResource := &v1.Worker{
|
||||
workerResource, err := worker.client.Workers().Create(ctx, v1.Worker{
|
||||
Meta: v1.Meta{
|
||||
Name: worker.name,
|
||||
},
|
||||
LastSeen: time.Now(),
|
||||
}
|
||||
|
||||
workerResource, err := worker.client.Workers().Create(ctx, workerResource)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -147,7 +145,7 @@ func (worker *Worker) updateWorker(ctx context.Context) error {
|
|||
|
||||
workerResource.LastSeen = time.Now()
|
||||
|
||||
if err := worker.client.Workers().Update(ctx, workerResource); err != nil {
|
||||
if _, err := worker.client.Workers().Update(ctx, *workerResource); err != nil {
|
||||
return fmt.Errorf("%w: failed to update worker in the API: %v", ErrPollFailed, err)
|
||||
}
|
||||
|
||||
|
|
@ -157,26 +155,44 @@ func (worker *Worker) updateWorker(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (worker *Worker) syncVMs(ctx context.Context) error {
|
||||
vms, err := worker.client.VMs().List(ctx)
|
||||
remoteVMs, err := worker.client.VMs().FindForWorker(ctx, worker.name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
worker.logger.Infof("syncing %d VMs...", len(vms))
|
||||
worker.logger.Infof("syncing %d VMs...", len(remoteVMs))
|
||||
|
||||
for _, vmResource := range vms {
|
||||
vmResource := vmResource
|
||||
|
||||
if vmResource.Worker != worker.name {
|
||||
continue
|
||||
}
|
||||
|
||||
if !vmResource.DeletedAt.IsZero() {
|
||||
if err := worker.deleteVM(ctx, vmResource); err != nil {
|
||||
// first try to sync local VMs with the remote ones
|
||||
for _, vm := range worker.vmm.List() {
|
||||
remoteVM, ok := remoteVMs[vm.Resource.UID]
|
||||
if !ok {
|
||||
if err := worker.deleteVM(vm.Resource); err != nil {
|
||||
return err
|
||||
}
|
||||
} else if !worker.vmm.Exists(&vmResource) {
|
||||
if err := worker.createVM(ctx, vmResource); err != nil {
|
||||
} else if remoteVM.Status != vm.Resource.Status {
|
||||
updatedVM, err := worker.client.VMs().Update(ctx, vm.Resource)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
remoteVMs[vm.Resource.UID] = *updatedVM
|
||||
vm.Resource = *updatedVM
|
||||
}
|
||||
}
|
||||
|
||||
// check if need to stop any of the VMs
|
||||
for _, vmResource := range remoteVMs {
|
||||
if vmResource.Status == v1.VMStatusStopping && worker.vmm.Exists(vmResource) {
|
||||
if err := worker.stopVM(vmResource); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// finally, handle pending VMs first
|
||||
for _, vmResource := range remoteVMs {
|
||||
// handle pending VMs
|
||||
if vmResource.Status == v1.VMStatusPending && !worker.vmm.Exists(vmResource) {
|
||||
if err := worker.createVM(vmResource); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
@ -185,19 +201,20 @@ func (worker *Worker) syncVMs(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (worker *Worker) deleteVM(ctx context.Context, vmResource v1.VM) error {
|
||||
func (worker *Worker) deleteVM(vmResource v1.VM) error {
|
||||
worker.logger.Debugf("deleting VM %s (%s)", vmResource.Name, vmResource.UID)
|
||||
|
||||
// Delete VM locally, report to the controller
|
||||
if worker.vmm.Exists(&vmResource) {
|
||||
if err := worker.vmm.Delete(&vmResource); err != nil {
|
||||
if !vmResource.TerminalState() {
|
||||
if err := worker.stopVM(vmResource); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := worker.client.VMs().Delete(ctx, vmResource.Name, true); err != nil {
|
||||
return fmt.Errorf("%w: failed to delete VM %s (%s) from the API: %v",
|
||||
ErrPollFailed, vmResource.Name, vmResource.UID, err)
|
||||
// Delete VM locally, report to the controller
|
||||
if worker.vmm.Exists(vmResource) {
|
||||
if err := worker.vmm.Delete(vmResource); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
worker.logger.Infof("deleted VM %s (%s)", vmResource.Name, vmResource.UID)
|
||||
|
|
@ -205,23 +222,30 @@ func (worker *Worker) deleteVM(ctx context.Context, vmResource v1.VM) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (worker *Worker) createVM(ctx context.Context, vmResource v1.VM) error {
|
||||
func (worker *Worker) createVM(vmResource v1.VM) error {
|
||||
worker.logger.Debugf("creating VM %s (%s)", vmResource.Name, vmResource.UID)
|
||||
|
||||
// Create or update VM locally, report to controller
|
||||
_, err := worker.vmm.Create(&vmResource)
|
||||
// Create or update VM locally
|
||||
_, err := worker.vmm.Create(vmResource, worker.logger)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
vmResource.Status = v1.VMStatusRunning
|
||||
|
||||
if err := worker.client.VMs().Update(ctx, &vmResource); err != nil {
|
||||
return fmt.Errorf("%w: failed to update VM %s (%s) in the API: %v",
|
||||
ErrPollFailed, vmResource.Name, vmResource.UID, err)
|
||||
}
|
||||
|
||||
worker.logger.Infof("spawned VM %s (%s)", vmResource.Name, vmResource.UID)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (worker *Worker) stopVM(vmResource v1.VM) error {
|
||||
worker.logger.Debugf("stopping VM %s (%s)", vmResource.Name, vmResource.UID)
|
||||
|
||||
// Create or update VM locally
|
||||
if worker.vmm.Exists(vmResource) {
|
||||
if err := worker.vmm.Stop(vmResource); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Stop VM locally
|
||||
return worker.vmm.Stop(vmResource)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,7 +14,8 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
ErrFailed = errors.New("API client failed")
|
||||
ErrFailed = errors.New("API client failed")
|
||||
ErrInvalidState = errors.New("invalid state")
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
|
|
|
|||
|
|
@ -1,4 +1,3 @@
|
|||
//nolint:dupl // maybe we'll figure out how to make client API accessors generic in the future
|
||||
package client
|
||||
|
||||
import (
|
||||
|
|
|
|||
|
|
@ -1,4 +1,3 @@
|
|||
//nolint:dupl // maybe we'll figure out how to make client API accessors generic in the future
|
||||
package client
|
||||
|
||||
import (
|
||||
|
|
@ -22,6 +21,23 @@ func (service *VMsService) Create(ctx context.Context, vm *v1.VM) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (service *VMsService) FindForWorker(ctx context.Context, workerName string) (map[string]v1.VM, error) {
|
||||
allVms, err := service.List(ctx)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var filteredVms = make(map[string]v1.VM)
|
||||
for _, vmResource := range allVms {
|
||||
if vmResource.Worker != workerName {
|
||||
continue
|
||||
}
|
||||
filteredVms[vmResource.UID] = vmResource
|
||||
}
|
||||
return filteredVms, nil
|
||||
}
|
||||
|
||||
func (service *VMsService) List(ctx context.Context) ([]v1.VM, error) {
|
||||
var vms []v1.VM
|
||||
|
||||
|
|
@ -46,25 +62,34 @@ func (service *VMsService) Get(ctx context.Context, name string) (*v1.VM, error)
|
|||
return &vm, nil
|
||||
}
|
||||
|
||||
func (service *VMsService) Update(ctx context.Context, vm *v1.VM) error {
|
||||
func (service *VMsService) Stop(ctx context.Context, name string) (*v1.VM, error) {
|
||||
var vm v1.VM
|
||||
|
||||
err := service.client.request(ctx, http.MethodGet, fmt.Sprintf("vms/%s", name),
|
||||
nil, &vm, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
vm.Status = v1.VMStatusStopping
|
||||
|
||||
return service.Update(ctx, vm)
|
||||
}
|
||||
|
||||
func (service *VMsService) Update(ctx context.Context, vm v1.VM) (*v1.VM, error) {
|
||||
var updatedVM v1.VM
|
||||
err := service.client.request(ctx, http.MethodPut, fmt.Sprintf("vms/%s", vm.Name),
|
||||
vm, nil, nil)
|
||||
vm, &updatedVM, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
return &updatedVM, err
|
||||
}
|
||||
|
||||
return nil
|
||||
return &updatedVM, nil
|
||||
}
|
||||
|
||||
func (service *VMsService) Delete(ctx context.Context, name string, force bool) error {
|
||||
params := map[string]string{}
|
||||
|
||||
if force {
|
||||
params["force"] = "true"
|
||||
}
|
||||
|
||||
func (service *VMsService) Delete(ctx context.Context, name string) error {
|
||||
err := service.client.request(ctx, http.MethodDelete, fmt.Sprintf("vms/%s", name),
|
||||
nil, nil, params)
|
||||
nil, nil, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,14 +11,14 @@ type WorkersService struct {
|
|||
client *Client
|
||||
}
|
||||
|
||||
func (service *WorkersService) Create(ctx context.Context, worker *v1.Worker) (*v1.Worker, error) {
|
||||
func (service *WorkersService) Create(ctx context.Context, worker v1.Worker) (*v1.Worker, error) {
|
||||
err := service.client.request(ctx, http.MethodPost, "workers",
|
||||
worker, &worker, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return worker, nil
|
||||
return &worker, nil
|
||||
}
|
||||
|
||||
func (service *WorkersService) List(ctx context.Context) ([]v1.Worker, error) {
|
||||
|
|
@ -45,14 +45,14 @@ func (service *WorkersService) Get(ctx context.Context, name string) (*v1.Worker
|
|||
return &worker, nil
|
||||
}
|
||||
|
||||
func (service *WorkersService) Update(ctx context.Context, worker *v1.Worker) error {
|
||||
func (service *WorkersService) Update(ctx context.Context, worker v1.Worker) (*v1.Worker, error) {
|
||||
err := service.client.request(ctx, http.MethodPut, fmt.Sprintf("workers/%s", worker.Name),
|
||||
worker, nil, nil)
|
||||
worker, &worker, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil
|
||||
return &worker, nil
|
||||
}
|
||||
|
||||
func (service *WorkersService) Delete(ctx context.Context, name string) error {
|
||||
|
|
|
|||
|
|
@ -17,12 +17,6 @@ type Meta struct {
|
|||
// when receiving a POST request.
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
|
||||
// DeletedAt is a useful field for graceful resource termination.
|
||||
//
|
||||
// It is populated by the Controller with the current time
|
||||
// when receiving a DELETE request.
|
||||
DeletedAt time.Time `json:"deletedAt"`
|
||||
|
||||
// UID is a useful field for avoiding data races within a single Name.
|
||||
//
|
||||
// It is populated by the Controller when receiving a POST request.
|
||||
|
|
@ -58,6 +52,10 @@ type VM struct {
|
|||
Meta
|
||||
}
|
||||
|
||||
func (vm VM) TerminalState() bool {
|
||||
return vm.Status == VMStatusStopped || vm.Status == VMStatusFailed
|
||||
}
|
||||
|
||||
type VMStatus string
|
||||
|
||||
const (
|
||||
|
|
@ -71,4 +69,11 @@ const (
|
|||
// VMStatusFailed is set by both the Controller and the Worker to indicate a failure
|
||||
// that prevented the VM resource from reaching the VMStatusRunning state.
|
||||
VMStatusFailed VMStatus = "failed"
|
||||
|
||||
// VMStatusStopping is set by the Controller to indicate that a VM resource needs to be stopped but not deleted.
|
||||
VMStatusStopping VMStatus = "stopping"
|
||||
|
||||
// VMStatusStopped is set by both the Worker to indicate that a particular VM resource has been stopped successfully
|
||||
// (either via API or from within a VM via `sudo shutdown -now`).
|
||||
VMStatusStopped VMStatus = "stopped"
|
||||
)
|
||||
|
|
|
|||
Loading…
Reference in New Issue