168 lines
4.0 KiB
Go
168 lines
4.0 KiB
Go
package worker
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
|
|
"github.com/cirruslabs/orchard/internal/worker/tart"
|
|
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
|
|
mapset "github.com/deckarep/golang-set/v2"
|
|
"github.com/samber/mo"
|
|
)
|
|
|
|
type ImagePull struct {
|
|
Cancel context.CancelFunc
|
|
}
|
|
|
|
func (worker *Worker) syncPulls(ctx context.Context) error {
|
|
allKeys := mapset.NewSet[string]()
|
|
|
|
remotePulls, err := worker.client.ImagePulls().FindForWorker(ctx, worker.name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
remotePullsIndex := map[string]*v1.ImagePull{}
|
|
for _, remotePull := range remotePulls {
|
|
// A copy is needed to not reference the loop variable
|
|
remotePullCopy := remotePull
|
|
|
|
remotePullsIndex[remotePull.Name] = &remotePullCopy
|
|
allKeys.Add(remotePull.Name)
|
|
}
|
|
|
|
localPullsIndex := map[string]*ImagePull{}
|
|
for key, localPull := range worker.imagePulls {
|
|
localPullsIndex[key] = localPull
|
|
allKeys.Add(key)
|
|
}
|
|
|
|
worker.logger.Infof("syncing %d local image pulls against %d remote image pulls...",
|
|
len(localPullsIndex), len(remotePullsIndex))
|
|
|
|
for key := range allKeys.Iter() {
|
|
remotePull := mo.PointerToOption(remotePullsIndex[key])
|
|
localPull := mo.PointerToOption(localPullsIndex[key])
|
|
|
|
switch {
|
|
case remotePull.IsSome() && localPull.IsNone():
|
|
// No need to do anything about the image pull in terminal state
|
|
remotePullConditions := remotePull.MustGet().Conditions
|
|
|
|
if v1.ConditionIsTrue(remotePullConditions, v1.ConditionTypeCompleted) ||
|
|
v1.ConditionIsTrue(remotePullConditions, v1.ConditionTypeFailed) {
|
|
continue
|
|
}
|
|
|
|
// Image pull exists remotely, but not locally,
|
|
// create and start a new local pull
|
|
pullCtx, pullCtxCancel := context.WithCancel(ctx)
|
|
|
|
newLocalPull := &ImagePull{
|
|
Cancel: pullCtxCancel,
|
|
}
|
|
|
|
go func() {
|
|
defer pullCtxCancel()
|
|
|
|
worker.performPull(pullCtx, key, remotePull.MustGet().Image)
|
|
}()
|
|
|
|
worker.imagePulls[key] = newLocalPull
|
|
case remotePull.IsNone() && localPull.IsSome():
|
|
// Pull exists locally, but not remotely,
|
|
// terminate and delete the local pull
|
|
localPull.MustGet().Cancel()
|
|
delete(worker.imagePulls, key)
|
|
case remotePull.IsSome() && localPull.IsSome():
|
|
// Terminate local pull when remote pull enters terminal state
|
|
remotePullConditions := remotePull.MustGet().Conditions
|
|
|
|
if !v1.ConditionIsTrue(remotePullConditions, v1.ConditionTypeCompleted) &&
|
|
!v1.ConditionIsTrue(remotePullConditions, v1.ConditionTypeFailed) {
|
|
continue
|
|
}
|
|
|
|
localPull.MustGet().Cancel()
|
|
delete(worker.imagePulls, key)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (worker *Worker) performPull(ctx context.Context, name string, image string) {
|
|
_, err := worker.client.ImagePulls().UpdateState(ctx, v1.ImagePull{
|
|
Meta: v1.Meta{
|
|
Name: name,
|
|
},
|
|
PullState: v1.PullState{
|
|
Conditions: []v1.Condition{
|
|
{
|
|
Type: v1.ConditionTypeProgressing,
|
|
State: v1.ConditionStateTrue,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
if err != nil {
|
|
if errors.Is(err, context.Canceled) {
|
|
return
|
|
}
|
|
|
|
worker.logger.Errorf("failed to update image pull state: %v", err)
|
|
|
|
return
|
|
}
|
|
|
|
_, _, err = tart.Tart(ctx, worker.logger, "pull", image)
|
|
if err != nil {
|
|
if errors.Is(err, context.Canceled) {
|
|
return
|
|
}
|
|
|
|
worker.logger.Errorf("failed to pull image %s: %v", image, err)
|
|
|
|
_, err := worker.client.ImagePulls().UpdateState(ctx, v1.ImagePull{
|
|
Meta: v1.Meta{
|
|
Name: name,
|
|
},
|
|
PullState: v1.PullState{
|
|
Conditions: []v1.Condition{
|
|
{
|
|
Type: v1.ConditionTypeFailed,
|
|
State: v1.ConditionStateTrue,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
if err != nil {
|
|
worker.logger.Errorf("failed to update image pull state: %v", err)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
if _, err := worker.client.ImagePulls().UpdateState(ctx, v1.ImagePull{
|
|
Meta: v1.Meta{
|
|
Name: name,
|
|
},
|
|
PullState: v1.PullState{
|
|
Conditions: []v1.Condition{
|
|
{
|
|
Type: v1.ConditionTypeCompleted,
|
|
State: v1.ConditionStateTrue,
|
|
},
|
|
},
|
|
},
|
|
}); err != nil {
|
|
if errors.Is(err, context.Canceled) {
|
|
return
|
|
}
|
|
|
|
worker.logger.Errorf("failed to update image pull state: %v", err)
|
|
|
|
return
|
|
}
|
|
}
|