From f3b4eb42ca6bb31e7fc070337338506abe15dd02 Mon Sep 17 00:00:00 2001 From: Nikolay Edigaryev Date: Fri, 6 Feb 2026 18:16:40 +0100 Subject: [PATCH] Worker: decouple updateWorker() and syncVMs() to use different triggers (#403) * Worker: decouple updateWorker() and syncVMs() to use different triggers * Actually return an error otherwise errgroup won't terminate --- internal/worker/worker.go | 51 +++++++++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 15 deletions(-) diff --git a/internal/worker/worker.go b/internal/worker/worker.go index ea264c5..3dd88db 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -30,10 +30,14 @@ import ( "github.com/shirou/gopsutil/v4/mem" "go.opentelemetry.io/otel/metric" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "google.golang.org/grpc/metadata" ) -const pollInterval = 5 * time.Second +const ( + pollInterval = 5 * time.Second + workerResourceUpdateInterval = 15 * time.Second +) var ErrPollFailed = errors.New("failed to poll controller") @@ -224,27 +228,44 @@ func (worker *Worker) runNewSession(ctx context.Context) error { return err } - for { - if err := worker.updateWorker(ctx); err != nil { - worker.logger.Errorf("failed to update worker resource: %v", err) + group, ctx := errgroup.WithContext(subCtx) - return nil + group.Go(func() error { + for { + if err := worker.updateWorker(ctx); err != nil { + return fmt.Errorf("failed to update worker resource: %w", err) + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(workerResourceUpdateInterval): + // Proceed + } } + }) - if err := worker.syncVMs(subCtx, updateFunc); err != nil { - worker.logger.Warnf("failed to sync VMs: %v", err) + group.Go(func() error { + for { + if err := worker.syncVMs(ctx, updateFunc); err != nil { + return fmt.Errorf("failed to sync VMs: %w", err) + } - return nil + select { + case <-ctx.Done(): + return ctx.Err() + case <-worker.syncRequested: + case <-worker.pollTicker.C: + // Proceed + } } + }) - select { - case <-worker.syncRequested: - case <-worker.pollTicker.C: - // continue - case <-subCtx.Done(): - return subCtx.Err() - } + if err := group.Wait(); err != nil { + worker.logger.Errorf("%v", err) } + + return nil } func (worker *Worker) registerWorker(ctx context.Context) error {