// Package datadogunifi provides the methods to turn UniFi measurements into Datadog
// data points with appropriate tags and fields.
package datadogunifi

import (
	"reflect"
	"time"

	"github.com/DataDog/datadog-go/statsd"
	"github.com/unpoller/unifi"
	"github.com/unpoller/unpoller/core/poller"
	"golift.io/cnfg"
)

const (
	defaultInterval = 30 * time.Second
	minimumInterval = 10 * time.Second
)

// Config defines the data needed to store metrics in Datadog.
type Config struct {
	// Required Config

	// Interval controls the collection and reporting interval.
	Interval cnfg.Duration `json:"interval,omitempty" toml:"interval,omitempty" xml:"interval,omitempty" yaml:"interval,omitempty"`

	// DeadPorts, when true, saves data for dead ports, i.e. ports that are down or disabled.
	DeadPorts bool `json:"dead_ports,omitempty" toml:"dead_ports,omitempty" xml:"dead_ports,omitempty" yaml:"dead_ports,omitempty"`

	// Enable, when true, enables this output plugin.
	Enable *bool `json:"enable" toml:"enable" xml:"enable,attr" yaml:"enable"`

	// Address determines how to talk to the Datadog agent.
	Address string `json:"address" toml:"address" xml:"address,attr" yaml:"address"`

	// Optional Statsd Options - mirrored from statsd.Options

	// Namespace to prepend to all metric, event, and service check names.
	Namespace *string `json:"namespace" toml:"namespace" xml:"namespace,attr" yaml:"namespace"`

	// Tags are global tags applied to every metric, event, and service check.
	Tags []string `json:"tags" toml:"tags" xml:"tags,attr" yaml:"tags"`

	// MaxBytesPerPayload is the maximum number of bytes a single payload will contain.
	// The magic value 0 will set the option to the optimal size for the transport
	// protocol used when creating the client: 1432 for UDP and 8192 for UDS.
	MaxBytesPerPayload *int `json:"max_bytes_per_payload" toml:"max_bytes_per_payload" xml:"max_bytes_per_payload,attr" yaml:"max_bytes_per_payload"`

	// MaxMessagesPerPayload is the maximum number of metrics, events and/or service checks a single payload will contain.
	// This option can be set to `1` to create an unbuffered client.
	MaxMessagesPerPayload *int `json:"max_messages_per_payload" toml:"max_messages_per_payload" xml:"max_messages_per_payload,attr" yaml:"max_messages_per_payload"`

	// BufferPoolSize is the size of the pool of buffers, in number of buffers.
	// The magic value 0 will set the option to the optimal size for the transport
	// protocol used when creating the client: 2048 for UDP and 512 for UDS.
	BufferPoolSize *int `json:"buffer_pool_size" toml:"buffer_pool_size" xml:"buffer_pool_size,attr" yaml:"buffer_pool_size"`

	// BufferFlushInterval is the interval after which the current buffer is flushed.
	BufferFlushInterval *cnfg.Duration `json:"buffer_flush_interval" toml:"buffer_flush_interval" xml:"buffer_flush_interval,attr" yaml:"buffer_flush_interval"`

	// BufferShardCount is the number of buffer "shards" that will be used.
	// These shards allow the use of multiple buffers at the same time to reduce
	// lock contention.
	BufferShardCount *int `json:"buffer_shard_count" toml:"buffer_shard_count" xml:"buffer_shard_count,attr" yaml:"buffer_shard_count"`

	// SenderQueueSize is the size of the sender queue, in number of buffers.
	// The magic value 0 will set the option to the optimal size for the transport
	// protocol used when creating the client: 2048 for UDP and 512 for UDS.
	SenderQueueSize *int `json:"sender_queue_size" toml:"sender_queue_size" xml:"sender_queue_size,attr" yaml:"sender_queue_size"`

	// WriteTimeoutUDS is the timeout after which a UDS packet is dropped.
	WriteTimeoutUDS *cnfg.Duration `json:"write_timeout_uds" toml:"write_timeout_uds" xml:"write_timeout_uds,attr" yaml:"write_timeout_uds"`

	// ReceiveMode determines the behavior of the client when receiving too many
	// metrics. The client will either drop the metrics if its buffers are
	// full (ChannelMode) or block the caller until the metric can be
	// handled (MutexMode). By default the client uses MutexMode. This
	// option should be set to ChannelMode only when used under very high
	// load.
	//
	// MutexMode uses a mutex internally, which is much faster than a
	// channel but causes some lock contention when used with a high number
	// of goroutines. Mutexes are sharded based on the metric name, which
	// limits mutex contention when goroutines send different metrics.
	//
	// ChannelMode uses a channel (of ChannelModeBufferSize) to send
	// metrics and drops metrics if the channel is full. Sending metrics in
	// this mode is slower than MutexMode (because of the channel), but
	// will not block the application. This mode is made for applications
	// using many goroutines, sending the same metrics at a very high
	// volume. The goal is to not slow down the application, at the cost of
	// dropping metrics and having a lower max throughput.
	ReceiveMode *statsd.ReceivingMode `json:"receive_mode" toml:"receive_mode" xml:"receive_mode,attr" yaml:"receive_mode"`

	// ChannelModeBufferSize is the size of the channel holding incoming metrics.
	ChannelModeBufferSize *int `json:"channel_mode_buffer_size" toml:"channel_mode_buffer_size" xml:"channel_mode_buffer_size,attr" yaml:"channel_mode_buffer_size"`

	// AggregationFlushInterval is the interval at which the aggregator flushes metrics.
	AggregationFlushInterval *time.Duration `json:"aggregation_flush_interval" toml:"aggregation_flush_interval" xml:"aggregation_flush_interval,attr" yaml:"aggregation_flush_interval"`
}
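
// In practice the poller populates Config from its config file via the
// toml/json/xml/yaml tags above. As a minimal sketch, a programmatic setup
// might look like this (the address, interval, and tags are illustrative
// assumptions, not recommendations):
//
//	enable := true
//	cfg := &Config{
//		Enable:   &enable,
//		Address:  "localhost:8125", // the agent's dogstatsd endpoint
//		Interval: cnfg.Duration{Duration: 2 * time.Minute},
//		Tags:     []string{"env:lab"},
//	}
//	mode := statsd.ChannelMode // only under very high load, per the doc above
//	cfg.ReceiveMode = &mode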

// Datadog allows the data to be context-aware, carrying the plugin
// configuration along with the statsd options derived from it.
type Datadog struct {
	*Config `json:"datadog" toml:"datadog" xml:"datadog" yaml:"datadog"`
	options []statsd.Option // nolint
}

// DatadogUnifi is returned by New() after you provide a Config.
type DatadogUnifi struct {
	Collector poller.Collect
	datadog   statsd.ClientInterface
	LastCheck time.Time
	*Datadog
}

func init() { // nolint: gochecknoinits
	u := &DatadogUnifi{Datadog: &Datadog{}, LastCheck: time.Now()}

	poller.NewOutput(&poller.Output{
		Name:   "datadog",
		Config: u.Datadog,
		Method: u.Run,
	})
}

// setConfigDefaults clamps the poll interval and translates the optional
// config fields into statsd client options.
func (u *DatadogUnifi) setConfigDefaults() {
	if u.Interval.Duration == 0 {
		u.Interval = cnfg.Duration{Duration: defaultInterval}
	} else if u.Interval.Duration < minimumInterval {
		u.Interval = cnfg.Duration{Duration: minimumInterval}
	}

	u.Interval = cnfg.Duration{Duration: u.Interval.Duration.Round(time.Second)}

	u.options = make([]statsd.Option, 0)

	if u.Namespace != nil {
		u.options = append(u.options, statsd.WithNamespace(*u.Namespace))
	}

	if len(u.Tags) > 0 {
		u.options = append(u.options, statsd.WithTags(u.Tags))
	}

	if u.MaxBytesPerPayload != nil {
		u.options = append(u.options, statsd.WithMaxBytesPerPayload(*u.MaxBytesPerPayload))
	}

	if u.MaxMessagesPerPayload != nil {
		u.options = append(u.options, statsd.WithMaxMessagesPerPayload(*u.MaxMessagesPerPayload))
	}

	if u.BufferPoolSize != nil {
		u.options = append(u.options, statsd.WithBufferPoolSize(*u.BufferPoolSize))
	}

	if u.BufferFlushInterval != nil {
		u.options = append(u.options, statsd.WithBufferFlushInterval(u.BufferFlushInterval.Duration))
	}

	if u.BufferShardCount != nil {
		u.options = append(u.options, statsd.WithBufferShardCount(*u.BufferShardCount))
	}

	if u.SenderQueueSize != nil {
		u.options = append(u.options, statsd.WithSenderQueueSize(*u.SenderQueueSize))
	}

	if u.WriteTimeoutUDS != nil {
		u.options = append(u.options, statsd.WithWriteTimeoutUDS(u.WriteTimeoutUDS.Duration))
	}

	if u.ReceiveMode != nil {
		switch *u.ReceiveMode {
		case statsd.ChannelMode:
			u.options = append(u.options, statsd.WithChannelMode())
		case statsd.MutexMode:
			u.options = append(u.options, statsd.WithMutexMode())
		}
	}

	if u.ChannelModeBufferSize != nil {
		u.options = append(u.options, statsd.WithChannelModeBufferSize(*u.ChannelModeBufferSize))
	}

	if u.AggregationFlushInterval != nil {
		u.options = append(u.options, statsd.WithAggregationInterval(*u.AggregationFlushInterval))
	}
}
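
// For illustration, with only Namespace and Tags set, the options assembled
// above amount to the following direct client construction (a minimal sketch
// using github.com/DataDog/datadog-go/statsd; the address and values are
// assumptions):
//
//	client, err := statsd.New("localhost:8125",
//		statsd.WithNamespace("unifi."),
//		statsd.WithTags([]string{"env:lab"}),
//	)
//	if err != nil {
//		// handle the error; the agent address may be unreachable
//	}
//	defer client.Close()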

// Run runs a ticker to poll the UniFi server and update Datadog.
func (u *DatadogUnifi) Run(c poller.Collect) error {
	disabled := u == nil || u.Config == nil || u.Enable == nil || !*u.Enable
	if disabled {
		u.LogDebugf("Datadog output is disabled in the config.")
		return nil
	}

	u.Collector = c
	u.Logf("Datadog is configured.")
	u.setConfigDefaults()

	var err error

	u.datadog, err = statsd.New(u.Address, u.options...)
	if err != nil {
		u.LogErrorf("Error configuring Datadog agent reporting: %+v", err)
		return err
	}

	u.PollController()

	return nil
}

// PollController runs forever, polling UniFi and pushing to Datadog.
// This is started by Run() or RunBoth() after everything is validated.
func (u *DatadogUnifi) PollController() {
	interval := u.Interval.Round(time.Second)
	ticker := time.NewTicker(interval)
	u.Logf("Everything checks out! Poller started, interval=%+v", interval)

	for u.LastCheck = range ticker.C {
		metrics, err := u.Collector.Metrics(&poller.Filter{Name: "unifi"})
		if err != nil {
			u.LogErrorf("metric fetch for Datadog failed: %v", err)
			continue
		}

		events, err := u.Collector.Events(&poller.Filter{Name: "unifi", Dur: interval})
		if err != nil {
			u.LogErrorf("event fetch for Datadog failed: %v", err)
			continue
		}

		report, err := u.ReportMetrics(metrics, events)
		if err != nil {
			// Is the agent down?
			u.LogErrorf("unable to report metrics and events: %v", err)
			_ = report.reportCount("unifi.collect.errors", 1, []string{})
			continue
		}

		_ = report.reportCount("unifi.collect.success", 1, []string{})
		u.LogDatadogReport(report)
	}
}

// ReportMetrics batches all device and client data into Datadog data points.
// Call this after you've collected all the data you care about.
// Returns an error if Datadog statsd calls fail, otherwise returns a report.
func (u *DatadogUnifi) ReportMetrics(m *poller.Metrics, e *poller.Events) (*Report, error) {
	r := &Report{
		Metrics:   m,
		Events:    e,
		Start:     time.Now(),
		Counts:    &Counts{Val: make(map[item]int)},
		Collector: u.Collector,
		client:    u.datadog,
	}
	// batch all the points.
	u.loopPoints(r)
	r.End = time.Now()
	r.Elapsed = r.End.Sub(r.Start)
	_ = r.reportTiming("unifi.collector_timing", r.Elapsed, []string{})
	return r, nil
}
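
// The Report helpers used above (reportCount, reportTiming) are defined
// elsewhere in this package. As a rough sketch of what such submissions look
// like against the raw statsd client (the tag and the 1.0 sample rate are
// assumptions, not the package's actual helper implementation):
//
//	var client statsd.ClientInterface = u.datadog
//	_ = client.Count("unifi.collect.success", 1, []string{"source:unpoller"}, 1)
//	_ = client.Timing("unifi.collector_timing", r.Elapsed, nil, 1)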

// loopPoints collects all the data to immediately report to Datadog.
func (u *DatadogUnifi) loopPoints(r report) {
	m := r.metrics()

	for _, s := range m.RogueAPs {
		u.switchExport(r, s)
	}

	for _, s := range m.Sites {
		u.switchExport(r, s)
	}

	for _, s := range m.SitesDPI {
		u.reportSiteDPI(r, s.(*unifi.DPITable))
	}

	for _, s := range m.Clients {
		u.switchExport(r, s)
	}

	for _, s := range m.Devices {
		u.switchExport(r, s)
	}

	for _, s := range r.events().Logs {
		u.switchExport(r, s)
	}

	appTotal := make(totalsDPImap)
	catTotal := make(totalsDPImap)

	for _, s := range m.ClientsDPI {
		u.batchClientDPI(r, s, appTotal, catTotal)
	}

	reportClientDPItotals(r, appTotal, catTotal)
}

// switchExport routes each collected item to the matching batch function
// based on its concrete type.
func (u *DatadogUnifi) switchExport(r report, v interface{}) { //nolint:cyclop
	switch v := v.(type) {
	case *unifi.RogueAP:
		u.batchRogueAP(r, v)
	case *unifi.UAP:
		u.batchUAP(r, v)
	case *unifi.USW:
		u.batchUSW(r, v)
	case *unifi.USG:
		u.batchUSG(r, v)
	case *unifi.UXG:
		u.batchUXG(r, v)
	case *unifi.UDM:
		u.batchUDM(r, v)
	case *unifi.Site:
		u.reportSite(r, v)
	case *unifi.Client:
		u.batchClient(r, v)
	case *unifi.Event:
		u.batchEvent(r, v)
	case *unifi.IDS:
		u.batchIDS(r, v)
	case *unifi.Alarm:
		u.batchAlarms(r, v)
	case *unifi.Anomaly:
		u.batchAnomaly(r, v)
	default:
		u.LogErrorf("invalid export, type=%+v", reflect.TypeOf(v))
	}
}

// LogDatadogReport writes a log message after exporting to Datadog.
func (u *DatadogUnifi) LogDatadogReport(r *Report) {
	m := r.Metrics
	u.Logf("UniFi Metrics Recorded num_sites=%d num_sites_dpi=%d num_clients=%d num_clients_dpi=%d num_rogue_ap=%d num_devices=%d errors=%v elapsed=%v",
		len(m.Sites),
		len(m.SitesDPI),
		len(m.Clients),
		len(m.ClientsDPI),
		len(m.RogueAPs),
		len(m.Devices),
		r.Errors,
		r.Elapsed,
	)
	metricName := metricNamespace("collector")
	_ = r.reportCount(metricName("num_sites"), int64(len(m.Sites)), u.Tags)
	_ = r.reportCount(metricName("num_sites_dpi"), int64(len(m.SitesDPI)), u.Tags)
	_ = r.reportCount(metricName("num_clients"), int64(len(m.Clients)), u.Tags)
	_ = r.reportCount(metricName("num_clients_dpi"), int64(len(m.ClientsDPI)), u.Tags)
	_ = r.reportCount(metricName("num_rogue_ap"), int64(len(m.RogueAPs)), u.Tags)
	_ = r.reportCount(metricName("num_devices"), int64(len(m.Devices)), u.Tags)
	_ = r.reportCount(metricName("num_errors"), int64(len(r.Errors)), u.Tags)
	_ = r.reportTiming(metricName("elapsed_time"), r.Elapsed, u.Tags)
}