Worker: decouple updateWorker() and syncVMs() to use different triggers

This commit is contained in:
Nikolay Edigaryev 2026-02-06 17:39:31 +01:00
parent bdc2af3d58
commit 261f643f8f
1 changed files with 37 additions and 16 deletions

View File

@ -30,10 +30,14 @@ import (
"github.com/shirou/gopsutil/v4/mem"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/metadata"
)
const pollInterval = 5 * time.Second
const (
pollInterval = 5 * time.Second
workerResourceUpdateInterval = 15 * time.Second
)
var ErrPollFailed = errors.New("failed to poll controller")
@ -224,27 +228,44 @@ func (worker *Worker) runNewSession(ctx context.Context) error {
return err
}
for {
if err := worker.updateWorker(ctx); err != nil {
worker.logger.Errorf("failed to update worker resource: %v", err)
group, ctx := errgroup.WithContext(subCtx)
return nil
group.Go(func() error {
for {
if err := worker.updateWorker(ctx); err != nil {
worker.logger.Errorf("failed to update worker resource: %v", err)
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(workerResourceUpdateInterval):
// Proceed
}
}
})
if err := worker.syncVMs(subCtx, updateFunc); err != nil {
worker.logger.Warnf("failed to sync VMs: %v", err)
group.Go(func() error {
for {
if err := worker.syncVMs(ctx, updateFunc); err != nil {
worker.logger.Warnf("failed to sync VMs: %v", err)
return nil
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-worker.syncRequested:
case <-worker.pollTicker.C:
// Proceed
}
}
})
select {
case <-worker.syncRequested:
case <-worker.pollTicker.C:
// continue
case <-subCtx.Done():
return subCtx.Err()
}
}
return group.Wait()
}
func (worker *Worker) registerWorker(ctx context.Context) error {