From fb3056d3ae382d2eaa40070b50a5e543db75c77d Mon Sep 17 00:00:00 2001 From: Fedor Korotkov Date: Fri, 17 Mar 2023 06:11:28 -0400 Subject: [PATCH] Refactorings for simplify readability (#35) --- .../concurrentmap/concurrentmap.go | 0 internal/controller/api.go | 6 +- internal/controller/api_vms_portforward.go | 29 +- internal/controller/controller.go | 13 +- internal/controller/notifier/notifier.go | 57 ++ .../notifier_test.go} | 14 +- internal/controller/proxy/proxy.go | 49 ++ .../{rendezvous => proxy}/proxy_test.go | 6 +- internal/controller/rendezvous/proxy.go | 49 -- internal/controller/rendezvous/watcher.go | 57 -- internal/controller/rpc.go | 55 +- internal/controller/scheduler.go | 4 +- internal/worker/rpc.go | 54 +- internal/worker/worker.go | 12 +- pkg/client/client.go | 10 +- pkg/client/vms.go | 4 +- pkg/resource/v1/v1.go | 4 +- rpc/constants.go | 12 + rpc/orchard.pb.go | 667 +++--------------- rpc/orchard.proto | 51 +- rpc/orchard_grpc.pb.go | 83 ++- 21 files changed, 360 insertions(+), 876 deletions(-) rename internal/{controller/rendezvous => }/concurrentmap/concurrentmap.go (100%) create mode 100644 internal/controller/notifier/notifier.go rename internal/controller/{rendezvous/watcher_test.go => notifier/notifier_test.go} (58%) create mode 100644 internal/controller/proxy/proxy.go rename internal/controller/{rendezvous => proxy}/proxy_test.go (81%) delete mode 100644 internal/controller/rendezvous/proxy.go delete mode 100644 internal/controller/rendezvous/watcher.go create mode 100644 rpc/constants.go diff --git a/internal/controller/rendezvous/concurrentmap/concurrentmap.go b/internal/concurrentmap/concurrentmap.go similarity index 100% rename from internal/controller/rendezvous/concurrentmap/concurrentmap.go rename to internal/concurrentmap/concurrentmap.go diff --git a/internal/controller/api.go b/internal/controller/api.go index 0600520..b1ca810 100644 --- a/internal/controller/api.go +++ b/internal/controller/api.go @@ -6,8 +6,8 @@ import ( "errors" storepkg "github.com/cirruslabs/orchard/internal/controller/store" "github.com/cirruslabs/orchard/internal/responder" - "github.com/cirruslabs/orchard/pkg/client" v1pkg "github.com/cirruslabs/orchard/pkg/resource/v1" + "github.com/cirruslabs/orchard/rpc" "github.com/deckarep/golang-set/v2" "github.com/gin-gonic/gin" "google.golang.org/grpc/metadata" @@ -169,11 +169,11 @@ func (controller *Controller) authorizeGRPC(ctx context.Context, scopes ...v1pkg return true } - name := metadata.ValueFromIncomingContext(ctx, client.MetadataServiceAccountNameKey) + name := metadata.ValueFromIncomingContext(ctx, rpc.MetadataServiceAccountNameKey) if len(name) != 1 { return false } - token := metadata.ValueFromIncomingContext(ctx, client.MetadataServiceAccountTokenKey) + token := metadata.ValueFromIncomingContext(ctx, rpc.MetadataServiceAccountTokenKey) if len(token) != 1 { return false } diff --git a/internal/controller/api_vms_portforward.go b/internal/controller/api_vms_portforward.go index 4d053b0..0746218 100644 --- a/internal/controller/api_vms_portforward.go +++ b/internal/controller/api_vms_portforward.go @@ -45,35 +45,36 @@ func (controller *Controller) portForwardVM(ctx *gin.Context) responder.Responde } // Sanity-check - if vm.Worker == "" { + if vm.WorkerUID == "" { return responder.Code(http.StatusServiceUnavailable) } - // Request and wait for a rendez-vous with a worker - token := uuid.New().String() - - rendezvousConnCh, cancel := controller.proxy.Request(ctx, token) + // Request and wait for a connection with a worker + session := uuid.New().String() + boomerangConnCh, cancel := controller.proxy.Request(ctx, session) defer cancel() - err = controller.watcher.Notify(ctx, vm.Worker, &rpc.WatchFromController{ - Action: &rpc.WatchFromController_PortForwardAction{ - PortForwardAction: &rpc.WatchFromController_PortForward{ - Token: token, - VmUid: vm.UID, - VmPort: uint32(port), + // send request to worker to initiate port-forwarding connection back to us + err = controller.workerNotifier.Notify(ctx, vm.WorkerUID, &rpc.WatchInstruction{ + Action: &rpc.WatchInstruction_PortForwardAction{ + PortForwardAction: &rpc.WatchInstruction_PortForward{ + Session: session, + VmUid: vm.UID, + VmPort: uint32(port), }, }, }) if err != nil { - controller.logger.Warnf("failed to rendez-vous with the worker %s: %v", vm.Worker, err) + controller.logger.Warnf("failed to request port-forwarding from the worker %s: %v", vm.WorkerUID, err) return responder.Code(http.StatusServiceUnavailable) } + // worker will asynchronously start port-forwarding so we wait select { - case rendezvousConn := <-rendezvousConnCh: + case fromWorkerConnection := <-boomerangConnCh: websocket.Handler(func(wsConn *websocket.Conn) { - if err := proxy.Connections(wsConn, rendezvousConn); err != nil { + if err := proxy.Connections(wsConn, fromWorkerConnection); err != nil { controller.logger.Warnf("failed to port-forward: %v", err) } }).ServeHTTP(ctx.Writer, ctx.Request) diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 79e4b8a..c77364a 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -5,7 +5,8 @@ import ( "crypto/tls" "errors" "fmt" - "github.com/cirruslabs/orchard/internal/controller/rendezvous" + "github.com/cirruslabs/orchard/internal/controller/notifier" + "github.com/cirruslabs/orchard/internal/controller/proxy" storepkg "github.com/cirruslabs/orchard/internal/controller/store" "github.com/cirruslabs/orchard/internal/controller/store/badger" "github.com/cirruslabs/orchard/internal/netconstants" @@ -37,16 +38,16 @@ type Controller struct { store storepkg.Store logger *zap.SugaredLogger grpcServer *grpc.Server - watcher *rendezvous.Watcher - proxy *rendezvous.Proxy + workerNotifier *notifier.Notifier + proxy *proxy.Proxy rpc.UnimplementedControllerServer } func New(opts ...Option) (*Controller, error) { controller := &Controller{ - watcher: rendezvous.NewWatcher(), - proxy: rendezvous.NewProxy(), + workerNotifier: notifier.NewNotifier(), + proxy: proxy.NewProxy(), } // Apply options @@ -129,7 +130,7 @@ func (controller *Controller) EnsureServiceAccount(serviceAccount *v1.ServiceAcc func (controller *Controller) Run(ctx context.Context) error { // Run the scheduler so that each VM will eventually - // be assigned to a specific Worker + // be assigned to a specific WorkerUID go func() { err := runScheduler(controller.store) if err != nil { diff --git a/internal/controller/notifier/notifier.go b/internal/controller/notifier/notifier.go new file mode 100644 index 0000000..0b0d72f --- /dev/null +++ b/internal/controller/notifier/notifier.go @@ -0,0 +1,57 @@ +package notifier + +import ( + "context" + "errors" + "fmt" + "github.com/cirruslabs/orchard/internal/concurrentmap" + "github.com/cirruslabs/orchard/rpc" +) + +var ErrNoWorker = errors.New("no worker registered with this name") + +type Notifier struct { + workers *concurrentmap.ConcurrentMap[*WorkerSlot] +} + +type WorkerSlot struct { + ctx context.Context + ch chan *rpc.WatchInstruction +} + +func NewNotifier() *Notifier { + return &Notifier{ + workers: concurrentmap.NewConcurrentMap[*WorkerSlot](), + } +} + +func (watcher *Notifier) Register(ctx context.Context, workerUID string) (chan *rpc.WatchInstruction, func()) { + subCtx, cancel := context.WithCancel(ctx) + workerCh := make(chan *rpc.WatchInstruction) + + watcher.workers.Store(workerUID, &WorkerSlot{ + ctx: subCtx, + ch: workerCh, + }) + + return workerCh, func() { + watcher.workers.Delete(workerUID) + cancel() + } +} + +func (watcher *Notifier) Notify(ctx context.Context, workerUID string, msg *rpc.WatchInstruction) error { + slot, ok := watcher.workers.Load(workerUID) + if !ok { + return fmt.Errorf("%w: %s", ErrNoWorker, workerUID) + } + + select { + case slot.ch <- msg: + return nil + case <-slot.ctx.Done(): + return slot.ctx.Err() + case <-ctx.Done(): + return ctx.Err() + } +} diff --git a/internal/controller/rendezvous/watcher_test.go b/internal/controller/notifier/notifier_test.go similarity index 58% rename from internal/controller/rendezvous/watcher_test.go rename to internal/controller/notifier/notifier_test.go index 17693e9..0adeddd 100644 --- a/internal/controller/rendezvous/watcher_test.go +++ b/internal/controller/notifier/notifier_test.go @@ -1,9 +1,9 @@ -package rendezvous_test +package notifier_test import ( "context" "fmt" - "github.com/cirruslabs/orchard/internal/controller/rendezvous" + "github.com/cirruslabs/orchard/internal/controller/notifier" "github.com/google/uuid" "github.com/stretchr/testify/require" "sync" @@ -11,25 +11,25 @@ import ( "time" ) -func TestWatcher(t *testing.T) { +func TestNotifier(t *testing.T) { ctx := context.Background() - watcher := rendezvous.NewWatcher() + notifier := notifier.NewNotifier() var topic = uuid.New().String() - msgCh, cancel := watcher.Subscribe(context.Background(), topic) + msgCh, cancel := notifier.Register(context.Background(), topic) defer cancel() var wg sync.WaitGroup wg.Add(1) go func() { - require.NoError(t, watcher.Notify(ctx, topic, nil)) + require.NoError(t, notifier.Notify(ctx, topic, nil)) time.Sleep(time.Second) - require.NoError(t, watcher.Notify(ctx, topic, nil)) + require.NoError(t, notifier.Notify(ctx, topic, nil)) wg.Done() }() diff --git a/internal/controller/proxy/proxy.go b/internal/controller/proxy/proxy.go new file mode 100644 index 0000000..9fd0121 --- /dev/null +++ b/internal/controller/proxy/proxy.go @@ -0,0 +1,49 @@ +package proxy + +import ( + "context" + "errors" + "github.com/cirruslabs/orchard/internal/concurrentmap" + "net" +) + +var ErrInvalidToken = errors.New("invalid proxy token") + +type Proxy struct { + sessions *concurrentmap.ConcurrentMap[*TokenSlot] +} + +type TokenSlot struct { + ctx context.Context + ch chan net.Conn +} + +func NewProxy() *Proxy { + return &Proxy{ + sessions: concurrentmap.NewConcurrentMap[*TokenSlot](), + } +} + +func (proxy *Proxy) Request(ctx context.Context, session string) (chan net.Conn, func()) { + tokenSlot := &TokenSlot{ + ctx: ctx, + ch: make(chan net.Conn), + } + + proxy.sessions.Store(session, tokenSlot) + + return tokenSlot.ch, func() { + proxy.sessions.Delete(session) + } +} + +func (proxy *Proxy) Respond(session string, conn net.Conn) (context.Context, error) { + tokenSlot, ok := proxy.sessions.Load(session) + if !ok { + return nil, ErrInvalidToken + } + + tokenSlot.ch <- conn + + return tokenSlot.ctx, nil +} diff --git a/internal/controller/rendezvous/proxy_test.go b/internal/controller/proxy/proxy_test.go similarity index 81% rename from internal/controller/rendezvous/proxy_test.go rename to internal/controller/proxy/proxy_test.go index 95883f8..ff60a3f 100644 --- a/internal/controller/rendezvous/proxy_test.go +++ b/internal/controller/proxy/proxy_test.go @@ -1,8 +1,8 @@ -package rendezvous_test +package proxy_test import ( "context" - "github.com/cirruslabs/orchard/internal/controller/rendezvous" + "github.com/cirruslabs/orchard/internal/controller/proxy" "github.com/google/uuid" "github.com/stretchr/testify/require" "net" @@ -15,7 +15,7 @@ func TestProxy(t *testing.T) { expectedConn, _ := net.Pipe() - proxy := rendezvous.NewProxy() + proxy := proxy.NewProxy() token := uuid.New().String() diff --git a/internal/controller/rendezvous/proxy.go b/internal/controller/rendezvous/proxy.go deleted file mode 100644 index ad4f80e..0000000 --- a/internal/controller/rendezvous/proxy.go +++ /dev/null @@ -1,49 +0,0 @@ -package rendezvous - -import ( - "context" - "errors" - "github.com/cirruslabs/orchard/internal/controller/rendezvous/concurrentmap" - "net" -) - -var ErrInvalidToken = errors.New("invalid proxy token") - -type Proxy struct { - tokens *concurrentmap.ConcurrentMap[*TokenSlot] -} - -type TokenSlot struct { - ctx context.Context - ch chan net.Conn -} - -func NewProxy() *Proxy { - return &Proxy{ - tokens: concurrentmap.NewConcurrentMap[*TokenSlot](), - } -} - -func (proxy *Proxy) Request(ctx context.Context, token string) (chan net.Conn, func()) { - tokenSlot := &TokenSlot{ - ctx: ctx, - ch: make(chan net.Conn), - } - - proxy.tokens.Store(token, tokenSlot) - - return tokenSlot.ch, func() { - proxy.tokens.Delete(token) - } -} - -func (proxy *Proxy) Respond(token string, conn net.Conn) (context.Context, error) { - tokenSlot, ok := proxy.tokens.Load(token) - if !ok { - return nil, ErrInvalidToken - } - - tokenSlot.ch <- conn - - return tokenSlot.ctx, nil -} diff --git a/internal/controller/rendezvous/watcher.go b/internal/controller/rendezvous/watcher.go deleted file mode 100644 index df5cb3a..0000000 --- a/internal/controller/rendezvous/watcher.go +++ /dev/null @@ -1,57 +0,0 @@ -package rendezvous - -import ( - "context" - "errors" - "fmt" - "github.com/cirruslabs/orchard/internal/controller/rendezvous/concurrentmap" - "github.com/cirruslabs/orchard/rpc" -) - -var ErrNoWorker = errors.New("no worker registered with this name") - -type Watcher struct { - workers *concurrentmap.ConcurrentMap[*WorkerSlot] -} - -type WorkerSlot struct { - ctx context.Context - ch chan *rpc.WatchFromController -} - -func NewWatcher() *Watcher { - return &Watcher{ - workers: concurrentmap.NewConcurrentMap[*WorkerSlot](), - } -} - -func (watcher *Watcher) Subscribe(ctx context.Context, worker string) (chan *rpc.WatchFromController, func()) { - subCtx, cancel := context.WithCancel(ctx) - workerCh := make(chan *rpc.WatchFromController) - - watcher.workers.Store(worker, &WorkerSlot{ - ctx: subCtx, - ch: workerCh, - }) - - return workerCh, func() { - watcher.workers.Delete(worker) - cancel() - } -} - -func (watcher *Watcher) Notify(ctx context.Context, topic string, msg *rpc.WatchFromController) error { - slot, ok := watcher.workers.Load(topic) - if !ok { - return fmt.Errorf("%w: %s", ErrNoWorker, topic) - } - - select { - case slot.ch <- msg: - return nil - case <-slot.ctx.Done(): - return slot.ctx.Err() - case <-ctx.Done(): - return ctx.Err() - } -} diff --git a/internal/controller/rpc.go b/internal/controller/rpc.go index 08524fd..05214d9 100644 --- a/internal/controller/rpc.go +++ b/internal/controller/rpc.go @@ -3,6 +3,9 @@ package controller import ( v1pkg "github.com/cirruslabs/orchard/pkg/resource/v1" "github.com/cirruslabs/orchard/rpc" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/types/known/emptypb" + //nolint:staticcheck // https://github.com/mitchellh/go-grpc-net-conn/pull/1 "github.com/golang/protobuf/proto" grpc_net_conn "github.com/mitchellh/go-grpc-net-conn" @@ -10,24 +13,18 @@ import ( "google.golang.org/grpc/status" ) -func (controller *Controller) Watch(stream rpc.Controller_WatchServer) error { +func (controller *Controller) Watch(_ *emptypb.Empty, stream rpc.Controller_WatchServer) error { if !controller.authorizeGRPC(stream.Context(), v1pkg.ServiceAccountRoleWorker) { return status.Errorf(codes.Unauthenticated, "auth failed") } - // The first message is always an initialization - watchFromWorker, err := stream.Recv() - if err != nil { - return err + workerUIDMetadataValue := metadata.ValueFromIncomingContext(stream.Context(), rpc.MetadataWorkerUIDKey) + if len(workerUIDMetadataValue) == 0 { + return status.Errorf(codes.InvalidArgument, "no worker UID in metadata") } - initAction, ok := watchFromWorker.Action.(*rpc.WatchFromWorker_InitAction) - if !ok { - return status.Errorf(codes.FailedPrecondition, "expected an initialization message") - } - - // Subscribe to rendez-vous requests from the API for this worker - workerCh, cancel := controller.watcher.Subscribe(stream.Context(), initAction.InitAction.WorkerUid) + workerUID := workerUIDMetadataValue[0] + workerCh, cancel := controller.workerNotifier.Register(stream.Context(), workerUID) defer cancel() for { @@ -47,39 +44,25 @@ func (controller *Controller) PortForward(stream rpc.Controller_PortForwardServe return status.Errorf(codes.Unauthenticated, "auth failed") } - // The first message is always an initialization - portForwardFromWorker, err := stream.Recv() - if err != nil { - return err + sessionMetadataValue := metadata.ValueFromIncomingContext(stream.Context(), rpc.MetadataWorkerPortForwardingSessionKey) + if len(sessionMetadataValue) == 0 { + return status.Errorf(codes.InvalidArgument, "no session in metadata") } - initAction, ok := portForwardFromWorker.Action.(*rpc.PortForwardFromWorker_InitAction) - if !ok { - return status.Errorf(codes.FailedPrecondition, "expected an initialization message") - } - - // Satisfy the rendez-vous request conn := &grpc_net_conn.Conn{ - Stream: stream, - Request: &rpc.PortForwardFromController{ - Action: &rpc.PortForwardFromController_DataAction{ - DataAction: &rpc.PortForwardFromController_Data{}, - }, - }, - Response: &rpc.PortForwardFromWorker{ - Action: &rpc.PortForwardFromWorker_DataAction{ - DataAction: &rpc.PortForwardFromWorker_Data{}, - }, - }, + Stream: stream, + Request: &rpc.PortForwardData{}, + Response: &rpc.PortForwardData{}, Encode: grpc_net_conn.SimpleEncoder(func(message proto.Message) *[]byte { - return &message.(*rpc.PortForwardFromController).Action.(*rpc.PortForwardFromController_DataAction).DataAction.Data + return &message.(*rpc.PortForwardData).Data }), Decode: grpc_net_conn.SimpleDecoder(func(message proto.Message) *[]byte { - return &message.(*rpc.PortForwardFromWorker).Action.(*rpc.PortForwardFromWorker_DataAction).DataAction.Data + return &message.(*rpc.PortForwardData).Data }), } - proxyCtx, err := controller.proxy.Respond(initAction.InitAction.Token, conn) + // make proxy aware of the connection + proxyCtx, err := controller.proxy.Respond(sessionMetadataValue[0], conn) if err != nil { return err } diff --git a/internal/controller/scheduler.go b/internal/controller/scheduler.go index 4c47c69..6c30231 100644 --- a/internal/controller/scheduler.go +++ b/internal/controller/scheduler.go @@ -51,7 +51,7 @@ func runSchedulerInner(store storepkg.Store) error { for _, vm := range vms { vm := vm - if vm.Worker != "" { + if vm.WorkerUID != "" { continue } @@ -59,7 +59,7 @@ func runSchedulerInner(store storepkg.Store) error { for _, worker := range workers { worker := worker - vm.Worker = worker.Name + vm.WorkerUID = worker.UID err := store.Update(func(txn storepkg.Transaction) error { return txn.SetVM(vm) diff --git a/internal/worker/rpc.go b/internal/worker/rpc.go index 9061b1b..dc180d3 100644 --- a/internal/worker/rpc.go +++ b/internal/worker/rpc.go @@ -7,6 +7,7 @@ import ( v1 "github.com/cirruslabs/orchard/pkg/resource/v1" "github.com/cirruslabs/orchard/rpc" "google.golang.org/grpc/keepalive" + "google.golang.org/protobuf/types/known/emptypb" "time" //nolint:staticcheck // https://github.com/mitchellh/go-grpc-net-conn/pull/1 @@ -30,30 +31,20 @@ func (worker *Worker) watchRPC(ctx context.Context) error { client := rpc.NewControllerClient(conn) - ctxWithMetadata := metadata.NewOutgoingContext(ctx, worker.client.GPRCMetadata()) + ctxWithMetadata := metadata.NewOutgoingContext(ctx, worker.GPRCMetadata()) - stream, err := client.Watch(ctxWithMetadata) + stream, err := client.Watch(ctxWithMetadata, &emptypb.Empty{}) if err != nil { return err } - if err := stream.Send(&rpc.WatchFromWorker{ - Action: &rpc.WatchFromWorker_InitAction{ - InitAction: &rpc.WatchFromWorker_Init{ - WorkerUid: worker.name, - }, - }, - }); err != nil { - return err - } - for { watchFromController, err := stream.Recv() if err != nil { return err } - portForwardAction, ok := watchFromController.Action.(*rpc.WatchFromController_PortForwardAction) + portForwardAction, ok := watchFromController.Action.(*rpc.WatchInstruction_PortForwardAction) if !ok { continue } @@ -65,28 +56,23 @@ func (worker *Worker) watchRPC(ctx context.Context) error { func (worker *Worker) handlePortForward( ctx context.Context, client rpc.ControllerClient, - portForwardAction *rpc.WatchFromController_PortForward, + portForwardAction *rpc.WatchInstruction_PortForward, ) { subCtx, cancel := context.WithCancel(ctx) defer cancel() - stream, err := client.PortForward(subCtx) + grpcMetadata := metadata.Join( + worker.GPRCMetadata(), + metadata.Pairs(rpc.MetadataWorkerPortForwardingSessionKey, portForwardAction.Session), + ) + ctxWithMetadata := metadata.NewOutgoingContext(subCtx, grpcMetadata) + stream, err := client.PortForward(ctxWithMetadata) if err != nil { worker.logger.Warnf("port forwarding failed: failed to call PortForward() RPC method: %v", err) return } - if err := stream.Send(&rpc.PortForwardFromWorker{ - Action: &rpc.PortForwardFromWorker_InitAction{ - InitAction: &rpc.PortForwardFromWorker_Init{ - Token: portForwardAction.Token, - }, - }, - }); err != nil { - return - } - // Obtain VM vm, err := worker.vmm.Get(v1.VM{ Meta: v1.Meta{ @@ -117,22 +103,14 @@ func (worker *Worker) handlePortForward( // Proxy bytes grpcConn := &grpc_net_conn.Conn{ - Stream: stream, - Request: &rpc.PortForwardFromWorker{ - Action: &rpc.PortForwardFromWorker_DataAction{ - DataAction: &rpc.PortForwardFromWorker_Data{}, - }, - }, - Response: &rpc.PortForwardFromController{ - Action: &rpc.PortForwardFromController_DataAction{ - DataAction: &rpc.PortForwardFromController_Data{}, - }, - }, + Stream: stream, + Request: &rpc.PortForwardData{}, + Response: &rpc.PortForwardData{}, Encode: grpc_net_conn.SimpleEncoder(func(message proto.Message) *[]byte { - return &message.(*rpc.PortForwardFromWorker).Action.(*rpc.PortForwardFromWorker_DataAction).DataAction.Data + return &message.(*rpc.PortForwardData).Data }), Decode: grpc_net_conn.SimpleDecoder(func(message proto.Message) *[]byte { - return &message.(*rpc.PortForwardFromController).Action.(*rpc.PortForwardFromController_DataAction).DataAction.Data + return &message.(*rpc.PortForwardData).Data }), } diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 8ba5506..1436486 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -8,8 +8,10 @@ import ( "github.com/cirruslabs/orchard/internal/worker/vmmanager" "github.com/cirruslabs/orchard/pkg/client" v1 "github.com/cirruslabs/orchard/pkg/resource/v1" + "github.com/cirruslabs/orchard/rpc" "github.com/hashicorp/go-multierror" "go.uber.org/zap" + "google.golang.org/grpc/metadata" "os" "time" ) @@ -165,7 +167,7 @@ func (worker *Worker) updateWorker(ctx context.Context) error { } func (worker *Worker) syncVMs(ctx context.Context) error { - remoteVMs, err := worker.client.VMs().FindForWorker(ctx, worker.name) + remoteVMs, err := worker.client.VMs().FindForWorker(ctx, worker.uid) if err != nil { return err } @@ -345,3 +347,11 @@ func (worker *Worker) DeleteAllVMs() error { } return result } + +func (worker *Worker) GPRCMetadata() metadata.MD { + return metadata.Join( + worker.client.GPRCMetadata(), + metadata.Pairs(rpc.MetadataWorkerNameKey, worker.name), + metadata.Pairs(rpc.MetadataWorkerUIDKey, worker.uid), + ) +} diff --git a/pkg/client/client.go b/pkg/client/client.go index 0c4e125..18457d3 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -9,6 +9,7 @@ import ( "errors" "fmt" "github.com/cirruslabs/orchard/internal/config" + "github.com/cirruslabs/orchard/rpc" "golang.org/x/net/websocket" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -18,11 +19,6 @@ import ( "net/url" ) -const MetadataServiceAccountNameKey = "x-orchard-service-account-name" - -//nolint:gosec // G101 check yields a false-positive here, this is not a hard-coded credential -const MetadataServiceAccountTokenKey = "x-orchard-service-account-token" - var ( ErrFailed = errors.New("API client failed") ErrInvalidState = errors.New("invalid state") @@ -121,8 +117,8 @@ func (client *Client) GPRCMetadata() metadata.MD { if client.serviceAccountName != "" && client.serviceAccountToken != "" { result = map[string]string{ - MetadataServiceAccountNameKey: client.serviceAccountName, - MetadataServiceAccountTokenKey: client.serviceAccountToken, + rpc.MetadataServiceAccountNameKey: client.serviceAccountName, + rpc.MetadataServiceAccountTokenKey: client.serviceAccountToken, } } diff --git a/pkg/client/vms.go b/pkg/client/vms.go index acfbd7e..ae837a9 100644 --- a/pkg/client/vms.go +++ b/pkg/client/vms.go @@ -23,7 +23,7 @@ func (service *VMsService) Create(ctx context.Context, vm *v1.VM) error { return nil } -func (service *VMsService) FindForWorker(ctx context.Context, workerName string) (map[string]v1.VM, error) { +func (service *VMsService) FindForWorker(ctx context.Context, workerUID string) (map[string]v1.VM, error) { allVms, err := service.List(ctx) if err != nil { @@ -32,7 +32,7 @@ func (service *VMsService) FindForWorker(ctx context.Context, workerName string) var filteredVms = make(map[string]v1.VM) for _, vmResource := range allVms { - if vmResource.Worker != workerName { + if vmResource.WorkerUID != workerUID { continue } filteredVms[vmResource.UID] = vmResource diff --git a/pkg/resource/v1/v1.go b/pkg/resource/v1/v1.go index 7e3c968..c54275f 100644 --- a/pkg/resource/v1/v1.go +++ b/pkg/resource/v1/v1.go @@ -47,8 +47,8 @@ type VM struct { Status VMStatus `json:"status"` StatusMessage string `json:"status_message"` - // Worker field is set by the Controller to assign this VM to a specific Worker. - Worker string `json:"worker"` + // WorkerUID field is set by the Controller to assign this VM to a specific WorkerUID. + WorkerUID string `json:"worker"` Username string `json:"username"` Password string `json:"password"` diff --git a/rpc/constants.go b/rpc/constants.go new file mode 100644 index 0000000..ceea681 --- /dev/null +++ b/rpc/constants.go @@ -0,0 +1,12 @@ +package rpc + +const MetadataServiceAccountNameKey = "x-orchard-service-account-name" + +//nolint:gosec // G101 check yields a false-positive here, this is not a hard-coded credential +const MetadataServiceAccountTokenKey = "x-orchard-service-account-token" + +const MetadataWorkerNameKey = "x-orchard-worker-name" + +const MetadataWorkerUIDKey = "x-orchard-worker-uid" + +const MetadataWorkerPortForwardingSessionKey = "x-orchard-port-forwarding-session" diff --git a/rpc/orchard.pb.go b/rpc/orchard.pb.go index 5d482b2..75dc158 100644 --- a/rpc/orchard.pb.go +++ b/rpc/orchard.pb.go @@ -9,6 +9,7 @@ package rpc import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" reflect "reflect" sync "sync" ) @@ -20,19 +21,19 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -type WatchFromWorker struct { +type WatchInstruction struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields // Types that are assignable to Action: // - // *WatchFromWorker_InitAction - Action isWatchFromWorker_Action `protobuf_oneof:"action"` + // *WatchInstruction_PortForwardAction + Action isWatchInstruction_Action `protobuf_oneof:"action"` } -func (x *WatchFromWorker) Reset() { - *x = WatchFromWorker{} +func (x *WatchInstruction) Reset() { + *x = WatchInstruction{} if protoimpl.UnsafeEnabled { mi := &file_orchard_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -40,13 +41,13 @@ func (x *WatchFromWorker) Reset() { } } -func (x *WatchFromWorker) String() string { +func (x *WatchInstruction) String() string { return protoimpl.X.MessageStringOf(x) } -func (*WatchFromWorker) ProtoMessage() {} +func (*WatchInstruction) ProtoMessage() {} -func (x *WatchFromWorker) ProtoReflect() protoreflect.Message { +func (x *WatchInstruction) ProtoReflect() protoreflect.Message { mi := &file_orchard_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -58,48 +59,45 @@ func (x *WatchFromWorker) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use WatchFromWorker.ProtoReflect.Descriptor instead. -func (*WatchFromWorker) Descriptor() ([]byte, []int) { +// Deprecated: Use WatchInstruction.ProtoReflect.Descriptor instead. +func (*WatchInstruction) Descriptor() ([]byte, []int) { return file_orchard_proto_rawDescGZIP(), []int{0} } -func (m *WatchFromWorker) GetAction() isWatchFromWorker_Action { +func (m *WatchInstruction) GetAction() isWatchInstruction_Action { if m != nil { return m.Action } return nil } -func (x *WatchFromWorker) GetInitAction() *WatchFromWorker_Init { - if x, ok := x.GetAction().(*WatchFromWorker_InitAction); ok { - return x.InitAction +func (x *WatchInstruction) GetPortForwardAction() *WatchInstruction_PortForward { + if x, ok := x.GetAction().(*WatchInstruction_PortForwardAction); ok { + return x.PortForwardAction } return nil } -type isWatchFromWorker_Action interface { - isWatchFromWorker_Action() +type isWatchInstruction_Action interface { + isWatchInstruction_Action() } -type WatchFromWorker_InitAction struct { - InitAction *WatchFromWorker_Init `protobuf:"bytes,1,opt,name=init_action,json=initAction,proto3,oneof"` +type WatchInstruction_PortForwardAction struct { + PortForwardAction *WatchInstruction_PortForward `protobuf:"bytes,1,opt,name=port_forward_action,json=portForwardAction,proto3,oneof"` } -func (*WatchFromWorker_InitAction) isWatchFromWorker_Action() {} +func (*WatchInstruction_PortForwardAction) isWatchInstruction_Action() {} -type WatchFromController struct { +type PortForwardData struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - // Types that are assignable to Action: - // - // *WatchFromController_PortForwardAction - Action isWatchFromController_Action `protobuf_oneof:"action"` + Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` } -func (x *WatchFromController) Reset() { - *x = WatchFromController{} +func (x *PortForwardData) Reset() { + *x = PortForwardData{} if protoimpl.UnsafeEnabled { mi := &file_orchard_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -107,13 +105,13 @@ func (x *WatchFromController) Reset() { } } -func (x *WatchFromController) String() string { +func (x *PortForwardData) String() string { return protoimpl.X.MessageStringOf(x) } -func (*WatchFromController) ProtoMessage() {} +func (*PortForwardData) ProtoMessage() {} -func (x *WatchFromController) ProtoReflect() protoreflect.Message { +func (x *PortForwardData) ProtoReflect() protoreflect.Message { mi := &file_orchard_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -125,49 +123,32 @@ func (x *WatchFromController) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use WatchFromController.ProtoReflect.Descriptor instead. -func (*WatchFromController) Descriptor() ([]byte, []int) { +// Deprecated: Use PortForwardData.ProtoReflect.Descriptor instead. +func (*PortForwardData) Descriptor() ([]byte, []int) { return file_orchard_proto_rawDescGZIP(), []int{1} } -func (m *WatchFromController) GetAction() isWatchFromController_Action { - if m != nil { - return m.Action +func (x *PortForwardData) GetData() []byte { + if x != nil { + return x.Data } return nil } -func (x *WatchFromController) GetPortForwardAction() *WatchFromController_PortForward { - if x, ok := x.GetAction().(*WatchFromController_PortForwardAction); ok { - return x.PortForwardAction - } - return nil -} - -type isWatchFromController_Action interface { - isWatchFromController_Action() -} - -type WatchFromController_PortForwardAction struct { - PortForwardAction *WatchFromController_PortForward `protobuf:"bytes,1,opt,name=port_forward_action,json=portForwardAction,proto3,oneof"` -} - -func (*WatchFromController_PortForwardAction) isWatchFromController_Action() {} - -type PortForwardFromWorker struct { +type WatchInstruction_PortForward struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - // Types that are assignable to Action: - // - // *PortForwardFromWorker_InitAction - // *PortForwardFromWorker_DataAction - Action isPortForwardFromWorker_Action `protobuf_oneof:"action"` + // we can have multiple port forwards for the same vm/port pair + // let's distinguish them by a unique session + Session string `protobuf:"bytes,1,opt,name=session,proto3" json:"session,omitempty"` + VmUid string `protobuf:"bytes,2,opt,name=vm_uid,json=vmUid,proto3" json:"vm_uid,omitempty"` + VmPort uint32 `protobuf:"varint,3,opt,name=vm_port,json=vmPort,proto3" json:"vm_port,omitempty"` } -func (x *PortForwardFromWorker) Reset() { - *x = PortForwardFromWorker{} +func (x *WatchInstruction_PortForward) Reset() { + *x = WatchInstruction_PortForward{} if protoimpl.UnsafeEnabled { mi := &file_orchard_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -175,13 +156,13 @@ func (x *PortForwardFromWorker) Reset() { } } -func (x *PortForwardFromWorker) String() string { +func (x *WatchInstruction_PortForward) String() string { return protoimpl.X.MessageStringOf(x) } -func (*PortForwardFromWorker) ProtoMessage() {} +func (*WatchInstruction_PortForward) ProtoMessage() {} -func (x *PortForwardFromWorker) ProtoReflect() protoreflect.Message { +func (x *WatchInstruction_PortForward) ProtoReflect() protoreflect.Message { mi := &file_orchard_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -193,425 +174,64 @@ func (x *PortForwardFromWorker) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use PortForwardFromWorker.ProtoReflect.Descriptor instead. -func (*PortForwardFromWorker) Descriptor() ([]byte, []int) { - return file_orchard_proto_rawDescGZIP(), []int{2} -} - -func (m *PortForwardFromWorker) GetAction() isPortForwardFromWorker_Action { - if m != nil { - return m.Action - } - return nil -} - -func (x *PortForwardFromWorker) GetInitAction() *PortForwardFromWorker_Init { - if x, ok := x.GetAction().(*PortForwardFromWorker_InitAction); ok { - return x.InitAction - } - return nil -} - -func (x *PortForwardFromWorker) GetDataAction() *PortForwardFromWorker_Data { - if x, ok := x.GetAction().(*PortForwardFromWorker_DataAction); ok { - return x.DataAction - } - return nil -} - -type isPortForwardFromWorker_Action interface { - isPortForwardFromWorker_Action() -} - -type PortForwardFromWorker_InitAction struct { - InitAction *PortForwardFromWorker_Init `protobuf:"bytes,1,opt,name=init_action,json=initAction,proto3,oneof"` -} - -type PortForwardFromWorker_DataAction struct { - DataAction *PortForwardFromWorker_Data `protobuf:"bytes,2,opt,name=data_action,json=dataAction,proto3,oneof"` -} - -func (*PortForwardFromWorker_InitAction) isPortForwardFromWorker_Action() {} - -func (*PortForwardFromWorker_DataAction) isPortForwardFromWorker_Action() {} - -type PortForwardFromController struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - // Types that are assignable to Action: - // - // *PortForwardFromController_DataAction - Action isPortForwardFromController_Action `protobuf_oneof:"action"` -} - -func (x *PortForwardFromController) Reset() { - *x = PortForwardFromController{} - if protoimpl.UnsafeEnabled { - mi := &file_orchard_proto_msgTypes[3] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *PortForwardFromController) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*PortForwardFromController) ProtoMessage() {} - -func (x *PortForwardFromController) ProtoReflect() protoreflect.Message { - mi := &file_orchard_proto_msgTypes[3] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use PortForwardFromController.ProtoReflect.Descriptor instead. -func (*PortForwardFromController) Descriptor() ([]byte, []int) { - return file_orchard_proto_rawDescGZIP(), []int{3} -} - -func (m *PortForwardFromController) GetAction() isPortForwardFromController_Action { - if m != nil { - return m.Action - } - return nil -} - -func (x *PortForwardFromController) GetDataAction() *PortForwardFromController_Data { - if x, ok := x.GetAction().(*PortForwardFromController_DataAction); ok { - return x.DataAction - } - return nil -} - -type isPortForwardFromController_Action interface { - isPortForwardFromController_Action() -} - -type PortForwardFromController_DataAction struct { - DataAction *PortForwardFromController_Data `protobuf:"bytes,1,opt,name=data_action,json=dataAction,proto3,oneof"` -} - -func (*PortForwardFromController_DataAction) isPortForwardFromController_Action() {} - -type WatchFromWorker_Init struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - WorkerUid string `protobuf:"bytes,1,opt,name=worker_uid,json=workerUid,proto3" json:"worker_uid,omitempty"` -} - -func (x *WatchFromWorker_Init) Reset() { - *x = WatchFromWorker_Init{} - if protoimpl.UnsafeEnabled { - mi := &file_orchard_proto_msgTypes[4] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *WatchFromWorker_Init) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*WatchFromWorker_Init) ProtoMessage() {} - -func (x *WatchFromWorker_Init) ProtoReflect() protoreflect.Message { - mi := &file_orchard_proto_msgTypes[4] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use WatchFromWorker_Init.ProtoReflect.Descriptor instead. -func (*WatchFromWorker_Init) Descriptor() ([]byte, []int) { +// Deprecated: Use WatchInstruction_PortForward.ProtoReflect.Descriptor instead. +func (*WatchInstruction_PortForward) Descriptor() ([]byte, []int) { return file_orchard_proto_rawDescGZIP(), []int{0, 0} } -func (x *WatchFromWorker_Init) GetWorkerUid() string { +func (x *WatchInstruction_PortForward) GetSession() string { if x != nil { - return x.WorkerUid + return x.Session } return "" } -type WatchFromController_PortForward struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Token string `protobuf:"bytes,1,opt,name=token,proto3" json:"token,omitempty"` - VmUid string `protobuf:"bytes,2,opt,name=vm_uid,json=vmUid,proto3" json:"vm_uid,omitempty"` - VmPort uint32 `protobuf:"varint,3,opt,name=vm_port,json=vmPort,proto3" json:"vm_port,omitempty"` -} - -func (x *WatchFromController_PortForward) Reset() { - *x = WatchFromController_PortForward{} - if protoimpl.UnsafeEnabled { - mi := &file_orchard_proto_msgTypes[5] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *WatchFromController_PortForward) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*WatchFromController_PortForward) ProtoMessage() {} - -func (x *WatchFromController_PortForward) ProtoReflect() protoreflect.Message { - mi := &file_orchard_proto_msgTypes[5] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use WatchFromController_PortForward.ProtoReflect.Descriptor instead. -func (*WatchFromController_PortForward) Descriptor() ([]byte, []int) { - return file_orchard_proto_rawDescGZIP(), []int{1, 0} -} - -func (x *WatchFromController_PortForward) GetToken() string { - if x != nil { - return x.Token - } - return "" -} - -func (x *WatchFromController_PortForward) GetVmUid() string { +func (x *WatchInstruction_PortForward) GetVmUid() string { if x != nil { return x.VmUid } return "" } -func (x *WatchFromController_PortForward) GetVmPort() uint32 { +func (x *WatchInstruction_PortForward) GetVmPort() uint32 { if x != nil { return x.VmPort } return 0 } -type PortForwardFromWorker_Init struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Token string `protobuf:"bytes,1,opt,name=token,proto3" json:"token,omitempty"` -} - -func (x *PortForwardFromWorker_Init) Reset() { - *x = PortForwardFromWorker_Init{} - if protoimpl.UnsafeEnabled { - mi := &file_orchard_proto_msgTypes[6] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *PortForwardFromWorker_Init) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*PortForwardFromWorker_Init) ProtoMessage() {} - -func (x *PortForwardFromWorker_Init) ProtoReflect() protoreflect.Message { - mi := &file_orchard_proto_msgTypes[6] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use PortForwardFromWorker_Init.ProtoReflect.Descriptor instead. -func (*PortForwardFromWorker_Init) Descriptor() ([]byte, []int) { - return file_orchard_proto_rawDescGZIP(), []int{2, 0} -} - -func (x *PortForwardFromWorker_Init) GetToken() string { - if x != nil { - return x.Token - } - return "" -} - -type PortForwardFromWorker_Data struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` -} - -func (x *PortForwardFromWorker_Data) Reset() { - *x = PortForwardFromWorker_Data{} - if protoimpl.UnsafeEnabled { - mi := &file_orchard_proto_msgTypes[7] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *PortForwardFromWorker_Data) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*PortForwardFromWorker_Data) ProtoMessage() {} - -func (x *PortForwardFromWorker_Data) ProtoReflect() protoreflect.Message { - mi := &file_orchard_proto_msgTypes[7] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use PortForwardFromWorker_Data.ProtoReflect.Descriptor instead. -func (*PortForwardFromWorker_Data) Descriptor() ([]byte, []int) { - return file_orchard_proto_rawDescGZIP(), []int{2, 1} -} - -func (x *PortForwardFromWorker_Data) GetData() []byte { - if x != nil { - return x.Data - } - return nil -} - -type PortForwardFromController_Data struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` -} - -func (x *PortForwardFromController_Data) Reset() { - *x = PortForwardFromController_Data{} - if protoimpl.UnsafeEnabled { - mi := &file_orchard_proto_msgTypes[8] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *PortForwardFromController_Data) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*PortForwardFromController_Data) ProtoMessage() {} - -func (x *PortForwardFromController_Data) ProtoReflect() protoreflect.Message { - mi := &file_orchard_proto_msgTypes[8] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use PortForwardFromController_Data.ProtoReflect.Descriptor instead. -func (*PortForwardFromController_Data) Descriptor() ([]byte, []int) { - return file_orchard_proto_rawDescGZIP(), []int{3, 0} -} - -func (x *PortForwardFromController_Data) GetData() []byte { - if x != nil { - return x.Data - } - return nil -} - var File_orchard_proto protoreflect.FileDescriptor var file_orchard_proto_rawDesc = []byte{ - 0x0a, 0x0d, 0x6f, 0x72, 0x63, 0x68, 0x61, 0x72, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, - 0x7c, 0x0a, 0x0f, 0x57, 0x61, 0x74, 0x63, 0x68, 0x46, 0x72, 0x6f, 0x6d, 0x57, 0x6f, 0x72, 0x6b, - 0x65, 0x72, 0x12, 0x38, 0x0a, 0x0b, 0x69, 0x6e, 0x69, 0x74, 0x5f, 0x61, 0x63, 0x74, 0x69, 0x6f, - 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x57, 0x61, 0x74, 0x63, 0x68, 0x46, - 0x72, 0x6f, 0x6d, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x48, 0x00, - 0x52, 0x0a, 0x69, 0x6e, 0x69, 0x74, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x25, 0x0a, 0x04, - 0x49, 0x6e, 0x69, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x5f, 0x75, - 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, - 0x55, 0x69, 0x64, 0x42, 0x08, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0xc8, 0x01, - 0x0a, 0x13, 0x57, 0x61, 0x74, 0x63, 0x68, 0x46, 0x72, 0x6f, 0x6d, 0x43, 0x6f, 0x6e, 0x74, 0x72, - 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x12, 0x52, 0x0a, 0x13, 0x70, 0x6f, 0x72, 0x74, 0x5f, 0x66, 0x6f, - 0x72, 0x77, 0x61, 0x72, 0x64, 0x5f, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x57, 0x61, 0x74, 0x63, 0x68, 0x46, 0x72, 0x6f, 0x6d, 0x43, 0x6f, - 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x50, 0x6f, 0x72, 0x74, 0x46, 0x6f, 0x72, - 0x77, 0x61, 0x72, 0x64, 0x48, 0x00, 0x52, 0x11, 0x70, 0x6f, 0x72, 0x74, 0x46, 0x6f, 0x72, 0x77, - 0x61, 0x72, 0x64, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x53, 0x0a, 0x0b, 0x50, 0x6f, 0x72, - 0x74, 0x46, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x6b, 0x65, - 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x15, - 0x0a, 0x06, 0x76, 0x6d, 0x5f, 0x75, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, - 0x76, 0x6d, 0x55, 0x69, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x76, 0x6d, 0x5f, 0x70, 0x6f, 0x72, 0x74, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x06, 0x76, 0x6d, 0x50, 0x6f, 0x72, 0x74, 0x42, 0x08, - 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0xdb, 0x01, 0x0a, 0x15, 0x50, 0x6f, 0x72, - 0x74, 0x46, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x46, 0x72, 0x6f, 0x6d, 0x57, 0x6f, 0x72, 0x6b, - 0x65, 0x72, 0x12, 0x3e, 0x0a, 0x0b, 0x69, 0x6e, 0x69, 0x74, 0x5f, 0x61, 0x63, 0x74, 0x69, 0x6f, - 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x50, 0x6f, 0x72, 0x74, 0x46, 0x6f, - 0x72, 0x77, 0x61, 0x72, 0x64, 0x46, 0x72, 0x6f, 0x6d, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x2e, - 0x49, 0x6e, 0x69, 0x74, 0x48, 0x00, 0x52, 0x0a, 0x69, 0x6e, 0x69, 0x74, 0x41, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x12, 0x3e, 0x0a, 0x0b, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x61, 0x63, 0x74, 0x69, 0x6f, - 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x50, 0x6f, 0x72, 0x74, 0x46, 0x6f, - 0x72, 0x77, 0x61, 0x72, 0x64, 0x46, 0x72, 0x6f, 0x6d, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x2e, - 0x44, 0x61, 0x74, 0x61, 0x48, 0x00, 0x52, 0x0a, 0x64, 0x61, 0x74, 0x61, 0x41, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x1a, 0x1c, 0x0a, 0x04, 0x49, 0x6e, 0x69, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, - 0x6b, 0x65, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, - 0x1a, 0x1a, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x42, 0x08, 0x0a, 0x06, - 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x85, 0x01, 0x0a, 0x19, 0x50, 0x6f, 0x72, 0x74, 0x46, - 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x46, 0x72, 0x6f, 0x6d, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, - 0x6c, 0x6c, 0x65, 0x72, 0x12, 0x42, 0x0a, 0x0b, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x61, 0x63, 0x74, - 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x50, 0x6f, 0x72, 0x74, - 0x46, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x46, 0x72, 0x6f, 0x6d, 0x43, 0x6f, 0x6e, 0x74, 0x72, - 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x48, 0x00, 0x52, 0x0a, 0x64, 0x61, - 0x74, 0x61, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x1a, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, - 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, - 0x64, 0x61, 0x74, 0x61, 0x42, 0x08, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x32, 0x88, - 0x01, 0x0a, 0x0a, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x12, 0x33, 0x0a, - 0x05, 0x57, 0x61, 0x74, 0x63, 0x68, 0x12, 0x10, 0x2e, 0x57, 0x61, 0x74, 0x63, 0x68, 0x46, 0x72, - 0x6f, 0x6d, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x1a, 0x14, 0x2e, 0x57, 0x61, 0x74, 0x63, 0x68, - 0x46, 0x72, 0x6f, 0x6d, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x28, 0x01, - 0x30, 0x01, 0x12, 0x45, 0x0a, 0x0b, 0x50, 0x6f, 0x72, 0x74, 0x46, 0x6f, 0x72, 0x77, 0x61, 0x72, - 0x64, 0x12, 0x16, 0x2e, 0x50, 0x6f, 0x72, 0x74, 0x46, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x46, - 0x72, 0x6f, 0x6d, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x1a, 0x1a, 0x2e, 0x50, 0x6f, 0x72, 0x74, - 0x46, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x46, 0x72, 0x6f, 0x6d, 0x43, 0x6f, 0x6e, 0x74, 0x72, - 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x28, 0x01, 0x30, 0x01, 0x42, 0x23, 0x5a, 0x21, 0x67, 0x69, 0x74, - 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x69, 0x72, 0x72, 0x75, 0x73, 0x6c, 0x61, - 0x62, 0x73, 0x2f, 0x6f, 0x72, 0x63, 0x68, 0x61, 0x72, 0x64, 0x2f, 0x72, 0x70, 0x63, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x0a, 0x0d, 0x6f, 0x72, 0x63, 0x68, 0x61, 0x72, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, + 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xc6, 0x01, 0x0a, + 0x10, 0x57, 0x61, 0x74, 0x63, 0x68, 0x49, 0x6e, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x12, 0x4f, 0x0a, 0x13, 0x70, 0x6f, 0x72, 0x74, 0x5f, 0x66, 0x6f, 0x72, 0x77, 0x61, 0x72, + 0x64, 0x5f, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, + 0x2e, 0x57, 0x61, 0x74, 0x63, 0x68, 0x49, 0x6e, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x2e, 0x50, 0x6f, 0x72, 0x74, 0x46, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x48, 0x00, 0x52, + 0x11, 0x70, 0x6f, 0x72, 0x74, 0x46, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x41, 0x63, 0x74, 0x69, + 0x6f, 0x6e, 0x1a, 0x57, 0x0a, 0x0b, 0x50, 0x6f, 0x72, 0x74, 0x46, 0x6f, 0x72, 0x77, 0x61, 0x72, + 0x64, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x15, 0x0a, 0x06, 0x76, + 0x6d, 0x5f, 0x75, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x6d, 0x55, + 0x69, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x76, 0x6d, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0d, 0x52, 0x06, 0x76, 0x6d, 0x50, 0x6f, 0x72, 0x74, 0x42, 0x08, 0x0a, 0x06, 0x61, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x25, 0x0a, 0x0f, 0x50, 0x6f, 0x72, 0x74, 0x46, 0x6f, 0x72, + 0x77, 0x61, 0x72, 0x64, 0x44, 0x61, 0x74, 0x61, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x32, 0x79, 0x0a, 0x0a, + 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x12, 0x34, 0x0a, 0x05, 0x57, 0x61, + 0x74, 0x63, 0x68, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x11, 0x2e, 0x57, 0x61, + 0x74, 0x63, 0x68, 0x49, 0x6e, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x30, 0x01, + 0x12, 0x35, 0x0a, 0x0b, 0x50, 0x6f, 0x72, 0x74, 0x46, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x12, + 0x10, 0x2e, 0x50, 0x6f, 0x72, 0x74, 0x46, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x44, 0x61, 0x74, + 0x61, 0x1a, 0x10, 0x2e, 0x50, 0x6f, 0x72, 0x74, 0x46, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x44, + 0x61, 0x74, 0x61, 0x28, 0x01, 0x30, 0x01, 0x42, 0x23, 0x5a, 0x21, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x69, 0x72, 0x72, 0x75, 0x73, 0x6c, 0x61, 0x62, 0x73, + 0x2f, 0x6f, 0x72, 0x63, 0x68, 0x61, 0x72, 0x64, 0x2f, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -626,33 +246,24 @@ func file_orchard_proto_rawDescGZIP() []byte { return file_orchard_proto_rawDescData } -var file_orchard_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_orchard_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_orchard_proto_goTypes = []interface{}{ - (*WatchFromWorker)(nil), // 0: WatchFromWorker - (*WatchFromController)(nil), // 1: WatchFromController - (*PortForwardFromWorker)(nil), // 2: PortForwardFromWorker - (*PortForwardFromController)(nil), // 3: PortForwardFromController - (*WatchFromWorker_Init)(nil), // 4: WatchFromWorker.Init - (*WatchFromController_PortForward)(nil), // 5: WatchFromController.PortForward - (*PortForwardFromWorker_Init)(nil), // 6: PortForwardFromWorker.Init - (*PortForwardFromWorker_Data)(nil), // 7: PortForwardFromWorker.Data - (*PortForwardFromController_Data)(nil), // 8: PortForwardFromController.Data + (*WatchInstruction)(nil), // 0: WatchInstruction + (*PortForwardData)(nil), // 1: PortForwardData + (*WatchInstruction_PortForward)(nil), // 2: WatchInstruction.PortForward + (*emptypb.Empty)(nil), // 3: google.protobuf.Empty } var file_orchard_proto_depIdxs = []int32{ - 4, // 0: WatchFromWorker.init_action:type_name -> WatchFromWorker.Init - 5, // 1: WatchFromController.port_forward_action:type_name -> WatchFromController.PortForward - 6, // 2: PortForwardFromWorker.init_action:type_name -> PortForwardFromWorker.Init - 7, // 3: PortForwardFromWorker.data_action:type_name -> PortForwardFromWorker.Data - 8, // 4: PortForwardFromController.data_action:type_name -> PortForwardFromController.Data - 0, // 5: Controller.Watch:input_type -> WatchFromWorker - 2, // 6: Controller.PortForward:input_type -> PortForwardFromWorker - 1, // 7: Controller.Watch:output_type -> WatchFromController - 3, // 8: Controller.PortForward:output_type -> PortForwardFromController - 7, // [7:9] is the sub-list for method output_type - 5, // [5:7] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 2, // 0: WatchInstruction.port_forward_action:type_name -> WatchInstruction.PortForward + 3, // 1: Controller.Watch:input_type -> google.protobuf.Empty + 1, // 2: Controller.PortForward:input_type -> PortForwardData + 0, // 3: Controller.Watch:output_type -> WatchInstruction + 1, // 4: Controller.PortForward:output_type -> PortForwardData + 3, // [3:5] is the sub-list for method output_type + 1, // [1:3] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name } func init() { file_orchard_proto_init() } @@ -662,7 +273,7 @@ func file_orchard_proto_init() { } if !protoimpl.UnsafeEnabled { file_orchard_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*WatchFromWorker); i { + switch v := v.(*WatchInstruction); i { case 0: return &v.state case 1: @@ -674,7 +285,7 @@ func file_orchard_proto_init() { } } file_orchard_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*WatchFromController); i { + switch v := v.(*PortForwardData); i { case 0: return &v.state case 1: @@ -686,79 +297,7 @@ func file_orchard_proto_init() { } } file_orchard_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PortForwardFromWorker); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_orchard_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PortForwardFromController); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_orchard_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*WatchFromWorker_Init); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_orchard_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*WatchFromController_PortForward); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_orchard_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PortForwardFromWorker_Init); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_orchard_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PortForwardFromWorker_Data); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_orchard_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PortForwardFromController_Data); i { + switch v := v.(*WatchInstruction_PortForward); i { case 0: return &v.state case 1: @@ -771,17 +310,7 @@ func file_orchard_proto_init() { } } file_orchard_proto_msgTypes[0].OneofWrappers = []interface{}{ - (*WatchFromWorker_InitAction)(nil), - } - file_orchard_proto_msgTypes[1].OneofWrappers = []interface{}{ - (*WatchFromController_PortForwardAction)(nil), - } - file_orchard_proto_msgTypes[2].OneofWrappers = []interface{}{ - (*PortForwardFromWorker_InitAction)(nil), - (*PortForwardFromWorker_DataAction)(nil), - } - file_orchard_proto_msgTypes[3].OneofWrappers = []interface{}{ - (*PortForwardFromController_DataAction)(nil), + (*WatchInstruction_PortForwardAction)(nil), } type x struct{} out := protoimpl.TypeBuilder{ @@ -789,7 +318,7 @@ func file_orchard_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_orchard_proto_rawDesc, NumEnums: 0, - NumMessages: 9, + NumMessages: 3, NumExtensions: 0, NumServices: 1, }, diff --git a/rpc/orchard.proto b/rpc/orchard.proto index c3b8f55..905801a 100644 --- a/rpc/orchard.proto +++ b/rpc/orchard.proto @@ -1,25 +1,23 @@ syntax = "proto3"; +import "google/protobuf/empty.proto"; + option go_package = "github.com/cirruslabs/orchard/rpc"; service Controller { - rpc Watch(stream WatchFromWorker) returns (stream WatchFromController); - rpc PortForward(stream PortForwardFromWorker) returns (stream PortForwardFromController); + // message bus between the controller and a worker + rpc Watch(google.protobuf.Empty) returns (stream WatchInstruction); + + // single purpose method when a port forward is requested and running + // session information is passed in the requests metadata + rpc PortForward(stream PortForwardData) returns (stream PortForwardData); } -message WatchFromWorker { - message Init { - string worker_uid = 1; - } - - oneof action { - Init init_action = 1; - } -} - -message WatchFromController { +message WatchInstruction { message PortForward { - string token = 1; + // we can have multiple port forwards for the same vm/port pair + // let's distinguish them by a unique session + string session = 1; string vm_uid = 2; uint32 vm_port = 3; } @@ -29,27 +27,6 @@ message WatchFromController { } } -message PortForwardFromWorker { - message Init { - string token = 1; - } - - message Data { - bytes data = 1; - } - - oneof action { - Init init_action = 1; - Data data_action = 2; - } -} - -message PortForwardFromController { - message Data { - bytes data = 1; - } - - oneof action { - Data data_action = 1; - } +message PortForwardData { + bytes data = 1; } diff --git a/rpc/orchard_grpc.pb.go b/rpc/orchard_grpc.pb.go index 82a7b9d..e661c8d 100644 --- a/rpc/orchard_grpc.pb.go +++ b/rpc/orchard_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.3.0 +// - protoc-gen-go-grpc v1.2.0 // - protoc (unknown) // source: orchard.proto @@ -11,6 +11,7 @@ import ( grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" + emptypb "google.golang.org/protobuf/types/known/emptypb" ) // This is a compile-time assertion to ensure that this generated file @@ -18,16 +19,14 @@ import ( // Requires gRPC-Go v1.32.0 or later. const _ = grpc.SupportPackageIsVersion7 -const ( - Controller_Watch_FullMethodName = "/Controller/Watch" - Controller_PortForward_FullMethodName = "/Controller/PortForward" -) - // ControllerClient is the client API for Controller service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type ControllerClient interface { - Watch(ctx context.Context, opts ...grpc.CallOption) (Controller_WatchClient, error) + // message bus between the controller and a worker + Watch(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (Controller_WatchClient, error) + // single purpose method when a port forward is requested and running + // session information is passed in the requests metadata PortForward(ctx context.Context, opts ...grpc.CallOption) (Controller_PortForwardClient, error) } @@ -39,18 +38,23 @@ func NewControllerClient(cc grpc.ClientConnInterface) ControllerClient { return &controllerClient{cc} } -func (c *controllerClient) Watch(ctx context.Context, opts ...grpc.CallOption) (Controller_WatchClient, error) { - stream, err := c.cc.NewStream(ctx, &Controller_ServiceDesc.Streams[0], Controller_Watch_FullMethodName, opts...) +func (c *controllerClient) Watch(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (Controller_WatchClient, error) { + stream, err := c.cc.NewStream(ctx, &Controller_ServiceDesc.Streams[0], "/Controller/Watch", opts...) if err != nil { return nil, err } x := &controllerWatchClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } return x, nil } type Controller_WatchClient interface { - Send(*WatchFromWorker) error - Recv() (*WatchFromController, error) + Recv() (*WatchInstruction, error) grpc.ClientStream } @@ -58,12 +62,8 @@ type controllerWatchClient struct { grpc.ClientStream } -func (x *controllerWatchClient) Send(m *WatchFromWorker) error { - return x.ClientStream.SendMsg(m) -} - -func (x *controllerWatchClient) Recv() (*WatchFromController, error) { - m := new(WatchFromController) +func (x *controllerWatchClient) Recv() (*WatchInstruction, error) { + m := new(WatchInstruction) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err } @@ -71,7 +71,7 @@ func (x *controllerWatchClient) Recv() (*WatchFromController, error) { } func (c *controllerClient) PortForward(ctx context.Context, opts ...grpc.CallOption) (Controller_PortForwardClient, error) { - stream, err := c.cc.NewStream(ctx, &Controller_ServiceDesc.Streams[1], Controller_PortForward_FullMethodName, opts...) + stream, err := c.cc.NewStream(ctx, &Controller_ServiceDesc.Streams[1], "/Controller/PortForward", opts...) if err != nil { return nil, err } @@ -80,8 +80,8 @@ func (c *controllerClient) PortForward(ctx context.Context, opts ...grpc.CallOpt } type Controller_PortForwardClient interface { - Send(*PortForwardFromWorker) error - Recv() (*PortForwardFromController, error) + Send(*PortForwardData) error + Recv() (*PortForwardData, error) grpc.ClientStream } @@ -89,12 +89,12 @@ type controllerPortForwardClient struct { grpc.ClientStream } -func (x *controllerPortForwardClient) Send(m *PortForwardFromWorker) error { +func (x *controllerPortForwardClient) Send(m *PortForwardData) error { return x.ClientStream.SendMsg(m) } -func (x *controllerPortForwardClient) Recv() (*PortForwardFromController, error) { - m := new(PortForwardFromController) +func (x *controllerPortForwardClient) Recv() (*PortForwardData, error) { + m := new(PortForwardData) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err } @@ -105,7 +105,10 @@ func (x *controllerPortForwardClient) Recv() (*PortForwardFromController, error) // All implementations must embed UnimplementedControllerServer // for forward compatibility type ControllerServer interface { - Watch(Controller_WatchServer) error + // message bus between the controller and a worker + Watch(*emptypb.Empty, Controller_WatchServer) error + // single purpose method when a port forward is requested and running + // session information is passed in the requests metadata PortForward(Controller_PortForwardServer) error mustEmbedUnimplementedControllerServer() } @@ -114,7 +117,7 @@ type ControllerServer interface { type UnimplementedControllerServer struct { } -func (UnimplementedControllerServer) Watch(Controller_WatchServer) error { +func (UnimplementedControllerServer) Watch(*emptypb.Empty, Controller_WatchServer) error { return status.Errorf(codes.Unimplemented, "method Watch not implemented") } func (UnimplementedControllerServer) PortForward(Controller_PortForwardServer) error { @@ -134,12 +137,15 @@ func RegisterControllerServer(s grpc.ServiceRegistrar, srv ControllerServer) { } func _Controller_Watch_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(ControllerServer).Watch(&controllerWatchServer{stream}) + m := new(emptypb.Empty) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(ControllerServer).Watch(m, &controllerWatchServer{stream}) } type Controller_WatchServer interface { - Send(*WatchFromController) error - Recv() (*WatchFromWorker, error) + Send(*WatchInstruction) error grpc.ServerStream } @@ -147,25 +153,17 @@ type controllerWatchServer struct { grpc.ServerStream } -func (x *controllerWatchServer) Send(m *WatchFromController) error { +func (x *controllerWatchServer) Send(m *WatchInstruction) error { return x.ServerStream.SendMsg(m) } -func (x *controllerWatchServer) Recv() (*WatchFromWorker, error) { - m := new(WatchFromWorker) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - func _Controller_PortForward_Handler(srv interface{}, stream grpc.ServerStream) error { return srv.(ControllerServer).PortForward(&controllerPortForwardServer{stream}) } type Controller_PortForwardServer interface { - Send(*PortForwardFromController) error - Recv() (*PortForwardFromWorker, error) + Send(*PortForwardData) error + Recv() (*PortForwardData, error) grpc.ServerStream } @@ -173,12 +171,12 @@ type controllerPortForwardServer struct { grpc.ServerStream } -func (x *controllerPortForwardServer) Send(m *PortForwardFromController) error { +func (x *controllerPortForwardServer) Send(m *PortForwardData) error { return x.ServerStream.SendMsg(m) } -func (x *controllerPortForwardServer) Recv() (*PortForwardFromWorker, error) { - m := new(PortForwardFromWorker) +func (x *controllerPortForwardServer) Recv() (*PortForwardData, error) { + m := new(PortForwardData) if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err } @@ -197,7 +195,6 @@ var Controller_ServiceDesc = grpc.ServiceDesc{ StreamName: "Watch", Handler: _Controller_Watch_Handler, ServerStreams: true, - ClientStreams: true, }, { StreamName: "PortForward",