API: introduce ability to watch a VM (#351)

* API: introduce ability to watch a VM

* Document ?watch=true for GET /vms/{name} in the OpenAPI specification

* WatchVM: ensure that goroutine is terminated on early return with error

* WatchVM: close channels on goroutine exit

* WatchVM: ensure that we wait for the goroutine after additional barriers

* WatchVM: ignore unexpected keys instead of throwing an error

* WatchVM: perform context-aware writes to a bounded channel

* WatchVM: don't forget to close errCh on goroutine exit too

* WatchVM: don't close readyCh in goroutine to avoid ambiguity

* WatchVM: filter out spurious KVs that signify VM deletion
This commit is contained in:
Nikolay Edigaryev 2025-10-03 19:34:53 +02:00 committed by GitHub
parent cdece3149b
commit c5e0d68a3d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 458 additions and 6 deletions

View File

@ -256,9 +256,15 @@ paths:
parameters:
- in: path
name: name
description: VM name to retrieve
required: true
schema:
type: string
- in: query
name: watch
description: Watch for changes to a VM resource and return them as a stream of ADDED, MODIFIED and DELETED notifications
schema:
type: boolean
get:
summary: "Retrieve a VM"
tags:
@ -270,6 +276,15 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/VM'
application/x-ndjson:
schema:
type: object
properties:
type:
type: string
enum: [ADDED, MODIFIED, DELETED]
object:
$ref: '#/components/schemas/VM'
'404':
description: VM resource with the given name doesn't exist
put:

View File

@ -4,6 +4,9 @@ import (
"context"
"crypto/subtle"
"errors"
"net/http"
"strings"
"github.com/cirruslabs/orchard/api"
storepkg "github.com/cirruslabs/orchard/internal/controller/store"
"github.com/cirruslabs/orchard/internal/responder"
@ -16,8 +19,6 @@ import (
"github.com/penglongli/gin-metrics/ginmetrics"
"go.uber.org/zap"
"google.golang.org/grpc/metadata"
"net/http"
"strings"
)
const ctxServiceAccountKey = "service-account"

View File

@ -1,7 +1,11 @@
package controller
import (
"encoding/json"
"errors"
"net/http"
"time"
"github.com/cirruslabs/orchard/internal/controller/lifecycle"
storepkg "github.com/cirruslabs/orchard/internal/controller/store"
"github.com/cirruslabs/orchard/internal/responder"
@ -10,8 +14,6 @@ import (
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/samber/lo"
"net/http"
"time"
)
func (controller *Controller) createVM(ctx *gin.Context) responder.Responder {
@ -150,6 +152,42 @@ func (controller *Controller) getVM(ctx *gin.Context) responder.Responder {
name := ctx.Param("name")
if ctx.Query("watch") == "true" {
ctx.Header("Content-Type", "application/x-ndjson")
watchCh, errCh, err := controller.store.WatchVM(ctx, name)
if err != nil {
return responder.Error(err)
}
for {
select {
case watchMessage := <-watchCh:
jsonBytes, err := json.Marshal(watchMessage)
if err != nil {
controller.logger.Errorf("failed to marshal watch message "+
"for VM %q to JSON: %v", name, err)
return responder.Empty()
}
if _, err = ctx.Writer.Write(jsonBytes); err != nil {
return responder.Empty()
}
if _, err := ctx.Writer.WriteString("\n"); err != nil {
return responder.Empty()
}
ctx.Writer.Flush()
case err := <-errCh:
controller.logger.Errorf("failed to watch VM %q in the DB: %v", name, err)
return responder.Empty()
case <-ctx.Done():
return responder.Empty()
}
}
}
return controller.storeView(func(txn storepkg.Transaction) responder.Responder {
vm, err := txn.GetVM(name)
if err != nil {

View File

@ -3,9 +3,10 @@ package badger
import (
"encoding/json"
"path"
"github.com/cirruslabs/orchard/pkg/resource/v1"
"github.com/dgraph-io/badger/v3"
"path"
)
const SpaceVMs = "/vms"
@ -38,6 +39,8 @@ func (txn *Transaction) GetVM(name string) (_ *v1.VM, err error) {
return nil, err
}
vm.Version = item.Version()
return &vm, nil
}
@ -94,6 +97,8 @@ func (txn *Transaction) ListVMs() (_ []v1.VM, err error) {
return nil, err
}
vm.Version = item.Version()
result = append(result, vm)
}

View File

@ -0,0 +1,175 @@
package badger
import (
"bytes"
"context"
"encoding/json"
"errors"
"time"
storepkg "github.com/cirruslabs/orchard/internal/controller/store"
"github.com/cirruslabs/orchard/pkg/resource/v1"
"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/badger/v3/pb"
)
// WatchVM subscribes to changes of the VM named vmName and streams them
// as ADDED, MODIFIED and DELETED notifications.
//
// If the VM already exists by the time the subscription is established,
// a synthetic ADDED notification carrying its current state is delivered
// first. Updates whose version is not newer than that initial state are
// skipped, and so are deletion markers for a VM that is already absent.
//
// On success, a notification channel and an error channel are returned.
// Both are closed when the background goroutine exits, so receivers should
// use the two-value receive form to tell a real message from a closed
// channel. The error channel carries at most one value: the non-nil error
// returned by Subscribe(), if any.
//
// A non-nil error (together with nil channels) is returned when the watch
// could not be established: the watch barrier could not be written, the
// subscription terminated before becoming ready, or ctx was done first.
func (store *Store) WatchVM(ctx context.Context, vmName string) (chan storepkg.WatchMessage[v1.VM], chan error, error) {
	readyCh := make(chan struct{}, 1)
	watchCh := make(chan storepkg.WatchMessage[v1.VM], 1)
	errCh := make(chan error, 1)

	subCtx, subCtxCancel := context.WithCancel(ctx)

	go func() {
		defer subCtxCancel()
		defer close(watchCh)
		defer close(errCh)

		var initialVM *v1.VM
		var checkedInitialVM bool

		if err := store.db.Subscribe(subCtx, func(kvList *badger.KVList) error {
			if !checkedInitialVM {
				// Notify the caller that we've subscribed, but don't block,
				// because we may observe multiple watch barriers, yet
				// we only need a single barrier to make things work
				select {
				case readyCh <- struct{}{}:
				default:
				}

				// Now that the subscription has started,
				// retrieve the initial VM, if any
				err := store.View(func(txn storepkg.Transaction) error {
					var err error

					initialVM, err = txn.GetVM(vmName)

					return err
				})
				if err != nil && !errors.Is(err, storepkg.ErrNotFound) {
					return err
				}

				if initialVM != nil {
					notification := storepkg.WatchMessage[v1.VM]{
						Type:   storepkg.WatchMessageTypeAdded,
						Object: *initialVM,
					}

					select {
					case watchCh <- notification:
					case <-subCtx.Done():
						return subCtx.Err()
					}
				}

				checkedInitialVM = true
			}

			for _, kv := range kvList.GetKv() {
				switch {
				case bytes.Equal(kv.GetKey(), WatchBarrierKey()):
					// We only need watch barriers so that the Subscribe()'s callback
					// is called at least once, thus we can simply do nothing here
				case bytes.Equal(kv.GetKey(), VMKey(vmName)):
					// Skip all KVs with versions before or equal
					// to the initial VM's version, if any
					if initialVM != nil && kv.GetVersion() <= initialVM.Version {
						continue
					}

					if kv.GetValue() == nil {
						if initialVM == nil {
							// VM is already deleted
							continue
						}

						// VM was deleted
						notification := storepkg.WatchMessage[v1.VM]{
							Type: storepkg.WatchMessageTypeDeleted,
						}

						select {
						case watchCh <- notification:
						case <-subCtx.Done():
							return subCtx.Err()
						}

						initialVM = nil
					} else {
						// VM was created or modified
						var vm v1.VM

						if err := json.Unmarshal(kv.GetValue(), &vm); err != nil {
							return err
						}

						vm.Version = kv.GetVersion()

						var watchMessageType storepkg.WatchMessageType

						if initialVM == nil {
							watchMessageType = storepkg.WatchMessageTypeAdded
							initialVM = &vm
						} else {
							watchMessageType = storepkg.WatchMessageTypeModified
						}

						notification := storepkg.WatchMessage[v1.VM]{
							Type:   watchMessageType,
							Object: vm,
						}

						select {
						case watchCh <- notification:
						case <-subCtx.Done():
							return subCtx.Err()
						}
					}
				default:
					// Ignore unexpected keys
					continue
				}
			}

			return nil
		}, []pb.Match{
			{
				Prefix: WatchBarrierKey(),
			},
			{
				Prefix: VMKey(vmName),
			},
		}); err != nil {
			// errCh is buffered, so this never blocks
			// even if nobody is receiving anymore
			errCh <- err
		}
	}()

	// Trigger the watch barrier so that Subscribe() callback gets invoked
	if err := store.notifyWatchBarrier(); err != nil {
		subCtxCancel()

		return nil, nil, err
	}

	// A single ticker paces the barrier retries, instead
	// of allocating a fresh timer on every loop iteration
	retryTicker := time.NewTicker(time.Second)
	defer retryTicker.Stop()

	// Wait for the Subscribe() callback to be invoked
	for {
		select {
		case <-readyCh:
			// Subscription has started
			return watchCh, errCh, nil
		case err, ok := <-errCh:
			// The goroutine has exited before the subscription was reported
			// as ready, so re-issuing watch barriers would never succeed:
			// surface the failure to the caller right away
			subCtxCancel()

			if !ok || err == nil {
				err = errors.New("watch subscription terminated before becoming ready")
			}

			return nil, nil, err
		case <-retryTicker.C:
			// Possible race with late goroutine start, re-issue watch barrier
			if err := store.notifyWatchBarrier(); err != nil {
				subCtxCancel()

				return nil, nil, err
			}
		case <-ctx.Done():
			// subCtx is derived from ctx, so the goroutine is cancelled too
			return nil, nil, ctx.Err()
		}
	}
}

View File

@ -0,0 +1,18 @@
package badger
import (
"github.com/dgraph-io/badger/v3"
"github.com/google/uuid"
)
// SpaceWatchBarrier is the key that watch barrier writes are stored under.
const SpaceWatchBarrier = "/watch-barrier"

// WatchBarrierKey returns SpaceWatchBarrier converted to a byte slice.
// Each call performs a new conversion, so callers receive their own slice.
func WatchBarrierKey() []byte {
	barrierKey := []byte(SpaceWatchBarrier)

	return barrierKey
}
// notifyWatchBarrier overwrites the watch barrier key with a newly generated
// UUID string inside a database update transaction and returns the error
// of that transaction, if any.
//
// The write itself carries no meaning: it exists so that subscriptions
// matching the barrier key prefix observe a change.
func (store *Store) notifyWatchBarrier() error {
	bumpBarrier := func(txn *badger.Txn) error {
		// Generate the value per invocation so that every write differs
		barrierValue := []byte(uuid.NewString())

		return txn.Set(WatchBarrierKey(), barrierValue)
	}

	return store.db.Update(bumpBarrier)
}

View File

@ -1,10 +1,28 @@
package store
import v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
import (
"context"
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
)
// WatchMessageType names the kind of change that a WatchMessage reports.
type WatchMessageType string

// Kinds of changes that a watch can report.
const (
	WatchMessageTypeAdded    = WatchMessageType("ADDED")
	WatchMessageTypeModified = WatchMessageType("MODIFIED")
	WatchMessageTypeDeleted  = WatchMessageType("DELETED")
)

// WatchMessage is a single watch notification about a resource of type T.
//
// It is serialized to JSON with the "type" and "object" keys. Note that
// encoding/json's omitempty option never omits struct values, so when T
// is a struct the "object" key is always emitted, even for a zero Object.
type WatchMessage[T any] struct {
	// Type tells what happened to the resource.
	Type WatchMessageType `json:"type,omitempty"`

	// Object is the resource that the notification is about.
	Object T `json:"object,omitempty"`
}
// Store is the storage abstraction of the controller: it runs callbacks
// against a Transaction and allows watching a single VM for changes.
type Store interface {
	// View invokes cb with a Transaction and returns an error.
	// NOTE(review): presumably the transaction is read-only and cb's
	// error is propagated to the caller — confirm against implementations.
	View(cb func(txn Transaction) error) error
	// Update invokes cb with a Transaction and returns an error.
	// NOTE(review): presumably the transaction is read-write and is
	// committed only when cb returns nil — confirm against implementations.
	Update(cb func(txn Transaction) error) error
	// WatchVM starts watching the VM named vmName for changes.
	//
	// It returns a channel of ADDED, MODIFIED and DELETED notifications,
	// a channel of watch errors and an error that tells whether the watch
	// could be established at all.
	// NOTE(review): presumably the watch stops once ctx is done — confirm
	// against implementations.
	WatchVM(ctx context.Context, vmName string) (chan WatchMessage[v1.VM], chan error, error)
}
type Transaction interface {

View File

@ -0,0 +1,179 @@
package store_test
import (
"context"
"fmt"
"testing"
"time"
storepkg "github.com/cirruslabs/orchard/internal/controller/store"
"github.com/cirruslabs/orchard/internal/controller/store/badger"
"github.com/cirruslabs/orchard/pkg/resource/v1"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
func TestWatchVM(t *testing.T) {
logger := zap.Must(zap.NewDevelopment())
testCases := []struct {
Name string
Run func(t *testing.T, store storepkg.Store)
}{
{
Name: "simple-vm-already-exists",
Run: func(t *testing.T, store storepkg.Store) {
// Create a VM
const vmName = "test"
vm := v1.VM{
Meta: v1.Meta{
Name: vmName,
},
}
err := store.Update(func(txn storepkg.Transaction) error {
return txn.SetVM(vm)
})
require.NoError(t, err)
// Start watching a VM
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
defer cancel()
watchCh, errCh, err := store.WatchVM(ctx, vmName)
require.NoError(t, err)
// Ensure that a synthetic VM creation event is emitted
select {
case item := <-watchCh:
require.Equal(t, item.Type, storepkg.WatchMessageTypeAdded)
case err := <-errCh:
require.NoError(t, err)
case <-ctx.Done():
require.FailNow(t, "timed out waiting for ADDED watch event")
}
// Update the VM and ensure that a modification event is emitted
err = store.Update(func(txn storepkg.Transaction) error {
return txn.SetVM(vm)
})
require.NoError(t, err)
select {
case item := <-watchCh:
require.Equal(t, item.Type, storepkg.WatchMessageTypeModified)
case err := <-errCh:
require.NoError(t, err)
case <-ctx.Done():
require.FailNow(t, "timed out waiting for MODIFIED watch event")
}
// Delete the VM and ensure that a deletion event is emitted
err = store.Update(func(txn storepkg.Transaction) error {
return txn.DeleteVM(vmName)
})
require.NoError(t, err)
select {
case item := <-watchCh:
require.Equal(t, item.Type, storepkg.WatchMessageTypeDeleted)
case err := <-errCh:
require.NoError(t, err)
case <-ctx.Done():
require.FailNow(t, "timed out waiting for DELETED watch event")
}
},
},
{
Name: "simple-vm-not-yet-exists",
Run: func(t *testing.T, store storepkg.Store) {
// Start watching a VM
const vmName = "test"
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
defer cancel()
watchCh, errCh, err := store.WatchVM(ctx, vmName)
require.NoError(t, err)
// Create a VM
vm := v1.VM{
Meta: v1.Meta{
Name: vmName,
},
}
err = store.Update(func(txn storepkg.Transaction) error {
return txn.SetVM(vm)
})
require.NoError(t, err)
// Ensure that a VM creation event is emitted
select {
case item := <-watchCh:
require.Equal(t, item.Type, storepkg.WatchMessageTypeAdded)
case err := <-errCh:
require.NoError(t, err)
case <-ctx.Done():
require.FailNow(t, "timed out waiting for ADDED watch event")
}
// Update the VM and ensure that a modification event is emitted
err = store.Update(func(txn storepkg.Transaction) error {
return txn.SetVM(vm)
})
require.NoError(t, err)
select {
case item := <-watchCh:
require.Equal(t, item.Type, storepkg.WatchMessageTypeModified)
case err := <-errCh:
require.NoError(t, err)
case <-ctx.Done():
require.FailNow(t, "timed out waiting for MODIFIED watch event")
}
// Delete the VM and ensure that a deletion event is emitted
err = store.Update(func(txn storepkg.Transaction) error {
return txn.DeleteVM(vmName)
})
require.NoError(t, err)
select {
case item := <-watchCh:
require.Equal(t, item.Type, storepkg.WatchMessageTypeDeleted)
case err := <-errCh:
require.NoError(t, err)
case <-ctx.Done():
require.FailNow(t, "timed out waiting for DELETED watch event")
}
},
},
}
storeImpls := []struct {
Name string
Init func() (storepkg.Store, error)
}{
{
Name: "badger",
Init: func() (storepkg.Store, error) {
return badger.NewBadgerStore(t.TempDir(), true, logger.Sugar())
},
},
}
for _, testCase := range testCases {
for _, storeImpl := range storeImpls {
name := fmt.Sprintf("%s-%s", testCase.Name, storeImpl.Name)
t.Run(name, func(t *testing.T) {
store, err := storeImpl.Init()
require.NoError(t, err)
testCase.Run(t, store)
})
}
}
}

View File

@ -16,6 +16,9 @@ type Meta struct {
// It is populated by the Controller with the current time
// when receiving a POST request.
CreatedAt time.Time `json:"createdAt,omitempty"`
// Version is a resource version used internally to implement WatchVM().
Version uint64 `json:"version,omitempty"`
}
type VM struct {