diff --git a/internal/controller/api.go b/internal/controller/api.go index 0aca62f..642b770 100644 --- a/internal/controller/api.go +++ b/internal/controller/api.go @@ -13,6 +13,7 @@ import ( "github.com/gin-gonic/gin" "github.com/go-openapi/runtime/middleware" "github.com/penglongli/gin-metrics/ginmetrics" + "go.uber.org/zap" "google.golang.org/grpc/metadata" "net/http" "strings" @@ -240,16 +241,17 @@ func (controller *Controller) authorizeGRPC(ctx context.Context, scopes ...v1pkg type storeTransactionFunc func(operation func(txn storepkg.Transaction) error) error func (controller *Controller) storeView(view func(txn storepkg.Transaction) responder.Responder) responder.Responder { - return adaptResponderToStoreOperation(controller.store.View, view) + return adaptResponderToStoreOperation(controller.logger, controller.store.View, view) } func (controller *Controller) storeUpdate( update func(txn storepkg.Transaction) responder.Responder, ) responder.Responder { - return adaptResponderToStoreOperation(controller.store.Update, update) + return adaptResponderToStoreOperation(controller.logger, controller.store.Update, update) } func adaptResponderToStoreOperation( + logger *zap.SugaredLogger, storeOperation storeTransactionFunc, responderOperation func(txn storepkg.Transaction) responder.Responder, ) responder.Responder { @@ -260,6 +262,8 @@ func adaptResponderToStoreOperation( return nil }); err != nil { + logger.Errorf("encountered an error during store operation: %v", err) + return responder.Code(http.StatusInternalServerError) } diff --git a/internal/controller/api_cluster_settings.go b/internal/controller/api_cluster_settings.go index ec3fd30..129da23 100644 --- a/internal/controller/api_cluster_settings.go +++ b/internal/controller/api_cluster_settings.go @@ -16,7 +16,8 @@ func (controller *Controller) getClusterSettings(ctx *gin.Context) responder.Res return controller.storeView(func(txn storepkg.Transaction) responder.Responder { clusterSettings, err := txn.GetClusterSettings() if err != nil { - controller.logger.Errorf("failed to retrieve cluster settings: %v", err) + controller.logger.Errorf("failed to retrieve cluster settings from the DB: %v", err) + return responder.Code(http.StatusInternalServerError) } @@ -45,6 +46,8 @@ func (controller *Controller) updateClusterSettings(ctx *gin.Context) responder. return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder { if err := txn.SetClusterSettings(clusterSettings); err != nil { + controller.logger.Errorf("failed to set cluster settings in the DB: %v", err) + return responder.Code(http.StatusInternalServerError) } diff --git a/internal/controller/api_service_accounts.go b/internal/controller/api_service_accounts.go index 4718d00..ccb2c38 100644 --- a/internal/controller/api_service_accounts.go +++ b/internal/controller/api_service_accounts.go @@ -45,6 +45,8 @@ func (controller *Controller) createServiceAccount(ctx *gin.Context) responder.R // Does the Service Account resource with this name already exists? _, err := txn.GetServiceAccount(serviceAccount.Name) if err != nil && !errors.Is(err, storepkg.ErrNotFound) { + controller.logger.Errorf("failed to check if the service account exists in the DB: %v", err) + return responder.Code(http.StatusInternalServerError) } if err == nil { @@ -53,6 +55,8 @@ func (controller *Controller) createServiceAccount(ctx *gin.Context) responder.R } if err := txn.SetServiceAccount(&serviceAccount); err != nil { + controller.logger.Errorf("failed to create the service account in the DB: %v", err) + return responder.Code(http.StatusInternalServerError) } @@ -86,6 +90,8 @@ func (controller *Controller) updateServiceAccount(ctx *gin.Context) responder.R } if err := txn.SetServiceAccount(dbServiceAccount); err != nil { + controller.logger.Errorf("failed to update service account in the DB: %v", err) + return responder.Code(http.StatusInternalServerError) } diff --git a/internal/controller/api_vms.go b/internal/controller/api_vms.go index e174124..584b6ed 100644 --- a/internal/controller/api_vms.go +++ b/internal/controller/api_vms.go @@ -79,6 +79,8 @@ func (controller *Controller) createVM(ctx *gin.Context) responder.Responder { // Does the VM resource with this name already exists? _, err := txn.GetVM(vm.Name) if err != nil && !errors.Is(err, storepkg.ErrNotFound) { + controller.logger.Errorf("failed to check if the VM exists in the DB: %v", err) + return responder.Code(http.StatusInternalServerError) } if err == nil { @@ -86,6 +88,8 @@ func (controller *Controller) createVM(ctx *gin.Context) responder.Responder { } if err := txn.SetVM(vm); err != nil { + controller.logger.Errorf("failed to create VM in the DB: %v", err) + return responder.Code(http.StatusInternalServerError) } @@ -126,6 +130,8 @@ func (controller *Controller) updateVM(ctx *gin.Context) responder.Responder { dbVM.StatusMessage = userVM.StatusMessage if err := txn.SetVM(*dbVM); err != nil { + controller.logger.Errorf("failed to update VM in the DB: %v", err) + return responder.Code(http.StatusInternalServerError) } @@ -252,6 +258,8 @@ func (controller *Controller) validateHostDirs(hostDirs []v1.HostDir) responder. return err }) if err != nil { + controller.logger.Errorf("failed to retrieve cluster settings from the DB: %v", err) + return responder.Code(http.StatusInternalServerError) } diff --git a/internal/controller/api_workers.go b/internal/controller/api_workers.go index 163d990..cf0bba3 100644 --- a/internal/controller/api_workers.go +++ b/internal/controller/api_workers.go @@ -37,6 +37,8 @@ func (controller *Controller) createWorker(ctx *gin.Context) responder.Responder // with the same machine ID dbWorker, err := txn.GetWorker(worker.Name) if err != nil && !errors.Is(err, storepkg.ErrNotFound) { + controller.logger.Errorf("failed to check if the worker exists in the DB: %v", err) + return responder.Code(http.StatusInternalServerError) } if err == nil && worker.MachineID != dbWorker.MachineID { @@ -48,6 +50,8 @@ func (controller *Controller) createWorker(ctx *gin.Context) responder.Responder // We will be adding a new worker, check if the license capacity allows that workers, err := txn.ListWorkers() if err != nil { + controller.logger.Errorf("failed to count the number of workers in the DB: %v", err) + return responder.Code(http.StatusInternalServerError) } @@ -87,6 +91,8 @@ func (controller *Controller) updateWorker(ctx *gin.Context) responder.Responder dbWorker.SchedulingPaused = userWorker.SchedulingPaused if err := txn.SetWorker(*dbWorker); err != nil { + controller.logger.Errorf("failed to update worker in the DB: %v", err) + return responder.Code(http.StatusInternalServerError) } diff --git a/internal/controller/store/badger/badger_store.go b/internal/controller/store/badger/badger_store.go index 9ce1f02..5d72905 100644 --- a/internal/controller/store/badger/badger_store.go +++ b/internal/controller/store/badger/badger_store.go @@ -3,6 +3,7 @@ package badger import ( "errors" "fmt" + "github.com/avast/retry-go/v4" "github.com/cirruslabs/orchard/internal/controller/store" "github.com/dgraph-io/badger/v3" ) @@ -33,19 +34,27 @@ func NewBadgerStore(dbPath string) (store.Store, error) { } func (store *Store) View(cb func(txn store.Transaction) error) error { - return store.db.View(func(txn *badger.Txn) error { - return cb(&Transaction{ - badgerTxn: txn, + return retry.Do(func() error { + return store.db.View(func(txn *badger.Txn) error { + return cb(&Transaction{ + badgerTxn: txn, + }) }) - }) + }, retry.RetryIf(func(err error) bool { + return errors.Is(err, badger.ErrConflict) + }), retry.Attempts(3), retry.LastErrorOnly(true)) } func (store *Store) Update(cb func(txn store.Transaction) error) error { - return store.db.Update(func(txn *badger.Txn) error { - return cb(&Transaction{ - badgerTxn: txn, + return retry.Do(func() error { + return store.db.Update(func(txn *badger.Txn) error { + return cb(&Transaction{ + badgerTxn: txn, + }) }) - }) + }, retry.RetryIf(func(err error) bool { + return errors.Is(err, badger.ErrConflict) + }), retry.Attempts(3), retry.LastErrorOnly(true)) } func mapErr(err error) error {