Add option to disable workqueue bucket rate limiter (#4451)
This commit is contained in:
parent
012f1a5b23
commit
a401686bd5
|
|
@ -93,6 +93,11 @@ spec:
|
|||
{{- with .Values.flags.k8sClientRateLimiterBurst }}
|
||||
- "--k8s-client-rate-limiter-burst={{ . }}"
|
||||
{{- end }}
|
||||
{{- with .Values.flags.rateLimiter }}
|
||||
{{- with .name }}
|
||||
- "--workqueue-rate-limiter={{ . }}"
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
command:
|
||||
- "/manager"
|
||||
{{- with .Values.metrics }}
|
||||
|
|
|
|||
|
|
@ -136,6 +136,13 @@ flags:
|
|||
# excludeLabelPropagationPrefixes:
|
||||
# - "argocd.argoproj.io/instance"
|
||||
|
||||
## Workqueue rate limiter configuration.
|
||||
## By default, controller-runtime uses a combined rate limiter with both a per-item
|
||||
## exponential backoff and an overall token bucket (10 QPS, 100 bucket size).
|
||||
## Valid names: "bucket_rate_limiter" (default), "typed_rate_limiter" (per-item only, no global token bucket).
|
||||
# rateLimiter:
|
||||
# name: "bucket_rate_limiter"
|
||||
|
||||
# Overrides the default `.Release.Namespace` for all resources in this chart.
|
||||
namespaceOverride: ""
|
||||
|
||||
|
|
|
|||
|
|
@ -692,7 +692,7 @@ func (r *AutoscalingListenerReconciler) publishRunningListener(autoscalingListen
|
|||
}
|
||||
|
||||
// SetupWithManager sets up the controller with the Manager.
|
||||
func (r *AutoscalingListenerReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||
func (r *AutoscalingListenerReconciler) SetupWithManager(mgr ctrl.Manager, opts ...Option) error {
|
||||
labelBasedWatchFunc := func(_ context.Context, obj client.Object) []reconcile.Request {
|
||||
var requests []reconcile.Request
|
||||
labels := obj.GetLabels()
|
||||
|
|
@ -716,14 +716,16 @@ func (r *AutoscalingListenerReconciler) SetupWithManager(mgr ctrl.Manager) error
|
|||
return requests
|
||||
}
|
||||
|
||||
return ctrl.NewControllerManagedBy(mgr).
|
||||
For(&v1alpha1.AutoscalingListener{}).
|
||||
Owns(&corev1.Pod{}).
|
||||
Owns(&corev1.ServiceAccount{}).
|
||||
Watches(&rbacv1.Role{}, handler.EnqueueRequestsFromMapFunc(labelBasedWatchFunc)).
|
||||
Watches(&rbacv1.RoleBinding{}, handler.EnqueueRequestsFromMapFunc(labelBasedWatchFunc)).
|
||||
WithEventFilter(predicate.ResourceVersionChangedPredicate{}).
|
||||
Complete(r)
|
||||
return builderWithOptions(
|
||||
ctrl.NewControllerManagedBy(mgr).
|
||||
For(&v1alpha1.AutoscalingListener{}).
|
||||
Owns(&corev1.Pod{}).
|
||||
Owns(&corev1.ServiceAccount{}).
|
||||
Watches(&rbacv1.Role{}, handler.EnqueueRequestsFromMapFunc(labelBasedWatchFunc)).
|
||||
Watches(&rbacv1.RoleBinding{}, handler.EnqueueRequestsFromMapFunc(labelBasedWatchFunc)).
|
||||
WithEventFilter(predicate.ResourceVersionChangedPredicate{}),
|
||||
opts,
|
||||
).Complete(r)
|
||||
}
|
||||
|
||||
func listenerContainerStatus(pod *corev1.Pod) *corev1.ContainerStatus {
|
||||
|
|
|
|||
|
|
@ -762,25 +762,27 @@ func (r *AutoscalingRunnerSetReconciler) listEphemeralRunnerSets(ctx context.Con
|
|||
}
|
||||
|
||||
// SetupWithManager sets up the controller with the Manager.
|
||||
func (r *AutoscalingRunnerSetReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||
return ctrl.NewControllerManagedBy(mgr).
|
||||
For(&v1alpha1.AutoscalingRunnerSet{}).
|
||||
Owns(&v1alpha1.EphemeralRunnerSet{}).
|
||||
Watches(&v1alpha1.AutoscalingListener{}, handler.EnqueueRequestsFromMapFunc(
|
||||
func(_ context.Context, o client.Object) []reconcile.Request {
|
||||
autoscalingListener := o.(*v1alpha1.AutoscalingListener)
|
||||
return []reconcile.Request{
|
||||
{
|
||||
NamespacedName: types.NamespacedName{
|
||||
Namespace: autoscalingListener.Spec.AutoscalingRunnerSetNamespace,
|
||||
Name: autoscalingListener.Spec.AutoscalingRunnerSetName,
|
||||
func (r *AutoscalingRunnerSetReconciler) SetupWithManager(mgr ctrl.Manager, opts ...Option) error {
|
||||
return builderWithOptions(
|
||||
ctrl.NewControllerManagedBy(mgr).
|
||||
For(&v1alpha1.AutoscalingRunnerSet{}).
|
||||
Owns(&v1alpha1.EphemeralRunnerSet{}).
|
||||
Watches(&v1alpha1.AutoscalingListener{}, handler.EnqueueRequestsFromMapFunc(
|
||||
func(_ context.Context, o client.Object) []reconcile.Request {
|
||||
autoscalingListener := o.(*v1alpha1.AutoscalingListener)
|
||||
return []reconcile.Request{
|
||||
{
|
||||
NamespacedName: types.NamespacedName{
|
||||
Namespace: autoscalingListener.Spec.AutoscalingRunnerSetNamespace,
|
||||
Name: autoscalingListener.Spec.AutoscalingRunnerSetName,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
},
|
||||
)).
|
||||
WithEventFilter(predicate.ResourceVersionChangedPredicate{}).
|
||||
Complete(r)
|
||||
}
|
||||
},
|
||||
)).
|
||||
WithEventFilter(predicate.ResourceVersionChangedPredicate{}),
|
||||
opts,
|
||||
).Complete(r)
|
||||
}
|
||||
|
||||
type autoscalingRunnerSetFinalizerDependencyCleaner struct {
|
||||
|
|
|
|||
|
|
@ -522,12 +522,14 @@ func (r *EphemeralRunnerSetReconciler) deleteEphemeralRunnerWithActionsClient(ct
|
|||
}
|
||||
|
||||
// SetupWithManager sets up the controller with the Manager.
|
||||
func (r *EphemeralRunnerSetReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||
return ctrl.NewControllerManagedBy(mgr).
|
||||
For(&v1alpha1.EphemeralRunnerSet{}).
|
||||
Owns(&v1alpha1.EphemeralRunner{}).
|
||||
WithEventFilter(predicate.ResourceVersionChangedPredicate{}).
|
||||
Complete(r)
|
||||
func (r *EphemeralRunnerSetReconciler) SetupWithManager(mgr ctrl.Manager, opts ...Option) error {
|
||||
return builderWithOptions(
|
||||
ctrl.NewControllerManagedBy(mgr).
|
||||
For(&v1alpha1.EphemeralRunnerSet{}).
|
||||
Owns(&v1alpha1.EphemeralRunner{}).
|
||||
WithEventFilter(predicate.ResourceVersionChangedPredicate{}),
|
||||
opts,
|
||||
).Complete(r)
|
||||
}
|
||||
|
||||
type ephemeralRunnerStepper struct {
|
||||
|
|
|
|||
|
|
@ -1,8 +1,10 @@
|
|||
package actionsgithubcom
|
||||
|
||||
import (
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"sigs.k8s.io/controller-runtime/pkg/builder"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller"
|
||||
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
||||
)
|
||||
|
||||
// Options is the optional configuration for the controllers, which can be
|
||||
|
|
@ -37,6 +39,25 @@ func WithMaxConcurrentReconciles(n int) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// WithTypedRateLimiter sets the rate limiter for the controller's workqueue.
|
||||
//
|
||||
// By default, the controller-runtime uses
|
||||
// workqueue.DefaultTypedControllerRateLimiter[reconcile.Request], which combines
|
||||
// an exponential backoff per-item limiter with a token bucket overall limiter
|
||||
// (10 QPS, 100 bucket size). In large-scale environments with many runner
|
||||
// scale sets, the token bucket limiter can become a bottleneck for
|
||||
// reconciliation throughput.
|
||||
//
|
||||
// Use this option to override the default rate limiter, for example, to use
|
||||
// workqueue.DefaultTypedItemBasedRateLimiter[reconcile.Request], which removes
|
||||
// the overall token bucket constraint while keeping the per-item exponential
|
||||
// backoff.
|
||||
func WithTypedRateLimiter(rateLimiter workqueue.TypedRateLimiter[reconcile.Request]) Option {
|
||||
return func(b *controller.Options) {
|
||||
b.RateLimiter = rateLimiter
|
||||
}
|
||||
}
|
||||
|
||||
// builderWithOptions applies the given options to the provided builder, if any.
|
||||
// This is a helper function to avoid the need to import the controller-runtime package in every reconciler source file
|
||||
// and the command package that creates the controller.
|
||||
|
|
|
|||
28
main.go
28
main.go
|
|
@ -39,10 +39,12 @@ import (
|
|||
"k8s.io/apimachinery/pkg/runtime"
|
||||
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
|
||||
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/cache"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
|
||||
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
||||
"sigs.k8s.io/controller-runtime/pkg/webhook"
|
||||
// +kubebuilder:scaffold:imports
|
||||
)
|
||||
|
|
@ -110,6 +112,8 @@ func main() {
|
|||
|
||||
k8sClientRateLimiterQPS int
|
||||
k8sClientRateLimiterBurst int
|
||||
|
||||
workqueueRateLimiter string
|
||||
)
|
||||
var c github.Config
|
||||
err = envconfig.Process("github", &c)
|
||||
|
|
@ -155,6 +159,7 @@ func main() {
|
|||
flag.Var(&autoScalerImagePullSecrets, "auto-scaler-image-pull-secrets", "The default image-pull secret name for auto-scaler listener container.")
|
||||
flag.IntVar(&k8sClientRateLimiterQPS, "k8s-client-rate-limiter-qps", 20, "The QPS value of the K8s client rate limiter.")
|
||||
flag.IntVar(&k8sClientRateLimiterBurst, "k8s-client-rate-limiter-burst", 30, "The burst value of the K8s client rate limiter.")
|
||||
flag.StringVar(&workqueueRateLimiter, "workqueue-rate-limiter", "", `The workqueue rate limiter to use. Valid values are "bucket_rate_limiter" (default) and "typed_rate_limiter" (per-item only, no global token bucket).`)
|
||||
flag.Parse()
|
||||
|
||||
runnerPodDefaults.RunnerImagePullSecrets = runnerImagePullSecrets
|
||||
|
|
@ -293,6 +298,20 @@ func main() {
|
|||
|
||||
log.Info("Resource builder initializing")
|
||||
|
||||
var controllerOpts []actionsgithubcom.Option
|
||||
switch workqueueRateLimiter {
|
||||
case "typed_rate_limiter":
|
||||
log.Info("Using typed rate limiter (per-item only, no global token bucket)")
|
||||
controllerOpts = append(controllerOpts,
|
||||
actionsgithubcom.WithTypedRateLimiter(workqueue.DefaultTypedItemBasedRateLimiter[reconcile.Request]()),
|
||||
)
|
||||
case "bucket_rate_limiter", "":
|
||||
log.Info("Using default bucket rate limiter")
|
||||
default:
|
||||
log.Error(fmt.Errorf("unknown workqueue rate limiter: %s", workqueueRateLimiter), "invalid --workqueue-rate-limiter value")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if err = (&actionsgithubcom.AutoscalingRunnerSetReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
Log: log.WithName("AutoscalingRunnerSet").WithValues("version", build.Version),
|
||||
|
|
@ -302,17 +321,18 @@ func main() {
|
|||
UpdateStrategy: actionsgithubcom.UpdateStrategy(updateStrategy),
|
||||
DefaultRunnerScaleSetListenerImagePullSecrets: autoScalerImagePullSecrets,
|
||||
ResourceBuilder: rb,
|
||||
}).SetupWithManager(mgr); err != nil {
|
||||
}).SetupWithManager(mgr, controllerOpts...); err != nil {
|
||||
log.Error(err, "unable to create controller", "controller", "AutoscalingRunnerSet")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
runnerOpts := append(controllerOpts, actionsgithubcom.WithMaxConcurrentReconciles(opts.RunnerMaxConcurrentReconciles))
|
||||
if err = (&actionsgithubcom.EphemeralRunnerReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
Log: log.WithName("EphemeralRunner").WithValues("version", build.Version),
|
||||
Scheme: mgr.GetScheme(),
|
||||
ResourceBuilder: rb,
|
||||
}).SetupWithManager(mgr, actionsgithubcom.WithMaxConcurrentReconciles(opts.RunnerMaxConcurrentReconciles)); err != nil {
|
||||
}).SetupWithManager(mgr, runnerOpts...); err != nil {
|
||||
log.Error(err, "unable to create controller", "controller", "EphemeralRunner")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
|
@ -323,7 +343,7 @@ func main() {
|
|||
Scheme: mgr.GetScheme(),
|
||||
PublishMetrics: metricsAddr != "0",
|
||||
ResourceBuilder: rb,
|
||||
}).SetupWithManager(mgr); err != nil {
|
||||
}).SetupWithManager(mgr, controllerOpts...); err != nil {
|
||||
log.Error(err, "unable to create controller", "controller", "EphemeralRunnerSet")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
|
@ -335,7 +355,7 @@ func main() {
|
|||
ListenerMetricsAddr: listenerMetricsAddr,
|
||||
ListenerMetricsEndpoint: listenerMetricsEndpoint,
|
||||
ResourceBuilder: rb,
|
||||
}).SetupWithManager(mgr); err != nil {
|
||||
}).SetupWithManager(mgr, controllerOpts...); err != nil {
|
||||
log.Error(err, "unable to create controller", "controller", "AutoscalingListener")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue