Ability to pre-pull VM images by targeting workers with labels

This commit is contained in:
Nikolay Edigaryev 2025-12-08 13:12:49 +01:00
parent 173a34b083
commit aa36ad4cb6
29 changed files with 1463 additions and 8 deletions

View File

@ -10,7 +10,11 @@ func NewCommand() *cobra.Command {
Short: "Create resources on the controller",
}
command.AddCommand(newCreateVMCommand(), newCreateServiceAccount())
command.AddCommand(
newCreateVMCommand(),
newCreateServiceAccount(),
newCreateImagePullJob(),
)
return command
}

View File

@ -0,0 +1,56 @@
package create
import (
"fmt"
"os"
"github.com/cirruslabs/orchard/internal/simplename"
"github.com/cirruslabs/orchard/pkg/client"
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
"github.com/spf13/cobra"
)
// newCreateImagePullJob builds the "create imagepulljob" sub-command and
// binds its "--image" and "--labels" flags to the package-level image and
// labels variables.
func newCreateImagePullJob() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "imagepulljob NAME",
		Short: "Create an image pull job",
		Args:  cobra.ExactArgs(1),
		RunE:  runCreateImagePullJob,
	}

	flags := cmd.Flags()
	flags.StringVar(&image, "image", "", "image to pull")
	flags.StringToStringVar(&labels, "labels", map[string]string{}, "labels required by this image pull job")

	return cmd
}
// runCreateImagePullJob creates an image pull job named after the sole
// positional argument, using the values of the "--image" and "--labels" flags.
//
// It returns an error when "--image" is empty, when the client cannot be
// instantiated or when the creation request fails.
func runCreateImagePullJob(cmd *cobra.Command, args []string) error {
	jobName := args[0]

	// A name that fails the future validation rules only produces a warning,
	// the command proceeds regardless
	if warning := simplename.ValidateNext(jobName); warning != nil {
		_, _ = fmt.Fprintf(os.Stderr, "WARNING: %v\n", warning)
	}

	// An image pull job needs an image to pull
	if image == "" {
		return fmt.Errorf("please specify an \"--image\" to pull")
	}

	client, err := client.New()
	if err != nil {
		return err
	}

	return client.ImagePullJobs().Create(cmd.Context(), &v1.ImagePullJob{
		Meta: v1.Meta{
			Name: jobName,
		},
		Image:  image,
		Labels: labels,
	})
}

View File

@ -10,7 +10,13 @@ func NewCommand() *cobra.Command {
Short: "Delete resources from the controller",
}
command.AddCommand(newDeleteVMCommand(), newDeleteServiceComandCommand(), newDeleteWorkerCommand())
command.AddCommand(
newDeleteVMCommand(),
newDeleteServiceComandCommand(),
newDeleteWorkerCommand(),
newDeleteImagePullCommand(),
newDeleteImagePullJobCommand(),
)
return command
}

View File

@ -0,0 +1,26 @@
package deletecmd
import (
"github.com/cirruslabs/orchard/pkg/client"
"github.com/spf13/cobra"
)
// newDeleteImagePullCommand builds the "delete imagepull" sub-command,
// which takes exactly one argument: the name of the image pull.
func newDeleteImagePullCommand() *cobra.Command {
	command := &cobra.Command{
		Use:   "imagepull NAME",
		Short: "Delete an image pull",
		RunE:  runDeleteImagePull,
		Args:  cobra.ExactArgs(1),
	}

	return command
}
// runDeleteImagePull deletes the image pull whose name is given
// as the first positional argument.
func runDeleteImagePull(cmd *cobra.Command, args []string) error {
	client, err := client.New()
	if err != nil {
		return err
	}

	pullName := args[0]

	return client.ImagePulls().Delete(cmd.Context(), pullName)
}

View File

@ -0,0 +1,26 @@
package deletecmd
import (
"github.com/cirruslabs/orchard/pkg/client"
"github.com/spf13/cobra"
)
// newDeleteImagePullJobCommand builds the "delete imagepulljob" sub-command,
// which takes exactly one argument: the name of the image pull job.
func newDeleteImagePullJobCommand() *cobra.Command {
	command := &cobra.Command{
		Use:   "imagepulljob NAME",
		Short: "Delete an image pull job",
		RunE:  runDeleteImagePullJob,
		Args:  cobra.ExactArgs(1),
	}

	return command
}
// runDeleteImagePullJob deletes the image pull job whose name is given
// as the first positional argument.
func runDeleteImagePullJob(cmd *cobra.Command, args []string) error {
	client, err := client.New()
	if err != nil {
		return err
	}

	jobName := args[0]

	return client.ImagePullJobs().Delete(cmd.Context(), jobName)
}

View File

@ -0,0 +1,53 @@
package list
import (
"fmt"
"github.com/cirruslabs/orchard/pkg/client"
"github.com/gosuri/uitable"
"github.com/spf13/cobra"
)
// newListImagePullJobsCommand builds the "list imagepulljobs" sub-command.
func newListImagePullJobsCommand() *cobra.Command {
	return &cobra.Command{
		Use:   "imagepulljobs",
		Short: "List image pull jobs",
		RunE:  runListImagePullJobs,
	}
}
// runListImagePullJobs fetches all image pull jobs and prints them to the
// standard output: one name per line when the package-level quiet flag is
// set, or a table with the job's image, labels and counters otherwise.
func runListImagePullJobs(cmd *cobra.Command, args []string) error {
	client, err := client.New()
	if err != nil {
		return err
	}

	jobs, err := client.ImagePullJobs().List(cmd.Context())
	if err != nil {
		return err
	}

	// Quiet mode: names only, no table
	if quiet {
		for i := range jobs {
			fmt.Println(jobs[i].Name)
		}

		return nil
	}

	table := uitable.New()
	table.Wrap = true

	table.AddRow("Name", "Image", "Labels", "Progressing", "Succeeded", "Failed", "Total")

	for i := range jobs {
		job := jobs[i]

		table.AddRow(job.Name, job.Image, job.Labels, job.Progressing,
			job.Succeeded, job.Failed, job.Total)
	}

	fmt.Println(table)

	return nil
}

View File

@ -0,0 +1,54 @@
package list
import (
"fmt"
"github.com/cirruslabs/orchard/pkg/client"
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
"github.com/gosuri/uitable"
"github.com/spf13/cobra"
)
// newListImagePullsCommand builds the "list imagepulls" sub-command.
func newListImagePullsCommand() *cobra.Command {
	return &cobra.Command{
		Use:   "imagepulls",
		Short: "List image pulls",
		RunE:  runListImagePulls,
	}
}
// runListImagePulls fetches all image pulls and prints them to the standard
// output: one name per line when the package-level quiet flag is set, or a
// table with the pull's image, worker and humanized conditions otherwise.
func runListImagePulls(cmd *cobra.Command, args []string) error {
	client, err := client.New()
	if err != nil {
		return err
	}

	pulls, err := client.ImagePulls().List(cmd.Context())
	if err != nil {
		return err
	}

	// Quiet mode: names only, no table
	if quiet {
		for i := range pulls {
			fmt.Println(pulls[i].Name)
		}

		return nil
	}

	table := uitable.New()
	table.Wrap = true

	table.AddRow("Name", "Image", "Worker", "Conditions")

	for i := range pulls {
		pull := pulls[i]

		table.AddRow(pull.Name, pull.Image, pull.Worker,
			v1.ConditionsHumanize(pull.Conditions))
	}

	fmt.Println(table)

	return nil
}

View File

@ -12,7 +12,13 @@ func NewCommand() *cobra.Command {
Short: "List resources on the controller",
}
command.AddCommand(newListWorkersCommand(), newListVMsCommand(), newListServiceAccountsCommand())
command.AddCommand(
newListWorkersCommand(),
newListVMsCommand(),
newListServiceAccountsCommand(),
newListImagePullsCommand(),
newListImagePullJobsCommand(),
)
command.Flags().BoolVarP(&quiet, "", "q", false, "only show resource names")

View File

@ -183,6 +183,37 @@ func (controller *Controller) initAPI() *gin.Engine {
controller.appendVMEvents(c).Respond(c)
})
// Image pulls
v1.POST("/imagepulls", func(c *gin.Context) {
controller.createImagePull(c).Respond(c)
})
v1.PUT("/imagepulls/:name/state", func(c *gin.Context) {
controller.updateImagePullState(c).Respond(c)
})
v1.GET("/imagepulls/:name", func(c *gin.Context) {
controller.getImagePull(c).Respond(c)
})
v1.GET("/imagepulls", func(c *gin.Context) {
controller.listImagePulls(c).Respond(c)
})
v1.DELETE("/imagepulls/:name", func(c *gin.Context) {
controller.deleteImagePull(c).Respond(c)
})
// Image pull jobs
v1.POST("/imagepulljobs", func(c *gin.Context) {
controller.createImagePullJob(c).Respond(c)
})
v1.GET("/imagepulljobs/:name", func(c *gin.Context) {
controller.getImagePullJob(c).Respond(c)
})
v1.GET("/imagepulljobs", func(c *gin.Context) {
controller.listImagePullJobs(c).Respond(c)
})
v1.DELETE("/imagepulljobs/:name", func(c *gin.Context) {
controller.deleteImagePullJob(c).Respond(c)
})
return ginEngine
}

View File

@ -19,6 +19,7 @@ func (controller *Controller) controllerInfo(ctx *gin.Context) responder.Respond
capabilities := []v1pkg.ControllerCapability{
v1pkg.ControllerCapabilityRPCV1,
v1pkg.ControllerCapabilityVMStateEndpoint,
v1pkg.ControllerCapabilityImagePullResource,
}
if controller.experimentalRPCV2 {

View File

@ -0,0 +1,124 @@
package controller
import (
"errors"
"net/http"
"time"
storepkg "github.com/cirruslabs/orchard/internal/controller/store"
"github.com/cirruslabs/orchard/internal/responder"
"github.com/cirruslabs/orchard/internal/simplename"
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
// createImagePullJob handles creation of an image pull job.
//
// The caller needs the compute write role. The request body must be a JSON
// image pull job with a non-empty, valid name and a non-empty image. The
// handler responds with 400 on malformed JSON, 412 on validation failures,
// 409 when a job with the same name already exists, 500 on store errors and
// 200 with the stored job otherwise.
func (controller *Controller) createImagePullJob(ctx *gin.Context) responder.Responder {
	if denied := controller.authorize(ctx, v1.ServiceAccountRoleComputeWrite); denied != nil {
		return denied
	}

	// Decode the request body
	var job v1.ImagePullJob

	if err := ctx.ShouldBindJSON(&job); err != nil {
		return responder.JSON(http.StatusBadRequest, NewErrorResponse("invalid JSON was provided"))
	}

	// Reject incomplete or badly named jobs
	if job.Name == "" {
		return responder.JSON(http.StatusPreconditionFailed, NewErrorResponse("name field cannot be empty"))
	}

	if err := simplename.Validate(job.Name); err != nil {
		return responder.JSON(http.StatusPreconditionFailed,
			NewErrorResponse("name field %v", err))
	}

	if job.Image == "" {
		return responder.JSON(http.StatusPreconditionFailed, NewErrorResponse("image field cannot be empty"))
	}

	// These fields are always assigned here, overwriting anything found in the request
	job.CreatedAt = time.Now()
	job.UID = uuid.NewString()

	return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder {
		// Refuse to overwrite an existing job
		switch _, err := txn.GetImagePullJob(job.Name); {
		case err == nil:
			return responder.JSON(http.StatusConflict, NewErrorResponse("image pull job with this name "+
				"already exists"))
		case !errors.Is(err, storepkg.ErrNotFound):
			controller.logger.Errorf("failed to check if the image pull job exists in the DB: %v", err)

			return responder.Code(http.StatusInternalServerError)
		}

		if err := txn.SetImagePullJob(job); err != nil {
			controller.logger.Errorf("failed to create image pull job in the DB: %v", err)

			return responder.Code(http.StatusInternalServerError)
		}

		return responder.JSON(http.StatusOK, &job)
	})
}
// getImagePullJob responds with the image pull job named by the "name" path
// parameter. The caller needs the compute read role. Store errors are turned
// into a response by responder.Error.
func (controller *Controller) getImagePullJob(ctx *gin.Context) responder.Responder {
	if denied := controller.authorize(ctx, v1.ServiceAccountRoleComputeRead); denied != nil {
		return denied
	}

	jobName := ctx.Param("name")

	return controller.storeView(func(txn storepkg.Transaction) responder.Responder {
		job, err := txn.GetImagePullJob(jobName)
		if err != nil {
			return responder.Error(err)
		}

		return responder.JSON(http.StatusOK, job)
	})
}
// listImagePullJobs responds with all image pull jobs found in the store.
// The caller needs the compute read role. Store errors are turned into
// a response by responder.Error.
func (controller *Controller) listImagePullJobs(ctx *gin.Context) responder.Responder {
	if denied := controller.authorize(ctx, v1.ServiceAccountRoleComputeRead); denied != nil {
		return denied
	}

	return controller.storeView(func(txn storepkg.Transaction) responder.Responder {
		jobs, err := txn.ListImagePullJobs()
		if err != nil {
			return responder.Error(err)
		}

		return responder.JSON(http.StatusOK, jobs)
	})
}
// deleteImagePullJob deletes the image pull job named by the "name" path
// parameter. The caller needs the compute write role. The job is looked up
// first, so that a lookup failure is reported instead of a silent no-op.
func (controller *Controller) deleteImagePullJob(ctx *gin.Context) responder.Responder {
	if denied := controller.authorize(ctx, v1.ServiceAccountRoleComputeWrite); denied != nil {
		return denied
	}

	jobName := ctx.Param("name")

	return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder {
		// Make sure the job exists before deleting it
		if _, err := txn.GetImagePullJob(jobName); err != nil {
			return responder.Error(err)
		}

		if err := txn.DeleteImagePullJob(jobName); err != nil {
			return responder.Error(err)
		}

		return responder.Code(http.StatusOK)
	})
}

View File

@ -0,0 +1,159 @@
package controller
import (
"errors"
"net/http"
"time"
storepkg "github.com/cirruslabs/orchard/internal/controller/store"
"github.com/cirruslabs/orchard/internal/responder"
"github.com/cirruslabs/orchard/internal/simplename"
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
// createImagePull handles creation of an image pull.
//
// The caller needs the compute write role. The request body must be a JSON
// image pull with a non-empty, valid name, a non-empty image and a non-empty
// worker. The handler responds with 400 on malformed JSON, 412 on validation
// failures, 409 when an image pull with the same name already exists, 500 on
// store errors and 200 with the stored image pull otherwise.
func (controller *Controller) createImagePull(ctx *gin.Context) responder.Responder {
	// Auth
	if responder := controller.authorize(ctx, v1.ServiceAccountRoleComputeWrite); responder != nil {
		return responder
	}

	// Parse user input
	var userPull v1.ImagePull

	if err := ctx.ShouldBindJSON(&userPull); err != nil {
		return responder.JSON(http.StatusBadRequest, NewErrorResponse("invalid JSON was provided"))
	}

	// Validate user input
	if userPull.Name == "" {
		return responder.JSON(http.StatusPreconditionFailed, NewErrorResponse("name field cannot be empty"))
	} else if err := simplename.Validate(userPull.Name); err != nil {
		return responder.JSON(http.StatusPreconditionFailed,
			NewErrorResponse("name field %v", err))
	}

	if userPull.Image == "" {
		return responder.JSON(http.StatusPreconditionFailed, NewErrorResponse("image field cannot be empty"))
	}

	if userPull.Worker == "" {
		return responder.JSON(http.StatusPreconditionFailed, NewErrorResponse("worker field cannot be empty"))
	}

	// These fields are always assigned here, overwriting anything found in the request
	userPull.CreatedAt = time.Now()
	userPull.UID = uuid.NewString()

	response := controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder {
		// Does this resource already exist?
		_, err := txn.GetImagePull(userPull.Name)
		if err != nil && !errors.Is(err, storepkg.ErrNotFound) {
			controller.logger.Errorf("failed to check if the image pull exists in the DB: %v", err)

			return responder.Code(http.StatusInternalServerError)
		}
		if err == nil {
			return responder.JSON(http.StatusConflict, NewErrorResponse("image pull with this name already exists"))
		}

		if err := txn.SetImagePull(userPull); err != nil {
			controller.logger.Errorf("failed to create image pull in the DB: %v", err)

			return responder.Code(http.StatusInternalServerError)
		}

		return responder.JSON(http.StatusOK, &userPull)
	})

	return response
}
// updateImagePullState replaces the pull state of the image pull named by
// the "name" path parameter with the pull state found in the JSON request
// body. All other fields of the request body are ignored. The caller needs
// the compute write role. Responds with the updated image pull on success.
func (controller *Controller) updateImagePullState(ctx *gin.Context) responder.Responder {
	if denied := controller.authorize(ctx, v1.ServiceAccountRoleComputeWrite); denied != nil {
		return denied
	}

	// Decode the request body
	var update v1.ImagePull

	if err := ctx.ShouldBindJSON(&update); err != nil {
		return responder.JSON(http.StatusBadRequest, NewErrorResponse("invalid JSON was provided"))
	}

	pullName := ctx.Param("name")

	return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder {
		stored, err := txn.GetImagePull(pullName)
		if err != nil {
			return responder.Error(err)
		}

		// Only the state is taken over from the request
		stored.PullState = update.PullState

		if err := txn.SetImagePull(*stored); err != nil {
			controller.logger.Errorf("failed to update image pull in the DB: %v", err)

			return responder.Code(http.StatusInternalServerError)
		}

		return responder.JSON(http.StatusOK, stored)
	})
}
// getImagePull responds with the image pull named by the "name" path
// parameter. The caller needs the compute read role. Store errors are
// turned into a response by responder.Error.
func (controller *Controller) getImagePull(ctx *gin.Context) responder.Responder {
	if denied := controller.authorize(ctx, v1.ServiceAccountRoleComputeRead); denied != nil {
		return denied
	}

	pullName := ctx.Param("name")

	return controller.storeView(func(txn storepkg.Transaction) responder.Responder {
		pull, err := txn.GetImagePull(pullName)
		if err != nil {
			return responder.Error(err)
		}

		return responder.JSON(http.StatusOK, pull)
	})
}
// listImagePulls responds with all image pulls found in the store.
// The caller needs the compute read role. Store errors are turned into
// a response by responder.Error.
func (controller *Controller) listImagePulls(ctx *gin.Context) responder.Responder {
	if denied := controller.authorize(ctx, v1.ServiceAccountRoleComputeRead); denied != nil {
		return denied
	}

	return controller.storeView(func(txn storepkg.Transaction) responder.Responder {
		pulls, err := txn.ListImagePulls()
		if err != nil {
			return responder.Error(err)
		}

		return responder.JSON(http.StatusOK, pulls)
	})
}
// deleteImagePull deletes the image pull named by the "name" path parameter.
// The caller needs the compute write role. The image pull is looked up first,
// so that a lookup failure is reported instead of a silent no-op.
func (controller *Controller) deleteImagePull(ctx *gin.Context) responder.Responder {
	if denied := controller.authorize(ctx, v1.ServiceAccountRoleComputeWrite); denied != nil {
		return denied
	}

	pullName := ctx.Param("name")

	return controller.storeUpdate(func(txn storepkg.Transaction) responder.Responder {
		// Make sure the image pull exists before deleting it
		if _, err := txn.GetImagePull(pullName); err != nil {
			return responder.Error(err)
		}

		if err := txn.DeleteImagePull(pullName); err != nil {
			return responder.Error(err)
		}

		return responder.Code(http.StatusOK)
	})
}

View File

@ -0,0 +1,259 @@
package scheduler
import (
"errors"
"fmt"
"time"
storepkg "github.com/cirruslabs/orchard/internal/controller/store"
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"github.com/samber/lo"
)
// imagePullLoopIteration performs a single reconciliation pass over image
// pull jobs and image pulls:
//
//  1. for each image pull job, an image pull is created for every worker
//     whose labels contain the job's labels and that has no image pull
//     owned by this job yet,
//  2. the job's state (counters and conditions) is recomputed from its image
//     pulls and written back to the store if it has changed,
//  3. image pulls that are not owned by any existing image pull job
//     are deleted.
//
// The pass works on a snapshot taken at its beginning, so each write
// re-checks (by name and UID) that the resource it is about to touch
// is still the same one.
//
// The first store error encountered aborts the pass and is returned.
func (scheduler *Scheduler) imagePullLoopIteration() error {
	// Get a lagging view of image pulls, image pull jobs and workers
	var imagePulls []v1.ImagePull
	var imagePullJobs []v1.ImagePullJob
	var workers []v1.Worker

	if err := scheduler.store.View(func(txn storepkg.Transaction) error {
		var err error

		imagePulls, err = txn.ListImagePulls()
		if err != nil {
			return err
		}

		imagePullJobs, err = txn.ListImagePullJobs()
		if err != nil {
			return err
		}

		workers, err = txn.ListWorkers()
		if err != nil {
			return err
		}

		return nil
	}); err != nil {
		return err
	}

	scheduler.logger.Debugf("processing %d image pull jobs, %d image pulls and %d workers",
		len(imagePullJobs), len(imagePulls), len(workers))

	// Schedule new image pulls and update image pull job states
	imagePullJobIndex := map[string]v1.ImagePullJob{}

	for _, imagePullJob := range imagePullJobs {
		imagePullJobIndex[imagePullJob.Name] = imagePullJob

		// Collect the image pulls owned by this job
		existingImagePulls := lo.Filter(imagePulls, func(imagePull v1.ImagePull, _ int) bool {
			return lo.ContainsBy(imagePull.OwnerReferences, func(ownerReference v1.OwnerReference) bool {
				return ownerReference == imagePullJob.OwnerReference()
			})
		})

		// Schedule new image pulls
		for _, worker := range workers {
			// Should we create an image pull for this worker?
			if !worker.Labels.Contains(imagePullJob.Labels) {
				continue
			}

			// Have we already created an image pull for this worker?
			if _, ok := lo.Find(existingImagePulls, func(imagePull v1.ImagePull) bool {
				return imagePull.Worker == worker.Name
			}); ok {
				continue
			}

			// Create an image pull for this worker
			scheduler.logger.Debugf("creating image pull for job %s and worker %s",
				imagePullJob.Name, worker.Name)

			newImagePull, err := scheduler.createImagePull(imagePullJob, worker)
			if err != nil {
				return err
			}

			// NOTE(review): createImagePull returns a non-nil image pull even
			// when its transaction decided not to store anything, in which
			// case that value still counts towards Total for this pass;
			// confirm that this is intended
			existingImagePulls = append(existingImagePulls, *newImagePull)
		}

		// Craft the current image pull job state
		newImagePullJobState := v1.ImagePullJobState{
			Total: int64(len(existingImagePulls)),
			Progressing: int64(lo.CountBy(existingImagePulls, func(imagePull v1.ImagePull) bool {
				return v1.ConditionIsTrue(imagePull.Conditions, v1.ConditionTypeProgressing)
			})),
			Succeeded: int64(lo.CountBy(existingImagePulls, func(imagePull v1.ImagePull) bool {
				return v1.ConditionIsTrue(imagePull.Conditions, v1.ConditionTypeCompleted)
			})),
			Failed: int64(lo.CountBy(existingImagePulls, func(imagePull v1.ImagePull) bool {
				return v1.ConditionIsTrue(imagePull.Conditions, v1.ConditionTypeFailed)
			})),
		}

		if newImagePullJobState.Progressing != 0 {
			newImagePullJobState.Conditions = append(newImagePullJobState.Conditions, v1.Condition{
				Type:  v1.ConditionTypeProgressing,
				State: v1.ConditionStateTrue,
			})
		}

		if newImagePullJobState.Failed != 0 {
			newImagePullJobState.Conditions = append(newImagePullJobState.Conditions, v1.Condition{
				Type:  v1.ConditionTypeFailed,
				State: v1.ConditionStateTrue,
			})
		} else if newImagePullJobState.Succeeded == newImagePullJobState.Total {
			// No image pull has failed, so the job is completed only once
			// every single image pull has succeeded. Image pulls that are
			// still progressing (or were not picked up yet) must not count
			// here, otherwise the job would be reported as completed while
			// the work is still going on.
			newImagePullJobState.Conditions = append(newImagePullJobState.Conditions, v1.Condition{
				Type:  v1.ConditionTypeCompleted,
				State: v1.ConditionStateTrue,
			})
		}

		// Is the current image pull job state any different?
		if cmp.Equal(imagePullJob.ImagePullJobState, newImagePullJobState) {
			continue
		}

		// Update image pull job state
		if err := scheduler.store.Update(func(txn storepkg.Transaction) error {
			dbPullJob, err := txn.GetImagePullJob(imagePullJob.Name)
			if err != nil {
				// Does the image pull job still exist?
				if errors.Is(err, storepkg.ErrNotFound) {
					return nil
				}

				return err
			}

			// Is it the same image pull job?
			if dbPullJob.UID != imagePullJob.UID {
				return nil
			}

			dbPullJob.ImagePullJobState = newImagePullJobState

			return txn.SetImagePullJob(*dbPullJob)
		}); err != nil {
			return err
		}
	}

	// Garbage collect orphaned image pulls
	for _, imagePull := range imagePulls {
		if lo.ContainsBy(imagePull.OwnerReferences, func(ownerReference v1.OwnerReference) bool {
			imagePullJob, ok := imagePullJobIndex[ownerReference.Name]
			if !ok {
				return false
			}

			return ownerReference == imagePullJob.OwnerReference()
		}) {
			// This image pull is still controlled by an existing image pull job
			continue
		}

		scheduler.logger.Debugf("removing image pull %s with non-existent owner reference", imagePull.Name)

		if err := scheduler.store.Update(func(txn storepkg.Transaction) error {
			// Does this image pull still exist?
			dbImagePull, err := txn.GetImagePull(imagePull.Name)
			if err != nil {
				if errors.Is(err, storepkg.ErrNotFound) {
					scheduler.logger.Warnf("image pull %s is gone, perhaps the user "+
						"manually deleted it?", imagePull.Name)

					return nil
				}

				return err
			}

			// Is this the same image pull?
			if imagePull.UID != dbImagePull.UID {
				return nil
			}

			return txn.DeleteImagePull(imagePull.Name)
		}); err != nil {
			return err
		}
	}

	return nil
}
// createImagePull stores a new image pull owned by imagePullJob and targeted
// at worker. The image pull is named "<job name>-<worker name>".
//
// Everything is re-validated inside a single store transaction. Nothing is
// stored when the job is gone or was re-created (different UID), when the
// worker is gone, when the worker's labels no longer contain the job's
// labels, or when an image pull with the resulting name already exists.
//
// Note that in all of these cases the returned error is nil and the returned
// pointer is non-nil: it points to a zero value, or, in the "already exists"
// case, to the image pull that was not stored. A non-nil error is only
// returned when the store fails.
func (scheduler *Scheduler) createImagePull(imagePullJob v1.ImagePullJob, worker v1.Worker) (*v1.ImagePull, error) {
	var result v1.ImagePull

	err := scheduler.store.Update(func(txn storepkg.Transaction) error {
		// The job might have been deleted or re-created in the meantime
		currentJob, err := txn.GetImagePullJob(imagePullJob.Name)
		if errors.Is(err, storepkg.ErrNotFound) {
			return nil
		}
		if err != nil {
			return err
		}

		if currentJob.UID != imagePullJob.UID {
			return nil
		}

		// The worker might have been deleted in the meantime
		currentWorker, err := txn.GetWorker(worker.Name)
		if errors.Is(err, storepkg.ErrNotFound) {
			return nil
		}
		if err != nil {
			return err
		}

		// The worker or the job labels might have changed in the meantime
		if !currentWorker.Labels.Contains(currentJob.Labels) {
			return nil
		}

		result = v1.ImagePull{
			Meta: v1.Meta{
				Name:      fmt.Sprintf("%s-%s", currentJob.Name, currentWorker.Name),
				CreatedAt: time.Now(),
			},
			UID: uuid.NewString(),
			OwnerReferences: []v1.OwnerReference{
				currentJob.OwnerReference(),
			},
			Image:  currentJob.Image,
			Worker: worker.Name,
		}

		// Never overwrite an image pull that is already there
		_, err = txn.GetImagePull(result.Name)

		switch {
		case err == nil:
			scheduler.logger.Warnf("image pull %s already exists, perhaps the user "+
				"manually created it?", result.Name)

			return nil
		case !errors.Is(err, storepkg.ErrNotFound):
			return err
		}

		return txn.SetImagePull(result)
	})
	if err != nil {
		return nil, err
	}

	return &result, nil
}

View File

@ -99,16 +99,24 @@ func (scheduler *Scheduler) Run() {
scheduler.logger.Errorf("Failed to health-check VMs: %v", err)
}
imagePullLoopIterationStart := time.Now()
err = scheduler.imagePullLoopIteration()
imagePullLoopIterationEnd := time.Now()
if err != nil {
scheduler.logger.Errorf("Failed to process image pulls and image pull jobs: %v", err)
}
schedulingLoopIterationStart := time.Now()
numWorkersScheduling, numVMsScheduling, err := scheduler.schedulingLoopIteration()
schedulingLoopIterationEnd := time.Now()
scheduler.logger.Debugf("Health checking loop iteration for %d workers and %d VMs took %v, "+
"scheduling loop iteration for %d workers and %d VMs took %v",
"scheduling loop iteration for %d workers and %d VMs took %v, image pull loop iteration took %v",
numWorkersHealth, numVMsHealth,
healthCheckingLoopIterationEnd.Sub(healthCheckingLoopIterationStart),
numWorkersScheduling, numVMsScheduling,
schedulingLoopIterationEnd.Sub(schedulingLoopIterationStart))
schedulingLoopIterationEnd.Sub(schedulingLoopIterationStart),
imagePullLoopIterationEnd.Sub(imagePullLoopIterationStart))
if err != nil {
scheduler.logger.Errorf("Failed to schedule VMs: %v", err)

View File

@ -0,0 +1,30 @@
//nolint:dupl // maybe we'll figure out how to make DB resource accessors generic in the future
package badger
import (
"path"
"github.com/cirruslabs/orchard/pkg/resource/v1"
)
// SpaceImagePulls is the key prefix under which image pulls are stored.
const SpaceImagePulls = "/imagepulls"

// ImagePullKey returns the database key of the image pull with the given
// name, which is the name joined onto SpaceImagePulls using path.Join.
func ImagePullKey(name string) []byte {
	key := path.Join(SpaceImagePulls, name)

	return []byte(key)
}
// GetImagePull retrieves the image pull stored under the given name.
func (txn *Transaction) GetImagePull(name string) (*v1.ImagePull, error) {
	key := ImagePullKey(name)

	return genericGet[v1.ImagePull](txn, key)
}
// SetImagePull stores the image pull under the key derived from its name.
func (txn *Transaction) SetImagePull(pull v1.ImagePull) error {
	key := ImagePullKey(pull.Name)

	return genericSet[v1.ImagePull](txn, key, pull)
}
// DeleteImagePull deletes the image pull stored under the given name.
func (txn *Transaction) DeleteImagePull(name string) error {
	key := ImagePullKey(name)

	return genericDelete(txn, key)
}
// ListImagePulls lists the image pulls stored under the SpaceImagePulls prefix.
func (txn *Transaction) ListImagePulls() ([]v1.ImagePull, error) {
	prefix := []byte(SpaceImagePulls)

	return genericList[v1.ImagePull](txn, prefix)
}

View File

@ -0,0 +1,30 @@
//nolint:dupl // maybe we'll figure out how to make DB resource accessors generic in the future
package badger
import (
"path"
"github.com/cirruslabs/orchard/pkg/resource/v1"
)
// SpaceImagePullJobs is the key prefix under which image pull jobs are stored.
const SpaceImagePullJobs = "/imagepulljobs"

// ImagePullJobKey returns the database key of the image pull job with the
// given name, which is the name joined onto SpaceImagePullJobs using path.Join.
func ImagePullJobKey(name string) []byte {
	key := path.Join(SpaceImagePullJobs, name)

	return []byte(key)
}
// GetImagePullJob retrieves the image pull job stored under the given name.
func (txn *Transaction) GetImagePullJob(name string) (*v1.ImagePullJob, error) {
	key := ImagePullJobKey(name)

	return genericGet[v1.ImagePullJob](txn, key)
}
// SetImagePullJob stores the image pull job under the key derived from its name.
func (txn *Transaction) SetImagePullJob(pull v1.ImagePullJob) error {
	key := ImagePullJobKey(pull.Name)

	return genericSet[v1.ImagePullJob](txn, key, pull)
}
// DeleteImagePullJob deletes the image pull job stored under the given name.
func (txn *Transaction) DeleteImagePullJob(name string) error {
	key := ImagePullJobKey(name)

	return genericDelete(txn, key)
}
// ListImagePullJobs lists the image pull jobs stored under
// the SpaceImagePullJobs prefix.
func (txn *Transaction) ListImagePullJobs() ([]v1.ImagePullJob, error) {
	prefix := []byte(SpaceImagePullJobs)

	return genericList[v1.ImagePullJob](txn, prefix)
}

View File

@ -47,4 +47,14 @@ type Transaction interface {
GetClusterSettings() (*v1.ClusterSettings, error)
SetClusterSettings(clusterSettings v1.ClusterSettings) error
GetImagePull(name string) (result *v1.ImagePull, err error)
SetImagePull(pull v1.ImagePull) (err error)
DeleteImagePull(name string) (err error)
ListImagePulls() (result []v1.ImagePull, err error)
GetImagePullJob(name string) (result *v1.ImagePullJob, err error)
SetImagePullJob(pull v1.ImagePullJob) (err error)
DeleteImagePullJob(name string) (err error)
ListImagePullJobs() (result []v1.ImagePullJob, err error)
}

View File

@ -0,0 +1,96 @@
package tests
import (
"testing"
"time"
"github.com/cirruslabs/orchard/internal/imageconstant"
"github.com/cirruslabs/orchard/internal/tests/devcontroller"
"github.com/cirruslabs/orchard/internal/tests/wait"
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
"github.com/stretchr/testify/require"
)
// TestImagePull creates an image pull targeted at the only worker of the
// integration test environment and expects it to complete successfully
// within two minutes.
func TestImagePull(t *testing.T) {
	devClient, _, _ := devcontroller.StartIntegrationTestEnvironment(t)

	// Figure out which worker to target
	workers, err := devClient.Workers().List(t.Context())
	require.NoError(t, err)
	require.Len(t, workers, 1)

	// Ask the worker to pull the default macOS image
	const pullName = "test"

	pullSpec := &v1.ImagePull{
		Meta: v1.Meta{
			Name: pullName,
		},
		Image:  imageconstant.DefaultMacosImage,
		Worker: workers[0].Name,
	}

	err = devClient.ImagePulls().Create(t.Context(), pullSpec)
	require.NoError(t, err)

	// Poll until the image pull either completes or fails
	var imagePull *v1.ImagePull

	reachedTerminalState := wait.Wait(2*time.Minute, func() bool {
		imagePull, err = devClient.ImagePulls().Get(t.Context(), pullName)
		require.NoError(t, err)

		t.Logf("Waiting for the image pull to enter terminal state. Current conditions: %s.",
			v1.ConditionsHumanize(imagePull.Conditions))

		if v1.ConditionIsTrue(imagePull.Conditions, v1.ConditionTypeCompleted) {
			return true
		}

		return v1.ConditionIsTrue(imagePull.Conditions, v1.ConditionTypeFailed)
	})
	require.True(t, reachedTerminalState, "failed to wait for image pull to enter terminal state")

	// Of the two terminal states, only the successful one is acceptable
	require.True(t, v1.ConditionIsTrue(imagePull.Conditions, v1.ConditionTypeCompleted))
}
// TestImagePullJob creates an image pull job without any label requirements
// in an integration test environment with a single worker, and expects the
// job to end up completed with exactly one succeeded image pull.
func TestImagePullJob(t *testing.T) {
	devClient, _, _ := devcontroller.StartIntegrationTestEnvironment(t)

	// Make sure there is exactly one worker, because the counters
	// asserted at the end of this test depend on it
	workers, err := devClient.Workers().List(t.Context())
	require.NoError(t, err)
	require.Len(t, workers, 1)

	// Create an image pull job
	imagePullJobName := "test"

	err = devClient.ImagePullJobs().Create(t.Context(), &v1.ImagePullJob{
		Meta: v1.Meta{
			Name: imagePullJobName,
		},
		Image: imageconstant.DefaultMacosImage,
	})
	require.NoError(t, err)

	// Wait for the image pull job to enter terminal state
	var imagePullJob *v1.ImagePullJob

	require.True(t, wait.Wait(2*time.Minute, func() bool {
		imagePullJob, err = devClient.ImagePullJobs().Get(t.Context(), imagePullJobName)
		require.NoError(t, err)

		t.Logf("Waiting for the image pull job to enter terminal state. Current conditions: %s.",
			v1.ConditionsHumanize(imagePullJob.Conditions))

		return v1.ConditionIsTrue(imagePullJob.Conditions, v1.ConditionTypeCompleted) ||
			v1.ConditionIsTrue(imagePullJob.Conditions, v1.ConditionTypeFailed)
	}), "failed to wait for image pull job to enter terminal state")

	// Ensure that the image pull job had succeeded: the only
	// condition left is "completed" and the only pull has succeeded
	require.Equal(t, []v1.Condition{
		{
			Type:  v1.ConditionTypeCompleted,
			State: v1.ConditionStateTrue,
		},
	}, imagePullJob.Conditions)
	require.EqualValues(t, 0, imagePullJob.Progressing)
	require.EqualValues(t, 1, imagePullJob.Succeeded)
	require.EqualValues(t, 0, imagePullJob.Failed)
	require.EqualValues(t, 1, imagePullJob.Total)
}

View File

@ -0,0 +1,167 @@
package worker
import (
"context"
"errors"
"github.com/cirruslabs/orchard/internal/worker/tart"
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
mapset "github.com/deckarep/golang-set/v2"
"github.com/samber/mo"
)
// ImagePull is the worker-local bookkeeping entry for an image pull
// that was started by this worker.
type ImagePull struct {
	// Cancel cancels the context that the pull was started with.
	Cancel context.CancelFunc
}
// syncPulls reconciles the image pulls tracked locally in worker.imagePulls
// against the image pulls that the controller reports for this worker:
//
//   - a remote, non-terminal image pull with no local counterpart is started
//     in a new goroutine and tracked locally,
//   - a local image pull with no remote counterpart is canceled and forgotten,
//   - a local image pull whose remote counterpart is in a terminal state
//     (completed or failed) is canceled and forgotten.
//
// An error is only returned when the remote image pulls cannot be retrieved.
func (worker *Worker) syncPulls(ctx context.Context) error {
	// Union of the names of image pulls known remotely and/or locally
	allKeys := mapset.NewSet[string]()

	remotePulls, err := worker.client.ImagePulls().FindForWorker(ctx, worker.name)
	if err != nil {
		return err
	}

	remotePullsIndex := map[string]*v1.ImagePull{}
	for _, remotePull := range remotePulls {
		// A copy is needed to not reference the loop variable
		remotePullCopy := remotePull
		remotePullsIndex[remotePull.Name] = &remotePullCopy
		allKeys.Add(remotePull.Name)
	}

	localPullsIndex := map[string]*ImagePull{}
	for key, localPull := range worker.imagePulls {
		localPullsIndex[key] = localPull
		allKeys.Add(key)
	}

	worker.logger.Infof("syncing %d local image pulls against %d remote image pulls...",
		len(localPullsIndex), len(remotePullsIndex))

	for key := range allKeys.Iter() {
		// A missing index entry yields a nil pointer, which becomes an empty option
		remotePull := mo.PointerToOption(remotePullsIndex[key])
		localPull := mo.PointerToOption(localPullsIndex[key])

		switch {
		case remotePull.IsSome() && localPull.IsNone():
			// No need to do anything about the image pull in terminal state
			remotePullConditions := remotePull.MustGet().Conditions
			if v1.ConditionIsTrue(remotePullConditions, v1.ConditionTypeCompleted) ||
				v1.ConditionIsTrue(remotePullConditions, v1.ConditionTypeFailed) {
				continue
			}

			// Image pull exists remotely, but not locally,
			// create and start a new local pull
			pullCtx, pullCtxCancel := context.WithCancel(ctx)
			newLocalPull := &ImagePull{
				Cancel: pullCtxCancel,
			}
			go func() {
				// Release the context's resources once the pull is over
				defer pullCtxCancel()
				worker.performPull(pullCtx, key, remotePull.MustGet().Image)
			}()
			// The entry stays in the map after the goroutine finishes,
			// it is only removed by the two cases below
			worker.imagePulls[key] = newLocalPull
		case remotePull.IsNone() && localPull.IsSome():
			// Pull exists locally, but not remotely,
			// terminate and delete the local pull
			localPull.MustGet().Cancel()
			delete(worker.imagePulls, key)
		case remotePull.IsSome() && localPull.IsSome():
			// Terminate local pull when remote pull enters terminal state
			remotePullConditions := remotePull.MustGet().Conditions
			if !v1.ConditionIsTrue(remotePullConditions, v1.ConditionTypeCompleted) &&
				!v1.ConditionIsTrue(remotePullConditions, v1.ConditionTypeFailed) {
				continue
			}
			localPull.MustGet().Cancel()
			delete(worker.imagePulls, key)
		}
	}

	return nil
}
// performPull pulls the image using "tart pull" and reports the progress of
// the image pull called name to the controller: "progressing" before the
// pull is started and then either "completed" or "failed", depending on the
// outcome.
//
// Nothing is pulled when the initial state update fails. When ctx is
// canceled while updating the state or while pulling, the function returns
// silently. All other failures are logged, since there is no one
// to return them to.
func (worker *Worker) performPull(ctx context.Context, name string, image string) {
	// stateWith crafts a state update carrying a single condition
	stateWith := func(condition v1.Condition) v1.ImagePull {
		return v1.ImagePull{
			Meta: v1.Meta{
				Name: name,
			},
			PullState: v1.PullState{
				Conditions: []v1.Condition{condition},
			},
		}
	}

	// Let the controller know that we've started working on this image pull
	if _, err := worker.client.ImagePulls().UpdateState(ctx, stateWith(v1.Condition{
		Type:  v1.ConditionTypeProgressing,
		State: v1.ConditionStateTrue,
	})); err != nil {
		if !errors.Is(err, context.Canceled) {
			worker.logger.Errorf("failed to update image pull state: %v", err)
		}

		return
	}

	if _, _, err := tart.Tart(ctx, worker.logger, "pull", image); err != nil {
		if errors.Is(err, context.Canceled) {
			return
		}

		worker.logger.Errorf("failed to pull image %s: %v", image, err)

		// Report the failure, any error of doing so is logged unconditionally
		if _, err := worker.client.ImagePulls().UpdateState(ctx, stateWith(v1.Condition{
			Type:  v1.ConditionTypeFailed,
			State: v1.ConditionStateTrue,
		})); err != nil {
			worker.logger.Errorf("failed to update image pull state: %v", err)
		}

		return
	}

	// Report the success
	if _, err := worker.client.ImagePulls().UpdateState(ctx, stateWith(v1.Condition{
		Type:  v1.ConditionTypeCompleted,
		State: v1.ConditionStateTrue,
	})); err != nil {
		if !errors.Is(err, context.Canceled) {
			worker.logger.Errorf("failed to update image pull state: %v", err)
		}
	}
}

View File

@ -51,6 +51,8 @@ type Worker struct {
localNetworkHelper *localnetworkhelper.LocalNetworkHelper
imagePulls map[string]*ImagePull
logger *zap.SugaredLogger
}
@ -60,6 +62,7 @@ func New(client *client.Client, opts ...Option) (*Worker, error) {
pollTicker: time.NewTicker(pollInterval),
vmm: vmmanager.New(),
syncRequested: make(chan bool, 1),
imagePulls: map[string]*ImagePull{},
}
// Apply options
@ -215,6 +218,14 @@ func (worker *Worker) runNewSession(ctx context.Context) error {
return nil
}
if info.Capabilities.Has(v1.ControllerCapabilityImagePullResource) {
if err := worker.syncPulls(subCtx); err != nil {
worker.logger.Warnf("failed to sync image pulls: %v", err)
return nil
}
}
select {
case <-worker.syncRequested:
case <-worker.pollTicker.C:

View File

@ -349,6 +349,18 @@ func (client *Client) ClusterSettings() *ClusterSettingsService {
}
}
// ImagePulls returns a service bound to this client for working
// with image pull resources.
func (client *Client) ImagePulls() *ImagePullsService {
	return &ImagePullsService{client: client}
}
// ImagePullJobs returns a service bound to this client for working
// with image pull job resources.
func (client *Client) ImagePullJobs() *ImagePullJobsService {
	return &ImagePullJobsService{client: client}
}
func (client *Client) RPC() *RPCService {
return &RPCService{
client: client,

View File

@ -0,0 +1,82 @@
package client
import (
"context"
"fmt"
"net/http"
"net/url"
"github.com/cirruslabs/orchard/pkg/resource/v1"
)
// imagePullJobsEndpointPrefix is the API path under which image pull jobs
// are listed and created; individual jobs are addressed as
// "<prefix>/<name>".
const imagePullJobsEndpointPrefix = "imagepulljobs"
// ImagePullJobsService groups the API calls that operate
// on ImagePullJob resources.
type ImagePullJobsService struct {
// client performs the underlying requests.
client *Client
}
// Create submits pullJob with a POST request to the image pull jobs endpoint.
//
// No response body is requested, so only the request error (if any)
// is reported back.
func (service *ImagePullJobsService) Create(ctx context.Context, pullJob *v1.ImagePullJob) error {
	if err := service.client.request(
		ctx, http.MethodPost, imagePullJobsEndpointPrefix, pullJob, nil, nil,
	); err != nil {
		return err
	}

	return nil
}
// List fetches the image pull jobs with a GET request to the image pull
// jobs endpoint.
//
// A nil slice is returned together with the error when the request fails.
func (service *ImagePullJobsService) List(ctx context.Context) ([]v1.ImagePullJob, error) {
	var result []v1.ImagePullJob

	if err := service.client.request(
		ctx, http.MethodGet, imagePullJobsEndpointPrefix, nil, &result, nil,
	); err != nil {
		return nil, err
	}

	return result, nil
}
// Get fetches a single image pull job by its name.
//
// The name is path-escaped before being placed into the request path.
// A nil pointer is returned together with the error when the request fails.
func (service *ImagePullJobsService) Get(ctx context.Context, name string) (*v1.ImagePullJob, error) {
	endpoint := fmt.Sprintf("%s/%s", imagePullJobsEndpointPrefix, url.PathEscape(name))

	result := new(v1.ImagePullJob)

	if err := service.client.request(ctx, http.MethodGet, endpoint, nil, result, nil); err != nil {
		return nil, err
	}

	return result, nil
}
// Update sends pull with a PUT request to the endpoint of the image pull job
// named pull.Name and returns the job that the request produced.
//
// Note that the returned pointer is never nil: on failure it is returned
// alongside the error and should not be relied upon.
func (service *ImagePullJobsService) Update(ctx context.Context, pull v1.ImagePullJob) (*v1.ImagePullJob, error) {
	endpoint := fmt.Sprintf("%s/%s", imagePullJobsEndpointPrefix, url.PathEscape(pull.Name))

	updated := new(v1.ImagePullJob)

	if err := service.client.request(ctx, http.MethodPut, endpoint, pull, updated, nil); err != nil {
		return updated, err
	}

	return updated, nil
}
// UpdateState sends pull with a PUT request to the "state" sub-path of the
// image pull job named pull.Name and returns the job that the request produced.
//
// Note that the returned pointer is never nil: on failure it is returned
// alongside the error and should not be relied upon.
func (service *ImagePullJobsService) UpdateState(ctx context.Context, pull v1.ImagePullJob) (*v1.ImagePullJob, error) {
	endpoint := fmt.Sprintf("%s/%s/state", imagePullJobsEndpointPrefix, url.PathEscape(pull.Name))

	updated := new(v1.ImagePullJob)

	if err := service.client.request(ctx, http.MethodPut, endpoint, pull, updated, nil); err != nil {
		return updated, err
	}

	return updated, nil
}
// Delete issues a DELETE request for the image pull job with the given name.
//
// The name is path-escaped before being placed into the request path.
func (service *ImagePullJobsService) Delete(ctx context.Context, name string) error {
	endpoint := fmt.Sprintf("%s/%s", imagePullJobsEndpointPrefix, url.PathEscape(name))

	if err := service.client.request(ctx, http.MethodDelete, endpoint, nil, nil, nil); err != nil {
		return err
	}

	return nil
}

101
pkg/client/imagepulls.go Normal file
View File

@ -0,0 +1,101 @@
package client
import (
"context"
"fmt"
"net/http"
"net/url"
"github.com/cirruslabs/orchard/pkg/resource/v1"
)
// pullsEndpointPrefix is the API path under which image pulls are listed
// and created; individual pulls are addressed as "<prefix>/<name>".
const pullsEndpointPrefix = "imagepulls"
// ImagePullsService groups the API calls that operate
// on ImagePull resources.
type ImagePullsService struct {
// client performs the underlying requests.
client *Client
}
// Create submits pull with a POST request to the image pulls endpoint.
//
// No response body is requested, so only the request error (if any)
// is reported back.
func (service *ImagePullsService) Create(ctx context.Context, pull *v1.ImagePull) error {
	if err := service.client.request(ctx, http.MethodPost, pullsEndpointPrefix, pull, nil, nil); err != nil {
		return err
	}

	return nil
}
// FindForWorker returns the image pulls whose Worker field equals worker.
//
// The filtering happens on the client side: all image pulls are retrieved
// with List first. The result is a nil slice when nothing matches.
func (service *ImagePullsService) FindForWorker(ctx context.Context, worker string) ([]v1.ImagePull, error) {
	everyPull, err := service.List(ctx)
	if err != nil {
		return nil, err
	}

	var matching []v1.ImagePull

	for _, candidate := range everyPull {
		if candidate.Worker == worker {
			matching = append(matching, candidate)
		}
	}

	return matching, nil
}
// List fetches the image pulls with a GET request to the image pulls endpoint.
//
// A nil slice is returned together with the error when the request fails.
func (service *ImagePullsService) List(ctx context.Context) ([]v1.ImagePull, error) {
	var result []v1.ImagePull

	if err := service.client.request(ctx, http.MethodGet, pullsEndpointPrefix, nil, &result, nil); err != nil {
		return nil, err
	}

	return result, nil
}
// Get fetches a single image pull by its name.
//
// The name is path-escaped before being placed into the request path.
// A nil pointer is returned together with the error when the request fails.
func (service *ImagePullsService) Get(ctx context.Context, name string) (*v1.ImagePull, error) {
	endpoint := fmt.Sprintf("%s/%s", pullsEndpointPrefix, url.PathEscape(name))

	result := new(v1.ImagePull)

	if err := service.client.request(ctx, http.MethodGet, endpoint, nil, result, nil); err != nil {
		return nil, err
	}

	return result, nil
}
// Update sends pull with a PUT request to the endpoint of the image pull
// named pull.Name and returns the image pull that the request produced.
//
// Note that the returned pointer is never nil: on failure it is returned
// alongside the error and should not be relied upon.
func (service *ImagePullsService) Update(ctx context.Context, pull v1.ImagePull) (*v1.ImagePull, error) {
	endpoint := fmt.Sprintf("%s/%s", pullsEndpointPrefix, url.PathEscape(pull.Name))

	updated := new(v1.ImagePull)

	if err := service.client.request(ctx, http.MethodPut, endpoint, pull, updated, nil); err != nil {
		return updated, err
	}

	return updated, nil
}
// UpdateState sends pull with a PUT request to the "state" sub-path of the
// image pull named pull.Name and returns the image pull that the request
// produced.
//
// Note that the returned pointer is never nil: on failure it is returned
// alongside the error and should not be relied upon.
func (service *ImagePullsService) UpdateState(ctx context.Context, pull v1.ImagePull) (*v1.ImagePull, error) {
	endpoint := fmt.Sprintf("%s/%s/state", pullsEndpointPrefix, url.PathEscape(pull.Name))

	updated := new(v1.ImagePull)

	if err := service.client.request(ctx, http.MethodPut, endpoint, pull, updated, nil); err != nil {
		return updated, err
	}

	return updated, nil
}
// Delete issues a DELETE request for the image pull with the given name.
//
// The name is path-escaped before being placed into the request path.
func (service *ImagePullsService) Delete(ctx context.Context, name string) error {
	endpoint := fmt.Sprintf("%s/%s", pullsEndpointPrefix, url.PathEscape(name))

	if err := service.client.request(ctx, http.MethodDelete, endpoint, nil, nil, nil); err != nil {
		return err
	}

	return nil
}

View File

@ -0,0 +1,26 @@
package v1
// ImagePull is a resource that describes a pull of a single Image
// associated with a single Worker, along with the state of that pull.
type ImagePull struct {
Meta
// UID is a useful field for avoiding data races within a single Name.
//
// It is populated by the Controller when receiving a POST request.
UID string `json:"uid,omitempty"`
// OwnerReferences lists the resources that own this image pull
// (presumably the ImagePullJob that caused it, see
// ImagePullJob.OwnerReference; confirm against the controller).
OwnerReferences []OwnerReference `json:"ownerReferences,omitempty"`
// Image is the image to pull.
Image string `json:"image,omitempty"`
// Worker is the name of the worker this image pull is associated with.
Worker string `json:"worker,omitempty"`
PullState
}
// SetVersion stores version in the Version field of the embedded Meta.
func (pull *ImagePull) SetVersion(version uint64) {
	pull.Meta.Version = version
}
// PullState is the state portion of an ImagePull.
type PullState struct {
// Conditions describe the pull's progress, e.g. ConditionTypeProgressing,
// ConditionTypeCompleted or ConditionTypeFailed.
Conditions []Condition `json:"conditions,omitempty"`
}

View File

@ -0,0 +1,36 @@
package v1
// ImagePullJob is a resource that requests an Image to be pulled
// on workers selected by Labels, along with the state of that job.
type ImagePullJob struct {
Meta
// UID is a useful field for avoiding data races within a single Name.
//
// It is populated by the Controller when receiving a POST request.
UID string `json:"uid,omitempty"`
// Image is the image to pull.
Image string `json:"image,omitempty"`
// Labels are the labels required by this image pull job.
Labels Labels `json:"labels,omitempty"`
ImagePullJobState
}
// SetVersion stores version in the Version field of the embedded Meta.
func (imagePullJob *ImagePullJob) SetVersion(version uint64) {
	imagePullJob.Meta.Version = version
}
// OwnerReference builds a reference that points at this image pull job
// using its kind, name and UID.
func (imagePullJob *ImagePullJob) OwnerReference() OwnerReference {
	var reference OwnerReference

	reference.Kind = KindImagePullJob
	reference.Name = imagePullJob.Name
	reference.UID = imagePullJob.UID

	return reference
}
// ImagePullJobState is the state portion of an ImagePullJob.
type ImagePullJobState struct {
// Conditions describe the job's progress, e.g. ConditionTypeProgressing,
// ConditionTypeCompleted or ConditionTypeFailed.
Conditions []Condition `json:"conditions,omitempty"`
// NOTE(review): the counters below presumably track how many of the job's
// image pulls are in each state, with Total being their overall number;
// confirm against the controller code that populates them.
Progressing int64 `json:"progressing,omitempty"`
Succeeded int64 `json:"succeeded,omitempty"`
Failed int64 `json:"failed,omitempty"`
Total int64 `json:"total,omitempty"`
}

7
pkg/resource/v1/kind.go Normal file
View File

@ -0,0 +1,7 @@
package v1
// Kind names a type of resource; it is used in OwnerReference
// to tell what kind of resource is being referenced.
type Kind string
const (
// KindImagePullJob identifies ImagePullJob resources.
KindImagePullJob Kind = "ImagePullJob"
)

View File

@ -1,5 +1,10 @@
package v1
import (
	"fmt"
	"sort"
	"strings"
)
// Labels is a set of key-value string pairs attached to a resource.
type Labels map[string]string
func (labels Labels) Contains(other Labels) bool {
@ -11,3 +16,13 @@ func (labels Labels) Contains(other Labels) bool {
return true
}
// String renders the labels as a comma-separated list of "key: value" pairs.
//
// The pairs are ordered by key: Go randomizes map iteration order, so without
// sorting the same set of labels could be rendered differently on each call,
// which makes the output awkward to compare, log or test against.
//
// An empty or nil set of labels yields an empty string.
func (labels Labels) String() string {
	keys := make([]string, 0, len(labels))
	for key := range labels {
		keys = append(keys, key)
	}

	sort.Strings(keys)

	kvs := make([]string, 0, len(keys))
	for _, key := range keys {
		kvs = append(kvs, fmt.Sprintf("%s: %s", key, labels[key]))
	}

	return strings.Join(kvs, ", ")
}

View File

@ -21,6 +21,17 @@ type Meta struct {
Version uint64 `json:"version,omitempty"`
}
// OwnerReference points at a resource that owns another resource,
// identifying the owner by its kind, name and UID.
type OwnerReference struct {
// Kind is the kind of the owning resource, e.g. KindImagePullJob.
Kind Kind `json:"kind,omitempty"`
// Name is the name of the owning resource.
Name string `json:"name,omitempty"`
// UID is a useful field for avoiding data races within a single Name.
//
// It is populated by the Controller when receiving a POST request.
UID string `json:"uid,omitempty"`
}
type VM struct {
Image string `json:"image,omitempty"`
ImagePullPolicy ImagePullPolicy `json:"imagePullPolicy,omitempty"`
@ -195,9 +206,10 @@ const (
type ControllerCapability string
const (
ControllerCapabilityRPCV1 ControllerCapability = "rpc-v1"
ControllerCapabilityRPCV2 ControllerCapability = "rpc-v2"
ControllerCapabilityVMStateEndpoint ControllerCapability = "vm-state-endpoint"
ControllerCapabilityRPCV1 ControllerCapability = "rpc-v1"
ControllerCapabilityRPCV2 ControllerCapability = "rpc-v2"
ControllerCapabilityVMStateEndpoint ControllerCapability = "vm-state-endpoint"
ControllerCapabilityImagePullResource ControllerCapability = "pull-resource"
)
type ControllerCapabilities []ControllerCapability

View File

@ -13,12 +13,19 @@ type Condition struct {
type ConditionType string
// Known condition types, grouped by the kind of resource they apply to.
const (
// VM conditions
ConditionTypeScheduled ConditionType = "scheduled"
ConditionTypeRunning ConditionType = "running"
// VM conditions (internal to worker)
ConditionTypeCloning ConditionType = "cloning"
ConditionTypeSuspending ConditionType = "suspending"
ConditionTypeStopping ConditionType = "stopping"
// ImagePull and ImagePullJob conditions
ConditionTypeProgressing ConditionType = "progressing"
ConditionTypeCompleted ConditionType = "completed"
ConditionTypeFailed ConditionType = "failed"
)
type ConditionState string