105 lines
3.2 KiB
Go
105 lines
3.2 KiB
Go
package controller
|
|
|
|
import (
|
|
"context"
|
|
"github.com/cirruslabs/orchard/internal/controller/rendezvous"
|
|
v1pkg "github.com/cirruslabs/orchard/pkg/resource/v1"
|
|
"github.com/cirruslabs/orchard/rpc"
|
|
"google.golang.org/grpc/metadata"
|
|
"google.golang.org/protobuf/types/known/emptypb"
|
|
"net"
|
|
|
|
//nolint:staticcheck // https://github.com/mitchellh/go-grpc-net-conn/pull/1
|
|
"github.com/golang/protobuf/proto"
|
|
grpc_net_conn "github.com/mitchellh/go-grpc-net-conn"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
func (controller *Controller) Watch(_ *emptypb.Empty, stream rpc.Controller_WatchServer) error {
|
|
if !controller.authorizeGRPC(stream.Context(), v1pkg.ServiceAccountRoleComputeWrite) {
|
|
return status.Errorf(codes.Unauthenticated, "auth failed")
|
|
}
|
|
|
|
workerMetadataValue := metadata.ValueFromIncomingContext(stream.Context(), rpc.MetadataWorkerNameKey)
|
|
if len(workerMetadataValue) == 0 {
|
|
return status.Errorf(codes.InvalidArgument, "no worker ident in metadata")
|
|
}
|
|
|
|
worker := workerMetadataValue[0]
|
|
workerCh, cancel := controller.workerNotifier.Register(stream.Context(), worker)
|
|
defer cancel()
|
|
|
|
for {
|
|
select {
|
|
case msg := <-workerCh:
|
|
if err := stream.Send(msg); err != nil {
|
|
return err
|
|
}
|
|
case <-stream.Context().Done():
|
|
return stream.Context().Err()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (controller *Controller) PortForward(stream rpc.Controller_PortForwardServer) error {
|
|
if !controller.authorizeGRPC(stream.Context(), v1pkg.ServiceAccountRoleComputeWrite) {
|
|
return status.Errorf(codes.Unauthenticated, "auth failed")
|
|
}
|
|
|
|
sessionMetadataValue := metadata.ValueFromIncomingContext(stream.Context(), rpc.MetadataWorkerPortForwardingSessionKey)
|
|
if len(sessionMetadataValue) == 0 {
|
|
return status.Errorf(codes.InvalidArgument, "no session in metadata")
|
|
}
|
|
|
|
conn := &grpc_net_conn.Conn{
|
|
Stream: stream,
|
|
Request: &rpc.PortForwardData{},
|
|
Response: &rpc.PortForwardData{},
|
|
Encode: grpc_net_conn.SimpleEncoder(func(message proto.Message) *[]byte {
|
|
return &message.(*rpc.PortForwardData).Data
|
|
}),
|
|
Decode: grpc_net_conn.SimpleDecoder(func(message proto.Message) *[]byte {
|
|
return &message.(*rpc.PortForwardData).Data
|
|
}),
|
|
}
|
|
|
|
// make connection rendezvous aware of the connection
|
|
proxyCtx, err := controller.connRendezvous.Respond(sessionMetadataValue[0],
|
|
rendezvous.ResultWithErrorMessage[net.Conn]{
|
|
Result: conn,
|
|
},
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
select {
|
|
case <-proxyCtx.Done():
|
|
return proxyCtx.Err()
|
|
case <-stream.Context().Done():
|
|
return stream.Context().Err()
|
|
}
|
|
}
|
|
|
|
func (controller *Controller) ResolveIP(ctx context.Context, request *rpc.ResolveIPResult) (*emptypb.Empty, error) {
|
|
if !controller.authorizeGRPC(ctx, v1pkg.ServiceAccountRoleComputeWrite) {
|
|
return nil, status.Errorf(codes.Unauthenticated, "auth failed")
|
|
}
|
|
|
|
sessionMetadataValue := metadata.ValueFromIncomingContext(ctx, rpc.MetadataWorkerPortForwardingSessionKey)
|
|
if len(sessionMetadataValue) == 0 {
|
|
return nil, status.Errorf(codes.InvalidArgument, "no session in metadata")
|
|
}
|
|
|
|
// Respond with the resolved IP address
|
|
_, err := controller.ipRendezvous.Respond(sessionMetadataValue[0], rendezvous.ResultWithErrorMessage[string]{
|
|
Result: request.Ip,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &emptypb.Empty{}, nil
|
|
}
|