diff --git a/go.mod b/go.mod
index 8156143..49bc8b0 100644
--- a/go.mod
+++ b/go.mod
@@ -26,6 +26,7 @@ require (
 	github.com/pkg/errors v0.9.1
 	github.com/prometheus/client_golang v1.23.2
 	github.com/pterm/pterm v0.12.82
+	github.com/puzpuzpuz/xsync/v4 v4.0.0
 	github.com/samber/lo v1.52.0
 	github.com/samber/mo v1.16.0
 	github.com/sethvargo/go-password v0.3.1
@@ -42,6 +43,7 @@
 	golang.org/x/crypto v0.47.0
 	golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa
 	golang.org/x/net v0.49.0
+	golang.org/x/sync v0.19.0
 	golang.org/x/term v0.39.0
 	google.golang.org/grpc v1.78.0
 	google.golang.org/protobuf v1.36.11
@@ -123,7 +125,6 @@
 	github.com/prometheus/client_model v0.6.2 // indirect
 	github.com/prometheus/common v0.66.1 // indirect
 	github.com/prometheus/procfs v0.16.1 // indirect
-	github.com/puzpuzpuz/xsync/v4 v4.0.0 // indirect
 	github.com/rivo/uniseg v0.4.7 // indirect
 	github.com/spf13/pflag v1.0.9 // indirect
 	github.com/tklauser/go-sysconf v0.3.16 // indirect
@@ -142,7 +143,6 @@
 	go.yaml.in/yaml/v2 v2.4.2 // indirect
 	go.yaml.in/yaml/v3 v3.0.4 // indirect
 	golang.org/x/arch v0.16.0 // indirect
-	golang.org/x/sync v0.19.0 // indirect
 	golang.org/x/sys v0.40.0 // indirect
 	golang.org/x/text v0.33.0 // indirect
 	google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
diff --git a/internal/command/dev/dev.go b/internal/command/dev/dev.go
index 0771309..6977182 100644
--- a/internal/command/dev/dev.go
+++ b/internal/command/dev/dev.go
@@ -3,20 +3,26 @@ package dev
 import (
+	"context"
 	"errors"
 	"fmt"
+	"net"
+	"net/http"
 	"os"
 	"path"
 	"path/filepath"
 
 	"github.com/cirruslabs/orchard/internal/config"
 	"github.com/cirruslabs/orchard/internal/controller"
+	"github.com/cirruslabs/orchard/internal/dialer"
+	"github.com/cirruslabs/orchard/internal/echoserver"
 	"github.com/cirruslabs/orchard/internal/netconstants"
 	"github.com/cirruslabs/orchard/internal/worker"
 	"github.com/cirruslabs/orchard/pkg/client"
 	v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
 	"github.com/spf13/cobra"
 	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
 )
 
 var ErrFailed = errors.New("failed to run development controller and worker")
@@ -25,6 +31,9 @@ var devDataDirPath string
 var apiPrefix string
 var stringToStringResources map[string]string
 var experimentalRPCV2 bool
+var addressPprof string
+var synthetic bool
+var workers int
 
 func NewCommand() *cobra.Command {
 	command := &cobra.Command{
@@ -42,11 +51,35 @@
 		"resources that the development worker will provide")
 	command.Flags().BoolVar(&experimentalRPCV2, "experimental-rpc-v2", false,
 		"enable experimental RPC v2 (https://github.com/cirruslabs/orchard/issues/235)")
+	command.Flags().StringVar(&addressPprof, "listen-pprof", "",
+		"start a pprof HTTP server on the given address for diagnostic purposes (e.g. \"localhost:6060\")")
+	command.Flags().BoolVar(&synthetic, "synthetic", false,
+		"do not instantiate real Tart VMs, use synthetic in-memory VMs suitable for load testing")
+	command.Flags().IntVar(&workers, "workers", 1, "number of workers to start")
 
 	return command
 }
 
-func runDev(cmd *cobra.Command, args []string) error {
+func runDev(cmd *cobra.Command, args []string) (err error) {
+	// Initialize the logger
+	logger, err := zap.NewDevelopment()
+	if err != nil {
+		return err
+	}
+	defer func() {
+		if syncErr := logger.Sync(); syncErr != nil && err == nil {
+			err = syncErr
+		}
+	}()
+
+	if addressPprof != "" {
+		go func() {
+			if err := http.ListenAndServe(addressPprof, nil); err != nil {
+				logger.Sugar().Errorf("pprof server failed: %v", err)
+			}
+		}()
+	}
+
 	if !filepath.IsAbs(devDataDirPath) {
 		pwd, err := os.Getwd()
 		if err != nil {
@@ -71,29 +104,69 @@
 		additionalControllerOpts = append(additionalControllerOpts, controller.WithExperimentalRPCV2())
 	}
 
-	devController, devWorker, err := CreateDevControllerAndWorker(devDataDirPath,
-		fmt.Sprintf(":%d", netconstants.DefaultControllerPort), resources,
-		additionalControllerOpts, nil)
+	group, ctx := errgroup.WithContext(cmd.Context())
+
+	var additionalWorkerOpts []worker.Option
+
+	if synthetic {
+		// Use a TCP echo server to partially emulate the VM's TCP/IP stack;
+		// this way, port forwarding keeps working when running in
+		// synthetic mode
+		echoServer, err := echoserver.New()
+		if err != nil {
+			return err
+		}
+
+		group.Go(func() error {
+			return echoServer.Run(ctx)
+		})
+
+		echoDialer := dialer.DialFunc(func(ctx context.Context, network, addr string) (net.Conn, error) {
+			dialer := net.Dialer{}
+
+			return dialer.DialContext(ctx, "tcp", echoServer.Addr())
+		})
+
+		additionalControllerOpts = append(additionalControllerOpts, controller.WithSynthetic())
+		additionalWorkerOpts = append(additionalWorkerOpts, worker.WithSynthetic(),
+			worker.WithDialer(echoDialer))
+	}
+
+	devController, devClient, err := CreateDevController(devDataDirPath, fmt.Sprintf(":%d",
+		netconstants.DefaultControllerPort), additionalControllerOpts, logger)
 	if err != nil {
 		return err
 	}
-	defer devWorker.Close()
 
-	errChan := make(chan error, 2)
+	group.Go(func() error {
+		return devController.Run(ctx)
+	})
 
-	go func() {
-		if err := devController.Run(cmd.Context()); err != nil {
-			errChan <- err
-		}
-	}()
+	workerName, err := worker.DefaultName()
+	if err != nil {
+		return err
+	}
 
-	go func() {
-		if err := devWorker.Run(cmd.Context()); err != nil {
-			errChan <- err
-		}
-	}()
+	for i := range workers {
+		group.Go(func() error {
+			workerNameLocal := workerName
 
-	return <-errChan
+			if workers > 1 {
+				workerNameLocal = fmt.Sprintf("%s-%d", workerName, i)
+			}
+
+			devWorker, err := CreateDevWorker(devClient, resources,
+				append(additionalWorkerOpts, worker.WithName(workerNameLocal)), logger)
+			if err != nil {
+				return err
+			}
+			defer devWorker.Close()
+
+			return devWorker.Run(ctx)
+		})
+	}
+
+	return group.Wait()
 }
 
 func CreateDevControllerAndWorker(
@@ -114,6 +187,26 @@
 		}
 	}()
 
+	devController, defaultClient, err := CreateDevController(devDataDirPath, controllerListenAddr,
+		additionalControllerOpts, logger)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	devWorker, err := CreateDevWorker(defaultClient, resources, additionalWorkerOpts, logger)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	return devController, devWorker, nil
+}
+
+func CreateDevController(
+	devDataDirPath string,
+	controllerListenAddr string,
+	additionalControllerOpts []controller.Option,
+	logger *zap.Logger,
+) (*controller.Controller, *client.Client, error) {
 	dataDir, err := controller.NewDataDir(devDataDirPath)
 	if err != nil {
 		return nil, nil, err
@@ -139,18 +232,6 @@
 		return nil, nil, err
 	}
 
-	workerOpts := []worker.Option{
-		worker.WithResources(resources),
-		worker.WithLogger(logger),
-	}
-
-	workerOpts = append(workerOpts, additionalWorkerOpts...)
-
-	devWorker, err := worker.New(defaultClient, workerOpts...)
-	if err != nil {
-		return nil, nil, err
-	}
-
 	// set local-dev context as active
 	configHandle, err := config.NewHandle()
 	if err != nil {
@@ -166,5 +247,21 @@
 		return nil, nil, err
 	}
 
-	return devController, devWorker, nil
+	return devController, defaultClient, nil
+}
+
+func CreateDevWorker(
+	client *client.Client,
+	resources v1.Resources,
+	additionalWorkerOpts []worker.Option,
+	logger *zap.Logger,
+) (*worker.Worker, error) {
+	workerOpts := []worker.Option{
+		worker.WithResources(resources),
+		worker.WithLogger(logger),
+	}
+
+	workerOpts = append(workerOpts, additionalWorkerOpts...)
+
+	return worker.New(client, workerOpts...)
 }
diff --git a/internal/command/worker/run.go b/internal/command/worker/run.go
index f704d14..87752f4 100644
--- a/internal/command/worker/run.go
+++ b/internal/command/worker/run.go
@@ -3,11 +3,20 @@ package worker
 import (
+	"context"
 	"errors"
 	"fmt"
+	"io"
+	"net"
+	"net/http"
+	_ "net/http/pprof"
+	"os"
+	"strings"
+
 	"github.com/cirruslabs/chacha/pkg/localnetworkhelper"
 	"github.com/cirruslabs/chacha/pkg/privdrop"
 	"github.com/cirruslabs/orchard/internal/bootstraptoken"
+	"github.com/cirruslabs/orchard/internal/dialer"
 	"github.com/cirruslabs/orchard/internal/netconstants"
 	"github.com/cirruslabs/orchard/internal/worker"
 	"github.com/cirruslabs/orchard/pkg/client"
@@ -16,11 +25,6 @@ import (
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
 	"gopkg.in/natefinch/lumberjack.v2"
-	"io"
-	"net/http"
-	_ "net/http/pprof"
-	"os"
-	"strings"
 )
 
 var (
@@ -98,8 +102,12 @@ func runWorker(cmd *cobra.Command, args []string) (err error) {
 			return err
 		}
 
-		clientOpts = append(clientOpts, client.WithDialContext(localNetworkHelper.PrivilegedDialContext))
-		workerOpts = append(workerOpts, worker.WithLocalNetworkHelper(localNetworkHelper))
+		privilegedDialer := dialer.DialFunc(func(ctx context.Context, network, addr string) (net.Conn, error) {
+			return localNetworkHelper.PrivilegedDialContext(ctx, network, addr)
+		})
+
+		clientOpts = append(clientOpts, client.WithDialer(privilegedDialer))
+		workerOpts = append(workerOpts, worker.WithDialer(privilegedDialer))
 
 		if err := privdrop.Drop(username); err != nil {
 			return err
diff --git a/internal/controller/api_workers.go b/internal/controller/api_workers.go
index e7480d1..009fbde 100644
--- a/internal/controller/api_workers.go
+++ b/internal/controller/api_workers.go
@@ -59,7 +59,7 @@ func (controller *Controller) createWorker(ctx *gin.Context) responder.Responder
 		return responder.Code(http.StatusInternalServerError)
 	}
 
-	if uint(len(workers)+1) > controller.maxWorkersPerLicense {
+	if !controller.synthetic && uint(len(workers)+1) > controller.maxWorkersPerLicense {
 		return responder.JSON(http.StatusConflict, NewErrorResponse("cannot register a new worker "+
 			"because the license capacity of %d workers has been reached, "+
 			"consider upgrading at https://tart.run/licensing/", controller.maxWorkersPerLicense))
diff --git a/internal/controller/controller.go b/internal/controller/controller.go
index 778aaf1..9ade7af 100644
--- a/internal/controller/controller.go
+++ b/internal/controller/controller.go
@@ -66,6 +66,7 @@ type Controller struct {
 	disableDBCompression bool
 	pingInterval         time.Duration
 	prometheusMetrics    bool
+	synthetic            bool
 
 	sshListenAddr string
 	sshSigner     ssh.Signer
diff --git a/internal/controller/option.go b/internal/controller/option.go
index a005d68..efcfb09 100644
--- a/internal/controller/option.go
+++ b/internal/controller/option.go
@@ -84,6 +84,12 @@ func WithPrometheusMetrics() Option {
 	}
 }
 
+func WithSynthetic() Option {
+	return func(controller *Controller) {
+		controller.synthetic = true
+	}
+}
+
 func WithLogger(logger *zap.Logger) Option {
 	return func(controller *Controller) {
 		controller.logger = logger.Sugar()
diff --git a/internal/dialer/dialer.go b/internal/dialer/dialer.go
new file mode 100644
index 0000000..6570b57
--- /dev/null
+++ b/internal/dialer/dialer.go
@@ -0,0 +1,21 @@
+package dialer
+
+import (
+	"context"
+	"net"
+)
+
+type Dialer interface {
+	Dial(network, addr string) (net.Conn, error)
+	DialContext(ctx context.Context, network, addr string) (net.Conn, error)
+}
+
+type DialFunc func(ctx context.Context, network, addr string) (net.Conn, error)
+
+func (f DialFunc) Dial(network, addr string) (net.Conn, error) {
+	return f(context.Background(), network, addr)
+}
+
+func (f DialFunc) DialContext(ctx context.Context, network, addr string) (net.Conn, error) {
+	return f(ctx, network, addr)
+}
diff --git a/internal/echoserver/echoserver.go b/internal/echoserver/echoserver.go
new file mode 100644
index 0000000..19331b7
--- /dev/null
+++ b/internal/echoserver/echoserver.go
@@ -0,0 +1,82 @@
+package echoserver
+
+import (
+	"context"
+	"errors"
+	"io"
+	"net"
+	"strings"
+	"syscall"
+
+	"golang.org/x/sync/errgroup"
+)
+
+type EchoServer struct {
+	listener net.Listener
+}
+
+func New() (*EchoServer, error) {
+	listener, err := net.Listen("tcp", ":0")
+	if err != nil {
+		return nil, err
+	}
+
+	return &EchoServer{
+		listener: listener,
+	}, nil
+}
+
+func (echoServer *EchoServer) Addr() string {
+	return strings.ReplaceAll(echoServer.listener.Addr().String(), "[::]", "127.0.0.1")
+}
+
+func (echoServer *EchoServer) Run(ctx context.Context) error {
+	group, ctx := errgroup.WithContext(ctx)
+
+	group.Go(func() error {
+		<-ctx.Done()
+
+		return echoServer.listener.Close()
+	})
+
+	group.Go(func() error {
+		for {
+			conn, err := echoServer.listener.Accept()
+			if err != nil {
+				if errors.Is(err, net.ErrClosed) {
+					return nil
+				}
+
+				return err
+			}
+
+			group.Go(func() error {
+				defer conn.Close()
+
+				buf := make([]byte, 4096)
+
+				for {
+					n, err := conn.Read(buf)
+					if err != nil {
+						if errors.Is(err, io.EOF) {
+							return nil
+						}
+
+						return err
+					}
+
+					_, err = conn.Write(buf[:n])
+					if err != nil {
+						if errors.Is(err, syscall.EPIPE) {
+							return nil
+						}
+
+						return err
+					}
+				}
+			})
+		}
+	})
+
+	return group.Wait()
+}
diff --git a/internal/echoserver/echoserver_test.go b/internal/echoserver/echoserver_test.go
new file mode 100644
index 0000000..9de3a7e
--- /dev/null
+++ b/internal/echoserver/echoserver_test.go
@@ -0,0 +1,54 @@
+package echoserver_test
+
+import (
+	"context"
+	"crypto/rand"
+	"io"
+	"net"
+	"testing"
+
+	"github.com/cirruslabs/orchard/internal/echoserver"
+	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
+)
+
+func TestServer(t *testing.T) {
+	echoServer, err := echoserver.New()
+	require.NoError(t, err)
+
+	subCtx, subCtxCancel := context.WithCancel(t.Context())
+	group, ctx := errgroup.WithContext(subCtx)
+	group.Go(func() error {
+		return echoServer.Run(ctx)
+	})
+
+	netConn, err := net.Dial("tcp", echoServer.Addr())
+	require.NoError(t, err)
+
+	const bufSizeBytes = 64 * 1024
+	outgoingBuf := make([]byte, bufSizeBytes)
+	incomingBuf := make([]byte, bufSizeBytes)
+
+	// Prepare the outgoing buffer
+	n, err := rand.Read(outgoingBuf)
+	require.NoError(t, err)
+	require.Equal(t, len(outgoingBuf), n)
+
+	// Send the outgoing buffer
+	n, err = netConn.Write(outgoingBuf)
+	require.NoError(t, err)
+	require.Equal(t, len(outgoingBuf), n)
+
+	// Receive the incoming buffer
+	n, err = io.ReadFull(netConn, incomingBuf)
+	require.NoError(t, err)
+	require.Equal(t, len(incomingBuf), n)
+
+	// Compare outgoing and incoming buffers
+	require.Equal(t, outgoingBuf, incomingBuf)
+
+	// Ensure clean shutdown
+	require.NoError(t, netConn.Close())
+	subCtxCancel()
+	require.NoError(t, group.Wait())
+}
diff --git a/internal/worker/option.go b/internal/worker/option.go
index 83697bc..404c38a 100644
--- a/internal/worker/option.go
+++ b/internal/worker/option.go
@@ -1,7 +1,7 @@
 package worker
 
 import (
-	"github.com/cirruslabs/chacha/pkg/localnetworkhelper"
+	"github.com/cirruslabs/orchard/internal/dialer"
 	v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
 	"go.uber.org/zap"
 )
@@ -33,9 +33,15 @@ func WithDefaultCPUAndMemory(defaultCPU uint64, defaultMemory uint64) Option {
 	}
 }
 
-func WithLocalNetworkHelper(localNetworkHelper *localnetworkhelper.LocalNetworkHelper) Option {
+func WithDialer(dialer dialer.Dialer) Option {
 	return func(worker *Worker) {
-		worker.localNetworkHelper = localNetworkHelper
+		worker.dialer = dialer
+	}
+}
+
+func WithSynthetic() Option {
+	return func(worker *Worker) {
+		worker.synthetic = true
 	}
 }
 
diff --git a/internal/worker/rpc.go b/internal/worker/rpc.go
index 86e48b3..dad5850 100644
--- a/internal/worker/rpc.go
+++ b/internal/worker/rpc.go
@@ -113,8 +113,8 @@ func (worker *Worker) handlePortForward(
 	// Connect to the VM's port
 	var vmConn net.Conn
 
-	if worker.localNetworkHelper != nil {
-		vmConn, err = worker.localNetworkHelper.PrivilegedDialContext(ctx, "tcp",
+	if worker.dialer != nil {
+		vmConn, err = worker.dialer.DialContext(ctx, "tcp",
 			fmt.Sprintf("%s:%d", host, portForwardAction.Port))
 	} else {
 		dialer := net.Dialer{}
diff --git a/internal/worker/rpcv2.go b/internal/worker/rpcv2.go
index 40df62b..f9d714d 100644
--- a/internal/worker/rpcv2.go
+++ b/internal/worker/rpcv2.go
@@ -100,8 +100,8 @@ func (worker *Worker) handlePortForwardV2Inner(
 
 	var vmConn net.Conn
 
-	if worker.localNetworkHelper != nil {
-		vmConn, err = worker.localNetworkHelper.PrivilegedDialContext(ctx, "tcp",
+	if worker.dialer != nil {
+		vmConn, err = worker.dialer.DialContext(ctx, "tcp",
 			fmt.Sprintf("%s:%d", host, portForward.Port))
 	} else {
 		dialer := net.Dialer{}
diff --git a/internal/worker/vmmanager/base/base.go b/internal/worker/vmmanager/base/base.go
new file mode 100644
index 0000000..5b8d2ce
--- /dev/null
+++ b/internal/worker/vmmanager/base/base.go
@@ -0,0 +1,111 @@
+package base
+
+import (
+	"sync/atomic"
+
+	v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
+	mapset "github.com/deckarep/golang-set/v2"
+	"go.uber.org/zap"
+)
+
+type VM struct {
+	// Backward compatibility with v1.VM specification's "Status" field
+	//
+	// "started" is always true after the first "tart run",
+	// whereas ConditionReady can be used to tell if a VM
+	// is really running or not.
+	started atomic.Bool
+
+	// A more orthogonal alternative to v1.VM specification's "Status" field,
+	// which allows a VM to have more than one state.
+	//
+	// For example, a VM can be both in ConditionReady and ConditionSuspending/
+	// ConditionStopping states for a short time. This way, in run() we know
+	// that we're in the process of rebooting a VM, so we can avoid throwing
+	// an error about unexpected VM termination.
+	conditions mapset.Set[v1.ConditionType]
+
+	statusMessage atomic.Pointer[string]
+	err           atomic.Pointer[error]
+
+	logger *zap.SugaredLogger
+}
+
+func NewVM(logger *zap.SugaredLogger) *VM {
+	return &VM{
+		conditions: mapset.NewSet(v1.ConditionTypeCloning),
+		logger:     logger,
+	}
+}
+
+func (vm *VM) SetStarted(val bool) {
+	vm.started.Store(val)
+}
+
+func (vm *VM) Status() v1.VMStatus {
+	if vm.Err() != nil {
+		return v1.VMStatusFailed
+	}
+
+	if vm.started.Load() {
+		return v1.VMStatusRunning
+	}
+
+	return v1.VMStatusPending
+}
+
+func (vm *VM) StatusMessage() string {
+	status := vm.statusMessage.Load()
+
+	if status != nil {
+		return *status
+	}
+
+	return ""
+}
+
+func (vm *VM) SetStatusMessage(status string) {
+	vm.logger.Debug(status)
+	vm.statusMessage.Store(&status)
+}
+
+func (vm *VM) Err() error {
+	if err := vm.err.Load(); err != nil {
+		return *err
+	}
+
+	return nil
+}
+
+func (vm *VM) SetErr(err error) {
+	if vm.err.CompareAndSwap(nil, &err) {
+		vm.logger.Error(err)
+	}
+}
+
+func (vm *VM) ConditionsSet() mapset.Set[v1.ConditionType] {
+	return vm.conditions
+}
+
+func (vm *VM) Conditions() []v1.Condition {
+	// Only expose the minimum amount of conditions necessary
+	// for the Orchard Controller to make decisions
+	return []v1.Condition{
+		vm.conditionTypeToCondition(v1.ConditionTypeRunning),
+	}
+}
+
+func (vm *VM) conditionTypeToCondition(conditionType v1.ConditionType) v1.Condition {
+	var conditionState v1.ConditionState
+
+	if vm.ConditionsSet().ContainsOne(conditionType) {
+		conditionState = v1.ConditionStateTrue
+	} else {
+		conditionState = v1.ConditionStateFalse
+	}
+
+	return v1.Condition{
+		Type:  conditionType,
+		State: conditionState,
+	}
+}
diff --git a/internal/worker/vmmanager/synthetic/synthetic.go b/internal/worker/vmmanager/synthetic/synthetic.go
new file mode 100644
index 0000000..ea82286
--- /dev/null
+++ b/internal/worker/vmmanager/synthetic/synthetic.go
@@ -0,0 +1,202 @@
+package synthetic
+
+import (
+	"context"
+	"math/rand"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/cirruslabs/orchard/internal/worker/ondiskname"
+	"github.com/cirruslabs/orchard/internal/worker/vmmanager/base"
+	"github.com/cirruslabs/orchard/pkg/client"
+	v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
+	"go.uber.org/zap"
+)
+
+type VM struct {
+	onDiskName ondiskname.OnDiskName
+	resource   v1.VM
+	ctx        context.Context
+	cancel     context.CancelFunc
+	wg         sync.WaitGroup
+	logger     *zap.SugaredLogger
+
+	*base.VM
+}
+
+func NewVM(
+	vmResource v1.VM,
+	eventStreamer *client.EventStreamer,
+	vmPullTimeHistogram metric.Float64Histogram,
+	logger *zap.SugaredLogger,
+) *VM {
+	ctx, cancel := context.WithCancel(context.Background())
+
+	logger = logger.With(
+		"vm_uid", vmResource.UID,
+		"vm_name", vmResource.Name,
+		"vm_restart_count", vmResource.RestartCount,
+	)
+
+	vm := &VM{
+		onDiskName: ondiskname.NewFromResource(vmResource),
+		resource:   vmResource,
+		ctx:        ctx,
+		cancel:     cancel,
+		logger:     logger,
+		VM:         base.NewVM(logger),
+	}
+
+	vm.wg.Add(1)
+
+	go func() {
+		defer vm.wg.Done()
+
+		if vmResource.ImagePullPolicy == v1.ImagePullPolicyAlways {
+			vm.SetStatusMessage("pulling VM image...")
+
+			pullStartedAt := time.Now()
+
+			// Pull
+			time.Sleep(randomDelay())
+
+			vmPullTimeHistogram.Record(vm.ctx, time.Since(pullStartedAt).Seconds(), metric.WithAttributes(
+				attribute.String("worker", vm.resource.Worker),
+				attribute.String("image", vm.resource.Image),
+			))
+		}
+
+		// Clone and configure
+		time.Sleep(randomDelay())
+
+		// Backward compatibility with v1.VM specification's "Status" field
+		vm.SetStarted(true)
+
+		vm.ConditionsSet().Add(v1.ConditionTypeRunning)
+
+		vm.run(vm.ctx, eventStreamer)
+	}()
+
+	return vm
+}
+
+func (vm *VM) Resource() v1.VM {
+	return vm.resource
+}
+
+func (vm *VM) SetResource(vmResource v1.VM) {
+	vm.resource = vmResource
+	vm.resource.ObservedGeneration = vmResource.Generation
+}
+
+func (vm *VM) OnDiskName() ondiskname.OnDiskName {
+	return vm.onDiskName
+}
+
+func (vm *VM) ImageFQN() *string {
+	return nil
+}
+
+func (vm *VM) Start(eventStreamer *client.EventStreamer) {
+	vm.SetStatusMessage("Starting VM")
+	vm.ConditionsSet().Add(v1.ConditionTypeRunning)
+
+	vm.cancel()
+
+	vm.ctx, vm.cancel = context.WithCancel(context.Background())
+	vm.wg.Add(1)
+
+	go func() {
+		defer vm.wg.Done()
+
+		vm.run(vm.ctx, eventStreamer)
+	}()
+}
+
+func (vm *VM) Suspend() <-chan error {
+	errChan := make(chan error, 1)
+
+	errChan <- nil
+
+	return errChan
+}
+
+func (vm *VM) IP(ctx context.Context) (string, error) {
+	time.Sleep(randomDelay())
+
+	return "127.0.0.1", nil
+}
+
+func (vm *VM) Stop() <-chan error {
+	errChan := make(chan error, 1)
+
+	errChan <- nil
+
+	return errChan
+}
+
+func (vm *VM) Delete() error {
+	vm.cancel()
+
+	if vm.ConditionsSet().Contains(v1.ConditionTypeCloning) {
+		// Not cloned yet, nothing to delete
+		return nil
+	}
+
+	time.Sleep(randomDelay())
+
+	return nil
+}
+
+func (vm *VM) run(ctx context.Context, eventStreamer *client.EventStreamer) {
+	defer vm.ConditionsSet().RemoveAll(v1.ConditionTypeRunning, v1.ConditionTypeSuspending, v1.ConditionTypeStopping)
+
+	// Launch the startup script goroutine as close as possible
+	// to the VM startup (below) to avoid "tart ip" timing out
+	if vm.resource.StartupScript != nil {
+		vm.SetStatusMessage("VM started, running startup script...")
+
+		go vm.runScript(vm.resource.StartupScript, eventStreamer)
+	} else {
+		vm.SetStatusMessage("VM started")
+	}
+
+	<-ctx.Done()
+}
+
+func (vm *VM) runScript(script *v1.VMScript, eventStreamer *client.EventStreamer) {
+	if eventStreamer != nil {
+		defer func() {
+			if err := eventStreamer.Close(); err != nil {
+				vm.logger.Errorf("errored during streaming events for startup script: %v", err)
+			}
+		}()
+	}
+
+	consumeLine := func(line string) {
+		if eventStreamer == nil {
+			return
+		}
+
+		eventStreamer.Stream(v1.Event{
+			Kind:      v1.EventKindLogLine,
+			Timestamp: time.Now().Unix(),
+			Payload:   line,
+		})
+	}
+
+	for line := range strings.Lines(script.ScriptContent) {
+		consumeLine(line)
+
+		time.Sleep(randomDelay())
+	}
+}
+
+func randomDelay() time.Duration {
+	const jitterBaseDelay = 2500 * time.Millisecond
+
+	return time.Duration(rand.Float64() * float64(jitterBaseDelay))
+}
diff --git a/internal/worker/vmmanager/tart/tart.go b/internal/worker/vmmanager/tart/tart.go
index 8d1ed9b..36abaa8 100644
--- a/internal/worker/vmmanager/tart/tart.go
+++ b/internal/worker/vmmanager/tart/tart.go
@@ -14,11 +14,11 @@ import (
 	"time"
 
 	"github.com/avast/retry-go"
-	"github.com/cirruslabs/chacha/pkg/localnetworkhelper"
+	"github.com/cirruslabs/orchard/internal/dialer"
 	"github.com/cirruslabs/orchard/internal/worker/ondiskname"
+	"github.com/cirruslabs/orchard/internal/worker/vmmanager/base"
 	"github.com/cirruslabs/orchard/pkg/client"
 	"github.com/cirruslabs/orchard/pkg/resource/v1"
-	mapset "github.com/deckarep/golang-set/v2"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/metric"
 	"go.uber.org/zap"
@@ -32,41 +32,24 @@ type VM struct {
 	resource v1.VM
 	logger   *zap.SugaredLogger
 
-	// Backward compatibility with v1.VM specification's "Status" field
-	//
-	// "started" is always true after the first "tart run",
-	// whereas ConditionReady can be used to tell if a VM
-	// is really running or not.
-	started atomic.Bool
-
-	// A more orthogonal alternative to v1.VM specification's "Status" field,
-	// which allows a VM to have more than one state.
-	//
-	// For example, a VM can be both in ConditionReady and ConditionSuspending/
-	// ConditionStopping states for a short time. This way in run() we know
-	// that we're in a process of rebooting a VM, so we can avoid throwing
-	// an error about unexpected VM termination.
-	conditions mapset.Set[v1.ConditionType]
-
 	// Image FQN feature, see https://github.com/cirruslabs/orchard/issues/164
 	imageFQN atomic.Pointer[string]
 
-	statusMessage atomic.Pointer[string]
-	err           atomic.Pointer[error]
-
 	ctx    context.Context
 	cancel context.CancelFunc
 	wg     *sync.WaitGroup
 
-	localNetworkHelper *localnetworkhelper.LocalNetworkHelper
+	dialer dialer.Dialer
+
+	*base.VM
 }
 
 func NewVM(
 	vmResource v1.VM,
 	eventStreamer *client.EventStreamer,
 	vmPullTimeHistogram metric.Float64Histogram,
-	localNetworkHelper *localnetworkhelper.LocalNetworkHelper,
+	dialer dialer.Dialer,
 	logger *zap.SugaredLogger,
 ) *VM {
 	vmContext, vmContextCancel := context.WithCancel(context.Background())
@@ -80,14 +63,14 @@ func NewVM(
 			"vm_restart_count", vmResource.RestartCount,
 		),
 
-		conditions: mapset.NewSet(v1.ConditionTypeCloning),
-
 		ctx:    vmContext,
 		cancel: vmContextCancel,
 		wg:     &sync.WaitGroup{},
 
-		localNetworkHelper: localNetworkHelper,
+		dialer: dialer,
+
+		VM: base.NewVM(logger),
 	}
 
 	vm.wg.Add(1)
@@ -96,7 +79,7 @@ func NewVM(
 		defer vm.wg.Done()
 
 		if vmResource.ImagePullPolicy == v1.ImagePullPolicyAlways {
-			vm.setStatusMessage("pulling VM image...")
+			vm.SetStatusMessage("pulling VM image...")
 
 			pullStartedAt := time.Now()
 
@@ -106,7 +89,7 @@ func NewVM(
 			case <-vm.ctx.Done():
 				// Do not return an error because it's the user's intent to cancel this VM operation
 			default:
-				vm.setErr(fmt.Errorf("failed to pull the VM: %w", err))
+				vm.SetErr(fmt.Errorf("failed to pull the VM: %w", err))
 			}
 
 			return
@@ -123,16 +106,16 @@ func NewVM(
 			case <-vm.ctx.Done():
 				// Do not return an error because it's the user's intent to cancel this VM operation
 			default:
-				vm.setErr(fmt.Errorf("failed to clone the VM: %w", err))
+				vm.SetErr(fmt.Errorf("failed to clone the VM: %w", err))
 			}
 
 			return
 		}
 
 		// Backward compatibility with v1.VM specification's "Status" field
-		vm.started.Store(true)
+		vm.SetStarted(true)
 
-		vm.conditions.Add(v1.ConditionTypeRunning)
+		vm.ConditionsSet().Add(v1.ConditionTypeRunning)
 
 		vm.run(vm.ctx, eventStreamer)
 	}()
@@ -161,79 +144,15 @@ func (vm *VM) id() string {
 	return vm.onDiskName.String()
 }
 
-func (vm *VM) Status() v1.VMStatus {
-	if vm.Err() != nil {
-		return v1.VMStatusFailed
-	}
-
-	if vm.started.Load() {
-		return v1.VMStatusRunning
-	}
-
-	return v1.VMStatusPending
-}
-
-func (vm *VM) StatusMessage() string {
-	status := vm.statusMessage.Load()
-
-	if status != nil {
-		return *status
-	}
-
-	return ""
-}
-
-func (vm *VM) setStatusMessage(status string) {
-	vm.logger.Debugf(status)
-	vm.statusMessage.Store(&status)
-}
-
-func (vm *VM) Err() error {
-	if err := vm.err.Load(); err != nil {
-		return *err
-	}
-
-	return nil
-}
-
-func (vm *VM) setErr(err error) {
-	if vm.err.CompareAndSwap(nil, &err) {
-		vm.logger.Error(err)
-	}
-}
-
-func (vm *VM) Conditions() []v1.Condition {
-	// Only expose a minimum amount of conditions necessary
-	// for the Orchard Controller to make decisions
-	return []v1.Condition{
-		vm.conditionTypeToCondition(v1.ConditionTypeRunning),
-	}
-}
-
-func (vm *VM) conditionTypeToCondition(conditionType v1.ConditionType) v1.Condition {
-	var conditionState v1.ConditionState
-
-	if vm.conditions.ContainsOne(conditionType) {
-		conditionState = v1.ConditionStateTrue
-	} else {
-		conditionState = v1.ConditionStateFalse
-	}
-
-	return v1.Condition{
-		Type:  conditionType,
-		State: conditionState,
-	}
-}
-
 func (vm *VM) cloneAndConfigure(ctx context.Context) error {
-	vm.setStatusMessage("cloning VM...")
+	vm.SetStatusMessage("cloning VM...")
 
 	_, _, err := Tart(ctx, vm.logger, "clone", vm.resource.Image, vm.id())
 	if err != nil {
 		return err
 	}
 
-	vm.conditions.Remove(v1.ConditionTypeCloning)
+	vm.ConditionsSet().Remove(v1.ConditionTypeCloning)
 
 	// Image FQN feature, see https://github.com/cirruslabs/orchard/issues/164
 	fqnRaw, _, err := Tart(ctx, vm.logger, "fqn", vm.resource.Image)
@@ -243,7 +162,7 @@ func (vm *VM) cloneAndConfigure(ctx context.Context) error {
 	}
 
 	// Set memory
-	vm.setStatusMessage("configuring VM...")
+	vm.SetStatusMessage("configuring VM...")
 
 	memory := vm.resource.AssignedMemory
 
@@ -356,16 +275,16 @@ func (vm *VM) cloneAndConfigure(ctx context.Context) error {
 }
 
 func (vm *VM) run(ctx context.Context, eventStreamer *client.EventStreamer) {
-	defer vm.conditions.RemoveAll(v1.ConditionTypeRunning, v1.ConditionTypeSuspending, v1.ConditionTypeStopping)
+	defer vm.ConditionsSet().RemoveAll(v1.ConditionTypeRunning, v1.ConditionTypeSuspending, v1.ConditionTypeStopping)
 
 	// Launch the startup script goroutine as close as possible
 	// to the VM startup (below) to avoid "tart ip" timing out
 	if vm.resource.StartupScript != nil {
-		vm.setStatusMessage("VM started, running startup script...")
+		vm.SetStatusMessage("VM started, running startup script...")
 
 		go vm.runScript(vm.resource.StartupScript, eventStreamer)
 	} else {
-		vm.setStatusMessage("VM started")
+		vm.SetStatusMessage("VM started")
 	}
 
 	var runArgs = []string{"run"}
@@ -406,7 +325,7 @@ func (vm *VM) run(ctx context.Context, eventStreamer *client.EventStreamer) {
 		case <-vm.ctx.Done():
 			// Do not return an error because it's the user's intent to cancel this VM
 		default:
-			vm.setErr(fmt.Errorf("%w: %v", ErrVMFailed, err))
+			vm.SetErr(fmt.Errorf("%w: %v", ErrVMFailed, err))
 		}
 
 		return
@@ -416,8 +335,8 @@ func (vm *VM) run(ctx context.Context, eventStreamer *client.EventStreamer) {
 	case <-vm.ctx.Done():
 		// Do not return an error because it's the user's intent to cancel this VM
 	default:
-		if !vm.conditions.ContainsAny(v1.ConditionTypeSuspending, v1.ConditionTypeStopping) {
-			vm.setErr(fmt.Errorf("%w: VM exited unexpectedly", ErrVMFailed))
+		if !vm.ConditionsSet().ContainsAny(v1.ConditionTypeSuspending, v1.ConditionTypeStopping) {
+			vm.SetErr(fmt.Errorf("%w: VM exited unexpectedly", ErrVMFailed))
 		}
 	}
 }
@@ -462,14 +381,14 @@ func (vm *VM) Suspend() <-chan error {
 		// VM is still running
 	}
 
-	vm.setStatusMessage("Suspending VM")
-	vm.conditions.Add(v1.ConditionTypeSuspending)
+	vm.SetStatusMessage("Suspending VM")
+	vm.ConditionsSet().Add(v1.ConditionTypeSuspending)
 
 	go func() {
 		_, _, err := Tart(context.Background(), zap.NewNop().Sugar(), "suspend", vm.id())
 		if err != nil {
 			err := fmt.Errorf("failed to suspend VM: %w", err)
-			vm.setErr(err)
+			vm.SetErr(err)
 			errCh <- err
 
 			return
@@ -494,8 +413,8 @@ func (vm *VM) Stop() <-chan error {
 		// VM is still running
 	}
 
-	vm.setStatusMessage("Stopping VM")
-	vm.conditions.Add(v1.ConditionTypeStopping)
+	vm.SetStatusMessage("Stopping VM")
+	vm.ConditionsSet().Add(v1.ConditionTypeStopping)
 
 	go func() {
 		// Try to gracefully terminate the VM
@@ -513,8 +432,8 @@ func (vm *VM) Stop() <-chan error {
 }
 
 func (vm *VM) Start(eventStreamer *client.EventStreamer) {
-	vm.setStatusMessage("Starting VM")
-	vm.conditions.Add(v1.ConditionTypeRunning)
+	vm.SetStatusMessage("Starting VM")
+	vm.ConditionsSet().Add(v1.ConditionTypeRunning)
 
 	vm.cancel()
 
@@ -533,7 +452,7 @@ func (vm *VM) Delete() error {
 	// (e.g. "tart clone", "tart run", etc.)
 	vm.cancel()
 
-	if vm.conditions.Contains(v1.ConditionTypeCloning) {
+	if vm.ConditionsSet().Contains(v1.ConditionTypeCloning) {
 		// Not cloned yet, nothing to delete
 		return nil
 	}
@@ -586,8 +505,8 @@ func (vm *VM) shell(
 
 	var netConn net.Conn
 
-	if vm.localNetworkHelper != nil {
-		netConn, err = vm.localNetworkHelper.PrivilegedDialContext(dialCtx, "tcp", addr)
+	if vm.dialer != nil {
+		netConn, err = vm.dialer.DialContext(dialCtx, "tcp", addr)
 	} else {
 		dialer := net.Dialer{}
 
@@ -691,6 +610,6 @@ func (vm *VM) runScript(script *v1.VMScript, eventStreamer *client.EventStreamer
 	err := vm.shell(vm.ctx, vm.resource.Username, vm.resource.Password, script.ScriptContent, script.Env,
 		consumeLine)
 	if err != nil {
-		vm.setErr(fmt.Errorf("%w: failed to run startup script: %v", ErrVMFailed, err))
+		vm.SetErr(fmt.Errorf("%w: failed to run startup script: %v", ErrVMFailed, err))
 	}
 }
diff --git a/internal/worker/vmmanager/vmmanager.go b/internal/worker/vmmanager/vmmanager.go
index 829c01e..6c8a198 100644
--- a/internal/worker/vmmanager/vmmanager.go
+++ b/internal/worker/vmmanager/vmmanager.go
@@ -6,6 +6,7 @@ import (
 	"github.com/cirruslabs/orchard/internal/worker/ondiskname"
 	"github.com/cirruslabs/orchard/pkg/client"
 	v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
+	"github.com/puzpuzpuz/xsync/v4"
 )
 
 type VM interface {
@@ -26,45 +27,47 @@ type VM interface {
 }
 
 type VMManager struct {
-	vms map[ondiskname.OnDiskName]VM
+	vms *xsync.Map[ondiskname.OnDiskName, VM]
 }
 
 func New() *VMManager {
 	return &VMManager{
-		vms: map[ondiskname.OnDiskName]VM{},
+		vms: xsync.NewMap[ondiskname.OnDiskName, VM](),
 	}
 }
 
 func (vmm *VMManager) Exists(key ondiskname.OnDiskName) bool {
-	_, ok := vmm.vms[key]
+	_, ok := vmm.vms.Load(key)
 
 	return ok
 }
 
 func (vmm *VMManager) Get(key ondiskname.OnDiskName) (VM, bool) {
-	vm, ok := vmm.vms[key]
+	vm, ok := vmm.vms.Load(key)
 
 	return vm, ok
 }
 
 func (vmm *VMManager) Put(key ondiskname.OnDiskName, vm VM) {
-	vmm.vms[key] = vm
+	vmm.vms.Store(key, vm)
 }
 
 func (vmm *VMManager) Delete(key ondiskname.OnDiskName) {
-	delete(vmm.vms, key)
+	vmm.vms.Delete(key)
 }
 
 func (vmm *VMManager) Len() int {
-	return len(vmm.vms)
+	return vmm.vms.Size()
 }
 
 func (vmm *VMManager) List() []VM {
 	var vms []VM
 
-	for _, vm := range vmm.vms {
+	vmm.vms.Range(func(odn ondiskname.OnDiskName, vm VM) bool {
 		vms = append(vms, vm)
-	}
+
+		return true
+	})
 
 	return vms
 }
diff --git a/internal/worker/worker.go b/internal/worker/worker.go
index f36efe6..859e7fe 100644
--- a/internal/worker/worker.go
+++ b/internal/worker/worker.go
@@ -9,12 +9,13 @@ import (
 	"time"
 
 	"github.com/avast/retry-go/v4"
-	"github.com/cirruslabs/chacha/pkg/localnetworkhelper"
+	"github.com/cirruslabs/orchard/internal/dialer"
 	"github.com/cirruslabs/orchard/internal/opentelemetry"
 	"github.com/cirruslabs/orchard/internal/worker/dhcpleasetime"
 	"github.com/cirruslabs/orchard/internal/worker/iokitregistry"
 	"github.com/cirruslabs/orchard/internal/worker/ondiskname"
 	"github.com/cirruslabs/orchard/internal/worker/vmmanager"
+	"github.com/cirruslabs/orchard/internal/worker/vmmanager/synthetic"
 	"github.com/cirruslabs/orchard/internal/worker/vmmanager/tart"
 	"github.com/cirruslabs/orchard/pkg/client"
 	v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
@@ -47,9 +48,11 @@ type Worker struct {
 	defaultCPU    uint64
 	defaultMemory uint64
 
+	synthetic bool
+
 	vmPullTimeHistogram metric.Float64Histogram
 
-	localNetworkHelper *localnetworkhelper.LocalNetworkHelper
+	dialer dialer.Dialer
 
 	logger *zap.SugaredLogger
 }
@@ -69,12 +72,12 @@ func New(client *client.Client, opts ...Option) (*Worker, error) {
 
 	// Apply defaults
 	if worker.name == "" {
-		hostname, err := os.Hostname()
+		name, err := DefaultName()
 		if err != nil {
 			return nil, err
 		}
 
-		worker.name = hostname
+		worker.name = name
 	}
 
 	defaultResources := v1.Resources{
@@ -225,6 +228,10 @@ func (worker *Worker) runNewSession(ctx context.Context) error {
 	}
 }
 
+func DefaultName() (string, error) {
+	return os.Hostname()
+}
+
 func (worker *Worker) registerWorker(ctx context.Context) error {
 	platformUUID, err := iokitregistry.PlatformUUID()
 	if err != nil {
@@ -459,6 +466,11 @@ func (worker *Worker) syncVMs(ctx context.Context, updateVM func(context.Context
 
 //nolint:nestif,gocognit // complexity is tolerable for now
 func (worker *Worker) syncOnDiskVMs(ctx context.Context) error {
+	if worker.synthetic {
+		// There are no on-disk VMs when using synthetic VMs
+		return nil
+	}
+
 	remoteVMs, err := worker.client.VMs().FindForWorker(ctx, worker.name)
 	if err != nil {
 		return err
@@ -536,8 +548,14 @@ func (worker *Worker) deleteVM(vm vmmanager.VM) error {
 func (worker *Worker) createVM(odn ondiskname.OnDiskName, vmResource v1.VM) {
 	eventStreamer := worker.client.VMs().StreamEvents(vmResource.Name)
 
-	vm := tart.NewVM(vmResource, eventStreamer, worker.vmPullTimeHistogram,
-		worker.localNetworkHelper, worker.logger)
+	var vm vmmanager.VM
+
+	if worker.synthetic {
+		vm = synthetic.NewVM(vmResource, eventStreamer, worker.vmPullTimeHistogram, worker.logger)
+	} else {
+		vm = tart.NewVM(vmResource, eventStreamer, worker.vmPullTimeHistogram,
+			worker.dialer, worker.logger)
+	}
 
 	worker.vmm.Put(odn, vm)
 }
diff --git a/pkg/client/client.go b/pkg/client/client.go
index a5f846b..ff8d6e5 100644
--- a/pkg/client/client.go
+++ b/pkg/client/client.go
@@ -16,6 +16,7 @@ import (
 	"time"
 
 	"github.com/cirruslabs/orchard/internal/config"
+	"github.com/cirruslabs/orchard/internal/dialer"
 	"github.com/cirruslabs/orchard/internal/version"
 	"github.com/cirruslabs/orchard/rpc"
 	"github.com/coder/websocket"
@@ -42,7 +43,7 @@ type Client struct {
 	serviceAccountName  string
 	serviceAccountToken string
 
-	dialContext func(ctx context.Context, network, addr string) (net.Conn, error)
+	dialer dialer.Dialer
 }
 
 type Config struct {
@@ -84,8 +85,8 @@ func New(opts ...Option) (*Client, error) {
 		TLSClientConfig: client.tlsConfig,
 	}
 
-	if client.dialContext != nil {
-		transport.DialContext = client.dialContext
+	if client.dialer != nil {
+		transport.DialContext = client.dialer.DialContext
 	}
 
 	client.httpClient = &http.Client{
diff --git a/pkg/client/option.go b/pkg/client/option.go
index cd5c203..b792716 100644
--- a/pkg/client/option.go
+++ b/pkg/client/option.go
@@ -1,9 +1,9 @@
 package client
 
 import (
-	"context"
 	"crypto/x509"
-	"net"
+
+	"github.com/cirruslabs/orchard/internal/dialer"
 )
 
 type Option func(*Client)
@@ -27,8 +27,8 @@ func WithCredentials(serviceAccountName string, serviceAccountToken string) Opti
 	}
 }
 
-func WithDialContext(dialContext func(ctx context.Context, network, addr string) (net.Conn, error)) Option {
+func WithDialer(dialer dialer.Dialer) Option {
 	return func(client *Client) {
-		client.dialContext = dialContext
+		client.dialer = dialer
 	}
 }
diff --git a/test/load/vm-lifecycle.js b/test/load/vm-lifecycle.js
new file mode 100644
index 0000000..30ae3b8
--- /dev/null
+++ b/test/load/vm-lifecycle.js
@@ -0,0 +1,120 @@
+import {check} from 'k6';
+import http from 'k6/http';
+import {WebSocket} from 'k6/experimental/websockets';
+import crypto from 'k6/crypto';
+import {uuidv4} from 'https://jslib.k6.io/k6-utils/1.6.0/index.js';
+import {expect} from 'https://jslib.k6.io/k6-testing/0.6.1/index.js';
+
+const BASE_URL = __ENV.ORCHARD_BASE_URL || 'http://127.0.0.1:6120/v1';
+const WS_BASE_URL = __ENV.ORCHARD_WS_BASE_URL || BASE_URL.replace(/^http/, 'ws');
+const WS_BYTES = Number(__ENV.WS_BYTES) || 64 * 1024;
+
+export const options = {
+    scenarios: {
+        vmLifecycle: {
+            executor: 'ramping-vus',
+            stages: [
+                {duration: '5m', target: 2_000},
+            ],
+        },
+    },
+};
+
+export default async function () {
+    const vmName = `k6-${uuidv4()}`;
+
+    createVM(vmName);
+
+    await portForward(vmName);
+
+    deleteVM(vmName);
+}
+
+function createVM(vmName) {
+    const loremIpsum = `Lorem ipsum
+    dolor sit amet,
+    consectetur adipiscing elit.
+    Fusce at orci nisi.
+    Donec lacinia neque et risus elementum,
+    ut interdum lacus pretium.
+`;
+
+    const body = JSON.stringify({
+        name: vmName,
+        image: 'ghcr.io/cirruslabs/macos-tahoe-base:latest',
+        headless: true,
+        startup_script: {
+            script_content: loremIpsum
+        },
+    });
+
+    const resp = http.post(`${BASE_URL}/vms`, body, {
+        headers: {
+            'Content-Type': 'application/json',
+        },
+    });
+
+    const ok = check(resp, {
+        'VM creation succeeded': (r) => r.status === 200,
+    });
+
+    if (!ok) {
+        console.error(`Failed to create a VM: HTTP ${resp.status}`);
+    }
+}
+
+async function portForward(vmName) {
+    const url = `${WS_BASE_URL}/vms/${vmName}/port-forward?port=22&wait=60`;
+
+    console.debug(`connecting to ${url}`);
+
+    const ws = new WebSocket(url);
+    ws.binaryType = 'arraybuffer';
+
+    const sentBytes = new Uint8Array(crypto.randomBytes(WS_BYTES));
+    let receivedBytes = new Uint8Array();
+
+    const evt = await new Promise((resolve, reject) => {
+        ws.onopen = () => {
+            ws.send(sentBytes);
+        };
+
+        ws.onmessage = (event) => {
+            if (event.data instanceof ArrayBuffer) {
+                const chunk = new Uint8Array(event.data);
+
+                const combined = new Uint8Array(receivedBytes.length + chunk.length);
+                combined.set(receivedBytes);
+                combined.set(chunk, receivedBytes.length);
+                receivedBytes = combined;
+            }
+
+            if (receivedBytes.length >= WS_BYTES) {
+                ws.close();
+            }
+        };
+
+        ws.onerror = (evt) => {
+            reject(new Error(`WebSocket error: ${evt.error}`));
+        };
+
+        ws.onclose = (evt) => {
+            resolve(evt);
+        };
+    });
+
+    expect(evt.code).toBe(1000);
+    expect(receivedBytes).toEqual(sentBytes);
+}
+
+function deleteVM(vmName) {
+    const resp = http.del(`${BASE_URL}/vms/${vmName}`);
+
+    const ok = check(resp, {
+        'VM deletion succeeded': (r) => r.status === 200,
+    });
+
+    if (!ok) {
+        console.error(`Failed to delete a VM: HTTP ${resp.status}`);
+    }
+}
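Reviewer note: the new `internal/dialer` package is the linchpin of this change. It replaces the concrete `*localnetworkhelper.LocalNetworkHelper` plumbing with a minimal interface, and `DialFunc` mirrors the `net/http.HandlerFunc` adapter idiom, so any `DialContext`-shaped closure satisfies `dialer.Dialer`. A minimal sketch of the pattern follows; it is illustrative only (the `redirectTo` helper is hypothetical, and the `internal/...` import resolves only from within the orchard module itself):

```go
package main

import (
	"context"
	"fmt"
	"net"

	"github.com/cirruslabs/orchard/internal/dialer"
)

// redirectTo returns a dialer.Dialer that ignores the requested address
// and always connects to target instead -- the same trick runDev() uses
// to point every synthetic VM port-forward at the in-memory echo server.
func redirectTo(target string) dialer.Dialer {
	return dialer.DialFunc(func(ctx context.Context, network, addr string) (net.Conn, error) {
		var d net.Dialer

		return d.DialContext(ctx, "tcp", target)
	})
}

func main() {
	// Stand-in for the echo server: accept a single connection.
	listener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	go func() {
		if conn, err := listener.Accept(); err == nil {
			conn.Close()
		}
	}()

	// The requested address (10.0.0.1:22) is ignored; the connection
	// lands on the local listener instead.
	conn, err := redirectTo(listener.Addr().String()).
		DialContext(context.Background(), "tcp", "10.0.0.1:22")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	fmt.Println("connected to", conn.RemoteAddr())
}
```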
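Assuming the flags land as written above, a local load-testing session would presumably pair the two new pieces like so: `orchard dev --synthetic --workers 4 --listen-pprof localhost:6060` in one terminal, then `k6 run test/load/vm-lifecycle.js` in another (with `ORCHARD_BASE_URL`, `ORCHARD_WS_BASE_URL`, or `WS_BYTES` exported to override the script's defaults). The synthetic worker then exercises the full controller/worker/port-forwarding path without ever invoking Tart, which is also why `createWorker` skips the license-capacity check in synthetic mode.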