From 70409ac446f8a837c78e8c2c0cc0e84f1eee80ad Mon Sep 17 00:00:00 2001
From: Fedor Korotkov
Date: Mon, 9 Feb 2026 13:22:37 +0100
Subject: [PATCH] fix: harden execute ssh session startup and cancellation handling
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- retry SSH handshake by re-establishing tunnel until request context cancellation
- treat request context cancellation in execute session loop as normal termination
- clean up ineffassign in execute stdin handling (linter)
- addresses comments 2782294067 and 2782294070

🤖 Generated with [Codex](https://chatgpt.com/codex)

Co-Authored-By: Codex
---
Review notes (free-form text between the "---" line and the diff; not part of
the commit message):

- executeVMViaSSHTunnel: "tunnel, sshClient, err :=" reuses the tunnel
  parameter, and the retry helper returns a nil tunnel on failure, so the
  deferred tunnel.Close() was invoked on a nil net.Conn on every error path.
  The deferred close now skips a nil tunnel.
- establishExecuteSSHClientWithRetry: a failed tunnel re-establishment used
  to "continue" into another handshake over the tunnel that had just been
  closed. The loop now only handshakes when it holds a live tunnel, and the
  unreachable lastErr fallback is gone.
- NOTE(review): the helper can only fail with ctx.Err(), so the
  "SSH handshake with the VM failed" frame in executeVMViaSSHTunnel looks
  unreachable - confirm whether a bounded number of retries is wanted.
- Hunk headers and the diffstat were recomputed for the changes above; the
  commit id in the "From" line and the blob ids in the "index" line still
  describe the pre-review revision. Indentation and blank context lines were
  restored from a whitespace-mangled copy - verify against the target file.

 internal/controller/api_vms_execute.go | 92 ++++++++++++++++++++++++--
 1 file changed, 85 insertions(+), 7 deletions(-)

diff --git a/internal/controller/api_vms_execute.go b/internal/controller/api_vms_execute.go
index 67e1d27..d36075d 100644
--- a/internal/controller/api_vms_execute.go
+++ b/internal/controller/api_vms_execute.go
@@ -23,7 +23,10 @@ import (
 	"golang.org/x/crypto/ssh"
 )
 
-const executeSessionRendezvousTimeout = 15 * time.Second
+const (
+	executeSessionRendezvousTimeout = 15 * time.Second
+	executeSessionSSHRetryDelay     = 1 * time.Second
+)
 
 type sshExecution struct {
 	session *ssh.Session
@@ -71,12 +74,10 @@ func (controller *Controller) executeVM(ctx *gin.Context) responder.Responder {
 	if responderImpl != nil {
 		return responderImpl
 	}
-	defer func() {
-		_ = tunnel.Close()
-	}()
 
 	wsConn, err := acceptExecuteWebSocket(ctx)
 	if err != nil {
+		_ = tunnel.Close()
 		return responder.Error(err)
 	}
 	defer func() {
@@ -172,8 +173,20 @@ func (controller *Controller) establishExecuteSSHTunnel(
 }
 
 func (controller *Controller) executeVMViaSSHTunnel(ctx context.Context, tunnel net.Conn, ws *websocket.Conn, vm *v1.VM, cmd string, args []string) responder.Responder {
+	// The assignment below overwrites the tunnel parameter: with a replacement
+	// connection on success, with nil on failure. The close must tolerate nil.
+	defer func() {
+		if tunnel != nil {
+			_ = tunnel.Close()
+		}
+	}()
+
+	tunnel, sshClient, err := controller.establishExecuteSSHClientWithRetry(ctx, tunnel, vm)
-	sshClient, err := newSSHClient(tunnel, vm)
 	if err != nil {
+		if isNormalContextCancellation(err) {
+			return responder.Empty()
+		}
+
 		controller.closeExecuteWithFrameError(ws, nil,
 			fmt.Sprintf("SSH handshake with the VM failed: %v", err))
 
@@ -196,6 +209,69 @@ func (controller *Controller) executeVMViaSSHTunnel(ctx context.Context, tunnel
 	return controller.runExecuteSession(ctx, ws, execution)
 }
 
+// establishExecuteSSHClientWithRetry performs the SSH handshake over tunnel.
+// Each time the handshake fails it closes that tunnel, waits
+// executeSessionSSHRetryDelay, establishes a fresh tunnel and tries again,
+// until ctx is done.
+//
+// On success it returns the tunnel the client is running on, which may differ
+// from the one passed in. On failure the returned tunnel and client are nil,
+// every tunnel it handled has already been closed, and the error is ctx.Err().
+func (controller *Controller) establishExecuteSSHClientWithRetry(
+	ctx context.Context,
+	tunnel net.Conn,
+	vm *v1.VM,
+) (net.Conn, *ssh.Client, error) {
+	for {
+		// tunnel is nil after a failed re-establishment attempt; there is
+		// nothing to handshake over until a new tunnel is available.
+		if tunnel != nil {
+			sshClient, err := newSSHClient(tunnel, vm)
+			if err == nil {
+				return tunnel, sshClient, nil
+			}
+
+			_ = tunnel.Close()
+			tunnel = nil
+
+			controller.logger.Warnf("execute session: SSH handshake failed for VM %s on worker %s, retrying: %v",
+				vm.Name, vm.Worker, err)
+		}
+
+		if !waitExecuteSSHRetry(ctx) {
+			// waitExecuteSSHRetry only reports false once ctx is done.
+			return nil, nil, ctx.Err()
+		}
+
+		nextTunnel, responderImpl := controller.establishExecuteSSHTunnel(ctx, vm)
+		if responderImpl != nil {
+			controller.logger.Warnf("execute session: failed to re-establish SSH tunnel for VM %s on worker %s, retrying",
+				vm.Name, vm.Worker)
+
+			continue
+		}
+
+		tunnel = nextTunnel
+	}
+}
+
+// waitExecuteSSHRetry blocks for executeSessionSSHRetryDelay and reports true,
+// or reports false as soon as ctx is done.
+func waitExecuteSSHRetry(ctx context.Context) bool {
+	select {
+	case <-ctx.Done():
+		return false
+	case <-time.After(executeSessionSSHRetryDelay):
+		return true
+	}
+}
+
+// isNormalContextCancellation reports whether err is, or wraps,
+// context.Canceled or context.DeadlineExceeded.
+func isNormalContextCancellation(err error) bool {
+	return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
+}
+
 func newSSHClient(conn net.Conn, vm *v1.VM) (*ssh.Client, error) {
 	sshUser := vm.Username
 	sshPassword := vm.Password
@@ -413,6 +489,10 @@ func (controller *Controller) runExecuteSession(
 
 			pingCtxCancel()
 		case <-ctx.Done():
+			if isNormalContextCancellation(ctx.Err()) {
+				return responder.Empty()
+			}
+
 			return responder.Error(ctx.Err())
 		}
 	}
@@ -468,8 +548,6 @@ func consumeClientInputFrames(
 
 			return
 		}
-
-		stdinClosed = true
 	}
 
 	errCh <- nil