diff --git a/internal/command/controller/run.go b/internal/command/controller/run.go index a31f1e2..788073a 100644 --- a/internal/command/controller/run.go +++ b/internal/command/controller/run.go @@ -15,6 +15,7 @@ import ( "os" "path/filepath" "strconv" + "time" ) var ErrRunFailed = errors.New("failed to run controller") @@ -26,6 +27,7 @@ var noTLS bool var sshNoClientAuth bool var experimentalRPCV2 bool var noExperimentalRPCV2 bool +var experimentalPingInterval time.Duration func newRunCommand() *cobra.Command { cmd := &cobra.Command{ @@ -65,6 +67,10 @@ func newRunCommand() *cobra.Command { _ = cmd.PersistentFlags().MarkHidden("experimental-rpc-v2") cmd.PersistentFlags().BoolVar(&noExperimentalRPCV2, "no-experimental-rpc-v2", false, "disable experimental RPC v2 (https://github.com/cirruslabs/orchard/issues/235)") + cmd.PersistentFlags().DurationVar(&experimentalPingInterval, "experimental-ping-interval", 0, + "interval between WebSocket PING's sent by the controller to workers and clients, "+ + "useful when facing intermediate load balancers/proxies that have timeouts "+ + "smaller than the controller's default 30 second interval") return cmd } @@ -156,6 +162,14 @@ func runController(cmd *cobra.Command, args []string) (err error) { controllerOpts = append(controllerOpts, controller.WithExperimentalRPCV2()) } + if experimentalPingInterval != 0 { + if experimentalPingInterval < 5*time.Second { + return fmt.Errorf("--experimental-ping-interval's value cannot be less than 5 seconds") + } + + controllerOpts = append(controllerOpts, controller.WithPingInterval(experimentalPingInterval)) + } + controllerInstance, err := controller.New(controllerOpts...) if err != nil { return err diff --git a/internal/controller/api_rpc_portforward.go b/internal/controller/api_rpc_portforward.go index a91faca..af746cf 100644 --- a/internal/controller/api_rpc_portforward.go +++ b/internal/controller/api_rpc_portforward.go @@ -43,7 +43,7 @@ func (controller *Controller) rpcPortForward(ctx *gin.Context) responder.Respond case <-proxyCtx.Done(): // Do not close the WebSocket connection as it should be already closed by our rendezvous party return responder.Empty() - case <-time.After(30 * time.Second): + case <-time.After(controller.pingInterval): pingCtx, pingCtxCancel := context.WithTimeout(ctx, 5*time.Second) if err := wsConn.Ping(pingCtx); err != nil { diff --git a/internal/controller/api_rpc_watch.go b/internal/controller/api_rpc_watch.go index 474a73e..c46c162 100644 --- a/internal/controller/api_rpc_watch.go +++ b/internal/controller/api_rpc_watch.go @@ -78,7 +78,7 @@ func (controller *Controller) rpcWatch(ctx *gin.Context) responder.Responder { return controller.wsError(wsConn, websocket.StatusInternalError, "watch RPC", "failure to write the watch instruction", err) } - case <-time.After(30 * time.Second): + case <-time.After(controller.pingInterval): pingCtx, pingCtxCancel := context.WithTimeout(ctx, 5*time.Second) if err := wsConn.Ping(pingCtx); err != nil { diff --git a/internal/controller/api_vms_portforward.go b/internal/controller/api_vms_portforward.go index 0395879..1a94532 100644 --- a/internal/controller/api_vms_portforward.go +++ b/internal/controller/api_vms_portforward.go @@ -142,7 +142,7 @@ func (controller *Controller) portForward( } return responder.Empty() - case <-time.After(30 * time.Second): + case <-time.After(controller.pingInterval): pingCtx, pingCtxCancel := context.WithTimeout(ctx, 5*time.Second) if err := wsConn.Ping(pingCtx); err != nil { diff --git a/internal/controller/controller.go b/internal/controller/controller.go index e1fbf42..59538f7 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -60,6 +60,7 @@ type Controller struct { workerOfflineTimeout time.Duration maxWorkersPerLicense uint experimentalRPCV2 bool + pingInterval time.Duration sshListenAddr string sshSigner ssh.Signer @@ -75,6 +76,7 @@ func New(opts ...Option) (*Controller, error) { ipRendezvous: rendezvous.New[rendezvous.ResultWithErrorMessage[string]](), workerOfflineTimeout: 3 * time.Minute, maxWorkersPerLicense: maxWorkersPerDefaultLicense, + pingInterval: 30 * time.Second, } // Apply options diff --git a/internal/controller/option.go b/internal/controller/option.go index e49bb0c..23687a9 100644 --- a/internal/controller/option.go +++ b/internal/controller/option.go @@ -59,6 +59,12 @@ func WithExperimentalRPCV2() Option { } } +func WithPingInterval(pingInterval time.Duration) Option { + return func(controller *Controller) { + controller.pingInterval = pingInterval + } +} + func WithLogger(logger *zap.Logger) Option { return func(controller *Controller) { controller.logger = logger.Sugar()