Worker: define VM interface and make tart.VM conform to it (#388)
* Worker: define VM interface and make tart.VM conform to it * Hopefully produce better diff
This commit is contained in:
parent
0e0b8c74b4
commit
ea89d01760
|
|
@ -16,7 +16,7 @@ import (
|
|||
"github.com/cirruslabs/orchard/internal/tests/devcontroller"
|
||||
"github.com/cirruslabs/orchard/internal/tests/wait"
|
||||
"github.com/cirruslabs/orchard/internal/worker/ondiskname"
|
||||
"github.com/cirruslabs/orchard/internal/worker/tart"
|
||||
"github.com/cirruslabs/orchard/internal/worker/vmmanager/tart"
|
||||
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import (
|
|||
"github.com/cirruslabs/orchard/internal/tests/devcontroller"
|
||||
"github.com/cirruslabs/orchard/internal/tests/wait"
|
||||
"github.com/cirruslabs/orchard/internal/worker/ondiskname"
|
||||
"github.com/cirruslabs/orchard/internal/worker/tart"
|
||||
"github.com/cirruslabs/orchard/internal/worker/vmmanager/tart"
|
||||
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
|
||||
"github.com/samber/lo"
|
||||
"github.com/shirou/gopsutil/v4/process"
|
||||
|
|
|
|||
|
|
@ -3,19 +3,21 @@ package worker
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/cirruslabs/orchard/internal/proxy"
|
||||
"github.com/cirruslabs/orchard/internal/worker/vmmanager"
|
||||
"github.com/cirruslabs/orchard/rpc"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/protobuf/types/known/emptypb"
|
||||
"time"
|
||||
|
||||
"net"
|
||||
|
||||
//nolint:staticcheck // https://github.com/mitchellh/go-grpc-net-conn/pull/1
|
||||
"github.com/golang/protobuf/proto"
|
||||
grpc_net_conn "github.com/mitchellh/go-grpc-net-conn"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"net"
|
||||
|
||||
"github.com/samber/lo"
|
||||
)
|
||||
|
|
@ -90,8 +92,8 @@ func (worker *Worker) handlePortForward(
|
|||
host = "localhost"
|
||||
} else {
|
||||
// Port-forwarding request to a VM, find that VM
|
||||
vm, ok := lo.Find(worker.vmm.List(), func(item *vmmanager.VM) bool {
|
||||
return item.Resource.UID == portForwardAction.VmUid
|
||||
vm, ok := lo.Find(worker.vmm.List(), func(item vmmanager.VM) bool {
|
||||
return item.Resource().UID == portForwardAction.VmUid
|
||||
})
|
||||
if !ok {
|
||||
worker.logger.Warnf("port forwarding failed: failed to get the VM: %v", err)
|
||||
|
|
@ -154,8 +156,8 @@ func (worker *Worker) handleGetIP(
|
|||
ctxWithMetadata := metadata.NewOutgoingContext(ctx, grpcMetadata)
|
||||
|
||||
// Find the desired VM
|
||||
vm, ok := lo.Find(worker.vmm.List(), func(item *vmmanager.VM) bool {
|
||||
return item.Resource.UID == resolveIP.VmUid
|
||||
vm, ok := lo.Find(worker.vmm.List(), func(item vmmanager.VM) bool {
|
||||
return item.Resource().UID == resolveIP.VmUid
|
||||
})
|
||||
if !ok {
|
||||
worker.logger.Warnf("failed to resolve IP for the VM with UID %q: VM not found",
|
||||
|
|
|
|||
|
|
@ -3,11 +3,12 @@ package worker
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
"github.com/cirruslabs/orchard/internal/proxy"
|
||||
"github.com/cirruslabs/orchard/internal/worker/vmmanager"
|
||||
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
|
||||
"github.com/samber/lo"
|
||||
"net"
|
||||
)
|
||||
|
||||
func (worker *Worker) watchRPCV2(ctx context.Context) error {
|
||||
|
|
@ -81,8 +82,8 @@ func (worker *Worker) handlePortForwardV2Inner(
|
|||
host = "localhost"
|
||||
} else {
|
||||
// Port-forwarding request to a VM, find that VM
|
||||
vm, ok := lo.Find(worker.vmm.List(), func(item *vmmanager.VM) bool {
|
||||
return item.Resource.UID == portForward.VMUID
|
||||
vm, ok := lo.Find(worker.vmm.List(), func(item vmmanager.VM) bool {
|
||||
return item.Resource().UID == portForward.VMUID
|
||||
})
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("failed to get the VM: %v", err)
|
||||
|
|
@ -142,8 +143,8 @@ func (worker *Worker) handleGetIPV2Inner(
|
|||
resolveIP *v1.ResolveIPAction,
|
||||
) (string, error) {
|
||||
// Find the desired VM
|
||||
vm, ok := lo.Find(worker.vmm.List(), func(item *vmmanager.VM) bool {
|
||||
return item.Resource.UID == resolveIP.VMUID
|
||||
vm, ok := lo.Find(worker.vmm.List(), func(item vmmanager.VM) bool {
|
||||
return item.Resource().UID == resolveIP.VMUID
|
||||
})
|
||||
if !ok {
|
||||
return "", fmt.Errorf("VM %q not found", resolveIP.VMUID)
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
package vmmanager
|
||||
package tart
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
|
|
@ -16,7 +16,6 @@ import (
|
|||
"github.com/avast/retry-go"
|
||||
"github.com/cirruslabs/chacha/pkg/localnetworkhelper"
|
||||
"github.com/cirruslabs/orchard/internal/worker/ondiskname"
|
||||
"github.com/cirruslabs/orchard/internal/worker/tart"
|
||||
"github.com/cirruslabs/orchard/pkg/client"
|
||||
"github.com/cirruslabs/orchard/pkg/resource/v1"
|
||||
mapset "github.com/deckarep/golang-set/v2"
|
||||
|
|
@ -30,7 +29,7 @@ var ErrVMFailed = errors.New("VM failed")
|
|||
|
||||
type VM struct {
|
||||
onDiskName ondiskname.OnDiskName
|
||||
Resource v1.VM
|
||||
resource v1.VM
|
||||
logger *zap.SugaredLogger
|
||||
|
||||
// Backward compatibility with v1.VM specification's "Status" field
|
||||
|
|
@ -74,7 +73,7 @@ func NewVM(
|
|||
|
||||
vm := &VM{
|
||||
onDiskName: ondiskname.NewFromResource(vmResource),
|
||||
Resource: vmResource,
|
||||
resource: vmResource,
|
||||
logger: logger.With(
|
||||
"vm_uid", vmResource.UID,
|
||||
"vm_name", vmResource.Name,
|
||||
|
|
@ -101,7 +100,7 @@ func NewVM(
|
|||
|
||||
pullStartedAt := time.Now()
|
||||
|
||||
_, _, err := tart.Tart(vm.ctx, vm.logger, "pull", vm.Resource.Image)
|
||||
_, _, err := Tart(vm.ctx, vm.logger, "pull", vm.resource.Image)
|
||||
if err != nil {
|
||||
select {
|
||||
case <-vm.ctx.Done():
|
||||
|
|
@ -114,8 +113,8 @@ func NewVM(
|
|||
}
|
||||
|
||||
vmPullTimeHistogram.Record(vm.ctx, time.Since(pullStartedAt).Seconds(), metric.WithAttributes(
|
||||
attribute.String("worker", vm.Resource.Worker),
|
||||
attribute.String("image", vm.Resource.Image),
|
||||
attribute.String("worker", vm.resource.Worker),
|
||||
attribute.String("image", vm.resource.Image),
|
||||
))
|
||||
}
|
||||
|
||||
|
|
@ -141,6 +140,15 @@ func NewVM(
|
|||
return vm
|
||||
}
|
||||
|
||||
func (vm *VM) Resource() v1.VM {
|
||||
return vm.resource
|
||||
}
|
||||
|
||||
func (vm *VM) SetResource(vmResource v1.VM) {
|
||||
vm.resource = vmResource
|
||||
vm.resource.ObservedGeneration = vmResource.Generation
|
||||
}
|
||||
|
||||
func (vm *VM) OnDiskName() ondiskname.OnDiskName {
|
||||
return vm.onDiskName
|
||||
}
|
||||
|
|
@ -220,7 +228,7 @@ func (vm *VM) conditionTypeToCondition(conditionType v1.ConditionType) v1.Condit
|
|||
func (vm *VM) cloneAndConfigure(ctx context.Context) error {
|
||||
vm.setStatusMessage("cloning VM...")
|
||||
|
||||
_, _, err := tart.Tart(ctx, vm.logger, "clone", vm.Resource.Image, vm.id())
|
||||
_, _, err := Tart(ctx, vm.logger, "clone", vm.resource.Image, vm.id())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -228,7 +236,7 @@ func (vm *VM) cloneAndConfigure(ctx context.Context) error {
|
|||
vm.conditions.Remove(v1.ConditionTypeCloning)
|
||||
|
||||
// Image FQN feature, see https://github.com/cirruslabs/orchard/issues/164
|
||||
fqnRaw, _, err := tart.Tart(ctx, vm.logger, "fqn", vm.Resource.Image)
|
||||
fqnRaw, _, err := Tart(ctx, vm.logger, "fqn", vm.resource.Image)
|
||||
if err == nil {
|
||||
fqn := strings.TrimSpace(fqnRaw)
|
||||
vm.imageFQN.Store(&fqn)
|
||||
|
|
@ -237,14 +245,14 @@ func (vm *VM) cloneAndConfigure(ctx context.Context) error {
|
|||
// Set memory
|
||||
vm.setStatusMessage("configuring VM...")
|
||||
|
||||
memory := vm.Resource.AssignedMemory
|
||||
memory := vm.resource.AssignedMemory
|
||||
|
||||
if memory == 0 {
|
||||
memory = vm.Resource.Memory
|
||||
memory = vm.resource.Memory
|
||||
}
|
||||
|
||||
if memory != 0 {
|
||||
_, _, err = tart.Tart(ctx, vm.logger, "set", "--memory",
|
||||
_, _, err = Tart(ctx, vm.logger, "set", "--memory",
|
||||
strconv.FormatUint(memory, 10), vm.id())
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
@ -252,22 +260,22 @@ func (vm *VM) cloneAndConfigure(ctx context.Context) error {
|
|||
}
|
||||
|
||||
// Set CPU
|
||||
cpu := vm.Resource.AssignedCPU
|
||||
cpu := vm.resource.AssignedCPU
|
||||
|
||||
if cpu == 0 {
|
||||
cpu = vm.Resource.CPU
|
||||
cpu = vm.resource.CPU
|
||||
}
|
||||
|
||||
if cpu != 0 {
|
||||
_, _, err = tart.Tart(ctx, vm.logger, "set", "--cpu",
|
||||
_, _, err = Tart(ctx, vm.logger, "set", "--cpu",
|
||||
strconv.FormatUint(cpu, 10), vm.id())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if diskSize := vm.Resource.DiskSize; diskSize != 0 {
|
||||
_, _, err = tart.Tart(ctx, vm.logger, "set", "--disk-size",
|
||||
if diskSize := vm.resource.DiskSize; diskSize != 0 {
|
||||
_, _, err = Tart(ctx, vm.logger, "set", "--disk-size",
|
||||
strconv.FormatUint(diskSize, 10), vm.id())
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
@ -332,13 +340,13 @@ func (vm *VM) cloneAndConfigure(ctx context.Context) error {
|
|||
// to avoid collisions when cloning from an OCI image on multiple hosts[1].
|
||||
//
|
||||
// [1]: https://github.com/cirruslabs/orchard/issues/181
|
||||
_, _, err = tart.Tart(ctx, vm.logger, "set", "--random-mac", vm.id())
|
||||
_, _, err = Tart(ctx, vm.logger, "set", "--random-mac", vm.id())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if vm.Resource.RandomSerial {
|
||||
_, _, err = tart.Tart(ctx, vm.logger, "set", "--random-serial", vm.id())
|
||||
if vm.resource.RandomSerial {
|
||||
_, _, err = Tart(ctx, vm.logger, "set", "--random-serial", vm.id())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -352,47 +360,47 @@ func (vm *VM) run(ctx context.Context, eventStreamer *client.EventStreamer) {
|
|||
|
||||
// Launch the startup script goroutine as close as possible
|
||||
// to the VM startup (below) to avoid "tart ip" timing out
|
||||
if vm.Resource.StartupScript != nil {
|
||||
if vm.resource.StartupScript != nil {
|
||||
vm.setStatusMessage("VM started, running startup script...")
|
||||
|
||||
go vm.runScript(vm.Resource.StartupScript, eventStreamer)
|
||||
go vm.runScript(vm.resource.StartupScript, eventStreamer)
|
||||
} else {
|
||||
vm.setStatusMessage("VM started")
|
||||
}
|
||||
|
||||
var runArgs = []string{"run"}
|
||||
|
||||
if vm.Resource.NetSoftnetDeprecated || vm.Resource.NetSoftnet {
|
||||
if vm.resource.NetSoftnetDeprecated || vm.resource.NetSoftnet {
|
||||
runArgs = append(runArgs, "--net-softnet")
|
||||
}
|
||||
if len(vm.Resource.NetSoftnetAllow) != 0 {
|
||||
runArgs = append(runArgs, "--net-softnet-allow", strings.Join(vm.Resource.NetSoftnetAllow, ","))
|
||||
if len(vm.resource.NetSoftnetAllow) != 0 {
|
||||
runArgs = append(runArgs, "--net-softnet-allow", strings.Join(vm.resource.NetSoftnetAllow, ","))
|
||||
}
|
||||
if len(vm.Resource.NetSoftnetBlock) != 0 {
|
||||
runArgs = append(runArgs, "--net-softnet-block", strings.Join(vm.Resource.NetSoftnetBlock, ","))
|
||||
if len(vm.resource.NetSoftnetBlock) != 0 {
|
||||
runArgs = append(runArgs, "--net-softnet-block", strings.Join(vm.resource.NetSoftnetBlock, ","))
|
||||
}
|
||||
if vm.Resource.NetBridged != "" {
|
||||
runArgs = append(runArgs, fmt.Sprintf("--net-bridged=%s", vm.Resource.NetBridged))
|
||||
if vm.resource.NetBridged != "" {
|
||||
runArgs = append(runArgs, fmt.Sprintf("--net-bridged=%s", vm.resource.NetBridged))
|
||||
}
|
||||
|
||||
if vm.Resource.Headless {
|
||||
if vm.resource.Headless {
|
||||
runArgs = append(runArgs, "--no-graphics")
|
||||
}
|
||||
|
||||
if vm.Resource.Nested {
|
||||
if vm.resource.Nested {
|
||||
runArgs = append(runArgs, "--nested")
|
||||
}
|
||||
|
||||
if vm.Resource.Suspendable {
|
||||
if vm.resource.Suspendable {
|
||||
runArgs = append(runArgs, "--suspendable")
|
||||
}
|
||||
|
||||
for _, hostDir := range vm.Resource.HostDirs {
|
||||
for _, hostDir := range vm.resource.HostDirs {
|
||||
runArgs = append(runArgs, fmt.Sprintf("--dir=%s", hostDir.String()))
|
||||
}
|
||||
|
||||
runArgs = append(runArgs, vm.id())
|
||||
_, _, err := tart.Tart(ctx, vm.logger, runArgs...)
|
||||
_, _, err := Tart(ctx, vm.logger, runArgs...)
|
||||
if err != nil {
|
||||
select {
|
||||
case <-vm.ctx.Done():
|
||||
|
|
@ -417,8 +425,8 @@ func (vm *VM) run(ctx context.Context, eventStreamer *client.EventStreamer) {
|
|||
func (vm *VM) IP(ctx context.Context) (string, error) {
|
||||
// Bridged networking is problematic, so try with
|
||||
// the agent resolver first using a small timeout
|
||||
if vm.Resource.NetBridged != "" {
|
||||
stdout, _, err := tart.Tart(ctx, vm.logger, "ip", "--wait", "5",
|
||||
if vm.resource.NetBridged != "" {
|
||||
stdout, _, err := Tart(ctx, vm.logger, "ip", "--wait", "5",
|
||||
"--resolver", "agent", vm.id())
|
||||
if err == nil {
|
||||
return strings.TrimSpace(stdout), nil
|
||||
|
|
@ -427,13 +435,13 @@ func (vm *VM) IP(ctx context.Context) (string, error) {
|
|||
|
||||
args := []string{"ip", "--wait", "60"}
|
||||
|
||||
if vm.Resource.NetBridged != "" {
|
||||
if vm.resource.NetBridged != "" {
|
||||
args = append(args, "--resolver", "arp")
|
||||
}
|
||||
|
||||
args = append(args, vm.id())
|
||||
|
||||
stdout, _, err := tart.Tart(ctx, vm.logger, args...)
|
||||
stdout, _, err := Tart(ctx, vm.logger, args...)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
|
@ -458,7 +466,7 @@ func (vm *VM) Suspend() <-chan error {
|
|||
vm.conditions.Add(v1.ConditionTypeSuspending)
|
||||
|
||||
go func() {
|
||||
_, _, err := tart.Tart(context.Background(), zap.NewNop().Sugar(), "suspend", vm.id())
|
||||
_, _, err := Tart(context.Background(), zap.NewNop().Sugar(), "suspend", vm.id())
|
||||
if err != nil {
|
||||
err := fmt.Errorf("failed to suspend VM: %w", err)
|
||||
vm.setErr(err)
|
||||
|
|
@ -491,7 +499,7 @@ func (vm *VM) Stop() <-chan error {
|
|||
|
||||
go func() {
|
||||
// Try to gracefully terminate the VM
|
||||
_, _, _ = tart.Tart(context.Background(), zap.NewNop().Sugar(), "stop", "--timeout", "5", vm.id())
|
||||
_, _, _ = Tart(context.Background(), zap.NewNop().Sugar(), "stop", "--timeout", "5", vm.id())
|
||||
|
||||
// Terminate the VM goroutine ("tart pull", "tart clone", "tart run", etc.) via the context
|
||||
vm.cancel()
|
||||
|
|
@ -504,11 +512,6 @@ func (vm *VM) Stop() <-chan error {
|
|||
return errCh
|
||||
}
|
||||
|
||||
func (vm *VM) UpdateSpec(vmResource v1.VM) {
|
||||
vm.Resource = vmResource
|
||||
vm.Resource.ObservedGeneration = vmResource.Generation
|
||||
}
|
||||
|
||||
func (vm *VM) Start(eventStreamer *client.EventStreamer) {
|
||||
vm.setStatusMessage("Starting VM")
|
||||
vm.conditions.Add(v1.ConditionTypeRunning)
|
||||
|
|
@ -535,7 +538,7 @@ func (vm *VM) Delete() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
_, _, err := tart.Tart(context.Background(), vm.logger, "delete", vm.id())
|
||||
_, _, err := Tart(context.Background(), vm.logger, "delete", vm.id())
|
||||
if err != nil {
|
||||
return fmt.Errorf("%w: failed to delete VM: %v", ErrVMFailed, err)
|
||||
}
|
||||
|
|
@ -685,7 +688,7 @@ func (vm *VM) runScript(script *v1.VMScript, eventStreamer *client.EventStreamer
|
|||
})
|
||||
}
|
||||
|
||||
err := vm.shell(vm.ctx, vm.Resource.Username, vm.Resource.Password,
|
||||
err := vm.shell(vm.ctx, vm.resource.Username, vm.resource.Password,
|
||||
script.ScriptContent, script.Env, consumeLine)
|
||||
if err != nil {
|
||||
vm.setErr(fmt.Errorf("%w: failed to run startup script: %v", ErrVMFailed, err))
|
||||
|
|
@ -1,16 +1,37 @@
|
|||
package vmmanager
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/cirruslabs/orchard/internal/worker/ondiskname"
|
||||
"github.com/cirruslabs/orchard/pkg/client"
|
||||
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
|
||||
)
|
||||
|
||||
type VM interface {
|
||||
Resource() v1.VM
|
||||
SetResource(vmResource v1.VM)
|
||||
OnDiskName() ondiskname.OnDiskName
|
||||
ImageFQN() *string
|
||||
Status() v1.VMStatus
|
||||
StatusMessage() string
|
||||
Err() error
|
||||
Conditions() []v1.Condition
|
||||
|
||||
Start(eventStreamer *client.EventStreamer)
|
||||
Suspend() <-chan error
|
||||
IP(ctx context.Context) (string, error)
|
||||
Stop() <-chan error
|
||||
Delete() error
|
||||
}
|
||||
|
||||
type VMManager struct {
|
||||
vms map[ondiskname.OnDiskName]*VM
|
||||
vms map[ondiskname.OnDiskName]VM
|
||||
}
|
||||
|
||||
func New() *VMManager {
|
||||
return &VMManager{
|
||||
vms: map[ondiskname.OnDiskName]*VM{},
|
||||
vms: map[ondiskname.OnDiskName]VM{},
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -20,13 +41,13 @@ func (vmm *VMManager) Exists(key ondiskname.OnDiskName) bool {
|
|||
return ok
|
||||
}
|
||||
|
||||
func (vmm *VMManager) Get(key ondiskname.OnDiskName) (*VM, bool) {
|
||||
func (vmm *VMManager) Get(key ondiskname.OnDiskName) (VM, bool) {
|
||||
vm, ok := vmm.vms[key]
|
||||
|
||||
return vm, ok
|
||||
}
|
||||
|
||||
func (vmm *VMManager) Put(key ondiskname.OnDiskName, vm *VM) {
|
||||
func (vmm *VMManager) Put(key ondiskname.OnDiskName, vm VM) {
|
||||
vmm.vms[key] = vm
|
||||
}
|
||||
|
||||
|
|
@ -38,8 +59,8 @@ func (vmm *VMManager) Len() int {
|
|||
return len(vmm.vms)
|
||||
}
|
||||
|
||||
func (vmm *VMManager) List() []*VM {
|
||||
var vms []*VM
|
||||
func (vmm *VMManager) List() []VM {
|
||||
var vms []VM
|
||||
|
||||
for _, vm := range vmm.vms {
|
||||
vms = append(vms, vm)
|
||||
|
|
|
|||
|
|
@ -14,8 +14,8 @@ import (
|
|||
"github.com/cirruslabs/orchard/internal/worker/dhcpleasetime"
|
||||
"github.com/cirruslabs/orchard/internal/worker/iokitregistry"
|
||||
"github.com/cirruslabs/orchard/internal/worker/ondiskname"
|
||||
"github.com/cirruslabs/orchard/internal/worker/tart"
|
||||
"github.com/cirruslabs/orchard/internal/worker/vmmanager"
|
||||
"github.com/cirruslabs/orchard/internal/worker/vmmanager/tart"
|
||||
"github.com/cirruslabs/orchard/pkg/client"
|
||||
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
|
||||
"github.com/cirruslabs/orchard/rpc"
|
||||
|
|
@ -287,7 +287,7 @@ func (worker *Worker) syncVMs(ctx context.Context, updateVM func(context.Context
|
|||
remoteVMsIndex[onDiskName] = &remoteVMCopy
|
||||
}
|
||||
|
||||
localVMsIndex := map[ondiskname.OnDiskName]*vmmanager.VM{}
|
||||
localVMsIndex := map[ondiskname.OnDiskName]vmmanager.VM{}
|
||||
for _, vm := range worker.vmm.List() {
|
||||
onDiskName := vm.OnDiskName()
|
||||
allKeys.Add(onDiskName)
|
||||
|
|
@ -297,7 +297,7 @@ func (worker *Worker) syncVMs(ctx context.Context, updateVM func(context.Context
|
|||
worker.logger.Infof("syncing %d local VMs against %d remote VMs...",
|
||||
len(localVMsIndex), len(remoteVMsIndex))
|
||||
|
||||
var pairs []lo.Tuple3[ondiskname.OnDiskName, *v1.VM, *vmmanager.VM]
|
||||
var pairs []lo.Tuple3[ondiskname.OnDiskName, *v1.VM, vmmanager.VM]
|
||||
|
||||
for onDiskName := range allKeys.Iter() {
|
||||
vmResource := remoteVMsIndex[onDiskName]
|
||||
|
|
@ -362,14 +362,14 @@ func (worker *Worker) syncVMs(ctx context.Context, updateVM func(context.Context
|
|||
return err
|
||||
}
|
||||
case ActionMonitorRunning:
|
||||
if vmResource.Generation != vm.Resource.Generation {
|
||||
if vmResource.Generation != vm.Resource().Generation {
|
||||
// VM specification changed, reboot the VM for the changes to take effect
|
||||
stoppingOrSuspending := v1.ConditionIsTrue(vm.Conditions(), v1.ConditionTypeStopping) ||
|
||||
v1.ConditionIsTrue(vm.Conditions(), v1.ConditionTypeSuspending)
|
||||
|
||||
if v1.ConditionIsTrue(vm.Conditions(), v1.ConditionTypeRunning) && !stoppingOrSuspending {
|
||||
// VM is running, suspend or stop it first
|
||||
shouldStop := vmResource.PowerState == v1.PowerStateStopped || !vm.Resource.Suspendable
|
||||
shouldStop := vmResource.PowerState == v1.PowerStateStopped || !vm.Resource().Suspendable
|
||||
|
||||
if shouldStop {
|
||||
vm.Stop()
|
||||
|
|
@ -380,7 +380,7 @@ func (worker *Worker) syncVMs(ctx context.Context, updateVM func(context.Context
|
|||
|
||||
if v1.ConditionIsFalse(vm.Conditions(), v1.ConditionTypeRunning) && !stoppingOrSuspending {
|
||||
// VM stopped, update its specification
|
||||
vm.UpdateSpec(*vmResource)
|
||||
vm.SetResource(*vmResource)
|
||||
|
||||
if vmResource.PowerState == v1.PowerStateRunning {
|
||||
// Start the VM
|
||||
|
|
@ -398,8 +398,8 @@ func (worker *Worker) syncVMs(ctx context.Context, updateVM func(context.Context
|
|||
updateNeeded = true
|
||||
}
|
||||
|
||||
if vmResource.ObservedGeneration != vm.Resource.ObservedGeneration {
|
||||
vmResource.ObservedGeneration = vm.Resource.ObservedGeneration
|
||||
if vmResource.ObservedGeneration != vm.Resource().ObservedGeneration {
|
||||
vmResource.ObservedGeneration = vm.Resource().ObservedGeneration
|
||||
|
||||
updateNeeded = true
|
||||
}
|
||||
|
|
@ -521,7 +521,7 @@ func (worker *Worker) syncOnDiskVMs(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (worker *Worker) deleteVM(vm *vmmanager.VM) error {
|
||||
func (worker *Worker) deleteVM(vm vmmanager.VM) error {
|
||||
<-vm.Stop()
|
||||
|
||||
if err := vm.Delete(); err != nil {
|
||||
|
|
@ -536,7 +536,7 @@ func (worker *Worker) deleteVM(vm *vmmanager.VM) error {
|
|||
func (worker *Worker) createVM(odn ondiskname.OnDiskName, vmResource v1.VM) {
|
||||
eventStreamer := worker.client.VMs().StreamEvents(vmResource.Name)
|
||||
|
||||
vm := vmmanager.NewVM(vmResource, eventStreamer, worker.vmPullTimeHistogram,
|
||||
vm := tart.NewVM(vmResource, eventStreamer, worker.vmPullTimeHistogram,
|
||||
worker.localNetworkHelper, worker.logger)
|
||||
|
||||
worker.vmm.Put(odn, vm)
|
||||
|
|
@ -558,8 +558,8 @@ func (worker *Worker) requestVMSyncing() {
|
|||
}
|
||||
}
|
||||
|
||||
func sortNonExistentAndFailedFirst(input []lo.Tuple3[ondiskname.OnDiskName, *v1.VM, *vmmanager.VM]) {
|
||||
slices.SortStableFunc(input, func(left, right lo.Tuple3[ondiskname.OnDiskName, *v1.VM, *vmmanager.VM]) int {
|
||||
func sortNonExistentAndFailedFirst(input []lo.Tuple3[ondiskname.OnDiskName, *v1.VM, vmmanager.VM]) {
|
||||
slices.SortStableFunc(input, func(left, right lo.Tuple3[ondiskname.OnDiskName, *v1.VM, vmmanager.VM]) int {
|
||||
_, leftVM, _ := lo.Unpack3(left)
|
||||
_, rightVM, _ := lo.Unpack3(right)
|
||||
|
||||
|
|
|
|||
|
|
@ -5,17 +5,22 @@ import (
|
|||
|
||||
"github.com/cirruslabs/orchard/internal/worker/ondiskname"
|
||||
"github.com/cirruslabs/orchard/internal/worker/vmmanager"
|
||||
"github.com/cirruslabs/orchard/internal/worker/vmmanager/tart"
|
||||
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
|
||||
"github.com/samber/lo"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestSortNonExistentAndFailedFirst(t *testing.T) {
|
||||
newVMTuple := func(name string, vmResource *v1.VM) lo.Tuple3[ondiskname.OnDiskName, *v1.VM, *vmmanager.VM] {
|
||||
return lo.T3(ondiskname.New(name, name, 0), vmResource, &vmmanager.VM{})
|
||||
newVMTuple := func(name string, vmResource *v1.VM) lo.Tuple3[ondiskname.OnDiskName, *v1.VM, vmmanager.VM] {
|
||||
return lo.T3[ondiskname.OnDiskName, *v1.VM, vmmanager.VM](
|
||||
ondiskname.New(name, name, 0),
|
||||
vmResource,
|
||||
&tart.VM{},
|
||||
)
|
||||
}
|
||||
|
||||
target := []lo.Tuple3[ondiskname.OnDiskName, *v1.VM, *vmmanager.VM]{
|
||||
target := []lo.Tuple3[ondiskname.OnDiskName, *v1.VM, vmmanager.VM]{
|
||||
newVMTuple("test1", &v1.VM{Status: v1.VMStatusFailed}),
|
||||
newVMTuple("test2", &v1.VM{Status: v1.VMStatusPending}),
|
||||
newVMTuple("test3", &v1.VM{Status: v1.VMStatusRunning}),
|
||||
|
|
@ -25,7 +30,7 @@ func TestSortNonExistentAndFailedFirst(t *testing.T) {
|
|||
|
||||
sortNonExistentAndFailedFirst(target)
|
||||
|
||||
expected := []lo.Tuple3[ondiskname.OnDiskName, *v1.VM, *vmmanager.VM]{
|
||||
expected := []lo.Tuple3[ondiskname.OnDiskName, *v1.VM, vmmanager.VM]{
|
||||
newVMTuple("test5", nil),
|
||||
newVMTuple("test1", &v1.VM{Status: v1.VMStatusFailed}),
|
||||
newVMTuple("test4", &v1.VM{Status: v1.VMStatusFailed}),
|
||||
|
|
|
|||
Loading…
Reference in New Issue