orchard controller run: introduce --experimental-disable-db-compression (#336)

This commit is contained in:
Nikolay Edigaryev 2025-08-19 15:31:18 +02:00 committed by GitHub
parent a2fe11621f
commit 26668f2cbd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 65 additions and 31 deletions

View File

@ -5,6 +5,13 @@ import (
"encoding/pem"
"errors"
"fmt"
"net/http"
_ "net/http/pprof"
"os"
"path/filepath"
"strconv"
"time"
configpkg "github.com/cirruslabs/orchard/internal/config"
"github.com/cirruslabs/orchard/internal/controller"
"github.com/cirruslabs/orchard/internal/netconstants"
@ -12,12 +19,6 @@ import (
"github.com/gin-gonic/gin"
"github.com/spf13/cobra"
"go.uber.org/zap"
"net/http"
_ "net/http/pprof"
"os"
"path/filepath"
"strconv"
"time"
)
// ErrRunFailed is the sentinel error wrapped by runController failures,
// allowing callers to detect a controller run failure with errors.Is.
var ErrRunFailed = errors.New("failed to run controller")
@ -32,6 +33,7 @@ var experimentalRPCV2 bool
var noExperimentalRPCV2 bool
var experimentalPingInterval time.Duration
var deprecatedPrometheusMetrics bool
var experimentalDisableDBCompression bool
func newRunCommand() *cobra.Command {
cmd := &cobra.Command{
@ -76,6 +78,8 @@ func newRunCommand() *cobra.Command {
"smaller than the controller's default 30 second interval")
cmd.Flags().BoolVar(&deprecatedPrometheusMetrics, "deprecated-prometheus-metrics", false,
"enable Prometheus metrics, which will soon be deprecated in favor of OpenTelemetry")
cmd.Flags().BoolVar(&experimentalDisableDBCompression, "experimental-disable-db-compression", false,
"disable database compression, which might reduce RAM usage in some scenarios")
return cmd
}
@ -187,6 +191,10 @@ func runController(cmd *cobra.Command, args []string) (err error) {
controllerOpts = append(controllerOpts, controller.WithPrometheusMetrics())
}
if experimentalDisableDBCompression {
controllerOpts = append(controllerOpts, controller.WithDisableDBCompression())
}
controllerInstance, err := controller.New(controllerOpts...)
if err != nil {
return err

View File

@ -5,6 +5,12 @@ import (
"crypto/tls"
"errors"
"fmt"
"net"
"net/http"
"os"
"strings"
"time"
"github.com/cirruslabs/orchard/internal/controller/notifier"
"github.com/cirruslabs/orchard/internal/controller/rendezvous"
"github.com/cirruslabs/orchard/internal/controller/scheduler"
@ -24,11 +30,6 @@ import (
"golang.org/x/net/http2/h2c"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"net"
"net/http"
"os"
"strings"
"time"
)
var (
@ -60,6 +61,7 @@ type Controller struct {
workerOfflineTimeout time.Duration
maxWorkersPerLicense uint
experimentalRPCV2 bool
disableDBCompression bool
pingInterval time.Duration
prometheusMetrics bool
@ -112,7 +114,8 @@ func New(opts ...Option) (*Controller, error) {
}
// Instantiate the database
store, err := badger.NewBadgerStore(controller.dataDir.DBPath(), controller.logger)
store, err := badger.NewBadgerStore(controller.dataDir.DBPath(), controller.disableDBCompression,
controller.logger)
if err != nil {
return nil, err
}

View File

@ -2,9 +2,10 @@ package controller
import (
"crypto/tls"
"time"
"go.uber.org/zap"
"golang.org/x/crypto/ssh"
"time"
)
type Option func(*Controller)
@ -59,6 +60,12 @@ func WithExperimentalRPCV2() Option {
}
}
// WithDisableDBCompression configures the Controller to open its
// database with compression disabled, which might reduce RAM usage
// in some scenarios.
func WithDisableDBCompression() Option {
	return func(c *Controller) {
		c.disableDBCompression = true
	}
}
func WithPingInterval(pingInterval time.Duration) Option {
return func(controller *Controller) {
controller.pingInterval = pingInterval

View File

@ -4,6 +4,10 @@ import (
"cmp"
"context"
"errors"
"slices"
"sort"
"time"
"github.com/cirruslabs/orchard/internal/controller/lifecycle"
"github.com/cirruslabs/orchard/internal/controller/notifier"
storepkg "github.com/cirruslabs/orchard/internal/controller/store"
@ -15,9 +19,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
"slices"
"sort"
"time"
)
const (
@ -91,18 +92,21 @@ func (scheduler *Scheduler) Run() {
}
healthCheckingLoopIterationStart := time.Now()
if err := scheduler.healthCheckingLoopIteration(); err != nil {
numWorkersHealth, numVMsHealth, err := scheduler.healthCheckingLoopIteration()
healthCheckingLoopIterationEnd := time.Now()
if err != nil {
scheduler.logger.Errorf("Failed to health-check VMs: %v", err)
}
healthCheckingLoopIterationEnd := time.Now()
schedulingLoopIterationStart := time.Now()
err := scheduler.schedulingLoopIteration()
numWorkersScheduling, numVMsScheduling, err := scheduler.schedulingLoopIteration()
schedulingLoopIterationEnd := time.Now()
scheduler.logger.Debugf("Health checking loop iteration took %v, "+
"scheduling loop iteration took %v",
scheduler.logger.Debugf("Health checking loop iteration for %d workers and %d VMs took %v, "+
"scheduling loop iteration for %d workers and %d VMs took %v",
numWorkersHealth, numVMsHealth,
healthCheckingLoopIterationEnd.Sub(healthCheckingLoopIterationStart),
numWorkersScheduling, numVMsScheduling,
schedulingLoopIterationEnd.Sub(schedulingLoopIterationStart))
if err != nil {
@ -140,7 +144,7 @@ func (scheduler *Scheduler) RequestScheduling() {
}
//nolint:gocognit,gocyclo // this logic could be seen as even more complex if split into multiple functions
func (scheduler *Scheduler) schedulingLoopIteration() error {
func (scheduler *Scheduler) schedulingLoopIteration() (int, int, error) {
affectedWorkers := mapset.NewSet[string]()
// Scheduler consistency model is based on the following:
@ -206,7 +210,7 @@ func (scheduler *Scheduler) schedulingLoopIteration() error {
return nil
}); err != nil {
return err
return 0, 0, err
}
unscheduledVMs, workerInfos := ProcessVMs(vms)
@ -334,7 +338,7 @@ NextVM:
continue NextWorker
}
return err
return 0, 0, err
}
// Update lagging resource usage
@ -364,7 +368,7 @@ NextVM:
notifyContextCancel()
}
return nil
return len(workers), len(vms), nil
}
func ProcessVMs(vms []v1.VM) ([]v1.VM, WorkerInfos) {
@ -387,7 +391,10 @@ func ProcessVMs(vms []v1.VM) ([]v1.VM, WorkerInfos) {
return unscheduledVMs, workerToResources
}
func (scheduler *Scheduler) healthCheckingLoopIteration() error {
func (scheduler *Scheduler) healthCheckingLoopIteration() (int, int, error) {
// Stats for the caller
var numWorkers, numVMs int
// Get a lagging view of VMs
var vms []v1.VM
@ -398,6 +405,7 @@ func (scheduler *Scheduler) healthCheckingLoopIteration() error {
if err != nil {
return err
}
numVMs = len(vms)
// Update metrics
if scheduler.prometheusMetrics {
@ -405,13 +413,14 @@ func (scheduler *Scheduler) healthCheckingLoopIteration() error {
if err != nil {
return err
}
numWorkers = len(workers)
scheduler.reportStats(workers, vms)
}
return nil
}); err != nil {
return err
return 0, 0, err
}
// Process each VM in a lagging list of VMs in an individual
@ -445,11 +454,11 @@ func (scheduler *Scheduler) healthCheckingLoopIteration() error {
return scheduler.healthCheckVM(txn, *currentVM)
}); err != nil {
return err
return 0, 0, err
}
}
return nil
return numWorkers, numVMs, nil
}
func (scheduler *Scheduler) healthCheckVM(txn storepkg.Transaction, vm v1.VM) error {

View File

@ -3,11 +3,13 @@ package badger
import (
"errors"
"fmt"
"time"
"github.com/avast/retry-go/v4"
"github.com/cirruslabs/orchard/internal/controller/store"
"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/badger/v3/options"
"go.uber.org/zap"
"time"
)
type Store struct {
@ -20,11 +22,16 @@ type Transaction struct {
store.Transaction
}
func NewBadgerStore(dbPath string, logger *zap.SugaredLogger) (store.Store, error) {
func NewBadgerStore(dbPath string, noCompression bool, logger *zap.SugaredLogger) (store.Store, error) {
opts := badger.DefaultOptions(dbPath).WithLogger(newBadgerLogger(logger))
opts.SyncWrites = true
if noCompression {
opts.Compression = options.None
opts.BlockCacheSize = 0
}
db, err := badger.Open(opts)
if err != nil {
return nil, err