Rename exec endpoints to execute

This commit is contained in:
Fedor Korotkov 2026-02-08 13:13:38 +01:00
parent 8cf8c68557
commit dc0f5b45d0
5 changed files with 69 additions and 69 deletions

View File

@ -415,7 +415,7 @@ paths:
description: VM resource with the given name doesn't exist
'503':
description: Failed to establish connection with the worker responsible for the specified VM
/vms/{name}/exec:
/vms/{name}/execute:
parameters:
- in: path
name: name
@ -429,8 +429,8 @@ paths:
Frame schema:
* Client -> Server uses `ExecClientFrame`
* Server -> Client uses `ExecServerFrame`
* Client -> Server uses `ExecuteClientFrame`
* Server -> Client uses `ExecuteServerFrame`
`data` is a base64-encoded byte payload in JSON (`format: byte`).
@ -487,7 +487,7 @@ paths:
description: |
WebSocket protocol upgrade succeeded.
After upgrade, messages follow `ExecClientFrame` and `ExecServerFrame` schemas.
After upgrade, messages follow `ExecuteClientFrame` and `ExecuteServerFrame` schemas.
'400':
description: Invalid query parameter specified
'404':
@ -772,8 +772,8 @@ components:
ip:
type: string
description: The resolved IP address
ExecTerminalSize:
title: VM Exec terminal size
ExecuteTerminalSize:
title: VM Execute terminal size
type: object
properties:
rows:
@ -784,8 +784,8 @@ components:
type: integer
minimum: 1
description: Terminal column count
ExecExit:
title: VM Exec exit payload
ExecuteExit:
title: VM Execute exit payload
type: object
required:
- code
@ -794,8 +794,8 @@ components:
type: integer
format: int32
description: Process exit code
ExecClientFrame:
title: VM Exec client WebSocket frame
ExecuteClientFrame:
title: VM Execute client WebSocket frame
type: object
required:
- type
@ -814,12 +814,12 @@ components:
Base64-encoded stdin payload for `type=stdin`.
Empty payload indicates stdin EOF.
terminal:
$ref: '#/components/schemas/ExecTerminalSize'
$ref: '#/components/schemas/ExecuteTerminalSize'
example:
type: stdin
data: aGVsbG8K
ExecServerFrame:
title: VM Exec server WebSocket frame
ExecuteServerFrame:
title: VM Execute server WebSocket frame
type: object
required:
- type
@ -833,7 +833,7 @@ components:
format: byte
description: Base64-encoded output payload for `type=stdout` and `type=stderr`
exit:
$ref: '#/components/schemas/ExecExit'
$ref: '#/components/schemas/ExecuteExit'
error:
type: string
description: Error message for `type=error`

View File

@ -171,8 +171,8 @@ func (controller *Controller) initAPI() *gin.Engine {
v1.GET("/vms/:name/port-forward", func(c *gin.Context) {
controller.portForwardVM(c).Respond(c)
})
v1.GET("/vms/:name/exec", func(c *gin.Context) {
controller.execVM(c).Respond(c)
v1.GET("/vms/:name/execute", func(c *gin.Context) {
controller.executeVM(c).Respond(c)
})
v1.GET("/vms/:name/ip", func(c *gin.Context) {
controller.ip(c).Respond(c)

View File

@ -23,16 +23,16 @@ import (
"golang.org/x/crypto/ssh"
)
const execSessionRendezvousTimeout = 15 * time.Second
const executeSessionRendezvousTimeout = 15 * time.Second
type sshExec struct {
type sshExecution struct {
session *ssh.Session
stdout io.Reader
stderr io.Reader
stdin io.WriteCloser
}
func (controller *Controller) execVM(ctx *gin.Context) responder.Responder {
func (controller *Controller) executeVM(ctx *gin.Context) responder.Responder {
if responder := controller.authorizeAny(ctx, v1.ServiceAccountRoleComputeWrite,
v1.ServiceAccountRoleComputeConnect); responder != nil {
return responder
@ -85,7 +85,7 @@ func (controller *Controller) execVM(ctx *gin.Context) responder.Responder {
return responder.Code(http.StatusServiceUnavailable)
}
timeoutTimer := time.NewTimer(execSessionRendezvousTimeout)
timeoutTimer := time.NewTimer(executeSessionRendezvousTimeout)
defer timeoutTimer.Stop()
select {
@ -116,7 +116,7 @@ func (controller *Controller) execVM(ctx *gin.Context) responder.Responder {
_ = tunnel.Close()
}()
return controller.execVMViaSSHTunnel(ctx, tunnel, ws, vm, command, args)
return controller.executeVMViaSSHTunnel(ctx, tunnel, ws, vm, command, args)
case <-timeoutTimer.C:
return responder.JSON(http.StatusServiceUnavailable, NewErrorResponse(
"timed out waiting for worker %s to establish SSH tunnel", vm.Worker))
@ -125,10 +125,10 @@ func (controller *Controller) execVM(ctx *gin.Context) responder.Responder {
}
}
func (controller *Controller) execVMViaSSHTunnel(ctx context.Context, tunnel net.Conn, ws *websocket.Conn, vm *v1.VM, cmd string, args []string) responder.Responder {
func (controller *Controller) executeVMViaSSHTunnel(ctx context.Context, tunnel net.Conn, ws *websocket.Conn, vm *v1.VM, cmd string, args []string) responder.Responder {
sshClient, err := newSSHClient(tunnel, vm)
if err != nil {
controller.closeExecWithFrameError(ctx, ws, nil,
controller.closeExecuteWithFrameError(ctx, ws, nil,
fmt.Sprintf("SSH handshake with the VM failed: %v", err))
return responder.Empty()
@ -137,17 +137,17 @@ func (controller *Controller) execVMViaSSHTunnel(ctx context.Context, tunnel net
_ = sshClient.Close()
}()
exec, err := startSSHExec(sshClient, cmd, args)
execution, err := startSSHExecution(sshClient, cmd, args)
if err != nil {
controller.closeExecWithFrameError(ctx, ws, nil, err.Error())
controller.closeExecuteWithFrameError(ctx, ws, nil, err.Error())
return responder.Empty()
}
defer func() {
_ = exec.session.Close()
_ = execution.session.Close()
}()
return controller.pumpExecFrames(ctx, ws, exec)
return controller.pumpExecuteFrames(ctx, ws, execution)
}
func newSSHClient(conn net.Conn, vm *v1.VM) (*ssh.Client, error) {
@ -175,7 +175,7 @@ func newSSHClient(conn net.Conn, vm *v1.VM) (*ssh.Client, error) {
return ssh.NewClient(sshConn, chans, reqs), nil
}
func startSSHExec(client *ssh.Client, cmd string, args []string) (*sshExec, error) {
func startSSHExecution(client *ssh.Client, cmd string, args []string) (*sshExecution, error) {
session, err := client.NewSession()
if err != nil {
return nil, fmt.Errorf("failed to open SSH session: %v", err)
@ -209,7 +209,7 @@ func startSSHExec(client *ssh.Client, cmd string, args []string) (*sshExec, erro
return nil, fmt.Errorf("failed to start SSH command: %v", err)
}
return &sshExec{
return &sshExecution{
session: session,
stdout: stdoutPipe,
stderr: stderrPipe,
@ -217,10 +217,10 @@ func startSSHExec(client *ssh.Client, cmd string, args []string) (*sshExec, erro
}, nil
}
func (controller *Controller) pumpExecFrames(
func (controller *Controller) pumpExecuteFrames(
ctx context.Context,
ws *websocket.Conn,
exec *sshExec,
execution *sshExecution,
) responder.Responder {
wsNetConn := websocket.NetConn(ctx, ws, websocket.MessageText)
defer func() {
@ -237,10 +237,10 @@ func (controller *Controller) pumpExecFrames(
exitCh := make(chan int32, 1)
exitErrCh := make(chan error, 1)
go streamExecOutput(exec.stdout, execstream.FrameTypeStdout, outCh, outDoneCh, outErrCh)
go streamExecOutput(exec.stderr, execstream.FrameTypeStderr, outCh, outDoneCh, outErrCh)
go streamExecClientFrames(decoder, exec.stdin, stdinErrCh)
go waitForSSHExecExit(exec.session, exitCh, exitErrCh)
go streamExecuteOutput(execution.stdout, execstream.FrameTypeStdout, outCh, outDoneCh, outErrCh)
go streamExecuteOutput(execution.stderr, execstream.FrameTypeStderr, outCh, outDoneCh, outErrCh)
go streamExecuteClientFrames(decoder, execution.stdin, stdinErrCh)
go waitForSSHExecutionExit(execution.session, exitCh, exitErrCh)
pingTicker := time.NewTicker(controller.pingInterval)
defer pingTicker.Stop()
@ -254,8 +254,8 @@ func (controller *Controller) pumpExecFrames(
for len(outCh) > 0 {
frame := <-outCh
if err := execstream.WriteFrame(encoder, &frame); err != nil {
return controller.wsError(ws, websocket.StatusInternalError, "exec session",
"failed to stream exec output to the client", err)
return controller.wsError(ws, websocket.StatusInternalError, "execute session",
"failed to stream execute output to the client", err)
}
}
@ -263,13 +263,13 @@ func (controller *Controller) pumpExecFrames(
Type: execstream.FrameTypeExit,
Exit: &execstream.Exit{Code: exitCode},
}); err != nil {
return controller.wsError(ws, websocket.StatusInternalError, "exec session",
"failed to send exec exit status to the client", err)
return controller.wsError(ws, websocket.StatusInternalError, "execute session",
"failed to send execute exit status to the client", err)
}
if err := ws.Close(websocket.StatusNormalClosure,
fmt.Sprintf("command exited with code %d", exitCode)); err != nil {
controller.logger.Warnf("exec session: failed to close WebSocket connection: %v", err)
controller.logger.Warnf("execute session: failed to close WebSocket connection: %v", err)
}
return responder.Empty()
@ -278,8 +278,8 @@ func (controller *Controller) pumpExecFrames(
select {
case frame := <-outCh:
if err := execstream.WriteFrame(encoder, &frame); err != nil {
return controller.wsError(ws, websocket.StatusInternalError, "exec session",
"failed to stream exec output to the client", err)
return controller.wsError(ws, websocket.StatusInternalError, "execute session",
"failed to stream execute output to the client", err)
}
case <-outDoneCh:
readersDone++
@ -288,7 +288,7 @@ func (controller *Controller) pumpExecFrames(
continue
}
controller.closeExecWithFrameError(ctx, ws, encoder,
controller.closeExecuteWithFrameError(ctx, ws, encoder,
fmt.Sprintf("failed while streaming command output: %v", err))
return responder.Empty()
@ -302,7 +302,7 @@ func (controller *Controller) pumpExecFrames(
return responder.Empty()
}
controller.closeExecWithFrameError(ctx, ws, encoder,
controller.closeExecuteWithFrameError(ctx, ws, encoder,
fmt.Sprintf("failed while reading command stdin stream: %v", err))
return responder.Empty()
@ -310,7 +310,7 @@ func (controller *Controller) pumpExecFrames(
exitObserved = true
exitCode = code
case err := <-exitErrCh:
controller.closeExecWithFrameError(ctx, ws, encoder,
controller.closeExecuteWithFrameError(ctx, ws, encoder,
fmt.Sprintf("failed while waiting for command completion: %v", err))
return responder.Empty()
@ -318,7 +318,7 @@ func (controller *Controller) pumpExecFrames(
pingCtx, pingCtxCancel := context.WithTimeout(ctx, 5*time.Second)
if err := ws.Ping(pingCtx); err != nil {
controller.logger.Warnf("exec session: failed to ping the client, "+
controller.logger.Warnf("execute session: failed to ping the client, "+
"connection might time out: %v", err)
}
@ -329,7 +329,7 @@ func (controller *Controller) pumpExecFrames(
}
}
func waitForSSHExecExit(sshSession *ssh.Session, exitCodeCh chan<- int32, exitErrCh chan<- error) {
func waitForSSHExecutionExit(sshSession *ssh.Session, exitCodeCh chan<- int32, exitErrCh chan<- error) {
if err := sshSession.Wait(); err != nil {
var exitError *ssh.ExitError
if errors.As(err, &exitError) {
@ -346,7 +346,7 @@ func waitForSSHExecExit(sshSession *ssh.Session, exitCodeCh chan<- int32, exitEr
exitCodeCh <- 0
}
func streamExecClientFrames(
func streamExecuteClientFrames(
decoder *json.Decoder,
stdin io.WriteCloser,
errCh chan<- error,
@ -409,7 +409,7 @@ func streamExecClientFrames(
}
}
func streamExecOutput(
func streamExecuteOutput(
reader io.Reader,
frameType execstream.FrameType,
outputCh chan<- execstream.Frame,
@ -463,7 +463,7 @@ func shellQuoteArg(arg string) string {
return "'" + strings.ReplaceAll(arg, "'", "'\\''") + "'"
}
func (controller *Controller) closeExecWithFrameError(
func (controller *Controller) closeExecuteWithFrameError(
ctx context.Context,
wsConn *websocket.Conn,
encoder *json.Encoder,
@ -474,11 +474,11 @@ func (controller *Controller) closeExecWithFrameError(
Type: execstream.FrameTypeError,
Error: message,
}); err != nil {
controller.logger.Warnf("exec session: failed to send error frame: %v", err)
controller.logger.Warnf("execute session: failed to send error frame: %v", err)
}
}
if err := wsConn.Close(websocket.StatusInternalError, message); err != nil {
controller.logger.Warnf("exec session: failed to close WebSocket connection: %v", err)
controller.logger.Warnf("execute session: failed to close WebSocket connection: %v", err)
}
}

View File

@ -21,7 +21,7 @@ func (writer *recordingWriteCloser) Close() error {
return nil
}
func TestStreamExecClientFramesWritesInputAndClosesOnEOFFrame(t *testing.T) {
func TestStreamExecuteClientFramesWritesInputAndClosesOnEOFFrame(t *testing.T) {
var input bytes.Buffer
encoder := execstream.NewEncoder(&input)
@ -41,14 +41,14 @@ func TestStreamExecClientFramesWritesInputAndClosesOnEOFFrame(t *testing.T) {
stdin := &recordingWriteCloser{}
errCh := make(chan error, 1)
streamExecClientFrames(decoder, stdin, errCh)
streamExecuteClientFrames(decoder, stdin, errCh)
require.NoError(t, <-errCh)
require.True(t, stdin.closed)
require.Equal(t, "hello", stdin.String())
}
func TestStreamExecClientFramesUnsupportedType(t *testing.T) {
func TestStreamExecuteClientFramesUnsupportedType(t *testing.T) {
var input bytes.Buffer
encoder := execstream.NewEncoder(&input)
@ -61,29 +61,29 @@ func TestStreamExecClientFramesUnsupportedType(t *testing.T) {
stdin := &recordingWriteCloser{}
errCh := make(chan error, 1)
streamExecClientFrames(decoder, stdin, errCh)
streamExecuteClientFrames(decoder, stdin, errCh)
require.EqualError(t, <-errCh, "unsupported frame type \"stdout\" received from client")
require.False(t, stdin.closed)
}
func TestStreamExecClientFramesClosesStdinOnDecodeError(t *testing.T) {
func TestStreamExecuteClientFramesClosesStdinOnDecodeError(t *testing.T) {
decoder := execstream.NewDecoder(bytes.NewBuffer(nil))
stdin := &recordingWriteCloser{}
errCh := make(chan error, 1)
streamExecClientFrames(decoder, stdin, errCh)
streamExecuteClientFrames(decoder, stdin, errCh)
require.ErrorIs(t, <-errCh, io.EOF)
require.True(t, stdin.closed)
}
func TestStreamExecOutputEmitsFrameAndSignalsDone(t *testing.T) {
func TestStreamExecuteOutputEmitsFrameAndSignalsDone(t *testing.T) {
outputCh := make(chan execstream.Frame, 1)
doneCh := make(chan struct{}, 1)
errCh := make(chan error, 1)
streamExecOutput(bytes.NewBufferString("payload"),
streamExecuteOutput(bytes.NewBufferString("payload"),
execstream.FrameTypeStderr, outputCh, doneCh, errCh)
select {

View File

@ -19,13 +19,13 @@ import (
"github.com/stretchr/testify/require"
)
func TestVMExec(t *testing.T) {
func TestVMExecute(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
devClient, devController, _ := devcontroller.StartIntegrationTestEnvironment(t)
vmName := "test-vm-exec-" + uuid.NewString()
vmName := "test-vm-execute-" + uuid.NewString()
err := devClient.VMs().Create(ctx, &v1.VM{
Meta: v1.Meta{
@ -51,17 +51,17 @@ func TestVMExec(t *testing.T) {
require.NoError(t, err)
require.Equal(t, v1.VMStatusRunning, vm.Status)
execConn, err := dialExec(ctx, devController.Address(), vmName, "sh", []string{
executeConn, err := dialExecute(ctx, devController.Address(), vmName, "sh", []string{
"-c",
"echo stdout-line; echo stderr-line >&2; IFS= read -r line; echo stdin:$line; exit 7",
})
require.NoError(t, err)
t.Cleanup(func() {
_ = execConn.Close()
_ = executeConn.Close()
})
encoder := execstream.NewEncoder(execConn)
decoder := execstream.NewDecoder(execConn)
encoder := execstream.NewEncoder(executeConn)
decoder := execstream.NewDecoder(executeConn)
require.NoError(t, execstream.WriteFrame(encoder, &execstream.Frame{
Type: execstream.FrameTypeStdin,
@ -105,7 +105,7 @@ func TestVMExec(t *testing.T) {
require.Contains(t, stderr.String(), "stderr-line")
}
func dialExec(
func dialExecute(
ctx context.Context,
controllerAddress string,
vmName string,
@ -121,7 +121,7 @@ func dialExec(
return nil, fmt.Errorf("failed to parse controller address: %w", err)
}
endpointURL = endpointURL.JoinPath("v1", "vms", vmName, "exec")
endpointURL = endpointURL.JoinPath("v1", "vms", vmName, "execute")
if endpointURL.Scheme == "http" {
endpointURL.Scheme = "ws"
} else {
@ -144,7 +144,7 @@ func dialExec(
_ = resp.Body.Close()
}
return nil, fmt.Errorf("failed to establish exec websocket: %w", err)
return nil, fmt.Errorf("failed to establish execute websocket: %w", err)
}
return websocket.NetConn(ctx, wsConn, websocket.MessageText), nil