Refactor listing VMs (#399)
* Removed unnesesary ListOptions * Refactor genericList to accept string prefixes instead of byte slices * Optimize VM listing logic with singleflight to deduplicate concurrent request * Refactor VM listing logic: rename variables for clarity and update error messages * fix: address PR review feedback - use singleflight DoChan with context cancellation for list VMs 🤖 Generated with [Codex](https://chatgpt.com/codex) Co-Authored-By: Codex <codex@openai.com> --------- Co-authored-by: Codex <codex@openai.com>
This commit is contained in:
parent
230a83c740
commit
be869f10d4
|
|
@ -299,11 +299,9 @@ func (controller *Controller) listVMs(ctx *gin.Context) responder.Responder {
|
|||
return responder
|
||||
}
|
||||
|
||||
var opts []storepkg.ListOption
|
||||
var filters []v1.Filter
|
||||
|
||||
if filterRaw := ctx.Query("filter"); filterRaw != "" {
|
||||
var filters []v1.Filter
|
||||
|
||||
for _, filterRaw := range strings.Split(filterRaw, ",") {
|
||||
filter, err := v1.NewFilter(filterRaw)
|
||||
if err != nil {
|
||||
|
|
@ -312,23 +310,53 @@ func (controller *Controller) listVMs(ctx *gin.Context) responder.Responder {
|
|||
|
||||
filters = append(filters, filter)
|
||||
}
|
||||
|
||||
if len(filters) > 1 {
|
||||
return responder.JSON(http.StatusPreconditionFailed, NewErrorResponse("only "+
|
||||
"a single filter is currently supported"))
|
||||
}
|
||||
|
||||
opts = append(opts, storepkg.WithListFilters(filters...))
|
||||
}
|
||||
|
||||
return controller.storeView(func(txn storepkg.Transaction) responder.Responder {
|
||||
vms, err := txn.ListVMs(opts...)
|
||||
if err != nil {
|
||||
return responder.Error(err)
|
||||
}
|
||||
resultCh := controller.single.DoChan("list-vms", func() (interface{}, error) {
|
||||
var vms []v1.VM
|
||||
|
||||
return responder.JSON(http.StatusOK, vms)
|
||||
viewErr := controller.store.View(func(txn storepkg.Transaction) (err error) {
|
||||
vms, err = txn.ListVMs()
|
||||
return
|
||||
})
|
||||
|
||||
return vms, viewErr
|
||||
})
|
||||
|
||||
var computedVMs interface{}
|
||||
var err error
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return responder.Empty()
|
||||
case result := <-resultCh:
|
||||
computedVMs = result.Val
|
||||
err = result.Err
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return responder.Error(err)
|
||||
}
|
||||
|
||||
allVMs, ok := computedVMs.([]v1.VM)
|
||||
if !ok {
|
||||
controller.logger.Errorf("failed to compute vms: %T", computedVMs)
|
||||
return responder.Code(http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
vms := make([]v1.VM, 0, len(allVMs))
|
||||
|
||||
Outer:
|
||||
for _, vm := range allVMs {
|
||||
for _, filter := range filters {
|
||||
if !vm.Match(filter) {
|
||||
continue Outer
|
||||
}
|
||||
}
|
||||
vms = append(vms, vm)
|
||||
}
|
||||
|
||||
return responder.JSON(http.StatusOK, vms)
|
||||
}
|
||||
|
||||
func (controller *Controller) deleteVM(ctx *gin.Context) responder.Responder {
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ import (
|
|||
"golang.org/x/crypto/ssh"
|
||||
"golang.org/x/net/http2"
|
||||
"golang.org/x/net/http2/h2c"
|
||||
"golang.org/x/sync/singleflight"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
)
|
||||
|
|
@ -73,6 +74,8 @@ type Controller struct {
|
|||
sshNoClientAuth bool
|
||||
sshServer *sshserver.SSHServer
|
||||
|
||||
single singleflight.Group
|
||||
|
||||
rpc.UnimplementedControllerServer
|
||||
}
|
||||
|
||||
|
|
@ -83,6 +86,7 @@ func New(opts ...Option) (*Controller, error) {
|
|||
workerOfflineTimeout: 3 * time.Minute,
|
||||
maxWorkersPerLicense: maxWorkersPerDefaultLicense,
|
||||
pingInterval: 30 * time.Second,
|
||||
single: singleflight.Group{},
|
||||
}
|
||||
|
||||
// Apply options
|
||||
|
|
|
|||
|
|
@ -3,8 +3,6 @@ package badger
|
|||
import (
|
||||
"encoding/json"
|
||||
|
||||
storepkg "github.com/cirruslabs/orchard/internal/controller/store"
|
||||
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
|
||||
"github.com/dgraph-io/badger/v3"
|
||||
)
|
||||
|
||||
|
|
@ -53,30 +51,21 @@ func genericGet[T any, PT interface {
|
|||
|
||||
func genericList[T any, PT interface {
|
||||
SetVersion(uint64)
|
||||
Match(v1.Filter) bool
|
||||
*T
|
||||
}](txn *Transaction, prefix []byte, opts ...storepkg.ListOption) (_ []T, err error) {
|
||||
}](txn *Transaction, prefix string) (_ []T, err error) {
|
||||
defer func() {
|
||||
err = mapErr(err)
|
||||
}()
|
||||
|
||||
// Apply options
|
||||
listInput := &storepkg.ListInput{}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(listInput)
|
||||
}
|
||||
|
||||
// Declare an empty, non-nil slice to
|
||||
// return [] when no objects are found
|
||||
result := []T{}
|
||||
|
||||
it := txn.badgerTxn.NewIterator(badger.IteratorOptions{
|
||||
Prefix: prefix,
|
||||
Prefix: []byte(prefix),
|
||||
})
|
||||
defer it.Close()
|
||||
|
||||
Outer:
|
||||
for it.Rewind(); it.Valid(); it.Next() {
|
||||
item := it.Item()
|
||||
|
||||
|
|
@ -91,12 +80,6 @@ Outer:
|
|||
return nil, err
|
||||
}
|
||||
|
||||
for _, filter := range listInput.Filters {
|
||||
if !PT(&obj).Match(filter) {
|
||||
continue Outer
|
||||
}
|
||||
}
|
||||
|
||||
PT(&obj).SetVersion(item.Version())
|
||||
|
||||
result = append(result, obj)
|
||||
|
|
|
|||
|
|
@ -25,5 +25,5 @@ func (txn *Transaction) DeleteServiceAccount(name string) error {
|
|||
}
|
||||
|
||||
func (txn *Transaction) ListServiceAccounts() ([]v1.ServiceAccount, error) {
|
||||
return genericList[v1.ServiceAccount](txn, []byte(SpaceServiceAccounts))
|
||||
return genericList[v1.ServiceAccount](txn, SpaceServiceAccounts)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ package badger
|
|||
import (
|
||||
"path"
|
||||
|
||||
storepkg "github.com/cirruslabs/orchard/internal/controller/store"
|
||||
"github.com/cirruslabs/orchard/pkg/resource/v1"
|
||||
)
|
||||
|
||||
|
|
@ -26,6 +25,6 @@ func (txn *Transaction) DeleteVM(name string) error {
|
|||
return genericDelete(txn, VMKey(name))
|
||||
}
|
||||
|
||||
func (txn *Transaction) ListVMs(opts ...storepkg.ListOption) ([]v1.VM, error) {
|
||||
return genericList[v1.VM](txn, []byte(SpaceVMs), opts...)
|
||||
func (txn *Transaction) ListVMs() ([]v1.VM, error) {
|
||||
return genericList[v1.VM](txn, SpaceVMs)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,5 +26,5 @@ func (txn *Transaction) DeleteWorker(name string) error {
|
|||
}
|
||||
|
||||
func (txn *Transaction) ListWorkers() ([]v1.Worker, error) {
|
||||
return genericList[v1.Worker](txn, []byte(SpaceWorkers))
|
||||
return genericList[v1.Worker](txn, SpaceWorkers)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,15 +0,0 @@
|
|||
package store
|
||||
|
||||
import v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
|
||||
|
||||
type ListInput struct {
|
||||
Filters []v1.Filter
|
||||
}
|
||||
|
||||
type ListOption func(listInput *ListInput)
|
||||
|
||||
func WithListFilters(filters ...v1.Filter) ListOption {
|
||||
return func(listInput *ListInput) {
|
||||
listInput.Filters = filters
|
||||
}
|
||||
}
|
||||
|
|
@ -29,7 +29,7 @@ type Transaction interface {
|
|||
GetVM(name string) (result *v1.VM, err error)
|
||||
SetVM(vm v1.VM) (err error)
|
||||
DeleteVM(name string) (err error)
|
||||
ListVMs(opts ...ListOption) (result []v1.VM, err error)
|
||||
ListVMs() (result []v1.VM, err error)
|
||||
|
||||
GetWorker(name string) (result *v1.Worker, err error)
|
||||
SetWorker(worker v1.Worker) (err error)
|
||||
|
|
|
|||
Loading…
Reference in New Issue