orchard/internal/controller/api_rpc_portforward.go

69 lines
2.1 KiB
Go

package controller
import (
"context"
"github.com/cirruslabs/orchard/internal/controller/rendezvous"
"github.com/cirruslabs/orchard/internal/responder"
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
"github.com/coder/websocket"
"github.com/gin-gonic/gin"
"net"
"time"
)
// rpcPortForward handles the worker-facing side of a port forwarding session.
//
// The request is first authorized against the compute-write service account
// role. It is then upgraded to a WebSocket connection, which is wrapped as a
// net.Conn and handed to the rendezvous identified by the "session" query
// parameter, together with the optional "errorMessage" query parameter.
//
// After the hand-off the handler blocks, pinging the peer every
// controller.pingInterval, until either the context returned by the
// rendezvous is done (an empty response is returned) or the request context
// is done (reported as an unexpected worker disconnect).
func (controller *Controller) rpcPortForward(ctx *gin.Context) responder.Responder {
	// The local is deliberately not called "responder" to avoid shadowing
	// the imported package of the same name.
	if authResponder := controller.authorize(ctx, v1.ServiceAccountRoleComputeWrite); authResponder != nil {
		return authResponder
	}

	// Retrieve and parse path and query parameters
	session := ctx.Query("session")
	errorMessage := ctx.Query("errorMessage")

	// Perform WebSocket protocol upgrade
	wsConn, err := websocket.Accept(ctx.Writer, ctx.Request, &websocket.AcceptOptions{
		OriginPatterns: []string{"*"},
	})
	if err != nil {
		return responder.Error(err)
	}
	defer func() {
		// Ensure that we always close the accepted WebSocket connection,
		// otherwise resource leak is possible[1]
		//
		// [1]: https://github.com/coder/websocket/issues/445#issuecomment-2053792044
		_ = wsConn.CloseNow()
	}()

	// Respond with the established connection
	proxyCtx, err := controller.connRendezvous.Respond(session, rendezvous.ResultWithErrorMessage[net.Conn]{
		Result:       websocket.NetConn(ctx, wsConn, websocket.MessageBinary),
		ErrorMessage: errorMessage,
	})
	if err != nil {
		return controller.wsError(wsConn, websocket.StatusInternalError, "port forwarding RPC",
			"failure to respond with the established WebSocket connection", err)
	}

	// Use a single reusable timer instead of calling time.After on every
	// loop iteration, so that no timer is left pending once we return.
	pingTimer := time.NewTimer(controller.pingInterval)
	defer pingTimer.Stop()

	for {
		select {
		case <-proxyCtx.Done():
			// No graceful close is attempted here as the connection should be
			// already closed by our rendezvous party
			return responder.Empty()
		case <-pingTimer.C:
			pingCtx, pingCtxCancel := context.WithTimeout(ctx, 5*time.Second)
			if err := wsConn.Ping(pingCtx); err != nil {
				controller.logger.Warnf("port forwarding RPC: failed to ping the worker, "+
					"connection might time out: %v", err)
			}
			pingCtxCancel()

			// The timer has fired and its channel has been drained above,
			// so it is safe to re-arm it: the next ping is scheduled one
			// full interval after this one has completed.
			pingTimer.Reset(controller.pingInterval)
		case <-ctx.Done():
			// Connection shouldn't be normally closed by the worker
			return controller.wsErrorNoClose("port forwarding RPC",
				"worker unexpectedly disconnected", ctx.Err())
		}
	}
}