diff --git a/api/openapi.yaml b/api/openapi.yaml index f840550..7b8068e 100644 --- a/api/openapi.yaml +++ b/api/openapi.yaml @@ -321,6 +321,31 @@ paths: description: VM resource with the given name doesn't exist '503': description: Failed to establish connection with the worker responsible for the specified VM + /vms/{name}/ip: + get: + summary: "Resolve the VM's IP address on the worker" + tags: + - vms + parameters: + - in: query + name: wait + description: Duration in seconds to wait for the VM to transition into "running" state if not already running. + schema: + type: integer + minimum: 0 + maximum: 65535 + required: false + responses: + '200': + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/IP' + '404': + description: VM resource with the given name doesn't exist + '503': + description: Failed to resolve the IP address on the worker responsible for the specified VM components: schemas: Worker: @@ -369,6 +394,13 @@ components: type: object items: $ref: '#/components/schemas/Event' + IP: + title: Result of VM's IP resolution + type: object + properties: + ip: + type: string + description: The resolved IP address Event: title: Generic Resource Event type: object diff --git a/internal/controller/api.go b/internal/controller/api.go index 4561f07..449f2d5 100644 --- a/internal/controller/api.go +++ b/internal/controller/api.go @@ -121,6 +121,9 @@ func (controller *Controller) initAPI() *gin.Engine { v1.GET("/vms/:name/port-forward", func(c *gin.Context) { controller.portForwardVM(c).Respond(c) }) + v1.GET("/vms/:name/ip", func(c *gin.Context) { + controller.ip(c).Respond(c) + }) v1.DELETE("/vms/:name", func(c *gin.Context) { controller.deleteVM(c).Respond(c) }) diff --git a/internal/controller/api_vms_ip.go b/internal/controller/api_vms_ip.go new file mode 100644 index 0000000..8fbb7bf --- /dev/null +++ b/internal/controller/api_vms_ip.go @@ -0,0 +1,69 @@ +package controller + +import ( + "context" + "github.com/cirruslabs/orchard/internal/responder" + v1 "github.com/cirruslabs/orchard/pkg/resource/v1" + "github.com/cirruslabs/orchard/rpc" + "github.com/gin-gonic/gin" + "github.com/google/uuid" + "net/http" + "strconv" + "time" +) + +func (controller *Controller) ip(ctx *gin.Context) responder.Responder { + if responder := controller.authorize(ctx, v1.ServiceAccountRoleComputeWrite); responder != nil { + return responder + } + + // Retrieve and parse path and query parameters + name := ctx.Param("name") + + waitRaw := ctx.Query("wait") + wait, err := strconv.ParseUint(waitRaw, 10, 16) + if err != nil { + return responder.Code(http.StatusBadRequest) + } + waitContext, waitContextCancel := context.WithTimeout(ctx, time.Duration(wait)*time.Second) + defer waitContextCancel() + + // Look-up the VM + vm, responderImpl := controller.waitForVM(waitContext, name) + if responderImpl != nil { + return responderImpl + } + + // Send an IP resolution request and wait for the result + session := uuid.New().String() + boomerangConnCh, cancel := controller.ipRendezvous.Request(ctx, session) + defer cancel() + + err = controller.workerNotifier.Notify(ctx, vm.Worker, &rpc.WatchInstruction{ + Action: &rpc.WatchInstruction_ResolveIpAction{ + ResolveIpAction: &rpc.WatchInstruction_ResolveIP{ + Session: session, + VmUid: vm.UID, + }, + }, + }) + if err != nil { + controller.logger.Warnf("failed to request VM's IP from the worker %s: %v", + vm.Worker, err) + + return responder.Code(http.StatusServiceUnavailable) + } + + select { + case ip := <-boomerangConnCh: + result := struct { + IP string `json:"ip"` + }{ + IP: ip, + } + + return responder.JSON(http.StatusOK, &result) + case <-ctx.Done(): + return responder.Error(ctx.Err()) + } +} diff --git a/internal/controller/api_vms_portforward.go b/internal/controller/api_vms_portforward.go index 98a56b9..14228d1 100644 --- a/internal/controller/api_vms_portforward.go +++ b/internal/controller/api_vms_portforward.go @@ -59,7 +59,7 @@ func (controller *Controller) portForward( ) responder.Responder { // Request and wait for a connection with a worker session := uuid.New().String() - boomerangConnCh, cancel := controller.proxy.Request(ctx, session) + boomerangConnCh, cancel := controller.connRendezvous.Request(ctx, session) defer cancel() // send request to worker to initiate port-forwarding connection back to us diff --git a/internal/controller/controller.go b/internal/controller/controller.go index aecec34..d4d583f 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -6,7 +6,7 @@ import ( "errors" "fmt" "github.com/cirruslabs/orchard/internal/controller/notifier" - "github.com/cirruslabs/orchard/internal/controller/proxy" + "github.com/cirruslabs/orchard/internal/controller/rendezvous" "github.com/cirruslabs/orchard/internal/controller/scheduler" "github.com/cirruslabs/orchard/internal/controller/sshserver" storepkg "github.com/cirruslabs/orchard/internal/controller/store" @@ -54,7 +54,8 @@ type Controller struct { logger *zap.SugaredLogger grpcServer *grpc.Server workerNotifier *notifier.Notifier - proxy *proxy.Proxy + connRendezvous *rendezvous.Rendezvous[net.Conn] + ipRendezvous *rendezvous.Rendezvous[string] enableSwaggerDocs bool workerOfflineTimeout time.Duration maxWorkersPerLicense uint @@ -69,7 +70,8 @@ type Controller struct { func New(opts ...Option) (*Controller, error) { controller := &Controller{ - proxy: proxy.NewProxy(), + connRendezvous: rendezvous.New[net.Conn](), + ipRendezvous: rendezvous.New[string](), workerOfflineTimeout: 3 * time.Minute, maxWorkersPerLicense: maxWorkersPerDefaultLicense, } @@ -125,7 +127,7 @@ func New(opts ...Option) (*Controller, error) { // Instantiate the SSH server (if configured) if controller.sshListenAddr != "" && controller.sshSigner != nil { controller.sshServer, err = sshserver.NewSSHServer(controller.sshListenAddr, controller.sshSigner, - store, controller.proxy, controller.workerNotifier, controller.sshNoClientAuth, controller.logger) + store, controller.connRendezvous, controller.workerNotifier, controller.sshNoClientAuth, controller.logger) if err != nil { return nil, err } diff --git a/internal/controller/proxy/proxy.go b/internal/controller/proxy/proxy.go deleted file mode 100644 index 9fd0121..0000000 --- a/internal/controller/proxy/proxy.go +++ /dev/null @@ -1,49 +0,0 @@ -package proxy - -import ( - "context" - "errors" - "github.com/cirruslabs/orchard/internal/concurrentmap" - "net" -) - -var ErrInvalidToken = errors.New("invalid proxy token") - -type Proxy struct { - sessions *concurrentmap.ConcurrentMap[*TokenSlot] -} - -type TokenSlot struct { - ctx context.Context - ch chan net.Conn -} - -func NewProxy() *Proxy { - return &Proxy{ - sessions: concurrentmap.NewConcurrentMap[*TokenSlot](), - } -} - -func (proxy *Proxy) Request(ctx context.Context, session string) (chan net.Conn, func()) { - tokenSlot := &TokenSlot{ - ctx: ctx, - ch: make(chan net.Conn), - } - - proxy.sessions.Store(session, tokenSlot) - - return tokenSlot.ch, func() { - proxy.sessions.Delete(session) - } -} - -func (proxy *Proxy) Respond(session string, conn net.Conn) (context.Context, error) { - tokenSlot, ok := proxy.sessions.Load(session) - if !ok { - return nil, ErrInvalidToken - } - - tokenSlot.ch <- conn - - return tokenSlot.ctx, nil -} diff --git a/internal/controller/rendezvous/rendezvous.go b/internal/controller/rendezvous/rendezvous.go new file mode 100644 index 0000000..55e7695 --- /dev/null +++ b/internal/controller/rendezvous/rendezvous.go @@ -0,0 +1,48 @@ +package rendezvous + +import ( + "context" + "errors" + "github.com/cirruslabs/orchard/internal/concurrentmap" +) + +var ErrInvalidToken = errors.New("invalid rendezvous token") + +type Rendezvous[T any] struct { + sessions *concurrentmap.ConcurrentMap[*TokenSlot[T]] +} + +type TokenSlot[T any] struct { + ctx context.Context + ch chan T +} + +func New[T any]() *Rendezvous[T] { + return &Rendezvous[T]{ + sessions: concurrentmap.NewConcurrentMap[*TokenSlot[T]](), + } +} + +func (rendezvous *Rendezvous[T]) Request(ctx context.Context, session string) (chan T, func()) { + tokenSlot := &TokenSlot[T]{ + ctx: ctx, + ch: make(chan T), + } + + rendezvous.sessions.Store(session, tokenSlot) + + return tokenSlot.ch, func() { + rendezvous.sessions.Delete(session) + } +} + +func (rendezvous *Rendezvous[T]) Respond(session string, conn T) (context.Context, error) { + tokenSlot, ok := rendezvous.sessions.Load(session) + if !ok { + return nil, ErrInvalidToken + } + + tokenSlot.ch <- conn + + return tokenSlot.ctx, nil +} diff --git a/internal/controller/proxy/proxy_test.go b/internal/controller/rendezvous/rendezvous_test.go similarity index 80% rename from internal/controller/proxy/proxy_test.go rename to internal/controller/rendezvous/rendezvous_test.go index ff60a3f..3461360 100644 --- a/internal/controller/proxy/proxy_test.go +++ b/internal/controller/rendezvous/rendezvous_test.go @@ -1,8 +1,8 @@ -package proxy_test +package rendezvous_test import ( "context" - "github.com/cirruslabs/orchard/internal/controller/proxy" + "github.com/cirruslabs/orchard/internal/controller/rendezvous" "github.com/google/uuid" "github.com/stretchr/testify/require" "net" @@ -15,7 +15,7 @@ func TestProxy(t *testing.T) { expectedConn, _ := net.Pipe() - proxy := proxy.NewProxy() + proxy := rendezvous.New[net.Conn]() token := uuid.New().String() diff --git a/internal/controller/rpc.go b/internal/controller/rpc.go index 8092038..880b848 100644 --- a/internal/controller/rpc.go +++ b/internal/controller/rpc.go @@ -1,6 +1,7 @@ package controller import ( + "context" v1pkg "github.com/cirruslabs/orchard/pkg/resource/v1" "github.com/cirruslabs/orchard/rpc" "google.golang.org/grpc/metadata" @@ -61,8 +62,8 @@ func (controller *Controller) PortForward(stream rpc.Controller_PortForwardServe }), } - // make proxy aware of the connection - proxyCtx, err := controller.proxy.Respond(sessionMetadataValue[0], conn) + // make connection rendezvous aware of the connection + proxyCtx, err := controller.connRendezvous.Respond(sessionMetadataValue[0], conn) if err != nil { return err } @@ -74,3 +75,22 @@ func (controller *Controller) PortForward(stream rpc.Controller_PortForwardServe return stream.Context().Err() } } + +func (controller *Controller) ResolveIP(ctx context.Context, request *rpc.ResolveIPResult) (*emptypb.Empty, error) { + if !controller.authorizeGRPC(ctx, v1pkg.ServiceAccountRoleComputeWrite) { + return nil, status.Errorf(codes.Unauthenticated, "auth failed") + } + + sessionMetadataValue := metadata.ValueFromIncomingContext(ctx, rpc.MetadataWorkerPortForwardingSessionKey) + if len(sessionMetadataValue) == 0 { + return nil, status.Errorf(codes.InvalidArgument, "no session in metadata") + } + + // Respond with the resolved IP address + _, err := controller.ipRendezvous.Respond(sessionMetadataValue[0], request.Ip) + if err != nil { + return nil, err + } + + return &emptypb.Empty{}, nil +} diff --git a/internal/controller/sshserver/sshserver.go b/internal/controller/sshserver/sshserver.go index 7a8702e..2728cf4 100644 --- a/internal/controller/sshserver/sshserver.go +++ b/internal/controller/sshserver/sshserver.go @@ -6,7 +6,7 @@ import ( "errors" "fmt" "github.com/cirruslabs/orchard/internal/controller/notifier" - proxypkg "github.com/cirruslabs/orchard/internal/controller/proxy" + "github.com/cirruslabs/orchard/internal/controller/rendezvous" storepkg "github.com/cirruslabs/orchard/internal/controller/store" "github.com/cirruslabs/orchard/internal/proxy" "github.com/cirruslabs/orchard/pkg/resource/v1" @@ -31,7 +31,7 @@ type SSHServer struct { listener net.Listener serverConfig *ssh.ServerConfig store storepkg.Store - proxy *proxypkg.Proxy + connRendezvous *rendezvous.Rendezvous[net.Conn] workerNotifier *notifier.Notifier logger *zap.SugaredLogger } @@ -40,14 +40,14 @@ func NewSSHServer( address string, signer ssh.Signer, store storepkg.Store, - proxy *proxypkg.Proxy, + connRendezvous *rendezvous.Rendezvous[net.Conn], workerNotifier *notifier.Notifier, noClientAuth bool, logger *zap.SugaredLogger, ) (*SSHServer, error) { server := &SSHServer{ store: store, - proxy: proxy, + connRendezvous: connRendezvous, workerNotifier: workerNotifier, logger: logger, } @@ -232,7 +232,7 @@ func (server *SSHServer) handleDirectTCPIP(ctx context.Context, newChannel ssh.N // The user wants to connect to an existing VM, request and wait // for a connection with the worker before accepting the channel session := uuid.New().String() - boomerangConnCh, cancel := server.proxy.Request(ctx, session) + boomerangConnCh, cancel := server.connRendezvous.Request(ctx, session) defer cancel() err = server.workerNotifier.Notify(ctx, vm.Worker, &rpc.WatchInstruction{ diff --git a/internal/tests/ip_endpoint_test.go b/internal/tests/ip_endpoint_test.go new file mode 100644 index 0000000..9d4f5d8 --- /dev/null +++ b/internal/tests/ip_endpoint_test.go @@ -0,0 +1,62 @@ +package tests_test + +import ( + "context" + "github.com/cirruslabs/orchard/internal/tests/devcontroller" + "github.com/cirruslabs/orchard/internal/tests/wait" + v1 "github.com/cirruslabs/orchard/pkg/resource/v1" + "github.com/stretchr/testify/require" + "golang.org/x/crypto/ssh" + "net" + "testing" + "time" +) + +func TestIPEndpoint(t *testing.T) { + // Run the Controller + devClient, _, _ := devcontroller.StartIntegrationTestEnvironment(t) + + // Create a VM to which we'll connect via Controller's SSH server + err := devClient.VMs().Create(context.Background(), &v1.VM{ + Meta: v1.Meta{ + Name: "test-vm", + }, + Image: "ghcr.io/cirruslabs/macos-sonoma-base:latest", + CPU: 4, + Memory: 8 * 1024, + Headless: true, + Status: v1.VMStatusPending, + }) + require.NoError(t, err) + + // Wait for the VM to start + require.True(t, wait.Wait(2*time.Minute, func() bool { + vm, err := devClient.VMs().Get(context.Background(), "test-vm") + require.NoError(t, err) + + return vm.Status == v1.VMStatusRunning + }), "failed to wait for the VM to start") + + // Retrieve the VM's IP + ip, err := devClient.VMs().IP(context.Background(), "test-vm", 30) + require.NoError(t, err) + + // Connect to the VM over SSH to make sure the provided IP is valid + sshClient, err := ssh.Dial("tcp", ip+":22", &ssh.ClientConfig{ + User: "admin", + Auth: []ssh.AuthMethod{ + ssh.Password("admin"), + }, + HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error { + return nil + }, + }) + require.NoError(t, err) + + sshSession, err := sshClient.NewSession() + require.NoError(t, err) + + output, err := sshSession.CombinedOutput("uname -a") + require.NoError(t, err) + require.Contains(t, string(output), "Darwin") +} diff --git a/internal/worker/rpc.go b/internal/worker/rpc.go index 7da5a86..d04750b 100644 --- a/internal/worker/rpc.go +++ b/internal/worker/rpc.go @@ -57,6 +57,8 @@ func (worker *Worker) watchRPC(ctx context.Context) error { go worker.handlePortForward(ctxWithMetadata, client, action.PortForwardAction) case *rpc.WatchInstruction_SyncVmsAction: worker.requestVMSyncing() + case *rpc.WatchInstruction_ResolveIpAction: + worker.handleGetIP(ctxWithMetadata, client, action.ResolveIpAction) } } } @@ -129,3 +131,46 @@ func (worker *Worker) handlePortForward( _ = proxy.Connections(vmConn, grpcConn) } + +func (worker *Worker) handleGetIP( + ctx context.Context, + client rpc.ControllerClient, + resolveIP *rpc.WatchInstruction_ResolveIP, +) { + grpcMetadata := metadata.Join( + worker.grpcMetadata(), + metadata.Pairs(rpc.MetadataWorkerPortForwardingSessionKey, resolveIP.Session), + ) + ctxWithMetadata := metadata.NewOutgoingContext(ctx, grpcMetadata) + + // Find the desired VM + vm, ok := lo.Find(worker.vmm.List(), func(item *vmmanager.VM) bool { + return item.Resource.UID == resolveIP.VmUid + }) + if !ok { + worker.logger.Warnf("failed to resolve IP for the VM with UID %q: VM not found", + resolveIP.VmUid) + + return + } + + // Obtain VM's IP address + ip, err := vm.IP(ctx) + if err != nil { + worker.logger.Warnf("failed to resolve IP for the VM with UID %q: \"tart ip\" failed: %v", + resolveIP.VmUid, err) + + return + } + + _, err = client.ResolveIP(ctxWithMetadata, &rpc.ResolveIPResult{ + Session: resolveIP.Session, + Ip: ip, + }) + if err != nil { + worker.logger.Warnf("failed to resolve IP for the VM with UID %q: "+ + "failed to call back to the controller: %v", resolveIP.VmUid, err) + + return + } +} diff --git a/internal/worker/vmmanager/vm.go b/internal/worker/vmmanager/vm.go index 602390a..b589ad2 100644 --- a/internal/worker/vmmanager/vm.go +++ b/internal/worker/vmmanager/vm.go @@ -216,7 +216,7 @@ func (vm *VM) cloneAndConfigure(ctx context.Context) error { // // See https://github.com/cirruslabs/orchard/issues/181 for more details. if vm.Resource.NetBridged != "" { - _, _, err = tart.Tart(ctx, vm.logger, "set", "--random-mac") + _, _, err = tart.Tart(ctx, vm.logger, "set", "--random-mac", vm.id()) if err != nil { return err } diff --git a/pkg/client/vms.go b/pkg/client/vms.go index 2dcbe9e..372ae4c 100644 --- a/pkg/client/vms.go +++ b/pkg/client/vms.go @@ -101,6 +101,22 @@ func (service *VMsService) PortForward( }) } +func (service *VMsService) IP(ctx context.Context, name string, waitSeconds uint16) (string, error) { + result := struct { + IP string `json:"ip"` + }{} + + err := service.client.request(ctx, http.MethodGet, fmt.Sprintf("vms/%s/ip", url.PathEscape(name)), + nil, &result, map[string]string{ + "wait": strconv.FormatUint(uint64(waitSeconds), 10), + }) + if err != nil { + return "", err + } + + return result.IP, nil +} + func (service *VMsService) StreamEvents(name string) *EventStreamer { return NewEventStreamer(service.client, fmt.Sprintf("vms/%s/events", url.PathEscape(name))) } diff --git a/buf.yaml b/rpc/buf.yaml similarity index 100% rename from buf.yaml rename to rpc/buf.yaml diff --git a/rpc/orchard.pb.go b/rpc/orchard.pb.go index 3f80cfe..6f7cbef 100644 --- a/rpc/orchard.pb.go +++ b/rpc/orchard.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.31.0 +// protoc-gen-go v1.34.2 // protoc (unknown) // source: orchard.proto @@ -30,6 +30,7 @@ type WatchInstruction struct { // // *WatchInstruction_PortForwardAction // *WatchInstruction_SyncVmsAction + // *WatchInstruction_ResolveIpAction Action isWatchInstruction_Action `protobuf_oneof:"action"` } @@ -86,6 +87,13 @@ func (x *WatchInstruction) GetSyncVmsAction() *WatchInstruction_SyncVMs { return nil } +func (x *WatchInstruction) GetResolveIpAction() *WatchInstruction_ResolveIP { + if x, ok := x.GetAction().(*WatchInstruction_ResolveIpAction); ok { + return x.ResolveIpAction + } + return nil +} + type isWatchInstruction_Action interface { isWatchInstruction_Action() } @@ -98,10 +106,16 @@ type WatchInstruction_SyncVmsAction struct { SyncVmsAction *WatchInstruction_SyncVMs `protobuf:"bytes,2,opt,name=sync_vms_action,json=syncVmsAction,proto3,oneof"` } +type WatchInstruction_ResolveIpAction struct { + ResolveIpAction *WatchInstruction_ResolveIP `protobuf:"bytes,3,opt,name=resolve_ip_action,json=resolveIpAction,proto3,oneof"` +} + func (*WatchInstruction_PortForwardAction) isWatchInstruction_Action() {} func (*WatchInstruction_SyncVmsAction) isWatchInstruction_Action() {} +func (*WatchInstruction_ResolveIpAction) isWatchInstruction_Action() {} + type PortForwardData struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -149,6 +163,61 @@ func (x *PortForwardData) GetData() []byte { return nil } +type ResolveIPResult struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Session string `protobuf:"bytes,1,opt,name=session,proto3" json:"session,omitempty"` + Ip string `protobuf:"bytes,2,opt,name=ip,proto3" json:"ip,omitempty"` +} + +func (x *ResolveIPResult) Reset() { + *x = ResolveIPResult{} + if protoimpl.UnsafeEnabled { + mi := &file_orchard_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ResolveIPResult) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ResolveIPResult) ProtoMessage() {} + +func (x *ResolveIPResult) ProtoReflect() protoreflect.Message { + mi := &file_orchard_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ResolveIPResult.ProtoReflect.Descriptor instead. +func (*ResolveIPResult) Descriptor() ([]byte, []int) { + return file_orchard_proto_rawDescGZIP(), []int{2} +} + +func (x *ResolveIPResult) GetSession() string { + if x != nil { + return x.Session + } + return "" +} + +func (x *ResolveIPResult) GetIp() string { + if x != nil { + return x.Ip + } + return "" +} + type WatchInstruction_PortForward struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -157,7 +226,7 @@ type WatchInstruction_PortForward struct { // we can have multiple port forwards for the same vm/port pair // let's distinguish them by a unique session Session string `protobuf:"bytes,1,opt,name=session,proto3" json:"session,omitempty"` - // can be empty to request a forwarding to the worker itself + // can be empty to request port-forwarding to the worker itself VmUid string `protobuf:"bytes,2,opt,name=vm_uid,json=vmUid,proto3" json:"vm_uid,omitempty"` Port uint32 `protobuf:"varint,3,opt,name=port,proto3" json:"port,omitempty"` } @@ -165,7 +234,7 @@ type WatchInstruction_PortForward struct { func (x *WatchInstruction_PortForward) Reset() { *x = WatchInstruction_PortForward{} if protoimpl.UnsafeEnabled { - mi := &file_orchard_proto_msgTypes[2] + mi := &file_orchard_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -178,7 +247,7 @@ func (x *WatchInstruction_PortForward) String() string { func (*WatchInstruction_PortForward) ProtoMessage() {} func (x *WatchInstruction_PortForward) ProtoReflect() protoreflect.Message { - mi := &file_orchard_proto_msgTypes[2] + mi := &file_orchard_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -224,7 +293,7 @@ type WatchInstruction_SyncVMs struct { func (x *WatchInstruction_SyncVMs) Reset() { *x = WatchInstruction_SyncVMs{} if protoimpl.UnsafeEnabled { - mi := &file_orchard_proto_msgTypes[3] + mi := &file_orchard_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -237,7 +306,7 @@ func (x *WatchInstruction_SyncVMs) String() string { func (*WatchInstruction_SyncVMs) ProtoMessage() {} func (x *WatchInstruction_SyncVMs) ProtoReflect() protoreflect.Message { - mi := &file_orchard_proto_msgTypes[3] + mi := &file_orchard_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -253,12 +322,69 @@ func (*WatchInstruction_SyncVMs) Descriptor() ([]byte, []int) { return file_orchard_proto_rawDescGZIP(), []int{0, 1} } +type WatchInstruction_ResolveIP struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // we can have multiple IP resolution requests for the same vm + // let's distinguish them by a unique session + Session string `protobuf:"bytes,1,opt,name=session,proto3" json:"session,omitempty"` + VmUid string `protobuf:"bytes,2,opt,name=vm_uid,json=vmUid,proto3" json:"vm_uid,omitempty"` +} + +func (x *WatchInstruction_ResolveIP) Reset() { + *x = WatchInstruction_ResolveIP{} + if protoimpl.UnsafeEnabled { + mi := &file_orchard_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *WatchInstruction_ResolveIP) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WatchInstruction_ResolveIP) ProtoMessage() {} + +func (x *WatchInstruction_ResolveIP) ProtoReflect() protoreflect.Message { + mi := &file_orchard_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WatchInstruction_ResolveIP.ProtoReflect.Descriptor instead. +func (*WatchInstruction_ResolveIP) Descriptor() ([]byte, []int) { + return file_orchard_proto_rawDescGZIP(), []int{0, 2} +} + +func (x *WatchInstruction_ResolveIP) GetSession() string { + if x != nil { + return x.Session + } + return "" +} + +func (x *WatchInstruction_ResolveIP) GetVmUid() string { + if x != nil { + return x.VmUid + } + return "" +} + var File_orchard_proto protoreflect.FileDescriptor var file_orchard_proto_rawDesc = []byte{ 0x0a, 0x0d, 0x6f, 0x72, 0x63, 0x68, 0x61, 0x72, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, - 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x91, 0x02, 0x0a, + 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x9a, 0x03, 0x0a, 0x10, 0x57, 0x61, 0x74, 0x63, 0x68, 0x49, 0x6e, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x4f, 0x0a, 0x13, 0x70, 0x6f, 0x72, 0x74, 0x5f, 0x66, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x5f, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, @@ -269,26 +395,42 @@ var file_orchard_proto_rawDesc = []byte{ 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x57, 0x61, 0x74, 0x63, 0x68, 0x49, 0x6e, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x56, 0x4d, 0x73, 0x48, 0x00, 0x52, 0x0d, 0x73, 0x79, 0x6e, 0x63, 0x56, 0x6d, - 0x73, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x52, 0x0a, 0x0b, 0x50, 0x6f, 0x72, 0x74, 0x46, - 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, - 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, - 0x12, 0x15, 0x0a, 0x06, 0x76, 0x6d, 0x5f, 0x75, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x05, 0x76, 0x6d, 0x55, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x1a, 0x09, 0x0a, 0x07, 0x53, - 0x79, 0x6e, 0x63, 0x56, 0x4d, 0x73, 0x42, 0x08, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, - 0x22, 0x25, 0x0a, 0x0f, 0x50, 0x6f, 0x72, 0x74, 0x46, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x44, - 0x61, 0x74, 0x61, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x32, 0x79, 0x0a, 0x0a, 0x43, 0x6f, 0x6e, 0x74, 0x72, - 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x12, 0x34, 0x0a, 0x05, 0x57, 0x61, 0x74, 0x63, 0x68, 0x12, 0x16, - 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, - 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x11, 0x2e, 0x57, 0x61, 0x74, 0x63, 0x68, 0x49, 0x6e, - 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x30, 0x01, 0x12, 0x35, 0x0a, 0x0b, 0x50, - 0x6f, 0x72, 0x74, 0x46, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x12, 0x10, 0x2e, 0x50, 0x6f, 0x72, - 0x74, 0x46, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x10, 0x2e, 0x50, - 0x6f, 0x72, 0x74, 0x46, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x44, 0x61, 0x74, 0x61, 0x28, 0x01, - 0x30, 0x01, 0x42, 0x23, 0x5a, 0x21, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, - 0x2f, 0x63, 0x69, 0x72, 0x72, 0x75, 0x73, 0x6c, 0x61, 0x62, 0x73, 0x2f, 0x6f, 0x72, 0x63, 0x68, - 0x61, 0x72, 0x64, 0x2f, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x73, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x49, 0x0a, 0x11, 0x72, 0x65, 0x73, 0x6f, 0x6c, + 0x76, 0x65, 0x5f, 0x69, 0x70, 0x5f, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x57, 0x61, 0x74, 0x63, 0x68, 0x49, 0x6e, 0x73, 0x74, 0x72, 0x75, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x49, 0x50, 0x48, + 0x00, 0x52, 0x0f, 0x72, 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x49, 0x70, 0x41, 0x63, 0x74, 0x69, + 0x6f, 0x6e, 0x1a, 0x52, 0x0a, 0x0b, 0x50, 0x6f, 0x72, 0x74, 0x46, 0x6f, 0x72, 0x77, 0x61, 0x72, + 0x64, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x15, 0x0a, 0x06, 0x76, + 0x6d, 0x5f, 0x75, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x6d, 0x55, + 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, + 0x52, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x1a, 0x09, 0x0a, 0x07, 0x53, 0x79, 0x6e, 0x63, 0x56, 0x4d, + 0x73, 0x1a, 0x3c, 0x0a, 0x09, 0x52, 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x49, 0x50, 0x12, 0x18, + 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x15, 0x0a, 0x06, 0x76, 0x6d, 0x5f, 0x75, + 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x6d, 0x55, 0x69, 0x64, 0x42, + 0x08, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x25, 0x0a, 0x0f, 0x50, 0x6f, 0x72, + 0x74, 0x46, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x44, 0x61, 0x74, 0x61, 0x12, 0x12, 0x0a, 0x04, + 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, + 0x22, 0x3b, 0x0a, 0x0f, 0x52, 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x49, 0x50, 0x52, 0x65, 0x73, + 0x75, 0x6c, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x0e, 0x0a, + 0x02, 0x69, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x70, 0x32, 0xb0, 0x01, + 0x0a, 0x0a, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x12, 0x34, 0x0a, 0x05, + 0x57, 0x61, 0x74, 0x63, 0x68, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x11, 0x2e, + 0x57, 0x61, 0x74, 0x63, 0x68, 0x49, 0x6e, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x69, 0x6f, 0x6e, + 0x30, 0x01, 0x12, 0x35, 0x0a, 0x0b, 0x50, 0x6f, 0x72, 0x74, 0x46, 0x6f, 0x72, 0x77, 0x61, 0x72, + 0x64, 0x12, 0x10, 0x2e, 0x50, 0x6f, 0x72, 0x74, 0x46, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x44, + 0x61, 0x74, 0x61, 0x1a, 0x10, 0x2e, 0x50, 0x6f, 0x72, 0x74, 0x46, 0x6f, 0x72, 0x77, 0x61, 0x72, + 0x64, 0x44, 0x61, 0x74, 0x61, 0x28, 0x01, 0x30, 0x01, 0x12, 0x35, 0x0a, 0x09, 0x52, 0x65, 0x73, + 0x6f, 0x6c, 0x76, 0x65, 0x49, 0x50, 0x12, 0x10, 0x2e, 0x52, 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, + 0x49, 0x50, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, + 0x42, 0x23, 0x5a, 0x21, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, + 0x69, 0x72, 0x72, 0x75, 0x73, 0x6c, 0x61, 0x62, 0x73, 0x2f, 0x6f, 0x72, 0x63, 0x68, 0x61, 0x72, + 0x64, 0x2f, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -303,26 +445,31 @@ func file_orchard_proto_rawDescGZIP() []byte { return file_orchard_proto_rawDescData } -var file_orchard_proto_msgTypes = make([]protoimpl.MessageInfo, 4) -var file_orchard_proto_goTypes = []interface{}{ +var file_orchard_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_orchard_proto_goTypes = []any{ (*WatchInstruction)(nil), // 0: WatchInstruction (*PortForwardData)(nil), // 1: PortForwardData - (*WatchInstruction_PortForward)(nil), // 2: WatchInstruction.PortForward - (*WatchInstruction_SyncVMs)(nil), // 3: WatchInstruction.SyncVMs - (*emptypb.Empty)(nil), // 4: google.protobuf.Empty + (*ResolveIPResult)(nil), // 2: ResolveIPResult + (*WatchInstruction_PortForward)(nil), // 3: WatchInstruction.PortForward + (*WatchInstruction_SyncVMs)(nil), // 4: WatchInstruction.SyncVMs + (*WatchInstruction_ResolveIP)(nil), // 5: WatchInstruction.ResolveIP + (*emptypb.Empty)(nil), // 6: google.protobuf.Empty } var file_orchard_proto_depIdxs = []int32{ - 2, // 0: WatchInstruction.port_forward_action:type_name -> WatchInstruction.PortForward - 3, // 1: WatchInstruction.sync_vms_action:type_name -> WatchInstruction.SyncVMs - 4, // 2: Controller.Watch:input_type -> google.protobuf.Empty - 1, // 3: Controller.PortForward:input_type -> PortForwardData - 0, // 4: Controller.Watch:output_type -> WatchInstruction - 1, // 5: Controller.PortForward:output_type -> PortForwardData - 4, // [4:6] is the sub-list for method output_type - 2, // [2:4] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 3, // 0: WatchInstruction.port_forward_action:type_name -> WatchInstruction.PortForward + 4, // 1: WatchInstruction.sync_vms_action:type_name -> WatchInstruction.SyncVMs + 5, // 2: WatchInstruction.resolve_ip_action:type_name -> WatchInstruction.ResolveIP + 6, // 3: Controller.Watch:input_type -> google.protobuf.Empty + 1, // 4: Controller.PortForward:input_type -> PortForwardData + 2, // 5: Controller.ResolveIP:input_type -> ResolveIPResult + 0, // 6: Controller.Watch:output_type -> WatchInstruction + 1, // 7: Controller.PortForward:output_type -> PortForwardData + 6, // 8: Controller.ResolveIP:output_type -> google.protobuf.Empty + 6, // [6:9] is the sub-list for method output_type + 3, // [3:6] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } func init() { file_orchard_proto_init() } @@ -331,7 +478,7 @@ func file_orchard_proto_init() { return } if !protoimpl.UnsafeEnabled { - file_orchard_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + file_orchard_proto_msgTypes[0].Exporter = func(v any, i int) any { switch v := v.(*WatchInstruction); i { case 0: return &v.state @@ -343,7 +490,7 @@ func file_orchard_proto_init() { return nil } } - file_orchard_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + file_orchard_proto_msgTypes[1].Exporter = func(v any, i int) any { switch v := v.(*PortForwardData); i { case 0: return &v.state @@ -355,7 +502,19 @@ func file_orchard_proto_init() { return nil } } - file_orchard_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + file_orchard_proto_msgTypes[2].Exporter = func(v any, i int) any { + switch v := v.(*ResolveIPResult); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_orchard_proto_msgTypes[3].Exporter = func(v any, i int) any { switch v := v.(*WatchInstruction_PortForward); i { case 0: return &v.state @@ -367,7 +526,7 @@ func file_orchard_proto_init() { return nil } } - file_orchard_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + file_orchard_proto_msgTypes[4].Exporter = func(v any, i int) any { switch v := v.(*WatchInstruction_SyncVMs); i { case 0: return &v.state @@ -379,10 +538,23 @@ func file_orchard_proto_init() { return nil } } + file_orchard_proto_msgTypes[5].Exporter = func(v any, i int) any { + switch v := v.(*WatchInstruction_ResolveIP); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } - file_orchard_proto_msgTypes[0].OneofWrappers = []interface{}{ + file_orchard_proto_msgTypes[0].OneofWrappers = []any{ (*WatchInstruction_PortForwardAction)(nil), (*WatchInstruction_SyncVmsAction)(nil), + (*WatchInstruction_ResolveIpAction)(nil), } type x struct{} out := protoimpl.TypeBuilder{ @@ -390,7 +562,7 @@ func file_orchard_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_orchard_proto_rawDesc, NumEnums: 0, - NumMessages: 4, + NumMessages: 6, NumExtensions: 0, NumServices: 1, }, diff --git a/rpc/orchard.proto b/rpc/orchard.proto index 59505ad..7328e7a 100644 --- a/rpc/orchard.proto +++ b/rpc/orchard.proto @@ -11,6 +11,9 @@ service Controller { // single purpose method when a port forward is requested and running // session information is passed in the requests metadata rpc PortForward(stream PortForwardData) returns (stream PortForwardData); + + // worker calls this method when it has successfully resolved the VM's IP + rpc ResolveIP(ResolveIPResult) returns (google.protobuf.Empty); } message WatchInstruction { @@ -23,14 +26,27 @@ message WatchInstruction { uint32 port = 3; } message SyncVMs { + // nothing for now + } + message ResolveIP { + // we can have multiple IP resolution requests for the same vm + // let's distinguish them by a unique session + string session = 1; + string vm_uid = 2; } oneof action { PortForward port_forward_action = 1; SyncVMs sync_vms_action = 2; + ResolveIP resolve_ip_action = 3; } } message PortForwardData { bytes data = 1; } + +message ResolveIPResult { + string session = 1; + string ip = 2; +} diff --git a/rpc/orchard_grpc.pb.go b/rpc/orchard_grpc.pb.go index b97afdd..d4f5a29 100644 --- a/rpc/orchard_grpc.pb.go +++ b/rpc/orchard_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.3.0 +// - protoc-gen-go-grpc v1.4.0 // - protoc (unknown) // source: orchard.proto @@ -16,12 +16,13 @@ import ( // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.32.0 or later. -const _ = grpc.SupportPackageIsVersion7 +// Requires gRPC-Go v1.62.0 or later. +const _ = grpc.SupportPackageIsVersion8 const ( Controller_Watch_FullMethodName = "/Controller/Watch" Controller_PortForward_FullMethodName = "/Controller/PortForward" + Controller_ResolveIP_FullMethodName = "/Controller/ResolveIP" ) // ControllerClient is the client API for Controller service. @@ -33,6 +34,8 @@ type ControllerClient interface { // single purpose method when a port forward is requested and running // session information is passed in the requests metadata PortForward(ctx context.Context, opts ...grpc.CallOption) (Controller_PortForwardClient, error) + // worker calls this method when it has successfully resolved the VM's IP + ResolveIP(ctx context.Context, in *ResolveIPResult, opts ...grpc.CallOption) (*emptypb.Empty, error) } type controllerClient struct { @@ -44,11 +47,12 @@ func NewControllerClient(cc grpc.ClientConnInterface) ControllerClient { } func (c *controllerClient) Watch(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (Controller_WatchClient, error) { - stream, err := c.cc.NewStream(ctx, &Controller_ServiceDesc.Streams[0], Controller_Watch_FullMethodName, opts...) + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &Controller_ServiceDesc.Streams[0], Controller_Watch_FullMethodName, cOpts...) if err != nil { return nil, err } - x := &controllerWatchClient{stream} + x := &controllerWatchClient{ClientStream: stream} if err := x.ClientStream.SendMsg(in); err != nil { return nil, err } @@ -76,11 +80,12 @@ func (x *controllerWatchClient) Recv() (*WatchInstruction, error) { } func (c *controllerClient) PortForward(ctx context.Context, opts ...grpc.CallOption) (Controller_PortForwardClient, error) { - stream, err := c.cc.NewStream(ctx, &Controller_ServiceDesc.Streams[1], Controller_PortForward_FullMethodName, opts...) + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &Controller_ServiceDesc.Streams[1], Controller_PortForward_FullMethodName, cOpts...) if err != nil { return nil, err } - x := &controllerPortForwardClient{stream} + x := &controllerPortForwardClient{ClientStream: stream} return x, nil } @@ -106,6 +111,16 @@ func (x *controllerPortForwardClient) Recv() (*PortForwardData, error) { return m, nil } +func (c *controllerClient) ResolveIP(ctx context.Context, in *ResolveIPResult, opts ...grpc.CallOption) (*emptypb.Empty, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, Controller_ResolveIP_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + // ControllerServer is the server API for Controller service. // All implementations must embed UnimplementedControllerServer // for forward compatibility @@ -115,6 +130,8 @@ type ControllerServer interface { // single purpose method when a port forward is requested and running // session information is passed in the requests metadata PortForward(Controller_PortForwardServer) error + // worker calls this method when it has successfully resolved the VM's IP + ResolveIP(context.Context, *ResolveIPResult) (*emptypb.Empty, error) mustEmbedUnimplementedControllerServer() } @@ -128,6 +145,9 @@ func (UnimplementedControllerServer) Watch(*emptypb.Empty, Controller_WatchServe func (UnimplementedControllerServer) PortForward(Controller_PortForwardServer) error { return status.Errorf(codes.Unimplemented, "method PortForward not implemented") } +func (UnimplementedControllerServer) ResolveIP(context.Context, *ResolveIPResult) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method ResolveIP not implemented") +} func (UnimplementedControllerServer) mustEmbedUnimplementedControllerServer() {} // UnsafeControllerServer may be embedded to opt out of forward compatibility for this service. @@ -146,7 +166,7 @@ func _Controller_Watch_Handler(srv interface{}, stream grpc.ServerStream) error if err := stream.RecvMsg(m); err != nil { return err } - return srv.(ControllerServer).Watch(m, &controllerWatchServer{stream}) + return srv.(ControllerServer).Watch(m, &controllerWatchServer{ServerStream: stream}) } type Controller_WatchServer interface { @@ -163,7 +183,7 @@ func (x *controllerWatchServer) Send(m *WatchInstruction) error { } func _Controller_PortForward_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(ControllerServer).PortForward(&controllerPortForwardServer{stream}) + return srv.(ControllerServer).PortForward(&controllerPortForwardServer{ServerStream: stream}) } type Controller_PortForwardServer interface { @@ -188,13 +208,36 @@ func (x *controllerPortForwardServer) Recv() (*PortForwardData, error) { return m, nil } +func _Controller_ResolveIP_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ResolveIPResult) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ControllerServer).ResolveIP(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Controller_ResolveIP_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ControllerServer).ResolveIP(ctx, req.(*ResolveIPResult)) + } + return interceptor(ctx, in, info, handler) +} + // Controller_ServiceDesc is the grpc.ServiceDesc for Controller service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) var Controller_ServiceDesc = grpc.ServiceDesc{ ServiceName: "Controller", HandlerType: (*ControllerServer)(nil), - Methods: []grpc.MethodDesc{}, + Methods: []grpc.MethodDesc{ + { + MethodName: "ResolveIP", + Handler: _Controller_ResolveIP_Handler, + }, + }, Streams: []grpc.StreamDesc{ { StreamName: "Watch",