203 lines
4.2 KiB
Go
203 lines
4.2 KiB
Go
package synthetic
|
|
|
|
import (
|
|
"context"
|
|
"math/rand"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/cirruslabs/orchard/internal/worker/ondiskname"
|
|
"github.com/cirruslabs/orchard/internal/worker/vmmanager/base"
|
|
"github.com/cirruslabs/orchard/pkg/client"
|
|
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/metric"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type VM struct {
|
|
onDiskName ondiskname.OnDiskName
|
|
resource v1.VM
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
wg sync.WaitGroup
|
|
logger *zap.SugaredLogger
|
|
|
|
*base.VM
|
|
}
|
|
|
|
func NewVM(
|
|
vmResource v1.VM,
|
|
eventStreamer *client.EventStreamer,
|
|
vmPullTimeHistogram metric.Float64Histogram,
|
|
logger *zap.SugaredLogger,
|
|
) *VM {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
logger = logger.With(
|
|
"vm_uid", vmResource.UID,
|
|
"vm_name", vmResource.Name,
|
|
"vm_restart_count", vmResource.RestartCount,
|
|
)
|
|
|
|
vm := &VM{
|
|
onDiskName: ondiskname.NewFromResource(vmResource),
|
|
resource: vmResource,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
logger: logger,
|
|
VM: base.NewVM(logger),
|
|
}
|
|
|
|
vm.wg.Add(1)
|
|
|
|
go func() {
|
|
defer vm.wg.Done()
|
|
|
|
if vmResource.ImagePullPolicy == v1.ImagePullPolicyAlways {
|
|
vm.SetStatusMessage("pulling VM image...")
|
|
|
|
pullStartedAt := time.Now()
|
|
|
|
// Pull
|
|
time.Sleep(randomDelay())
|
|
|
|
vmPullTimeHistogram.Record(vm.ctx, time.Since(pullStartedAt).Seconds(), metric.WithAttributes(
|
|
attribute.String("worker", vm.resource.Worker),
|
|
attribute.String("image", vm.resource.Image),
|
|
))
|
|
}
|
|
|
|
// Clone and configure
|
|
time.Sleep(randomDelay())
|
|
|
|
// Backward compatibility with v1.VM specification's "Status" field
|
|
vm.SetStarted(true)
|
|
|
|
vm.ConditionsSet().Add(v1.ConditionTypeRunning)
|
|
|
|
vm.run(vm.ctx, eventStreamer)
|
|
}()
|
|
|
|
return vm
|
|
}
|
|
|
|
func (vm *VM) Resource() v1.VM {
|
|
return vm.resource
|
|
}
|
|
|
|
func (vm *VM) SetResource(vmResource v1.VM) {
|
|
vm.resource = vmResource
|
|
vm.resource.ObservedGeneration = vmResource.Generation
|
|
}
|
|
|
|
func (vm *VM) OnDiskName() ondiskname.OnDiskName {
|
|
return vm.onDiskName
|
|
}
|
|
|
|
func (vm *VM) ImageFQN() *string {
|
|
return nil
|
|
}
|
|
|
|
func (vm *VM) Start(eventStreamer *client.EventStreamer) {
|
|
vm.SetStatusMessage("Starting VM")
|
|
vm.ConditionsSet().Add(v1.ConditionTypeRunning)
|
|
|
|
vm.cancel()
|
|
|
|
vm.ctx, vm.cancel = context.WithCancel(context.Background())
|
|
vm.wg.Add(1)
|
|
|
|
go func() {
|
|
defer vm.wg.Done()
|
|
|
|
vm.run(vm.ctx, eventStreamer)
|
|
}()
|
|
}
|
|
|
|
func (vm *VM) Suspend() <-chan error {
|
|
errChan := make(chan error, 1)
|
|
|
|
errChan <- nil
|
|
|
|
return errChan
|
|
}
|
|
|
|
func (vm *VM) IP(ctx context.Context) (string, error) {
|
|
time.Sleep(randomDelay())
|
|
|
|
return "127.0.0.1", nil
|
|
}
|
|
|
|
func (vm *VM) Stop() <-chan error {
|
|
errChan := make(chan error, 1)
|
|
|
|
errChan <- nil
|
|
|
|
return errChan
|
|
}
|
|
|
|
func (vm *VM) Delete() error {
|
|
vm.cancel()
|
|
|
|
if vm.ConditionsSet().Contains(v1.ConditionTypeCloning) {
|
|
// Not cloned yet, nothing to delete
|
|
return nil
|
|
}
|
|
|
|
time.Sleep(randomDelay())
|
|
|
|
return nil
|
|
}
|
|
|
|
func (vm *VM) run(ctx context.Context, eventStreamer *client.EventStreamer) {
|
|
defer vm.ConditionsSet().RemoveAll(v1.ConditionTypeRunning, v1.ConditionTypeSuspending, v1.ConditionTypeStopping)
|
|
|
|
// Launch the startup script goroutine as close as possible
|
|
// to the VM startup (below) to avoid "tart ip" timing out
|
|
if vm.resource.StartupScript != nil {
|
|
vm.SetStatusMessage("VM started, running startup script...")
|
|
|
|
go vm.runScript(vm.resource.StartupScript, eventStreamer)
|
|
} else {
|
|
vm.SetStatusMessage("VM started")
|
|
}
|
|
|
|
<-ctx.Done()
|
|
}
|
|
|
|
func (vm *VM) runScript(script *v1.VMScript, eventStreamer *client.EventStreamer) {
|
|
if eventStreamer != nil {
|
|
defer func() {
|
|
if err := eventStreamer.Close(); err != nil {
|
|
vm.logger.Errorf("errored during streaming events for startup script: %v", err)
|
|
}
|
|
}()
|
|
}
|
|
|
|
consumeLine := func(line string) {
|
|
if eventStreamer == nil {
|
|
return
|
|
}
|
|
|
|
eventStreamer.Stream(v1.Event{
|
|
Kind: v1.EventKindLogLine,
|
|
Timestamp: time.Now().Unix(),
|
|
Payload: line,
|
|
})
|
|
}
|
|
|
|
for line := range strings.Lines(script.ScriptContent) {
|
|
consumeLine(line)
|
|
|
|
randomDelay()
|
|
}
|
|
}
|
|
|
|
func randomDelay() time.Duration {
|
|
const jitterBaseDelay = 2500 * time.Millisecond
|
|
|
|
return time.Duration(rand.Float64() * float64(jitterBaseDelay))
|
|
}
|