orchard/internal/controller/rpc.go

105 lines
3.2 KiB
Go

package controller
import (
"context"
"github.com/cirruslabs/orchard/internal/controller/rendezvous"
v1pkg "github.com/cirruslabs/orchard/pkg/resource/v1"
"github.com/cirruslabs/orchard/rpc"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/emptypb"
"net"
//nolint:staticcheck // https://github.com/mitchellh/go-grpc-net-conn/pull/1
"github.com/golang/protobuf/proto"
grpc_net_conn "github.com/mitchellh/go-grpc-net-conn"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func (controller *Controller) Watch(_ *emptypb.Empty, stream rpc.Controller_WatchServer) error {
if !controller.authorizeGRPC(stream.Context(), v1pkg.ServiceAccountRoleComputeWrite) {
return status.Errorf(codes.Unauthenticated, "auth failed")
}
workerMetadataValue := metadata.ValueFromIncomingContext(stream.Context(), rpc.MetadataWorkerNameKey)
if len(workerMetadataValue) == 0 {
return status.Errorf(codes.InvalidArgument, "no worker ident in metadata")
}
worker := workerMetadataValue[0]
workerCh, cancel := controller.workerNotifier.Register(stream.Context(), worker)
defer cancel()
for {
select {
case msg := <-workerCh:
if err := stream.Send(msg); err != nil {
return err
}
case <-stream.Context().Done():
return stream.Context().Err()
}
}
}
func (controller *Controller) PortForward(stream rpc.Controller_PortForwardServer) error {
if !controller.authorizeGRPC(stream.Context(), v1pkg.ServiceAccountRoleComputeWrite) {
return status.Errorf(codes.Unauthenticated, "auth failed")
}
sessionMetadataValue := metadata.ValueFromIncomingContext(stream.Context(), rpc.MetadataWorkerPortForwardingSessionKey)
if len(sessionMetadataValue) == 0 {
return status.Errorf(codes.InvalidArgument, "no session in metadata")
}
conn := &grpc_net_conn.Conn{
Stream: stream,
Request: &rpc.PortForwardData{},
Response: &rpc.PortForwardData{},
Encode: grpc_net_conn.SimpleEncoder(func(message proto.Message) *[]byte {
return &message.(*rpc.PortForwardData).Data
}),
Decode: grpc_net_conn.SimpleDecoder(func(message proto.Message) *[]byte {
return &message.(*rpc.PortForwardData).Data
}),
}
// make connection rendezvous aware of the connection
proxyCtx, err := controller.connRendezvous.Respond(sessionMetadataValue[0],
rendezvous.ResultWithErrorMessage[net.Conn]{
Result: conn,
},
)
if err != nil {
return err
}
select {
case <-proxyCtx.Done():
return proxyCtx.Err()
case <-stream.Context().Done():
return stream.Context().Err()
}
}
func (controller *Controller) ResolveIP(ctx context.Context, request *rpc.ResolveIPResult) (*emptypb.Empty, error) {
if !controller.authorizeGRPC(ctx, v1pkg.ServiceAccountRoleComputeWrite) {
return nil, status.Errorf(codes.Unauthenticated, "auth failed")
}
sessionMetadataValue := metadata.ValueFromIncomingContext(ctx, rpc.MetadataWorkerPortForwardingSessionKey)
if len(sessionMetadataValue) == 0 {
return nil, status.Errorf(codes.InvalidArgument, "no session in metadata")
}
// Respond with the resolved IP address
_, err := controller.ipRendezvous.Respond(sessionMetadataValue[0], rendezvous.ResultWithErrorMessage[string]{
Result: request.Ip,
})
if err != nil {
return nil, err
}
return &emptypb.Empty{}, nil
}