Refactored working with a storage (#20)

This commit is contained in:
Fedor Korotkov 2023-02-08 19:36:30 -05:00 committed by GitHub
parent 0b9b96b8c9
commit edb9b3d693
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 173 additions and 201 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
.idea

View File

@ -7,9 +7,6 @@ import (
"net/http"
)
type storeTxFunc func(cb func(txn *storepkg.Txn) error) error
type apiTxFunc func(txn *storepkg.Txn) responder.Responder
func (controller *Controller) initAPI() *gin.Engine {
gin.SetMode(gin.DebugMode)
ginEngine := gin.Default()
@ -59,19 +56,26 @@ func (controller *Controller) initAPI() *gin.Engine {
return ginEngine
}
func (controller *Controller) storeView(cb apiTxFunc) responder.Responder {
return mapTxFuncs(controller.store.View, cb)
type storeTransactionFunc func(operation func(txn storepkg.Transaction) error) error
func (controller *Controller) storeView(view func(txn storepkg.Transaction) responder.Responder) responder.Responder {
return adaptResponderToStoreOperation(controller.store.View, view)
}
func (controller *Controller) storeUpdate(cb apiTxFunc) responder.Responder {
return mapTxFuncs(controller.store.Update, cb)
func (controller *Controller) storeUpdate(
update func(txn storepkg.Transaction) responder.Responder,
) responder.Responder {
return adaptResponderToStoreOperation(controller.store.Update, update)
}
func mapTxFuncs(txFunc storeTxFunc, cb apiTxFunc) responder.Responder {
func adaptResponderToStoreOperation(
storeOperation storeTransactionFunc,
responderOperation func(txn storepkg.Transaction) responder.Responder,
) responder.Responder {
var result responder.Responder
if err := txFunc(func(txn *storepkg.Txn) error {
result = cb(txn)
if err := storeOperation(func(txn storepkg.Transaction) error {
result = responderOperation(txn)
return nil
}); err != nil {

View File

@ -28,7 +28,7 @@ func (controller *Controller) createVM(ctx *gin.Context) responder.Responder {
vm.UID = uuid.New().String()
vm.Generation = 0
return controller.storeUpdate(func(txn *storepkg.Txn) responder.Responder {
return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder {
// Does the VM resource with this name already exists?
_, err := txn.GetVM(vm.Name)
if !errors.Is(err, storepkg.ErrNotFound) {
@ -54,14 +54,10 @@ func (controller *Controller) updateVM(ctx *gin.Context) responder.Responder {
return responder.Code(http.StatusPreconditionFailed)
}
return controller.storeUpdate(func(txn *storepkg.Txn) responder.Responder {
return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder {
dbVM, err := txn.GetVM(userVM.Name)
if err != nil {
if errors.Is(err, storepkg.ErrNotFound) {
return responder.Code(http.StatusNotFound)
}
return responder.Code(http.StatusInternalServerError)
return responder.Error(err)
}
dbVM.Status = userVM.Status
@ -78,14 +74,10 @@ func (controller *Controller) updateVM(ctx *gin.Context) responder.Responder {
func (controller *Controller) getVM(ctx *gin.Context) responder.Responder {
name := ctx.Param("name")
return controller.storeView(func(txn *storepkg.Txn) responder.Responder {
return controller.storeView(func(txn storepkg.Transaction) responder.Responder {
vm, err := txn.GetVM(name)
if err != nil {
if errors.Is(err, storepkg.ErrNotFound) {
return responder.Code(http.StatusNotFound)
}
return responder.Code(http.StatusInternalServerError)
return responder.Error(err)
}
return responder.JSON(http.StatusOK, &vm)
@ -93,14 +85,10 @@ func (controller *Controller) getVM(ctx *gin.Context) responder.Responder {
}
func (controller *Controller) listVMs(_ *gin.Context) responder.Responder {
return controller.storeView(func(txn *storepkg.Txn) responder.Responder {
return controller.storeView(func(txn storepkg.Transaction) responder.Responder {
vms, err := txn.ListVMs()
if err != nil {
if errors.Is(err, storepkg.ErrNotFound) {
return responder.Code(http.StatusNotFound)
}
return responder.Code(http.StatusInternalServerError)
return responder.Error(err)
}
return responder.JSON(http.StatusOK, &vms)
@ -111,33 +99,25 @@ func (controller *Controller) deleteVM(ctx *gin.Context) responder.Responder {
name := ctx.Param("name")
if ctx.Query("force") != "" {
return controller.storeUpdate(func(txn *storepkg.Txn) responder.Responder {
return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder {
if err := txn.DeleteVM(name); err != nil {
if errors.Is(err, storepkg.ErrNotFound) {
return responder.Code(http.StatusNotFound)
}
return responder.Code(http.StatusInternalServerError)
return responder.Error(err)
}
return responder.Code(http.StatusOK)
})
}
return controller.storeUpdate(func(txn *storepkg.Txn) responder.Responder {
return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder {
vm, err := txn.GetVM(name)
if err != nil {
if errors.Is(err, storepkg.ErrNotFound) {
return responder.Code(http.StatusNotFound)
}
return responder.Code(http.StatusInternalServerError)
return responder.Error(err)
}
vm.DeletedAt = time.Now()
if err := txn.SetVM(vm); err != nil {
return responder.Code(http.StatusInternalServerError)
return responder.Error(err)
}
return responder.Code(http.StatusOK)

View File

@ -31,7 +31,7 @@ func (controller *Controller) createWorker(ctx *gin.Context) responder.Responder
worker.UID = uuid.New().String()
worker.Generation = 0
return controller.storeUpdate(func(txn *storepkg.Txn) responder.Responder {
return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder {
// Does the worker resource with this name already exists?
_, err := txn.GetWorker(worker.Name)
if !errors.Is(err, storepkg.ErrNotFound) {
@ -39,7 +39,7 @@ func (controller *Controller) createWorker(ctx *gin.Context) responder.Responder
}
if err := txn.SetWorker(&worker); err != nil {
return responder.Code(http.StatusInternalServerError)
return responder.Error(err)
}
return responder.JSON(200, &worker)
@ -53,14 +53,10 @@ func (controller *Controller) updateWorker(ctx *gin.Context) responder.Responder
return responder.Code(http.StatusBadRequest)
}
return controller.storeUpdate(func(txn *storepkg.Txn) responder.Responder {
return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder {
dbWorker, err := txn.GetWorker(userWorker.Name)
if err != nil {
if errors.Is(err, storepkg.ErrNotFound) {
return responder.Code(http.StatusNotFound)
}
return responder.Code(http.StatusInternalServerError)
return responder.Error(err)
}
dbWorker.LastSeen = userWorker.LastSeen
@ -77,14 +73,10 @@ func (controller *Controller) updateWorker(ctx *gin.Context) responder.Responder
func (controller *Controller) getWorker(ctx *gin.Context) responder.Responder {
name := ctx.Param("name")
return controller.storeView(func(txn *storepkg.Txn) responder.Responder {
return controller.storeView(func(txn storepkg.Transaction) responder.Responder {
worker, err := txn.GetWorker(name)
if err != nil {
if errors.Is(err, storepkg.ErrNotFound) {
return responder.Code(http.StatusNotFound)
}
return responder.Code(http.StatusInternalServerError)
return responder.Error(err)
}
return responder.JSON(http.StatusOK, &worker)
@ -92,14 +84,10 @@ func (controller *Controller) getWorker(ctx *gin.Context) responder.Responder {
}
func (controller *Controller) listWorkers(_ *gin.Context) responder.Responder {
return controller.storeView(func(txn *storepkg.Txn) responder.Responder {
return controller.storeView(func(txn storepkg.Transaction) responder.Responder {
workers, err := txn.ListWorkers()
if err != nil {
if errors.Is(err, storepkg.ErrNotFound) {
return responder.Code(http.StatusNotFound)
}
return responder.Code(http.StatusInternalServerError)
return responder.Error(err)
}
return responder.JSON(http.StatusOK, &workers)
@ -109,13 +97,9 @@ func (controller *Controller) listWorkers(_ *gin.Context) responder.Responder {
func (controller *Controller) deleteWorker(ctx *gin.Context) responder.Responder {
name := ctx.Param("name")
return controller.storeUpdate(func(txn *storepkg.Txn) responder.Responder {
return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder {
if err := txn.DeleteWorker(name); err != nil {
if errors.Is(err, storepkg.ErrNotFound) {
return responder.Code(http.StatusNotFound)
}
return responder.Code(http.StatusInternalServerError)
return responder.Error(err)
}
return responder.Code(http.StatusOK)

View File

@ -6,6 +6,7 @@ import (
"errors"
"fmt"
storepkg "github.com/cirruslabs/orchard/internal/controller/store"
"github.com/cirruslabs/orchard/internal/controller/store/badger"
"go.uber.org/zap"
"net"
"net/http"
@ -25,7 +26,7 @@ type Controller struct {
tlsConfig *tls.Config
listener net.Listener
httpServer *http.Server
store *storepkg.Store
store storepkg.Store
logger *zap.SugaredLogger
}
@ -50,7 +51,7 @@ func New(opts ...Option) (*Controller, error) {
}
// Instantiate controller
store, err := storepkg.New(controller.dataDir.DBPath())
store, err := badger.NewBadgerStore(controller.dataDir.DBPath())
if err != nil {
return nil, err
}

View File

@ -9,7 +9,7 @@ import (
const schedulerInterval = 5 * time.Second
func runScheduler(store *storepkg.Store) error {
func runScheduler(store storepkg.Store) error {
ticker := time.NewTicker(schedulerInterval)
for {
@ -21,12 +21,12 @@ func runScheduler(store *storepkg.Store) error {
}
}
func runSchedulerInner(store *storepkg.Store) error {
func runSchedulerInner(store storepkg.Store) error {
var vms []*v1.VM
var workers []*v1.Worker
var err error
err = store.View(func(txn *storepkg.Txn) error {
err = store.View(func(txn storepkg.Transaction) error {
vms, err = txn.ListVMs()
if err != nil {
return err
@ -61,7 +61,7 @@ func runSchedulerInner(store *storepkg.Store) error {
vm.Worker = worker.Name
err := store.Update(func(txn *storepkg.Txn) error {
err := store.Update(func(txn storepkg.Transaction) error {
return txn.SetVM(vm)
})
if err != nil {

View File

@ -0,0 +1,61 @@
package badger
import (
"errors"
"fmt"
"github.com/cirruslabs/orchard/internal/controller/store"
"github.com/dgraph-io/badger/v3"
)
type Store struct {
db *badger.DB
store.Store
}
type Transaction struct {
badgerTxn *badger.Txn
store.Transaction
}
func NewBadgerStore(dbPath string) (store.Store, error) {
opts := badger.DefaultOptions(dbPath)
opts.SyncWrites = true
db, err := badger.Open(opts)
if err != nil {
return nil, err
}
return &Store{
db: db,
}, nil
}
func (store *Store) View(cb func(txn store.Transaction) error) error {
return store.db.View(func(txn *badger.Txn) error {
return cb(&Transaction{
badgerTxn: txn,
})
})
}
func (store *Store) Update(cb func(txn store.Transaction) error) error {
return store.db.Update(func(txn *badger.Txn) error {
return cb(&Transaction{
badgerTxn: txn,
})
})
}
func mapErr(err error) error {
if err != nil {
if errors.Is(err, badger.ErrKeyNotFound) {
return store.ErrNotFound
}
return fmt.Errorf("%w: %v", store.ErrStoreFailed, err)
}
return err
}

View File

@ -1,5 +1,5 @@
//nolint:dupl // maybe we'll figure out how to make DB resource accessors generic in the future
package store
package badger
import (
"encoding/json"
@ -14,7 +14,7 @@ func VMKey(name string) []byte {
return []byte(path.Join(SpaceVMs, name))
}
func (txn *Txn) GetVM(name string) (result *v1.VM, err error) {
func (txn *Transaction) GetVM(name string) (result *v1.VM, err error) {
defer func() {
err = mapErr(err)
}()
@ -41,7 +41,7 @@ func (txn *Txn) GetVM(name string) (result *v1.VM, err error) {
return &vm, nil
}
func (txn *Txn) SetVM(vm *v1.VM) (err error) {
func (txn *Transaction) SetVM(vm *v1.VM) (err error) {
defer func() {
err = mapErr(err)
}()
@ -56,7 +56,7 @@ func (txn *Txn) SetVM(vm *v1.VM) (err error) {
return txn.badgerTxn.Set(key, valueBytes)
}
func (txn *Txn) DeleteVM(name string) (err error) {
func (txn *Transaction) DeleteVM(name string) (err error) {
defer func() {
err = mapErr(err)
}()
@ -66,7 +66,7 @@ func (txn *Txn) DeleteVM(name string) (err error) {
return txn.badgerTxn.Delete(key)
}
func (txn *Txn) ListVMs() (result []*v1.VM, err error) {
func (txn *Transaction) ListVMs() (result []*v1.VM, err error) {
defer func() {
err = mapErr(err)
}()

View File

@ -1,5 +1,5 @@
//nolint:dupl // maybe we'll figure out how to make DB resource accessors generic in the future
package store
package badger
import (
"encoding/json"
@ -14,7 +14,7 @@ func WorkerKey(name string) []byte {
return []byte(path.Join(SpaceWorkers, name))
}
func (txn *Txn) GetWorker(name string) (result *v1.Worker, err error) {
func (txn *Transaction) GetWorker(name string) (result *v1.Worker, err error) {
defer func() {
err = mapErr(err)
}()
@ -41,7 +41,7 @@ func (txn *Txn) GetWorker(name string) (result *v1.Worker, err error) {
return &worker, nil
}
func (txn *Txn) SetWorker(worker *v1.Worker) (err error) {
func (txn *Transaction) SetWorker(worker *v1.Worker) (err error) {
defer func() {
err = mapErr(err)
}()
@ -56,7 +56,7 @@ func (txn *Txn) SetWorker(worker *v1.Worker) (err error) {
return txn.badgerTxn.Set(key, valueBytes)
}
func (txn *Txn) DeleteWorker(name string) (err error) {
func (txn *Transaction) DeleteWorker(name string) (err error) {
defer func() {
err = mapErr(err)
}()
@ -66,7 +66,7 @@ func (txn *Txn) DeleteWorker(name string) (err error) {
return txn.badgerTxn.Delete(key)
}
func (txn *Txn) ListWorkers() (result []*v1.Worker, err error) {
func (txn *Transaction) ListWorkers() (result []*v1.Worker, err error) {
defer func() {
err = mapErr(err)
}()

View File

@ -2,23 +2,10 @@ package store
import (
"errors"
"fmt"
"github.com/dgraph-io/badger/v3"
)
var (
ErrNotFound = errors.New("DB entry not found")
ErrBadgerFailed = errors.New("BadgerDB failed")
ErrNotFound = errors.New("store entry not found")
ErrConflict = errors.New("store conflict")
ErrStoreFailed = errors.New("store failed")
)
func mapErr(err error) error {
if err != nil {
if errors.Is(err, badger.ErrKeyNotFound) {
return ErrNotFound
}
return fmt.Errorf("%w: %v", ErrBadgerFailed, err)
}
return err
}

View File

@ -1,24 +1,20 @@
package store
import (
"github.com/dgraph-io/badger/v3"
)
import v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
type Store struct {
db *badger.DB
type Store interface {
View(cb func(txn Transaction) error) error
Update(cb func(txn Transaction) error) error
}
func New(dbPath string) (*Store, error) {
opts := badger.DefaultOptions(dbPath)
type Transaction interface {
GetVM(name string) (result *v1.VM, err error)
SetVM(vm *v1.VM) (err error)
DeleteVM(name string) (err error)
ListVMs() (result []*v1.VM, err error)
opts.SyncWrites = true
db, err := badger.Open(opts)
if err != nil {
return nil, err
}
return &Store{
db: db,
}, nil
GetWorker(name string) (result *v1.Worker, err error)
SetWorker(worker *v1.Worker) (err error)
DeleteWorker(name string) (err error)
ListWorkers() (result []*v1.Worker, err error)
}

View File

@ -1,23 +0,0 @@
package store
import "github.com/dgraph-io/badger/v3"
type Txn struct {
badgerTxn *badger.Txn
}
func (store *Store) View(cb func(txn *Txn) error) error {
return store.db.View(func(txn *badger.Txn) error {
return cb(&Txn{
badgerTxn: txn,
})
})
}
func (store *Store) Update(cb func(txn *Txn) error) error {
return store.db.Update(func(txn *badger.Txn) error {
return cb(&Txn{
badgerTxn: txn,
})
})
}

View File

@ -1,35 +1,20 @@
package responder
import "github.com/gin-gonic/gin"
import (
"github.com/gin-gonic/gin"
)
type CodeResponder struct {
code int
headers map[string]string
DefaultResponder
code int
Responder
}
func Code(code int, opts ...Option) *CodeResponder {
responder := &CodeResponder{
code: code,
headers: map[string]string{},
func Code(code int) Responder {
return &CodeResponder{
code: code,
}
for _, opt := range opts {
opt(responder)
}
return responder
}
func (responder *CodeResponder) SetHeader(key string, value string) {
responder.headers[key] = value
}
func (responder *CodeResponder) Respond(c *gin.Context) {
for key, value := range responder.headers {
c.Header(key, value)
}
c.Status(responder.code)
}

View File

@ -0,0 +1,28 @@
package responder
import (
"errors"
storepkg "github.com/cirruslabs/orchard/internal/controller/store"
"github.com/gin-gonic/gin"
"net/http"
)
type ErrorResponder struct {
err error
Responder
}
func Error(err error) Responder {
return &ErrorResponder{
err: err,
}
}
func (responder *ErrorResponder) Respond(c *gin.Context) {
var code = http.StatusInternalServerError
if errors.Is(responder.err, storepkg.ErrNotFound) {
code = http.StatusNotFound
}
_ = c.Error(responder.err)
c.Status(code)
}

View File

@ -5,35 +5,20 @@ import (
)
type JSONResponder struct {
code int
headers map[string]string
obj interface{}
code int
obj interface{}
DefaultResponder
Responder
}
func JSON(code int, obj interface{}, opts ...Option) *JSONResponder {
func JSON(code int, obj interface{}) *JSONResponder {
responder := &JSONResponder{
code: code,
headers: map[string]string{},
obj: obj,
}
for _, opt := range opts {
opt(responder)
code: code,
obj: obj,
}
return responder
}
func (responder *JSONResponder) SetHeader(key string, value string) {
responder.headers[key] = value
}
func (responder *JSONResponder) Respond(c *gin.Context) {
for key, value := range responder.headers {
c.Header(key, value)
}
c.JSON(responder.code, responder.obj)
}

View File

@ -1,11 +0,0 @@
package responder
type Option func(responder Responder)
func WithHeaders(headers map[string]string) Option {
return func(responder Responder) {
for key, value := range headers {
responder.SetHeader(key, value)
}
}
}

View File

@ -6,10 +6,4 @@ import (
type Responder interface {
Respond(c *gin.Context)
SetHeader(key string, value string)
}
type DefaultResponder struct{}
func (dr DefaultResponder) Respond(c *gin.Context) {}
func (dr DefaultResponder) SetHeader(key string, value string) {}