Worker: decouple updateWorker() and syncVMs() to use different triggers (#403)

* Worker: decouple updateWorker() and syncVMs() to use different triggers

* Actually return an error otherwise errgroup won't terminate
This commit is contained in:
Nikolay Edigaryev 2026-02-06 18:16:40 +01:00 committed by GitHub
parent bdc2af3d58
commit f3b4eb42ca
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 36 additions and 15 deletions

View File

@ -30,10 +30,14 @@ import (
"github.com/shirou/gopsutil/v4/mem"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/metadata"
)
const pollInterval = 5 * time.Second
const (
pollInterval = 5 * time.Second
workerResourceUpdateInterval = 15 * time.Second
)
var ErrPollFailed = errors.New("failed to poll controller")
@ -224,27 +228,44 @@ func (worker *Worker) runNewSession(ctx context.Context) error {
return err
}
for {
if err := worker.updateWorker(ctx); err != nil {
worker.logger.Errorf("failed to update worker resource: %v", err)
group, ctx := errgroup.WithContext(subCtx)
return nil
group.Go(func() error {
for {
if err := worker.updateWorker(ctx); err != nil {
return fmt.Errorf("failed to update worker resource: %w", err)
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(workerResourceUpdateInterval):
// Proceed
}
}
})
if err := worker.syncVMs(subCtx, updateFunc); err != nil {
worker.logger.Warnf("failed to sync VMs: %v", err)
group.Go(func() error {
for {
if err := worker.syncVMs(ctx, updateFunc); err != nil {
return fmt.Errorf("failed to sync VMs: %w", err)
}
return nil
select {
case <-ctx.Done():
return ctx.Err()
case <-worker.syncRequested:
case <-worker.pollTicker.C:
// Proceed
}
}
})
select {
case <-worker.syncRequested:
case <-worker.pollTicker.C:
// continue
case <-subCtx.Done():
return subCtx.Err()
}
if err := group.Wait(); err != nil {
worker.logger.Errorf("%v", err)
}
return nil
}
func (worker *Worker) registerWorker(ctx context.Context) error {