This commit is contained in:
davidnewhall2 2019-11-30 19:30:21 -08:00
parent 36e88dcaf4
commit ec3bae40be
6 changed files with 11 additions and 17 deletions

View File

@ -8,7 +8,6 @@ import (
"time"
"github.com/davidnewhall/unifi-poller/pkg/metrics"
_ "github.com/influxdata/influxdb1-client"
influx "github.com/influxdata/influxdb1-client/v2"
)
@ -60,6 +59,7 @@ func (u *InfluxUnifi) ReportMetrics(m *metrics.Metrics) (*Report, error) {
go u.collect(r, r.ch)
// Batch all the points.
u.loopPoints(r)
r.wg.Wait() // wait for all points to finish batching!
// Send all the points.
if err = u.influx.Write(r.bp); err != nil {
@ -139,5 +139,4 @@ func (u *InfluxUnifi) loopPoints(r report) {
u.batchUDM(r, s)
}
}()
r.wait()
}

View File

@ -21,14 +21,10 @@ type Report struct {
bp influx.BatchPoints
}
// satisfy gomnd
const one = 1
// report is an internal interface that can be mocked and overrridden for tests.
type report interface {
add()
done()
wait()
send(m *metric)
error(err error)
batch(m *metric, pt *influx.Point)
@ -39,6 +35,9 @@ func (r *Report) metrics() *metrics.Metrics {
return r.Metrics
}
// satisfy gomnd
const one = 1
func (r *Report) add() {
r.wg.Add(one)
}
@ -52,10 +51,6 @@ func (r *Report) send(m *metric) {
r.ch <- m
}
func (r *Report) wait() {
r.wg.Wait()
}
/* The following methods are not thread safe. */
func (r *Report) error(err error) {

View File

@ -19,7 +19,7 @@ func (u *InfluxUnifi) batchUAP(r report, s *unifi.UAP) {
"serial": s.Serial,
"type": s.Type,
}
fields := Combine(u.processUAPstats(r, s.Stat.Ap), u.batchSysStats(r, s.SysStats, s.SystemStats))
fields := Combine(u.processUAPstats(s.Stat.Ap), u.batchSysStats(s.SysStats, s.SystemStats))
fields["ip"] = s.IP
fields["bytes"] = s.Bytes.Val
fields["last_seen"] = s.LastSeen.Val
@ -34,7 +34,7 @@ func (u *InfluxUnifi) batchUAP(r report, s *unifi.UAP) {
u.processVAPs(r, tags, s.VapTable, s.RadioTable, s.RadioTableStats)
}
func (u *InfluxUnifi) processUAPstats(r report, ap *unifi.Ap) map[string]interface{} {
func (u *InfluxUnifi) processUAPstats(ap *unifi.Ap) map[string]interface{} {
// Accumulative Statistics.
return map[string]interface{}{
"stat_user-rx_packets": ap.UserRxPackets.Val,

View File

@ -16,7 +16,7 @@ func Combine(in ...map[string]interface{}) map[string]interface{} {
}
// batchSysStats is used by all device types.
func (u *InfluxUnifi) batchSysStats(r report, s unifi.SysStats, ss unifi.SystemStats) map[string]interface{} {
func (u *InfluxUnifi) batchSysStats(s unifi.SysStats, ss unifi.SystemStats) map[string]interface{} {
return map[string]interface{}{
"loadavg_1": s.Loadavg1.Val,
"loadavg_5": s.Loadavg5.Val,
@ -72,7 +72,7 @@ func (u *InfluxUnifi) batchUDM(r report, s *unifi.UDM) {
"lan-rx_packets": s.Stat.Gw.LanRxPackets.Val,
"lan-tx_bytes": s.Stat.Gw.LanTxBytes.Val,
"lan-tx_packets": s.Stat.Gw.LanTxPackets.Val,
}, u.batchSysStats(r, s.SysStats, s.SystemStats))
}, u.batchSysStats(s.SysStats, s.SystemStats))
r.send(&metric{Table: "usg", Tags: tags, Fields: fields})
u.batchNetworkTable(r, tags, s.NetworkTable)
u.batchUSGwans(r, tags, s.Wan1, s.Wan2)
@ -124,7 +124,7 @@ func (u *InfluxUnifi) batchUDM(r report, s *unifi.UDM) {
"serial": s.Serial,
"type": s.Type,
}
fields = u.processUAPstats(r, s.Stat.Ap)
fields = u.processUAPstats(s.Stat.Ap)
fields["ip"] = s.IP
fields["bytes"] = s.Bytes.Val
fields["last_seen"] = s.LastSeen.Val

View File

@ -73,7 +73,7 @@ func (u *InfluxUnifi) batchUSG(r report, s *unifi.USG) {
r.send(&metric{Table: "usg_ports", Tags: t, Fields: f})
}
*/
fields = Combine(fields, u.batchSysStats(r, s.SysStats, s.SystemStats))
fields = Combine(fields, u.batchSysStats(s.SysStats, s.SystemStats))
r.send(&metric{Table: "usg", Tags: tags, Fields: fields})
u.batchNetworkTable(r, tags, s.NetworkTable)
u.batchUSGwans(r, tags, s.Wan1, s.Wan2)

View File

@ -43,7 +43,7 @@ func (u *InfluxUnifi) batchUSW(r report, s *unifi.USW) {
"stat_tx_errors": s.Stat.Sw.TxErrors.Val,
"stat_tx_packets": s.Stat.Sw.TxPackets.Val,
"stat_tx_retries": s.Stat.Sw.TxRetries.Val,
}, u.batchSysStats(r, s.SysStats, s.SystemStats))
}, u.batchSysStats(s.SysStats, s.SystemStats))
r.send(&metric{Table: "usw", Tags: tags, Fields: fields})
u.batchPortTable(r, tags, s.PortTable)
}