diff --git a/.golangci.yml b/.golangci.yml
index 4da1c9c..8d23a82 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -19,8 +19,6 @@ linters:
     - dupl
     - errcheck
     - exhaustive
-    - exportloopref
-    - exportloopref
     - gochecknoinits
     - gocognit
     - goconst
@@ -97,9 +95,6 @@ linters:
     # [1]: https://github.com/mgechev/revive/issues/244#issuecomment-560512162
     - revive
 
-    # Unfortunately too much false-positives, e.g. for a 0700 umask or number 10 when using strconv.FormatInt()
-    - gomnd
-
     # Needs package whitelists
     - depguard
 
diff --git a/internal/controller/api_workers.go b/internal/controller/api_workers.go
index c3c9efb..2eb1f60 100644
--- a/internal/controller/api_workers.go
+++ b/internal/controller/api_workers.go
@@ -35,13 +35,48 @@ func (controller *Controller) createWorker(ctx *gin.Context) responder.Responder
 	}
 	worker.CreatedAt = currentTime
 
+	// License capacity check
+	if err := controller.storeView(func(txn storepkg.Transaction) responder.Responder {
+		_, err := txn.GetWorker(worker.Name)
+		if err != nil && !errors.Is(err, storepkg.ErrNotFound) {
+			controller.logger.Errorf("failed to check if the worker "+
+				"with name %q exists in the DB: %v", worker.Name, err)
+
+			return responder.Code(http.StatusInternalServerError)
+		}
+		if err == nil {
+			// We will be re-creating a worker with
+			// the same name, no capacity change
+			return nil
+		}
+
+		// We will be adding a new worker, check if the license capacity allows that
+		workers, err := txn.ListWorkers()
+		if err != nil {
+			controller.logger.Errorf("failed to count the number of workers in the DB: %v", err)
+
+			return responder.Code(http.StatusInternalServerError)
+		}
+
+		if uint(len(workers)+1) > controller.maxWorkersPerLicense {
+			return responder.JSON(http.StatusConflict, NewErrorResponse("cannot register a new worker "+
+				"because the license capacity of %d workers has been reached, "+
+				"consider upgrading at https://tart.run/licensing/", controller.maxWorkersPerLicense))
+		}
+
+		return nil
+	}); err != nil {
+		return err
+	}
+
 	return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder {
 		// In case there already exist a worker with the same name,
 		// allow overwriting it if the request comes from a worker
 		// with the same machine ID
 		dbWorker, err := txn.GetWorker(worker.Name)
 		if err != nil && !errors.Is(err, storepkg.ErrNotFound) {
-			controller.logger.Errorf("failed to check if the worker exists in the DB: %v", err)
+			controller.logger.Errorf("failed to check if the worker "+
+				"with name %q exists in the DB: %v", worker.Name, err)
 
 			return responder.Code(http.StatusInternalServerError)
 		}
@@ -50,21 +85,6 @@ func (controller *Controller) createWorker(ctx *gin.Context) responder.Responder
 			NewErrorResponse("this worker is managed from a different machine ID, "+
 				"delete this worker first to be able to re-create it"))
 		}
-		if errors.Is(err, storepkg.ErrNotFound) {
-			// We will be adding a new worker, check if the license capacity allows that
-			workers, err := txn.ListWorkers()
-			if err != nil {
-				controller.logger.Errorf("failed to count the number of workers in the DB: %v", err)
-
-				return responder.Code(http.StatusInternalServerError)
-			}
-
-			if uint(len(workers)+1) > controller.maxWorkersPerLicense {
-				return responder.JSON(http.StatusConflict, NewErrorResponse("cannot register a new worker "+
-					"because the license capacity of %d workers has been reached, "+
-					"consider upgrading at https://tart.run/licensing/", controller.maxWorkersPerLicense))
-			}
-		}
 
 		if err := txn.SetWorker(worker); err != nil {
 			return responder.Error(err)
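The createWorker change above moves the license-capacity check out of the write transaction and into a read-only view that runs first, so the write transaction only performs the overwrite check and the actual SetWorker. Below is a minimal, self-contained sketch of that check-then-write shape, assuming a hypothetical in-memory store (toyStore, view, update, and createWorker are illustrative stand-ins for the project's storepkg and controller helpers, not its real API):

```go
package main

import (
	"fmt"
	"sync"
)

// toyStore is a hypothetical in-memory stand-in for the controller's
// storepkg-backed store; view/update mimic storeView/storeUpdate.
type toyStore struct {
	mu      sync.RWMutex
	workers map[string]struct{}
}

func (s *toyStore) view(fn func(workers map[string]struct{}) error) error {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return fn(s.workers)
}

func (s *toyStore) update(fn func(workers map[string]struct{}) error) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return fn(s.workers)
}

// createWorker mirrors the check-then-write shape of the handler above.
func createWorker(s *toyStore, name string, maxWorkers int) error {
	// Phase 1: read-only capacity check (cheap, takes no write lock).
	if err := s.view(func(workers map[string]struct{}) error {
		if _, ok := workers[name]; ok {
			// Re-creating an existing worker: no capacity change.
			return nil
		}
		if len(workers)+1 > maxWorkers {
			return fmt.Errorf("license capacity of %d workers reached", maxWorkers)
		}
		return nil
	}); err != nil {
		return err
	}

	// Phase 2: a short write transaction that only performs the write.
	return s.update(func(workers map[string]struct{}) error {
		workers[name] = struct{}{}
		return nil
	})
}

func main() {
	s := &toyStore{workers: map[string]struct{}{}}
	fmt.Println(createWorker(s, "worker-1", 1)) // <nil>
	fmt.Println(createWorker(s, "worker-2", 1)) // license capacity of 1 workers reached
}
```

One consequence of splitting the check from the write is that a concurrent create can pass the read-only check before either write commits, so the capacity bound becomes best-effort rather than strictly transactional; that appears to be an accepted trade-off here in exchange for keeping write transactions short.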
diff --git a/internal/controller/scheduler/scheduler.go b/internal/controller/scheduler/scheduler.go
index aaab508..0bcfa9a 100644
--- a/internal/controller/scheduler/scheduler.go
+++ b/internal/controller/scheduler/scheduler.go
@@ -87,15 +87,19 @@ func (scheduler *Scheduler) Run() {
 		case <-time.After(schedulerInterval):
 		}
 
+		healthCheckingLoopIterationStart := time.Now()
 		if err := scheduler.healthCheckingLoopIteration(); err != nil {
 			scheduler.logger.Errorf("Failed to health-check VMs: %v", err)
 		}
+		healthCheckingLoopIterationEnd := time.Now()
 
 		schedulingLoopIterationStart := time.Now()
 		err := scheduler.schedulingLoopIteration()
 		schedulingLoopIterationEnd := time.Now()
 
-		scheduler.logger.Debugf("Scheduling loop iteration took %v",
+		scheduler.logger.Debugf("Health checking loop iteration took %v, "+
+			"scheduling loop iteration took %v",
+			healthCheckingLoopIterationEnd.Sub(healthCheckingLoopIterationStart),
 			schedulingLoopIterationEnd.Sub(schedulingLoopIterationStart))
 
 		if err != nil {
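The Run() change above brackets the health-checking phase with explicit time.Now() calls and reports both phase durations in one Debugf line; the health-check end timestamp has to be captured eagerly because the log line is only emitted after the scheduling phase finishes. A minimal sketch of the same bracketing pattern, with placeholder phase functions standing in for the scheduler's real loop iterations:

```go
package main

import (
	"log"
	"time"
)

// Placeholders standing in for healthCheckingLoopIteration
// and schedulingLoopIteration.
func healthCheck() { time.Sleep(10 * time.Millisecond) }
func schedule()    { time.Sleep(20 * time.Millisecond) }

func main() {
	healthStart := time.Now()
	healthCheck()
	healthEnd := time.Now() // captured now, logged later

	scheduleStart := time.Now()
	schedule()
	scheduleEnd := time.Now()

	// time.Time.Sub yields a time.Duration, which %v prints
	// in a human-readable form such as "10.2ms".
	log.Printf("health checking took %v, scheduling took %v",
		healthEnd.Sub(healthStart), scheduleEnd.Sub(scheduleStart))
}
```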
@@ -352,46 +356,69 @@ func ProcessVMs(vms []v1.VM) ([]v1.VM, WorkerInfos) {
 }
 
 func (scheduler *Scheduler) healthCheckingLoopIteration() error {
-	return scheduler.store.Update(func(txn storepkg.Transaction) error {
-		// Retrieve scheduled VMs
-		vms, err := txn.ListVMs()
+	// Get a lagging view of VMs
+	var vms []v1.VM
+
+	if err := scheduler.store.View(func(txn storepkg.Transaction) error {
+		var err error
+
+		vms, err = txn.ListVMs()
 		if err != nil {
 			return err
 		}
 
-		var scheduledVMs []v1.VM
-
-		for _, vm := range vms {
-			if vm.Worker != "" {
-				scheduledVMs = append(scheduledVMs, vm)
-			}
-		}
-
-		// Retrieve and index workers by name
+		// Update metrics
 		workers, err := txn.ListWorkers()
 		if err != nil {
 			return err
 		}
 
-		nameToWorker := map[string]v1.Worker{}
-		for _, worker := range workers {
-			nameToWorker[worker.Name] = worker
-		}
-
 		scheduler.reportStats(workers, vms)
 
-		// Process scheduled VMs
-		for _, scheduledVM := range scheduledVMs {
-			if err := scheduler.healthCheckVM(txn, nameToWorker, scheduledVM); err != nil {
-				return err
-			}
+		return nil
+	}); err != nil {
+		return err
+	}
+
+	// Process each VM in a lagging list of VMs in an individual
+	// transaction, re-checking that the VM still exists
+	// and that it is still scheduled
+	for _, vm := range vms {
+		if vm.Worker == "" {
+			// Not a scheduled VM
+			//
+			// We'll re-check this below, but this allows us
+			// to avoid wasting cycles opening a transaction
+			// for nothing.
+			continue
 		}
 
-		return nil
-	})
+		if err := scheduler.store.Update(func(txn storepkg.Transaction) error {
+			currentVM, err := txn.GetVM(vm.Name)
+			if err != nil {
+				if errors.Is(err, storepkg.ErrNotFound) {
+					// VM ceased to exist, nothing to do
+					return nil
+				}
+
+				return err
+			}
+
+			if currentVM.Worker == "" {
+				// Not a scheduled VM, nothing to do
+				return nil
+			}
+
+			return scheduler.healthCheckVM(txn, *currentVM)
+		}); err != nil {
+			return err
+		}
+	}
+
+	return nil
 }
 
-func (scheduler *Scheduler) healthCheckVM(txn storepkg.Transaction, nameToWorker map[string]v1.Worker, vm v1.VM) error {
+func (scheduler *Scheduler) healthCheckVM(txn storepkg.Transaction, vm v1.VM) error {
 	logger := scheduler.logger.With("vm_name", vm.Name, "vm_uid", vm.UID,
 		"vm_restart_count", vm.RestartCount)
 	// Schedule a VM restart if the restart policy mandates it
@@ -415,13 +442,17 @@
 		return txn.SetVM(vm)
 	}
 
-	worker, ok := nameToWorker[vm.Worker]
-	if !ok {
-		vm.Status = v1.VMStatusFailed
-		vm.StatusMessage = "VM is assigned to a worker that " +
-			"doesn't exist anymore"
+	worker, err := txn.GetWorker(vm.Worker)
+	if err != nil {
+		if errors.Is(err, storepkg.ErrNotFound) {
+			vm.Status = v1.VMStatusFailed
+			vm.StatusMessage = "VM is assigned to a worker that " +
+				"doesn't exist anymore"
 
-		return txn.SetVM(vm)
+			return txn.SetVM(vm)
+		}
+
+		return err
 	}
 
 	if worker.Offline(scheduler.workerOfflineTimeout) && !vm.TerminalState() {
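The healthCheckingLoopIteration rewrite above replaces a single long store.Update over all VMs with a lagging read-only snapshot followed by one short write transaction per scheduled VM; healthCheckVM correspondingly switches from a prefetched nameToWorker map to txn.GetWorker, so each check sees current data. Below is a minimal sketch of that snapshot-then-revalidate pattern, again against a hypothetical in-memory store (vm, store, view, update, and healthCheckAll are illustrative names, not the project's storepkg API):

```go
package main

import (
	"fmt"
	"sync"
)

// vm and store are hypothetical stand-ins for v1.VM and the
// storepkg-backed store; view/update mimic store.View/store.Update.
type vm struct {
	Name   string
	Worker string
	Status string
}

type store struct {
	mu  sync.RWMutex
	vms map[string]vm
}

func (s *store) view(fn func(vms map[string]vm)) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	fn(s.vms)
}

func (s *store) update(fn func(vms map[string]vm)) {
	s.mu.Lock()
	defer s.mu.Unlock()
	fn(s.vms)
}

func healthCheckAll(s *store) {
	// Phase 1: take a lagging snapshot under a cheap read lock.
	var snapshot []vm
	s.view(func(vms map[string]vm) {
		for _, v := range vms {
			snapshot = append(snapshot, v)
		}
	})

	for _, stale := range snapshot {
		if stale.Worker == "" {
			// Not scheduled: skip without opening a transaction at all.
			continue
		}
		// Phase 2: one short write transaction per VM, re-validating
		// against current data because the snapshot may be stale.
		s.update(func(vms map[string]vm) {
			current, ok := vms[stale.Name]
			if !ok || current.Worker == "" {
				return // VM disappeared or was unscheduled meanwhile
			}
			current.Status = "checked"
			vms[stale.Name] = current
		})
	}
}

func main() {
	s := &store{vms: map[string]vm{
		"vm-a": {Name: "vm-a", Worker: "worker-1"},
		"vm-b": {Name: "vm-b"}, // unscheduled, will be skipped
	}}
	healthCheckAll(s)
	fmt.Printf("%q %q\n", s.vms["vm-a"].Status, s.vms["vm-b"].Status) // "checked" ""
}
```

The trade-off matches what the new comments in the diff say: the snapshot can go stale, so each per-VM transaction re-checks that the VM still exists and is still scheduled, and in exchange the store is never write-locked for the duration of an entire health-check pass.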