a few more steps
This commit is contained in:
parent
f6c73e7edb
commit
dd27f90a62
|
|
@ -3,19 +3,13 @@
|
||||||
package influx
|
package influx
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
"github.com/davidnewhall/unifi-poller/metrics"
|
||||||
|
|
||||||
client "github.com/influxdata/influxdb1-client/v2"
|
client "github.com/influxdata/influxdb1-client/v2"
|
||||||
"golift.io/unifi"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Metrics contains all the data from the controller and an influx endpoint to send it to.
|
// Metrics contains all the data from the controller and an influx endpoint to send it to.
|
||||||
type Metrics struct {
|
type Metrics struct {
|
||||||
TS time.Time
|
*metrics.Metrics
|
||||||
unifi.Sites
|
|
||||||
unifi.IDSList
|
|
||||||
unifi.Clients
|
|
||||||
*unifi.Devices
|
|
||||||
client.BatchPoints
|
client.BatchPoints
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -35,7 +29,7 @@ func (m *Metrics) ProcessPoints() []error {
|
||||||
errs = append(errs, err)
|
errs = append(errs, err)
|
||||||
case p == nil:
|
case p == nil:
|
||||||
default:
|
default:
|
||||||
m.BatchPoints.AddPoints(p)
|
m.AddPoints(p)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,16 @@
|
||||||
|
package metrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"golift.io/unifi"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Metrics is a type shared by the exporting and reporting packages.
|
||||||
|
type Metrics struct {
|
||||||
|
TS time.Time
|
||||||
|
unifi.Sites
|
||||||
|
unifi.IDSList
|
||||||
|
unifi.Clients
|
||||||
|
*unifi.Devices
|
||||||
|
}
|
||||||
|
|
@ -84,16 +84,17 @@ func (u *UnifiPoller) Run() (err error) {
|
||||||
}
|
}
|
||||||
u.Logf("Polling UniFi Controller at %s v%s as user %s. Sites: %v",
|
u.Logf("Polling UniFi Controller at %s v%s as user %s. Sites: %v",
|
||||||
u.Config.UnifiBase, u.Unifi.ServerVersion, u.Config.UnifiUser, u.Config.Sites)
|
u.Config.UnifiBase, u.Unifi.ServerVersion, u.Config.UnifiUser, u.Config.Sites)
|
||||||
if err = u.GetInfluxDB(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
switch strings.ToLower(u.Config.Mode) {
|
switch strings.ToLower(u.Config.Mode) {
|
||||||
case "influxlambda", "lambdainflux", "lambda_influx", "influx_lambda":
|
case "influxlambda", "lambdainflux", "lambda_influx", "influx_lambda":
|
||||||
|
if err = u.GetInfluxDB(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
u.Logf("Logging Measurements to InfluxDB at %s as user %s one time (lambda mode)",
|
u.Logf("Logging Measurements to InfluxDB at %s as user %s one time (lambda mode)",
|
||||||
u.Config.InfluxURL, u.Config.InfluxUser)
|
u.Config.InfluxURL, u.Config.InfluxUser)
|
||||||
u.LastCheck = time.Now()
|
u.LastCheck = time.Now()
|
||||||
return u.CollectAndProcess(u.ReportMetrics)
|
return u.CollectAndProcess(u.ReportMetrics)
|
||||||
|
|
||||||
case "prometheus", "exporter":
|
case "prometheus", "exporter":
|
||||||
u.Logf("Exporting Measurements at https://%s/metrics for Prometheus", u.Config.HTTPListen)
|
u.Logf("Exporting Measurements at https://%s/metrics for Prometheus", u.Config.HTTPListen)
|
||||||
u.Config.Mode = "http exporter"
|
u.Config.Mode = "http exporter"
|
||||||
|
|
@ -105,7 +106,11 @@ func (u *UnifiPoller) Run() (err error) {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return u.PollController(u.ExportMetrics)
|
return u.PollController(u.ExportMetrics)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
|
if err = u.GetInfluxDB(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
u.Logf("Logging Measurements to InfluxDB at %s as user %s", u.Config.InfluxURL, u.Config.InfluxUser)
|
u.Logf("Logging Measurements to InfluxDB at %s as user %s", u.Config.InfluxURL, u.Config.InfluxUser)
|
||||||
u.Config.Mode = "influx poller"
|
u.Config.Mode = "influx poller"
|
||||||
return u.PollController(u.ReportMetrics)
|
return u.PollController(u.ReportMetrics)
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,8 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/davidnewhall/unifi-poller/influx"
|
"github.com/davidnewhall/unifi-poller/influx"
|
||||||
|
"github.com/davidnewhall/unifi-poller/metrics"
|
||||||
|
"github.com/davidnewhall/unifi-poller/prometheus"
|
||||||
client "github.com/influxdata/influxdb1-client/v2"
|
client "github.com/influxdata/influxdb1-client/v2"
|
||||||
"golift.io/unifi"
|
"golift.io/unifi"
|
||||||
)
|
)
|
||||||
|
|
@ -47,7 +49,7 @@ FIRST:
|
||||||
// PollController runs forever, polling UniFi
|
// PollController runs forever, polling UniFi
|
||||||
// and pushing to influx OR exporting for prometheus.
|
// and pushing to influx OR exporting for prometheus.
|
||||||
// This is started by Run() after everything checks out.
|
// This is started by Run() after everything checks out.
|
||||||
func (u *UnifiPoller) PollController(process func(*influx.Metrics) error) error {
|
func (u *UnifiPoller) PollController(process func(*metrics.Metrics) error) error {
|
||||||
interval := u.Config.Interval.Round(time.Second)
|
interval := u.Config.Interval.Round(time.Second)
|
||||||
log.Printf("[INFO] Everything checks out! Poller started in %v mode, interval: %v", u.Config.Mode, interval)
|
log.Printf("[INFO] Everything checks out! Poller started in %v mode, interval: %v", u.Config.Mode, interval)
|
||||||
ticker := time.NewTicker(interval)
|
ticker := time.NewTicker(interval)
|
||||||
|
|
@ -65,7 +67,7 @@ func (u *UnifiPoller) PollController(process func(*influx.Metrics) error) error
|
||||||
_ = u.CollectAndProcess(process)
|
_ = u.CollectAndProcess(process)
|
||||||
}
|
}
|
||||||
if u.errorCount > 0 {
|
if u.errorCount > 0 {
|
||||||
return fmt.Errorf("controller or influxdb errors, stopping poller")
|
return fmt.Errorf("too many errors, stopping poller")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -77,7 +79,7 @@ func (u *UnifiPoller) PollController(process func(*influx.Metrics) error) error
|
||||||
// handle their own logging. An error is returned so the calling function may
|
// handle their own logging. An error is returned so the calling function may
|
||||||
// determine if there was a read or write error and act on it. This is currently
|
// determine if there was a read or write error and act on it. This is currently
|
||||||
// called in two places in this library. One returns an error, one does not.
|
// called in two places in this library. One returns an error, one does not.
|
||||||
func (u *UnifiPoller) CollectAndProcess(process func(*influx.Metrics) error) error {
|
func (u *UnifiPoller) CollectAndProcess(process func(*metrics.Metrics) error) error {
|
||||||
metrics, err := u.CollectMetrics()
|
metrics, err := u.CollectMetrics()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -91,9 +93,8 @@ func (u *UnifiPoller) CollectAndProcess(process func(*influx.Metrics) error) err
|
||||||
}
|
}
|
||||||
|
|
||||||
// CollectMetrics grabs all the measurements from a UniFi controller and returns them.
|
// CollectMetrics grabs all the measurements from a UniFi controller and returns them.
|
||||||
// This also creates an InfluxDB writer, and returns an error if that fails.
|
func (u *UnifiPoller) CollectMetrics() (*metrics.Metrics, error) {
|
||||||
func (u *UnifiPoller) CollectMetrics() (*influx.Metrics, error) {
|
m := &metrics.Metrics{TS: u.LastCheck} // At this point, it's the Current Check.
|
||||||
m := &influx.Metrics{TS: u.LastCheck} // At this point, it's the Current Check.
|
|
||||||
var err error
|
var err error
|
||||||
// Get the sites we care about.
|
// Get the sites we care about.
|
||||||
m.Sites, err = u.GetFilteredSites()
|
m.Sites, err = u.GetFilteredSites()
|
||||||
|
|
@ -108,16 +109,13 @@ func (u *UnifiPoller) CollectMetrics() (*influx.Metrics, error) {
|
||||||
u.LogError(err, "unifi.GetClients()")
|
u.LogError(err, "unifi.GetClients()")
|
||||||
m.Devices, err = u.Unifi.GetDevices(m.Sites)
|
m.Devices, err = u.Unifi.GetDevices(m.Sites)
|
||||||
u.LogError(err, "unifi.GetDevices()")
|
u.LogError(err, "unifi.GetDevices()")
|
||||||
// Make a new Influx Points Batcher.
|
|
||||||
m.BatchPoints, err = client.NewBatchPoints(client.BatchPointsConfig{Database: u.Config.InfluxDB})
|
|
||||||
u.LogError(err, "influx.NewBatchPoints")
|
|
||||||
return m, err
|
return m, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// AugmentMetrics is our middleware layer between collecting metrics and writing them.
|
// AugmentMetrics is our middleware layer between collecting metrics and writing them.
|
||||||
// This is where we can manipuate the returned data or make arbitrary decisions.
|
// This is where we can manipuate the returned data or make arbitrary decisions.
|
||||||
// This function currently adds parent device names to client metrics.
|
// This function currently adds parent device names to client metrics.
|
||||||
func (u *UnifiPoller) AugmentMetrics(metrics *influx.Metrics) error {
|
func (u *UnifiPoller) AugmentMetrics(metrics *metrics.Metrics) error {
|
||||||
if metrics == nil || metrics.Devices == nil || metrics.Clients == nil {
|
if metrics == nil || metrics.Devices == nil || metrics.Clients == nil {
|
||||||
return fmt.Errorf("nil metrics, augment impossible")
|
return fmt.Errorf("nil metrics, augment impossible")
|
||||||
}
|
}
|
||||||
|
|
@ -150,39 +148,64 @@ func (u *UnifiPoller) AugmentMetrics(metrics *influx.Metrics) error {
|
||||||
|
|
||||||
// ExportMetrics updates the internal metrics provided via
|
// ExportMetrics updates the internal metrics provided via
|
||||||
// HTTP at /metrics for prometheus collection.
|
// HTTP at /metrics for prometheus collection.
|
||||||
func (u *UnifiPoller) ExportMetrics(metrics *influx.Metrics) error {
|
func (u *UnifiPoller) ExportMetrics(metrics *metrics.Metrics) error {
|
||||||
/*
|
m := &prometheus.Metrics{Metrics: metrics}
|
||||||
This is where it gets complicated, and probably deserves its own package.
|
for _, err := range m.ProcessExports() {
|
||||||
*/
|
u.LogError(err, "prometheus.ProcessExports")
|
||||||
|
}
|
||||||
|
u.LogExportReport(m)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReportMetrics batches all the metrics and writes them to InfluxDB.
|
// LogExportReport writes a log line after exporting metrics via HTTP.
|
||||||
// Returns an error if the write to influx fails.
|
func (u *UnifiPoller) LogExportReport(m *prometheus.Metrics) {
|
||||||
func (u *UnifiPoller) ReportMetrics(metrics *influx.Metrics) error {
|
idsMsg := ""
|
||||||
// Batch (and send) all the points.
|
if u.Config.CollectIDS {
|
||||||
for _, err := range metrics.ProcessPoints() {
|
idsMsg = fmt.Sprintf(", IDS Events: %d, ", len(m.IDSList))
|
||||||
u.LogError(err, "metrics.ProcessPoints")
|
|
||||||
}
|
}
|
||||||
err := u.Influx.Write(metrics.BatchPoints)
|
u.Logf("UniFi Measurements Exported. Sites: %d, Clients: %d, "+
|
||||||
|
"Wireless APs: %d, Gateways: %d, Switches: %d%s",
|
||||||
|
len(m.Sites), len(m.Clients), len(m.UAPs),
|
||||||
|
len(m.UDMs)+len(m.USGs), len(m.USWs), idsMsg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReportMetrics batches all the metrics and writes them to InfluxDB.
|
||||||
|
// This creates an InfluxDB writer, and returns an error if the write fails.
|
||||||
|
func (u *UnifiPoller) ReportMetrics(metrics *metrics.Metrics) error {
|
||||||
|
// Batch (and send) all the points.
|
||||||
|
m := &influx.Metrics{Metrics: metrics}
|
||||||
|
// Make a new Influx Points Batcher.
|
||||||
|
var err error
|
||||||
|
m.BatchPoints, err = client.NewBatchPoints(client.BatchPointsConfig{Database: u.Config.InfluxDB})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
return fmt.Errorf("influx.NewBatchPoints: %v", err)
|
||||||
|
}
|
||||||
|
for _, err := range m.ProcessPoints() {
|
||||||
|
u.LogError(err, "influx.ProcessPoints")
|
||||||
|
}
|
||||||
|
if err = u.Influx.Write(m.BatchPoints); err != nil {
|
||||||
return fmt.Errorf("influxdb.Write(points): %v", err)
|
return fmt.Errorf("influxdb.Write(points): %v", err)
|
||||||
}
|
}
|
||||||
|
u.LogInfluxReport(m)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// LogInfluxReport writes a log message after exporting to influxdb.
|
||||||
|
func (u *UnifiPoller) LogInfluxReport(m *influx.Metrics) {
|
||||||
var fields, points int
|
var fields, points int
|
||||||
for _, p := range metrics.Points() {
|
for _, p := range m.Points() {
|
||||||
points++
|
points++
|
||||||
i, _ := p.Fields()
|
i, _ := p.Fields()
|
||||||
fields += len(i)
|
fields += len(i)
|
||||||
}
|
}
|
||||||
idsMsg := ""
|
idsMsg := ""
|
||||||
if u.Config.CollectIDS {
|
if u.Config.CollectIDS {
|
||||||
idsMsg = fmt.Sprintf("IDS Events: %d, ", len(metrics.IDSList))
|
idsMsg = fmt.Sprintf("IDS Events: %d, ", len(m.IDSList))
|
||||||
}
|
}
|
||||||
u.Logf("UniFi Measurements Recorded. Sites: %d, Clients: %d, "+
|
u.Logf("UniFi Measurements Recorded. Sites: %d, Clients: %d, "+
|
||||||
"Wireless APs: %d, Gateways: %d, Switches: %d, %sPoints: %d, Fields: %d",
|
"Wireless APs: %d, Gateways: %d, Switches: %d, %sPoints: %d, Fields: %d",
|
||||||
len(metrics.Sites), len(metrics.Clients), len(metrics.UAPs),
|
len(m.Sites), len(m.Clients), len(m.UAPs),
|
||||||
len(metrics.UDMs)+len(metrics.USGs), len(metrics.USWs), idsMsg, points, fields)
|
len(m.UDMs)+len(m.USGs), len(m.USWs), idsMsg, points, fields)
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetFilteredSites returns a list of sites to fetch data for.
|
// GetFilteredSites returns a list of sites to fetch data for.
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,13 @@
|
||||||
|
package prometheus
|
||||||
|
|
||||||
|
import "github.com/davidnewhall/unifi-poller/metrics"
|
||||||
|
|
||||||
|
// Metrics contains all the data from the controller.
|
||||||
|
type Metrics struct {
|
||||||
|
*metrics.Metrics
|
||||||
|
}
|
||||||
|
|
||||||
|
// ProcessExports turns the data into exported data.
|
||||||
|
func (m *Metrics) ProcessExports() []error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue