diff --git a/pkg/influxunifi/metrics.go b/pkg/influxunifi/metrics.go index af8bd274..90a95a31 100644 --- a/pkg/influxunifi/metrics.go +++ b/pkg/influxunifi/metrics.go @@ -8,7 +8,6 @@ import ( "time" "github.com/davidnewhall/unifi-poller/pkg/metrics" - _ "github.com/influxdata/influxdb1-client" influx "github.com/influxdata/influxdb1-client/v2" ) @@ -60,6 +59,7 @@ func (u *InfluxUnifi) ReportMetrics(m *metrics.Metrics) (*Report, error) { go u.collect(r, r.ch) // Batch all the points. u.loopPoints(r) + r.wg.Wait() // wait for all points to finish batching! // Send all the points. if err = u.influx.Write(r.bp); err != nil { @@ -139,5 +139,4 @@ func (u *InfluxUnifi) loopPoints(r report) { u.batchUDM(r, s) } }() - r.wait() } diff --git a/pkg/influxunifi/report.go b/pkg/influxunifi/report.go index 42211c26..5d1e760d 100644 --- a/pkg/influxunifi/report.go +++ b/pkg/influxunifi/report.go @@ -21,14 +21,10 @@ type Report struct { bp influx.BatchPoints } -// satisfy gomnd -const one = 1 - // report is an internal interface that can be mocked and overrridden for tests. type report interface { add() done() - wait() send(m *metric) error(err error) batch(m *metric, pt *influx.Point) @@ -39,6 +35,9 @@ func (r *Report) metrics() *metrics.Metrics { return r.Metrics } +// satisfy gomnd +const one = 1 + func (r *Report) add() { r.wg.Add(one) } @@ -52,10 +51,6 @@ func (r *Report) send(m *metric) { r.ch <- m } -func (r *Report) wait() { - r.wg.Wait() -} - /* The following methods are not thread safe. */ func (r *Report) error(err error) { diff --git a/pkg/influxunifi/uap.go b/pkg/influxunifi/uap.go index 005ef640..dc7c6214 100644 --- a/pkg/influxunifi/uap.go +++ b/pkg/influxunifi/uap.go @@ -19,7 +19,7 @@ func (u *InfluxUnifi) batchUAP(r report, s *unifi.UAP) { "serial": s.Serial, "type": s.Type, } - fields := Combine(u.processUAPstats(r, s.Stat.Ap), u.batchSysStats(r, s.SysStats, s.SystemStats)) + fields := Combine(u.processUAPstats(s.Stat.Ap), u.batchSysStats(s.SysStats, s.SystemStats)) fields["ip"] = s.IP fields["bytes"] = s.Bytes.Val fields["last_seen"] = s.LastSeen.Val @@ -34,7 +34,7 @@ func (u *InfluxUnifi) batchUAP(r report, s *unifi.UAP) { u.processVAPs(r, tags, s.VapTable, s.RadioTable, s.RadioTableStats) } -func (u *InfluxUnifi) processUAPstats(r report, ap *unifi.Ap) map[string]interface{} { +func (u *InfluxUnifi) processUAPstats(ap *unifi.Ap) map[string]interface{} { // Accumulative Statistics. return map[string]interface{}{ "stat_user-rx_packets": ap.UserRxPackets.Val, diff --git a/pkg/influxunifi/udm.go b/pkg/influxunifi/udm.go index 66cf78d9..cbe43f1f 100644 --- a/pkg/influxunifi/udm.go +++ b/pkg/influxunifi/udm.go @@ -16,7 +16,7 @@ func Combine(in ...map[string]interface{}) map[string]interface{} { } // batchSysStats is used by all device types. -func (u *InfluxUnifi) batchSysStats(r report, s unifi.SysStats, ss unifi.SystemStats) map[string]interface{} { +func (u *InfluxUnifi) batchSysStats(s unifi.SysStats, ss unifi.SystemStats) map[string]interface{} { return map[string]interface{}{ "loadavg_1": s.Loadavg1.Val, "loadavg_5": s.Loadavg5.Val, @@ -72,7 +72,7 @@ func (u *InfluxUnifi) batchUDM(r report, s *unifi.UDM) { "lan-rx_packets": s.Stat.Gw.LanRxPackets.Val, "lan-tx_bytes": s.Stat.Gw.LanTxBytes.Val, "lan-tx_packets": s.Stat.Gw.LanTxPackets.Val, - }, u.batchSysStats(r, s.SysStats, s.SystemStats)) + }, u.batchSysStats(s.SysStats, s.SystemStats)) r.send(&metric{Table: "usg", Tags: tags, Fields: fields}) u.batchNetworkTable(r, tags, s.NetworkTable) u.batchUSGwans(r, tags, s.Wan1, s.Wan2) @@ -124,7 +124,7 @@ func (u *InfluxUnifi) batchUDM(r report, s *unifi.UDM) { "serial": s.Serial, "type": s.Type, } - fields = u.processUAPstats(r, s.Stat.Ap) + fields = u.processUAPstats(s.Stat.Ap) fields["ip"] = s.IP fields["bytes"] = s.Bytes.Val fields["last_seen"] = s.LastSeen.Val diff --git a/pkg/influxunifi/usg.go b/pkg/influxunifi/usg.go index 54f0142c..db3df2b7 100644 --- a/pkg/influxunifi/usg.go +++ b/pkg/influxunifi/usg.go @@ -73,7 +73,7 @@ func (u *InfluxUnifi) batchUSG(r report, s *unifi.USG) { r.send(&metric{Table: "usg_ports", Tags: t, Fields: f}) } */ - fields = Combine(fields, u.batchSysStats(r, s.SysStats, s.SystemStats)) + fields = Combine(fields, u.batchSysStats(s.SysStats, s.SystemStats)) r.send(&metric{Table: "usg", Tags: tags, Fields: fields}) u.batchNetworkTable(r, tags, s.NetworkTable) u.batchUSGwans(r, tags, s.Wan1, s.Wan2) diff --git a/pkg/influxunifi/usw.go b/pkg/influxunifi/usw.go index 2187c95e..ce669ca1 100644 --- a/pkg/influxunifi/usw.go +++ b/pkg/influxunifi/usw.go @@ -43,7 +43,7 @@ func (u *InfluxUnifi) batchUSW(r report, s *unifi.USW) { "stat_tx_errors": s.Stat.Sw.TxErrors.Val, "stat_tx_packets": s.Stat.Sw.TxPackets.Val, "stat_tx_retries": s.Stat.Sw.TxRetries.Val, - }, u.batchSysStats(r, s.SysStats, s.SystemStats)) + }, u.batchSysStats(s.SysStats, s.SystemStats)) r.send(&metric{Table: "usw", Tags: tags, Fields: fields}) u.batchPortTable(r, tags, s.PortTable) }