Add a run-once lambda mode.

This commit is contained in:
David Newhall II 2019-06-23 14:00:23 -07:00
parent 5eac7c49bd
commit 7f205065da
4 changed files with 68 additions and 63 deletions

View File

@ -33,6 +33,7 @@ type UnifiPoller struct {
ConfigFile string ConfigFile string
DumpJSON string DumpJSON string
ShowVer bool ShowVer bool
Lambda bool
Flag *pflag.FlagSet Flag *pflag.FlagSet
errorCount int errorCount int
influx.Client influx.Client
@ -49,12 +50,14 @@ type Metrics struct {
} }
// Config represents the data needed to poll a controller and report to influxdb. // Config represents the data needed to poll a controller and report to influxdb.
// This is all of the data stored in the config file.
type Config struct { type Config struct {
MaxErrors int `json:"max_errors,_omitempty" toml:"max_errors,_omitempty" xml:"max_errors" yaml:"max_errors"` MaxErrors int `json:"max_errors,_omitempty" toml:"max_errors,_omitempty" xml:"max_errors" yaml:"max_errors"`
Interval Duration `json:"interval,_omitempty" toml:"interval,_omitempty" xml:"interval" yaml:"interval"` Interval Duration `json:"interval,_omitempty" toml:"interval,_omitempty" xml:"interval" yaml:"interval"`
Debug bool `json:"debug" toml:"debug" xml:"debug" yaml:"debug"` Debug bool `json:"debug" toml:"debug" xml:"debug" yaml:"debug"`
Quiet bool `json:"quiet,_omitempty" toml:"quiet,_omitempty" xml:"quiet" yaml:"quiet"` Quiet bool `json:"quiet,_omitempty" toml:"quiet,_omitempty" xml:"quiet" yaml:"quiet"`
VerifySSL bool `json:"verify_ssl" toml:"verify_ssl" xml:"verify_ssl" yaml:"verify_ssl"` VerifySSL bool `json:"verify_ssl" toml:"verify_ssl" xml:"verify_ssl" yaml:"verify_ssl"`
Lambda bool `json:"lambda" toml:"lambda" xml:"lambda" yaml:"lambda"`
InfluxURL string `json:"influx_url,_omitempty" toml:"influx_url,_omitempty" xml:"influx_url" yaml:"influx_url"` InfluxURL string `json:"influx_url,_omitempty" toml:"influx_url,_omitempty" xml:"influx_url" yaml:"influx_url"`
InfluxUser string `json:"influx_user,_omitempty" toml:"influx_user,_omitempty" xml:"influx_user" yaml:"influx_user"` InfluxUser string `json:"influx_user,_omitempty" toml:"influx_user,_omitempty" xml:"influx_user" yaml:"influx_user"`
InfluxPass string `json:"influx_pass,_omitempty" toml:"influx_pass,_omitempty" xml:"influx_pass" yaml:"influx_pass"` InfluxPass string `json:"influx_pass,_omitempty" toml:"influx_pass,_omitempty" xml:"influx_pass" yaml:"influx_pass"`

View File

@ -6,30 +6,18 @@ import (
"strings" "strings"
) )
// hasErr checks a list of errors for a non-nil. // LogError logs an error and increments the error counter.
func hasErr(errs []error) bool { // Should be used in the poller loop.
for _, err := range errs { func (u *UnifiPoller) LogError(err error, prefix string) {
if err != nil { if err != nil {
return true u.errorCount++
} _ = log.Output(2, fmt.Sprintf("[ERROR] (%v/%v) %v: %v", u.errorCount, u.MaxErrors, prefix, err))
}
return false
}
// LogErrors writes a slice of errors, with a prefix, to log-out.
// It also increments the error counter.
func (u *UnifiPoller) LogErrors(errs []error, prefix string) {
for _, err := range errs {
if err != nil {
u.errorCount++
_ = log.Output(2, fmt.Sprintf("[ERROR] (%v/%v) %v: %v", u.errorCount, u.MaxErrors, prefix, err))
}
} }
} }
// StringInSlice returns true if a string is in a slice. // StringInSlice returns true if a string is in a slice.
func StringInSlice(str string, slc []string) bool { func StringInSlice(str string, slice []string) bool {
for _, s := range slc { for _, s := range slice {
if strings.EqualFold(s, str) { if strings.EqualFold(s, str) {
return true return true
} }
@ -51,7 +39,7 @@ func (u *UnifiPoller) LogDebugf(m string, v ...interface{}) {
} }
} }
// LogErrorf prints an error log entry. // LogErrorf prints an error log entry. This is used for external library logging.
func (u *UnifiPoller) LogErrorf(m string, v ...interface{}) { func (u *UnifiPoller) LogErrorf(m string, v ...interface{}) {
_ = log.Output(2, fmt.Sprintf("[ERROR] "+m, v...)) _ = log.Output(2, fmt.Sprintf("[ERROR] "+m, v...))
} }

View File

@ -78,6 +78,13 @@ func (u *UnifiPoller) Run() (err error) {
if err = u.GetInfluxDB(); err != nil { if err = u.GetInfluxDB(); err != nil {
return err return err
} }
if u.Lambda {
metrics, err := u.CollectMetrics()
if err != nil {
return err
}
return u.ReportMetrics(metrics)
}
return u.PollController() return u.PollController()
} }

View File

@ -42,46 +42,11 @@ FIRST:
func (u *UnifiPoller) PollController() error { func (u *UnifiPoller) PollController() error {
log.Println("[INFO] Everything checks out! Poller started, interval:", u.Interval.Round(time.Second)) log.Println("[INFO] Everything checks out! Poller started, interval:", u.Interval.Round(time.Second))
ticker := time.NewTicker(u.Interval.Round(time.Second)) ticker := time.NewTicker(u.Interval.Round(time.Second))
var err error
for range ticker.C { for range ticker.C {
m := &Metrics{} metrics, err := u.CollectMetrics()
// Get the sites we care about. if err == nil {
if m.Sites, err = u.GetFilteredSites(); err != nil { u.LogError(u.ReportMetrics(metrics), "reporting metrics")
u.LogErrors([]error{err}, "unifi.GetSites()")
} }
// Get all the points.
if m.Clients, err = u.GetClients(m.Sites); err != nil {
u.LogErrors([]error{err}, "unifi.GetClients()")
}
if m.Devices, err = u.GetDevices(m.Sites); err != nil {
u.LogErrors([]error{err}, "unifi.GetDevices()")
}
// Make a new Points Batcher.
m.BatchPoints, err = influx.NewBatchPoints(influx.BatchPointsConfig{Database: u.InfluxDB})
if err != nil {
u.LogErrors([]error{err}, "influx.NewBatchPoints")
continue
}
// Batch (and send) all the points.
if errs := m.SendPoints(); errs != nil && hasErr(errs) {
u.LogErrors(errs, "asset.Points()")
}
if err := u.Write(m.BatchPoints); err != nil {
u.LogErrors([]error{err}, "infdb.Write(bp)")
}
// Talk about the data.
var fieldcount, pointcount int
for _, p := range m.Points() {
pointcount++
i, _ := p.Fields()
fieldcount += len(i)
}
u.Logf("UniFi Measurements Recorded. Sites: %d, Clients: %d, "+
"Wireless APs: %d, Gateways: %d, Switches: %d, Points: %d, Fields: %d",
len(m.Sites), len(m.Clients), len(m.UAPs), len(m.USGs), len(m.USWs), pointcount, fieldcount)
if u.MaxErrors >= 0 && u.errorCount > u.MaxErrors { if u.MaxErrors >= 0 && u.errorCount > u.MaxErrors {
return errors.Errorf("reached maximum error count, stopping poller (%d > %d)", u.errorCount, u.MaxErrors) return errors.Errorf("reached maximum error count, stopping poller (%d > %d)", u.errorCount, u.MaxErrors)
} }
@ -89,10 +54,52 @@ func (u *UnifiPoller) PollController() error {
return nil return nil
} }
// SendPoints combines all device and client data into influxdb data points. // CollectMetrics grabs all the measurements from a UniFi controller and returns them.
// This also creates an InfluxDB writer, and retuns error if that fails.
func (u *UnifiPoller) CollectMetrics() (*Metrics, error) {
m := &Metrics{}
var err error
// Get the sites we care about.
m.Sites, err = u.GetFilteredSites()
u.LogError(err, "unifi.GetSites()")
// Get all the points.
m.Clients, err = u.GetClients(m.Sites)
u.LogError(err, "unifi.GetClients()")
m.Devices, err = u.GetDevices(m.Sites)
u.LogError(err, "unifi.GetDevices()")
// Make a new Influx Points Batcher.
m.BatchPoints, err = influx.NewBatchPoints(influx.BatchPointsConfig{Database: u.InfluxDB})
u.LogError(err, "influx.NewBatchPoints")
return m, err
}
// ReportMetrics batches all the metrics and writes them to InfluxDB.
// Returns an error if the write to influx fails.
func (u *UnifiPoller) ReportMetrics(metrics *Metrics) error {
// Batch (and send) all the points.
for _, err := range metrics.ProcessPoints() {
u.LogError(err, "asset.Points()")
}
err := u.Write(metrics.BatchPoints)
if err != nil {
return errors.Wrap(err, "infdb.Write(bp)")
}
var fields, points int
for _, p := range metrics.Points() {
points++
i, _ := p.Fields()
fields += len(i)
}
u.Logf("UniFi Measurements Recorded. Sites: %d, Clients: %d, "+
"Wireless APs: %d, Gateways: %d, Switches: %d, Points: %d, Fields: %d",
len(metrics.Sites), len(metrics.Clients), len(metrics.UAPs),
len(metrics.USGs), len(metrics.USWs), points, fields)
return nil
}
// ProcessPoints batches all device and client data into influxdb data points.
// Call this after you've collected all the data you care about. // Call this after you've collected all the data you care about.
// This sends all the batched points to InfluxDB. func (m *Metrics) ProcessPoints() (errs []error) {
func (m *Metrics) SendPoints() (errs []error) {
for _, asset := range m.Sites { for _, asset := range m.Sites {
errs = append(errs, m.processPoints(asset)) errs = append(errs, m.processPoints(asset))
} }
@ -114,7 +121,7 @@ func (m *Metrics) SendPoints() (errs []error) {
return return
} }
// processPoints is helper function for SendPoints. // processPoints is helper function for ProcessPoints.
func (m *Metrics) processPoints(asset Asset) error { func (m *Metrics) processPoints(asset Asset) error {
if asset == nil { if asset == nil {
return nil return nil