orchard/internal/controller/scheduler/imagepull.go

271 lines
7.2 KiB
Go

package scheduler
import (
"errors"
"fmt"
"time"
storepkg "github.com/cirruslabs/orchard/internal/controller/store"
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"github.com/samber/lo"
)
// imagePullLoopIteration performs a single reconciliation pass over image
// pull jobs.
//
// It first reads all image pulls, image pull jobs and workers in one read-only
// store transaction. Everything below operates on that (possibly stale)
// snapshot, which is why each write re-checks existence and UID inside its own
// update transaction.
//
// For every image pull job it then:
//   - creates an image pull for each worker whose labels contain the job's
//     labels and for which the job does not own an image pull yet;
//   - recomputes the job's state (counters and conditions) from the image
//     pulls it owns and stores it when it differs from the snapshot's state.
//
// Finally, it deletes image pulls that carry an image pull job owner reference
// which does not match any image pull job from the snapshot.
//
// The first error returned by the store or by createImagePull aborts the pass
// and is returned unchanged.
func (scheduler *Scheduler) imagePullLoopIteration() error {
	// Get a lagging view of image pulls, image pull jobs and workers
	var imagePulls []v1.ImagePull
	var imagePullJobs []v1.ImagePullJob
	var workers []v1.Worker

	if err := scheduler.store.View(func(txn storepkg.Transaction) error {
		var err error

		imagePulls, err = txn.ListImagePulls()
		if err != nil {
			return err
		}

		imagePullJobs, err = txn.ListImagePullJobs()
		if err != nil {
			return err
		}

		workers, err = txn.ListWorkers()
		if err != nil {
			return err
		}

		return nil
	}); err != nil {
		return err
	}

	scheduler.logger.Debugf("processing %d image pull jobs, %d image pulls and %d workers",
		len(imagePullJobs), len(imagePulls), len(workers))

	// Schedule new image pulls and update image pull job states
	imagePullJobIndex := make(map[string]v1.ImagePullJob, len(imagePullJobs))

	for _, imagePullJob := range imagePullJobs {
		imagePullJobIndex[imagePullJob.Name] = imagePullJob

		// The job's owner reference does not change while we process the job,
		// so compute it once instead of once per compared reference
		jobOwnerReference := imagePullJob.OwnerReference()

		// Collect the image pulls that are already owned by this job
		existingImagePulls := lo.Filter(imagePulls, func(imagePull v1.ImagePull, _ int) bool {
			return lo.ContainsBy(imagePull.OwnerReferences, func(ownerReference v1.OwnerReference) bool {
				return ownerReference == jobOwnerReference
			})
		})

		// Schedule new image pulls
		for _, worker := range workers {
			// Should we create an image pull for this worker?
			if !worker.Labels.Contains(imagePullJob.Labels) {
				continue
			}

			// Have we already created an image pull for this worker?
			if _, ok := lo.Find(existingImagePulls, func(imagePull v1.ImagePull) bool {
				return imagePull.Worker == worker.Name
			}); ok {
				continue
			}

			// Create an image pull for this worker
			scheduler.logger.Debugf("creating image pull for job %s and worker %s",
				imagePullJob.Name, worker.Name)

			newImagePull, err := scheduler.createImagePull(imagePullJob, worker)
			if err != nil {
				return err
			}

			// createImagePull may return a nil error without having created
			// anything (e.g. the job or the worker vanished, or the labels
			// no longer match). An image pull that was created always carries
			// a freshly generated UID, so one without a UID was never created
			// and must not be counted in the job's state below.
			if newImagePull == nil || newImagePull.UID == "" {
				continue
			}

			existingImagePulls = append(existingImagePulls, *newImagePull)
		}

		// Craft the current image pull job state
		newImagePullJobState := v1.ImagePullJobState{
			Total: int64(len(existingImagePulls)),
			Progressing: int64(lo.CountBy(existingImagePulls, func(imagePull v1.ImagePull) bool {
				return v1.ConditionIsTrue(imagePull.Conditions, v1.ConditionTypeProgressing)
			})),
			Succeeded: int64(lo.CountBy(existingImagePulls, func(imagePull v1.ImagePull) bool {
				return v1.ConditionIsTrue(imagePull.Conditions, v1.ConditionTypeCompleted)
			})),
			Failed: int64(lo.CountBy(existingImagePulls, func(imagePull v1.ImagePull) bool {
				return v1.ConditionIsTrue(imagePull.Conditions, v1.ConditionTypeFailed)
			})),
		}

		if newImagePullJobState.Progressing != 0 {
			newImagePullJobState.Conditions = append(newImagePullJobState.Conditions, v1.Condition{
				Type:  v1.ConditionTypeProgressing,
				State: v1.ConditionStateTrue,
			})
		}

		// A single failed image pull marks the whole job as failed; otherwise
		// the job is completed once every image pull has finished
		if newImagePullJobState.Failed != 0 {
			newImagePullJobState.Conditions = append(newImagePullJobState.Conditions, v1.Condition{
				Type:  v1.ConditionTypeFailed,
				State: v1.ConditionStateTrue,
			})
		} else if (newImagePullJobState.Succeeded + newImagePullJobState.Failed) == newImagePullJobState.Total {
			newImagePullJobState.Conditions = append(newImagePullJobState.Conditions, v1.Condition{
				Type:  v1.ConditionTypeCompleted,
				State: v1.ConditionStateTrue,
			})
		}

		// Is the current image pull job state any different?
		if cmp.Equal(imagePullJob.ImagePullJobState, newImagePullJobState) {
			continue
		}

		// Update image pull job state
		if err := scheduler.store.Update(func(txn storepkg.Transaction) error {
			dbPullJob, err := txn.GetImagePullJob(imagePullJob.Name)
			if err != nil {
				// Does the image pull job still exist?
				if errors.Is(err, storepkg.ErrNotFound) {
					return nil
				}

				return err
			}

			// Is it the same image pull job?
			if dbPullJob.UID != imagePullJob.UID {
				return nil
			}

			dbPullJob.ImagePullJobState = newImagePullJobState

			return txn.SetImagePullJob(*dbPullJob)
		}); err != nil {
			return err
		}
	}

	// Garbage collect orphaned image pulls
	for _, imagePull := range imagePulls {
		// Is this image pull controlled by an image pull job?
		imagePullJobOwnerReferences := lo.Filter(imagePull.OwnerReferences, func(ownerReference v1.OwnerReference, _ int) bool {
			return ownerReference.Kind == v1.KindImagePullJob
		})
		if len(imagePullJobOwnerReferences) == 0 {
			continue
		}

		// Does this image pull have any invalid references?
		hasOrphanedPullJobOwnerReferences := lo.ContainsBy(imagePullJobOwnerReferences, func(ownerReference v1.OwnerReference) bool {
			imagePullJob, ok := imagePullJobIndex[ownerReference.Name]
			if !ok {
				return true
			}

			return ownerReference != imagePullJob.OwnerReference()
		})
		if !hasOrphanedPullJobOwnerReferences {
			continue
		}

		scheduler.logger.Debugf("removing image pull %s with non-existent owner reference", imagePull.Name)

		if err := scheduler.store.Update(func(txn storepkg.Transaction) error {
			// Does this image pull still exist?
			dbImagePull, err := txn.GetImagePull(imagePull.Name)
			if err != nil {
				if errors.Is(err, storepkg.ErrNotFound) {
					scheduler.logger.Warnf("image pull %s is gone, perhaps the user "+
						"manually deleted it?", imagePull.Name)

					return nil
				}

				return err
			}

			// Is this the same image pull?
			if imagePull.UID != dbImagePull.UID {
				return nil
			}

			return txn.DeleteImagePull(imagePull.Name)
		}); err != nil {
			return err
		}
	}

	return nil
}
// createImagePull stores a new image pull for the given image pull job and
// worker.
//
// Everything is re-validated inside a single store update transaction: the
// image pull job must still exist and have the same UID, the worker must still
// exist, the worker's labels must still contain the job's labels, and no image
// pull named "<job name>-<worker name>" may exist yet. When any of these checks
// fails, nothing is stored and the error is nil.
//
// On a nil error the returned pointer is always non-nil. The image pull it
// points to is populated (and has a non-empty UID) only when it was actually
// stored; otherwise it is the zero value. The pointer is kept non-nil on the
// "nothing created" paths so that callers which dereference the result
// unconditionally keep working.
//
// A non-nil error is returned when the store fails for any reason other than
// the looked-up object not being found.
func (scheduler *Scheduler) createImagePull(imagePullJob v1.ImagePullJob, worker v1.Worker) (*v1.ImagePull, error) {
	// Stays zero-valued unless an image pull is actually stored below
	var imagePull v1.ImagePull

	if err := scheduler.store.Update(func(txn storepkg.Transaction) error {
		// Start from a clean slate in case the store runs this closure more
		// than once
		imagePull = v1.ImagePull{}

		dbPullJob, err := txn.GetImagePullJob(imagePullJob.Name)
		if err != nil {
			// Does the image pull job still exist?
			if errors.Is(err, storepkg.ErrNotFound) {
				return nil
			}

			return err
		}

		// Is it the same pull job?
		if dbPullJob.UID != imagePullJob.UID {
			return nil
		}

		dbWorker, err := txn.GetWorker(worker.Name)
		if err != nil {
			// Does the worker still exist?
			if errors.Is(err, storepkg.ErrNotFound) {
				return nil
			}

			return err
		}

		// Do the worker labels still match?
		if !dbWorker.Labels.Contains(dbPullJob.Labels) {
			return nil
		}

		// Build the image pull locally and only publish it to the caller once
		// it has been stored, so that a skipped creation is never mistaken
		// for a successful one
		candidate := v1.ImagePull{
			Meta: v1.Meta{
				Name:      fmt.Sprintf("%s-%s", dbPullJob.Name, dbWorker.Name),
				CreatedAt: time.Now(),
			},
			UID: uuid.NewString(),
			OwnerReferences: []v1.OwnerReference{
				dbPullJob.OwnerReference(),
			},
			Image:  dbPullJob.Image,
			Worker: worker.Name,
		}

		// Never overwrite an image pull that already exists under this name
		_, err = txn.GetImagePull(candidate.Name)
		if err == nil {
			scheduler.logger.Warnf("image pull %s already exists, perhaps the user "+
				"manually created it?", candidate.Name)

			return nil
		}
		if !errors.Is(err, storepkg.ErrNotFound) {
			return err
		}

		if err := txn.SetImagePull(candidate); err != nil {
			return err
		}

		imagePull = candidate

		return nil
	}); err != nil {
		return nil, err
	}

	return &imagePull, nil
}