diff --git a/go.mod b/go.mod index 565e6c4..f187a41 100644 --- a/go.mod +++ b/go.mod @@ -23,9 +23,11 @@ require ( go.uber.org/zap v1.24.0 golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 golang.org/x/net v0.5.0 + golang.org/x/term v0.4.0 google.golang.org/grpc v1.53.0 google.golang.org/protobuf v1.28.1 gopkg.in/yaml.v3 v3.0.1 + howett.net/plist v1.0.0 ) require ( @@ -65,7 +67,6 @@ require ( go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.9.0 // indirect golang.org/x/sys v0.4.0 // indirect - golang.org/x/term v0.4.0 // indirect golang.org/x/text v0.6.0 // indirect google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 5414723..813c713 100644 --- a/go.sum +++ b/go.sum @@ -103,6 +103,7 @@ github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/inconshreveable/mousetrap v1.0.1 h1:U3uMjPSQEBMNp1lFxmllqCPM6P5u/Xq7Pgzkat/bFNc= github.com/inconshreveable/mousetrap v1.0.1/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -292,6 +293,7 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/yaml.v1 v1.0.0-20140924161607-9f9df34309c0/go.mod h1:WDnlLJ4WF5VGsH/HVa3CI79GS0ol3YnhVnKP89i0kNg= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= @@ -301,3 +303,5 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +howett.net/plist v1.0.0 h1:7CrbWYbPPO/PyNy38b2EB/+gYbjCe2DXBxgtOOZbSQM= +howett.net/plist v1.0.0/go.mod h1:lqaXoTrLY4hg8tnEzNru53gicrbv7rrk+2xJA/7hw9g= diff --git a/internal/controller/api_service_accounts.go b/internal/controller/api_service_accounts.go index 5bd80d6..c6dc61b 100644 --- a/internal/controller/api_service_accounts.go +++ b/internal/controller/api_service_accounts.go @@ -31,7 +31,6 @@ func (controller *Controller) createServiceAccount(ctx *gin.Context) responder.R } serviceAccount.CreatedAt = time.Now() - serviceAccount.UID = uuid.New().String() serviceAccount.Generation = 0 return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder { diff --git a/internal/controller/api_vms_portforward.go b/internal/controller/api_vms_portforward.go index 0746218..03b23e8 100644 --- a/internal/controller/api_vms_portforward.go +++ b/internal/controller/api_vms_portforward.go @@ -45,7 +45,7 @@ func (controller *Controller) portForwardVM(ctx *gin.Context) responder.Responde } // Sanity-check - if vm.WorkerUID == "" { + if vm.Worker == "" { return responder.Code(http.StatusServiceUnavailable) } @@ -55,7 +55,7 @@ func (controller *Controller) portForwardVM(ctx *gin.Context) responder.Responde defer cancel() // send request to worker to initiate port-forwarding connection back to us - err = controller.workerNotifier.Notify(ctx, vm.WorkerUID, &rpc.WatchInstruction{ + err = controller.workerNotifier.Notify(ctx, vm.Worker, &rpc.WatchInstruction{ Action: &rpc.WatchInstruction_PortForwardAction{ PortForwardAction: &rpc.WatchInstruction_PortForward{ Session: session, @@ -65,7 +65,7 @@ func (controller *Controller) portForwardVM(ctx *gin.Context) responder.Responde }, }) if err != nil { - controller.logger.Warnf("failed to request port-forwarding from the worker %s: %v", vm.WorkerUID, err) + controller.logger.Warnf("failed to request port-forwarding from the worker %s: %v", vm.Worker, err) return responder.Code(http.StatusServiceUnavailable) } diff --git a/internal/controller/api_workers.go b/internal/controller/api_workers.go index c369cca..64087fb 100644 --- a/internal/controller/api_workers.go +++ b/internal/controller/api_workers.go @@ -6,7 +6,6 @@ import ( "github.com/cirruslabs/orchard/internal/responder" v1 "github.com/cirruslabs/orchard/pkg/resource/v1" "github.com/gin-gonic/gin" - "github.com/google/uuid" "net/http" "time" ) @@ -31,13 +30,17 @@ func (controller *Controller) createWorker(ctx *gin.Context) responder.Responder worker.LastSeen = currentTime } worker.CreatedAt = currentTime - worker.UID = uuid.New().String() worker.Generation = 0 return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder { - // Does the worker resource with this name already exists? - _, err := txn.GetWorker(worker.Name) - if !errors.Is(err, storepkg.ErrNotFound) { + // In case there already exist a worker with the same name, + // allow overwriting it if the request comes from a worker + // with the same machine ID + dbWorker, err := txn.GetWorker(worker.Name) + if err != nil && !errors.Is(err, storepkg.ErrNotFound) { + return responder.Code(http.StatusInternalServerError) + } + if err == nil && worker.MachineID != dbWorker.MachineID { return responder.Code(http.StatusConflict) } diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 980f96a..0407847 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -12,7 +12,6 @@ import ( "github.com/cirruslabs/orchard/internal/netconstants" v1 "github.com/cirruslabs/orchard/pkg/resource/v1" "github.com/cirruslabs/orchard/rpc" - "github.com/google/uuid" "go.uber.org/zap" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" @@ -121,7 +120,6 @@ func (controller *Controller) EnsureServiceAccount(serviceAccount *v1.ServiceAcc } serviceAccount.CreatedAt = time.Now() - serviceAccount.UID = uuid.New().String() serviceAccount.Generation = 0 return controller.store.Update(func(txn storepkg.Transaction) error { @@ -137,7 +135,7 @@ func (controller *Controller) DeleteServiceAccount(name string) error { func (controller *Controller) Run(ctx context.Context) error { // Run the scheduler so that each VM will eventually - // be assigned to a specific WorkerUID + // be assigned to a specific Worker go func() { err := runScheduler(controller.store) if err != nil { diff --git a/internal/controller/notifier/notifier.go b/internal/controller/notifier/notifier.go index 0b0d72f..316a7fc 100644 --- a/internal/controller/notifier/notifier.go +++ b/internal/controller/notifier/notifier.go @@ -25,25 +25,25 @@ func NewNotifier() *Notifier { } } -func (watcher *Notifier) Register(ctx context.Context, workerUID string) (chan *rpc.WatchInstruction, func()) { +func (watcher *Notifier) Register(ctx context.Context, worker string) (chan *rpc.WatchInstruction, func()) { subCtx, cancel := context.WithCancel(ctx) workerCh := make(chan *rpc.WatchInstruction) - watcher.workers.Store(workerUID, &WorkerSlot{ + watcher.workers.Store(worker, &WorkerSlot{ ctx: subCtx, ch: workerCh, }) return workerCh, func() { - watcher.workers.Delete(workerUID) + watcher.workers.Delete(worker) cancel() } } -func (watcher *Notifier) Notify(ctx context.Context, workerUID string, msg *rpc.WatchInstruction) error { - slot, ok := watcher.workers.Load(workerUID) +func (watcher *Notifier) Notify(ctx context.Context, worker string, msg *rpc.WatchInstruction) error { + slot, ok := watcher.workers.Load(worker) if !ok { - return fmt.Errorf("%w: %s", ErrNoWorker, workerUID) + return fmt.Errorf("%w: %s", ErrNoWorker, worker) } select { diff --git a/internal/controller/rpc.go b/internal/controller/rpc.go index 05214d9..9c7f8cf 100644 --- a/internal/controller/rpc.go +++ b/internal/controller/rpc.go @@ -18,13 +18,13 @@ func (controller *Controller) Watch(_ *emptypb.Empty, stream rpc.Controller_Watc return status.Errorf(codes.Unauthenticated, "auth failed") } - workerUIDMetadataValue := metadata.ValueFromIncomingContext(stream.Context(), rpc.MetadataWorkerUIDKey) - if len(workerUIDMetadataValue) == 0 { - return status.Errorf(codes.InvalidArgument, "no worker UID in metadata") + workerMetadataValue := metadata.ValueFromIncomingContext(stream.Context(), rpc.MetadataWorkerNameKey) + if len(workerMetadataValue) == 0 { + return status.Errorf(codes.InvalidArgument, "no worker ident in metadata") } - workerUID := workerUIDMetadataValue[0] - workerCh, cancel := controller.workerNotifier.Register(stream.Context(), workerUID) + worker := workerMetadataValue[0] + workerCh, cancel := controller.workerNotifier.Register(stream.Context(), worker) defer cancel() for { diff --git a/internal/controller/scheduler.go b/internal/controller/scheduler.go index 6c30231..4c47c69 100644 --- a/internal/controller/scheduler.go +++ b/internal/controller/scheduler.go @@ -51,7 +51,7 @@ func runSchedulerInner(store storepkg.Store) error { for _, vm := range vms { vm := vm - if vm.WorkerUID != "" { + if vm.Worker != "" { continue } @@ -59,7 +59,7 @@ func runSchedulerInner(store storepkg.Store) error { for _, worker := range workers { worker := worker - vm.WorkerUID = worker.UID + vm.Worker = worker.Name err := store.Update(func(txn storepkg.Transaction) error { return txn.SetVM(vm) diff --git a/internal/worker/iokitregistry/iokitregistry.go b/internal/worker/iokitregistry/iokitregistry.go new file mode 100644 index 0000000..738561b --- /dev/null +++ b/internal/worker/iokitregistry/iokitregistry.go @@ -0,0 +1,47 @@ +package iokitregistry + +import ( + "errors" + "fmt" + "howett.net/plist" + "os/exec" +) + +var ErrFailed = errors.New("failed to query I/O Kit registry") + +type Entry struct { + IOPlatformUUID string +} + +func PlatformUUID() (string, error) { + ioregPath, err := exec.LookPath("ioreg") + if err != nil { + // Fallback since on some systems the PATH + // variable does not include /usr/sbin + ioregPath = "/usr/sbin/ioreg" + } + + ioregCmd := exec.Command(ioregPath, "-a", "-c", "IOPlatformExpertDevice", "-rd1") + + ioregOutput, err := ioregCmd.Output() + if err != nil { + return "", fmt.Errorf("%w: failed to run ioreg command: %v", ErrFailed, err) + } + + var entries []Entry + + _, err = plist.Unmarshal(ioregOutput, &entries) + if err != nil { + return "", fmt.Errorf("%w: failed to unmarshal ioreg command's output: %v", + ErrFailed, err) + } + + for _, entry := range entries { + if entry.IOPlatformUUID != "" { + return entry.IOPlatformUUID, nil + } + } + + return "", fmt.Errorf("%w: no platform UUID found in the ioreg command's output", + ErrFailed) +} diff --git a/internal/worker/iokitregistry/iokitregistry_test.go b/internal/worker/iokitregistry/iokitregistry_test.go new file mode 100644 index 0000000..9955447 --- /dev/null +++ b/internal/worker/iokitregistry/iokitregistry_test.go @@ -0,0 +1,16 @@ +package iokitregistry_test + +import ( + "github.com/cirruslabs/orchard/internal/worker/iokitregistry" + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "testing" +) + +func TestPlatformUUID(t *testing.T) { + platformUUID, err := iokitregistry.PlatformUUID() + require.NoError(t, err) + + _, err = uuid.Parse(platformUUID) + require.NoError(t, err) +} diff --git a/internal/worker/rpc.go b/internal/worker/rpc.go index dc180d3..5c1a3d4 100644 --- a/internal/worker/rpc.go +++ b/internal/worker/rpc.go @@ -75,9 +75,7 @@ func (worker *Worker) handlePortForward( // Obtain VM vm, err := worker.vmm.Get(v1.VM{ - Meta: v1.Meta{ - UID: portForwardAction.VmUid, - }, + UID: portForwardAction.VmUid, }) if err != nil { worker.logger.Warnf("port forwarding failed: failed to get the VM: %v", err) diff --git a/internal/worker/worker.go b/internal/worker/worker.go index daeb7cb..c3b5e67 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "github.com/avast/retry-go/v4" + "github.com/cirruslabs/orchard/internal/worker/iokitregistry" "github.com/cirruslabs/orchard/internal/worker/vmmanager" "github.com/cirruslabs/orchard/pkg/client" v1 "github.com/cirruslabs/orchard/pkg/resource/v1" @@ -23,7 +24,6 @@ var ErrPollFailed = errors.New("failed to poll controller") type Worker struct { dataDirPath string name string - uid string vmm *vmmanager.VMManager client *client.Client logger *zap.SugaredLogger @@ -113,19 +113,23 @@ func (worker *Worker) runNewSession(ctx context.Context) error { } func (worker *Worker) registerWorker(ctx context.Context) error { - workerResource, err := worker.client.Workers().Create(ctx, v1.Worker{ + platformUUID, err := iokitregistry.PlatformUUID() + if err != nil { + return err + } + + _, err = worker.client.Workers().Create(ctx, v1.Worker{ Meta: v1.Meta{ Name: worker.name, }, - LastSeen: time.Now(), + LastSeen: time.Now(), + MachineID: platformUUID, }) if err != nil { return err } - worker.uid = workerResource.UID - - worker.logger.Infof("registered worker %s with UID %s", worker.name, workerResource.UID) + worker.logger.Infof("registered worker %s", worker.name) return nil } @@ -136,11 +140,6 @@ func (worker *Worker) updateWorker(ctx context.Context) error { return fmt.Errorf("%w: failed to retrieve worker from the API: %v", ErrPollFailed, err) } - if workerResource.UID != worker.uid { - return fmt.Errorf("%w: our UID is %s, controller's UID is %s", - ErrPollFailed, worker.uid, workerResource.UID) - } - worker.logger.Debugf("got worker from the API") workerResource.LastSeen = time.Now() @@ -155,7 +154,7 @@ func (worker *Worker) updateWorker(ctx context.Context) error { } func (worker *Worker) syncVMs(ctx context.Context) error { - remoteVMs, err := worker.client.VMs().FindForWorker(ctx, worker.uid) + remoteVMs, err := worker.client.VMs().FindForWorker(ctx, worker.name) if err != nil { return err } @@ -340,6 +339,5 @@ func (worker *Worker) GPRCMetadata() metadata.MD { return metadata.Join( worker.client.GPRCMetadata(), metadata.Pairs(rpc.MetadataWorkerNameKey, worker.name), - metadata.Pairs(rpc.MetadataWorkerUIDKey, worker.uid), ) } diff --git a/pkg/client/vms.go b/pkg/client/vms.go index ae837a9..c4aac4e 100644 --- a/pkg/client/vms.go +++ b/pkg/client/vms.go @@ -23,20 +23,22 @@ func (service *VMsService) Create(ctx context.Context, vm *v1.VM) error { return nil } -func (service *VMsService) FindForWorker(ctx context.Context, workerUID string) (map[string]v1.VM, error) { +func (service *VMsService) FindForWorker(ctx context.Context, worker string) (map[string]v1.VM, error) { allVms, err := service.List(ctx) - if err != nil { return nil, err } var filteredVms = make(map[string]v1.VM) + for _, vmResource := range allVms { - if vmResource.WorkerUID != workerUID { + if vmResource.Worker != worker { continue } + filteredVms[vmResource.UID] = vmResource } + return filteredVms, nil } diff --git a/pkg/resource/v1/v1.go b/pkg/resource/v1/v1.go index c54275f..8df9a02 100644 --- a/pkg/resource/v1/v1.go +++ b/pkg/resource/v1/v1.go @@ -17,11 +17,6 @@ type Meta struct { // when receiving a POST request. CreatedAt time.Time `json:"createdAt"` - // UID is a useful field for avoiding data races within a single Name. - // - // It is populated by the Controller when receiving a POST request. - UID string `json:"uid"` - // Generation is a useful field for avoiding data races within a single UID. // // It is populated by the controller when receiving POST or PUT requests. @@ -33,6 +28,8 @@ type Worker struct { // to track unhealthy Workers. LastSeen time.Time + MachineID string + Meta } @@ -47,14 +44,19 @@ type VM struct { Status VMStatus `json:"status"` StatusMessage string `json:"status_message"` - // WorkerUID field is set by the Controller to assign this VM to a specific WorkerUID. - WorkerUID string `json:"worker"` + // Worker field is set by the Controller to assign this VM to a specific Worker. + Worker string `json:"worker"` Username string `json:"username"` Password string `json:"password"` StartupScript *VMScript `json:"startup_script"` ShutdownScript *VMScript `json:"shutdown_script"` + // UID is a useful field for avoiding data races within a single Name. + // + // It is populated by the Controller when receiving a POST request. + UID string `json:"uid"` + Meta } diff --git a/rpc/constants.go b/rpc/constants.go index ceea681..f9cf3be 100644 --- a/rpc/constants.go +++ b/rpc/constants.go @@ -7,6 +7,4 @@ const MetadataServiceAccountTokenKey = "x-orchard-service-account-token" const MetadataWorkerNameKey = "x-orchard-worker-name" -const MetadataWorkerUIDKey = "x-orchard-worker-uid" - const MetadataWorkerPortForwardingSessionKey = "x-orchard-port-forwarding-session"