diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..485dee6 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea diff --git a/internal/controller/api.go b/internal/controller/api.go index 3df0805..a0d522f 100644 --- a/internal/controller/api.go +++ b/internal/controller/api.go @@ -7,9 +7,6 @@ import ( "net/http" ) -type storeTxFunc func(cb func(txn *storepkg.Txn) error) error -type apiTxFunc func(txn *storepkg.Txn) responder.Responder - func (controller *Controller) initAPI() *gin.Engine { gin.SetMode(gin.DebugMode) ginEngine := gin.Default() @@ -59,19 +56,26 @@ func (controller *Controller) initAPI() *gin.Engine { return ginEngine } -func (controller *Controller) storeView(cb apiTxFunc) responder.Responder { - return mapTxFuncs(controller.store.View, cb) +type storeTransactionFunc func(operation func(txn storepkg.Transaction) error) error + +func (controller *Controller) storeView(view func(txn storepkg.Transaction) responder.Responder) responder.Responder { + return adaptResponderToStoreOperation(controller.store.View, view) } -func (controller *Controller) storeUpdate(cb apiTxFunc) responder.Responder { - return mapTxFuncs(controller.store.Update, cb) +func (controller *Controller) storeUpdate( + update func(txn storepkg.Transaction) responder.Responder, +) responder.Responder { + return adaptResponderToStoreOperation(controller.store.Update, update) } -func mapTxFuncs(txFunc storeTxFunc, cb apiTxFunc) responder.Responder { +func adaptResponderToStoreOperation( + storeOperation storeTransactionFunc, + responderOperation func(txn storepkg.Transaction) responder.Responder, +) responder.Responder { var result responder.Responder - if err := txFunc(func(txn *storepkg.Txn) error { - result = cb(txn) + if err := storeOperation(func(txn storepkg.Transaction) error { + result = responderOperation(txn) return nil }); err != nil { diff --git a/internal/controller/api_vms.go b/internal/controller/api_vms.go index 44a6b3a..3dd8c3a 100644 --- a/internal/controller/api_vms.go +++ b/internal/controller/api_vms.go @@ -28,7 +28,7 @@ func (controller *Controller) createVM(ctx *gin.Context) responder.Responder { vm.UID = uuid.New().String() vm.Generation = 0 - return controller.storeUpdate(func(txn *storepkg.Txn) responder.Responder { + return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder { // Does the VM resource with this name already exists? _, err := txn.GetVM(vm.Name) if !errors.Is(err, storepkg.ErrNotFound) { @@ -54,14 +54,10 @@ func (controller *Controller) updateVM(ctx *gin.Context) responder.Responder { return responder.Code(http.StatusPreconditionFailed) } - return controller.storeUpdate(func(txn *storepkg.Txn) responder.Responder { + return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder { dbVM, err := txn.GetVM(userVM.Name) if err != nil { - if errors.Is(err, storepkg.ErrNotFound) { - return responder.Code(http.StatusNotFound) - } - - return responder.Code(http.StatusInternalServerError) + return responder.Error(err) } dbVM.Status = userVM.Status @@ -78,14 +74,10 @@ func (controller *Controller) updateVM(ctx *gin.Context) responder.Responder { func (controller *Controller) getVM(ctx *gin.Context) responder.Responder { name := ctx.Param("name") - return controller.storeView(func(txn *storepkg.Txn) responder.Responder { + return controller.storeView(func(txn storepkg.Transaction) responder.Responder { vm, err := txn.GetVM(name) if err != nil { - if errors.Is(err, storepkg.ErrNotFound) { - return responder.Code(http.StatusNotFound) - } - - return responder.Code(http.StatusInternalServerError) + return responder.Error(err) } return responder.JSON(http.StatusOK, &vm) @@ -93,14 +85,10 @@ func (controller *Controller) getVM(ctx *gin.Context) responder.Responder { } func (controller *Controller) listVMs(_ *gin.Context) responder.Responder { - return controller.storeView(func(txn *storepkg.Txn) responder.Responder { + return controller.storeView(func(txn storepkg.Transaction) responder.Responder { vms, err := txn.ListVMs() if err != nil { - if errors.Is(err, storepkg.ErrNotFound) { - return responder.Code(http.StatusNotFound) - } - - return responder.Code(http.StatusInternalServerError) + return responder.Error(err) } return responder.JSON(http.StatusOK, &vms) @@ -111,33 +99,25 @@ func (controller *Controller) deleteVM(ctx *gin.Context) responder.Responder { name := ctx.Param("name") if ctx.Query("force") != "" { - return controller.storeUpdate(func(txn *storepkg.Txn) responder.Responder { + return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder { if err := txn.DeleteVM(name); err != nil { - if errors.Is(err, storepkg.ErrNotFound) { - return responder.Code(http.StatusNotFound) - } - - return responder.Code(http.StatusInternalServerError) + return responder.Error(err) } return responder.Code(http.StatusOK) }) } - return controller.storeUpdate(func(txn *storepkg.Txn) responder.Responder { + return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder { vm, err := txn.GetVM(name) if err != nil { - if errors.Is(err, storepkg.ErrNotFound) { - return responder.Code(http.StatusNotFound) - } - - return responder.Code(http.StatusInternalServerError) + return responder.Error(err) } vm.DeletedAt = time.Now() if err := txn.SetVM(vm); err != nil { - return responder.Code(http.StatusInternalServerError) + return responder.Error(err) } return responder.Code(http.StatusOK) diff --git a/internal/controller/api_workers.go b/internal/controller/api_workers.go index 689a2e4..eb2a900 100644 --- a/internal/controller/api_workers.go +++ b/internal/controller/api_workers.go @@ -31,7 +31,7 @@ func (controller *Controller) createWorker(ctx *gin.Context) responder.Responder worker.UID = uuid.New().String() worker.Generation = 0 - return controller.storeUpdate(func(txn *storepkg.Txn) responder.Responder { + return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder { // Does the worker resource with this name already exists? _, err := txn.GetWorker(worker.Name) if !errors.Is(err, storepkg.ErrNotFound) { @@ -39,7 +39,7 @@ func (controller *Controller) createWorker(ctx *gin.Context) responder.Responder } if err := txn.SetWorker(&worker); err != nil { - return responder.Code(http.StatusInternalServerError) + return responder.Error(err) } return responder.JSON(200, &worker) @@ -53,14 +53,10 @@ func (controller *Controller) updateWorker(ctx *gin.Context) responder.Responder return responder.Code(http.StatusBadRequest) } - return controller.storeUpdate(func(txn *storepkg.Txn) responder.Responder { + return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder { dbWorker, err := txn.GetWorker(userWorker.Name) if err != nil { - if errors.Is(err, storepkg.ErrNotFound) { - return responder.Code(http.StatusNotFound) - } - - return responder.Code(http.StatusInternalServerError) + return responder.Error(err) } dbWorker.LastSeen = userWorker.LastSeen @@ -77,14 +73,10 @@ func (controller *Controller) updateWorker(ctx *gin.Context) responder.Responder func (controller *Controller) getWorker(ctx *gin.Context) responder.Responder { name := ctx.Param("name") - return controller.storeView(func(txn *storepkg.Txn) responder.Responder { + return controller.storeView(func(txn storepkg.Transaction) responder.Responder { worker, err := txn.GetWorker(name) if err != nil { - if errors.Is(err, storepkg.ErrNotFound) { - return responder.Code(http.StatusNotFound) - } - - return responder.Code(http.StatusInternalServerError) + return responder.Error(err) } return responder.JSON(http.StatusOK, &worker) @@ -92,14 +84,10 @@ func (controller *Controller) getWorker(ctx *gin.Context) responder.Responder { } func (controller *Controller) listWorkers(_ *gin.Context) responder.Responder { - return controller.storeView(func(txn *storepkg.Txn) responder.Responder { + return controller.storeView(func(txn storepkg.Transaction) responder.Responder { workers, err := txn.ListWorkers() if err != nil { - if errors.Is(err, storepkg.ErrNotFound) { - return responder.Code(http.StatusNotFound) - } - - return responder.Code(http.StatusInternalServerError) + return responder.Error(err) } return responder.JSON(http.StatusOK, &workers) @@ -109,13 +97,9 @@ func (controller *Controller) listWorkers(_ *gin.Context) responder.Responder { func (controller *Controller) deleteWorker(ctx *gin.Context) responder.Responder { name := ctx.Param("name") - return controller.storeUpdate(func(txn *storepkg.Txn) responder.Responder { + return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder { if err := txn.DeleteWorker(name); err != nil { - if errors.Is(err, storepkg.ErrNotFound) { - return responder.Code(http.StatusNotFound) - } - - return responder.Code(http.StatusInternalServerError) + return responder.Error(err) } return responder.Code(http.StatusOK) diff --git a/internal/controller/controller.go b/internal/controller/controller.go index dfe494e..f0553a2 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" storepkg "github.com/cirruslabs/orchard/internal/controller/store" + "github.com/cirruslabs/orchard/internal/controller/store/badger" "go.uber.org/zap" "net" "net/http" @@ -25,7 +26,7 @@ type Controller struct { tlsConfig *tls.Config listener net.Listener httpServer *http.Server - store *storepkg.Store + store storepkg.Store logger *zap.SugaredLogger } @@ -50,7 +51,7 @@ func New(opts ...Option) (*Controller, error) { } // Instantiate controller - store, err := storepkg.New(controller.dataDir.DBPath()) + store, err := badger.NewBadgerStore(controller.dataDir.DBPath()) if err != nil { return nil, err } diff --git a/internal/controller/scheduler.go b/internal/controller/scheduler.go index 6eb6318..c5a09dd 100644 --- a/internal/controller/scheduler.go +++ b/internal/controller/scheduler.go @@ -9,7 +9,7 @@ import ( const schedulerInterval = 5 * time.Second -func runScheduler(store *storepkg.Store) error { +func runScheduler(store storepkg.Store) error { ticker := time.NewTicker(schedulerInterval) for { @@ -21,12 +21,12 @@ func runScheduler(store *storepkg.Store) error { } } -func runSchedulerInner(store *storepkg.Store) error { +func runSchedulerInner(store storepkg.Store) error { var vms []*v1.VM var workers []*v1.Worker var err error - err = store.View(func(txn *storepkg.Txn) error { + err = store.View(func(txn storepkg.Transaction) error { vms, err = txn.ListVMs() if err != nil { return err @@ -61,7 +61,7 @@ func runSchedulerInner(store *storepkg.Store) error { vm.Worker = worker.Name - err := store.Update(func(txn *storepkg.Txn) error { + err := store.Update(func(txn storepkg.Transaction) error { return txn.SetVM(vm) }) if err != nil { diff --git a/internal/controller/store/badger/badger_store.go b/internal/controller/store/badger/badger_store.go new file mode 100644 index 0000000..9ce1f02 --- /dev/null +++ b/internal/controller/store/badger/badger_store.go @@ -0,0 +1,61 @@ +package badger + +import ( + "errors" + "fmt" + "github.com/cirruslabs/orchard/internal/controller/store" + "github.com/dgraph-io/badger/v3" +) + +type Store struct { + db *badger.DB + store.Store +} + +type Transaction struct { + badgerTxn *badger.Txn + store.Transaction +} + +func NewBadgerStore(dbPath string) (store.Store, error) { + opts := badger.DefaultOptions(dbPath) + + opts.SyncWrites = true + + db, err := badger.Open(opts) + if err != nil { + return nil, err + } + + return &Store{ + db: db, + }, nil +} + +func (store *Store) View(cb func(txn store.Transaction) error) error { + return store.db.View(func(txn *badger.Txn) error { + return cb(&Transaction{ + badgerTxn: txn, + }) + }) +} + +func (store *Store) Update(cb func(txn store.Transaction) error) error { + return store.db.Update(func(txn *badger.Txn) error { + return cb(&Transaction{ + badgerTxn: txn, + }) + }) +} + +func mapErr(err error) error { + if err != nil { + if errors.Is(err, badger.ErrKeyNotFound) { + return store.ErrNotFound + } + + return fmt.Errorf("%w: %v", store.ErrStoreFailed, err) + } + + return err +} diff --git a/internal/controller/store/vm.go b/internal/controller/store/badger/badger_vm.go similarity index 83% rename from internal/controller/store/vm.go rename to internal/controller/store/badger/badger_vm.go index 2cdfe73..6fef86d 100644 --- a/internal/controller/store/vm.go +++ b/internal/controller/store/badger/badger_vm.go @@ -1,5 +1,5 @@ //nolint:dupl // maybe we'll figure out how to make DB resource accessors generic in the future -package store +package badger import ( "encoding/json" @@ -14,7 +14,7 @@ func VMKey(name string) []byte { return []byte(path.Join(SpaceVMs, name)) } -func (txn *Txn) GetVM(name string) (result *v1.VM, err error) { +func (txn *Transaction) GetVM(name string) (result *v1.VM, err error) { defer func() { err = mapErr(err) }() @@ -41,7 +41,7 @@ func (txn *Txn) GetVM(name string) (result *v1.VM, err error) { return &vm, nil } -func (txn *Txn) SetVM(vm *v1.VM) (err error) { +func (txn *Transaction) SetVM(vm *v1.VM) (err error) { defer func() { err = mapErr(err) }() @@ -56,7 +56,7 @@ func (txn *Txn) SetVM(vm *v1.VM) (err error) { return txn.badgerTxn.Set(key, valueBytes) } -func (txn *Txn) DeleteVM(name string) (err error) { +func (txn *Transaction) DeleteVM(name string) (err error) { defer func() { err = mapErr(err) }() @@ -66,7 +66,7 @@ func (txn *Txn) DeleteVM(name string) (err error) { return txn.badgerTxn.Delete(key) } -func (txn *Txn) ListVMs() (result []*v1.VM, err error) { +func (txn *Transaction) ListVMs() (result []*v1.VM, err error) { defer func() { err = mapErr(err) }() diff --git a/internal/controller/store/worker.go b/internal/controller/store/badger/badger_worker.go similarity index 82% rename from internal/controller/store/worker.go rename to internal/controller/store/badger/badger_worker.go index 5c35628..47f4024 100644 --- a/internal/controller/store/worker.go +++ b/internal/controller/store/badger/badger_worker.go @@ -1,5 +1,5 @@ //nolint:dupl // maybe we'll figure out how to make DB resource accessors generic in the future -package store +package badger import ( "encoding/json" @@ -14,7 +14,7 @@ func WorkerKey(name string) []byte { return []byte(path.Join(SpaceWorkers, name)) } -func (txn *Txn) GetWorker(name string) (result *v1.Worker, err error) { +func (txn *Transaction) GetWorker(name string) (result *v1.Worker, err error) { defer func() { err = mapErr(err) }() @@ -41,7 +41,7 @@ func (txn *Txn) GetWorker(name string) (result *v1.Worker, err error) { return &worker, nil } -func (txn *Txn) SetWorker(worker *v1.Worker) (err error) { +func (txn *Transaction) SetWorker(worker *v1.Worker) (err error) { defer func() { err = mapErr(err) }() @@ -56,7 +56,7 @@ func (txn *Txn) SetWorker(worker *v1.Worker) (err error) { return txn.badgerTxn.Set(key, valueBytes) } -func (txn *Txn) DeleteWorker(name string) (err error) { +func (txn *Transaction) DeleteWorker(name string) (err error) { defer func() { err = mapErr(err) }() @@ -66,7 +66,7 @@ func (txn *Txn) DeleteWorker(name string) (err error) { return txn.badgerTxn.Delete(key) } -func (txn *Txn) ListWorkers() (result []*v1.Worker, err error) { +func (txn *Transaction) ListWorkers() (result []*v1.Worker, err error) { defer func() { err = mapErr(err) }() diff --git a/internal/controller/store/errors.go b/internal/controller/store/errors.go index 011e4f9..bed3ce4 100644 --- a/internal/controller/store/errors.go +++ b/internal/controller/store/errors.go @@ -2,23 +2,10 @@ package store import ( "errors" - "fmt" - "github.com/dgraph-io/badger/v3" ) var ( - ErrNotFound = errors.New("DB entry not found") - ErrBadgerFailed = errors.New("BadgerDB failed") + ErrNotFound = errors.New("store entry not found") + ErrConflict = errors.New("store conflict") + ErrStoreFailed = errors.New("store failed") ) - -func mapErr(err error) error { - if err != nil { - if errors.Is(err, badger.ErrKeyNotFound) { - return ErrNotFound - } - - return fmt.Errorf("%w: %v", ErrBadgerFailed, err) - } - - return err -} diff --git a/internal/controller/store/store.go b/internal/controller/store/store.go index 71f8a12..78139eb 100644 --- a/internal/controller/store/store.go +++ b/internal/controller/store/store.go @@ -1,24 +1,20 @@ package store -import ( - "github.com/dgraph-io/badger/v3" -) +import v1 "github.com/cirruslabs/orchard/pkg/resource/v1" -type Store struct { - db *badger.DB +type Store interface { + View(cb func(txn Transaction) error) error + Update(cb func(txn Transaction) error) error } -func New(dbPath string) (*Store, error) { - opts := badger.DefaultOptions(dbPath) +type Transaction interface { + GetVM(name string) (result *v1.VM, err error) + SetVM(vm *v1.VM) (err error) + DeleteVM(name string) (err error) + ListVMs() (result []*v1.VM, err error) - opts.SyncWrites = true - - db, err := badger.Open(opts) - if err != nil { - return nil, err - } - - return &Store{ - db: db, - }, nil + GetWorker(name string) (result *v1.Worker, err error) + SetWorker(worker *v1.Worker) (err error) + DeleteWorker(name string) (err error) + ListWorkers() (result []*v1.Worker, err error) } diff --git a/internal/controller/store/txn.go b/internal/controller/store/txn.go deleted file mode 100644 index 2f1cffd..0000000 --- a/internal/controller/store/txn.go +++ /dev/null @@ -1,23 +0,0 @@ -package store - -import "github.com/dgraph-io/badger/v3" - -type Txn struct { - badgerTxn *badger.Txn -} - -func (store *Store) View(cb func(txn *Txn) error) error { - return store.db.View(func(txn *badger.Txn) error { - return cb(&Txn{ - badgerTxn: txn, - }) - }) -} - -func (store *Store) Update(cb func(txn *Txn) error) error { - return store.db.Update(func(txn *badger.Txn) error { - return cb(&Txn{ - badgerTxn: txn, - }) - }) -} diff --git a/internal/responder/code.go b/internal/responder/code.go index f3b5a46..4d3029d 100644 --- a/internal/responder/code.go +++ b/internal/responder/code.go @@ -1,35 +1,20 @@ package responder -import "github.com/gin-gonic/gin" +import ( + "github.com/gin-gonic/gin" +) type CodeResponder struct { - code int - headers map[string]string - - DefaultResponder + code int + Responder } -func Code(code int, opts ...Option) *CodeResponder { - responder := &CodeResponder{ - code: code, - headers: map[string]string{}, +func Code(code int) Responder { + return &CodeResponder{ + code: code, } - - for _, opt := range opts { - opt(responder) - } - - return responder -} - -func (responder *CodeResponder) SetHeader(key string, value string) { - responder.headers[key] = value } func (responder *CodeResponder) Respond(c *gin.Context) { - for key, value := range responder.headers { - c.Header(key, value) - } - c.Status(responder.code) } diff --git a/internal/responder/error.go b/internal/responder/error.go new file mode 100644 index 0000000..34fd3e3 --- /dev/null +++ b/internal/responder/error.go @@ -0,0 +1,28 @@ +package responder + +import ( + "errors" + storepkg "github.com/cirruslabs/orchard/internal/controller/store" + "github.com/gin-gonic/gin" + "net/http" +) + +type ErrorResponder struct { + err error + Responder +} + +func Error(err error) Responder { + return &ErrorResponder{ + err: err, + } +} + +func (responder *ErrorResponder) Respond(c *gin.Context) { + var code = http.StatusInternalServerError + if errors.Is(responder.err, storepkg.ErrNotFound) { + code = http.StatusNotFound + } + _ = c.Error(responder.err) + c.Status(code) +} diff --git a/internal/responder/json.go b/internal/responder/json.go index 69eaea4..a68d51e 100644 --- a/internal/responder/json.go +++ b/internal/responder/json.go @@ -5,35 +5,20 @@ import ( ) type JSONResponder struct { - code int - headers map[string]string - obj interface{} + code int + obj interface{} - DefaultResponder + Responder } -func JSON(code int, obj interface{}, opts ...Option) *JSONResponder { +func JSON(code int, obj interface{}) *JSONResponder { responder := &JSONResponder{ - code: code, - headers: map[string]string{}, - obj: obj, - } - - for _, opt := range opts { - opt(responder) + code: code, + obj: obj, } return responder } - -func (responder *JSONResponder) SetHeader(key string, value string) { - responder.headers[key] = value -} - func (responder *JSONResponder) Respond(c *gin.Context) { - for key, value := range responder.headers { - c.Header(key, value) - } - c.JSON(responder.code, responder.obj) } diff --git a/internal/responder/option.go b/internal/responder/option.go deleted file mode 100644 index 3f18597..0000000 --- a/internal/responder/option.go +++ /dev/null @@ -1,11 +0,0 @@ -package responder - -type Option func(responder Responder) - -func WithHeaders(headers map[string]string) Option { - return func(responder Responder) { - for key, value := range headers { - responder.SetHeader(key, value) - } - } -} diff --git a/internal/responder/responder.go b/internal/responder/responder.go index 1e217f0..78d2a2a 100644 --- a/internal/responder/responder.go +++ b/internal/responder/responder.go @@ -6,10 +6,4 @@ import ( type Responder interface { Respond(c *gin.Context) - SetHeader(key string, value string) } - -type DefaultResponder struct{} - -func (dr DefaultResponder) Respond(c *gin.Context) {} -func (dr DefaultResponder) SetHeader(key string, value string) {}