From dabfeffe6674e89fe8f16e096cafdf3c155189d3 Mon Sep 17 00:00:00 2001 From: Cody Lee Date: Mon, 11 May 2026 17:43:44 -0500 Subject: [PATCH] fix(prometheus): serve scrapes from cached background poll (#1013) Decouples Prometheus scrape cadence from upstream UniFi API calls so a 429 backoff loop on the controller side no longer stalls /metrics. The output plugin now owns a 60s background poller (configurable) whose result is served from an in-memory cache. Concurrent /scrape requests for the same target are coalesced via singleflight to prevent a noisy scraper from multiplying upstream load. Adds two new metrics so operators can detect cache staleness and refresh failures independently: - unpoller_prometheus_cache_age_seconds - unpoller_prometheus_refresh_failures_total Background goroutine recovers from panics so a malformed input payload no longer silently kills refreshes. Co-Authored-By: Claude Opus 4.7 (1M context) --- examples/up.conf.example | 4 + examples/up.json.example | 3 +- examples/up.yaml.example | 1 + go.mod | 3 +- go.sum | 2 + pkg/promunifi/README.md | 28 +++ pkg/promunifi/cache_test.go | 347 ++++++++++++++++++++++++++++++++++++ pkg/promunifi/collector.go | 251 +++++++++++++++++++++++++- 8 files changed, 635 insertions(+), 4 deletions(-) create mode 100644 pkg/promunifi/cache_test.go diff --git a/examples/up.conf.example b/examples/up.conf.example index ca38bb8d..3991a4ca 100644 --- a/examples/up.conf.example +++ b/examples/up.conf.example @@ -35,6 +35,10 @@ report_errors = false ## Record data for disabled or down (unlinked) switch ports. dead_ports = false + # How often the background poller refreshes the cache served to /metrics. + # Decouples scrape cadence from UniFi API calls so 429 backoff loops no + # longer block /metrics. Default: 60s. Values below 15s are clamped to 15s. + interval = "60s" [influxdb] disable = false diff --git a/examples/up.json.example b/examples/up.json.example index e5bdebcc..aa63775a 100644 --- a/examples/up.json.example +++ b/examples/up.json.example @@ -11,7 +11,8 @@ "http_listen": "0.0.0.0:9130", "ssl_cert_path": "", "ssl_key_path": "", - "report_errors": false + "report_errors": false, + "interval": "60s" }, "influxdb": { diff --git a/examples/up.yaml.example b/examples/up.yaml.example index 1c8d3a0e..7cc02fd8 100644 --- a/examples/up.yaml.example +++ b/examples/up.yaml.example @@ -20,6 +20,7 @@ prometheus: ssl_cert_path: "" ssl_key_path: "" report_errors: false + interval: "60s" influxdb: disable: false diff --git a/go.mod b/go.mod index 9fa8a98d..6701afec 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( go.opentelemetry.io/otel/sdk v1.43.0 go.opentelemetry.io/otel/sdk/metric v1.43.0 golang.org/x/crypto v0.51.0 + golang.org/x/sync v0.20.0 golang.org/x/term v0.43.0 golift.io/cnfg v0.2.5 golift.io/cnfgfile v0.0.0-20240713024420-a5436d84eb48 @@ -57,7 +58,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/oapi-codegen/runtime v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/client_model v0.6.2 github.com/prometheus/procfs v0.16.1 // indirect golang.org/x/sys v0.44.0 // indirect google.golang.org/protobuf v1.36.11 // indirect diff --git a/go.sum b/go.sum index 8ff823be..b150d2ce 100644 --- a/go.sum +++ b/go.sum @@ -124,6 +124,8 @@ golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/pkg/promunifi/README.md b/pkg/promunifi/README.md index d0ca905d..a2993054 100644 --- a/pkg/promunifi/README.md +++ b/pkg/promunifi/README.md @@ -2,3 +2,31 @@ This package provides the interface to turn UniFi measurements into prometheus exported metrics. Requires the poller package for actual UniFi data collection. + +## Scrape cache + +Prometheus scrapes are served from an in-memory cache refreshed by a background +poller on a fixed interval. This decouples the scrape cadence from the UniFi +API call cadence: scrapes always return immediately, and upstream backpressure +(e.g. `429 Too Many Requests`) no longer stalls `/metrics`. + +Config (TOML): + +```toml +[prometheus] + http_listen = "0.0.0.0:9130" + # How often the background poller refreshes the cache served to /metrics. + # Default: 60s. Values below 15s are clamped to 15s. + interval = "60s" +``` + +Environment variable: `UP_PROMETHEUS_INTERVAL=60s`. + +On poll error the last successful snapshot is preserved, so a transient 429 no +longer empties `/metrics`. To monitor cache staleness, scrape the +`unpoller_prometheus_cache_age_seconds` gauge — it reports seconds since the +last successful background refresh, or `-1` if no refresh has succeeded yet. + +The `/scrape` endpoint (per-target dynamic scrapes) still fetches live but +coalesces concurrent requests for the same target via `singleflight`, so a +noisy scraper cannot multiply upstream load. diff --git a/pkg/promunifi/cache_test.go b/pkg/promunifi/cache_test.go new file mode 100644 index 00000000..c84e3bd5 --- /dev/null +++ b/pkg/promunifi/cache_test.go @@ -0,0 +1,347 @@ +//nolint:testpackage // white-box tests exercise unexported cache + fetch routing. +package promunifi + +import ( + "errors" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + dto "github.com/prometheus/client_model/go" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/unpoller/unpoller/pkg/poller" + "golift.io/cnfg" +) + +// stubCollect is a minimal poller.Collect implementation for cache tests. +type stubCollect struct { + mu sync.Mutex + calls atomic.Int64 + delay time.Duration + metrics *poller.Metrics + err error + panicMsg string + errLogs atomic.Int64 +} + +func (s *stubCollect) Metrics(_ *poller.Filter) (*poller.Metrics, error) { + s.calls.Add(1) + + s.mu.Lock() + delay := s.delay + m := s.metrics + err := s.err + panicMsg := s.panicMsg + s.mu.Unlock() + + if panicMsg != "" { + panic(panicMsg) + } + + if delay > 0 { + time.Sleep(delay) + } + + return m, err +} + +func (s *stubCollect) Events(_ *poller.Filter) (*poller.Events, error) { return &poller.Events{}, nil } +func (s *stubCollect) Poller() poller.Poller { return poller.Poller{} } +func (s *stubCollect) Inputs() []string { return nil } +func (s *stubCollect) Outputs() []string { return nil } +func (s *stubCollect) Logf(string, ...any) {} +func (s *stubCollect) LogErrorf(string, ...any) { s.errLogs.Add(1) } +func (s *stubCollect) LogDebugf(string, ...any) {} + +func TestMetricsCacheSetKeepsLastGoodOnError(t *testing.T) { + t.Parallel() + + c := &metricsCache{} + good := &poller.Metrics{} + + c.set(good, nil) + c.set(nil, errors.New("transient 429")) + + m, fetchedAt, err := c.get() + require.NotNil(t, m, "cache should retain last good snapshot on error") + assert.Same(t, good, m) + assert.False(t, fetchedAt.IsZero()) + assert.EqualError(t, err, "transient 429") +} + +func TestMetricsCacheSetReplacesOnSuccess(t *testing.T) { + t.Parallel() + + c := &metricsCache{} + first := &poller.Metrics{} + second := &poller.Metrics{} + + c.set(first, nil) + c.set(second, nil) + + m, _, err := c.get() + assert.NoError(t, err) + assert.Same(t, second, m) +} + +func TestFetchMetricsServesCacheForGlobalScrape(t *testing.T) { + t.Parallel() + + stub := &stubCollect{} + cached := &poller.Metrics{} + + u := &promUnifi{Config: &Config{}, Collector: stub, cache: &metricsCache{}} + u.cache.set(cached, nil) + + got, err := u.fetchMetrics(nil) + require.NoError(t, err) + assert.Same(t, cached, got) + assert.Zero(t, stub.calls.Load(), "global scrape must not hit upstream when cache has data") +} + +func TestFetchMetricsReturnsErrorWhenCacheEmpty(t *testing.T) { + t.Parallel() + + stub := &stubCollect{} + u := &promUnifi{Config: &Config{}, Collector: stub, cache: &metricsCache{}} + + got, err := u.fetchMetrics(nil) + assert.Nil(t, got) + assert.Error(t, err) +} + +// TestFetchMetricsBypassesNilCache covers the nil-cache defensive branch. +// Run() always initializes the cache in production; this branch exists only +// for tests that exercise fetchMetrics directly without invoking Run. +func TestFetchMetricsBypassesNilCache(t *testing.T) { + t.Parallel() + + stub := &stubCollect{metrics: &poller.Metrics{}} + u := &promUnifi{Config: &Config{}, Collector: stub} + + got, err := u.fetchMetrics(nil) + require.NoError(t, err) + assert.Same(t, stub.metrics, got) + assert.EqualValues(t, 1, stub.calls.Load()) +} + +func TestFetchMetricsSingleflightCoalescesScrapes(t *testing.T) { + t.Parallel() + + stub := &stubCollect{metrics: &poller.Metrics{}, delay: 50 * time.Millisecond} + u := &promUnifi{Config: &Config{}, Collector: stub} + + const concurrent = 20 + + var wg sync.WaitGroup + + wg.Add(concurrent) + + for i := 0; i < concurrent; i++ { + go func() { + defer wg.Done() + + _, _ = u.fetchMetrics(&poller.Filter{Path: "https://controller.example/"}) + }() + } + + wg.Wait() + + assert.LessOrEqual(t, stub.calls.Load(), int64(2), + "singleflight should coalesce concurrent scrapes to ~1 upstream call") +} + +func TestFetchMetricsScrapeKeyedByFilterPath(t *testing.T) { + t.Parallel() + + stub := &stubCollect{metrics: &poller.Metrics{}, delay: 50 * time.Millisecond} + u := &promUnifi{Config: &Config{}, Collector: stub} + + var wg sync.WaitGroup + + wg.Add(2) + + for i, path := range []string{"https://a.example/", "https://b.example/"} { + i, path := i, path + + go func() { + defer wg.Done() + + _, _ = u.fetchMetrics(&poller.Filter{Path: path, Name: fmt.Sprintf("t%d", i)}) + }() + } + + wg.Wait() + + assert.EqualValues(t, 2, stub.calls.Load(), + "distinct targets should not be coalesced") +} + +func TestNormalizeInterval(t *testing.T) { + t.Parallel() + + cases := []struct { + name string + in time.Duration + want time.Duration + }{ + {"unset uses default", 0, defaultInterval}, + {"negative uses default", -1, defaultInterval}, + {"below minimum clamps", time.Second, minimumInterval}, + {"exact minimum unchanged", minimumInterval, minimumInterval}, + {"above minimum unchanged", 2 * time.Minute, 2 * time.Minute}, + } + + for _, tc := range cases { + tc := tc + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + u := &promUnifi{Config: &Config{Interval: cnfg.Duration{Duration: tc.in}}} + u.normalizeInterval() + assert.Equal(t, tc.want, u.Interval.Duration) + }) + } +} + +func TestRefreshCachePreservesLastGoodAcrossError(t *testing.T) { + t.Parallel() + + good := &poller.Metrics{} + stub := &stubCollect{metrics: good} + u := &promUnifi{Config: &Config{}, Collector: stub, cache: &metricsCache{}} + + u.refreshCache() + stub.mu.Lock() + stub.metrics = nil + stub.err = errors.New("429 too many requests") + stub.mu.Unlock() + u.refreshCache() + + m, fetchedAt, err := u.cache.get() + require.Same(t, good, m, "good snapshot must survive a failed refresh") + assert.False(t, fetchedAt.IsZero()) + assert.EqualError(t, err, "429 too many requests") + assert.EqualValues(t, 1, stub.errLogs.Load(), "failed refresh should log via LogErrorf") +} + +func TestCacheAgeGaugeReportsUnpopulated(t *testing.T) { + t.Parallel() + + u := &promUnifi{Config: &Config{Namespace: "test"}, cache: &metricsCache{}} + gauge := u.cacheAgeGauge() + + require.Equal(t, -1.0, currentGaugeValue(t, gauge), + "unpopulated cache should report -1 sentinel") +} + +func TestCacheAgeGaugeReportsFreshness(t *testing.T) { + t.Parallel() + + u := &promUnifi{Config: &Config{Namespace: "test"}, cache: &metricsCache{}} + u.cache.set(&poller.Metrics{}, nil) + + gauge := u.cacheAgeGauge() + age := currentGaugeValue(t, gauge) + assert.GreaterOrEqual(t, age, 0.0) + assert.Less(t, age, 5.0, "freshly populated cache should report a small age") +} + +func TestSafeRefreshRecoversFromPanic(t *testing.T) { + t.Parallel() + + good := &poller.Metrics{} + stub := &stubCollect{metrics: good} + u := &promUnifi{Config: &Config{}, Collector: stub, cache: &metricsCache{}} + + // First a successful refresh so we have a good snapshot to preserve. + u.safeRefresh() + stub.mu.Lock() + stub.metrics = nil + stub.panicMsg = "simulated input plugin panic" + stub.mu.Unlock() + + require.NotPanics(t, u.safeRefresh, "safeRefresh must swallow upstream panics") + + m, fetchedAt, err := u.cache.get() + assert.Same(t, good, m, "panic must not blank out the cache") + assert.False(t, fetchedAt.IsZero()) + assert.ErrorContains(t, err, "refresh panicked") + assert.EqualValues(t, 1, stub.errLogs.Load(), "panic should be logged once") +} + +func TestRefreshFailuresCounterIncrementsOnErrorAndPanic(t *testing.T) { + t.Parallel() + + stub := &stubCollect{err: errors.New("upstream down")} + counter := prometheus.NewCounter(prometheus.CounterOpts{Name: "test_refresh_failures_total"}) + u := &promUnifi{ + Config: &Config{}, + Collector: stub, + cache: &metricsCache{}, + refreshFailures: counter, + } + + u.refreshCache() + u.refreshCache() + assert.Equal(t, 2.0, currentCounterValue(t, counter), "error path should increment counter") + + stub.mu.Lock() + stub.err = nil + stub.panicMsg = "boom" + stub.mu.Unlock() + u.safeRefresh() + assert.Equal(t, 3.0, currentCounterValue(t, counter), "panic path should increment counter") +} + +func TestFetchMetricsScrapeReturnsUpstreamError(t *testing.T) { + t.Parallel() + + stub := &stubCollect{err: errors.New("controller unreachable")} + u := &promUnifi{Config: &Config{}, Collector: stub} + + got, err := u.fetchMetrics(&poller.Filter{Path: "https://x.example/"}) + assert.Nil(t, got) + assert.EqualError(t, err, "controller unreachable") +} + +// currentGaugeValue reads the current value out of a GaugeFunc by collecting +// it through Prometheus' standard interface — avoids depending on internal +// client_golang APIs. +func currentGaugeValue(t *testing.T, c prometheus.Collector) float64 { + t.Helper() + + ch := make(chan prometheus.Metric, 1) + c.Collect(ch) + close(ch) + + m := <-ch + + var pb dto.Metric + + require.NoError(t, m.Write(&pb)) + + return pb.GetGauge().GetValue() +} + +func currentCounterValue(t *testing.T, c prometheus.Collector) float64 { + t.Helper() + + ch := make(chan prometheus.Metric, 1) + c.Collect(ch) + close(ch) + + m := <-ch + + var pb dto.Metric + + require.NoError(t, m.Write(&pb)) + + return pb.GetCounter().GetValue() +} diff --git a/pkg/promunifi/collector.go b/pkg/promunifi/collector.go index fec7061c..82f158e2 100644 --- a/pkg/promunifi/collector.go +++ b/pkg/promunifi/collector.go @@ -6,6 +6,7 @@ import ( "net" "net/http" "reflect" + "runtime/debug" "strings" "sync" "time" @@ -18,6 +19,8 @@ import ( "github.com/unpoller/unifi/v5" "github.com/unpoller/unpoller/pkg/poller" "github.com/unpoller/unpoller/pkg/webserver" + "golang.org/x/sync/singleflight" + "golift.io/cnfg" "golift.io/version" ) @@ -28,11 +31,18 @@ const ( // channel buffer, fits at least one batch. defaultBuffer = 50 defaultHTTPListen = "0.0.0.0:9130" + // defaultInterval matches the typical UniFi Retry-After window; gives the + // controller time to recover between background polls without starving + // scrapes for fresh data. + defaultInterval = 60 * time.Second + minimumInterval = 15 * time.Second // simply fewer letters. counter = prometheus.CounterValue gauge = prometheus.GaugeValue ) +// ErrMetricFetchFailed is reported as an invalid metric description when a +// scrape cannot obtain data from the configured collector. var ErrMetricFetchFailed = fmt.Errorf("metric fetch failed") type promUnifi struct { @@ -76,12 +86,60 @@ type promUnifi struct { PendingDevice *pendingDevice Country *country // controllerUp tracks per-controller poll success (1) or failure (0). + // Reflects the most recent background poll — when /metrics is served from + // a stale cache, controllerUp lags real-time health; pair with + // unpoller_prometheus_cache_age_seconds for staleness signals. controllerUp *prometheus.GaugeVec + // refreshFailures counts background refresh failures since process start + // so operators can alert on failure rate independently of cache staleness. + refreshFailures prometheus.Counter + // cache holds the last successful metrics snapshot from the background + // poller. Run() always initializes it; the nil-guards in cache-using + // methods exist only for tests that exercise those methods directly + // without invoking Run(). + cache *metricsCache + // scrapeFlight coalesces concurrent /scrape requests targeting the same + // controller URL so a noisy scraper can't multiply upstream load. + scrapeFlight singleflight.Group // This interface is passed to the Collect() method. The Collect method uses // this interface to retrieve the latest UniFi measurements and export them. Collector poller.Collect } +// metricsCache stores the latest background-poller snapshot. Reads and writes +// are serialized by an RWMutex so scrapes observe a consistent (metrics, +// fetchedAt, lastErr) triple while the background ticker refreshes in place. +// Failed refreshes preserve the prior good snapshot so /metrics never blanks +// out under transient 429 backoffs. +type metricsCache struct { + mu sync.RWMutex + metrics *poller.Metrics + fetchedAt time.Time + lastErr error +} + +func (c *metricsCache) get() (*poller.Metrics, time.Time, error) { + c.mu.RLock() + defer c.mu.RUnlock() + + return c.metrics, c.fetchedAt, c.lastErr +} + +func (c *metricsCache) set(m *poller.Metrics, err error) { + c.mu.Lock() + defer c.mu.Unlock() + + c.lastErr = err + // Keep the last good snapshot on error so /metrics never blanks out + // during transient upstream failures (e.g. 429 backoff). Also reject + // (nil, nil) — a "successful" empty fetch would otherwise leave us with + // nil metrics but a fresh fetchedAt, fooling the cache-age gauge. + if err == nil && m != nil { + c.metrics = m + c.fetchedAt = time.Now() + } +} + var _ poller.OutputPlugin = &promUnifi{} // Config is the input (config file) data used to initialize this output plugin. @@ -104,6 +162,13 @@ type Config struct { Disable bool `json:"disable" toml:"disable" xml:"disable" yaml:"disable"` // Save data for dead ports? ie. ports that are down or disabled. DeadPorts bool `json:"dead_ports" toml:"dead_ports" xml:"dead_ports" yaml:"dead_ports"` + // Interval controls how often the background poller refreshes the cached + // metrics that /metrics scrapes are served from. Decouples Prometheus + // scrape cadence from upstream UniFi API calls so 429 backoff loops cannot + // stall scrapes. Defaults to defaultInterval; values below minimumInterval + // are clamped up. Must be > 0 before use; normalizeInterval applies the + // default and floor during Run(). + Interval cnfg.Duration `json:"interval" toml:"interval" xml:"interval" yaml:"interval"` } type metric struct { @@ -155,6 +220,8 @@ func init() { // nolint: gochecknoinits }) } +// DebugOutput validates the Prometheus output configuration: address format +// and (outside of health-check mode) bindability of the listen port. func (u *promUnifi) DebugOutput() (bool, error) { if u == nil { return true, nil @@ -190,6 +257,7 @@ func (u *promUnifi) DebugOutput() (bool, error) { return true, nil } +// Enabled reports whether this output plugin is configured and active. func (u *promUnifi) Enabled() bool { if u == nil { return false @@ -227,6 +295,8 @@ func (u *promUnifi) Run(c poller.Collect) error { u.Buffer = defaultBuffer } + u.normalizeInterval() + u.Client = descClient(u.Namespace + "_client_") u.Device = descDevice(u.Namespace + "_device_") // stats for all device types. u.UAP = descUAP(u.Namespace + "_device_") @@ -267,8 +337,14 @@ func (u *promUnifi) Run(c poller.Collect) error { u.Country = descCountry(u.Namespace + "_") u.controllerUp = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: u.Namespace + "_controller_up", - Help: "Whether the last poll of the UniFi controller succeeded (1) or failed (0).", + Help: "Whether the most recent background poll of the UniFi controller succeeded (1) or failed (0). " + + "Reflects the last poll attempt, not real-time health; pair with " + + u.Namespace + "_prometheus_cache_age_seconds for liveness signals when scrapes are served from a stale cache.", }, []string{"source"}) + u.refreshFailures = prometheus.NewCounter(prometheus.CounterOpts{ + Name: u.Namespace + "_prometheus_refresh_failures_total", + Help: "Total background metrics refresh failures since process start.", + }) mux := http.NewServeMux() promver.Version = version.Version @@ -278,7 +354,19 @@ func (u *promUnifi) Run(c poller.Collect) error { webserver.UpdateOutput(&webserver.Output{Name: PluginName, Config: u.Config}) prometheus.MustRegister(collectors.NewBuildInfoCollector()) prometheus.MustRegister(u.controllerUp) + prometheus.MustRegister(u.refreshFailures) prometheus.MustRegister(u) + + u.cache = &metricsCache{} + prometheus.MustRegister(u.cacheAgeGauge()) + // safeRefresh (not refreshCache) because a panic in the initial upstream + // fetch must not kill Run() before the HTTP listener starts. + u.safeRefresh() + + go u.backgroundPoll() + + u.Logf("Prometheus scrape cache enabled, refresh interval: %v", u.Interval.Duration) + mux.Handle("/metrics", promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError}, )) @@ -297,6 +385,163 @@ func (u *promUnifi) Run(c poller.Collect) error { } } +// normalizeInterval applies defaults and the minimum-interval floor to the +// configured scrape cache refresh interval. Values <= 0 use the default. +func (u *promUnifi) normalizeInterval() { + if u.Interval.Duration <= 0 { + u.Interval.Duration = defaultInterval + + return + } + + if u.Interval.Duration < minimumInterval { + u.Logf("Prometheus interval %v is below minimum %v; clamping to minimum", + u.Interval.Duration, minimumInterval) + + u.Interval.Duration = minimumInterval + } +} + +// backgroundPoll runs forever, refreshing the metrics cache on the configured +// interval. Returns immediately if the cache is not configured. A panic in +// upstream collection is logged and the loop continues so one bad payload +// doesn't silently stop refreshes (operator would only see cache_age climb). +func (u *promUnifi) backgroundPoll() { + if u.cache == nil { + return + } + + ticker := time.NewTicker(u.Interval.Duration) + defer ticker.Stop() + + for range ticker.C { + u.safeRefresh() + } +} + +// safeRefresh wraps refreshCache with a recover so a panic in an input +// plugin doesn't kill the background poller. The panic message and stack +// trace are logged separately so log aggregators (Sentry, Datadog, Loki) +// group panics by their headline rather than treating each stack line as +// an independent event. +func (u *promUnifi) safeRefresh() { + defer func() { + r := recover() + if r == nil { + return + } + + u.LogErrorf("background metrics refresh panicked; continuing: %v", r) + u.LogDebugf("panic stack:\n%s", debug.Stack()) + + if u.cache != nil { + u.cache.set(nil, fmt.Errorf("refresh panicked: %v", r)) + } + + if u.refreshFailures != nil { + u.refreshFailures.Inc() + } + }() + + u.refreshCache() +} + +// refreshCache polls upstream once and updates the cache. On error the last +// successful snapshot is preserved so scrapes keep returning data during +// transient upstream failures (e.g. 429 backoff loops). Failures are also +// counted in refreshFailures so operators can alert on failure rate +// independently of cache staleness. +// +// The cache.set call happens before logging and counter increment so the +// cache update is the most-protected statement — if anything in the +// post-set bookkeeping ever panics, safeRefresh's recover still sees a +// correctly updated cache. +func (u *promUnifi) refreshCache() { + if u.cache == nil { + return + } + + m, err := u.Collector.Metrics(nil) + u.cache.set(m, err) + + if err != nil { + u.LogErrorf("background metrics refresh failed (serving last good snapshot): %v", err) + + if u.refreshFailures != nil { + u.refreshFailures.Inc() + } + } +} + +// cacheAgeGauge returns a GaugeFunc reporting seconds since the last +// successful background refresh. Returns -1 if no refresh has succeeded yet. +func (u *promUnifi) cacheAgeGauge() prometheus.Collector { + return prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Name: u.Namespace + "_prometheus_cache_age_seconds", + Help: "Seconds since the last successful background metrics refresh. -1 means no refresh has succeeded yet.", + }, + func() float64 { + if u.cache == nil { + return -1 + } + + _, fetchedAt, _ := u.cache.get() + if fetchedAt.IsZero() { + return -1 + } + + return time.Since(fetchedAt).Seconds() + }, + ) +} + +// fetchMetrics returns the metrics for a scrape, using the cache for global +// /metrics scrapes and singleflight-coalesced live calls for per-target +// /scrape requests. +func (u *promUnifi) fetchMetrics(filter *poller.Filter) (*poller.Metrics, error) { + if filter == nil { + if u.cache == nil { + return u.Collector.Metrics(nil) + } + + m, _, err := u.cache.get() + if m != nil { + // Serve cached data even if the most recent refresh errored. + return m, nil + } + + if err != nil { + return nil, err + } + + return nil, fmt.Errorf("metrics cache not yet populated") + } + + // /scrape path: coalesce concurrent scrapes for the same target so a + // noisy scraper can't multiply upstream API load. + key := filter.Path + if key == "" { + key = filter.Name + } + + result, err, _ := u.scrapeFlight.Do(key, func() (any, error) { + return u.Collector.Metrics(filter) + }) + if err != nil { + return nil, err + } + + // Strict assertion: silently dropping a wrong-type result would turn an + // upstream regression into an empty 200 OK scrape with no log entry. + m, ok := result.(*poller.Metrics) + if !ok { + return nil, fmt.Errorf("singleflight returned unexpected type %T", result) + } + + return m, nil +} + // ScrapeHandler allows prometheus to scrape a single source, instead of all sources. func (u *promUnifi) ScrapeHandler(w http.ResponseWriter, r *http.Request) { t := &target{u: u, Filter: &poller.Filter{ @@ -339,6 +584,8 @@ func (u *promUnifi) ScrapeHandler(w http.ResponseWriter, r *http.Request) { ).ServeHTTP(w, r) } +// DefaultHandler serves the HTTP root with a simple liveness response naming +// the application. func (u *promUnifi) DefaultHandler(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(poller.AppName + "\n")) @@ -396,7 +643,7 @@ func (u *promUnifi) collect(ch chan<- prometheus.Metric, filter *poller.Filter) } defer r.close() - r.Metrics, err = u.Collector.Metrics(filter) + r.Metrics, err = u.fetchMetrics(filter) r.Fetch = time.Since(r.Start) if err != nil {