From 26668f2cbd0fa665b100bf51989b5ad14a2ad79f Mon Sep 17 00:00:00 2001 From: Nikolay Edigaryev Date: Tue, 19 Aug 2025 15:31:18 +0200 Subject: [PATCH] orchard controller run: introduce --experimental-disable-db-compression (#336) --- internal/command/controller/run.go | 20 ++++++--- internal/controller/controller.go | 15 ++++--- internal/controller/option.go | 9 +++- internal/controller/scheduler/scheduler.go | 41 +++++++++++-------- .../controller/store/badger/badger_store.go | 11 ++++- 5 files changed, 65 insertions(+), 31 deletions(-) diff --git a/internal/command/controller/run.go b/internal/command/controller/run.go index 38e46d9..3a1b70d 100644 --- a/internal/command/controller/run.go +++ b/internal/command/controller/run.go @@ -5,6 +5,13 @@ import ( "encoding/pem" "errors" "fmt" + "net/http" + _ "net/http/pprof" + "os" + "path/filepath" + "strconv" + "time" + configpkg "github.com/cirruslabs/orchard/internal/config" "github.com/cirruslabs/orchard/internal/controller" "github.com/cirruslabs/orchard/internal/netconstants" @@ -12,12 +19,6 @@ import ( "github.com/gin-gonic/gin" "github.com/spf13/cobra" "go.uber.org/zap" - "net/http" - _ "net/http/pprof" - "os" - "path/filepath" - "strconv" - "time" ) var ErrRunFailed = errors.New("failed to run controller") @@ -32,6 +33,7 @@ var experimentalRPCV2 bool var noExperimentalRPCV2 bool var experimentalPingInterval time.Duration var deprecatedPrometheusMetrics bool +var experimentalDisableDBCompression bool func newRunCommand() *cobra.Command { cmd := &cobra.Command{ @@ -76,6 +78,8 @@ func newRunCommand() *cobra.Command { "smaller than the controller's default 30 second interval") cmd.Flags().BoolVar(&deprecatedPrometheusMetrics, "deprecated-prometheus-metrics", false, "enable Prometheus metrics, which will soon be deprecated in favor of OpenTelemetry") + cmd.Flags().BoolVar(&experimentalDisableDBCompression, "experimental-disable-db-compression", false, + "disable database compression, which might reduce RAM usage in some scenarios") return cmd } @@ -187,6 +191,10 @@ func runController(cmd *cobra.Command, args []string) (err error) { controllerOpts = append(controllerOpts, controller.WithPrometheusMetrics()) } + if experimentalDisableDBCompression { + controllerOpts = append(controllerOpts, controller.WithDisableDBCompression()) + } + controllerInstance, err := controller.New(controllerOpts...) if err != nil { return err diff --git a/internal/controller/controller.go b/internal/controller/controller.go index a32d444..617c794 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -5,6 +5,12 @@ import ( "crypto/tls" "errors" "fmt" + "net" + "net/http" + "os" + "strings" + "time" + "github.com/cirruslabs/orchard/internal/controller/notifier" "github.com/cirruslabs/orchard/internal/controller/rendezvous" "github.com/cirruslabs/orchard/internal/controller/scheduler" @@ -24,11 +30,6 @@ import ( "golang.org/x/net/http2/h2c" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" - "net" - "net/http" - "os" - "strings" - "time" ) var ( @@ -60,6 +61,7 @@ type Controller struct { workerOfflineTimeout time.Duration maxWorkersPerLicense uint experimentalRPCV2 bool + disableDBCompression bool pingInterval time.Duration prometheusMetrics bool @@ -112,7 +114,8 @@ func New(opts ...Option) (*Controller, error) { } // Instantiate the database - store, err := badger.NewBadgerStore(controller.dataDir.DBPath(), controller.logger) + store, err := badger.NewBadgerStore(controller.dataDir.DBPath(), controller.disableDBCompression, + controller.logger) if err != nil { return nil, err } diff --git a/internal/controller/option.go b/internal/controller/option.go index d2e8941..e30384a 100644 --- a/internal/controller/option.go +++ b/internal/controller/option.go @@ -2,9 +2,10 @@ package controller import ( "crypto/tls" + "time" + "go.uber.org/zap" "golang.org/x/crypto/ssh" - "time" ) type Option func(*Controller) @@ -59,6 +60,12 @@ func WithExperimentalRPCV2() Option { } } +func WithDisableDBCompression() Option { + return func(controller *Controller) { + controller.disableDBCompression = true + } +} + func WithPingInterval(pingInterval time.Duration) Option { return func(controller *Controller) { controller.pingInterval = pingInterval diff --git a/internal/controller/scheduler/scheduler.go b/internal/controller/scheduler/scheduler.go index 8fa3560..e6434a2 100644 --- a/internal/controller/scheduler/scheduler.go +++ b/internal/controller/scheduler/scheduler.go @@ -4,6 +4,10 @@ import ( "cmp" "context" "errors" + "slices" + "sort" + "time" + "github.com/cirruslabs/orchard/internal/controller/lifecycle" "github.com/cirruslabs/orchard/internal/controller/notifier" storepkg "github.com/cirruslabs/orchard/internal/controller/store" @@ -15,9 +19,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "go.opentelemetry.io/otel/metric" "go.uber.org/zap" - "slices" - "sort" - "time" ) const ( @@ -91,18 +92,21 @@ func (scheduler *Scheduler) Run() { } healthCheckingLoopIterationStart := time.Now() - if err := scheduler.healthCheckingLoopIteration(); err != nil { + numWorkersHealth, numVMsHealth, err := scheduler.healthCheckingLoopIteration() + healthCheckingLoopIterationEnd := time.Now() + if err != nil { scheduler.logger.Errorf("Failed to health-check VMs: %v", err) } - healthCheckingLoopIterationEnd := time.Now() schedulingLoopIterationStart := time.Now() - err := scheduler.schedulingLoopIteration() + numWorkersScheduling, numVMsScheduling, err := scheduler.schedulingLoopIteration() schedulingLoopIterationEnd := time.Now() - scheduler.logger.Debugf("Health checking loop iteration took %v, "+ - "scheduling loop iteration took %v", + scheduler.logger.Debugf("Health checking loop iteration for %d workers and %d VMs took %v, "+ + "scheduling loop iteration for %d workers and %d VMs took %v", + numWorkersHealth, numVMsHealth, healthCheckingLoopIterationEnd.Sub(healthCheckingLoopIterationStart), + numWorkersScheduling, numVMsScheduling, schedulingLoopIterationEnd.Sub(schedulingLoopIterationStart)) if err != nil { @@ -140,7 +144,7 @@ func (scheduler *Scheduler) RequestScheduling() { } //nolint:gocognit,gocyclo // this logic could be seen as even more complex if split into multiple functions -func (scheduler *Scheduler) schedulingLoopIteration() error { +func (scheduler *Scheduler) schedulingLoopIteration() (int, int, error) { affectedWorkers := mapset.NewSet[string]() // Scheduler consistency model is based on the following: @@ -206,7 +210,7 @@ func (scheduler *Scheduler) schedulingLoopIteration() error { return nil }); err != nil { - return err + return 0, 0, err } unscheduledVMs, workerInfos := ProcessVMs(vms) @@ -334,7 +338,7 @@ NextVM: continue NextWorker } - return err + return 0, 0, err } // Update lagging resource usage @@ -364,7 +368,7 @@ NextVM: notifyContextCancel() } - return nil + return len(workers), len(vms), nil } func ProcessVMs(vms []v1.VM) ([]v1.VM, WorkerInfos) { @@ -387,7 +391,10 @@ func ProcessVMs(vms []v1.VM) ([]v1.VM, WorkerInfos) { return unscheduledVMs, workerToResources } -func (scheduler *Scheduler) healthCheckingLoopIteration() error { +func (scheduler *Scheduler) healthCheckingLoopIteration() (int, int, error) { + // Stats for the caller + var numWorkers, numVMs int + // Get a lagging view of VMs var vms []v1.VM @@ -398,6 +405,7 @@ func (scheduler *Scheduler) healthCheckingLoopIteration() error { if err != nil { return err } + numVMs = len(vms) // Update metrics if scheduler.prometheusMetrics { @@ -405,13 +413,14 @@ func (scheduler *Scheduler) healthCheckingLoopIteration() error { if err != nil { return err } + numWorkers = len(workers) scheduler.reportStats(workers, vms) } return nil }); err != nil { - return err + return 0, 0, err } // Process each VM in a lagging list of VMs in an individual @@ -445,11 +454,11 @@ func (scheduler *Scheduler) healthCheckingLoopIteration() error { return scheduler.healthCheckVM(txn, *currentVM) }); err != nil { - return err + return 0, 0, err } } - return nil + return numWorkers, numVMs, nil } func (scheduler *Scheduler) healthCheckVM(txn storepkg.Transaction, vm v1.VM) error { diff --git a/internal/controller/store/badger/badger_store.go b/internal/controller/store/badger/badger_store.go index 41f07af..1758835 100644 --- a/internal/controller/store/badger/badger_store.go +++ b/internal/controller/store/badger/badger_store.go @@ -3,11 +3,13 @@ package badger import ( "errors" "fmt" + "time" + "github.com/avast/retry-go/v4" "github.com/cirruslabs/orchard/internal/controller/store" "github.com/dgraph-io/badger/v3" + "github.com/dgraph-io/badger/v3/options" "go.uber.org/zap" - "time" ) type Store struct { @@ -20,11 +22,16 @@ type Transaction struct { store.Transaction } -func NewBadgerStore(dbPath string, logger *zap.SugaredLogger) (store.Store, error) { +func NewBadgerStore(dbPath string, noCompression bool, logger *zap.SugaredLogger) (store.Store, error) { opts := badger.DefaultOptions(dbPath).WithLogger(newBadgerLogger(logger)) opts.SyncWrites = true + if noCompression { + opts.Compression = options.None + opts.BlockCacheSize = 0 + } + db, err := badger.Open(opts) if err != nil { return nil, err