fix(prometheus): serve scrapes from cached background poll (#1013)
Decouples Prometheus scrape cadence from upstream UniFi API calls so a 429
backoff loop on the controller side no longer stalls /metrics. The output
plugin now owns a 60s background poller (configurable) whose result is served
from an in-memory cache. Concurrent /scrape requests for the same target are
coalesced via singleflight to prevent a noisy scraper from multiplying
upstream load.

Adds two new metrics so operators can detect cache staleness and refresh
failures independently:

- unpoller_prometheus_cache_age_seconds
- unpoller_prometheus_refresh_failures_total

Background goroutine recovers from panics so a malformed input payload no
longer silently kills refreshes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
parent 0b83c6ad19
commit dabfeffe66
@@ -35,6 +35,10 @@
   report_errors = false
   ## Record data for disabled or down (unlinked) switch ports.
   dead_ports = false
+  # How often the background poller refreshes the cache served to /metrics.
+  # Decouples scrape cadence from UniFi API calls so 429 backoff loops no
+  # longer block /metrics. Default: 60s. Values below 15s are clamped to 15s.
+  interval = "60s"
 
 [influxdb]
   disable = false
@@ -11,7 +11,8 @@
     "http_listen": "0.0.0.0:9130",
     "ssl_cert_path": "",
     "ssl_key_path": "",
-    "report_errors": false
+    "report_errors": false,
+    "interval": "60s"
   },
 
   "influxdb": {
@@ -20,6 +20,7 @@ prometheus:
   ssl_cert_path: ""
   ssl_key_path: ""
   report_errors: false
+  interval: "60s"
 
 influxdb:
   disable: false
go.mod
@@ -20,6 +20,7 @@ require (
 	go.opentelemetry.io/otel/sdk v1.43.0
 	go.opentelemetry.io/otel/sdk/metric v1.43.0
 	golang.org/x/crypto v0.51.0
+	golang.org/x/sync v0.20.0
 	golang.org/x/term v0.43.0
 	golift.io/cnfg v0.2.5
 	golift.io/cnfgfile v0.0.0-20240713024420-a5436d84eb48
@@ -57,7 +58,7 @@ require (
 	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
 	github.com/oapi-codegen/runtime v1.1.1 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
-	github.com/prometheus/client_model v0.6.2 // indirect
+	github.com/prometheus/client_model v0.6.2
 	github.com/prometheus/procfs v0.16.1 // indirect
 	golang.org/x/sys v0.44.0 // indirect
 	google.golang.org/protobuf v1.36.11 // indirect
go.sum
@@ -124,6 +124,8 @@ golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA=
 golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
+golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -2,3 +2,31 @@
 
 This package provides the interface to turn UniFi measurements into prometheus
 exported metrics. Requires the poller package for actual UniFi data collection.
+
+## Scrape cache
+
+Prometheus scrapes are served from an in-memory cache refreshed by a background
+poller on a fixed interval. This decouples the scrape cadence from the UniFi
+API call cadence: scrapes always return immediately, and upstream backpressure
+(e.g. `429 Too Many Requests`) no longer stalls `/metrics`.
+
+Config (TOML):
+
+```toml
+[prometheus]
+http_listen = "0.0.0.0:9130"
+# How often the background poller refreshes the cache served to /metrics.
+# Default: 60s. Values below 15s are clamped to 15s.
+interval = "60s"
+```
+
+Environment variable: `UP_PROMETHEUS_INTERVAL=60s`.
+
+On poll error the last successful snapshot is preserved, so a transient 429 no
+longer empties `/metrics`. To monitor cache staleness, scrape the
+`unpoller_prometheus_cache_age_seconds` gauge — it reports seconds since the
+last successful background refresh, or `-1` if no refresh has succeeded yet.
+
+The `/scrape` endpoint (per-target dynamic scrapes) still fetches live but
+coalesces concurrent requests for the same target via `singleflight`, so a
+noisy scraper cannot multiply upstream load.
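For illustration only (not part of this commit's diff): a minimal Prometheus alerting sketch pairing the two new metrics. The metric names come from this change; the rule names, thresholds, and windows are assumptions to adapt to your environment.

```yaml
# Hypothetical alerting rules; names and thresholds are illustrative.
groups:
  - name: unpoller-scrape-cache
    rules:
      - alert: UnpollerCacheStale
        # -1 means no refresh has ever succeeded; an age over 300s means the
        # background poller has missed ~5 refreshes at the default 60s interval.
        expr: unpoller_prometheus_cache_age_seconds < 0 or unpoller_prometheus_cache_age_seconds > 300
        for: 5m
      - alert: UnpollerRefreshFailing
        # Any background refresh failure in the last 15 minutes.
        expr: increase(unpoller_prometheus_refresh_failures_total[15m]) > 0
```

Alerting on the counter catches flapping refreshes early, while the age gauge catches the quieter failure mode where refreshes stop happening at all.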
@@ -0,0 +1,347 @@
+//nolint:testpackage // white-box tests exercise unexported cache + fetch routing.
+package promunifi
+
+import (
+	"errors"
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	dto "github.com/prometheus/client_model/go"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"github.com/unpoller/unpoller/pkg/poller"
+	"golift.io/cnfg"
+)
+
+// stubCollect is a minimal poller.Collect implementation for cache tests.
+type stubCollect struct {
+	mu       sync.Mutex
+	calls    atomic.Int64
+	delay    time.Duration
+	metrics  *poller.Metrics
+	err      error
+	panicMsg string
+	errLogs  atomic.Int64
+}
+
+func (s *stubCollect) Metrics(_ *poller.Filter) (*poller.Metrics, error) {
+	s.calls.Add(1)
+
+	s.mu.Lock()
+	delay := s.delay
+	m := s.metrics
+	err := s.err
+	panicMsg := s.panicMsg
+	s.mu.Unlock()
+
+	if panicMsg != "" {
+		panic(panicMsg)
+	}
+
+	if delay > 0 {
+		time.Sleep(delay)
+	}
+
+	return m, err
+}
+
+func (s *stubCollect) Events(_ *poller.Filter) (*poller.Events, error) { return &poller.Events{}, nil }
+func (s *stubCollect) Poller() poller.Poller                           { return poller.Poller{} }
+func (s *stubCollect) Inputs() []string                                { return nil }
+func (s *stubCollect) Outputs() []string                               { return nil }
+func (s *stubCollect) Logf(string, ...any)                             {}
+func (s *stubCollect) LogErrorf(string, ...any)                        { s.errLogs.Add(1) }
+func (s *stubCollect) LogDebugf(string, ...any)                        {}
+
+func TestMetricsCacheSetKeepsLastGoodOnError(t *testing.T) {
+	t.Parallel()
+
+	c := &metricsCache{}
+	good := &poller.Metrics{}
+
+	c.set(good, nil)
+	c.set(nil, errors.New("transient 429"))
+
+	m, fetchedAt, err := c.get()
+	require.NotNil(t, m, "cache should retain last good snapshot on error")
+	assert.Same(t, good, m)
+	assert.False(t, fetchedAt.IsZero())
+	assert.EqualError(t, err, "transient 429")
+}
+
+func TestMetricsCacheSetReplacesOnSuccess(t *testing.T) {
+	t.Parallel()
+
+	c := &metricsCache{}
+	first := &poller.Metrics{}
+	second := &poller.Metrics{}
+
+	c.set(first, nil)
+	c.set(second, nil)
+
+	m, _, err := c.get()
+	assert.NoError(t, err)
+	assert.Same(t, second, m)
+}
+
+func TestFetchMetricsServesCacheForGlobalScrape(t *testing.T) {
+	t.Parallel()
+
+	stub := &stubCollect{}
+	cached := &poller.Metrics{}
+
+	u := &promUnifi{Config: &Config{}, Collector: stub, cache: &metricsCache{}}
+	u.cache.set(cached, nil)
+
+	got, err := u.fetchMetrics(nil)
+	require.NoError(t, err)
+	assert.Same(t, cached, got)
+	assert.Zero(t, stub.calls.Load(), "global scrape must not hit upstream when cache has data")
+}
+
+func TestFetchMetricsReturnsErrorWhenCacheEmpty(t *testing.T) {
+	t.Parallel()
+
+	stub := &stubCollect{}
+	u := &promUnifi{Config: &Config{}, Collector: stub, cache: &metricsCache{}}
+
+	got, err := u.fetchMetrics(nil)
+	assert.Nil(t, got)
+	assert.Error(t, err)
+}
+
+// TestFetchMetricsBypassesNilCache covers the nil-cache defensive branch.
+// Run() always initializes the cache in production; this branch exists only
+// for tests that exercise fetchMetrics directly without invoking Run.
+func TestFetchMetricsBypassesNilCache(t *testing.T) {
+	t.Parallel()
+
+	stub := &stubCollect{metrics: &poller.Metrics{}}
+	u := &promUnifi{Config: &Config{}, Collector: stub}
+
+	got, err := u.fetchMetrics(nil)
+	require.NoError(t, err)
+	assert.Same(t, stub.metrics, got)
+	assert.EqualValues(t, 1, stub.calls.Load())
+}
+
+func TestFetchMetricsSingleflightCoalescesScrapes(t *testing.T) {
+	t.Parallel()
+
+	stub := &stubCollect{metrics: &poller.Metrics{}, delay: 50 * time.Millisecond}
+	u := &promUnifi{Config: &Config{}, Collector: stub}
+
+	const concurrent = 20
+
+	var wg sync.WaitGroup
+
+	wg.Add(concurrent)
+
+	for i := 0; i < concurrent; i++ {
+		go func() {
+			defer wg.Done()
+
+			_, _ = u.fetchMetrics(&poller.Filter{Path: "https://controller.example/"})
+		}()
+	}
+
+	wg.Wait()
+
+	assert.LessOrEqual(t, stub.calls.Load(), int64(2),
+		"singleflight should coalesce concurrent scrapes to ~1 upstream call")
+}
+
+func TestFetchMetricsScrapeKeyedByFilterPath(t *testing.T) {
+	t.Parallel()
+
+	stub := &stubCollect{metrics: &poller.Metrics{}, delay: 50 * time.Millisecond}
+	u := &promUnifi{Config: &Config{}, Collector: stub}
+
+	var wg sync.WaitGroup
+
+	wg.Add(2)
+
+	for i, path := range []string{"https://a.example/", "https://b.example/"} {
+		i, path := i, path
+
+		go func() {
+			defer wg.Done()
+
+			_, _ = u.fetchMetrics(&poller.Filter{Path: path, Name: fmt.Sprintf("t%d", i)})
+		}()
+	}
+
+	wg.Wait()
+
+	assert.EqualValues(t, 2, stub.calls.Load(),
+		"distinct targets should not be coalesced")
+}
+
+func TestNormalizeInterval(t *testing.T) {
+	t.Parallel()
+
+	cases := []struct {
+		name string
+		in   time.Duration
+		want time.Duration
+	}{
+		{"unset uses default", 0, defaultInterval},
+		{"negative uses default", -1, defaultInterval},
+		{"below minimum clamps", time.Second, minimumInterval},
+		{"exact minimum unchanged", minimumInterval, minimumInterval},
+		{"above minimum unchanged", 2 * time.Minute, 2 * time.Minute},
+	}
+
+	for _, tc := range cases {
+		tc := tc
+
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+
+			u := &promUnifi{Config: &Config{Interval: cnfg.Duration{Duration: tc.in}}}
+			u.normalizeInterval()
+			assert.Equal(t, tc.want, u.Interval.Duration)
+		})
+	}
+}
+
+func TestRefreshCachePreservesLastGoodAcrossError(t *testing.T) {
+	t.Parallel()
+
+	good := &poller.Metrics{}
+	stub := &stubCollect{metrics: good}
+	u := &promUnifi{Config: &Config{}, Collector: stub, cache: &metricsCache{}}
+
+	u.refreshCache()
+	stub.mu.Lock()
+	stub.metrics = nil
+	stub.err = errors.New("429 too many requests")
+	stub.mu.Unlock()
+	u.refreshCache()
+
+	m, fetchedAt, err := u.cache.get()
+	require.Same(t, good, m, "good snapshot must survive a failed refresh")
+	assert.False(t, fetchedAt.IsZero())
+	assert.EqualError(t, err, "429 too many requests")
+	assert.EqualValues(t, 1, stub.errLogs.Load(), "failed refresh should log via LogErrorf")
+}
+
+func TestCacheAgeGaugeReportsUnpopulated(t *testing.T) {
+	t.Parallel()
+
+	u := &promUnifi{Config: &Config{Namespace: "test"}, cache: &metricsCache{}}
+	gauge := u.cacheAgeGauge()
+
+	require.Equal(t, -1.0, currentGaugeValue(t, gauge),
+		"unpopulated cache should report -1 sentinel")
+}
+
+func TestCacheAgeGaugeReportsFreshness(t *testing.T) {
+	t.Parallel()
+
+	u := &promUnifi{Config: &Config{Namespace: "test"}, cache: &metricsCache{}}
+	u.cache.set(&poller.Metrics{}, nil)
+
+	gauge := u.cacheAgeGauge()
+	age := currentGaugeValue(t, gauge)
+	assert.GreaterOrEqual(t, age, 0.0)
+	assert.Less(t, age, 5.0, "freshly populated cache should report a small age")
+}
+
+func TestSafeRefreshRecoversFromPanic(t *testing.T) {
+	t.Parallel()
+
+	good := &poller.Metrics{}
+	stub := &stubCollect{metrics: good}
+	u := &promUnifi{Config: &Config{}, Collector: stub, cache: &metricsCache{}}
+
+	// First a successful refresh so we have a good snapshot to preserve.
+	u.safeRefresh()
+	stub.mu.Lock()
+	stub.metrics = nil
+	stub.panicMsg = "simulated input plugin panic"
+	stub.mu.Unlock()
+
+	require.NotPanics(t, u.safeRefresh, "safeRefresh must swallow upstream panics")
+
+	m, fetchedAt, err := u.cache.get()
+	assert.Same(t, good, m, "panic must not blank out the cache")
+	assert.False(t, fetchedAt.IsZero())
+	assert.ErrorContains(t, err, "refresh panicked")
+	assert.EqualValues(t, 1, stub.errLogs.Load(), "panic should be logged once")
+}
+
+func TestRefreshFailuresCounterIncrementsOnErrorAndPanic(t *testing.T) {
+	t.Parallel()
+
+	stub := &stubCollect{err: errors.New("upstream down")}
+	counter := prometheus.NewCounter(prometheus.CounterOpts{Name: "test_refresh_failures_total"})
+	u := &promUnifi{
+		Config:          &Config{},
+		Collector:       stub,
+		cache:           &metricsCache{},
+		refreshFailures: counter,
+	}
+
+	u.refreshCache()
+	u.refreshCache()
+	assert.Equal(t, 2.0, currentCounterValue(t, counter), "error path should increment counter")
+
+	stub.mu.Lock()
+	stub.err = nil
+	stub.panicMsg = "boom"
+	stub.mu.Unlock()
+	u.safeRefresh()
+	assert.Equal(t, 3.0, currentCounterValue(t, counter), "panic path should increment counter")
+}
+
+func TestFetchMetricsScrapeReturnsUpstreamError(t *testing.T) {
+	t.Parallel()
+
+	stub := &stubCollect{err: errors.New("controller unreachable")}
+	u := &promUnifi{Config: &Config{}, Collector: stub}
+
+	got, err := u.fetchMetrics(&poller.Filter{Path: "https://x.example/"})
+	assert.Nil(t, got)
+	assert.EqualError(t, err, "controller unreachable")
+}
+
+// currentGaugeValue reads the current value out of a GaugeFunc by collecting
+// it through Prometheus' standard interface — avoids depending on internal
+// client_golang APIs.
+func currentGaugeValue(t *testing.T, c prometheus.Collector) float64 {
+	t.Helper()
+
+	ch := make(chan prometheus.Metric, 1)
+	c.Collect(ch)
+	close(ch)
+
+	m := <-ch
+
+	var pb dto.Metric
+
+	require.NoError(t, m.Write(&pb))
+
+	return pb.GetGauge().GetValue()
+}
+
+func currentCounterValue(t *testing.T, c prometheus.Collector) float64 {
+	t.Helper()
+
+	ch := make(chan prometheus.Metric, 1)
+	c.Collect(ch)
+	close(ch)
+
+	m := <-ch
+
+	var pb dto.Metric
+
+	require.NoError(t, m.Write(&pb))
+
+	return pb.GetCounter().GetValue()
+}
@@ -6,6 +6,7 @@ import (
 	"net"
 	"net/http"
 	"reflect"
+	"runtime/debug"
 	"strings"
 	"sync"
 	"time"
@@ -18,6 +19,8 @@ import (
 	"github.com/unpoller/unifi/v5"
 	"github.com/unpoller/unpoller/pkg/poller"
 	"github.com/unpoller/unpoller/pkg/webserver"
+	"golang.org/x/sync/singleflight"
+	"golift.io/cnfg"
 	"golift.io/version"
 )
 
@@ -28,11 +31,18 @@ const (
 	// channel buffer, fits at least one batch.
 	defaultBuffer     = 50
 	defaultHTTPListen = "0.0.0.0:9130"
+	// defaultInterval matches the typical UniFi Retry-After window; gives the
+	// controller time to recover between background polls without starving
+	// scrapes for fresh data.
+	defaultInterval = 60 * time.Second
+	minimumInterval = 15 * time.Second
 	// simply fewer letters.
 	counter = prometheus.CounterValue
 	gauge   = prometheus.GaugeValue
 )
 
+// ErrMetricFetchFailed is reported as an invalid metric description when a
+// scrape cannot obtain data from the configured collector.
 var ErrMetricFetchFailed = fmt.Errorf("metric fetch failed")
 
 type promUnifi struct {
@@ -76,12 +86,60 @@ type promUnifi struct {
 	PendingDevice *pendingDevice
 	Country       *country
 	// controllerUp tracks per-controller poll success (1) or failure (0).
+	// Reflects the most recent background poll — when /metrics is served from
+	// a stale cache, controllerUp lags real-time health; pair with
+	// unpoller_prometheus_cache_age_seconds for staleness signals.
 	controllerUp *prometheus.GaugeVec
+	// refreshFailures counts background refresh failures since process start
+	// so operators can alert on failure rate independently of cache staleness.
+	refreshFailures prometheus.Counter
+	// cache holds the last successful metrics snapshot from the background
+	// poller. Run() always initializes it; the nil-guards in cache-using
+	// methods exist only for tests that exercise those methods directly
+	// without invoking Run().
+	cache *metricsCache
+	// scrapeFlight coalesces concurrent /scrape requests targeting the same
+	// controller URL so a noisy scraper can't multiply upstream load.
+	scrapeFlight singleflight.Group
 	// This interface is passed to the Collect() method. The Collect method uses
 	// this interface to retrieve the latest UniFi measurements and export them.
 	Collector poller.Collect
 }
 
+// metricsCache stores the latest background-poller snapshot. Reads and writes
+// are serialized by an RWMutex so scrapes observe a consistent (metrics,
+// fetchedAt, lastErr) triple while the background ticker refreshes in place.
+// Failed refreshes preserve the prior good snapshot so /metrics never blanks
+// out under transient 429 backoffs.
+type metricsCache struct {
+	mu        sync.RWMutex
+	metrics   *poller.Metrics
+	fetchedAt time.Time
+	lastErr   error
+}
+
+func (c *metricsCache) get() (*poller.Metrics, time.Time, error) {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	return c.metrics, c.fetchedAt, c.lastErr
+}
+
+func (c *metricsCache) set(m *poller.Metrics, err error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	c.lastErr = err
+	// Keep the last good snapshot on error so /metrics never blanks out
+	// during transient upstream failures (e.g. 429 backoff). Also reject
+	// (nil, nil) — a "successful" empty fetch would otherwise leave us with
+	// nil metrics but a fresh fetchedAt, fooling the cache-age gauge.
+	if err == nil && m != nil {
+		c.metrics = m
+		c.fetchedAt = time.Now()
+	}
+}
+
 var _ poller.OutputPlugin = &promUnifi{}
 
 // Config is the input (config file) data used to initialize this output plugin.
@@ -104,6 +162,13 @@ type Config struct {
 	Disable bool `json:"disable" toml:"disable" xml:"disable" yaml:"disable"`
 	// Save data for dead ports? ie. ports that are down or disabled.
 	DeadPorts bool `json:"dead_ports" toml:"dead_ports" xml:"dead_ports" yaml:"dead_ports"`
+	// Interval controls how often the background poller refreshes the cached
+	// metrics that /metrics scrapes are served from. Decouples Prometheus
+	// scrape cadence from upstream UniFi API calls so 429 backoff loops cannot
+	// stall scrapes. Defaults to defaultInterval; values below minimumInterval
+	// are clamped up. Must be > 0 before use; normalizeInterval applies the
+	// default and floor during Run().
+	Interval cnfg.Duration `json:"interval" toml:"interval" xml:"interval" yaml:"interval"`
 }
 
 type metric struct {
@@ -155,6 +220,8 @@ func init() { // nolint: gochecknoinits
 	})
 }
 
+// DebugOutput validates the Prometheus output configuration: address format
+// and (outside of health-check mode) bindability of the listen port.
 func (u *promUnifi) DebugOutput() (bool, error) {
 	if u == nil {
 		return true, nil
@@ -190,6 +257,7 @@ func (u *promUnifi) DebugOutput() (bool, error) {
 	return true, nil
 }
 
+// Enabled reports whether this output plugin is configured and active.
 func (u *promUnifi) Enabled() bool {
 	if u == nil {
 		return false
@@ -227,6 +295,8 @@ func (u *promUnifi) Run(c poller.Collect) error {
 		u.Buffer = defaultBuffer
 	}
 
+	u.normalizeInterval()
+
 	u.Client = descClient(u.Namespace + "_client_")
 	u.Device = descDevice(u.Namespace + "_device_") // stats for all device types.
 	u.UAP = descUAP(u.Namespace + "_device_")
@@ -267,8 +337,14 @@ func (u *promUnifi) Run(c poller.Collect) error {
 	u.Country = descCountry(u.Namespace + "_")
 	u.controllerUp = prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Name: u.Namespace + "_controller_up",
-		Help: "Whether the last poll of the UniFi controller succeeded (1) or failed (0).",
+		Help: "Whether the most recent background poll of the UniFi controller succeeded (1) or failed (0). " +
+			"Reflects the last poll attempt, not real-time health; pair with " +
+			u.Namespace + "_prometheus_cache_age_seconds for liveness signals when scrapes are served from a stale cache.",
 	}, []string{"source"})
+	u.refreshFailures = prometheus.NewCounter(prometheus.CounterOpts{
+		Name: u.Namespace + "_prometheus_refresh_failures_total",
+		Help: "Total background metrics refresh failures since process start.",
+	})
 
 	mux := http.NewServeMux()
 	promver.Version = version.Version
@@ -278,7 +354,19 @@ func (u *promUnifi) Run(c poller.Collect) error {
 	webserver.UpdateOutput(&webserver.Output{Name: PluginName, Config: u.Config})
 	prometheus.MustRegister(collectors.NewBuildInfoCollector())
 	prometheus.MustRegister(u.controllerUp)
+	prometheus.MustRegister(u.refreshFailures)
 	prometheus.MustRegister(u)
+
+	u.cache = &metricsCache{}
+	prometheus.MustRegister(u.cacheAgeGauge())
+	// safeRefresh (not refreshCache) because a panic in the initial upstream
+	// fetch must not kill Run() before the HTTP listener starts.
+	u.safeRefresh()
+
+	go u.backgroundPoll()
+
+	u.Logf("Prometheus scrape cache enabled, refresh interval: %v", u.Interval.Duration)
+
 	mux.Handle("/metrics", promhttp.HandlerFor(prometheus.DefaultGatherer,
 		promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError},
 	))
@@ -297,6 +385,163 @@ func (u *promUnifi) Run(c poller.Collect) error {
 	}
 }
 
+// normalizeInterval applies defaults and the minimum-interval floor to the
+// configured scrape cache refresh interval. Values <= 0 use the default.
+func (u *promUnifi) normalizeInterval() {
+	if u.Interval.Duration <= 0 {
+		u.Interval.Duration = defaultInterval
+
+		return
+	}
+
+	if u.Interval.Duration < minimumInterval {
+		u.Logf("Prometheus interval %v is below minimum %v; clamping to minimum",
+			u.Interval.Duration, minimumInterval)
+
+		u.Interval.Duration = minimumInterval
+	}
+}
+
+// backgroundPoll runs forever, refreshing the metrics cache on the configured
+// interval. Returns immediately if the cache is not configured. A panic in
+// upstream collection is logged and the loop continues so one bad payload
+// doesn't silently stop refreshes (operator would only see cache_age climb).
+func (u *promUnifi) backgroundPoll() {
+	if u.cache == nil {
+		return
+	}
+
+	ticker := time.NewTicker(u.Interval.Duration)
+	defer ticker.Stop()
+
+	for range ticker.C {
+		u.safeRefresh()
+	}
+}
+
+// safeRefresh wraps refreshCache with a recover so a panic in an input
+// plugin doesn't kill the background poller. The panic message and stack
+// trace are logged separately so log aggregators (Sentry, Datadog, Loki)
+// group panics by their headline rather than treating each stack line as
+// an independent event.
+func (u *promUnifi) safeRefresh() {
+	defer func() {
+		r := recover()
+		if r == nil {
+			return
+		}
+
+		u.LogErrorf("background metrics refresh panicked; continuing: %v", r)
+		u.LogDebugf("panic stack:\n%s", debug.Stack())
+
+		if u.cache != nil {
+			u.cache.set(nil, fmt.Errorf("refresh panicked: %v", r))
+		}
+
+		if u.refreshFailures != nil {
+			u.refreshFailures.Inc()
+		}
+	}()
+
+	u.refreshCache()
+}
+
+// refreshCache polls upstream once and updates the cache. On error the last
+// successful snapshot is preserved so scrapes keep returning data during
+// transient upstream failures (e.g. 429 backoff loops). Failures are also
+// counted in refreshFailures so operators can alert on failure rate
+// independently of cache staleness.
+//
+// The cache.set call happens before logging and counter increment so the
+// cache update is the most-protected statement — if anything in the
+// post-set bookkeeping ever panics, safeRefresh's recover still sees a
+// correctly updated cache.
+func (u *promUnifi) refreshCache() {
+	if u.cache == nil {
+		return
+	}
+
+	m, err := u.Collector.Metrics(nil)
+	u.cache.set(m, err)
+
+	if err != nil {
+		u.LogErrorf("background metrics refresh failed (serving last good snapshot): %v", err)
+
+		if u.refreshFailures != nil {
+			u.refreshFailures.Inc()
+		}
+	}
+}
+
+// cacheAgeGauge returns a GaugeFunc reporting seconds since the last
+// successful background refresh. Returns -1 if no refresh has succeeded yet.
+func (u *promUnifi) cacheAgeGauge() prometheus.Collector {
+	return prometheus.NewGaugeFunc(
+		prometheus.GaugeOpts{
+			Name: u.Namespace + "_prometheus_cache_age_seconds",
+			Help: "Seconds since the last successful background metrics refresh. -1 means no refresh has succeeded yet.",
+		},
+		func() float64 {
+			if u.cache == nil {
+				return -1
+			}
+
+			_, fetchedAt, _ := u.cache.get()
+			if fetchedAt.IsZero() {
+				return -1
+			}
+
+			return time.Since(fetchedAt).Seconds()
+		},
+	)
+}
+
+// fetchMetrics returns the metrics for a scrape, using the cache for global
+// /metrics scrapes and singleflight-coalesced live calls for per-target
+// /scrape requests.
+func (u *promUnifi) fetchMetrics(filter *poller.Filter) (*poller.Metrics, error) {
+	if filter == nil {
+		if u.cache == nil {
+			return u.Collector.Metrics(nil)
+		}
+
+		m, _, err := u.cache.get()
+		if m != nil {
+			// Serve cached data even if the most recent refresh errored.
+			return m, nil
+		}
+
+		if err != nil {
+			return nil, err
+		}
+
+		return nil, fmt.Errorf("metrics cache not yet populated")
+	}
+
+	// /scrape path: coalesce concurrent scrapes for the same target so a
+	// noisy scraper can't multiply upstream API load.
+	key := filter.Path
+	if key == "" {
+		key = filter.Name
+	}
+
+	result, err, _ := u.scrapeFlight.Do(key, func() (any, error) {
+		return u.Collector.Metrics(filter)
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	// Strict assertion: silently dropping a wrong-type result would turn an
+	// upstream regression into an empty 200 OK scrape with no log entry.
+	m, ok := result.(*poller.Metrics)
+	if !ok {
+		return nil, fmt.Errorf("singleflight returned unexpected type %T", result)
+	}
+
+	return m, nil
+}
+
 // ScrapeHandler allows prometheus to scrape a single source, instead of all sources.
 func (u *promUnifi) ScrapeHandler(w http.ResponseWriter, r *http.Request) {
 	t := &target{u: u, Filter: &poller.Filter{
@@ -339,6 +584,8 @@ func (u *promUnifi) ScrapeHandler(w http.ResponseWriter, r *http.Request) {
 	).ServeHTTP(w, r)
 }
 
+// DefaultHandler serves the HTTP root with a simple liveness response naming
+// the application.
 func (u *promUnifi) DefaultHandler(w http.ResponseWriter, _ *http.Request) {
 	w.WriteHeader(http.StatusOK)
 	_, _ = w.Write([]byte(poller.AppName + "\n"))
@@ -396,7 +643,7 @@ func (u *promUnifi) collect(ch chan<- prometheus.Metric, filter *poller.Filter) {
 	}
 	defer r.close()
 
-	r.Metrics, err = u.Collector.Metrics(filter)
+	r.Metrics, err = u.fetchMetrics(filter)
 	r.Fetch = time.Since(r.Start)
 
 	if err != nil {