fix: harden execute ssh session startup and cancellation handling
- retry SSH handshake by re-establishing tunnel until request context cancellation - treat request context cancellation in execute session loop as normal termination - clean up ineffassign in execute stdin handling (linter) - addresses comments 2782294067 and 2782294070 🤖 Generated with [Codex](https://chatgpt.com/codex) Co-Authored-By: Codex <codex@openai.com>
This commit is contained in:
parent
86248be003
commit
70409ac446
|
|
@ -23,7 +23,10 @@ import (
|
|||
"golang.org/x/crypto/ssh"
|
||||
)
|
||||
|
||||
const executeSessionRendezvousTimeout = 15 * time.Second
|
||||
const (
|
||||
executeSessionRendezvousTimeout = 15 * time.Second
|
||||
executeSessionSSHRetryDelay = 1 * time.Second
|
||||
)
|
||||
|
||||
type sshExecution struct {
|
||||
session *ssh.Session
|
||||
|
|
@ -71,12 +74,10 @@ func (controller *Controller) executeVM(ctx *gin.Context) responder.Responder {
|
|||
if responderImpl != nil {
|
||||
return responderImpl
|
||||
}
|
||||
defer func() {
|
||||
_ = tunnel.Close()
|
||||
}()
|
||||
|
||||
wsConn, err := acceptExecuteWebSocket(ctx)
|
||||
if err != nil {
|
||||
_ = tunnel.Close()
|
||||
return responder.Error(err)
|
||||
}
|
||||
defer func() {
|
||||
|
|
@ -172,8 +173,16 @@ func (controller *Controller) establishExecuteSSHTunnel(
|
|||
}
|
||||
|
||||
func (controller *Controller) executeVMViaSSHTunnel(ctx context.Context, tunnel net.Conn, ws *websocket.Conn, vm *v1.VM, cmd string, args []string) responder.Responder {
|
||||
sshClient, err := newSSHClient(tunnel, vm)
|
||||
defer func() {
|
||||
_ = tunnel.Close()
|
||||
}()
|
||||
|
||||
tunnel, sshClient, err := controller.establishExecuteSSHClientWithRetry(ctx, tunnel, vm)
|
||||
if err != nil {
|
||||
if isNormalContextCancellation(err) {
|
||||
return responder.Empty()
|
||||
}
|
||||
|
||||
controller.closeExecuteWithFrameError(ws, nil,
|
||||
fmt.Sprintf("SSH handshake with the VM failed: %v", err))
|
||||
|
||||
|
|
@ -196,6 +205,65 @@ func (controller *Controller) executeVMViaSSHTunnel(ctx context.Context, tunnel
|
|||
return controller.runExecuteSession(ctx, ws, execution)
|
||||
}
|
||||
|
||||
func (controller *Controller) establishExecuteSSHClientWithRetry(
|
||||
ctx context.Context,
|
||||
tunnel net.Conn,
|
||||
vm *v1.VM,
|
||||
) (net.Conn, *ssh.Client, error) {
|
||||
var lastErr error
|
||||
|
||||
for {
|
||||
sshClient, err := newSSHClient(tunnel, vm)
|
||||
if err == nil {
|
||||
return tunnel, sshClient, nil
|
||||
}
|
||||
|
||||
lastErr = err
|
||||
_ = tunnel.Close()
|
||||
|
||||
controller.logger.Warnf("execute session: SSH handshake failed for VM %s on worker %s, retrying: %v",
|
||||
vm.Name, vm.Worker, err)
|
||||
|
||||
if !waitExecuteSSHRetry(ctx) {
|
||||
if ctxErr := ctx.Err(); ctxErr != nil {
|
||||
return nil, nil, ctxErr
|
||||
}
|
||||
|
||||
return nil, nil, lastErr
|
||||
}
|
||||
|
||||
nextTunnel, responderImpl := controller.establishExecuteSSHTunnel(ctx, vm)
|
||||
if responderImpl != nil {
|
||||
lastErr = errors.New("failed to establish SSH tunnel")
|
||||
|
||||
if !waitExecuteSSHRetry(ctx) {
|
||||
if ctxErr := ctx.Err(); ctxErr != nil {
|
||||
return nil, nil, ctxErr
|
||||
}
|
||||
|
||||
return nil, nil, lastErr
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
tunnel = nextTunnel
|
||||
}
|
||||
}
|
||||
|
||||
func waitExecuteSSHRetry(ctx context.Context) bool {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
case <-time.After(executeSessionSSHRetryDelay):
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func isNormalContextCancellation(err error) bool {
|
||||
return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
|
||||
}
|
||||
|
||||
func newSSHClient(conn net.Conn, vm *v1.VM) (*ssh.Client, error) {
|
||||
sshUser := vm.Username
|
||||
sshPassword := vm.Password
|
||||
|
|
@ -413,6 +481,10 @@ func (controller *Controller) runExecuteSession(
|
|||
|
||||
pingCtxCancel()
|
||||
case <-ctx.Done():
|
||||
if isNormalContextCancellation(ctx.Err()) {
|
||||
return responder.Empty()
|
||||
}
|
||||
|
||||
return responder.Error(ctx.Err())
|
||||
}
|
||||
}
|
||||
|
|
@ -468,8 +540,6 @@ func consumeClientInputFrames(
|
|||
|
||||
return
|
||||
}
|
||||
|
||||
stdinClosed = true
|
||||
}
|
||||
|
||||
errCh <- nil
|
||||
|
|
|
|||
Loading…
Reference in New Issue