orchard/internal/worker/imagepull.go

168 lines
4.0 KiB
Go

package worker
import (
"context"
"errors"
"github.com/cirruslabs/orchard/internal/worker/tart"
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
mapset "github.com/deckarep/golang-set/v2"
"github.com/samber/mo"
)
type ImagePull struct {
Cancel context.CancelFunc
}
func (worker *Worker) syncPulls(ctx context.Context) error {
allKeys := mapset.NewSet[string]()
remotePulls, err := worker.client.ImagePulls().FindForWorker(ctx, worker.name)
if err != nil {
return err
}
remotePullsIndex := map[string]*v1.ImagePull{}
for _, remotePull := range remotePulls {
// A copy is needed to not reference the loop variable
remotePullCopy := remotePull
remotePullsIndex[remotePull.Name] = &remotePullCopy
allKeys.Add(remotePull.Name)
}
localPullsIndex := map[string]*ImagePull{}
for key, localPull := range worker.imagePulls {
localPullsIndex[key] = localPull
allKeys.Add(key)
}
worker.logger.Infof("syncing %d local image pulls against %d remote image pulls...",
len(localPullsIndex), len(remotePullsIndex))
for key := range allKeys.Iter() {
remotePull := mo.PointerToOption(remotePullsIndex[key])
localPull := mo.PointerToOption(localPullsIndex[key])
switch {
case remotePull.IsSome() && localPull.IsNone():
// No need to do anything about the image pull in terminal state
remotePullConditions := remotePull.MustGet().Conditions
if v1.ConditionIsTrue(remotePullConditions, v1.ConditionTypeCompleted) ||
v1.ConditionIsTrue(remotePullConditions, v1.ConditionTypeFailed) {
continue
}
// Image pull exists remotely, but not locally,
// create and start a new local pull
pullCtx, pullCtxCancel := context.WithCancel(ctx)
newLocalPull := &ImagePull{
Cancel: pullCtxCancel,
}
go func() {
defer pullCtxCancel()
worker.performPull(pullCtx, key, remotePull.MustGet().Image)
}()
worker.imagePulls[key] = newLocalPull
case remotePull.IsNone() && localPull.IsSome():
// Pull exists locally, but not remotely,
// terminate and delete the local pull
localPull.MustGet().Cancel()
delete(worker.imagePulls, key)
case remotePull.IsSome() && localPull.IsSome():
// Terminate local pull when remote pull enters terminal state
remotePullConditions := remotePull.MustGet().Conditions
if !v1.ConditionIsTrue(remotePullConditions, v1.ConditionTypeCompleted) &&
!v1.ConditionIsTrue(remotePullConditions, v1.ConditionTypeFailed) {
continue
}
localPull.MustGet().Cancel()
delete(worker.imagePulls, key)
}
}
return nil
}
func (worker *Worker) performPull(ctx context.Context, name string, image string) {
_, err := worker.client.ImagePulls().UpdateState(ctx, v1.ImagePull{
Meta: v1.Meta{
Name: name,
},
PullState: v1.PullState{
Conditions: []v1.Condition{
{
Type: v1.ConditionTypeProgressing,
State: v1.ConditionStateTrue,
},
},
},
})
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
worker.logger.Errorf("failed to update image pull state: %v", err)
return
}
_, _, err = tart.Tart(ctx, worker.logger, "pull", image)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
worker.logger.Errorf("failed to pull image %s: %v", image, err)
_, err := worker.client.ImagePulls().UpdateState(ctx, v1.ImagePull{
Meta: v1.Meta{
Name: name,
},
PullState: v1.PullState{
Conditions: []v1.Condition{
{
Type: v1.ConditionTypeFailed,
State: v1.ConditionStateTrue,
},
},
},
})
if err != nil {
worker.logger.Errorf("failed to update image pull state: %v", err)
}
return
}
if _, err := worker.client.ImagePulls().UpdateState(ctx, v1.ImagePull{
Meta: v1.Meta{
Name: name,
},
PullState: v1.PullState{
Conditions: []v1.Condition{
{
Type: v1.ConditionTypeCompleted,
State: v1.ConditionStateTrue,
},
},
},
}); err != nil {
if errors.Is(err, context.Canceled) {
return
}
worker.logger.Errorf("failed to update image pull state: %v", err)
return
}
}