Never list workers in Update()/storeUpdate() transactions (#228)

* POST /v1/workers: do not list workers in a single update txn

* schedulingLoopIteration(): do not list workers in a single update txn

* .golangci.yml: remove mentions of fully deprecated linters
This commit is contained in:
Nikolay Edigaryev 2024-12-05 16:59:50 +04:00 committed by GitHub
parent d94690176e
commit d7b6f477e1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 99 additions and 53 deletions

View File

@ -19,8 +19,6 @@ linters:
- dupl
- errcheck
- exhaustive
- exportloopref
- exportloopref
- gochecknoinits
- gocognit
- goconst
@ -97,9 +95,6 @@ linters:
# [1]: https://github.com/mgechev/revive/issues/244#issuecomment-560512162
- revive
# Unfortunately too much false-positives, e.g. for a 0700 umask or number 10 when using strconv.FormatInt()
- gomnd
# Needs package whitelists
- depguard

View File

@ -35,13 +35,48 @@ func (controller *Controller) createWorker(ctx *gin.Context) responder.Responder
}
worker.CreatedAt = currentTime
// License capacity check
if err := controller.storeView(func(txn storepkg.Transaction) responder.Responder {
_, err := txn.GetWorker(worker.Name)
if err != nil && !errors.Is(err, storepkg.ErrNotFound) {
controller.logger.Errorf("failed to check if the worker "+
"with name %q exists in the DB: %v", worker.Name, err)
return responder.Code(http.StatusInternalServerError)
}
if err == nil {
// We will be re-creating a worker with
// the same name, no capacity change
return nil
}
// We will be adding a new worker, check if the license capacity allows that
workers, err := txn.ListWorkers()
if err != nil {
controller.logger.Errorf("failed to count the number of workers in the DB: %v", err)
return responder.Code(http.StatusInternalServerError)
}
if uint(len(workers)+1) > controller.maxWorkersPerLicense {
return responder.JSON(http.StatusConflict, NewErrorResponse("cannot register a new worker "+
"because the license capacity of %d workers has been reached, "+
"consider upgrading at https://tart.run/licensing/", controller.maxWorkersPerLicense))
}
return nil
}); err != nil {
return err
}
return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder {
		// In case there already exists a worker with the same name,
// allow overwriting it if the request comes from a worker
// with the same machine ID
dbWorker, err := txn.GetWorker(worker.Name)
if err != nil && !errors.Is(err, storepkg.ErrNotFound) {
controller.logger.Errorf("failed to check if the worker exists in the DB: %v", err)
controller.logger.Errorf("failed to check if the worker "+
"with name %q exists in the DB: %v", worker.Name, err)
return responder.Code(http.StatusInternalServerError)
}
@ -50,21 +85,6 @@ func (controller *Controller) createWorker(ctx *gin.Context) responder.Responder
NewErrorResponse("this worker is managed from a different machine ID, "+
"delete this worker first to be able to re-create it"))
}
if errors.Is(err, storepkg.ErrNotFound) {
// We will be adding a new worker, check if the license capacity allows that
workers, err := txn.ListWorkers()
if err != nil {
controller.logger.Errorf("failed to count the number of workers in the DB: %v", err)
return responder.Code(http.StatusInternalServerError)
}
if uint(len(workers)+1) > controller.maxWorkersPerLicense {
return responder.JSON(http.StatusConflict, NewErrorResponse("cannot register a new worker "+
"because the license capacity of %d workers has been reached, "+
"consider upgrading at https://tart.run/licensing/", controller.maxWorkersPerLicense))
}
}
if err := txn.SetWorker(worker); err != nil {
return responder.Error(err)

View File

@ -87,15 +87,19 @@ func (scheduler *Scheduler) Run() {
case <-time.After(schedulerInterval):
}
healthCheckingLoopIterationStart := time.Now()
if err := scheduler.healthCheckingLoopIteration(); err != nil {
scheduler.logger.Errorf("Failed to health-check VMs: %v", err)
}
healthCheckingLoopIterationEnd := time.Now()
schedulingLoopIterationStart := time.Now()
err := scheduler.schedulingLoopIteration()
schedulingLoopIterationEnd := time.Now()
scheduler.logger.Debugf("Scheduling loop iteration took %v",
scheduler.logger.Debugf("Health checking loop iteration took %v, "+
"scheduling loop iteration took %v",
healthCheckingLoopIterationEnd.Sub(healthCheckingLoopIterationStart),
schedulingLoopIterationEnd.Sub(schedulingLoopIterationStart))
if err != nil {
@ -352,46 +356,69 @@ func ProcessVMs(vms []v1.VM) ([]v1.VM, WorkerInfos) {
}
func (scheduler *Scheduler) healthCheckingLoopIteration() error {
return scheduler.store.Update(func(txn storepkg.Transaction) error {
// Retrieve scheduled VMs
vms, err := txn.ListVMs()
// Get a lagging view of VMs
var vms []v1.VM
if err := scheduler.store.View(func(txn storepkg.Transaction) error {
var err error
vms, err = txn.ListVMs()
if err != nil {
return err
}
var scheduledVMs []v1.VM
for _, vm := range vms {
if vm.Worker != "" {
scheduledVMs = append(scheduledVMs, vm)
}
}
// Retrieve and index workers by name
// Update metrics
workers, err := txn.ListWorkers()
if err != nil {
return err
}
nameToWorker := map[string]v1.Worker{}
for _, worker := range workers {
nameToWorker[worker.Name] = worker
}
scheduler.reportStats(workers, vms)
// Process scheduled VMs
for _, scheduledVM := range scheduledVMs {
if err := scheduler.healthCheckVM(txn, nameToWorker, scheduledVM); err != nil {
return err
}
return nil
}); err != nil {
return err
}
// Process each VM in a lagging list of VMs in an individual
// transaction, re-checking that the VM still exists
// and it is still scheduled
for _, vm := range vms {
if vm.Worker == "" {
// Not a scheduled VM
//
// We'll re-check this below, but this allows us
// to avoid wasting cycles opening a transaction
// for nothing.
continue
}
return nil
})
if err := scheduler.store.Update(func(txn storepkg.Transaction) error {
currentVM, err := txn.GetVM(vm.Name)
if err != nil {
if errors.Is(err, storepkg.ErrNotFound) {
// VM ceased to exist, nothing to do
return nil
}
return err
}
if currentVM.Worker == "" {
// Not a scheduled VM, nothing to do
return nil
}
return scheduler.healthCheckVM(txn, *currentVM)
}); err != nil {
return err
}
}
return nil
}
func (scheduler *Scheduler) healthCheckVM(txn storepkg.Transaction, nameToWorker map[string]v1.Worker, vm v1.VM) error {
func (scheduler *Scheduler) healthCheckVM(txn storepkg.Transaction, vm v1.VM) error {
logger := scheduler.logger.With("vm_name", vm.Name, "vm_uid", vm.UID, "vm_restart_count", vm.RestartCount)
// Schedule a VM restart if the restart policy mandates it
@ -415,13 +442,17 @@ func (scheduler *Scheduler) healthCheckVM(txn storepkg.Transaction, nameToWorker
return txn.SetVM(vm)
}
worker, ok := nameToWorker[vm.Worker]
if !ok {
vm.Status = v1.VMStatusFailed
vm.StatusMessage = "VM is assigned to a worker that " +
"doesn't exist anymore"
worker, err := txn.GetWorker(vm.Worker)
if err != nil {
if errors.Is(err, storepkg.ErrNotFound) {
vm.Status = v1.VMStatusFailed
vm.StatusMessage = "VM is assigned to a worker that " +
"doesn't exist anymore"
return txn.SetVM(vm)
return txn.SetVM(vm)
}
return err
}
if worker.Offline(scheduler.workerOfflineTimeout) && !vm.TerminalState() {