Worker: better cleanup mechanisms (#139)

* Always Close() the Worker instance

* orchard list vms: show assigned worker for each of the VMs

* Stop the failed VMs before we schedule new VMs

This is needed to avoid violating resource constraints (e.g. a maximum number of VMs per host).

* syncOnDiskVMs: don't ignore running VMs

* Worker: show correct remote and local VM counts
This commit is contained in:
Nikolay Edigaryev 2023-09-28 23:09:43 +04:00 committed by GitHub
parent 40f58e4aee
commit 063405672f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 67 additions and 36 deletions

View File

@ -54,10 +54,10 @@ func runDev(cmd *cobra.Command, args []string) error {
devController, devWorker, err := CreateDevControllerAndWorker(devDataDirPath,
fmt.Sprintf(":%d", netconstants.DefaultControllerPort), resources,
nil, nil)
if err != nil {
return err
}
defer devWorker.Close()
errChan := make(chan error, 2)

View File

@ -40,13 +40,13 @@ func runListVMs(cmd *cobra.Command, args []string) error {
table := uitable.New()
table.AddRow("Name", "Created", "Image", "Status", "Restart policy")
table.AddRow("Name", "Created", "Image", "Status", "Restart policy", "Assigned worker")
for _, vm := range vms {
restartPolicyInfo := fmt.Sprintf("%s (%d restarts)", vm.RestartPolicy, vm.RestartCount)
createdAtInfo := humanize.RelTime(vm.CreatedAt, time.Now(), "ago", "in the future")
table.AddRow(vm.Name, createdAtInfo, vm.Image, vm.Status, restartPolicyInfo)
table.AddRow(vm.Name, createdAtInfo, vm.Image, vm.Status, restartPolicyInfo, vm.Worker)
}
fmt.Println(table)

View File

@ -105,6 +105,7 @@ func runWorker(cmd *cobra.Command, args []string) (err error) {
if err != nil {
return err
}
defer workerInstance.Close()
return workerInstance.Run(cmd.Context())
}

View File

@ -190,22 +190,23 @@ func (vm *VM) IP(ctx context.Context) (string, error) {
return strings.TrimSpace(stdout), nil
}
func (vm *VM) Stop() error {
func (vm *VM) Stop() {
if !vm.cloned {
return nil
return
}
vm.logger.Debugf("stopping VM")
_, _, _ = tart.Tart(context.Background(), vm.logger, "stop", vm.id())
_, _, err := tart.Tart(context.Background(), vm.logger, "stop", vm.id())
if err != nil {
vm.logger.Warnf("failed to stop VM: %v", err)
}
vm.logger.Debugf("VM stopped")
vm.cancel()
vm.wg.Wait()
return nil
}
func (vm *VM) Delete() error {

View File

@ -86,10 +86,7 @@ func (worker *Worker) Run(ctx context.Context) error {
func (worker *Worker) Close() error {
var result error
for _, vm := range worker.vmm.List() {
err := vm.Stop()
if err != nil {
result = multierror.Append(result, err)
}
vm.Stop()
}
for _, vm := range worker.vmm.List() {
err := vm.Delete()
@ -188,6 +185,7 @@ func (worker *Worker) updateWorker(ctx context.Context) error {
return nil
}
//nolint:nestif // nested "if" complexity is tolerable for now
func (worker *Worker) syncVMs(ctx context.Context) error {
remoteVMs, err := worker.client.VMs().FindForWorker(ctx, worker.name)
if err != nil {
@ -199,8 +197,12 @@ func (worker *Worker) syncVMs(ctx context.Context) error {
}
worker.logger.Infof("syncing %d local VMs against %d remote VMs...",
len(remoteVMsIndex), worker.vmm.Len())
worker.vmm.Len(), len(remoteVMsIndex))
// It's important to check the remote VMs against local ones first
// to stop the failed VMs before we start the new VMs, otherwise we
// risk violating the resource constraints (e.g. a maximum of 2 VMs
// per host)
for _, vm := range worker.vmm.List() {
remoteVM, ok := remoteVMsIndex[vm.OnDiskName()]
if !ok {
@ -211,12 +213,22 @@ func (worker *Worker) syncVMs(ctx context.Context) error {
if err := worker.deleteVM(vm); err != nil {
return err
}
} else if remoteVM.Status != v1.VMStatusFailed && vm.Err() != nil {
// Local VM has failed, update remote VM
remoteVM.Status = v1.VMStatusFailed
remoteVM.StatusMessage = vm.Err().Error()
if _, err := worker.client.VMs().Update(ctx, remoteVM); err != nil {
return err
} else {
if remoteVM.Status == v1.VMStatusFailed {
// VM has failed on the remote side, stop it locally to prevent incorrect
// worker's resources calculation in the Controller's scheduler
vm.Stop()
} else if vm.Err() != nil {
// VM has failed on the local side, stop it before reporting as failed to prevent incorrect
// worker's resources calculation in the Controller's scheduler
vm.Stop()
// Report the VM as failed
remoteVM.Status = v1.VMStatusFailed
remoteVM.StatusMessage = vm.Err().Error()
if _, err := worker.client.VMs().Update(ctx, remoteVM); err != nil {
return err
}
}
}
}
@ -238,6 +250,7 @@ func (worker *Worker) syncVMs(ctx context.Context) error {
return nil
}
//nolint:nestif,gocognit // complexity is tolerable for now
func (worker *Worker) syncOnDiskVMs(ctx context.Context) error {
remoteVMs, err := worker.client.VMs().FindForWorker(ctx, worker.name)
if err != nil {
@ -256,10 +269,6 @@ func (worker *Worker) syncOnDiskVMs(ctx context.Context) error {
}
for _, vmInfo := range vmInfos {
if vmInfo.Running {
continue
}
onDiskName, err := ondiskname.Parse(vmInfo.Name)
if err != nil {
if errors.Is(err, ondiskname.ErrNotManagedByOrchard) {
@ -269,22 +278,44 @@ func (worker *Worker) syncOnDiskVMs(ctx context.Context) error {
return err
}
// VMs that exist in the Worker's VM manager will be handled in the syncVMs()
if worker.vmm.Exists(onDiskName) {
continue
}
remoteVM, ok := remoteVMsIndex[onDiskName]
if !ok {
// On-disk VM doesn't exist on the controller, delete it
// On-disk VM doesn't exist on the controller nor in the Worker's VM manager,
// stop it (if applicable) and delete it
if vmInfo.Running {
_, _, err := tart.Tart(ctx, worker.logger, "stop", vmInfo.Name)
if err != nil {
worker.logger.Warnf("failed to stop")
}
}
_, _, err := tart.Tart(ctx, worker.logger, "delete", vmInfo.Name)
if err != nil {
return err
}
} else if remoteVM.Status == v1.VMStatusRunning && !worker.vmm.Exists(onDiskName) {
// On-disk VM exist on the controller,
// but we don't know about it, so
// mark it as failed
remoteVM.Status = v1.VMStatusFailed
remoteVM.StatusMessage = "Worker lost track of VM"
_, err := worker.client.VMs().Update(ctx, remoteVM)
if err != nil {
return err
} else if remoteVM.Status != v1.VMStatusPending {
// On-disk VM exists on the controller and was acted upon,
// but we've lost track of it, so shut it down (if applicable)
// and report the error (if not failed yet)
if vmInfo.Running {
_, _, err := tart.Tart(ctx, worker.logger, "stop", vmInfo.Name)
if err != nil {
worker.logger.Warnf("failed to stop")
}
}
if remoteVM.Status != v1.VMStatusFailed {
remoteVM.Status = v1.VMStatusFailed
remoteVM.StatusMessage = "Worker lost track of VM"
_, err := worker.client.VMs().Update(ctx, remoteVM)
if err != nil {
return err
}
}
}
}
@ -293,9 +324,7 @@ func (worker *Worker) syncOnDiskVMs(ctx context.Context) error {
}
func (worker *Worker) deleteVM(vm *vmmanager.VM) error {
if err := vm.Stop(); err != nil {
return err
}
vm.Stop()
if err := vm.Delete(); err != nil {
return err