Retry DB transactions on badger.ErrConflict (#114)
* Log HTTP 500 errors in more detail * Log errors in storeView and storeUpdate * Retry on badger.ErrConflict
This commit is contained in:
parent
9f4cd5bae7
commit
036eb954be
|
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/gin-gonic/gin"
|
||||
"github.com/go-openapi/runtime/middleware"
|
||||
"github.com/penglongli/gin-metrics/ginmetrics"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
|
@ -240,16 +241,17 @@ func (controller *Controller) authorizeGRPC(ctx context.Context, scopes ...v1pkg
|
|||
type storeTransactionFunc func(operation func(txn storepkg.Transaction) error) error
|
||||
|
||||
func (controller *Controller) storeView(view func(txn storepkg.Transaction) responder.Responder) responder.Responder {
|
||||
return adaptResponderToStoreOperation(controller.store.View, view)
|
||||
return adaptResponderToStoreOperation(controller.logger, controller.store.View, view)
|
||||
}
|
||||
|
||||
func (controller *Controller) storeUpdate(
|
||||
update func(txn storepkg.Transaction) responder.Responder,
|
||||
) responder.Responder {
|
||||
return adaptResponderToStoreOperation(controller.store.Update, update)
|
||||
return adaptResponderToStoreOperation(controller.logger, controller.store.Update, update)
|
||||
}
|
||||
|
||||
func adaptResponderToStoreOperation(
|
||||
logger *zap.SugaredLogger,
|
||||
storeOperation storeTransactionFunc,
|
||||
responderOperation func(txn storepkg.Transaction) responder.Responder,
|
||||
) responder.Responder {
|
||||
|
|
@ -260,6 +262,8 @@ func adaptResponderToStoreOperation(
|
|||
|
||||
return nil
|
||||
}); err != nil {
|
||||
logger.Errorf("encountered an error during store operation: %v", err)
|
||||
|
||||
return responder.Code(http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -16,7 +16,8 @@ func (controller *Controller) getClusterSettings(ctx *gin.Context) responder.Res
|
|||
return controller.storeView(func(txn storepkg.Transaction) responder.Responder {
|
||||
clusterSettings, err := txn.GetClusterSettings()
|
||||
if err != nil {
|
||||
controller.logger.Errorf("failed to retrieve cluster settings: %v", err)
|
||||
controller.logger.Errorf("failed to retrieve cluster settings from the DB: %v", err)
|
||||
|
||||
return responder.Code(http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
|
|
@ -45,6 +46,8 @@ func (controller *Controller) updateClusterSettings(ctx *gin.Context) responder.
|
|||
|
||||
return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder {
|
||||
if err := txn.SetClusterSettings(clusterSettings); err != nil {
|
||||
controller.logger.Errorf("failed to set cluster settings in the DB: %v", err)
|
||||
|
||||
return responder.Code(http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -45,6 +45,8 @@ func (controller *Controller) createServiceAccount(ctx *gin.Context) responder.R
|
|||
// Does the Service Account resource with this name already exists?
|
||||
_, err := txn.GetServiceAccount(serviceAccount.Name)
|
||||
if err != nil && !errors.Is(err, storepkg.ErrNotFound) {
|
||||
controller.logger.Errorf("failed to check if the service account exists in the DB: %v", err)
|
||||
|
||||
return responder.Code(http.StatusInternalServerError)
|
||||
}
|
||||
if err == nil {
|
||||
|
|
@ -53,6 +55,8 @@ func (controller *Controller) createServiceAccount(ctx *gin.Context) responder.R
|
|||
}
|
||||
|
||||
if err := txn.SetServiceAccount(&serviceAccount); err != nil {
|
||||
controller.logger.Errorf("failed to create the service account in the DB: %v", err)
|
||||
|
||||
return responder.Code(http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
|
|
@ -86,6 +90,8 @@ func (controller *Controller) updateServiceAccount(ctx *gin.Context) responder.R
|
|||
}
|
||||
|
||||
if err := txn.SetServiceAccount(dbServiceAccount); err != nil {
|
||||
controller.logger.Errorf("failed to update service account in the DB: %v", err)
|
||||
|
||||
return responder.Code(http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -79,6 +79,8 @@ func (controller *Controller) createVM(ctx *gin.Context) responder.Responder {
|
|||
// Does the VM resource with this name already exists?
|
||||
_, err := txn.GetVM(vm.Name)
|
||||
if err != nil && !errors.Is(err, storepkg.ErrNotFound) {
|
||||
controller.logger.Errorf("failed to check if the VM exists in the DB: %v", err)
|
||||
|
||||
return responder.Code(http.StatusInternalServerError)
|
||||
}
|
||||
if err == nil {
|
||||
|
|
@ -86,6 +88,8 @@ func (controller *Controller) createVM(ctx *gin.Context) responder.Responder {
|
|||
}
|
||||
|
||||
if err := txn.SetVM(vm); err != nil {
|
||||
controller.logger.Errorf("failed to create VM in the DB: %v", err)
|
||||
|
||||
return responder.Code(http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
|
|
@ -126,6 +130,8 @@ func (controller *Controller) updateVM(ctx *gin.Context) responder.Responder {
|
|||
dbVM.StatusMessage = userVM.StatusMessage
|
||||
|
||||
if err := txn.SetVM(*dbVM); err != nil {
|
||||
controller.logger.Errorf("failed to update VM in the DB: %v", err)
|
||||
|
||||
return responder.Code(http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
|
|
@ -252,6 +258,8 @@ func (controller *Controller) validateHostDirs(hostDirs []v1.HostDir) responder.
|
|||
return err
|
||||
})
|
||||
if err != nil {
|
||||
controller.logger.Errorf("failed to retrieve cluster settings from the DB: %v", err)
|
||||
|
||||
return responder.Code(http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -37,6 +37,8 @@ func (controller *Controller) createWorker(ctx *gin.Context) responder.Responder
|
|||
// with the same machine ID
|
||||
dbWorker, err := txn.GetWorker(worker.Name)
|
||||
if err != nil && !errors.Is(err, storepkg.ErrNotFound) {
|
||||
controller.logger.Errorf("failed to check if the worker exists in the DB: %v", err)
|
||||
|
||||
return responder.Code(http.StatusInternalServerError)
|
||||
}
|
||||
if err == nil && worker.MachineID != dbWorker.MachineID {
|
||||
|
|
@ -48,6 +50,8 @@ func (controller *Controller) createWorker(ctx *gin.Context) responder.Responder
|
|||
// We will be adding a new worker, check if the license capacity allows that
|
||||
workers, err := txn.ListWorkers()
|
||||
if err != nil {
|
||||
controller.logger.Errorf("failed to count the number of workers in the DB: %v", err)
|
||||
|
||||
return responder.Code(http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
|
|
@ -87,6 +91,8 @@ func (controller *Controller) updateWorker(ctx *gin.Context) responder.Responder
|
|||
dbWorker.SchedulingPaused = userWorker.SchedulingPaused
|
||||
|
||||
if err := txn.SetWorker(*dbWorker); err != nil {
|
||||
controller.logger.Errorf("failed to update worker in the DB: %v", err)
|
||||
|
||||
return responder.Code(http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ package badger
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/avast/retry-go/v4"
|
||||
"github.com/cirruslabs/orchard/internal/controller/store"
|
||||
"github.com/dgraph-io/badger/v3"
|
||||
)
|
||||
|
|
@ -33,19 +34,27 @@ func NewBadgerStore(dbPath string) (store.Store, error) {
|
|||
}
|
||||
|
||||
func (store *Store) View(cb func(txn store.Transaction) error) error {
|
||||
return store.db.View(func(txn *badger.Txn) error {
|
||||
return cb(&Transaction{
|
||||
badgerTxn: txn,
|
||||
return retry.Do(func() error {
|
||||
return store.db.View(func(txn *badger.Txn) error {
|
||||
return cb(&Transaction{
|
||||
badgerTxn: txn,
|
||||
})
|
||||
})
|
||||
})
|
||||
}, retry.RetryIf(func(err error) bool {
|
||||
return errors.Is(err, badger.ErrConflict)
|
||||
}), retry.Attempts(3), retry.LastErrorOnly(true))
|
||||
}
|
||||
|
||||
func (store *Store) Update(cb func(txn store.Transaction) error) error {
|
||||
return store.db.Update(func(txn *badger.Txn) error {
|
||||
return cb(&Transaction{
|
||||
badgerTxn: txn,
|
||||
return retry.Do(func() error {
|
||||
return store.db.Update(func(txn *badger.Txn) error {
|
||||
return cb(&Transaction{
|
||||
badgerTxn: txn,
|
||||
})
|
||||
})
|
||||
})
|
||||
}, retry.RetryIf(func(err error) bool {
|
||||
return errors.Is(err, badger.ErrConflict)
|
||||
}), retry.Attempts(3), retry.LastErrorOnly(true))
|
||||
}
|
||||
|
||||
func mapErr(err error) error {
|
||||
|
|
|
|||
Loading…
Reference in New Issue