diff --git a/api/openapi.yaml b/api/openapi.yaml index 671c98f..f7eb585 100644 --- a/api/openapi.yaml +++ b/api/openapi.yaml @@ -256,9 +256,15 @@ paths: parameters: - in: path name: name + description: VM name to retrieve required: true schema: type: string + - in: query + name: watch + description: Watch for changes a VM resource and return them a stream of ADDED, MODIFIED and DELETED notifications + schema: + type: boolean get: summary: "Retrieve a VM" tags: @@ -270,6 +276,15 @@ paths: application/json: schema: $ref: '#/components/schemas/VM' + application/x-ndjson: + schema: + type: object + properties: + type: + type: string + enum: [ADDED, MODIFIED, DELETED] + object: + $ref: '#/components/schemas/VM' '404': description: VM resource with the given name doesn't exist put: diff --git a/internal/controller/api.go b/internal/controller/api.go index 774487c..023b1fa 100644 --- a/internal/controller/api.go +++ b/internal/controller/api.go @@ -4,6 +4,9 @@ import ( "context" "crypto/subtle" "errors" + "net/http" + "strings" + "github.com/cirruslabs/orchard/api" storepkg "github.com/cirruslabs/orchard/internal/controller/store" "github.com/cirruslabs/orchard/internal/responder" @@ -16,8 +19,6 @@ import ( "github.com/penglongli/gin-metrics/ginmetrics" "go.uber.org/zap" "google.golang.org/grpc/metadata" - "net/http" - "strings" ) const ctxServiceAccountKey = "service-account" diff --git a/internal/controller/api_vms.go b/internal/controller/api_vms.go index 85d1cfc..74e7c09 100644 --- a/internal/controller/api_vms.go +++ b/internal/controller/api_vms.go @@ -1,7 +1,11 @@ package controller import ( + "encoding/json" "errors" + "net/http" + "time" + "github.com/cirruslabs/orchard/internal/controller/lifecycle" storepkg "github.com/cirruslabs/orchard/internal/controller/store" "github.com/cirruslabs/orchard/internal/responder" @@ -10,8 +14,6 @@ import ( "github.com/gin-gonic/gin" "github.com/google/uuid" "github.com/samber/lo" - "net/http" - "time" ) func (controller *Controller) createVM(ctx *gin.Context) responder.Responder { @@ -150,6 +152,42 @@ func (controller *Controller) getVM(ctx *gin.Context) responder.Responder { name := ctx.Param("name") + if ctx.Query("watch") == "true" { + ctx.Header("Content-Type", "application/x-ndjson") + + watchCh, errCh, err := controller.store.WatchVM(ctx, name) + if err != nil { + return responder.Error(err) + } + + for { + select { + case watchMessage := <-watchCh: + jsonBytes, err := json.Marshal(watchMessage) + if err != nil { + controller.logger.Errorf("failed to marshal watch message "+ + "for VM %q to JSON: %v", name, err) + + return responder.Empty() + } + + if _, err = ctx.Writer.Write(jsonBytes); err != nil { + return responder.Empty() + } + if _, err := ctx.Writer.WriteString("\n"); err != nil { + return responder.Empty() + } + ctx.Writer.Flush() + case err := <-errCh: + controller.logger.Errorf("failed to watch VM %q in the DB: %v", name, err) + + return responder.Empty() + case <-ctx.Done(): + return responder.Empty() + } + } + } + return controller.storeView(func(txn storepkg.Transaction) responder.Responder { vm, err := txn.GetVM(name) if err != nil { diff --git a/internal/controller/store/badger/badger_vm.go b/internal/controller/store/badger/badger_vm.go index 8a35478..6b8b71d 100644 --- a/internal/controller/store/badger/badger_vm.go +++ b/internal/controller/store/badger/badger_vm.go @@ -3,9 +3,10 @@ package badger import ( "encoding/json" + "path" + "github.com/cirruslabs/orchard/pkg/resource/v1" "github.com/dgraph-io/badger/v3" - "path" ) const SpaceVMs = "/vms" @@ -38,6 +39,8 @@ func (txn *Transaction) GetVM(name string) (_ *v1.VM, err error) { return nil, err } + vm.Version = item.Version() + return &vm, nil } @@ -94,6 +97,8 @@ func (txn *Transaction) ListVMs() (_ []v1.VM, err error) { return nil, err } + vm.Version = item.Version() + result = append(result, vm) } diff --git a/internal/controller/store/badger/badger_watch.go b/internal/controller/store/badger/badger_watch.go new file mode 100644 index 0000000..bf5f4d9 --- /dev/null +++ b/internal/controller/store/badger/badger_watch.go @@ -0,0 +1,175 @@ +package badger + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "time" + + storepkg "github.com/cirruslabs/orchard/internal/controller/store" + "github.com/cirruslabs/orchard/pkg/resource/v1" + "github.com/dgraph-io/badger/v3" + "github.com/dgraph-io/badger/v3/pb" +) + +func (store *Store) WatchVM(ctx context.Context, vmName string) (chan storepkg.WatchMessage[v1.VM], chan error, error) { + readyCh := make(chan struct{}, 1) + watchCh := make(chan storepkg.WatchMessage[v1.VM], 1) + errCh := make(chan error, 1) + subCtx, subCtxCancel := context.WithCancel(ctx) + + go func() { + defer subCtxCancel() + defer close(watchCh) + defer close(errCh) + + var initialVM *v1.VM + var checkedInitialVM bool + + if err := store.db.Subscribe(subCtx, func(kvList *badger.KVList) error { + if !checkedInitialVM { + // Notify the caller that we've subscribed, but don't block, + // because we may observe multiple watch barriers, yet + // we only need a single barrier to make things work + select { + case readyCh <- struct{}{}: + default: + } + + // Now that the subscription has started, + // retrieve the initial VM, if any + err := store.View(func(txn storepkg.Transaction) error { + var err error + + initialVM, err = txn.GetVM(vmName) + + return err + }) + if err != nil && !errors.Is(err, storepkg.ErrNotFound) { + return err + } + if initialVM != nil { + notification := storepkg.WatchMessage[v1.VM]{ + Type: storepkg.WatchMessageTypeAdded, + Object: *initialVM, + } + + select { + case watchCh <- notification: + case <-subCtx.Done(): + return subCtx.Err() + } + } + + checkedInitialVM = true + } + + for _, kv := range kvList.GetKv() { + switch { + case bytes.Equal(kv.GetKey(), WatchBarrierKey()): + // We only need watch barriers so that the Subscribe()'s callback + // is called at least once, thus we can simply do nothing here + case bytes.Equal(kv.GetKey(), VMKey(vmName)): + // Skip all KVs with versions before or equal + // to the initial VM's version, if any + if initialVM != nil && kv.GetVersion() <= initialVM.Version { + continue + } + + if kv.GetValue() == nil { + if initialVM == nil { + // VM is already deleted + continue + } + + // VM was deleted + notification := storepkg.WatchMessage[v1.VM]{ + Type: storepkg.WatchMessageTypeDeleted, + } + + select { + case watchCh <- notification: + case <-subCtx.Done(): + return subCtx.Err() + } + + initialVM = nil + } else { + // VM was created or modified + var vm v1.VM + + if err := json.Unmarshal(kv.GetValue(), &vm); err != nil { + return err + } + + vm.Version = kv.GetVersion() + + var watchMessageType storepkg.WatchMessageType + + if initialVM == nil { + watchMessageType = storepkg.WatchMessageTypeAdded + + initialVM = &vm + } else { + watchMessageType = storepkg.WatchMessageTypeModified + } + + notification := storepkg.WatchMessage[v1.VM]{ + Type: watchMessageType, + Object: vm, + } + + select { + case watchCh <- notification: + case <-subCtx.Done(): + return subCtx.Err() + } + } + default: + // Ignore unexpected keys + continue + } + } + + return nil + }, []pb.Match{ + { + Prefix: WatchBarrierKey(), + }, + { + Prefix: VMKey(vmName), + }, + }); err != nil { + errCh <- err + } + }() + + // Trigger the watch barrier so that Subscribe() callback gets invoked + if err := store.notifyWatchBarrier(); err != nil { + subCtxCancel() + + return nil, nil, err + } + + // Wait for the Subscribe() callback to be invoked +Outer: + for { + select { + case <-readyCh: + // Subscription has started + break Outer + case <-time.After(time.Second): + // Possible race with late goroutine start, re-issue watch barrier + if err := store.notifyWatchBarrier(); err != nil { + subCtxCancel() + + return nil, nil, err + } + case <-ctx.Done(): + return nil, nil, ctx.Err() + } + } + + return watchCh, errCh, nil +} diff --git a/internal/controller/store/badger/badger_watch_barrier.go b/internal/controller/store/badger/badger_watch_barrier.go new file mode 100644 index 0000000..bb663c1 --- /dev/null +++ b/internal/controller/store/badger/badger_watch_barrier.go @@ -0,0 +1,18 @@ +package badger + +import ( + "github.com/dgraph-io/badger/v3" + "github.com/google/uuid" +) + +const SpaceWatchBarrier = "/watch-barrier" + +func WatchBarrierKey() []byte { + return []byte(SpaceWatchBarrier) +} + +func (store *Store) notifyWatchBarrier() error { + return store.db.Update(func(txn *badger.Txn) error { + return txn.Set(WatchBarrierKey(), []byte(uuid.NewString())) + }) +} diff --git a/internal/controller/store/store.go b/internal/controller/store/store.go index ac04d33..fcb50e6 100644 --- a/internal/controller/store/store.go +++ b/internal/controller/store/store.go @@ -1,10 +1,28 @@ package store -import v1 "github.com/cirruslabs/orchard/pkg/resource/v1" +import ( + "context" + + v1 "github.com/cirruslabs/orchard/pkg/resource/v1" +) + +type WatchMessageType string + +const ( + WatchMessageTypeAdded WatchMessageType = "ADDED" + WatchMessageTypeModified WatchMessageType = "MODIFIED" + WatchMessageTypeDeleted WatchMessageType = "DELETED" +) + +type WatchMessage[T any] struct { + Type WatchMessageType `json:"type,omitempty"` + Object T `json:"object,omitempty"` +} type Store interface { View(cb func(txn Transaction) error) error Update(cb func(txn Transaction) error) error + WatchVM(ctx context.Context, vmName string) (chan WatchMessage[v1.VM], chan error, error) } type Transaction interface { diff --git a/internal/controller/store/store_test.go b/internal/controller/store/store_test.go new file mode 100644 index 0000000..058f08d --- /dev/null +++ b/internal/controller/store/store_test.go @@ -0,0 +1,179 @@ +package store_test + +import ( + "context" + "fmt" + "testing" + "time" + + storepkg "github.com/cirruslabs/orchard/internal/controller/store" + "github.com/cirruslabs/orchard/internal/controller/store/badger" + "github.com/cirruslabs/orchard/pkg/resource/v1" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestWatchVM(t *testing.T) { + logger := zap.Must(zap.NewDevelopment()) + + testCases := []struct { + Name string + Run func(t *testing.T, store storepkg.Store) + }{ + { + Name: "simple-vm-already-exists", + Run: func(t *testing.T, store storepkg.Store) { + // Create a VM + const vmName = "test" + + vm := v1.VM{ + Meta: v1.Meta{ + Name: vmName, + }, + } + + err := store.Update(func(txn storepkg.Transaction) error { + return txn.SetVM(vm) + }) + require.NoError(t, err) + + // Start watching a VM + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + defer cancel() + + watchCh, errCh, err := store.WatchVM(ctx, vmName) + require.NoError(t, err) + + // Ensure that a synthetic VM creation event is emitted + select { + case item := <-watchCh: + require.Equal(t, item.Type, storepkg.WatchMessageTypeAdded) + case err := <-errCh: + require.NoError(t, err) + case <-ctx.Done(): + require.FailNow(t, "timed out waiting for ADDED watch event") + } + + // Update the VM and ensure that a modification event is emitted + err = store.Update(func(txn storepkg.Transaction) error { + return txn.SetVM(vm) + }) + require.NoError(t, err) + + select { + case item := <-watchCh: + require.Equal(t, item.Type, storepkg.WatchMessageTypeModified) + case err := <-errCh: + require.NoError(t, err) + case <-ctx.Done(): + require.FailNow(t, "timed out waiting for MODIFIED watch event") + } + + // Delete the VM and ensure that a deletion event is emitted + err = store.Update(func(txn storepkg.Transaction) error { + return txn.DeleteVM(vmName) + }) + require.NoError(t, err) + + select { + case item := <-watchCh: + require.Equal(t, item.Type, storepkg.WatchMessageTypeDeleted) + case err := <-errCh: + require.NoError(t, err) + case <-ctx.Done(): + require.FailNow(t, "timed out waiting for DELETED watch event") + } + }, + }, + { + Name: "simple-vm-not-yet-exists", + Run: func(t *testing.T, store storepkg.Store) { + // Start watching a VM + const vmName = "test" + + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + defer cancel() + + watchCh, errCh, err := store.WatchVM(ctx, vmName) + require.NoError(t, err) + + // Create a VM + vm := v1.VM{ + Meta: v1.Meta{ + Name: vmName, + }, + } + + err = store.Update(func(txn storepkg.Transaction) error { + return txn.SetVM(vm) + }) + require.NoError(t, err) + + // Ensure that a VM creation event is emitted + select { + case item := <-watchCh: + require.Equal(t, item.Type, storepkg.WatchMessageTypeAdded) + case err := <-errCh: + require.NoError(t, err) + case <-ctx.Done(): + require.FailNow(t, "timed out waiting for ADDED watch event") + } + + // Update the VM and ensure that a modification event is emitted + err = store.Update(func(txn storepkg.Transaction) error { + return txn.SetVM(vm) + }) + require.NoError(t, err) + + select { + case item := <-watchCh: + require.Equal(t, item.Type, storepkg.WatchMessageTypeModified) + case err := <-errCh: + require.NoError(t, err) + case <-ctx.Done(): + require.FailNow(t, "timed out waiting for MODIFIED watch event") + } + + // Delete the VM and ensure that a deletion event is emitted + err = store.Update(func(txn storepkg.Transaction) error { + return txn.DeleteVM(vmName) + }) + require.NoError(t, err) + + select { + case item := <-watchCh: + require.Equal(t, item.Type, storepkg.WatchMessageTypeDeleted) + case err := <-errCh: + require.NoError(t, err) + case <-ctx.Done(): + require.FailNow(t, "timed out waiting for DELETED watch event") + } + }, + }, + } + + storeImpls := []struct { + Name string + Init func() (storepkg.Store, error) + }{ + { + Name: "badger", + Init: func() (storepkg.Store, error) { + return badger.NewBadgerStore(t.TempDir(), true, logger.Sugar()) + }, + }, + } + + for _, testCase := range testCases { + for _, storeImpl := range storeImpls { + name := fmt.Sprintf("%s-%s", testCase.Name, storeImpl.Name) + + t.Run(name, func(t *testing.T) { + store, err := storeImpl.Init() + require.NoError(t, err) + + testCase.Run(t, store) + }) + } + } +} diff --git a/pkg/resource/v1/v1.go b/pkg/resource/v1/v1.go index 8164010..4a7eff7 100644 --- a/pkg/resource/v1/v1.go +++ b/pkg/resource/v1/v1.go @@ -16,6 +16,9 @@ type Meta struct { // It is populated by the Controller with the current time // when receiving a POST request. CreatedAt time.Time `json:"createdAt,omitempty"` + + // Version is a resource version used internally to implement WatchVM(). + Version uint64 `json:"version,omitempty"` } type VM struct {