Load testing: synthetic VMs, multiple worker support and Grafana k6 test (#389)

* Load testing: synthetic VMs, multiple worker support and Grafana k6 test

* echoserver: prevent fallthrough when Accept() fails

* Move default local-dev context logic to CreateDevController()

* Synthetic: add a random delay to startup script echoing
Nikolay Edigaryev authored on 2026-01-28 10:54:55 +01:00, committed by GitHub
parent 6fe523ef69
commit 7775515a73
GPG Key ID: B5690EEEBB952194
20 changed files with 832 additions and 183 deletions

4
go.mod
View File

@ -26,6 +26,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.23.2
github.com/pterm/pterm v0.12.82
github.com/puzpuzpuz/xsync/v4 v4.0.0
github.com/samber/lo v1.52.0
github.com/samber/mo v1.16.0
github.com/sethvargo/go-password v0.3.1
@ -42,6 +43,7 @@ require (
golang.org/x/crypto v0.47.0
golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa
golang.org/x/net v0.49.0
golang.org/x/sync v0.19.0
golang.org/x/term v0.39.0
google.golang.org/grpc v1.78.0
google.golang.org/protobuf v1.36.11
@ -123,7 +125,6 @@ require (
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.66.1 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
github.com/puzpuzpuz/xsync/v4 v4.0.0 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/spf13/pflag v1.0.9 // indirect
github.com/tklauser/go-sysconf v0.3.16 // indirect
@ -142,7 +143,6 @@ require (
go.yaml.in/yaml/v2 v2.4.2 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/arch v0.16.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.40.0 // indirect
golang.org/x/text v0.33.0 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect

View File

@ -3,20 +3,26 @@
package dev
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"os"
"path"
"path/filepath"
"github.com/cirruslabs/orchard/internal/config"
"github.com/cirruslabs/orchard/internal/controller"
"github.com/cirruslabs/orchard/internal/dialer"
"github.com/cirruslabs/orchard/internal/echoserver"
"github.com/cirruslabs/orchard/internal/netconstants"
"github.com/cirruslabs/orchard/internal/worker"
"github.com/cirruslabs/orchard/pkg/client"
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
"github.com/spf13/cobra"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
var ErrFailed = errors.New("failed to run development controller and worker")
@ -25,6 +31,9 @@ var devDataDirPath string
var apiPrefix string
var stringToStringResources map[string]string
var experimentalRPCV2 bool
var addressPprof string
var synthetic bool
var workers int
func NewCommand() *cobra.Command {
command := &cobra.Command{
@ -42,11 +51,35 @@ func NewCommand() *cobra.Command {
"resources that the development worker will provide")
command.Flags().BoolVar(&experimentalRPCV2, "experimental-rpc-v2", false,
"enable experimental RPC v2 (https://github.com/cirruslabs/orchard/issues/235)")
command.Flags().StringVar(&addressPprof, "listen-pprof", "",
"start pprof HTTP server on localhost:6060 for diagnostic purposes (e.g. \"localhost:6060\")")
command.Flags().BoolVar(&synthetic, "synthetic", false,
"do not instantiate real Tart VM, use synthetic in-memory VMs suitable for load testing")
command.Flags().IntVar(&workers, "workers", 1, "number of workers to start")
return command
}
func runDev(cmd *cobra.Command, args []string) (err error) {
// Initialize the logger
logger, err := zap.NewDevelopment()
if err != nil {
return err
}
defer func() {
if syncErr := logger.Sync(); syncErr != nil && err == nil {
err = syncErr
}
}()
if addressPprof != "" {
go func() {
if err := http.ListenAndServe(addressPprof, nil); err != nil {
logger.Sugar().Errorf("pprof server failed: %v", err)
}
}()
}
if !filepath.IsAbs(devDataDirPath) {
pwd, err := os.Getwd()
if err != nil {
@ -71,29 +104,69 @@ func runDev(cmd *cobra.Command, args []string) error {
additionalControllerOpts = append(additionalControllerOpts, controller.WithExperimentalRPCV2())
}
devController, devWorker, err := CreateDevControllerAndWorker(devDataDirPath,
fmt.Sprintf(":%d", netconstants.DefaultControllerPort), resources,
additionalControllerOpts, nil)
group, ctx := errgroup.WithContext(cmd.Context())
var additionalWorkerOpts []worker.Option
if synthetic {
// Use TCP echo server to partially emulate VM's TCP/IP stack,
// this way we get port-forwarding working when running in
// synthetic mode
echoServer, err := echoserver.New()
if err != nil {
return err
}
group.Go(func() error {
return echoServer.Run(ctx)
})
dialer := dialer.DialFunc(func(ctx context.Context, network, addr string) (net.Conn, error) {
dialer := net.Dialer{}
return dialer.DialContext(ctx, "tcp", echoServer.Addr())
})
additionalControllerOpts = append(additionalControllerOpts, controller.WithSynthetic())
additionalWorkerOpts = append(additionalWorkerOpts, worker.WithSynthetic(),
worker.WithDialer(dialer))
}
devController, devClient, err := CreateDevController(devDataDirPath, fmt.Sprintf(":%d",
netconstants.DefaultControllerPort), additionalControllerOpts, logger)
if err != nil {
return err
}
defer devWorker.Close()
errChan := make(chan error, 2)
group.Go(func() error {
return devController.Run(ctx)
})
go func() {
if err := devController.Run(cmd.Context()); err != nil {
errChan <- err
}
}()
workerName, err := worker.DefaultName()
if err != nil {
return err
}
go func() {
if err := devWorker.Run(cmd.Context()); err != nil {
errChan <- err
}
}()
return <-errChan

for i := range workers {
group.Go(func() error {
workerNameLocal := workerName
if workers > 1 {
workerNameLocal = fmt.Sprintf("%s-%d", workerName, i)
}
devWorker, err := CreateDevWorker(devClient, resources,
append(additionalWorkerOpts, worker.WithName(workerNameLocal)), logger)
if err != nil {
return err
}
defer devWorker.Close()
return devWorker.Run(ctx)
})
}
return group.Wait()
}
func CreateDevControllerAndWorker(
@ -114,6 +187,26 @@ func CreateDevControllerAndWorker(
}
}()
devController, defaultClient, err := CreateDevController(devDataDirPath, controllerListenAddr,
additionalControllerOpts, logger)
if err != nil {
return nil, nil, err
}
devWorker, err := CreateDevWorker(defaultClient, resources, additionalWorkerOpts, logger)
if err != nil {
return nil, nil, err
}
return devController, devWorker, nil
}
func CreateDevController(
devDataDirPath string,
controllerListenAddr string,
additionalControllerOpts []controller.Option,
logger *zap.Logger,
) (*controller.Controller, *client.Client, error) {
dataDir, err := controller.NewDataDir(devDataDirPath)
if err != nil {
return nil, nil, err
@ -139,18 +232,6 @@ func CreateDevControllerAndWorker(
return nil, nil, err
}
workerOpts := []worker.Option{
worker.WithResources(resources),
worker.WithLogger(logger),
}
workerOpts = append(workerOpts, additionalWorkerOpts...)
devWorker, err := worker.New(defaultClient, workerOpts...)
if err != nil {
return nil, nil, err
}
// set local-dev context as active
configHandle, err := config.NewHandle()
if err != nil {
@ -166,5 +247,21 @@ func CreateDevControllerAndWorker(
return nil, nil, err
}
return devController, devWorker, nil
return devController, defaultClient, nil
}
func CreateDevWorker(
client *client.Client,
resources v1.Resources,
additionalWorkerOpts []worker.Option,
logger *zap.Logger,
) (*worker.Worker, error) {
workerOpts := []worker.Option{
worker.WithResources(resources),
worker.WithLogger(logger),
}
workerOpts = append(workerOpts, additionalWorkerOpts...)
return worker.New(client, workerOpts...)
}

View File

@ -3,11 +3,20 @@
package worker
import (
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
_ "net/http/pprof"
"os"
"strings"
"github.com/cirruslabs/chacha/pkg/localnetworkhelper"
"github.com/cirruslabs/chacha/pkg/privdrop"
"github.com/cirruslabs/orchard/internal/bootstraptoken"
"github.com/cirruslabs/orchard/internal/dialer"
"github.com/cirruslabs/orchard/internal/netconstants"
"github.com/cirruslabs/orchard/internal/worker"
"github.com/cirruslabs/orchard/pkg/client"
@ -16,11 +25,6 @@ import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"gopkg.in/natefinch/lumberjack.v2"
"io"
"net/http"
_ "net/http/pprof"
"os"
"strings"
)
var (
@ -98,8 +102,12 @@ func runWorker(cmd *cobra.Command, args []string) (err error) {
return err
}
clientOpts = append(clientOpts, client.WithDialContext(localNetworkHelper.PrivilegedDialContext))
workerOpts = append(workerOpts, worker.WithLocalNetworkHelper(localNetworkHelper))
dialer := dialer.DialFunc(func(ctx context.Context, network, addr string) (net.Conn, error) {
return localNetworkHelper.PrivilegedDialContext(ctx, network, addr)
})
clientOpts = append(clientOpts, client.WithDialer(dialer))
workerOpts = append(workerOpts, worker.WithDialer(dialer))
if err := privdrop.Drop(username); err != nil {
return err

View File

@ -59,7 +59,7 @@ func (controller *Controller) createWorker(ctx *gin.Context) responder.Responder
return responder.Code(http.StatusInternalServerError)
}
if uint(len(workers)+1) > controller.maxWorkersPerLicense {
if !controller.synthetic && uint(len(workers)+1) > controller.maxWorkersPerLicense {
return responder.JSON(http.StatusConflict, NewErrorResponse("cannot register a new worker "+
"because the license capacity of %d workers has been reached, "+
"consider upgrading at https://tart.run/licensing/", controller.maxWorkersPerLicense))

View File

@ -66,6 +66,7 @@ type Controller struct {
disableDBCompression bool
pingInterval time.Duration
prometheusMetrics bool
synthetic bool
sshListenAddr string
sshSigner ssh.Signer

View File

@ -84,6 +84,12 @@ func WithPrometheusMetrics() Option {
}
}
func WithSynthetic() Option {
return func(controller *Controller) {
controller.synthetic = true
}
}
func WithLogger(logger *zap.Logger) Option {
return func(controller *Controller) {
controller.logger = logger.Sugar()

21
internal/dialer/dialer.go Normal file
View File

@ -0,0 +1,21 @@
package dialer
import (
"context"
"net"
)
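// Dialer abstracts net.Dialer-style dialing so that outbound connections can
// be redirected, e.g. through the privileged local network helper or to the
// synthetic echo server.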
type Dialer interface {
Dial(network, addr string) (net.Conn, error)
DialContext(ctx context.Context, network, addr string) (net.Conn, error)
}
type DialFunc func(ctx context.Context, network, addr string) (net.Conn, error)
func (f DialFunc) Dial(network, addr string) (net.Conn, error) {
return f(context.Background(), network, addr)
}
func (f DialFunc) DialContext(ctx context.Context, network, addr string) (net.Conn, error) {
return f(ctx, network, addr)
}
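DialFunc is to Dialer what http.HandlerFunc is to http.Handler: any closure with the right signature satisfies both methods. A minimal in-repo sketch (the fixed target address is illustrative; internal/dialer is not importable outside this module):

package main

import (
	"context"
	"log"
	"net"

	"github.com/cirruslabs/orchard/internal/dialer"
)

func main() {
	// Ignore the requested address and always dial a fixed endpoint,
	// the same trick dev mode uses to point all VM traffic at the echo server.
	var d dialer.Dialer = dialer.DialFunc(func(ctx context.Context, network, addr string) (net.Conn, error) {
		var nd net.Dialer
		return nd.DialContext(ctx, "tcp", "127.0.0.1:8080")
	})

	conn, err := d.Dial("tcp", "example.com:80") // uses context.Background() under the hood
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}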

View File

@ -0,0 +1,82 @@
package echoserver
import (
"context"
"errors"
"io"
"net"
"strings"
"syscall"
"golang.org/x/sync/errgroup"
)
type EchoServer struct {
listener net.Listener
}
func New() (*EchoServer, error) {
listener, err := net.Listen("tcp", ":0")
if err != nil {
return nil, err
}
return &EchoServer{
listener: listener,
}, nil
}
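// Addr returns a dialable address for the listener, rewriting the
// unspecified-host form that net.Listen reports ("[::]") to loopback.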
func (echoServer *EchoServer) Addr() string {
return strings.ReplaceAll(echoServer.listener.Addr().String(), "[::]", "127.0.0.1")
}
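// Run accepts connections until ctx is cancelled, echoing back on each
// connection exactly the bytes that were received.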
func (echoServer *EchoServer) Run(ctx context.Context) error {
group, ctx := errgroup.WithContext(ctx)
group.Go(func() error {
<-ctx.Done()
return echoServer.listener.Close()
})
group.Go(func() error {
for {
conn, err := echoServer.listener.Accept()
if err != nil {
if errors.Is(err, net.ErrClosed) {
return nil
}
return err
}
group.Go(func() error {
defer conn.Close()
buf := make([]byte, 4096)
for {
n, err := conn.Read(buf)
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return err
}
_, err = conn.Write(buf[:n])
if err != nil {
if errors.Is(err, syscall.EPIPE) {
return nil
}
return err
}
}
})
}
})
return group.Wait()
}

View File

@ -0,0 +1,54 @@
package echoserver_test
import (
"context"
"crypto/rand"
"io"
"net"
"testing"
"github.com/cirruslabs/orchard/internal/echoserver"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
func TestServer(t *testing.T) {
echoServer, err := echoserver.New()
require.NoError(t, err)
subCtx, subCtxCancel := context.WithCancel(t.Context())
group, ctx := errgroup.WithContext(subCtx)
group.Go(func() error {
return echoServer.Run(ctx)
})
netConn, err := net.Dial("tcp", echoServer.Addr())
require.NoError(t, err)
const bufSizeBytes = 64 * 1024
outgoingBuf := make([]byte, bufSizeBytes)
incomingBuf := make([]byte, bufSizeBytes)
// Prepare the outgoing buffer
n, err := rand.Read(outgoingBuf)
require.NoError(t, err)
require.Equal(t, len(outgoingBuf), n)
// Send the outgoing buffer
n, err = netConn.Write(outgoingBuf)
require.NoError(t, err)
require.Equal(t, len(outgoingBuf), n)
// Receive the incoming buffer
n, err = io.ReadFull(netConn, incomingBuf)
require.NoError(t, err)
require.Equal(t, len(incomingBuf), n)
// Compare outgoing and incoming buffers
require.Equal(t, outgoingBuf, incomingBuf)
// Ensure clean shutdown
require.NoError(t, netConn.Close())
subCtxCancel()
require.NoError(t, group.Wait())
}

View File

@ -1,7 +1,7 @@
package worker
import (
"github.com/cirruslabs/chacha/pkg/localnetworkhelper"
"github.com/cirruslabs/orchard/internal/dialer"
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
"go.uber.org/zap"
)
@ -33,9 +33,15 @@ func WithDefaultCPUAndMemory(defaultCPU uint64, defaultMemory uint64) Option {
}
}
func WithLocalNetworkHelper(localNetworkHelper *localnetworkhelper.LocalNetworkHelper) Option {
func WithDialer(dialer dialer.Dialer) Option {
return func(worker *Worker) {
worker.localNetworkHelper = localNetworkHelper
worker.dialer = dialer
}
}
func WithSynthetic() Option {
return func(worker *Worker) {
worker.synthetic = true
}
}
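For orientation, a sketch of how these options compose when standing up one synthetic worker by hand, roughly what runDev does per worker (c, d and the worker name are placeholders):

package main

import (
	"context"

	"github.com/cirruslabs/orchard/internal/dialer"
	"github.com/cirruslabs/orchard/internal/worker"
	"github.com/cirruslabs/orchard/pkg/client"
	"go.uber.org/zap"
)

// runSyntheticWorker starts a worker that fakes VMs in memory and routes
// port-forward dials through d (e.g. to the echo server from this PR).
func runSyntheticWorker(ctx context.Context, c *client.Client, d dialer.Dialer, logger *zap.Logger) error {
	w, err := worker.New(c,
		worker.WithName("synthetic-0"), // placeholder name
		worker.WithLogger(logger),
		worker.WithSynthetic(),
		worker.WithDialer(d),
	)
	if err != nil {
		return err
	}
	defer w.Close()

	return w.Run(ctx)
}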

View File

@ -113,8 +113,8 @@ func (worker *Worker) handlePortForward(
// Connect to the VM's port
var vmConn net.Conn
if worker.localNetworkHelper != nil {
vmConn, err = worker.localNetworkHelper.PrivilegedDialContext(ctx, "tcp",
if worker.dialer != nil {
vmConn, err = worker.dialer.DialContext(ctx, "tcp",
fmt.Sprintf("%s:%d", host, portForwardAction.Port))
} else {
dialer := net.Dialer{}

View File

@ -100,8 +100,8 @@ func (worker *Worker) handlePortForwardV2Inner(
var vmConn net.Conn
if worker.localNetworkHelper != nil {
vmConn, err = worker.localNetworkHelper.PrivilegedDialContext(ctx, "tcp",
if worker.dialer != nil {
vmConn, err = worker.dialer.DialContext(ctx, "tcp",
fmt.Sprintf("%s:%d", host, portForward.Port))
} else {
dialer := net.Dialer{}

View File

@ -0,0 +1,111 @@
package base
import (
"sync/atomic"
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
mapset "github.com/deckarep/golang-set/v2"
"go.uber.org/zap"
)
type VM struct {
// Backward compatibility with v1.VM specification's "Status" field
//
// "started" is always true after the first "tart run",
// whereas ConditionReady can be used to tell if a VM
// is really running or not.
started atomic.Bool
// A more orthogonal alternative to v1.VM specification's "Status" field,
// which allows a VM to have more than one state.
//
// For example, a VM can be both in ConditionReady and ConditionSuspending/
// ConditionStopping states for a short time. This way in run() we know
// that we're in a process of rebooting a VM, so we can avoid throwing
// an error about unexpected VM termination.
conditions mapset.Set[v1.ConditionType]
statusMessage atomic.Pointer[string]
err atomic.Pointer[error]
logger *zap.SugaredLogger
}
func NewVM(logger *zap.SugaredLogger) *VM {
return &VM{
conditions: mapset.NewSet(v1.ConditionTypeCloning),
logger: logger,
}
}
func (vm *VM) SetStarted(val bool) {
vm.started.Store(val)
}
func (vm *VM) Status() v1.VMStatus {
if vm.Err() != nil {
return v1.VMStatusFailed
}
if vm.started.Load() {
return v1.VMStatusRunning
}
return v1.VMStatusPending
}
func (vm *VM) StatusMessage() string {
status := vm.statusMessage.Load()
if status != nil {
return *status
}
return ""
}
func (vm *VM) SetStatusMessage(status string) {
vm.logger.Debug(status)
vm.statusMessage.Store(&status)
}
func (vm *VM) Err() error {
if err := vm.err.Load(); err != nil {
return *err
}
return nil
}
func (vm *VM) SetErr(err error) {
if vm.err.CompareAndSwap(nil, &err) {
vm.logger.Error(err)
}
}
func (vm *VM) ConditionsSet() mapset.Set[v1.ConditionType] {
return vm.conditions
}
func (vm *VM) Conditions() []v1.Condition {
// Only expose a minimum amount of conditions necessary
// for the Orchard Controller to make decisions
return []v1.Condition{
vm.conditionTypeToCondition(v1.ConditionTypeRunning),
}
}
func (vm *VM) conditionTypeToCondition(conditionType v1.ConditionType) v1.Condition {
var conditionState v1.ConditionState
if vm.ConditionsSet().ContainsOne(conditionType) {
conditionState = v1.ConditionStateTrue
} else {
conditionState = v1.ConditionStateFalse
}
return v1.Condition{
Type: conditionType,
State: conditionState,
}
}

View File

@ -0,0 +1,202 @@
package synthetic
import (
"context"
"math/rand"
"strings"
"sync"
"time"
"github.com/cirruslabs/orchard/internal/worker/ondiskname"
"github.com/cirruslabs/orchard/internal/worker/vmmanager/base"
"github.com/cirruslabs/orchard/pkg/client"
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
)
type VM struct {
onDiskName ondiskname.OnDiskName
resource v1.VM
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
logger *zap.SugaredLogger
*base.VM
}
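// NewVM creates a synthetic VM that merely sleeps through the pull and
// clone/configure phases (see randomDelay below) instead of invoking Tart.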
func NewVM(
vmResource v1.VM,
eventStreamer *client.EventStreamer,
vmPullTimeHistogram metric.Float64Histogram,
logger *zap.SugaredLogger,
) *VM {
ctx, cancel := context.WithCancel(context.Background())
logger = logger.With(
"vm_uid", vmResource.UID,
"vm_name", vmResource.Name,
"vm_restart_count", vmResource.RestartCount,
)
vm := &VM{
onDiskName: ondiskname.NewFromResource(vmResource),
resource: vmResource,
ctx: ctx,
cancel: cancel,
logger: logger,
VM: base.NewVM(logger),
}
vm.wg.Add(1)
go func() {
defer vm.wg.Done()
if vmResource.ImagePullPolicy == v1.ImagePullPolicyAlways {
vm.SetStatusMessage("pulling VM image...")
pullStartedAt := time.Now()
// Pull
time.Sleep(randomDelay())
vmPullTimeHistogram.Record(vm.ctx, time.Since(pullStartedAt).Seconds(), metric.WithAttributes(
attribute.String("worker", vm.resource.Worker),
attribute.String("image", vm.resource.Image),
))
}
// Clone and configure
time.Sleep(randomDelay())
vm.ConditionsSet().Remove(v1.ConditionTypeCloning)
// Backward compatibility with v1.VM specification's "Status" field
vm.SetStarted(true)
vm.ConditionsSet().Add(v1.ConditionTypeRunning)
vm.run(vm.ctx, eventStreamer)
}()
return vm
}
func (vm *VM) Resource() v1.VM {
return vm.resource
}
func (vm *VM) SetResource(vmResource v1.VM) {
vm.resource = vmResource
vm.resource.ObservedGeneration = vmResource.Generation
}
func (vm *VM) OnDiskName() ondiskname.OnDiskName {
return vm.onDiskName
}
func (vm *VM) ImageFQN() *string {
return nil
}
func (vm *VM) Start(eventStreamer *client.EventStreamer) {
vm.SetStatusMessage("Starting VM")
vm.ConditionsSet().Add(v1.ConditionTypeRunning)
vm.cancel()
vm.ctx, vm.cancel = context.WithCancel(context.Background())
vm.wg.Add(1)
go func() {
defer vm.wg.Done()
vm.run(vm.ctx, eventStreamer)
}()
}
func (vm *VM) Suspend() <-chan error {
errChan := make(chan error, 1)
errChan <- nil
return errChan
}
func (vm *VM) IP(ctx context.Context) (string, error) {
time.Sleep(randomDelay())
return "127.0.0.1", nil
}
func (vm *VM) Stop() <-chan error {
errChan := make(chan error, 1)
errChan <- nil
return errChan
}
func (vm *VM) Delete() error {
vm.cancel()
if vm.ConditionsSet().Contains(v1.ConditionTypeCloning) {
// Not cloned yet, nothing to delete
return nil
}
time.Sleep(randomDelay())
return nil
}
func (vm *VM) run(ctx context.Context, eventStreamer *client.EventStreamer) {
defer vm.ConditionsSet().RemoveAll(v1.ConditionTypeRunning, v1.ConditionTypeSuspending, v1.ConditionTypeStopping)
// Launch the startup script goroutine as close as possible
// to the VM startup (below) to avoid "tart ip" timing out
if vm.resource.StartupScript != nil {
vm.SetStatusMessage("VM started, running startup script...")
go vm.runScript(vm.resource.StartupScript, eventStreamer)
} else {
vm.SetStatusMessage("VM started")
}
<-ctx.Done()
}
func (vm *VM) runScript(script *v1.VMScript, eventStreamer *client.EventStreamer) {
if eventStreamer != nil {
defer func() {
if err := eventStreamer.Close(); err != nil {
vm.logger.Errorf("errored during streaming events for startup script: %v", err)
}
}()
}
consumeLine := func(line string) {
if eventStreamer == nil {
return
}
eventStreamer.Stream(v1.Event{
Kind: v1.EventKindLogLine,
Timestamp: time.Now().Unix(),
Payload: line,
})
}
for line := range strings.Lines(script.ScriptContent) {
consumeLine(line)
time.Sleep(randomDelay())
}
}
func randomDelay() time.Duration {
const jitterBaseDelay = 2500 * time.Millisecond
return time.Duration(rand.Float64() * float64(jitterBaseDelay))
}

View File

@ -14,11 +14,11 @@ import (
"time"
"github.com/avast/retry-go"
"github.com/cirruslabs/chacha/pkg/localnetworkhelper"
"github.com/cirruslabs/orchard/internal/dialer"
"github.com/cirruslabs/orchard/internal/worker/ondiskname"
"github.com/cirruslabs/orchard/internal/worker/vmmanager/base"
"github.com/cirruslabs/orchard/pkg/client"
"github.com/cirruslabs/orchard/pkg/resource/v1"
mapset "github.com/deckarep/golang-set/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
@ -32,41 +32,24 @@ type VM struct {
resource v1.VM
logger *zap.SugaredLogger
// Backward compatibility with v1.VM specification's "Status" field
//
// "started" is always true after the first "tart run",
// whereas ConditionReady can be used to tell if a VM
// is really running or not.
started atomic.Bool
// A more orthogonal alternative to v1.VM specification's "Status" field,
// which allows a VM to have more than one state.
//
// For example, a VM can be both in ConditionReady and ConditionSuspending/
// ConditionStopping states for a short time. This way in run() we know
// that we're in a process of rebooting a VM, so we can avoid throwing
// an error about unexpected VM termination.
conditions mapset.Set[v1.ConditionType]
// Image FQN feature, see https://github.com/cirruslabs/orchard/issues/164
imageFQN atomic.Pointer[string]
statusMessage atomic.Pointer[string]
err atomic.Pointer[error]
ctx context.Context
cancel context.CancelFunc
wg *sync.WaitGroup
localNetworkHelper *localnetworkhelper.LocalNetworkHelper
dialer dialer.Dialer
*base.VM
}
func NewVM(
vmResource v1.VM,
eventStreamer *client.EventStreamer,
vmPullTimeHistogram metric.Float64Histogram,
localNetworkHelper *localnetworkhelper.LocalNetworkHelper,
dialer dialer.Dialer,
logger *zap.SugaredLogger,
) *VM {
vmContext, vmContextCancel := context.WithCancel(context.Background())
@ -80,14 +63,14 @@ func NewVM(
"vm_restart_count", vmResource.RestartCount,
),
conditions: mapset.NewSet(v1.ConditionTypeCloning),
ctx: vmContext,
cancel: vmContextCancel,
wg: &sync.WaitGroup{},
localNetworkHelper: localNetworkHelper,
dialer: dialer,
VM: base.NewVM(logger),
}
vm.wg.Add(1)
@ -96,7 +79,7 @@ func NewVM(
defer vm.wg.Done()
if vmResource.ImagePullPolicy == v1.ImagePullPolicyAlways {
vm.setStatusMessage("pulling VM image...")
vm.SetStatusMessage("pulling VM image...")
pullStartedAt := time.Now()
@ -106,7 +89,7 @@ func NewVM(
case <-vm.ctx.Done():
// Do not return an error because it's the user's intent to cancel this VM operation
default:
vm.setErr(fmt.Errorf("failed to pull the VM: %w", err))
vm.SetErr(fmt.Errorf("failed to pull the VM: %w", err))
}
return
@ -123,16 +106,16 @@ func NewVM(
case <-vm.ctx.Done():
// Do not return an error because it's the user's intent to cancel this VM operation
default:
vm.setErr(fmt.Errorf("failed to clone the VM: %w", err))
vm.SetErr(fmt.Errorf("failed to clone the VM: %w", err))
}
return
}
// Backward compatibility with v1.VM specification's "Status" field
vm.started.Store(true)
vm.SetStarted(true)
vm.conditions.Add(v1.ConditionTypeRunning)
vm.ConditionsSet().Add(v1.ConditionTypeRunning)
vm.run(vm.ctx, eventStreamer)
}()
@ -161,79 +144,15 @@ func (vm *VM) id() string {
return vm.onDiskName.String()
}
func (vm *VM) Status() v1.VMStatus {
if vm.Err() != nil {
return v1.VMStatusFailed
}
if vm.started.Load() {
return v1.VMStatusRunning
}
return v1.VMStatusPending
}
func (vm *VM) StatusMessage() string {
status := vm.statusMessage.Load()
if status != nil {
return *status
}
return ""
}
func (vm *VM) setStatusMessage(status string) {
vm.logger.Debugf(status)
vm.statusMessage.Store(&status)
}
func (vm *VM) Err() error {
if err := vm.err.Load(); err != nil {
return *err
}
return nil
}
func (vm *VM) setErr(err error) {
if vm.err.CompareAndSwap(nil, &err) {
vm.logger.Error(err)
}
}
func (vm *VM) Conditions() []v1.Condition {
// Only expose a minimum amount of conditions necessary
// for the Orchard Controller to make decisions
return []v1.Condition{
vm.conditionTypeToCondition(v1.ConditionTypeRunning),
}
}
func (vm *VM) conditionTypeToCondition(conditionType v1.ConditionType) v1.Condition {
var conditionState v1.ConditionState
if vm.conditions.ContainsOne(conditionType) {
conditionState = v1.ConditionStateTrue
} else {
conditionState = v1.ConditionStateFalse
}
return v1.Condition{
Type: conditionType,
State: conditionState,
}
}
func (vm *VM) cloneAndConfigure(ctx context.Context) error {
vm.setStatusMessage("cloning VM...")
vm.SetStatusMessage("cloning VM...")
_, _, err := Tart(ctx, vm.logger, "clone", vm.resource.Image, vm.id())
if err != nil {
return err
}
vm.conditions.Remove(v1.ConditionTypeCloning)
vm.ConditionsSet().Remove(v1.ConditionTypeCloning)
// Image FQN feature, see https://github.com/cirruslabs/orchard/issues/164
fqnRaw, _, err := Tart(ctx, vm.logger, "fqn", vm.resource.Image)
@ -243,7 +162,7 @@ func (vm *VM) cloneAndConfigure(ctx context.Context) error {
}
// Set memory
vm.setStatusMessage("configuring VM...")
vm.SetStatusMessage("configuring VM...")
memory := vm.resource.AssignedMemory
@ -356,16 +275,16 @@ func (vm *VM) cloneAndConfigure(ctx context.Context) error {
}
func (vm *VM) run(ctx context.Context, eventStreamer *client.EventStreamer) {
defer vm.conditions.RemoveAll(v1.ConditionTypeRunning, v1.ConditionTypeSuspending, v1.ConditionTypeStopping)
defer vm.ConditionsSet().RemoveAll(v1.ConditionTypeRunning, v1.ConditionTypeSuspending, v1.ConditionTypeStopping)
// Launch the startup script goroutine as close as possible
// to the VM startup (below) to avoid "tart ip" timing out
if vm.resource.StartupScript != nil {
vm.setStatusMessage("VM started, running startup script...")
vm.SetStatusMessage("VM started, running startup script...")
go vm.runScript(vm.resource.StartupScript, eventStreamer)
} else {
vm.setStatusMessage("VM started")
vm.SetStatusMessage("VM started")
}
var runArgs = []string{"run"}
@ -406,7 +325,7 @@ func (vm *VM) run(ctx context.Context, eventStreamer *client.EventStreamer) {
case <-vm.ctx.Done():
// Do not return an error because it's the user's intent to cancel this VM
default:
vm.setErr(fmt.Errorf("%w: %v", ErrVMFailed, err))
vm.SetErr(fmt.Errorf("%w: %v", ErrVMFailed, err))
}
return
@ -416,8 +335,8 @@ func (vm *VM) run(ctx context.Context, eventStreamer *client.EventStreamer) {
case <-vm.ctx.Done():
// Do not return an error because it's the user's intent to cancel this VM
default:
if !vm.conditions.ContainsAny(v1.ConditionTypeSuspending, v1.ConditionTypeStopping) {
vm.setErr(fmt.Errorf("%w: VM exited unexpectedly", ErrVMFailed))
if !vm.ConditionsSet().ContainsAny(v1.ConditionTypeSuspending, v1.ConditionTypeStopping) {
vm.SetErr(fmt.Errorf("%w: VM exited unexpectedly", ErrVMFailed))
}
}
}
@ -462,14 +381,14 @@ func (vm *VM) Suspend() <-chan error {
// VM is still running
}
vm.setStatusMessage("Suspending VM")
vm.conditions.Add(v1.ConditionTypeSuspending)
vm.SetStatusMessage("Suspending VM")
vm.ConditionsSet().Add(v1.ConditionTypeSuspending)
go func() {
_, _, err := Tart(context.Background(), zap.NewNop().Sugar(), "suspend", vm.id())
if err != nil {
err := fmt.Errorf("failed to suspend VM: %w", err)
vm.setErr(err)
vm.SetErr(err)
errCh <- err
return
@ -494,8 +413,8 @@ func (vm *VM) Stop() <-chan error {
// VM is still running
}
vm.setStatusMessage("Stopping VM")
vm.conditions.Add(v1.ConditionTypeStopping)
vm.SetStatusMessage("Stopping VM")
vm.ConditionsSet().Add(v1.ConditionTypeStopping)
go func() {
// Try to gracefully terminate the VM
@ -513,8 +432,8 @@ func (vm *VM) Stop() <-chan error {
}
func (vm *VM) Start(eventStreamer *client.EventStreamer) {
vm.setStatusMessage("Starting VM")
vm.conditions.Add(v1.ConditionTypeRunning)
vm.SetStatusMessage("Starting VM")
vm.ConditionsSet().Add(v1.ConditionTypeRunning)
vm.cancel()
@ -533,7 +452,7 @@ func (vm *VM) Delete() error {
// (e.g. "tart clone", "tart run", etc.)
vm.cancel()
if vm.conditions.Contains(v1.ConditionTypeCloning) {
if vm.ConditionsSet().Contains(v1.ConditionTypeCloning) {
// Not cloned yet, nothing to delete
return nil
}
@ -586,8 +505,8 @@ func (vm *VM) shell(
var netConn net.Conn
if vm.localNetworkHelper != nil {
netConn, err = vm.localNetworkHelper.PrivilegedDialContext(dialCtx, "tcp", addr)
if vm.dialer != nil {
netConn, err = vm.dialer.DialContext(dialCtx, "tcp", addr)
} else {
dialer := net.Dialer{}
@ -691,6 +610,6 @@ func (vm *VM) runScript(script *v1.VMScript, eventStreamer *client.EventStreamer
err := vm.shell(vm.ctx, vm.resource.Username, vm.resource.Password,
script.ScriptContent, script.Env, consumeLine)
if err != nil {
vm.setErr(fmt.Errorf("%w: failed to run startup script: %v", ErrVMFailed, err))
vm.SetErr(fmt.Errorf("%w: failed to run startup script: %v", ErrVMFailed, err))
}
}

View File

@ -6,6 +6,7 @@ import (
"github.com/cirruslabs/orchard/internal/worker/ondiskname"
"github.com/cirruslabs/orchard/pkg/client"
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
"github.com/puzpuzpuz/xsync/v4"
)
type VM interface {
@ -26,45 +27,47 @@ type VM interface {
}
type VMManager struct {
vms map[ondiskname.OnDiskName]VM
vms *xsync.Map[ondiskname.OnDiskName, VM]
}
func New() *VMManager {
return &VMManager{
vms: map[ondiskname.OnDiskName]VM{},
vms: xsync.NewMap[ondiskname.OnDiskName, VM](),
}
}
func (vmm *VMManager) Exists(key ondiskname.OnDiskName) bool {
_, ok := vmm.vms[key]
_, ok := vmm.vms.Load(key)
return ok
}
func (vmm *VMManager) Get(key ondiskname.OnDiskName) (VM, bool) {
vm, ok := vmm.vms[key]
vm, ok := vmm.vms.Load(key)
return vm, ok
}
func (vmm *VMManager) Put(key ondiskname.OnDiskName, vm VM) {
vmm.vms[key] = vm
vmm.vms.Store(key, vm)
}
func (vmm *VMManager) Delete(key ondiskname.OnDiskName) {
delete(vmm.vms, key)
vmm.vms.Delete(key)
}
func (vmm *VMManager) Len() int {
return len(vmm.vms)
return vmm.vms.Size()
}
func (vmm *VMManager) List() []VM {
var vms []VM
for _, vm := range vmm.vms {
vmm.vms.Range(func(odn ondiskname.OnDiskName, vm VM) bool {
vms = append(vms, vm)
}
return true
})
return vms
}

View File

@ -9,12 +9,13 @@ import (
"time"
"github.com/avast/retry-go/v4"
"github.com/cirruslabs/chacha/pkg/localnetworkhelper"
"github.com/cirruslabs/orchard/internal/dialer"
"github.com/cirruslabs/orchard/internal/opentelemetry"
"github.com/cirruslabs/orchard/internal/worker/dhcpleasetime"
"github.com/cirruslabs/orchard/internal/worker/iokitregistry"
"github.com/cirruslabs/orchard/internal/worker/ondiskname"
"github.com/cirruslabs/orchard/internal/worker/vmmanager"
"github.com/cirruslabs/orchard/internal/worker/vmmanager/synthetic"
"github.com/cirruslabs/orchard/internal/worker/vmmanager/tart"
"github.com/cirruslabs/orchard/pkg/client"
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
@ -47,9 +48,11 @@ type Worker struct {
defaultCPU uint64
defaultMemory uint64
synthetic bool
vmPullTimeHistogram metric.Float64Histogram
localNetworkHelper *localnetworkhelper.LocalNetworkHelper
dialer dialer.Dialer
logger *zap.SugaredLogger
}
@ -69,12 +72,12 @@ func New(client *client.Client, opts ...Option) (*Worker, error) {
// Apply defaults
if worker.name == "" {
hostname, err := os.Hostname()
name, err := DefaultName()
if err != nil {
return nil, err
}
worker.name = hostname
worker.name = name
}
defaultResources := v1.Resources{
@ -225,6 +228,10 @@ func (worker *Worker) runNewSession(ctx context.Context) error {
}
}
func DefaultName() (string, error) {
return os.Hostname()
}
func (worker *Worker) registerWorker(ctx context.Context) error {
platformUUID, err := iokitregistry.PlatformUUID()
if err != nil {
@ -459,6 +466,11 @@ func (worker *Worker) syncVMs(ctx context.Context, updateVM func(context.Context
//nolint:nestif,gocognit // complexity is tolerable for now
func (worker *Worker) syncOnDiskVMs(ctx context.Context) error {
if worker.synthetic {
// There's no on-disk VMs when using synthetic VMs
return nil
}
remoteVMs, err := worker.client.VMs().FindForWorker(ctx, worker.name)
if err != nil {
return err
@ -536,8 +548,14 @@ func (worker *Worker) deleteVM(vm vmmanager.VM) error {
func (worker *Worker) createVM(odn ondiskname.OnDiskName, vmResource v1.VM) {
eventStreamer := worker.client.VMs().StreamEvents(vmResource.Name)
vm := tart.NewVM(vmResource, eventStreamer, worker.vmPullTimeHistogram,
worker.localNetworkHelper, worker.logger)
var vm vmmanager.VM
if worker.synthetic {
vm = synthetic.NewVM(vmResource, eventStreamer, worker.vmPullTimeHistogram, worker.logger)
} else {
vm = tart.NewVM(vmResource, eventStreamer, worker.vmPullTimeHistogram,
worker.dialer, worker.logger)
}
worker.vmm.Put(odn, vm)
}

View File

@ -16,6 +16,7 @@ import (
"time"
"github.com/cirruslabs/orchard/internal/config"
"github.com/cirruslabs/orchard/internal/dialer"
"github.com/cirruslabs/orchard/internal/version"
"github.com/cirruslabs/orchard/rpc"
"github.com/coder/websocket"
@ -42,7 +43,7 @@ type Client struct {
serviceAccountName string
serviceAccountToken string
dialContext func(ctx context.Context, network, addr string) (net.Conn, error)
dialer dialer.Dialer
}
type Config struct {
@ -84,8 +85,8 @@ func New(opts ...Option) (*Client, error) {
TLSClientConfig: client.tlsConfig,
}
if client.dialContext != nil {
transport.DialContext = client.dialContext
if client.dialer != nil {
transport.DialContext = client.dialer.DialContext
}
client.httpClient = &http.Client{

View File

@ -1,9 +1,9 @@
package client
import (
"context"
"crypto/x509"
"net"
"github.com/cirruslabs/orchard/internal/dialer"
)
type Option func(*Client)
@ -27,8 +27,8 @@ func WithCredentials(serviceAccountName string, serviceAccountToken string) Opti
}
}
func WithDialContext(dialContext func(ctx context.Context, network, addr string) (net.Conn, error)) Option {
func WithDialer(dialer dialer.Dialer) Option {
return func(client *Client) {
client.dialContext = dialContext
client.dialer = dialer
}
}
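Callers of the removed WithDialContext can pass the same closure through dialer.DialFunc; a sketch of the migration (in-repo, since internal/dialer is not importable from outside the module):

package main

import (
	"context"
	"net"

	"github.com/cirruslabs/orchard/internal/dialer"
	"github.com/cirruslabs/orchard/pkg/client"
)

func newClientWith(dialContext func(ctx context.Context, network, addr string) (net.Conn, error)) (*client.Client, error) {
	// Before: client.New(client.WithDialContext(dialContext))
	// After: the same closure, adapted to the Dialer interface.
	return client.New(client.WithDialer(dialer.DialFunc(dialContext)))
}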

120
test/load/vm-lifecycle.js Normal file
View File

@ -0,0 +1,120 @@
import {check} from 'k6';
import http from 'k6/http';
import {WebSocket} from 'k6/experimental/websockets';
import crypto from 'k6/crypto';
import {uuidv4} from 'https://jslib.k6.io/k6-utils/1.6.0/index.js';
import {expect} from 'https://jslib.k6.io/k6-testing/0.6.1/index.js';
const BASE_URL = __ENV.ORCHARD_BASE_URL || 'http://127.0.0.1:6120/v1';
const WS_BASE_URL = __ENV.ORCHARD_WS_BASE_URL || BASE_URL.replace(/^http/, 'ws');
const WS_BYTES = __ENV.WS_BYTES ? parseInt(__ENV.WS_BYTES, 10) : 64 * 1024;
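// A single ramping-vus scenario: grow from 0 to 2,000 concurrent VUs over
// five minutes, each VU looping through create -> port-forward -> delete.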
export const options = {
scenarios: {
vmLifecycle: {
executor: 'ramping-vus',
stages: [
{duration: '5m', target: 2_000},
],
},
},
};
export default async function () {
const vmName = `k6-${uuidv4()}`;
createVM(vmName);
await portForward(vmName);
deleteVM(vmName);
}
function createVM(vmName) {
const loremIpsum = `Lorem ipsum
dolor sit amet,
consectetur adipiscing elit.
Fusce at orci nisi.
Donec lacinia neque et risus elementum,
ut interdum lacus pretium.
`;
const body = JSON.stringify({
name: vmName,
image: 'ghcr.io/cirruslabs/macos-tahoe-base:latest',
headless: true,
startup_script: {
script_content: loremIpsum
},
});
const resp = http.post(`${BASE_URL}/vms`, body, {
headers: {
'Content-Type': 'application/json',
},
});
const ok = check(resp, {
'VM creation succeeded': (r) => r.status === 200,
});
if (!ok) {
console.error(`Failed to create a VM: HTTP ${resp.status}`);
}
}
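// Forward the VM's port 22 over a WebSocket and verify that the (synthetic)
// echo server on the other side returns exactly the bytes we sent.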
async function portForward(vmName) {
const url = `${WS_BASE_URL}/vms/${vmName}/port-forward?port=22&wait=60`;
console.debug(`connecting to ${url}`);
const ws = new WebSocket(url);
ws.binaryType = 'arraybuffer';
const sentBytes = new Uint8Array(crypto.randomBytes(WS_BYTES));
let receivedBytes = new Uint8Array();
const evt = await new Promise((resolve, reject) => {
ws.onopen = () => {
ws.send(sentBytes);
};
ws.onmessage = (event) => {
if (event.data instanceof ArrayBuffer) {
const chunk = new Uint8Array(event.data);
const combined = new Uint8Array(receivedBytes.length + chunk.length);
combined.set(receivedBytes);
combined.set(chunk, receivedBytes.length);
receivedBytes = combined;
}
if (receivedBytes.length >= WS_BYTES) {
ws.close();
}
};
ws.onerror = (evt) => {
reject(new Error(`WebSocket error: ${evt.error}`));
};
ws.onclose = (evt) => {
resolve(evt);
};
});
expect(evt.code).toBe(1000);
expect(receivedBytes).toEqual(sentBytes);
}
function deleteVM(vmName) {
const resp = http.del(`${BASE_URL}/vms/${vmName}`);
const ok = check(resp, {
'VM deletion succeeded': (r) => r.status === 200,
});
if (!ok) {
console.error(`Failed to delete a VM: HTTP ${resp.status}`);
}
}
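Assuming a dev controller started with the flags introduced here (for example `orchard dev --synthetic --workers 8`, worker count illustrative), the scenario should be runnable as `k6 run test/load/vm-lifecycle.js`, with ORCHARD_BASE_URL, ORCHARD_WS_BASE_URL and WS_BYTES overridable via k6's `-e` flag.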