diff --git a/pkg/influxunifi/clients.go b/pkg/influxunifi/clients.go index 0ff5fcd8..f954da17 100644 --- a/pkg/influxunifi/clients.go +++ b/pkg/influxunifi/clients.go @@ -1,93 +1,77 @@ package influxunifi import ( - "time" - - influx "github.com/influxdata/influxdb1-client/v2" "golift.io/unifi" ) -// ClientPoints generates Unifi Client datapoints for InfluxDB. +// batchClient generates Unifi Client datapoints for InfluxDB. // These points can be passed directly to influx. -func ClientPoints(c *unifi.Client, now time.Time) ([]*influx.Point, error) { +func (u *InfluxUnifi) batchClient(r report, s *unifi.Client) { tags := map[string]string{ - "id": c.ID, - "mac": c.Mac, - "user_id": c.UserID, - "site_id": c.SiteID, - "site_name": c.SiteName, - "ap_mac": c.ApMac, - "gw_mac": c.GwMac, - "sw_mac": c.SwMac, - "ap_name": c.ApName, - "gw_name": c.GwName, - "sw_name": c.SwName, - "oui": c.Oui, - "radio_name": c.RadioName, - "radio": c.Radio, - "radio_proto": c.RadioProto, - "name": c.Name, - "fixed_ip": c.FixedIP, - "sw_port": c.SwPort.Txt, - "os_class": c.OsClass.Txt, - "os_name": c.OsName.Txt, - "dev_cat": c.DevCat.Txt, - "dev_id": c.DevID.Txt, - "dev_vendor": c.DevVendor.Txt, - "dev_family": c.DevFamily.Txt, - "is_wired": c.IsWired.Txt, - "is_guest": c.IsGuest.Txt, - "use_fixedip": c.UseFixedIP.Txt, - "channel": c.Channel.Txt, - "vlan": c.Vlan.Txt, + "mac": s.Mac, + "site_name": s.SiteName, + "ap_name": s.ApName, + "gw_name": s.GwName, + "sw_name": s.SwName, + "oui": s.Oui, + "radio_name": s.RadioName, + "radio": s.Radio, + "radio_proto": s.RadioProto, + "name": s.Name, + "fixed_ip": s.FixedIP, + "sw_port": s.SwPort.Txt, + "os_class": s.OsClass.Txt, + "os_name": s.OsName.Txt, + "dev_cat": s.DevCat.Txt, + "dev_id": s.DevID.Txt, + "dev_vendor": s.DevVendor.Txt, + "dev_family": s.DevFamily.Txt, + "is_wired": s.IsWired.Txt, + "is_guest": s.IsGuest.Txt, + "use_fixedip": s.UseFixedIP.Txt, + "channel": s.Channel.Txt, + "vlan": s.Vlan.Txt, } fields := map[string]interface{}{ - "anomalies": c.Anomalies, - "ip": c.IP, - "essid": c.Essid, - "bssid": c.Bssid, - "radio_desc": c.RadioDescription, - "satisfaction": c.Satisfaction.Val, - "hostname": c.Hostname, - "bytes_r": c.BytesR, - "ccq": c.Ccq, - "first_seen": c.FirstSeen, - "idle_time": c.IdleTime, - "last_seen": c.LastSeen, - "network": c.Network, - "noise": c.Noise, - "note": c.Note, - "roam_count": c.RoamCount, - "rssi": c.Rssi, - "rx_bytes": c.RxBytes, - "rx_bytes_r": c.RxBytesR, - "rx_packets": c.RxPackets, - "rx_rate": c.RxRate, - "signal": c.Signal, - "tx_bytes": c.TxBytes, - "tx_bytes_r": c.TxBytesR, - "tx_packets": c.TxPackets, - "tx_retries": c.TxRetries, - "tx_power": c.TxPower, - "tx_rate": c.TxRate, - "uptime": c.Uptime, - "wifi_tx_attempts": c.WifiTxAttempts, - "wired-rx_bytes": c.WiredRxBytes, - "wired-rx_bytes-r": c.WiredRxBytesR, - "wired-rx_packets": c.WiredRxPackets, - "wired-tx_bytes": c.WiredTxBytes, - "wired-tx_bytes-r": c.WiredTxBytesR, - "wired-tx_packets": c.WiredTxPackets, - "dpi_app": c.DpiStats.App.Val, - "dpi_cat": c.DpiStats.Cat.Val, - "dpi_rx_bytes": c.DpiStats.RxBytes.Val, - "dpi_rx_packets": c.DpiStats.RxPackets.Val, - "dpi_tx_bytes": c.DpiStats.TxBytes.Val, - "dpi_tx_packets": c.DpiStats.TxPackets.Val, + "anomalies": s.Anomalies, + "ip": s.IP, + "essid": s.Essid, + "bssid": s.Bssid, + "radio_desc": s.RadioDescription, + "satisfaction": s.Satisfaction.Val, + "bytes_r": s.BytesR, + "ccq": s.Ccq, + "noise": s.Noise, + "note": s.Note, + "roam_count": s.RoamCount, + "rssi": s.Rssi, + "rx_bytes": s.RxBytes, + "rx_bytes_r": s.RxBytesR, + "rx_packets": s.RxPackets, + "rx_rate": s.RxRate, + "signal": s.Signal, + "tx_bytes": s.TxBytes, + "tx_bytes_r": s.TxBytesR, + "tx_packets": s.TxPackets, + "tx_retries": s.TxRetries, + "tx_power": s.TxPower, + "tx_rate": s.TxRate, + "uptime": s.Uptime, + "wifi_tx_attempts": s.WifiTxAttempts, + "wired-rx_bytes": s.WiredRxBytes, + "wired-rx_bytes-r": s.WiredRxBytesR, + "wired-rx_packets": s.WiredRxPackets, + "wired-tx_bytes": s.WiredTxBytes, + "wired-tx_bytes-r": s.WiredTxBytesR, + "wired-tx_packets": s.WiredTxPackets, + /* + "dpi_app": c.DpiStats.App.Val, + "dpi_cat": c.DpiStats.Cat.Val, + "dpi_rx_bytes": c.DpiStats.RxBytes.Val, + "dpi_rx_packets": c.DpiStats.RxPackets.Val, + "dpi_tx_bytes": c.DpiStats.TxBytes.Val, + "dpi_tx_packets": c.DpiStats.TxPackets.Val, + */ } - pt, err := influx.NewPoint("clients", tags, fields, now) - if err != nil { - return nil, err - } - return []*influx.Point{pt}, nil + r.send(&metric{Table: "clients", Tags: tags, Fields: fields}) } diff --git a/pkg/influxunifi/ids.go b/pkg/influxunifi/ids.go index bf444f5f..c7b8edba 100644 --- a/pkg/influxunifi/ids.go +++ b/pkg/influxunifi/ids.go @@ -1,13 +1,12 @@ package influxunifi import ( - influx "github.com/influxdata/influxdb1-client/v2" "golift.io/unifi" ) -// IDSPoints generates intrusion detection datapoints for InfluxDB. +// batchIDS generates intrusion detection datapoints for InfluxDB. // These points can be passed directly to influx. -func IDSPoints(i *unifi.IDS) ([]*influx.Point, error) { +func (u *InfluxUnifi) batchIDS(r report, i *unifi.IDS) { tags := map[string]string{ "in_iface": i.InIface, "event_type": i.EventType, @@ -36,9 +35,5 @@ func IDSPoints(i *unifi.IDS) ([]*influx.Point, error) { "srcipASN": i.SrcipASN, "usgipASN": i.UsgipASN, } - pt, err := influx.NewPoint("intrusion_detect", tags, fields, i.Datetime) - if err != nil { - return nil, err - } - return []*influx.Point{pt}, nil + r.send(&metric{Table: "intrusion_detect", Tags: tags, Fields: fields}) } diff --git a/pkg/influxunifi/metrics.go b/pkg/influxunifi/metrics.go index f4a8c86b..af8bd274 100644 --- a/pkg/influxunifi/metrics.go +++ b/pkg/influxunifi/metrics.go @@ -3,67 +3,141 @@ package influxunifi import ( + "crypto/tls" + "fmt" + "time" + "github.com/davidnewhall/unifi-poller/pkg/metrics" - client "github.com/influxdata/influxdb1-client/v2" + _ "github.com/influxdata/influxdb1-client" + influx "github.com/influxdata/influxdb1-client/v2" ) -// Metrics contains all the data from the controller and an influx endpoint to send it to. -type Metrics struct { - *metrics.Metrics - client.BatchPoints +// Config defines the data needed to store metrics in InfluxDB +type Config struct { + Database string + URL string + User string + Pass string + BadSSL bool } -// ProcessPoints batches all device and client data into influxdb data points. +// InfluxUnifi is returned by New() after you provide a Config. +type InfluxUnifi struct { + cf *Config + influx influx.Client +} + +type metric struct { + Table string + Tags map[string]string + Fields map[string]interface{} +} + +// New returns an InfluxDB interface. +func New(c *Config) (*InfluxUnifi, error) { + i, err := influx.NewHTTPClient(influx.HTTPConfig{ + Addr: c.URL, + Username: c.User, + Password: c.Pass, + TLSConfig: &tls.Config{InsecureSkipVerify: c.BadSSL}, + }) + return &InfluxUnifi{cf: c, influx: i}, err +} + +// ReportMetrics batches all device and client data into influxdb data points. // Call this after you've collected all the data you care about. -// This function is sorta weird and returns a slice of errors. The reasoning is -// that some points may process while others fail, so we attempt to process them -// all. This is (usually) run in a loop, so we can't really exit on error, -// we just log the errors and tally them on a counter. In reality, this never -// returns any errors because we control the data going in; cool right? But we -// still check&log it in case the data going is skewed up and causes errors! -func (m *Metrics) ProcessPoints() []error { - errs := []error{} - processPoints := func(m *Metrics, p []*client.Point, err error) { - switch { - case err != nil: - errs = append(errs, err) - case p == nil: - default: - m.AddPoints(p) - } +// Returns an error if influxdb calls fail, otherwise returns a report. +func (u *InfluxUnifi) ReportMetrics(m *metrics.Metrics) (*Report, error) { + r := &Report{Metrics: m, ch: make(chan *metric), Start: time.Now()} + defer close(r.ch) + // Make a new Influx Points Batcher. + var err error + r.bp, err = influx.NewBatchPoints(influx.BatchPointsConfig{Database: u.cf.Database}) + if err != nil { + return nil, fmt.Errorf("influx.NewBatchPoints: %v", err) } - for _, asset := range m.Sites { - pts, err := SitePoints(asset, m.TS) - processPoints(m, pts, err) - } - for _, asset := range m.Clients { - pts, err := ClientPoints(asset, m.TS) - processPoints(m, pts, err) - } - for _, asset := range m.IDSList { - pts, err := IDSPoints(asset) // no m.TS. - processPoints(m, pts, err) - } + go u.collect(r, r.ch) + // Batch all the points. + u.loopPoints(r) - if m.Devices == nil { - return errs + // Send all the points. + if err = u.influx.Write(r.bp); err != nil { + return nil, fmt.Errorf("influxdb.Write(points): %v", err) } - for _, asset := range m.Devices.UAPs { - pts, err := UAPPoints(asset, m.TS) - processPoints(m, pts, err) - } - for _, asset := range m.Devices.USGs { - pts, err := USGPoints(asset, m.TS) - processPoints(m, pts, err) - } - for _, asset := range m.Devices.USWs { - pts, err := USWPoints(asset, m.TS) - processPoints(m, pts, err) - } - for _, asset := range m.Devices.UDMs { - pts, err := UDMPoints(asset, m.TS) - processPoints(m, pts, err) - } - return errs + r.Elapsed = time.Since(r.Start) + return r, nil +} + +// collect runs in a go routine and batches all the points. +func (u *InfluxUnifi) collect(r report, ch chan *metric) { + for m := range ch { + pt, err := influx.NewPoint(m.Table, m.Tags, m.Fields, r.metrics().TS) + if err != nil { + r.error(err) + } else { + r.batch(m, pt) + } + r.done() + } +} + +// loopPoints kicks off 3 or 7 go routines to process metrics and send them +// to the collect routine through the metric channel. +func (u *InfluxUnifi) loopPoints(r report) { + m := r.metrics() + r.add() + go func() { + defer r.done() + for _, s := range m.Sites { + u.batchSite(r, s) + } + }() + r.add() + go func() { + defer r.done() + for _, s := range m.Clients { + u.batchClient(r, s) + } + }() + r.add() + go func() { + defer r.done() + for _, s := range m.IDSList { + u.batchIDS(r, s) + } + }() + if m.Devices == nil { + return + } + + r.add() + go func() { + defer r.done() + for _, s := range m.UAPs { + u.batchUAP(r, s) + } + }() + r.add() + go func() { + defer r.done() + for _, s := range m.USGs { + u.batchUSG(r, s) + } + }() + r.add() + go func() { + defer r.done() + for _, s := range m.USWs { + u.batchUSW(r, s) + } + }() + r.add() + go func() { + defer r.done() + for _, s := range m.UDMs { + u.batchUDM(r, s) + } + }() + r.wait() } diff --git a/pkg/influxunifi/report.go b/pkg/influxunifi/report.go new file mode 100644 index 00000000..42211c26 --- /dev/null +++ b/pkg/influxunifi/report.go @@ -0,0 +1,69 @@ +package influxunifi + +import ( + "sync" + "time" + + "github.com/davidnewhall/unifi-poller/pkg/metrics" + influx "github.com/influxdata/influxdb1-client/v2" +) + +// Report is returned to the calling procedure after everything is processed. +type Report struct { + Metrics *metrics.Metrics + Errors []error + Total int + Fields int + Start time.Time + Elapsed time.Duration + ch chan *metric + wg sync.WaitGroup + bp influx.BatchPoints +} + +// satisfy gomnd +const one = 1 + +// report is an internal interface that can be mocked and overrridden for tests. +type report interface { + add() + done() + wait() + send(m *metric) + error(err error) + batch(m *metric, pt *influx.Point) + metrics() *metrics.Metrics +} + +func (r *Report) metrics() *metrics.Metrics { + return r.Metrics +} + +func (r *Report) add() { + r.wg.Add(one) +} + +func (r *Report) done() { + r.wg.Add(-one) +} + +func (r *Report) send(m *metric) { + r.wg.Add(one) + r.ch <- m +} + +func (r *Report) wait() { + r.wg.Wait() +} + +/* The following methods are not thread safe. */ + +func (r *Report) error(err error) { + r.Errors = append(r.Errors, err) +} + +func (r *Report) batch(m *metric, p *influx.Point) { + r.Total++ + r.Fields += len(m.Fields) + r.bp.AddPoint(p) +} diff --git a/pkg/influxunifi/site.go b/pkg/influxunifi/site.go index dbfdac38..243d2acc 100644 --- a/pkg/influxunifi/site.go +++ b/pkg/influxunifi/site.go @@ -1,67 +1,56 @@ package influxunifi import ( - "time" - - influx "github.com/influxdata/influxdb1-client/v2" "golift.io/unifi" ) -// SitePoints generates Unifi Sites' datapoints for InfluxDB. +// batchSite generates Unifi Sites' datapoints for InfluxDB. // These points can be passed directly to influx. -func SitePoints(u *unifi.Site, now time.Time) ([]*influx.Point, error) { - points := []*influx.Point{} - for _, s := range u.Health { +func (u *InfluxUnifi) batchSite(r report, s *unifi.Site) { + for _, h := range s.Health { tags := map[string]string{ - "name": u.Name, - "site_name": u.SiteName, - "desc": u.Desc, - "status": s.Status, - "subsystem": s.Subsystem, - "wan_ip": s.WanIP, - "netmask": s.Netmask, - "gw_name": s.GwName, - "gw_mac": s.GwMac, - "lan_ip": s.LanIP, + "name": s.Name, + "site_name": s.SiteName, + "desc": s.Desc, + "status": h.Status, + "subsystem": h.Subsystem, + "wan_ip": h.WanIP, + "gw_name": h.GwName, + "lan_ip": h.LanIP, } fields := map[string]interface{}{ - "num_user": s.NumUser.Val, - "num_guest": s.NumGuest.Val, - "num_iot": s.NumIot.Val, - "tx_bytes-r": s.TxBytesR.Val, - "rx_bytes-r": s.RxBytesR.Val, - "num_ap": s.NumAp.Val, - "num_adopted": s.NumAdopted.Val, - "num_disabled": s.NumDisabled.Val, - "num_disconnected": s.NumDisconnected.Val, - "num_pending": s.NumPending.Val, - "num_gw": s.NumGw.Val, - "wan_ip": s.WanIP, - "num_sta": s.NumSta.Val, - "gw_cpu": s.GwSystemStats.CPU.Val, - "gw_mem": s.GwSystemStats.Mem.Val, - "gw_uptime": s.GwSystemStats.Uptime.Val, - "latency": s.Latency.Val, - "uptime": s.Uptime.Val, - "drops": s.Drops.Val, - "xput_up": s.XputUp.Val, - "xput_down": s.XputDown.Val, - "speedtest_ping": s.SpeedtestPing.Val, - "speedtest_lastrun": s.SpeedtestLastrun.Val, - "num_sw": s.NumSw.Val, - "remote_user_num_active": s.RemoteUserNumActive.Val, - "remote_user_num_inactive": s.RemoteUserNumInactive.Val, - "remote_user_rx_bytes": s.RemoteUserRxBytes.Val, - "remote_user_tx_bytes": s.RemoteUserTxBytes.Val, - "remote_user_rx_packets": s.RemoteUserRxPackets.Val, - "remote_user_tx_packets": s.RemoteUserTxPackets.Val, - "num_new_alarms": u.NumNewAlarms.Val, + "num_user": h.NumUser.Val, + "num_guest": h.NumGuest.Val, + "num_iot": h.NumIot.Val, + "tx_bytes-r": h.TxBytesR.Val, + "rx_bytes-r": h.RxBytesR.Val, + "num_ap": h.NumAp.Val, + "num_adopted": h.NumAdopted.Val, + "num_disabled": h.NumDisabled.Val, + "num_disconnected": h.NumDisconnected.Val, + "num_pending": h.NumPending.Val, + "num_gw": h.NumGw.Val, + "wan_ip": h.WanIP, + "num_sta": h.NumSta.Val, + "gw_cpu": h.GwSystemStats.CPU.Val, + "gw_mem": h.GwSystemStats.Mem.Val, + "gw_uptime": h.GwSystemStats.Uptime.Val, + "latency": h.Latency.Val, + "uptime": h.Uptime.Val, + "drops": h.Drops.Val, + "xput_up": h.XputUp.Val, + "xput_down": h.XputDown.Val, + "speedtest_ping": h.SpeedtestPing.Val, + "speedtest_lastrun": h.SpeedtestLastrun.Val, + "num_sw": h.NumSw.Val, + "remote_user_num_active": h.RemoteUserNumActive.Val, + "remote_user_num_inactive": h.RemoteUserNumInactive.Val, + "remote_user_rx_bytes": h.RemoteUserRxBytes.Val, + "remote_user_tx_bytes": h.RemoteUserTxBytes.Val, + "remote_user_rx_packets": h.RemoteUserRxPackets.Val, + "remote_user_tx_packets": h.RemoteUserTxPackets.Val, + "num_new_alarms": s.NumNewAlarms.Val, } - pt, err := influx.NewPoint("subsystems", tags, fields, time.Now()) - if err != nil { - return points, err - } - points = append(points, pt) + r.send(&metric{Table: "subsystems", Tags: tags, Fields: fields}) } - return points, nil } diff --git a/pkg/influxunifi/uap.go b/pkg/influxunifi/uap.go index 423c00cb..fd1a1f96 100644 --- a/pkg/influxunifi/uap.go +++ b/pkg/influxunifi/uap.go @@ -1,103 +1,89 @@ package influxunifi import ( - "time" - - influx "github.com/influxdata/influxdb1-client/v2" "golift.io/unifi" ) -// UAPPoints generates Wireless-Access-Point datapoints for InfluxDB. +// batchUAP generates Wireless-Access-Point datapoints for InfluxDB. // These points can be passed directly to influx. -func UAPPoints(u *unifi.UAP, now time.Time) ([]*influx.Point, error) { - if u.Stat.Ap == nil { - u.Stat.Ap = &unifi.Ap{} +func (u *InfluxUnifi) batchUAP(r report, s *unifi.UAP) { + if s.Stat.Ap == nil { + s.Stat.Ap = &unifi.Ap{} } tags := map[string]string{ - "ip": u.IP, - "mac": u.Mac, - "site_id": u.SiteID, - "site_name": u.SiteName, - "name": u.Name, - "cfgversion": u.Cfgversion, - "model": u.Model, - "serial": u.Serial, - "type": u.Type, + "mac": s.Mac, + "site_name": s.SiteName, + "name": s.Name, + "version": s.Version, + "model": s.Model, + "serial": s.Serial, + "type": s.Type, } fields := map[string]interface{}{ - "ip": u.IP, - "bytes": u.Bytes.Val, - "last_seen": u.LastSeen.Val, - "rx_bytes": u.RxBytes.Val, - "tx_bytes": u.TxBytes.Val, - "uptime": u.Uptime.Val, - "state": u.State, - "user-num_sta": int(u.UserNumSta.Val), - "guest-num_sta": int(u.GuestNumSta.Val), - "num_sta": u.NumSta.Val, - "loadavg_1": u.SysStats.Loadavg1.Val, - "loadavg_5": u.SysStats.Loadavg5.Val, - "loadavg_15": u.SysStats.Loadavg15.Val, - "mem_buffer": u.SysStats.MemBuffer.Val, - "mem_total": u.SysStats.MemTotal.Val, - "mem_used": u.SysStats.MemUsed.Val, - "cpu": u.SystemStats.CPU.Val, - "mem": u.SystemStats.Mem.Val, - "system_uptime": u.SystemStats.Uptime.Val, + "ip": s.IP, + "bytes": s.Bytes.Val, + "last_seen": s.LastSeen.Val, + "rx_bytes": s.RxBytes.Val, + "tx_bytes": s.TxBytes.Val, + "uptime": s.Uptime.Val, + "state": s.State, + "user-num_sta": int(s.UserNumSta.Val), + "guest-num_sta": int(s.GuestNumSta.Val), + "num_sta": s.NumSta.Val, + "loadavg_1": s.SysStats.Loadavg1.Val, + "loadavg_5": s.SysStats.Loadavg5.Val, + "loadavg_15": s.SysStats.Loadavg15.Val, + "mem_buffer": s.SysStats.MemBuffer.Val, + "mem_total": s.SysStats.MemTotal.Val, + "mem_used": s.SysStats.MemUsed.Val, + "cpu": s.SystemStats.CPU.Val, + "mem": s.SystemStats.Mem.Val, + "system_uptime": s.SystemStats.Uptime.Val, // Accumulative Statistics. - "stat_user-rx_packets": u.Stat.Ap.UserRxPackets.Val, - "stat_guest-rx_packets": u.Stat.Ap.GuestRxPackets.Val, - "stat_rx_packets": u.Stat.Ap.RxPackets.Val, - "stat_user-rx_bytes": u.Stat.Ap.UserRxBytes.Val, - "stat_guest-rx_bytes": u.Stat.Ap.GuestRxBytes.Val, - "stat_rx_bytes": u.Stat.Ap.RxBytes.Val, - "stat_user-rx_errors": u.Stat.Ap.UserRxErrors.Val, - "stat_guest-rx_errors": u.Stat.Ap.GuestRxErrors.Val, - "stat_rx_errors": u.Stat.Ap.RxErrors.Val, - "stat_user-rx_dropped": u.Stat.Ap.UserRxDropped.Val, - "stat_guest-rx_dropped": u.Stat.Ap.GuestRxDropped.Val, - "stat_rx_dropped": u.Stat.Ap.RxDropped.Val, - "stat_user-rx_crypts": u.Stat.Ap.UserRxCrypts.Val, - "stat_guest-rx_crypts": u.Stat.Ap.GuestRxCrypts.Val, - "stat_rx_crypts": u.Stat.Ap.RxCrypts.Val, - "stat_user-rx_frags": u.Stat.Ap.UserRxFrags.Val, - "stat_guest-rx_frags": u.Stat.Ap.GuestRxFrags.Val, - "stat_rx_frags": u.Stat.Ap.RxFrags.Val, - "stat_user-tx_packets": u.Stat.Ap.UserTxPackets.Val, - "stat_guest-tx_packets": u.Stat.Ap.GuestTxPackets.Val, - "stat_tx_packets": u.Stat.Ap.TxPackets.Val, - "stat_user-tx_bytes": u.Stat.Ap.UserTxBytes.Val, - "stat_guest-tx_bytes": u.Stat.Ap.GuestTxBytes.Val, - "stat_tx_bytes": u.Stat.Ap.TxBytes.Val, - "stat_user-tx_errors": u.Stat.Ap.UserTxErrors.Val, - "stat_guest-tx_errors": u.Stat.Ap.GuestTxErrors.Val, - "stat_tx_errors": u.Stat.Ap.TxErrors.Val, - "stat_user-tx_dropped": u.Stat.Ap.UserTxDropped.Val, - "stat_guest-tx_dropped": u.Stat.Ap.GuestTxDropped.Val, - "stat_tx_dropped": u.Stat.Ap.TxDropped.Val, - "stat_user-tx_retries": u.Stat.Ap.UserTxRetries.Val, - "stat_guest-tx_retries": u.Stat.Ap.GuestTxRetries.Val, + "stat_user-rx_packets": s.Stat.Ap.UserRxPackets.Val, + "stat_guest-rx_packets": s.Stat.Ap.GuestRxPackets.Val, + "stat_rx_packets": s.Stat.Ap.RxPackets.Val, + "stat_user-rx_bytes": s.Stat.Ap.UserRxBytes.Val, + "stat_guest-rx_bytes": s.Stat.Ap.GuestRxBytes.Val, + "stat_rx_bytes": s.Stat.Ap.RxBytes.Val, + "stat_user-rx_errors": s.Stat.Ap.UserRxErrors.Val, + "stat_guest-rx_errors": s.Stat.Ap.GuestRxErrors.Val, + "stat_rx_errors": s.Stat.Ap.RxErrors.Val, + "stat_user-rx_dropped": s.Stat.Ap.UserRxDropped.Val, + "stat_guest-rx_dropped": s.Stat.Ap.GuestRxDropped.Val, + "stat_rx_dropped": s.Stat.Ap.RxDropped.Val, + "stat_user-rx_crypts": s.Stat.Ap.UserRxCrypts.Val, + "stat_guest-rx_crypts": s.Stat.Ap.GuestRxCrypts.Val, + "stat_rx_crypts": s.Stat.Ap.RxCrypts.Val, + "stat_user-rx_frags": s.Stat.Ap.UserRxFrags.Val, + "stat_guest-rx_frags": s.Stat.Ap.GuestRxFrags.Val, + "stat_rx_frags": s.Stat.Ap.RxFrags.Val, + "stat_user-tx_packets": s.Stat.Ap.UserTxPackets.Val, + "stat_guest-tx_packets": s.Stat.Ap.GuestTxPackets.Val, + "stat_tx_packets": s.Stat.Ap.TxPackets.Val, + "stat_user-tx_bytes": s.Stat.Ap.UserTxBytes.Val, + "stat_guest-tx_bytes": s.Stat.Ap.GuestTxBytes.Val, + "stat_tx_bytes": s.Stat.Ap.TxBytes.Val, + "stat_user-tx_errors": s.Stat.Ap.UserTxErrors.Val, + "stat_guest-tx_errors": s.Stat.Ap.GuestTxErrors.Val, + "stat_tx_errors": s.Stat.Ap.TxErrors.Val, + "stat_user-tx_dropped": s.Stat.Ap.UserTxDropped.Val, + "stat_guest-tx_dropped": s.Stat.Ap.GuestTxDropped.Val, + "stat_tx_dropped": s.Stat.Ap.TxDropped.Val, + "stat_user-tx_retries": s.Stat.Ap.UserTxRetries.Val, + "stat_guest-tx_retries": s.Stat.Ap.GuestTxRetries.Val, } - pt, err := influx.NewPoint("uap", tags, fields, now) - if err != nil { - return nil, err - } - morePoints, err := processVAPs(u.VapTable, u.RadioTable, u.RadioTableStats, u.Name, u.Mac, u.SiteName, now) - if err != nil { - return nil, err - } - return append(morePoints, pt), nil + r.send(&metric{Table: "uap", Tags: tags, Fields: fields}) + u.processVAPs(r, s.VapTable, s.RadioTable, s.RadioTableStats, s.Name, s.Mac, s.SiteName) } // processVAPs creates points for Wifi Radios. This works with several types of UAP-capable devices. -func processVAPs(vt unifi.VapTable, rt unifi.RadioTable, rts unifi.RadioTableStats, name, mac, sitename string, ts time.Time) ([]*influx.Point, error) { - tags := make(map[string]string) - fields := make(map[string]interface{}) - points := []*influx.Point{} - +func (u *InfluxUnifi) processVAPs(r report, vt unifi.VapTable, rt unifi.RadioTable, rts unifi.RadioTableStats, name, mac, sitename string) { // Loop each virtual AP (ESSID) and extract data for it // from radio_tables and radio_table_stats. for _, s := range vt { + tags := make(map[string]string) + fields := make(map[string]interface{}) tags["device_name"] = name tags["device_mac"] = mac tags["site_name"] = sitename @@ -113,7 +99,6 @@ func processVAPs(vt unifi.VapTable, rt unifi.RadioTable, rts unifi.RadioTableSta tags["is_guest"] = s.IsGuest.Txt fields["ccq"] = s.Ccq - fields["extchannel"] = s.Extchannel fields["mac_filter_rejections"] = s.MacFilterRejections fields["num_satisfaction_sta"] = s.NumSatisfactionSta.Val fields["avg_client_signal"] = s.AvgClientSignal.Val @@ -185,12 +170,6 @@ func processVAPs(vt unifi.VapTable, rt unifi.RadioTable, rts unifi.RadioTableSta fields["tx_retries"] = p.TxRetries.Val fields["user-num_sta"] = p.UserNumSta.Val } - - pt, err := influx.NewPoint("uap_vaps", tags, fields, ts) - if err != nil { - return points, err - } - points = append(points, pt) + r.send(&metric{Table: "uap_vaps", Tags: tags, Fields: fields}) } - return points, nil } diff --git a/pkg/influxunifi/udm.go b/pkg/influxunifi/udm.go index f679c922..4b01deb0 100644 --- a/pkg/influxunifi/udm.go +++ b/pkg/influxunifi/udm.go @@ -1,200 +1,163 @@ package influxunifi import ( - "time" - - influx "github.com/influxdata/influxdb1-client/v2" "golift.io/unifi" ) -// UDMPoints generates Unifi Gateway datapoints for InfluxDB. +// batchUDM generates Unifi Gateway datapoints for InfluxDB. // These points can be passed directly to influx. -func UDMPoints(u *unifi.UDM, now time.Time) ([]*influx.Point, error) { - if u.Stat.Sw == nil { - u.Stat.Sw = &unifi.Sw{} +func (u *InfluxUnifi) batchUDM(r report, s *unifi.UDM) { + if s.Stat.Sw == nil { + s.Stat.Sw = &unifi.Sw{} } - if u.Stat.Gw == nil { - u.Stat.Gw = &unifi.Gw{} + if s.Stat.Gw == nil { + s.Stat.Gw = &unifi.Gw{} } tags := map[string]string{ - "ip": u.IP, - "mac": u.Mac, - "site_id": u.SiteID, - "site_name": u.SiteName, - "name": u.Name, - "cfgversion": u.Cfgversion, - "model": u.Model, - "serial": u.Serial, - "type": u.Type, + "mac": s.Mac, + "site_name": s.SiteName, + "name": s.Name, + "version": s.Version, + "model": s.Model, + "serial": s.Serial, + "type": s.Type, } fields := map[string]interface{}{ - "ip": u.IP, - "bytes": u.Bytes.Val, - "last_seen": u.LastSeen.Val, - "license_state": u.LicenseState, - "fw_caps": u.FwCaps.Val, - "guest-num_sta": u.GuestNumSta.Val, - "rx_bytes": u.RxBytes.Val, - "tx_bytes": u.TxBytes.Val, - "uptime": u.Uptime.Val, - "state": u.State.Val, - "user-num_sta": u.UserNumSta.Val, - "num_sta": u.NumSta.Val, - "version": u.Version, - "num_desktop": u.NumDesktop.Val, - "num_handheld": u.NumHandheld.Val, - "num_mobile": u.NumMobile.Val, - "speedtest-status_latency": u.SpeedtestStatus.Latency.Val, - "speedtest-status_rundate": u.SpeedtestStatus.Rundate.Val, - "speedtest-status_runtime": u.SpeedtestStatus.Runtime.Val, - "speedtest-status_download": u.SpeedtestStatus.StatusDownload.Val, - "speedtest-status_ping": u.SpeedtestStatus.StatusPing.Val, - "speedtest-status_summary": u.SpeedtestStatus.StatusSummary.Val, - "speedtest-status_upload": u.SpeedtestStatus.StatusUpload.Val, - "speedtest-status_xput_download": u.SpeedtestStatus.XputDownload.Val, - "speedtest-status_xput_upload": u.SpeedtestStatus.XputUpload.Val, - "config_network_wan_type": u.ConfigNetwork.Type, - "wan1_bytes-r": u.Wan1.BytesR.Val, - "wan1_enable": u.Wan1.Enable.Val, - "wan1_full_duplex": u.Wan1.FullDuplex.Val, - "wan1_gateway": u.Wan1.Gateway, - "wan1_ifname": u.Wan1.Ifname, - "wan1_ip": u.Wan1.IP, - "wan1_mac": u.Wan1.Mac, - "wan1_max_speed": u.Wan1.MaxSpeed.Val, - "wan1_name": u.Wan1.Name, - "wan1_netmask": u.Wan1.Netmask, - "wan1_rx_bytes": u.Wan1.RxBytes.Val, - "wan1_rx_bytes-r": u.Wan1.RxBytesR.Val, - "wan1_rx_dropped": u.Wan1.RxDropped.Val, - "wan1_rx_errors": u.Wan1.RxErrors.Val, - "wan1_rx_multicast": u.Wan1.RxMulticast.Val, - "wan1_rx_packets": u.Wan1.RxPackets.Val, - "wan1_type": u.Wan1.Type, - "wan1_speed": u.Wan1.Speed.Val, - "wan1_up": u.Wan1.Up.Val, - "wan1_tx_bytes": u.Wan1.TxBytes.Val, - "wan1_tx_bytes-r": u.Wan1.TxBytesR.Val, - "wan1_tx_dropped": u.Wan1.TxDropped.Val, - "wan1_tx_errors": u.Wan1.TxErrors.Val, - "wan1_tx_packets": u.Wan1.TxPackets.Val, - "wan2_bytes-r": u.Wan2.BytesR.Val, - "wan2_enable": u.Wan2.Enable.Val, - "wan2_full_duplex": u.Wan2.FullDuplex.Val, - "wan2_gateway": u.Wan2.Gateway, - "wan2_ifname": u.Wan2.Ifname, - "wan2_ip": u.Wan2.IP, - "wan2_mac": u.Wan2.Mac, - "wan2_max_speed": u.Wan2.MaxSpeed.Val, - "wan2_name": u.Wan2.Name, - "wan2_netmask": u.Wan2.Netmask, - "wan2_rx_bytes": u.Wan2.RxBytes.Val, - "wan2_rx_bytes-r": u.Wan2.RxBytesR.Val, - "wan2_rx_dropped": u.Wan2.RxDropped.Val, - "wan2_rx_errors": u.Wan2.RxErrors.Val, - "wan2_rx_multicast": u.Wan2.RxMulticast.Val, - "wan2_rx_packets": u.Wan2.RxPackets.Val, - "wan2_type": u.Wan2.Type, - "wan2_speed": u.Wan2.Speed.Val, - "wan2_up": u.Wan2.Up.Val, - "wan2_tx_bytes": u.Wan2.TxBytes.Val, - "wan2_tx_bytes-r": u.Wan2.TxBytesR.Val, - "wan2_tx_dropped": u.Wan2.TxDropped.Val, - "wan2_tx_errors": u.Wan2.TxErrors.Val, - "wan2_tx_packets": u.Wan2.TxPackets.Val, - "loadavg_1": u.SysStats.Loadavg1.Val, - "loadavg_5": u.SysStats.Loadavg5.Val, - "loadavg_15": u.SysStats.Loadavg15.Val, - "mem_used": u.SysStats.MemUsed.Val, - "mem_buffer": u.SysStats.MemBuffer.Val, - "mem_total": u.SysStats.MemTotal.Val, - "cpu": u.SystemStats.CPU.Val, - "mem": u.SystemStats.Mem.Val, - "system_uptime": u.SystemStats.Uptime.Val, - "gw": u.Stat.Gw, - "lan-rx_bytes": u.Stat.LanRxBytes.Val, - "lan-rx_packets": u.Stat.LanRxPackets.Val, - "lan-tx_bytes": u.Stat.LanTxBytes.Val, - "lan-tx_packets": u.Stat.LanTxPackets.Val, - "wan-rx_bytes": u.Stat.WanRxBytes.Val, - "wan-rx_dropped": u.Stat.WanRxDropped.Val, - "wan-rx_packets": u.Stat.WanRxPackets.Val, - "wan-tx_bytes": u.Stat.WanTxBytes.Val, - "wan-tx_packets": u.Stat.WanTxPackets.Val, - "uplink_name": u.Uplink.Name, - "uplink_latency": u.Uplink.Latency.Val, - "uplink_speed": u.Uplink.Speed.Val, - "uplink_num_ports": u.Uplink.NumPort.Val, - "uplink_max_speed": u.Uplink.MaxSpeed.Val, + "ip": s.IP, + "bytes": s.Bytes.Val, + "last_seen": s.LastSeen.Val, + "license_state": s.LicenseState, + "guest-num_sta": s.GuestNumSta.Val, + "rx_bytes": s.RxBytes.Val, + "tx_bytes": s.TxBytes.Val, + "uptime": s.Uptime.Val, + "state": s.State.Val, + "user-num_sta": s.UserNumSta.Val, + "version": s.Version, + "num_desktop": s.NumDesktop.Val, + "num_handheld": s.NumHandheld.Val, + "num_mobile": s.NumMobile.Val, + "speedtest-status_latency": s.SpeedtestStatus.Latency.Val, + "speedtest-status_runtime": s.SpeedtestStatus.Runtime.Val, + "speedtest-status_ping": s.SpeedtestStatus.StatusPing.Val, + "speedtest-status_xput_download": s.SpeedtestStatus.XputDownload.Val, + "speedtest-status_xput_upload": s.SpeedtestStatus.XputUpload.Val, + "wan1_bytes-r": s.Wan1.BytesR.Val, + "wan1_enable": s.Wan1.Enable.Val, + "wan1_full_duplex": s.Wan1.FullDuplex.Val, + "wan1_gateway": s.Wan1.Gateway, + "wan1_ifname": s.Wan1.Ifname, + "wan1_ip": s.Wan1.IP, + "wan1_mac": s.Wan1.Mac, + "wan1_max_speed": s.Wan1.MaxSpeed.Val, + "wan1_name": s.Wan1.Name, + "wan1_rx_bytes": s.Wan1.RxBytes.Val, + "wan1_rx_bytes-r": s.Wan1.RxBytesR.Val, + "wan1_rx_dropped": s.Wan1.RxDropped.Val, + "wan1_rx_errors": s.Wan1.RxErrors.Val, + "wan1_rx_multicast": s.Wan1.RxMulticast.Val, + "wan1_rx_packets": s.Wan1.RxPackets.Val, + "wan1_type": s.Wan1.Type, + "wan1_speed": s.Wan1.Speed.Val, + "wan1_up": s.Wan1.Up.Val, + "wan1_tx_bytes": s.Wan1.TxBytes.Val, + "wan1_tx_bytes-r": s.Wan1.TxBytesR.Val, + "wan1_tx_dropped": s.Wan1.TxDropped.Val, + "wan1_tx_errors": s.Wan1.TxErrors.Val, + "wan1_tx_packets": s.Wan1.TxPackets.Val, + "wan2_bytes-r": s.Wan2.BytesR.Val, + "wan2_enable": s.Wan2.Enable.Val, + "wan2_full_duplex": s.Wan2.FullDuplex.Val, + "wan2_gateway": s.Wan2.Gateway, + "wan2_ifname": s.Wan2.Ifname, + "wan2_ip": s.Wan2.IP, + "wan2_mac": s.Wan2.Mac, + "wan2_max_speed": s.Wan2.MaxSpeed.Val, + "wan2_name": s.Wan2.Name, + "wan2_rx_bytes": s.Wan2.RxBytes.Val, + "wan2_rx_bytes-r": s.Wan2.RxBytesR.Val, + "wan2_rx_dropped": s.Wan2.RxDropped.Val, + "wan2_rx_errors": s.Wan2.RxErrors.Val, + "wan2_rx_multicast": s.Wan2.RxMulticast.Val, + "wan2_rx_packets": s.Wan2.RxPackets.Val, + "wan2_type": s.Wan2.Type, + "wan2_speed": s.Wan2.Speed.Val, + "wan2_up": s.Wan2.Up.Val, + "wan2_tx_bytes": s.Wan2.TxBytes.Val, + "wan2_tx_bytes-r": s.Wan2.TxBytesR.Val, + "wan2_tx_dropped": s.Wan2.TxDropped.Val, + "wan2_tx_errors": s.Wan2.TxErrors.Val, + "wan2_tx_packets": s.Wan2.TxPackets.Val, + "loadavg_1": s.SysStats.Loadavg1.Val, + "loadavg_5": s.SysStats.Loadavg5.Val, + "loadavg_15": s.SysStats.Loadavg15.Val, + "mem_used": s.SysStats.MemUsed.Val, + "mem_buffer": s.SysStats.MemBuffer.Val, + "mem_total": s.SysStats.MemTotal.Val, + "cpu": s.SystemStats.CPU.Val, + "mem": s.SystemStats.Mem.Val, + "system_uptime": s.SystemStats.Uptime.Val, + "lan-rx_bytes": s.Stat.LanRxBytes.Val, + "lan-rx_packets": s.Stat.LanRxPackets.Val, + "lan-tx_bytes": s.Stat.LanTxBytes.Val, + "lan-tx_packets": s.Stat.LanTxPackets.Val, + "wan-rx_bytes": s.Stat.WanRxBytes.Val, + "wan-rx_dropped": s.Stat.WanRxDropped.Val, + "wan-rx_packets": s.Stat.WanRxPackets.Val, + "wan-tx_bytes": s.Stat.WanTxBytes.Val, + "wan-tx_packets": s.Stat.WanTxPackets.Val, } - pt, err := influx.NewPoint("usg", tags, fields, now) - if err != nil { - return nil, err - } - points := []*influx.Point{pt} + r.send(&metric{Table: "usg", Tags: tags, Fields: fields}) + tags = map[string]string{ - "ip": u.IP, - "mac": u.Mac, - "site_id": u.SiteID, - "site_name": u.SiteName, - "name": u.Name, - "cfgversion": u.Cfgversion, - "model": u.Model, - "serial": u.Serial, - "type": u.Type, + "mac": s.Mac, + "site_name": s.SiteName, + "name": s.Name, + "version": s.Version, + "model": s.Model, + "serial": s.Serial, + "type": s.Type, } fields = map[string]interface{}{ - "fw_caps": u.FwCaps.Val, - "guest-num_sta": u.GuestLanNumSta.Val, - "ip": u.IP, - "bytes": u.Bytes.Val, - "fan_level": float64(0), - "general_temperature": float64(0), - "last_seen": u.LastSeen.Val, - "license_state": u.LicenseState, - "overheating": u.Overheating.Val, - "rx_bytes": u.RxBytes.Val, - "tx_bytes": u.TxBytes.Val, - "uptime": u.Uptime.Val, - "state": u.State.Val, - "user-num_sta": u.UserLanNumSta.Val, - "num_sta": u.LanNumSta.Val, - "version": u.Version, - "loadavg_1": u.SysStats.Loadavg1.Val, - "loadavg_5": u.SysStats.Loadavg5.Val, - "loadavg_15": u.SysStats.Loadavg15.Val, - "mem_buffer": u.SysStats.MemBuffer.Val, - "mem_used": u.SysStats.MemUsed.Val, - "mem_total": u.SysStats.MemTotal.Val, - "cpu": u.SystemStats.CPU.Val, - "mem": u.SystemStats.Mem.Val, - "system_uptime": u.SystemStats.Uptime.Val, - "stp_priority": u.StpPriority.Val, - "stat_bytes": u.Stat.Sw.Bytes.Val, - "stat_rx_bytes": u.Stat.Sw.RxBytes.Val, - "stat_rx_crypts": u.Stat.Sw.RxCrypts.Val, - "stat_rx_dropped": u.Stat.Sw.RxDropped.Val, - "stat_rx_errors": u.Stat.Sw.RxErrors.Val, - "stat_rx_frags": u.Stat.Sw.RxFrags.Val, - "stat_rx_packets": u.Stat.Sw.TxPackets.Val, - "stat_tx_bytes": u.Stat.Sw.TxBytes.Val, - "stat_tx_dropped": u.Stat.Sw.TxDropped.Val, - "stat_tx_errors": u.Stat.Sw.TxErrors.Val, - "stat_tx_packets": u.Stat.Sw.TxPackets.Val, - "stat_tx_retries": u.Stat.Sw.TxRetries.Val, + "guest-num_sta": s.GuestNumSta.Val, + "ip": s.IP, + "bytes": s.Bytes.Val, + "last_seen": s.LastSeen.Val, + "rx_bytes": s.RxBytes.Val, + "tx_bytes": s.TxBytes.Val, + "uptime": s.Uptime.Val, + "state": s.State.Val, + "user-num_sta": s.UserNumSta.Val, + "loadavg_1": s.SysStats.Loadavg1.Val, + "loadavg_5": s.SysStats.Loadavg5.Val, + "loadavg_15": s.SysStats.Loadavg15.Val, + "mem_buffer": s.SysStats.MemBuffer.Val, + "mem_used": s.SysStats.MemUsed.Val, + "mem_total": s.SysStats.MemTotal.Val, + "cpu": s.SystemStats.CPU.Val, + "mem": s.SystemStats.Mem.Val, + "system_uptime": s.SystemStats.Uptime.Val, + "stat_bytes": s.Stat.Sw.Bytes.Val, + "stat_rx_bytes": s.Stat.Sw.RxBytes.Val, + "stat_rx_crypts": s.Stat.Sw.RxCrypts.Val, + "stat_rx_dropped": s.Stat.Sw.RxDropped.Val, + "stat_rx_errors": s.Stat.Sw.RxErrors.Val, + "stat_rx_frags": s.Stat.Sw.RxFrags.Val, + "stat_rx_packets": s.Stat.Sw.TxPackets.Val, + "stat_tx_bytes": s.Stat.Sw.TxBytes.Val, + "stat_tx_dropped": s.Stat.Sw.TxDropped.Val, + "stat_tx_errors": s.Stat.Sw.TxErrors.Val, + "stat_tx_packets": s.Stat.Sw.TxPackets.Val, + "stat_tx_retries": s.Stat.Sw.TxRetries.Val, } - pt, err = influx.NewPoint("usw", tags, fields, now) - if err != nil { - return nil, err - } - points = append(points, pt) + r.send(&metric{Table: "usw", Tags: tags, Fields: fields}) - for _, p := range u.NetworkTable { + for _, p := range s.NetworkTable { tags := map[string]string{ - "device_name": u.Name, - "device_id": u.ID, - "device_mac": u.Mac, - "site_name": u.SiteName, + "device_name": s.Name, + "device_id": s.ID, + "device_mac": s.Mac, + "site_name": s.SiteName, "up": p.Up.Txt, "enabled": p.Enabled.Txt, "site_id": p.SiteID, @@ -222,25 +185,21 @@ func UDMPoints(u *unifi.UDM, now time.Time) ([]*influx.Point, error) { "ipv6_interface_type": p.Ipv6InterfaceType, "attr_hidden_id": p.AttrHiddenID, } - pt, err = influx.NewPoint("usg_networks", tags, fields, now) - if err != nil { - return points, err - } - points = append(points, pt) + r.send(&metric{Table: "usg_networks", Tags: tags, Fields: fields}) } - for _, p := range u.PortTable { + for _, p := range s.PortTable { tags := map[string]string{ - "site_id": u.SiteID, - "site_name": u.SiteName, - "device_name": u.Name, + "site_id": s.SiteID, + "site_name": s.SiteName, + "device_name": s.Name, "name": p.Name, "enable": p.Enable.Txt, "up": p.Up.Txt, "poe_mode": p.PoeMode, "port_poe": p.PortPoe.Txt, "port_idx": p.PortIdx.Txt, - "port_id": u.Name + " Port " + p.PortIdx.Txt, + "port_id": s.Name + " Port " + p.PortIdx.Txt, "poe_enable": p.PoeEnable.Txt, "flowctrl_rx": p.FlowctrlRx.Txt, "flowctrl_tx": p.FlowctrlTx.Txt, @@ -269,88 +228,75 @@ func UDMPoints(u *unifi.UDM, now time.Time) ([]*influx.Point, error) { "poe_power": p.PoePower.Val, "poe_voltage": p.PoeVoltage.Val, } - pt, err = influx.NewPoint("usw_ports", tags, fields, now) - if err != nil { - return points, err - } - points = append(points, pt) + r.send(&metric{Table: "usw_ports", Tags: tags, Fields: fields}) } - if u.Stat.Ap == nil { - return points, nil + if s.Stat.Ap == nil { + return // we're done now. the following code process UDM (non-pro) UAP data. } tags = map[string]string{ - "ip": u.IP, - "mac": u.Mac, - "site_id": u.SiteID, - "site_name": u.SiteName, - "name": u.Name, - "cfgversion": u.Cfgversion, - "model": u.Model, - "serial": u.Serial, - "type": u.Type, + "mac": s.Mac, + "site_name": s.SiteName, + "name": s.Name, + "version": s.Version, + "model": s.Model, + "serial": s.Serial, + "type": s.Type, } fields = map[string]interface{}{ - "ip": u.IP, - "bytes": u.Bytes.Val, - "last_seen": u.LastSeen.Val, - "rx_bytes": u.RxBytes.Val, - "tx_bytes": u.TxBytes.Val, - "uptime": u.Uptime.Val, - "state": int(u.State.Val), - "user-num_sta": int(u.UserWlanNumSta.Val), - "guest-num_sta": int(u.GuestWlanNumSta.Val), - "num_sta": u.WlanNumSta.Val, - "loadavg_1": u.SysStats.Loadavg1.Val, - "loadavg_5": u.SysStats.Loadavg5.Val, - "loadavg_15": u.SysStats.Loadavg15.Val, - "mem_buffer": u.SysStats.MemBuffer.Val, - "mem_total": u.SysStats.MemTotal.Val, - "mem_used": u.SysStats.MemUsed.Val, - "cpu": u.SystemStats.CPU.Val, - "mem": u.SystemStats.Mem.Val, - "system_uptime": u.SystemStats.Uptime.Val, + "ip": s.IP, + "bytes": s.Bytes.Val, + "last_seen": s.LastSeen.Val, + "rx_bytes": s.RxBytes.Val, + "tx_bytes": s.TxBytes.Val, + "uptime": s.Uptime.Val, + "state": int(s.State.Val), + "user-num_sta": int(s.UserWlanNumSta.Val), + "guest-num_sta": int(s.GuestWlanNumSta.Val), + "num_sta": s.WlanNumSta.Val, + "loadavg_1": s.SysStats.Loadavg1.Val, + "loadavg_5": s.SysStats.Loadavg5.Val, + "loadavg_15": s.SysStats.Loadavg15.Val, + "mem_buffer": s.SysStats.MemBuffer.Val, + "mem_total": s.SysStats.MemTotal.Val, + "mem_used": s.SysStats.MemUsed.Val, + "cpu": s.SystemStats.CPU.Val, + "mem": s.SystemStats.Mem.Val, + "system_uptime": s.SystemStats.Uptime.Val, // Accumulative Statistics. - "stat_user-rx_packets": u.Stat.Ap.UserRxPackets.Val, - "stat_guest-rx_packets": u.Stat.Ap.GuestRxPackets.Val, - "stat_rx_packets": u.Stat.Ap.RxPackets.Val, - "stat_user-rx_bytes": u.Stat.Ap.UserRxBytes.Val, - "stat_guest-rx_bytes": u.Stat.Ap.GuestRxBytes.Val, - "stat_rx_bytes": u.Stat.Ap.RxBytes.Val, - "stat_user-rx_errors": u.Stat.Ap.UserRxErrors.Val, - "stat_guest-rx_errors": u.Stat.Ap.GuestRxErrors.Val, - "stat_rx_errors": u.Stat.Ap.RxErrors.Val, - "stat_user-rx_dropped": u.Stat.Ap.UserRxDropped.Val, - "stat_guest-rx_dropped": u.Stat.Ap.GuestRxDropped.Val, - "stat_rx_dropped": u.Stat.Ap.RxDropped.Val, - "stat_user-rx_crypts": u.Stat.Ap.UserRxCrypts.Val, - "stat_guest-rx_crypts": u.Stat.Ap.GuestRxCrypts.Val, - "stat_rx_crypts": u.Stat.Ap.RxCrypts.Val, - "stat_user-rx_frags": u.Stat.Ap.UserRxFrags.Val, - "stat_guest-rx_frags": u.Stat.Ap.GuestRxFrags.Val, - "stat_rx_frags": u.Stat.Ap.RxFrags.Val, - "stat_user-tx_packets": u.Stat.Ap.UserTxPackets.Val, - "stat_guest-tx_packets": u.Stat.Ap.GuestTxPackets.Val, - "stat_tx_packets": u.Stat.Ap.TxPackets.Val, - "stat_user-tx_bytes": u.Stat.Ap.UserTxBytes.Val, - "stat_guest-tx_bytes": u.Stat.Ap.GuestTxBytes.Val, - "stat_tx_bytes": u.Stat.Ap.TxBytes.Val, - "stat_user-tx_errors": u.Stat.Ap.UserTxErrors.Val, - "stat_guest-tx_errors": u.Stat.Ap.GuestTxErrors.Val, - "stat_tx_errors": u.Stat.Ap.TxErrors.Val, - "stat_user-tx_dropped": u.Stat.Ap.UserTxDropped.Val, - "stat_guest-tx_dropped": u.Stat.Ap.GuestTxDropped.Val, - "stat_tx_dropped": u.Stat.Ap.TxDropped.Val, - "stat_user-tx_retries": u.Stat.Ap.UserTxRetries.Val, - "stat_guest-tx_retries": u.Stat.Ap.GuestTxRetries.Val, + "stat_user-rx_packets": s.Stat.Ap.UserRxPackets.Val, + "stat_guest-rx_packets": s.Stat.Ap.GuestRxPackets.Val, + "stat_rx_packets": s.Stat.Ap.RxPackets.Val, + "stat_user-rx_bytes": s.Stat.Ap.UserRxBytes.Val, + "stat_guest-rx_bytes": s.Stat.Ap.GuestRxBytes.Val, + "stat_rx_bytes": s.Stat.Ap.RxBytes.Val, + "stat_user-rx_errors": s.Stat.Ap.UserRxErrors.Val, + "stat_guest-rx_errors": s.Stat.Ap.GuestRxErrors.Val, + "stat_rx_errors": s.Stat.Ap.RxErrors.Val, + "stat_user-rx_dropped": s.Stat.Ap.UserRxDropped.Val, + "stat_guest-rx_dropped": s.Stat.Ap.GuestRxDropped.Val, + "stat_rx_dropped": s.Stat.Ap.RxDropped.Val, + "stat_user-rx_crypts": s.Stat.Ap.UserRxCrypts.Val, + "stat_guest-rx_crypts": s.Stat.Ap.GuestRxCrypts.Val, + "stat_rx_crypts": s.Stat.Ap.RxCrypts.Val, + "stat_user-rx_frags": s.Stat.Ap.UserRxFrags.Val, + "stat_guest-rx_frags": s.Stat.Ap.GuestRxFrags.Val, + "stat_rx_frags": s.Stat.Ap.RxFrags.Val, + "stat_user-tx_packets": s.Stat.Ap.UserTxPackets.Val, + "stat_guest-tx_packets": s.Stat.Ap.GuestTxPackets.Val, + "stat_tx_packets": s.Stat.Ap.TxPackets.Val, + "stat_user-tx_bytes": s.Stat.Ap.UserTxBytes.Val, + "stat_guest-tx_bytes": s.Stat.Ap.GuestTxBytes.Val, + "stat_tx_bytes": s.Stat.Ap.TxBytes.Val, + "stat_user-tx_errors": s.Stat.Ap.UserTxErrors.Val, + "stat_guest-tx_errors": s.Stat.Ap.GuestTxErrors.Val, + "stat_tx_errors": s.Stat.Ap.TxErrors.Val, + "stat_user-tx_dropped": s.Stat.Ap.UserTxDropped.Val, + "stat_guest-tx_dropped": s.Stat.Ap.GuestTxDropped.Val, + "stat_tx_dropped": s.Stat.Ap.TxDropped.Val, + "stat_user-tx_retries": s.Stat.Ap.UserTxRetries.Val, + "stat_guest-tx_retries": s.Stat.Ap.GuestTxRetries.Val, } - pt, err = influx.NewPoint("uap", tags, fields, now) - if err != nil { - return nil, err - } - uapPoints, err := processVAPs(*u.VapTable, *u.RadioTable, *u.RadioTableStats, u.Name, u.Mac, u.SiteName, now) - if err != nil { - return nil, err - } - return append(append(points, pt), uapPoints...), nil + r.send(&metric{Table: "uap", Tags: tags, Fields: fields}) + u.processVAPs(r, *s.VapTable, *s.RadioTable, *s.RadioTableStats, s.Name, s.Mac, s.SiteName) } diff --git a/pkg/influxunifi/usg.go b/pkg/influxunifi/usg.go index 81ae87dc..07a479d4 100644 --- a/pkg/influxunifi/usg.go +++ b/pkg/influxunifi/usg.go @@ -2,143 +2,118 @@ package influxunifi import ( "strings" - "time" - influx "github.com/influxdata/influxdb1-client/v2" "golift.io/unifi" ) -// USGPoints generates Unifi Gateway datapoints for InfluxDB. +// batchUSG generates Unifi Gateway datapoints for InfluxDB. // These points can be passed directly to influx. -func USGPoints(u *unifi.USG, now time.Time) ([]*influx.Point, error) { - if u.Stat.Gw == nil { - u.Stat.Gw = &unifi.Gw{} +func (u *InfluxUnifi) batchUSG(r report, s *unifi.USG) { + if s.Stat.Gw == nil { + s.Stat.Gw = &unifi.Gw{} } tags := map[string]string{ - "ip": u.IP, - "mac": u.Mac, - "site_id": u.SiteID, - "site_name": u.SiteName, - "name": u.Name, - "cfgversion": u.Cfgversion, - "model": u.Model, - "serial": u.Serial, - "type": u.Type, + "mac": s.Mac, + "site_name": s.SiteName, + "name": s.Name, + "version": s.Version, + "model": s.Model, + "serial": s.Serial, + "type": s.Type, } fields := map[string]interface{}{ - "ip": u.IP, - "bytes": u.Bytes.Val, - "last_seen": u.LastSeen.Val, - "license_state": u.LicenseState, - "fw_caps": u.FwCaps.Val, - "guest-num_sta": u.GuestNumSta.Val, - "rx_bytes": u.RxBytes.Val, - "tx_bytes": u.TxBytes.Val, - "uptime": u.Uptime.Val, - "roll_upgrade": u.Rollupgrade.Val, - "state": u.State.Val, - "upgradable": u.Upgradable.Val, - "user-num_sta": u.UserNumSta.Val, - "version": u.Version, - "num_desktop": u.NumDesktop.Val, - "num_handheld": u.NumHandheld.Val, - "num_mobile": u.NumMobile.Val, - "speedtest-status_latency": u.SpeedtestStatus.Latency.Val, - "speedtest-status_rundate": u.SpeedtestStatus.Rundate.Val, - "speedtest-status_runtime": u.SpeedtestStatus.Runtime.Val, - "speedtest-status_download": u.SpeedtestStatus.StatusDownload.Val, - "speedtest-status_ping": u.SpeedtestStatus.StatusPing.Val, - "speedtest-status_summary": u.SpeedtestStatus.StatusSummary.Val, - "speedtest-status_upload": u.SpeedtestStatus.StatusUpload.Val, - "speedtest-status_xput_download": u.SpeedtestStatus.XputDownload.Val, - "speedtest-status_xput_upload": u.SpeedtestStatus.XputUpload.Val, - "config_network_wan_type": u.ConfigNetwork.Type, - "wan1_bytes-r": u.Wan1.BytesR.Val, - "wan1_enable": u.Wan1.Enable.Val, - "wan1_full_duplex": u.Wan1.FullDuplex.Val, - "wan1_gateway": u.Wan1.Gateway, - "wan1_ifname": u.Wan1.Ifname, - "wan1_ip": u.Wan1.IP, - "wan1_mac": u.Wan1.Mac, - "wan1_max_speed": u.Wan1.MaxSpeed.Val, - "wan1_name": u.Wan1.Name, - "wan1_netmask": u.Wan1.Netmask, - "wan1_rx_bytes": u.Wan1.RxBytes.Val, - "wan1_rx_bytes-r": u.Wan1.RxBytesR.Val, - "wan1_rx_dropped": u.Wan1.RxDropped.Val, - "wan1_rx_errors": u.Wan1.RxErrors.Val, - "wan1_rx_multicast": u.Wan1.RxMulticast.Val, - "wan1_rx_packets": u.Wan1.RxPackets.Val, - "wan1_type": u.Wan1.Type, - "wan1_speed": u.Wan1.Speed.Val, - "wan1_up": u.Wan1.Up.Val, - "wan1_tx_bytes": u.Wan1.TxBytes.Val, - "wan1_tx_bytes-r": u.Wan1.TxBytesR.Val, - "wan1_tx_dropped": u.Wan1.TxDropped.Val, - "wan1_tx_errors": u.Wan1.TxErrors.Val, - "wan1_tx_packets": u.Wan1.TxPackets.Val, - "wan2_bytes-r": u.Wan2.BytesR.Val, - "wan2_enable": u.Wan2.Enable.Val, - "wan2_full_duplex": u.Wan2.FullDuplex.Val, - "wan2_gateway": u.Wan2.Gateway, - "wan2_ifname": u.Wan2.Ifname, - "wan2_ip": u.Wan2.IP, - "wan2_mac": u.Wan2.Mac, - "wan2_max_speed": u.Wan2.MaxSpeed.Val, - "wan2_name": u.Wan2.Name, - "wan2_netmask": u.Wan2.Netmask, - "wan2_rx_bytes": u.Wan2.RxBytes.Val, - "wan2_rx_bytes-r": u.Wan2.RxBytesR.Val, - "wan2_rx_dropped": u.Wan2.RxDropped.Val, - "wan2_rx_errors": u.Wan2.RxErrors.Val, - "wan2_rx_multicast": u.Wan2.RxMulticast.Val, - "wan2_rx_packets": u.Wan2.RxPackets.Val, - "wan2_type": u.Wan2.Type, - "wan2_speed": u.Wan2.Speed.Val, - "wan2_up": u.Wan2.Up.Val, - "wan2_tx_bytes": u.Wan2.TxBytes.Val, - "wan2_tx_bytes-r": u.Wan2.TxBytesR.Val, - "wan2_tx_dropped": u.Wan2.TxDropped.Val, - "wan2_tx_errors": u.Wan2.TxErrors.Val, - "wan2_tx_packets": u.Wan2.TxPackets.Val, - "loadavg_1": u.SysStats.Loadavg1.Val, - "loadavg_5": u.SysStats.Loadavg5.Val, - "loadavg_15": u.SysStats.Loadavg15.Val, - "mem_used": u.SysStats.MemUsed.Val, - "mem_buffer": u.SysStats.MemBuffer.Val, - "mem_total": u.SysStats.MemTotal.Val, - "cpu": u.SystemStats.CPU.Val, - "mem": u.SystemStats.Mem.Val, - "system_uptime": u.SystemStats.Uptime.Val, - "stat_duration": u.Stat.Duration.Val, - "stat_datetime": u.Stat.Datetime, - "gw": u.Stat.Gw, - "lan-rx_bytes": u.Stat.LanRxBytes.Val, - "lan-rx_packets": u.Stat.LanRxPackets.Val, - "lan-tx_bytes": u.Stat.LanTxBytes.Val, - "lan-tx_packets": u.Stat.LanTxPackets.Val, - "wan-rx_bytes": u.Stat.WanRxBytes.Val, - "wan-rx_dropped": u.Stat.WanRxDropped.Val, - "wan-rx_packets": u.Stat.WanRxPackets.Val, - "wan-tx_bytes": u.Stat.WanTxBytes.Val, - "wan-tx_packets": u.Stat.WanTxPackets.Val, - "uplink_name": u.Uplink.Name, - "uplink_latency": u.Uplink.Latency.Val, - "uplink_speed": u.Uplink.Speed.Val, - "uplink_num_ports": u.Uplink.NumPort.Val, - "uplink_max_speed": u.Uplink.MaxSpeed.Val, + "ip": s.IP, + "bytes": s.Bytes.Val, + "last_seen": s.LastSeen.Val, + "license_state": s.LicenseState, + "guest-num_sta": s.GuestNumSta.Val, + "rx_bytes": s.RxBytes.Val, + "tx_bytes": s.TxBytes.Val, + "uptime": s.Uptime.Val, + "state": s.State.Val, + "user-num_sta": s.UserNumSta.Val, + "version": s.Version, + "num_desktop": s.NumDesktop.Val, + "num_handheld": s.NumHandheld.Val, + "num_mobile": s.NumMobile.Val, + "speedtest-status_latency": s.SpeedtestStatus.Latency.Val, + "speedtest-status_runtime": s.SpeedtestStatus.Runtime.Val, + "speedtest-status_ping": s.SpeedtestStatus.StatusPing.Val, + "speedtest-status_xput_download": s.SpeedtestStatus.XputDownload.Val, + "speedtest-status_xput_upload": s.SpeedtestStatus.XputUpload.Val, + "wan1_bytes-r": s.Wan1.BytesR.Val, + "wan1_enable": s.Wan1.Enable.Val, + "wan1_full_duplex": s.Wan1.FullDuplex.Val, + "wan1_gateway": s.Wan1.Gateway, + "wan1_ifname": s.Wan1.Ifname, + "wan1_ip": s.Wan1.IP, + "wan1_mac": s.Wan1.Mac, + "wan1_max_speed": s.Wan1.MaxSpeed.Val, + "wan1_name": s.Wan1.Name, + "wan1_rx_bytes": s.Wan1.RxBytes.Val, + "wan1_rx_bytes-r": s.Wan1.RxBytesR.Val, + "wan1_rx_dropped": s.Wan1.RxDropped.Val, + "wan1_rx_errors": s.Wan1.RxErrors.Val, + "wan1_rx_multicast": s.Wan1.RxMulticast.Val, + "wan1_rx_packets": s.Wan1.RxPackets.Val, + "wan1_type": s.Wan1.Type, + "wan1_speed": s.Wan1.Speed.Val, + "wan1_up": s.Wan1.Up.Val, + "wan1_tx_bytes": s.Wan1.TxBytes.Val, + "wan1_tx_bytes-r": s.Wan1.TxBytesR.Val, + "wan1_tx_dropped": s.Wan1.TxDropped.Val, + "wan1_tx_errors": s.Wan1.TxErrors.Val, + "wan1_tx_packets": s.Wan1.TxPackets.Val, + "wan2_bytes-r": s.Wan2.BytesR.Val, + "wan2_enable": s.Wan2.Enable.Val, + "wan2_full_duplex": s.Wan2.FullDuplex.Val, + "wan2_gateway": s.Wan2.Gateway, + "wan2_ifname": s.Wan2.Ifname, + "wan2_ip": s.Wan2.IP, + "wan2_mac": s.Wan2.Mac, + "wan2_max_speed": s.Wan2.MaxSpeed.Val, + "wan2_name": s.Wan2.Name, + "wan2_rx_bytes": s.Wan2.RxBytes.Val, + "wan2_rx_bytes-r": s.Wan2.RxBytesR.Val, + "wan2_rx_dropped": s.Wan2.RxDropped.Val, + "wan2_rx_errors": s.Wan2.RxErrors.Val, + "wan2_rx_multicast": s.Wan2.RxMulticast.Val, + "wan2_rx_packets": s.Wan2.RxPackets.Val, + "wan2_type": s.Wan2.Type, + "wan2_speed": s.Wan2.Speed.Val, + "wan2_up": s.Wan2.Up.Val, + "wan2_tx_bytes": s.Wan2.TxBytes.Val, + "wan2_tx_bytes-r": s.Wan2.TxBytesR.Val, + "wan2_tx_dropped": s.Wan2.TxDropped.Val, + "wan2_tx_errors": s.Wan2.TxErrors.Val, + "wan2_tx_packets": s.Wan2.TxPackets.Val, + "loadavg_1": s.SysStats.Loadavg1.Val, + "loadavg_5": s.SysStats.Loadavg5.Val, + "loadavg_15": s.SysStats.Loadavg15.Val, + "mem_used": s.SysStats.MemUsed.Val, + "mem_buffer": s.SysStats.MemBuffer.Val, + "mem_total": s.SysStats.MemTotal.Val, + "cpu": s.SystemStats.CPU.Val, + "mem": s.SystemStats.Mem.Val, + "system_uptime": s.SystemStats.Uptime.Val, + "lan-rx_bytes": s.Stat.LanRxBytes.Val, + "lan-rx_packets": s.Stat.LanRxPackets.Val, + "lan-tx_bytes": s.Stat.LanTxBytes.Val, + "lan-tx_packets": s.Stat.LanTxPackets.Val, + "wan-rx_bytes": s.Stat.WanRxBytes.Val, + "wan-rx_dropped": s.Stat.WanRxDropped.Val, + "wan-rx_packets": s.Stat.WanRxPackets.Val, + "wan-tx_bytes": s.Stat.WanTxBytes.Val, + "wan-tx_packets": s.Stat.WanTxPackets.Val, } - pt, err := influx.NewPoint("usg", tags, fields, now) - if err != nil { - return nil, err - } - points := []*influx.Point{pt} - for _, p := range u.NetworkTable { + r.send(&metric{Table: "usg", Tags: tags, Fields: fields}) + + for _, p := range s.NetworkTable { tags := map[string]string{ - "device_name": u.Name, - "device_id": u.ID, - "device_mac": u.Mac, - "site_name": u.SiteName, + "device_name": s.Name, + "device_id": s.ID, + "device_mac": s.Mac, + "site_name": s.SiteName, "up": p.Up.Txt, "enabled": p.Enabled.Txt, "site_id": p.SiteID, @@ -156,18 +131,14 @@ func USGPoints(u *unifi.USG, now time.Time) ([]*influx.Point, error) { "tx_bytes": p.TxBytes.Val, "tx_packets": p.TxPackets.Val, } - pt, err = influx.NewPoint("usg_networks", tags, fields, now) - if err != nil { - return points, err - } - points = append(points, pt) + r.send(&metric{Table: "usg_networks", Tags: tags, Fields: fields}) } - for _, p := range u.PortTable { + for _, p := range s.PortTable { tags := map[string]string{ - "device_name": u.Name, - "device_id": u.ID, - "device_mac": u.Mac, - "site_name": u.SiteName, + "device_name": s.Name, + "device_id": s.ID, + "device_mac": s.Mac, + "site_name": s.SiteName, "name": p.Name, "ifname": p.Ifname, "ip": p.IP, @@ -189,11 +160,7 @@ func USGPoints(u *unifi.USG, now time.Time) ([]*influx.Point, error) { "rx_multicast": p.RxMulticast.Val, "dns_servers": strings.Join(p.DNS, ","), } - pt, err = influx.NewPoint("usg_ports", tags, fields, now) - if err != nil { - return points, err - } - points = append(points, pt) + r.send(&metric{Table: "usg_ports", Tags: tags, Fields: fields}) + } - return points, nil } diff --git a/pkg/influxunifi/usw.go b/pkg/influxunifi/usw.go index 7f392962..60a88853 100644 --- a/pkg/influxunifi/usw.go +++ b/pkg/influxunifi/usw.go @@ -1,91 +1,76 @@ package influxunifi import ( - "time" - - influx "github.com/influxdata/influxdb1-client/v2" "golift.io/unifi" ) -// USWPoints generates Unifi Switch datapoints for InfluxDB. +// batchUSW generates Unifi Switch datapoints for InfluxDB. // These points can be passed directly to influx. -func USWPoints(u *unifi.USW, now time.Time) ([]*influx.Point, error) { - if u.Stat.Sw == nil { - u.Stat.Sw = &unifi.Sw{} +func (u *InfluxUnifi) batchUSW(r report, s *unifi.USW) { + if s.Stat.Sw == nil { + s.Stat.Sw = &unifi.Sw{} } tags := map[string]string{ - "ip": u.IP, - "mac": u.Mac, - "site_id": u.SiteID, - "site_name": u.SiteName, - "name": u.Name, - "cfgversion": u.Cfgversion, - "model": u.Model, - "serial": u.Serial, - "type": u.Type, + "mac": s.Mac, + "site_name": s.SiteName, + "name": s.Name, + "version": s.Version, + "model": s.Model, + "serial": s.Serial, + "type": s.Type, } fields := map[string]interface{}{ - "fw_caps": u.FwCaps.Val, - "guest-num_sta": u.GuestNumSta.Val, - "ip": u.IP, - "bytes": u.Bytes.Val, - "fan_level": u.FanLevel.Val, - "general_temperature": u.GeneralTemperature.Val, - "last_seen": u.LastSeen.Val, - "license_state": u.LicenseState, - "overheating": u.Overheating.Val, - "rx_bytes": u.RxBytes.Val, - "tx_bytes": u.TxBytes.Val, - "uptime": u.Uptime.Val, - "state": u.State.Val, - "user-num_sta": u.UserNumSta.Val, - "version": u.Version, - "loadavg_1": u.SysStats.Loadavg1.Val, - "loadavg_5": u.SysStats.Loadavg5.Val, - "loadavg_15": u.SysStats.Loadavg15.Val, - "mem_buffer": u.SysStats.MemBuffer.Val, - "mem_used": u.SysStats.MemUsed.Val, - "mem_total": u.SysStats.MemTotal.Val, - "cpu": u.SystemStats.CPU.Val, - "mem": u.SystemStats.Mem.Val, - "stp_priority": u.StpPriority.Val, - "system_uptime": u.SystemStats.Uptime.Val, - "stat_bytes": u.Stat.Bytes.Val, - "stat_rx_bytes": u.Stat.RxBytes.Val, - "stat_rx_crypts": u.Stat.RxCrypts.Val, - "stat_rx_dropped": u.Stat.RxDropped.Val, - "stat_rx_errors": u.Stat.RxErrors.Val, - "stat_rx_frags": u.Stat.RxFrags.Val, - "stat_rx_packets": u.Stat.TxPackets.Val, - "stat_tx_bytes": u.Stat.TxBytes.Val, - "stat_tx_dropped": u.Stat.TxDropped.Val, - "stat_tx_errors": u.Stat.TxErrors.Val, - "stat_tx_packets": u.Stat.TxPackets.Val, - "stat_tx_retries": u.Stat.TxRetries.Val, - "uplink_depth": u.UplinkDepth.Txt, + "guest-num_sta": s.GuestNumSta.Val, + "ip": s.IP, + "bytes": s.Bytes.Val, + "fan_level": s.FanLevel.Val, + "general_temperature": s.GeneralTemperature.Val, + "last_seen": s.LastSeen.Val, + "rx_bytes": s.RxBytes.Val, + "tx_bytes": s.TxBytes.Val, + "uptime": s.Uptime.Val, + "state": s.State.Val, + "user-num_sta": s.UserNumSta.Val, + "loadavg_1": s.SysStats.Loadavg1.Val, + "loadavg_5": s.SysStats.Loadavg5.Val, + "loadavg_15": s.SysStats.Loadavg15.Val, + "mem_buffer": s.SysStats.MemBuffer.Val, + "mem_used": s.SysStats.MemUsed.Val, + "mem_total": s.SysStats.MemTotal.Val, + "cpu": s.SystemStats.CPU.Val, + "mem": s.SystemStats.Mem.Val, + "system_uptime": s.SystemStats.Uptime.Val, + "stat_bytes": s.Stat.Sw.Bytes.Val, + "stat_rx_bytes": s.Stat.Sw.RxBytes.Val, + "stat_rx_crypts": s.Stat.Sw.RxCrypts.Val, + "stat_rx_dropped": s.Stat.Sw.RxDropped.Val, + "stat_rx_errors": s.Stat.Sw.RxErrors.Val, + "stat_rx_frags": s.Stat.Sw.RxFrags.Val, + "stat_rx_packets": s.Stat.Sw.TxPackets.Val, + "stat_tx_bytes": s.Stat.Sw.TxBytes.Val, + "stat_tx_dropped": s.Stat.Sw.TxDropped.Val, + "stat_tx_errors": s.Stat.Sw.TxErrors.Val, + "stat_tx_packets": s.Stat.Sw.TxPackets.Val, + "stat_tx_retries": s.Stat.Sw.TxRetries.Val, } - pt, err := influx.NewPoint("usw", tags, fields, now) - if err != nil { - return nil, err - } - points := []*influx.Point{pt} - for _, p := range u.PortTable { + r.send(&metric{Table: "usw", Tags: tags, Fields: fields}) + + for _, p := range s.PortTable { + if !p.Up.Val || !p.Enable.Val { + continue // only record UP ports. + } tags := map[string]string{ - "site_id": u.SiteID, - "site_name": u.SiteName, - "device_name": u.Name, + "site_name": s.SiteName, + "device_name": s.Name, "name": p.Name, - "enable": p.Enable.Txt, - "up": p.Up.Txt, "poe_mode": p.PoeMode, "port_poe": p.PortPoe.Txt, "port_idx": p.PortIdx.Txt, - "port_id": u.Name + " Port " + p.PortIdx.Txt, + "port_id": s.Name + " Port " + p.PortIdx.Txt, "poe_enable": p.PoeEnable.Txt, "flowctrl_rx": p.FlowctrlRx.Txt, "flowctrl_tx": p.FlowctrlTx.Txt, "media": p.Media, - "poe_class": p.PoeClass, } fields := map[string]interface{}{ "dbytes_r": p.BytesR.Val, @@ -105,15 +90,12 @@ func USWPoints(u *unifi.USW, now time.Time) ([]*influx.Point, error) { "tx_errors": p.TxErrors.Val, "tx_multicast": p.TxMulticast.Val, "tx_packets": p.TxPackets.Val, - "poe_current": p.PoeCurrent.Val, - "poe_power": p.PoePower.Val, - "poe_voltage": p.PoeVoltage.Val, } - pt, err = influx.NewPoint("usw_ports", tags, fields, now) - if err != nil { - return points, err + if p.PoeEnable.Val && p.PortPoe.Val { + fields["poe_current"] = p.PoeCurrent.Val + fields["poe_power"] = p.PoePower.Val + fields["poe_voltage"] = p.PoeVoltage.Val } - points = append(points, pt) + r.send(&metric{Table: "usw_ports", Tags: tags, Fields: fields}) } - return points, nil } diff --git a/pkg/poller/config.go b/pkg/poller/config.go index 37ca9a90..2ded5926 100644 --- a/pkg/poller/config.go +++ b/pkg/poller/config.go @@ -13,7 +13,7 @@ import ( "time" "github.com/BurntSushi/toml" - influx "github.com/influxdata/influxdb1-client/v2" + "github.com/davidnewhall/unifi-poller/pkg/influxunifi" "github.com/spf13/pflag" "golift.io/unifi" yaml "gopkg.in/yaml.v2" @@ -41,7 +41,7 @@ const ENVConfigPrefix = "UP_" // UnifiPoller contains the application startup data, and auth info for UniFi & Influx. type UnifiPoller struct { - Influx influx.Client + Influx *influxunifi.InfluxUnifi Unifi *unifi.Unifi Flag *Flag Config *Config diff --git a/pkg/poller/influx.go b/pkg/poller/influx.go index 9b2872d2..6dbc15c3 100644 --- a/pkg/poller/influx.go +++ b/pkg/poller/influx.go @@ -1,21 +1,20 @@ package poller import ( - "crypto/tls" "fmt" + "time" "github.com/davidnewhall/unifi-poller/pkg/influxunifi" - "github.com/davidnewhall/unifi-poller/pkg/metrics" - influx "github.com/influxdata/influxdb1-client/v2" ) // GetInfluxDB returns an InfluxDB interface. func (u *UnifiPoller) GetInfluxDB() (err error) { - u.Influx, err = influx.NewHTTPClient(influx.HTTPConfig{ - Addr: u.Config.InfluxURL, - Username: u.Config.InfluxUser, - Password: u.Config.InfluxPass, - TLSConfig: &tls.Config{InsecureSkipVerify: u.Config.InfxBadSSL}, + u.Influx, err = influxunifi.New(&influxunifi.Config{ + Database: u.Config.InfluxDB, + User: u.Config.InfluxUser, + Pass: u.Config.InfluxPass, + BadSSL: u.Config.InfxBadSSL, + URL: u.Config.InfluxURL, }) if err != nil { return fmt.Errorf("influxdb: %v", err) @@ -35,46 +34,24 @@ func (u *UnifiPoller) CollectAndProcess() error { return err } u.AugmentMetrics(metrics) - err = u.ReportMetrics(metrics) - u.LogError(err, "processing metrics") - return err -} - -// ReportMetrics batches all the metrics and writes them to InfluxDB. -// This creates an InfluxDB writer, and returns an error if the write fails. -func (u *UnifiPoller) ReportMetrics(metrics *metrics.Metrics) error { - // Batch (and send) all the points. - m := &influxunifi.Metrics{Metrics: metrics} - // Make a new Influx Points Batcher. - var err error - m.BatchPoints, err = influx.NewBatchPoints(influx.BatchPointsConfig{Database: u.Config.InfluxDB}) + report, err := u.Influx.ReportMetrics(metrics) if err != nil { - return fmt.Errorf("influx.NewBatchPoints: %v", err) + u.LogError(err, "processing metrics") + return err } - for _, err := range m.ProcessPoints() { - u.LogError(err, "influx.ProcessPoints") - } - if err = u.Influx.Write(m.BatchPoints); err != nil { - return fmt.Errorf("influxdb.Write(points): %v", err) - } - u.LogInfluxReport(m) + u.LogInfluxReport(report) return nil } // LogInfluxReport writes a log message after exporting to influxdb. -func (u *UnifiPoller) LogInfluxReport(m *influxunifi.Metrics) { - var fields, points int - for _, p := range m.Points() { - points++ - i, _ := p.Fields() - fields += len(i) - } +func (u *UnifiPoller) LogInfluxReport(r *influxunifi.Report) { idsMsg := "" if u.Config.SaveIDS { - idsMsg = fmt.Sprintf("IDS Events: %d, ", len(m.IDSList)) + idsMsg = fmt.Sprintf("IDS Events: %d, ", len(r.Metrics.IDSList)) } - u.Logf("UniFi Measurements Recorded. Sites: %d, Clients: %d, "+ - "Wireless APs: %d, Gateways: %d, Switches: %d, %sPoints: %d, Fields: %d", - len(m.Sites), len(m.Clients), len(m.UAPs), - len(m.UDMs)+len(m.USGs), len(m.USWs), idsMsg, points, fields) + u.Logf("UniFi Metrics Recorded. Sites: %d, Clients: %d, "+ + "UAP: %d, USG/UDM: %d, USW: %d, %sPoints: %d, Fields: %d, Errs: %d, Elapsed: %v", + len(r.Metrics.Sites), len(r.Metrics.Clients), len(r.Metrics.UAPs), + len(r.Metrics.UDMs)+len(r.Metrics.USGs), len(r.Metrics.USWs), idsMsg, r.Total, + r.Fields, len(r.Errors), r.Elapsed.Round(time.Millisecond)) } diff --git a/pkg/poller/start.go b/pkg/poller/start.go index 342d4d58..ea68ba0b 100644 --- a/pkg/poller/start.go +++ b/pkg/poller/start.go @@ -21,14 +21,14 @@ func New() *UnifiPoller { InfluxPass: defaultInfluxPass, InfluxDB: defaultInfluxDB, UnifiUser: defaultUnifiUser, - UnifiPass: defaultUnifiUser, + UnifiPass: "", UnifiBase: defaultUnifiURL, Interval: Duration{defaultInterval}, Sites: []string{"all"}, SaveSites: true, HTTPListen: defaultHTTPListen, Namespace: appName, - }, Flag: &Flag{}, + }, Flag: &Flag{ConfigFile: DefaultConfFile}, } }