// Package datadogunifi provides the methods to turn UniFi measurements into Datadog // data points with appropriate tags and fields. package datadogunifi import ( "reflect" "time" "github.com/DataDog/datadog-go/statsd" "github.com/unpoller/unifi" "github.com/unpoller/unpoller/pkg/poller" "golift.io/cnfg" ) const ( defaultInterval = 30 * time.Second minimumInterval = 10 * time.Second ) // Config defines the data needed to store metrics in Datadog. type Config struct { // Required Config // Interval controls the collection and reporting interval Interval cnfg.Duration `json:"interval,omitempty" toml:"interval,omitempty" xml:"interval,omitempty" yaml:"interval,omitempty"` // Save data for dead ports? ie. ports that are down or disabled. DeadPorts bool `json:"dead_ports,omitempty" toml:"dead_ports,omitempty" xml:"dead_ports,omitempty" yaml:"dead_ports,omitempty"` // Enable when true, enables this output plugin Enable *bool `json:"enable" toml:"enable" xml:"enable,attr" yaml:"enable"` // Address determines how to talk to the Datadog agent Address string `json:"address" toml:"address" xml:"address,attr" yaml:"address"` // Optional Statsd Options - mirrored from statsd.Options // Namespace to prepend to all metrics, events and service checks name. Namespace *string `json:"namespace" toml:"namespace" xml:"namespace,attr" yaml:"namespace"` // Tags are global tags to be applied to every metrics, events and service checks. Tags []string `json:"tags" toml:"tags" xml:"tags,attr" yaml:"tags"` // MaxBytesPerPayload is the maximum number of bytes a single payload will contain. // The magic value 0 will set the option to the optimal size for the transport // protocol used when creating the client: 1432 for UDP and 8192 for UDS. MaxBytesPerPayload *int `json:"max_bytes_per_payload" toml:"max_bytes_per_payload" xml:"max_bytes_per_payload,attr" yaml:"max_bytes_per_payload"` // MaxMessagesPerPayload is the maximum number of metrics, events and/or service checks a single payload will contain. // This option can be set to `1` to create an unbuffered client. MaxMessagesPerPayload *int `json:"max_messages_per_payload" toml:"max_messages_per_payload" xml:"max_messages_per_payload,attr" yaml:"max_messages_per_payload"` // BufferPoolSize is the size of the pool of buffers in number of buffers. // The magic value 0 will set the option to the optimal size for the transport // protocol used when creating the client: 2048 for UDP and 512 for UDS. BufferPoolSize *int `json:"buffer_pool_size" toml:"buffer_pool_size" xml:"buffer_pool_size,attr" yaml:"buffer_pool_size"` // BufferFlushInterval is the interval after which the current buffer will get flushed. BufferFlushInterval *cnfg.Duration `json:"buffer_flush_interval" toml:"buffer_flush_interval" xml:"buffer_flush_interval,attr" yaml:"buffer_flush_interval"` // BufferShardCount is the number of buffer "shards" that will be used. // Those shards allows the use of multiple buffers at the same time to reduce // lock contention. BufferShardCount *int `json:"buffer_shard_count" toml:"buffer_shard_count" xml:"buffer_shard_count,attr" yaml:"buffer_shard_count"` // SenderQueueSize is the size of the sender queue in number of buffers. // The magic value 0 will set the option to the optimal size for the transport // protocol used when creating the client: 2048 for UDP and 512 for UDS. SenderQueueSize *int `json:"sender_queue_size" toml:"sender_queue_size" xml:"sender_queue_size,attr" yaml:"sender_queue_size"` // WriteTimeoutUDS is the timeout after which a UDS packet is dropped. WriteTimeoutUDS *cnfg.Duration `json:"write_timeout_uds" toml:"write_timeout_uds" xml:"write_timeout_uds,attr" yaml:"write_timeout_uds"` // ReceiveMode determines the behavior of the client when receiving to many // metrics. The client will either drop the metrics if its buffers are // full (ChannelMode mode) or block the caller until the metric can be // handled (MutexMode mode). By default the client will MutexMode. This // option should be set to ChannelMode only when use under very high // load. // // MutexMode uses a mutex internally which is much faster than // channel but causes some lock contention when used with a high number // of threads. Mutex are sharded based on the metrics name which // limit mutex contention when goroutines send different metrics. // // ChannelMode: uses channel (of ChannelModeBufferSize size) to send // metrics and drop metrics if the channel is full. Sending metrics in // this mode is slower that MutexMode (because of the channel), but // will not block the application. This mode is made for application // using many goroutines, sending the same metrics at a very high // volume. The goal is to not slow down the application at the cost of // dropping metrics and having a lower max throughput. ReceiveMode *statsd.ReceivingMode `json:"receive_mode" toml:"receive_mode" xml:"receive_mode,attr" yaml:"receive_mode"` // ChannelModeBufferSize is the size of the channel holding incoming metrics ChannelModeBufferSize *int `json:"channel_mode_buffer_size" toml:"channel_mode_buffer_size" xml:"channel_mode_buffer_size,attr" yaml:"channel_mode_buffer_size"` // AggregationFlushInterval is the interval for the aggregator to flush metrics AggregationFlushInterval *time.Duration `json:"aggregation_flush_interval" toml:"aggregation_flush_interval" xml:"aggregation_flush_interval,attr" yaml:"aggregation_flush_interval"` } // Datadog allows the data to be context aware with configuration type Datadog struct { *Config `json:"datadog" toml:"datadog" xml:"datadog" yaml:"datadog"` options []statsd.Option // nolint } // DatadogUnifi is returned by New() after you provide a Config. type DatadogUnifi struct { Collector poller.Collect datadog statsd.ClientInterface LastCheck time.Time *Datadog } func init() { // nolint: gochecknoinits u := &DatadogUnifi{Datadog: &Datadog{}, LastCheck: time.Now()} poller.NewOutput(&poller.Output{ Name: "datadog", Config: u.Datadog, Method: u.Run, }) } func (u *DatadogUnifi) setConfigDefaults() { if u.Interval.Duration == 0 { u.Interval = cnfg.Duration{Duration: defaultInterval} } else if u.Interval.Duration < minimumInterval { u.Interval = cnfg.Duration{Duration: minimumInterval} } u.Interval = cnfg.Duration{Duration: u.Interval.Duration.Round(time.Second)} u.options = make([]statsd.Option, 0) if u.Namespace != nil { u.options = append(u.options, statsd.WithNamespace(*u.Namespace)) } if u.Tags != nil && len(u.Tags) > 0 { u.options = append(u.options, statsd.WithTags(u.Tags)) } if u.MaxBytesPerPayload != nil { u.options = append(u.options, statsd.WithMaxBytesPerPayload(*u.MaxBytesPerPayload)) } if u.MaxMessagesPerPayload != nil { u.options = append(u.options, statsd.WithMaxMessagesPerPayload(*u.MaxMessagesPerPayload)) } if u.BufferPoolSize != nil { u.options = append(u.options, statsd.WithBufferPoolSize(*u.BufferPoolSize)) } if u.BufferFlushInterval != nil { u.options = append(u.options, statsd.WithBufferFlushInterval((*u.BufferFlushInterval).Duration)) } if u.BufferShardCount != nil { u.options = append(u.options, statsd.WithBufferShardCount(*u.BufferShardCount)) } if u.SenderQueueSize != nil { u.options = append(u.options, statsd.WithSenderQueueSize(*u.SenderQueueSize)) } if u.WriteTimeoutUDS != nil { u.options = append(u.options, statsd.WithWriteTimeoutUDS((*u.WriteTimeoutUDS).Duration)) } if u.ReceiveMode != nil { switch *u.ReceiveMode { case statsd.ChannelMode: u.options = append(u.options, statsd.WithChannelMode()) case statsd.MutexMode: u.options = append(u.options, statsd.WithMutexMode()) } } if u.ChannelModeBufferSize != nil { u.options = append(u.options, statsd.WithChannelModeBufferSize(*u.ChannelModeBufferSize)) } if u.AggregationFlushInterval != nil { u.options = append(u.options, statsd.WithAggregationInterval(*u.AggregationFlushInterval)) } } // Run runs a ticker to poll the unifi server and update Datadog. func (u *DatadogUnifi) Run(c poller.Collect) error { disabled := u == nil || u.Enable == nil || !(*u.Enable) || u.Config == nil if disabled { u.LogDebugf("Datadog config is disabled, output is disabled.") return nil } u.Collector = c u.Logf("Datadog is configured.") u.setConfigDefaults() var err error u.datadog, err = statsd.New(u.Address, u.options...) if err != nil { u.LogErrorf("Error configuration Datadog agent reporting: %+v", err) return err } u.PollController() return nil } // PollController runs forever, polling UniFi and pushing to Datadog // This is started by Run() or RunBoth() after everything is validated. func (u *DatadogUnifi) PollController() { interval := u.Interval.Round(time.Second) ticker := time.NewTicker(interval) u.Logf("Everything checks out! Poller started, interval=%+v", interval) for u.LastCheck = range ticker.C { metrics, err := u.Collector.Metrics(&poller.Filter{Name: "unifi"}) if err != nil { u.LogErrorf("metric fetch for Datadog failed: %v", err) continue } events, err := u.Collector.Events(&poller.Filter{Name: "unifi", Dur: interval}) if err != nil { u.LogErrorf("event fetch for Datadog failed", err) continue } report, err := u.ReportMetrics(metrics, events) if err != nil { // Is the agent down? u.LogErrorf("unable to report metrics and events", err) _ = report.reportCount("unifi.collect.errors", 1, []string{}) continue } _ = report.reportCount("unifi.collect.success", 1, []string{}) u.LogDatadogReport(report) } } // ReportMetrics batches all device and client data into datadog data points. // Call this after you've collected all the data you care about. // Returns an error if datadog statsd calls fail, otherwise returns a report. func (u *DatadogUnifi) ReportMetrics(m *poller.Metrics, e *poller.Events) (*Report, error) { r := &Report{ Metrics: m, Events: e, Start: time.Now(), Counts: &Counts{Val: make(map[item]int)}, Collector: u.Collector, client: u.datadog, } // batch all the points. u.loopPoints(r) r.End = time.Now() r.Elapsed = r.End.Sub(r.Start) _ = r.reportTiming("unifi.collector_timing", r.Elapsed, []string{}) return r, nil } // loopPoints collects all the data to immediately report to Datadog. func (u *DatadogUnifi) loopPoints(r report) { m := r.metrics() for _, s := range m.RogueAPs { u.switchExport(r, s) } for _, s := range m.Sites { u.switchExport(r, s) } for _, s := range m.SitesDPI { u.reportSiteDPI(r, s.(*unifi.DPITable)) } for _, s := range m.Clients { u.switchExport(r, s) } for _, s := range m.Devices { u.switchExport(r, s) } for _, s := range r.events().Logs { u.switchExport(r, s) } appTotal := make(totalsDPImap) catTotal := make(totalsDPImap) for _, s := range m.ClientsDPI { u.batchClientDPI(r, s, appTotal, catTotal) } reportClientDPItotals(r, appTotal, catTotal) } func (u *DatadogUnifi) switchExport(r report, v interface{}) { //nolint:cyclop switch v := v.(type) { case *unifi.RogueAP: u.batchRogueAP(r, v) case *unifi.UAP: u.batchUAP(r, v) case *unifi.USW: u.batchUSW(r, v) case *unifi.USG: u.batchUSG(r, v) case *unifi.UXG: u.batchUXG(r, v) case *unifi.UDM: u.batchUDM(r, v) case *unifi.Site: u.reportSite(r, v) case *unifi.Client: u.batchClient(r, v) case *unifi.Event: u.batchEvent(r, v) case *unifi.IDS: u.batchIDS(r, v) case *unifi.Alarm: u.batchAlarms(r, v) case *unifi.Anomaly: u.batchAnomaly(r, v) default: u.LogErrorf("invalid export, type=%+v", reflect.TypeOf(v)) } } // LogDatadogReport writes a log message after exporting to Datadog. func (u *DatadogUnifi) LogDatadogReport(r *Report) { m := r.Metrics u.Logf("UniFi Metrics Recorded num_sites=%d num_sites_dpi=%d num_clients=%d num_clients_dpi=%d num_rogue_ap=%d num_devices=%d errors=%v elapsec=%v", len(m.Sites), len(m.SitesDPI), len(m.Clients), len(m.ClientsDPI), len(m.RogueAPs), len(m.Devices), r.Errors, r.Elapsed, ) metricName := metricNamespace("collector") _ = r.reportCount(metricName("num_sites"), int64(len(m.Sites)), u.Tags) _ = r.reportCount(metricName("num_sites_dpi"), int64(len(m.SitesDPI)), u.Tags) _ = r.reportCount(metricName("num_clients"), int64(len(m.Clients)), u.Tags) _ = r.reportCount(metricName("num_clients_dpi"), int64(len(m.ClientsDPI)), u.Tags) _ = r.reportCount(metricName("num_rogue_ap"), int64(len(m.RogueAPs)), u.Tags) _ = r.reportCount(metricName("num_devices"), int64(len(m.Devices)), u.Tags) _ = r.reportCount(metricName("num_errors"), int64(len(r.Errors)), u.Tags) _ = r.reportTiming(metricName("elapsed_time"), r.Elapsed, u.Tags) }