143 lines
		
	
	
		
			3.0 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			143 lines
		
	
	
		
			3.0 KiB
		
	
	
	
		
			Go
		
	
	
	
| // Package influxunifi provides the methods to turn UniFi measurements into influx
 | |
| // data-points with appropriate tags and fields.
 | |
| package influxunifi
 | |
| 
 | |
| import (
 | |
| 	"crypto/tls"
 | |
| 	"fmt"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/davidnewhall/unifi-poller/pkg/metrics"
 | |
| 	influx "github.com/influxdata/influxdb1-client/v2"
 | |
| )
 | |
| 
 | |
// Config holds the settings needed to store metrics in InfluxDB.
// New reads the connection fields; ReportMetrics reads Database.
type Config struct {
	Database string // Database named in the batch-points config for every write.
	URL      string // Passed to the InfluxDB HTTP client as its address.
	User     string // Passed to the InfluxDB HTTP client as its username.
	Pass     string // Passed to the InfluxDB HTTP client as its password.
	BadSSL   bool   // When true, TLS certificate verification is skipped.
}
 | |
| 
 | |
| // InfluxUnifi is returned by New() after you provide a Config.
 | |
| type InfluxUnifi struct {
 | |
| 	cf     *Config
 | |
| 	influx influx.Client
 | |
| }
 | |
| 
 | |
// metric carries the ingredients of one influx data point. The collect
// routine passes Table, Tags and Fields, in that order, to influx.NewPoint.
type metric struct {
	Table  string                 // Name given to the point.
	Tags   map[string]string      // Tags attached to the point.
	Fields map[string]interface{} // Field values stored in the point.
}
 | |
| 
 | |
| // New returns an InfluxDB interface.
 | |
| func New(c *Config) (*InfluxUnifi, error) {
 | |
| 	i, err := influx.NewHTTPClient(influx.HTTPConfig{
 | |
| 		Addr:      c.URL,
 | |
| 		Username:  c.User,
 | |
| 		Password:  c.Pass,
 | |
| 		TLSConfig: &tls.Config{InsecureSkipVerify: c.BadSSL},
 | |
| 	})
 | |
| 	return &InfluxUnifi{cf: c, influx: i}, err
 | |
| }
 | |
| 
 | |
| // ReportMetrics batches all device and client data into influxdb data points.
 | |
| // Call this after you've collected all the data you care about.
 | |
| // Returns an error if influxdb calls fail, otherwise returns a report.
 | |
| func (u *InfluxUnifi) ReportMetrics(m *metrics.Metrics) (*Report, error) {
 | |
| 	r := &Report{Metrics: m, ch: make(chan *metric), Start: time.Now()}
 | |
| 	defer close(r.ch)
 | |
| 	// Make a new Influx Points Batcher.
 | |
| 	var err error
 | |
| 	r.bp, err = influx.NewBatchPoints(influx.BatchPointsConfig{Database: u.cf.Database})
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("influx.NewBatchPoints: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	go u.collect(r, r.ch)
 | |
| 	// Batch all the points.
 | |
| 	u.loopPoints(r)
 | |
| 	r.wg.Wait() // wait for all points to finish batching!
 | |
| 
 | |
| 	// Send all the points.
 | |
| 	if err = u.influx.Write(r.bp); err != nil {
 | |
| 		return nil, fmt.Errorf("influxdb.Write(points): %v", err)
 | |
| 	}
 | |
| 	r.Elapsed = time.Since(r.Start)
 | |
| 	return r, nil
 | |
| }
 | |
| 
 | |
| // collect runs in a go routine and batches all the points.
 | |
| func (u *InfluxUnifi) collect(r report, ch chan *metric) {
 | |
| 	for m := range ch {
 | |
| 		pt, err := influx.NewPoint(m.Table, m.Tags, m.Fields, r.metrics().TS)
 | |
| 		if err != nil {
 | |
| 			r.error(err)
 | |
| 		} else {
 | |
| 			r.batch(m, pt)
 | |
| 		}
 | |
| 		r.done()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // loopPoints kicks off 3 or 7 go routines to process metrics and send them
 | |
| // to the collect routine through the metric channel.
 | |
| func (u *InfluxUnifi) loopPoints(r report) {
 | |
| 	m := r.metrics()
 | |
| 	r.add()
 | |
| 	go func() {
 | |
| 		defer r.done()
 | |
| 		for _, s := range m.Sites {
 | |
| 			u.batchSite(r, s)
 | |
| 		}
 | |
| 	}()
 | |
| 	r.add()
 | |
| 	go func() {
 | |
| 		defer r.done()
 | |
| 		for _, s := range m.Clients {
 | |
| 			u.batchClient(r, s)
 | |
| 		}
 | |
| 	}()
 | |
| 	r.add()
 | |
| 	go func() {
 | |
| 		defer r.done()
 | |
| 		for _, s := range m.IDSList {
 | |
| 			u.batchIDS(r, s)
 | |
| 		}
 | |
| 	}()
 | |
| 	if m.Devices == nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	r.add()
 | |
| 	go func() {
 | |
| 		defer r.done()
 | |
| 		for _, s := range m.UAPs {
 | |
| 			u.batchUAP(r, s)
 | |
| 		}
 | |
| 	}()
 | |
| 	r.add()
 | |
| 	go func() {
 | |
| 		defer r.done()
 | |
| 		for _, s := range m.USGs {
 | |
| 			u.batchUSG(r, s)
 | |
| 		}
 | |
| 	}()
 | |
| 	r.add()
 | |
| 	go func() {
 | |
| 		defer r.done()
 | |
| 		for _, s := range m.USWs {
 | |
| 			u.batchUSW(r, s)
 | |
| 		}
 | |
| 	}()
 | |
| 	r.add()
 | |
| 	go func() {
 | |
| 		defer r.done()
 | |
| 		for _, s := range m.UDMs {
 | |
| 			u.batchUDM(r, s)
 | |
| 		}
 | |
| 	}()
 | |
| }
 |