diff --git a/api/openapi.yaml b/api/openapi.yaml index ba84512..34aa6ee 100644 --- a/api/openapi.yaml +++ b/api/openapi.yaml @@ -415,7 +415,7 @@ paths: description: VM resource with the given name doesn't exist '503': description: Failed to establish connection with the worker responsible for the specified VM - /vms/{name}/exec: + /vms/{name}/execute: parameters: - in: path name: name @@ -429,8 +429,8 @@ paths: Frame schema: - * Client -> Server uses `ExecClientFrame` - * Server -> Client uses `ExecServerFrame` + * Client -> Server uses `ExecuteClientFrame` + * Server -> Client uses `ExecuteServerFrame` `data` is a base64-encoded byte payload in JSON (`format: byte`). @@ -487,7 +487,7 @@ paths: description: | WebSocket protocol upgrade succeeded. - After upgrade, messages follow `ExecClientFrame` and `ExecServerFrame` schemas. + After upgrade, messages follow `ExecuteClientFrame` and `ExecuteServerFrame` schemas. '400': description: Invalid query parameter specified '404': @@ -772,8 +772,8 @@ components: ip: type: string description: The resolved IP address - ExecTerminalSize: - title: VM Exec terminal size + ExecuteTerminalSize: + title: VM Execute terminal size type: object properties: rows: @@ -784,8 +784,8 @@ components: type: integer minimum: 1 description: Terminal column count - ExecExit: - title: VM Exec exit payload + ExecuteExit: + title: VM Execute exit payload type: object required: - code @@ -794,8 +794,8 @@ components: type: integer format: int32 description: Process exit code - ExecClientFrame: - title: VM Exec client WebSocket frame + ExecuteClientFrame: + title: VM Execute client WebSocket frame type: object required: - type @@ -814,12 +814,12 @@ components: Base64-encoded stdin payload for `type=stdin`. Empty payload indicates stdin EOF. terminal: - $ref: '#/components/schemas/ExecTerminalSize' + $ref: '#/components/schemas/ExecuteTerminalSize' example: type: stdin data: aGVsbG8K - ExecServerFrame: - title: VM Exec server WebSocket frame + ExecuteServerFrame: + title: VM Execute server WebSocket frame type: object required: - type @@ -833,7 +833,7 @@ components: format: byte description: Base64-encoded output payload for `type=stdout` and `type=stderr` exit: - $ref: '#/components/schemas/ExecExit' + $ref: '#/components/schemas/ExecuteExit' error: type: string description: Error message for `type=error` diff --git a/internal/controller/api.go b/internal/controller/api.go index 3c31b24..095de72 100644 --- a/internal/controller/api.go +++ b/internal/controller/api.go @@ -171,8 +171,8 @@ func (controller *Controller) initAPI() *gin.Engine { v1.GET("/vms/:name/port-forward", func(c *gin.Context) { controller.portForwardVM(c).Respond(c) }) - v1.GET("/vms/:name/exec", func(c *gin.Context) { - controller.execVM(c).Respond(c) + v1.GET("/vms/:name/execute", func(c *gin.Context) { + controller.executeVM(c).Respond(c) }) v1.GET("/vms/:name/ip", func(c *gin.Context) { controller.ip(c).Respond(c) diff --git a/internal/controller/api_vms_exec.go b/internal/controller/api_vms_execute.go similarity index 80% rename from internal/controller/api_vms_exec.go rename to internal/controller/api_vms_execute.go index c59e9b1..c915b3e 100644 --- a/internal/controller/api_vms_exec.go +++ b/internal/controller/api_vms_execute.go @@ -23,16 +23,16 @@ import ( "golang.org/x/crypto/ssh" ) -const execSessionRendezvousTimeout = 15 * time.Second +const executeSessionRendezvousTimeout = 15 * time.Second -type sshExec struct { +type sshExecution struct { session *ssh.Session stdout io.Reader stderr io.Reader stdin io.WriteCloser } -func (controller *Controller) execVM(ctx *gin.Context) responder.Responder { +func (controller *Controller) executeVM(ctx *gin.Context) responder.Responder { if responder := controller.authorizeAny(ctx, v1.ServiceAccountRoleComputeWrite, v1.ServiceAccountRoleComputeConnect); responder != nil { return responder @@ -85,7 +85,7 @@ func (controller *Controller) execVM(ctx *gin.Context) responder.Responder { return responder.Code(http.StatusServiceUnavailable) } - timeoutTimer := time.NewTimer(execSessionRendezvousTimeout) + timeoutTimer := time.NewTimer(executeSessionRendezvousTimeout) defer timeoutTimer.Stop() select { @@ -116,7 +116,7 @@ func (controller *Controller) execVM(ctx *gin.Context) responder.Responder { _ = tunnel.Close() }() - return controller.execVMViaSSHTunnel(ctx, tunnel, ws, vm, command, args) + return controller.executeVMViaSSHTunnel(ctx, tunnel, ws, vm, command, args) case <-timeoutTimer.C: return responder.JSON(http.StatusServiceUnavailable, NewErrorResponse( "timed out waiting for worker %s to establish SSH tunnel", vm.Worker)) @@ -125,10 +125,10 @@ func (controller *Controller) execVM(ctx *gin.Context) responder.Responder { } } -func (controller *Controller) execVMViaSSHTunnel(ctx context.Context, tunnel net.Conn, ws *websocket.Conn, vm *v1.VM, cmd string, args []string) responder.Responder { +func (controller *Controller) executeVMViaSSHTunnel(ctx context.Context, tunnel net.Conn, ws *websocket.Conn, vm *v1.VM, cmd string, args []string) responder.Responder { sshClient, err := newSSHClient(tunnel, vm) if err != nil { - controller.closeExecWithFrameError(ctx, ws, nil, + controller.closeExecuteWithFrameError(ctx, ws, nil, fmt.Sprintf("SSH handshake with the VM failed: %v", err)) return responder.Empty() @@ -137,17 +137,17 @@ func (controller *Controller) execVMViaSSHTunnel(ctx context.Context, tunnel net _ = sshClient.Close() }() - exec, err := startSSHExec(sshClient, cmd, args) + execution, err := startSSHExecution(sshClient, cmd, args) if err != nil { - controller.closeExecWithFrameError(ctx, ws, nil, err.Error()) + controller.closeExecuteWithFrameError(ctx, ws, nil, err.Error()) return responder.Empty() } defer func() { - _ = exec.session.Close() + _ = execution.session.Close() }() - return controller.pumpExecFrames(ctx, ws, exec) + return controller.pumpExecuteFrames(ctx, ws, execution) } func newSSHClient(conn net.Conn, vm *v1.VM) (*ssh.Client, error) { @@ -175,7 +175,7 @@ func newSSHClient(conn net.Conn, vm *v1.VM) (*ssh.Client, error) { return ssh.NewClient(sshConn, chans, reqs), nil } -func startSSHExec(client *ssh.Client, cmd string, args []string) (*sshExec, error) { +func startSSHExecution(client *ssh.Client, cmd string, args []string) (*sshExecution, error) { session, err := client.NewSession() if err != nil { return nil, fmt.Errorf("failed to open SSH session: %v", err) @@ -209,7 +209,7 @@ func startSSHExec(client *ssh.Client, cmd string, args []string) (*sshExec, erro return nil, fmt.Errorf("failed to start SSH command: %v", err) } - return &sshExec{ + return &sshExecution{ session: session, stdout: stdoutPipe, stderr: stderrPipe, @@ -217,10 +217,10 @@ func startSSHExec(client *ssh.Client, cmd string, args []string) (*sshExec, erro }, nil } -func (controller *Controller) pumpExecFrames( +func (controller *Controller) pumpExecuteFrames( ctx context.Context, ws *websocket.Conn, - exec *sshExec, + execution *sshExecution, ) responder.Responder { wsNetConn := websocket.NetConn(ctx, ws, websocket.MessageText) defer func() { @@ -237,10 +237,10 @@ func (controller *Controller) pumpExecFrames( exitCh := make(chan int32, 1) exitErrCh := make(chan error, 1) - go streamExecOutput(exec.stdout, execstream.FrameTypeStdout, outCh, outDoneCh, outErrCh) - go streamExecOutput(exec.stderr, execstream.FrameTypeStderr, outCh, outDoneCh, outErrCh) - go streamExecClientFrames(decoder, exec.stdin, stdinErrCh) - go waitForSSHExecExit(exec.session, exitCh, exitErrCh) + go streamExecuteOutput(execution.stdout, execstream.FrameTypeStdout, outCh, outDoneCh, outErrCh) + go streamExecuteOutput(execution.stderr, execstream.FrameTypeStderr, outCh, outDoneCh, outErrCh) + go streamExecuteClientFrames(decoder, execution.stdin, stdinErrCh) + go waitForSSHExecutionExit(execution.session, exitCh, exitErrCh) pingTicker := time.NewTicker(controller.pingInterval) defer pingTicker.Stop() @@ -254,8 +254,8 @@ func (controller *Controller) pumpExecFrames( for len(outCh) > 0 { frame := <-outCh if err := execstream.WriteFrame(encoder, &frame); err != nil { - return controller.wsError(ws, websocket.StatusInternalError, "exec session", - "failed to stream exec output to the client", err) + return controller.wsError(ws, websocket.StatusInternalError, "execute session", + "failed to stream execute output to the client", err) } } @@ -263,13 +263,13 @@ func (controller *Controller) pumpExecFrames( Type: execstream.FrameTypeExit, Exit: &execstream.Exit{Code: exitCode}, }); err != nil { - return controller.wsError(ws, websocket.StatusInternalError, "exec session", - "failed to send exec exit status to the client", err) + return controller.wsError(ws, websocket.StatusInternalError, "execute session", + "failed to send execute exit status to the client", err) } if err := ws.Close(websocket.StatusNormalClosure, fmt.Sprintf("command exited with code %d", exitCode)); err != nil { - controller.logger.Warnf("exec session: failed to close WebSocket connection: %v", err) + controller.logger.Warnf("execute session: failed to close WebSocket connection: %v", err) } return responder.Empty() @@ -278,8 +278,8 @@ func (controller *Controller) pumpExecFrames( select { case frame := <-outCh: if err := execstream.WriteFrame(encoder, &frame); err != nil { - return controller.wsError(ws, websocket.StatusInternalError, "exec session", - "failed to stream exec output to the client", err) + return controller.wsError(ws, websocket.StatusInternalError, "execute session", + "failed to stream execute output to the client", err) } case <-outDoneCh: readersDone++ @@ -288,7 +288,7 @@ func (controller *Controller) pumpExecFrames( continue } - controller.closeExecWithFrameError(ctx, ws, encoder, + controller.closeExecuteWithFrameError(ctx, ws, encoder, fmt.Sprintf("failed while streaming command output: %v", err)) return responder.Empty() @@ -302,7 +302,7 @@ func (controller *Controller) pumpExecFrames( return responder.Empty() } - controller.closeExecWithFrameError(ctx, ws, encoder, + controller.closeExecuteWithFrameError(ctx, ws, encoder, fmt.Sprintf("failed while reading command stdin stream: %v", err)) return responder.Empty() @@ -310,7 +310,7 @@ func (controller *Controller) pumpExecFrames( exitObserved = true exitCode = code case err := <-exitErrCh: - controller.closeExecWithFrameError(ctx, ws, encoder, + controller.closeExecuteWithFrameError(ctx, ws, encoder, fmt.Sprintf("failed while waiting for command completion: %v", err)) return responder.Empty() @@ -318,7 +318,7 @@ func (controller *Controller) pumpExecFrames( pingCtx, pingCtxCancel := context.WithTimeout(ctx, 5*time.Second) if err := ws.Ping(pingCtx); err != nil { - controller.logger.Warnf("exec session: failed to ping the client, "+ + controller.logger.Warnf("execute session: failed to ping the client, "+ "connection might time out: %v", err) } @@ -329,7 +329,7 @@ func (controller *Controller) pumpExecFrames( } } -func waitForSSHExecExit(sshSession *ssh.Session, exitCodeCh chan<- int32, exitErrCh chan<- error) { +func waitForSSHExecutionExit(sshSession *ssh.Session, exitCodeCh chan<- int32, exitErrCh chan<- error) { if err := sshSession.Wait(); err != nil { var exitError *ssh.ExitError if errors.As(err, &exitError) { @@ -346,7 +346,7 @@ func waitForSSHExecExit(sshSession *ssh.Session, exitCodeCh chan<- int32, exitEr exitCodeCh <- 0 } -func streamExecClientFrames( +func streamExecuteClientFrames( decoder *json.Decoder, stdin io.WriteCloser, errCh chan<- error, @@ -409,7 +409,7 @@ func streamExecClientFrames( } } -func streamExecOutput( +func streamExecuteOutput( reader io.Reader, frameType execstream.FrameType, outputCh chan<- execstream.Frame, @@ -463,7 +463,7 @@ func shellQuoteArg(arg string) string { return "'" + strings.ReplaceAll(arg, "'", "'\\''") + "'" } -func (controller *Controller) closeExecWithFrameError( +func (controller *Controller) closeExecuteWithFrameError( ctx context.Context, wsConn *websocket.Conn, encoder *json.Encoder, @@ -474,11 +474,11 @@ func (controller *Controller) closeExecWithFrameError( Type: execstream.FrameTypeError, Error: message, }); err != nil { - controller.logger.Warnf("exec session: failed to send error frame: %v", err) + controller.logger.Warnf("execute session: failed to send error frame: %v", err) } } if err := wsConn.Close(websocket.StatusInternalError, message); err != nil { - controller.logger.Warnf("exec session: failed to close WebSocket connection: %v", err) + controller.logger.Warnf("execute session: failed to close WebSocket connection: %v", err) } } diff --git a/internal/controller/api_vms_exec_test.go b/internal/controller/api_vms_execute_test.go similarity index 82% rename from internal/controller/api_vms_exec_test.go rename to internal/controller/api_vms_execute_test.go index 9980192..96acd0b 100644 --- a/internal/controller/api_vms_exec_test.go +++ b/internal/controller/api_vms_execute_test.go @@ -21,7 +21,7 @@ func (writer *recordingWriteCloser) Close() error { return nil } -func TestStreamExecClientFramesWritesInputAndClosesOnEOFFrame(t *testing.T) { +func TestStreamExecuteClientFramesWritesInputAndClosesOnEOFFrame(t *testing.T) { var input bytes.Buffer encoder := execstream.NewEncoder(&input) @@ -41,14 +41,14 @@ func TestStreamExecClientFramesWritesInputAndClosesOnEOFFrame(t *testing.T) { stdin := &recordingWriteCloser{} errCh := make(chan error, 1) - streamExecClientFrames(decoder, stdin, errCh) + streamExecuteClientFrames(decoder, stdin, errCh) require.NoError(t, <-errCh) require.True(t, stdin.closed) require.Equal(t, "hello", stdin.String()) } -func TestStreamExecClientFramesUnsupportedType(t *testing.T) { +func TestStreamExecuteClientFramesUnsupportedType(t *testing.T) { var input bytes.Buffer encoder := execstream.NewEncoder(&input) @@ -61,29 +61,29 @@ func TestStreamExecClientFramesUnsupportedType(t *testing.T) { stdin := &recordingWriteCloser{} errCh := make(chan error, 1) - streamExecClientFrames(decoder, stdin, errCh) + streamExecuteClientFrames(decoder, stdin, errCh) require.EqualError(t, <-errCh, "unsupported frame type \"stdout\" received from client") require.False(t, stdin.closed) } -func TestStreamExecClientFramesClosesStdinOnDecodeError(t *testing.T) { +func TestStreamExecuteClientFramesClosesStdinOnDecodeError(t *testing.T) { decoder := execstream.NewDecoder(bytes.NewBuffer(nil)) stdin := &recordingWriteCloser{} errCh := make(chan error, 1) - streamExecClientFrames(decoder, stdin, errCh) + streamExecuteClientFrames(decoder, stdin, errCh) require.ErrorIs(t, <-errCh, io.EOF) require.True(t, stdin.closed) } -func TestStreamExecOutputEmitsFrameAndSignalsDone(t *testing.T) { +func TestStreamExecuteOutputEmitsFrameAndSignalsDone(t *testing.T) { outputCh := make(chan execstream.Frame, 1) doneCh := make(chan struct{}, 1) errCh := make(chan error, 1) - streamExecOutput(bytes.NewBufferString("payload"), + streamExecuteOutput(bytes.NewBufferString("payload"), execstream.FrameTypeStderr, outputCh, doneCh, errCh) select { diff --git a/internal/tests/exec_test.go b/internal/tests/execute_test.go similarity index 88% rename from internal/tests/exec_test.go rename to internal/tests/execute_test.go index ac9212e..381f70e 100644 --- a/internal/tests/exec_test.go +++ b/internal/tests/execute_test.go @@ -19,13 +19,13 @@ import ( "github.com/stretchr/testify/require" ) -func TestVMExec(t *testing.T) { +func TestVMExecute(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() devClient, devController, _ := devcontroller.StartIntegrationTestEnvironment(t) - vmName := "test-vm-exec-" + uuid.NewString() + vmName := "test-vm-execute-" + uuid.NewString() err := devClient.VMs().Create(ctx, &v1.VM{ Meta: v1.Meta{ @@ -51,17 +51,17 @@ func TestVMExec(t *testing.T) { require.NoError(t, err) require.Equal(t, v1.VMStatusRunning, vm.Status) - execConn, err := dialExec(ctx, devController.Address(), vmName, "sh", []string{ + executeConn, err := dialExecute(ctx, devController.Address(), vmName, "sh", []string{ "-c", "echo stdout-line; echo stderr-line >&2; IFS= read -r line; echo stdin:$line; exit 7", }) require.NoError(t, err) t.Cleanup(func() { - _ = execConn.Close() + _ = executeConn.Close() }) - encoder := execstream.NewEncoder(execConn) - decoder := execstream.NewDecoder(execConn) + encoder := execstream.NewEncoder(executeConn) + decoder := execstream.NewDecoder(executeConn) require.NoError(t, execstream.WriteFrame(encoder, &execstream.Frame{ Type: execstream.FrameTypeStdin, @@ -105,7 +105,7 @@ func TestVMExec(t *testing.T) { require.Contains(t, stderr.String(), "stderr-line") } -func dialExec( +func dialExecute( ctx context.Context, controllerAddress string, vmName string, @@ -121,7 +121,7 @@ func dialExec( return nil, fmt.Errorf("failed to parse controller address: %w", err) } - endpointURL = endpointURL.JoinPath("v1", "vms", vmName, "exec") + endpointURL = endpointURL.JoinPath("v1", "vms", vmName, "execute") if endpointURL.Scheme == "http" { endpointURL.Scheme = "ws" } else { @@ -144,7 +144,7 @@ func dialExec( _ = resp.Body.Close() } - return nil, fmt.Errorf("failed to establish exec websocket: %w", err) + return nil, fmt.Errorf("failed to establish execute websocket: %w", err) } return websocket.NetConn(ctx, wsConn, websocket.MessageText), nil