Merge pull request #53 from davidnewhall/dn2_lambda

Add a run-once lambda mode.
This commit is contained in:
David Newhall II 2019-06-24 01:05:26 -07:00 committed by GitHub
commit 1a307e81d3
14 changed files with 188 additions and 106 deletions

View File

@ -15,6 +15,8 @@
*~
/package_build_*
/release
MANUAL
MANUAL.html
README
README.html
/unifi-poller_manual.html

View File

@ -7,7 +7,6 @@ MAINT=David Newhall II <david at sleepers dot pro>
DESC=This daemon polls a UniFi controller at a short interval and stores the collected measurements in an Influx Database.
GOLANGCI_LINT_ARGS=--enable-all -D gochecknoglobals
PACKAGE:=./cmd/$(BINARY)
LIBRARY:=./pkg/$(BINARY)
DOCKER_REPO=golift
MD2ROFF_BIN=github.com/github/hub/md2roff-bin
@ -46,9 +45,9 @@ clean:
man: $(BINARY).1.gz
$(BINARY).1.gz: md2roff
# Building man page. Build dependency first: md2roff
go run $(MD2ROFF_BIN) --manual $(BINARY) --version $(VERSION) --date "$$(date)" cmd/$(BINARY)/README.md
gzip -9nc cmd/$(BINARY)/README > $(BINARY).1.gz
mv cmd/$(BINARY)/README.html $(BINARY)_manual.html
go run $(MD2ROFF_BIN) --manual $(BINARY) --version $(VERSION) --date "$$(date)" examples/MANUAL.md
gzip -9nc examples/MANUAL > $(BINARY).1.gz
mv examples/MANUAL.html $(BINARY)_manual.html
md2roff:
go get $(MD2ROFF_BIN)
@ -148,7 +147,7 @@ $(BINARY).rb: v$(VERSION).tar.gz.sha256
# Run code tests and lint.
test: lint
# Testing.
go test -race -covermode=atomic $(PACKAGE) $(LIBRARY)
go test -race -covermode=atomic ./...
lint:
# Checking lint.
golangci-lint run $(GOLANGCI_LINT_ARGS)

View File

@ -0,0 +1,14 @@
package main
import (
"log"
unifipoller "github.com/davidnewhall/unifi-poller/pkg/unifi-poller"
)
// Keep it simple.
func main() {
if err := unifipoller.Start(); err != nil {
log.Fatalln("[ERROR]", err)
}
}

View File

@ -1,25 +0,0 @@
package main
import (
"fmt"
"log"
"os"
unifipoller "github.com/davidnewhall/unifi-poller/pkg/unifi-poller"
)
func main() {
log.SetFlags(log.LstdFlags)
unifi := &unifipoller.UnifiPoller{}
if unifi.ParseFlags(os.Args[1:]); unifi.ShowVer {
fmt.Printf("unifi-poller v%s\n", unifipoller.Version)
return // don't run anything else w/ version request.
}
if err := unifi.GetConfig(); err != nil {
unifi.Flag.Usage()
log.Fatalf("[ERROR] config file '%v': %v", unifi.ConfigFile, err)
}
if err := unifi.Run(); err != nil {
log.Fatalln("[ERROR]", err)
}
}

View File

@ -83,6 +83,21 @@ is provided so the application can be easily adapted to any environment.
errors will be logged. Using this with debug=true adds line numbers to
any error logs.
mode default: "influx"
* Value: influx
This default mode runs this application as a daemon. It will poll
the controller at the configured interval. Providing an invalid value
will run in this default mode.
* Value: influxlambda - (the only other available option right now)
Setting this value will invoke a run-once mode where the application
immediately polls the controller and reports the metrics to InfluxDB.
Then it exits. This mode is useful in an AWS Lambda or a crontab where
the execution timings are controlled. This mode may also be adapted
to run in other collector scripts and apps like telegraf or diamond.
This mode can also be combined with a "test database" in InfluxDB to
give yourself a "test config file" you may run ad-hoc to test changes.
max_errors default: 0
If you restart the UniFI controller, the poller will lose access until
it is restarted. Specifying a number greater than -1 for max_errors will

View File

@ -1,4 +1,4 @@
# unifi-poller primary configuration file. TOML FORMAT #
# UniFi Poller primary configuration file. TOML FORMAT #
# commented lines are defaults, uncomment to change. #
########################################################
@ -20,7 +20,18 @@
# Recommend enabling debug with this setting for better error logging.
#quiet = false
# If the poller experiences an error from the UniFi Controller or from InfluxDB
# Which mode to run this application in. The default mode is "influx". Providing
# an invalid mode will also result in "influx". In this default mode the application
# runs as a daemon and polls the controller at the configured interval.
#
# There is only one other option at this time: "influxlambda"
#
# Lambda mode makes the application exit after collecting and reporting metrics
# to InfluxDB one time. This mode requires an external process like an AWS Lambda
# or a simple crontab to keep the timings accurate on UniFi Poller run intervals.
#mode = "influx"
# If the poller experiences an error from the UniFi controller or from InfluxDB
# it will exit. If you do not want it to exit, change max_errors to -1. You can
# adjust the config to tolerate more errors by setting this to a higher value.
# Recommend setting this between 0 and 5. See man page for more explanation.

View File

@ -3,6 +3,7 @@
"interval": "30s",
"debug": false,
"quiet": false,
"mode": "influx",
"max_errors": 0,
"influx_url": "http://127.0.0.1:8086",
"influx_user": "unifi",

View File

@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
#######################################################
# unifi-poller primary configuration file. XML FORMAT #
# UniFi Poller primary configuration file. XML FORMAT #
# provided values are defaults #
#######################################################
-->
@ -36,7 +36,20 @@
<quiet>false</quiet>
<!--
# If the poller experiences an error from the UniFi Controller or from InfluxDB
# Which mode to run this application in. The default mode is "influx". Providing
# an invalid mode will also result in "influx". In this default mode the application
# runs as a daemon and polls the controller at the configured interval.
#
# There is only one other option at this time: "influxlambda"
#
# Lambda mode makes the application exit after collecting and reporting metrics
# to InfluxDB one time. This mode requires an external process like an AWS Lambda
# or a simple crontab to keep the timings accurate on UniFi Poller run intervals.
-->
<mode>influx</mode>
<!--
# If the poller experiences an error from the UniFi controller or from InfluxDB
# it will exit. If you do not want it to exit, change max_errors to -1. You can
# adjust the config to tolerate more errors by setting this to a higher value.
# Recommend setting this between 0 and 5. See man page for more explanation.

View File

@ -1,4 +1,4 @@
# unifi-poller primary configuration file. YAML FORMAT #
# UniFi Poller primary configuration file. YAML FORMAT #
# provided values are defaults #
########################################################
---
@ -21,7 +21,18 @@ debug: false
# Recommend enabling debug with this setting for better error logging.
quiet: false
# If the poller experiences an error from the UniFi Controller or from InfluxDB
# Which mode to run this application in. The default mode is "influx". Providing
# an invalid mode will also result in "influx". In this default mode the application
# runs as a daemon and polls the controller at the configured interval.
#
# There is only one other option at this time: "influxlambda"
#
# Lambda mode makes the application exit after collecting and reporting metrics
# to InfluxDB one time. This mode requires an external process like an AWS Lambda
# or a simple crontab to keep the timings accurate on UniFi Poller run intervals.
mode: "influx"
# If the poller experiences an error from the UniFi controller or from InfluxDB
# it will exit. If you do not want it to exit, change max_errors to -1. You can
# adjust the config to tolerate more errors by setting this to a higher value.
# Recommend setting this between 0 and 5. See man page for more explanation.

View File

@ -49,12 +49,14 @@ type Metrics struct {
}
// Config represents the data needed to poll a controller and report to influxdb.
// This is all of the data stored in the config file.
type Config struct {
MaxErrors int `json:"max_errors,_omitempty" toml:"max_errors,_omitempty" xml:"max_errors" yaml:"max_errors"`
Interval Duration `json:"interval,_omitempty" toml:"interval,_omitempty" xml:"interval" yaml:"interval"`
Debug bool `json:"debug" toml:"debug" xml:"debug" yaml:"debug"`
Quiet bool `json:"quiet,_omitempty" toml:"quiet,_omitempty" xml:"quiet" yaml:"quiet"`
VerifySSL bool `json:"verify_ssl" toml:"verify_ssl" xml:"verify_ssl" yaml:"verify_ssl"`
Mode string `json:"mode" toml:"mode" xml:"mode" yaml:"mode"`
InfluxURL string `json:"influx_url,_omitempty" toml:"influx_url,_omitempty" xml:"influx_url" yaml:"influx_url"`
InfluxUser string `json:"influx_user,_omitempty" toml:"influx_user,_omitempty" xml:"influx_user" yaml:"influx_user"`
InfluxPass string `json:"influx_pass,_omitempty" toml:"influx_pass,_omitempty" xml:"influx_pass" yaml:"influx_pass"`

View File

@ -6,30 +6,18 @@ import (
"strings"
)
// hasErr checks a list of errors for a non-nil.
func hasErr(errs []error) bool {
for _, err := range errs {
if err != nil {
return true
}
}
return false
}
// LogErrors writes a slice of errors, with a prefix, to log-out.
// It also increments the error counter.
func (u *UnifiPoller) LogErrors(errs []error, prefix string) {
for _, err := range errs {
if err != nil {
u.errorCount++
_ = log.Output(2, fmt.Sprintf("[ERROR] (%v/%v) %v: %v", u.errorCount, u.MaxErrors, prefix, err))
}
// LogError logs an error and increments the error counter.
// Should be used in the poller loop.
func (u *UnifiPoller) LogError(err error, prefix string) {
if err != nil {
u.errorCount++
_ = log.Output(2, fmt.Sprintf("[ERROR] (%v/%v) %v: %v", u.errorCount, u.MaxErrors, prefix, err))
}
}
// StringInSlice returns true if a string is in a slice.
func StringInSlice(str string, slc []string) bool {
for _, s := range slc {
func StringInSlice(str string, slice []string) bool {
for _, s := range slice {
if strings.EqualFold(s, str) {
return true
}
@ -51,7 +39,7 @@ func (u *UnifiPoller) LogDebugf(m string, v ...interface{}) {
}
}
// LogErrorf prints an error log entry.
// LogErrorf prints an error log entry. This is used for external library logging.
func (u *UnifiPoller) LogErrorf(m string, v ...interface{}) {
_ = log.Output(2, fmt.Sprintf("[ERROR] "+m, v...))
}

View File

@ -11,7 +11,11 @@ import (
)
// CheckSites makes sure the list of provided sites exists on the controller.
// This does not run in Lambda (run-once) mode.
func (u *UnifiPoller) CheckSites() error {
if strings.Contains(strings.ToLower(u.Mode), "lambda") {
return nil // Skip this in lambda mode.
}
sites, err := u.GetSites()
if err != nil {
return err
@ -32,7 +36,8 @@ FIRST:
continue FIRST
}
}
return errors.Errorf("configured site not found on controller: %v", s)
// This is fine, it may get added later.
u.LogErrorf("configured site not found on controller: %v", s)
}
return nil
}
@ -42,46 +47,8 @@ FIRST:
func (u *UnifiPoller) PollController() error {
log.Println("[INFO] Everything checks out! Poller started, interval:", u.Interval.Round(time.Second))
ticker := time.NewTicker(u.Interval.Round(time.Second))
var err error
for range ticker.C {
m := &Metrics{}
// Get the sites we care about.
if m.Sites, err = u.GetFilteredSites(); err != nil {
u.LogErrors([]error{err}, "unifi.GetSites()")
}
// Get all the points.
if m.Clients, err = u.GetClients(m.Sites); err != nil {
u.LogErrors([]error{err}, "unifi.GetClients()")
}
if m.Devices, err = u.GetDevices(m.Sites); err != nil {
u.LogErrors([]error{err}, "unifi.GetDevices()")
}
// Make a new Points Batcher.
m.BatchPoints, err = influx.NewBatchPoints(influx.BatchPointsConfig{Database: u.InfluxDB})
if err != nil {
u.LogErrors([]error{err}, "influx.NewBatchPoints")
continue
}
// Batch (and send) all the points.
if errs := m.SendPoints(); errs != nil && hasErr(errs) {
u.LogErrors(errs, "asset.Points()")
}
if err := u.Write(m.BatchPoints); err != nil {
u.LogErrors([]error{err}, "infdb.Write(bp)")
}
// Talk about the data.
var fieldcount, pointcount int
for _, p := range m.Points() {
pointcount++
i, _ := p.Fields()
fieldcount += len(i)
}
u.Logf("UniFi Measurements Recorded. Sites: %d, Clients: %d, "+
"Wireless APs: %d, Gateways: %d, Switches: %d, Points: %d, Fields: %d",
len(m.Sites), len(m.Clients), len(m.UAPs), len(m.USGs), len(m.USWs), pointcount, fieldcount)
_ = u.CollectAndReport()
if u.MaxErrors >= 0 && u.errorCount > u.MaxErrors {
return errors.Errorf("reached maximum error count, stopping poller (%d > %d)", u.errorCount, u.MaxErrors)
}
@ -89,10 +56,73 @@ func (u *UnifiPoller) PollController() error {
return nil
}
// SendPoints combines all device and client data into influxdb data points.
// CollectAndReport collects measurements and reports them to influxdb.
// Can be called once or in a ticker loop. This function and all the ones below
// handle their own logging. An error is returned so the calling function may
// determine if there was a read or write erorr and act on it. This is currently
// called in two places in this library. One returns an error, one does not.
func (u *UnifiPoller) CollectAndReport() error {
metrics, err := u.CollectMetrics()
if err != nil {
return err
}
err = u.ReportMetrics(metrics)
u.LogError(err, "reporting metrics")
return err
}
// CollectMetrics grabs all the measurements from a UniFi controller and returns them.
// This also creates an InfluxDB writer, and returns an error if that fails.
func (u *UnifiPoller) CollectMetrics() (*Metrics, error) {
m := &Metrics{}
var err error
// Get the sites we care about.
m.Sites, err = u.GetFilteredSites()
u.LogError(err, "unifi.GetSites()")
// Get all the points.
m.Clients, err = u.GetClients(m.Sites)
u.LogError(err, "unifi.GetClients()")
m.Devices, err = u.GetDevices(m.Sites)
u.LogError(err, "unifi.GetDevices()")
// Make a new Influx Points Batcher.
m.BatchPoints, err = influx.NewBatchPoints(influx.BatchPointsConfig{Database: u.InfluxDB})
u.LogError(err, "influx.NewBatchPoints")
return m, err
}
// ReportMetrics batches all the metrics and writes them to InfluxDB.
// Returns an error if the write to influx fails.
func (u *UnifiPoller) ReportMetrics(metrics *Metrics) error {
// Batch (and send) all the points.
for _, err := range metrics.ProcessPoints() {
u.LogError(err, "asset.Points()")
}
err := u.Write(metrics.BatchPoints)
if err != nil {
return errors.Wrap(err, "influxdb.Write(points)")
}
var fields, points int
for _, p := range metrics.Points() {
points++
i, _ := p.Fields()
fields += len(i)
}
u.Logf("UniFi Measurements Recorded. Sites: %d, Clients: %d, "+
"Wireless APs: %d, Gateways: %d, Switches: %d, Points: %d, Fields: %d",
len(metrics.Sites), len(metrics.Clients), len(metrics.UAPs),
len(metrics.USGs), len(metrics.USWs), points, fields)
return nil
}
// ProcessPoints batches all device and client data into influxdb data points.
// Call this after you've collected all the data you care about.
// This sends all the batched points to InfluxDB.
func (m *Metrics) SendPoints() (errs []error) {
// This function is sorta weird and returns a slice of errors. The reasoning is
// that some points may process while others fail, so we attempt to process them
// all. This is (usually) run in a loop, so we can't really exit on error,
// we just log the errors and tally them on a counter. In reality, this never
// returns any errors because we control the data going in; cool right? But we
// still check&log it in case the data going is skewed up and causes errors!
func (m *Metrics) ProcessPoints() (errs []error) {
for _, asset := range m.Sites {
errs = append(errs, m.processPoints(asset))
}
@ -114,7 +144,7 @@ func (m *Metrics) SendPoints() (errs []error) {
return
}
// processPoints is helper function for SendPoints.
// processPoints is helper function for ProcessPoints.
func (m *Metrics) processPoints(asset Asset) error {
if asset == nil {
return nil
@ -129,7 +159,7 @@ func (m *Metrics) processPoints(asset Asset) error {
// GetFilteredSites returns a list of sites to fetch data for.
// Omits requested but unconfigured sites. Grabs the full list from the
// controller and filters the sites provided in the config file.
// controller and returns the sites provided in the config file.
func (u *UnifiPoller) GetFilteredSites() (unifi.Sites, error) {
sites, err := u.GetSites()
if err != nil {

View File

@ -17,6 +17,22 @@ import (
yaml "gopkg.in/yaml.v2"
)
// Start begins the application from a CLI.
// Parses flags, parses config and executes Run().
func Start() error {
log.SetFlags(log.LstdFlags)
up := &UnifiPoller{}
if up.ParseFlags(os.Args[1:]); up.ShowVer {
fmt.Printf("unifi-poller v%s\n", Version)
return nil // don't run anything else w/ version request.
}
if err := up.GetConfig(); err != nil {
up.Flag.Usage()
return err
}
return up.Run()
}
// ParseFlags runs the parser.
func (u *UnifiPoller) ParseFlags(args []string) {
u.Flag = flag.NewFlagSet("unifi-poller", flag.ExitOnError)
@ -71,14 +87,19 @@ func (u *UnifiPoller) Run() (err error) {
u.LogDebugf("Debug Logging Enabled")
}
log.Printf("[INFO] UniFi Poller v%v Starting Up! PID: %d", Version, os.Getpid())
if err = u.GetUnifi(); err != nil {
return err
}
if err = u.GetInfluxDB(); err != nil {
return err
}
return u.PollController()
switch strings.ToLower(u.Mode) {
case "influxlambda", "lambdainflux", "lambda_influx", "influx_lambda":
u.LogDebugf("Lambda Mode Enabled")
return u.CollectAndReport()
default:
return u.PollController()
}
}
// GetInfluxDB returns an InfluxDB interface.
@ -106,10 +127,10 @@ func (u *UnifiPoller) GetUnifi() (err error) {
u.Unifi.DebugLog = u.LogDebugf // Log debug messages.
v, err := u.GetServer()
if err != nil {
v.ServerVersion = "unknown"
return err
}
u.Logf("Authenticated to UniFi Controller at %s version %s as user %s", u.UnifiBase, v.ServerVersion, u.UnifiUser)
if err = u.CheckSites(); err != nil {
if err := u.CheckSites(); err != nil {
return err
}
u.Logf("Polling UniFi Controller Sites: %v", u.Sites)