move some more bits around

This commit is contained in:
davidnewhall2 2020-06-25 21:03:46 -07:00
parent 6ab0da0cf6
commit 7ac82f6e9c
6 changed files with 60 additions and 39 deletions

View File

@ -1,6 +1,7 @@
package lokiunifi package lokiunifi
import ( import (
"io/ioutil"
"strings" "strings"
"time" "time"
@ -10,7 +11,7 @@ import (
) )
const ( const (
maxInterval = time.Hour maxInterval = 10 * time.Minute
minInterval = 10 * time.Second minInterval = 10 * time.Second
defaultTimeout = 10 * time.Second defaultTimeout = 10 * time.Second
defaultInterval = 2 * time.Minute defaultInterval = 2 * time.Minute
@ -20,7 +21,7 @@ const (
// InputName is the name of plugin that gives us data. // InputName is the name of plugin that gives us data.
InputName = "unifi" InputName = "unifi"
// PluginName is the name of this plugin. // PluginName is the name of this plugin.
PluginName = "loki" PluginName = "Loki"
) )
// Config is the plugin's input data. // Config is the plugin's input data.
@ -81,6 +82,15 @@ func (l *Loki) ValidateConfig() {
l.Interval.Duration = minInterval l.Interval.Duration = minInterval
} }
if strings.HasPrefix(l.Password, "file://") {
pass, err := ioutil.ReadFile(strings.TrimPrefix(l.Password, "file://"))
if err != nil {
l.LogErrorf("Reading Loki Password File: %v", err)
}
l.Password = strings.TrimSpace(string(pass))
}
l.last = time.Now().Add(-l.Interval.Duration) l.last = time.Now().Add(-l.Interval.Duration)
l.client = l.httpClient() l.client = l.httpClient()
l.URL = strings.TrimRight(l.URL, "/") // gets a path appended to it later. l.URL = strings.TrimRight(l.URL, "/") // gets a path appended to it later.
@ -100,7 +110,7 @@ func (l *Loki) PollController() {
continue continue
} }
err = l.ProcessEvents(l.NewReport(), events, start) err = l.ProcessEvents(l.NewReport(start), events)
if err != nil { if err != nil {
l.LogErrorf("%v", err) l.LogErrorf("%v", err)
} }
@ -108,20 +118,19 @@ func (l *Loki) PollController() {
} }
// ProcessEvents offloads some of the loop from PollController. // ProcessEvents offloads some of the loop from PollController.
func (l *Loki) ProcessEvents(report *Report, events *poller.Events, start time.Time) error { func (l *Loki) ProcessEvents(report *Report, events *poller.Events) error {
// Sometimes it gets stuck on old messages. This gets it past that. // Sometimes it gets stuck on old messages. This gets it past that.
if time.Since(l.last) > 4*l.Interval.Duration { if time.Since(l.last) > 4*l.Interval.Duration {
l.last = time.Now().Add(-4 * l.Interval.Duration) l.last = time.Now().Add(-4 * l.Interval.Duration)
} }
report.ProcessEventLogs(events) logs := report.ProcessEventLogs(events)
if err := l.client.Post(logs); err != nil {
if err := l.client.Post(report.Logs); err != nil {
return errors.Wrap(err, "sending to Loki failed") return errors.Wrap(err, "sending to Loki failed")
} }
l.last = start l.last = report.Start
report.LogOutput(l.last) l.Logf("Events sent to Loki. %v", report)
return nil return nil
} }

View File

@ -1,6 +1,7 @@
package lokiunifi package lokiunifi
import ( import (
"fmt"
"strings" "strings"
"time" "time"
@ -15,51 +16,58 @@ type LogStream struct {
Entries [][]string `json:"values"` // "the log lines" Entries [][]string `json:"values"` // "the log lines"
} }
// Logs is the main logs-holding structure. // Logs is the main logs-holding structure. This is the Loki-output format.
type Logs struct { type Logs struct {
Streams []LogStream `json:"streams"` // "multiple files" Streams []LogStream `json:"streams"` // "multiple files"
} }
// Report is the temporary data generated and sent to Loki at every interval. // Report is the temporary data generated by processing events.
type Report struct { type Report struct {
Logs
Counts map[string]int Counts map[string]int
Oldest time.Time Oldest time.Time
Start time.Time
poller.Logger poller.Logger
} }
// NewReport makes a new report. // NewReport makes a new report.
func (l *Loki) NewReport() *Report { func (l *Loki) NewReport(start time.Time) *Report {
return &Report{ return &Report{
Logger: l.Collect, Counts: make(map[string]int),
Oldest: l.last, Oldest: l.last,
Start: start,
Logger: l.Collect,
} }
} }
func (r *Report) LogOutput(start time.Time) { // ProcessEventLogs loops the event Logs, matches the interface type, calls the
r.Logf("Events sent to Loki. Event: %d, IDS: %d, Alarm: %d, Anomaly: %d, Dur: %v", // appropriate method for the data, and compiles the Logs into a Loki format.
r.Counts[typeEvent], r.Counts[typeIDS], r.Counts[typeAlarm], r.Counts[typeAnomaly],
time.Since(start).Round(time.Millisecond))
}
// ProcessEventLogs loops the event Logs, matches the interface
// type, calls the appropriate method for the data, and compiles the report.
// This runs once per interval, if there was no collection error. // This runs once per interval, if there was no collection error.
func (r *Report) ProcessEventLogs(events *poller.Events) { func (r *Report) ProcessEventLogs(events *poller.Events) *Logs {
logs := &Logs{}
for _, e := range events.Logs { for _, e := range events.Logs {
switch event := e.(type) { switch event := e.(type) {
case *unifi.IDS: case *unifi.IDS:
r.IDS(event) r.IDS(event, logs)
case *unifi.Event: case *unifi.Event:
r.Event(event) r.Event(event, logs)
case *unifi.Alarm: case *unifi.Alarm:
r.Alarm(event) r.Alarm(event, logs)
case *unifi.Anomaly: case *unifi.Anomaly:
r.Anomaly(event) r.Anomaly(event, logs)
default: // unlikely. default: // unlikely.
r.LogErrorf("unknown event type: %T", e) r.LogErrorf("unknown event type: %T", e)
} }
} }
return logs
}
func (r *Report) String() string {
return fmt.Sprintf("%s: %d, %s: %d, %s: %d, %s: %d, Dur: %v",
typeEvent, r.Counts[typeEvent], typeIDS, r.Counts[typeIDS],
typeAlarm, r.Counts[typeAlarm], typeAnomaly, r.Counts[typeAnomaly],
time.Since(r.Start).Round(time.Millisecond))
} }
// CleanLabels removes any tag that is empty. // CleanLabels removes any tag that is empty.

View File

@ -6,16 +6,17 @@ import (
"github.com/unifi-poller/unifi" "github.com/unifi-poller/unifi"
) )
const typeAlarm = "alarm" const typeAlarm = "Alarm"
// Alarm stores a structured Alarm for batch sending to Loki. // Alarm stores a structured Alarm for batch sending to Loki.
func (r *Report) Alarm(event *unifi.Alarm) { func (r *Report) Alarm(event *unifi.Alarm, logs *Logs) {
if event.Datetime.Before(r.Oldest) { if event.Datetime.Before(r.Oldest) {
return return
} }
r.Counts[typeAlarm]++ // increase counter and append new log line. r.Counts[typeAlarm]++ // increase counter and append new log line.
r.Streams = append(r.Streams, LogStream{
logs.Streams = append(logs.Streams, LogStream{
Entries: [][]string{{strconv.FormatInt(event.Datetime.UnixNano(), 10), event.Msg}}, Entries: [][]string{{strconv.FormatInt(event.Datetime.UnixNano(), 10), event.Msg}},
Labels: CleanLabels(map[string]string{ Labels: CleanLabels(map[string]string{
"application": "unifi_alarm", "application": "unifi_alarm",

View File

@ -6,16 +6,17 @@ import (
"github.com/unifi-poller/unifi" "github.com/unifi-poller/unifi"
) )
const typeAnomaly = "anomaly" const typeAnomaly = "Anomaly"
// Anomaly stores a structured Anomaly for batch sending to Loki. // Anomaly stores a structured Anomaly for batch sending to Loki.
func (r *Report) Anomaly(event *unifi.Anomaly) { func (r *Report) Anomaly(event *unifi.Anomaly, logs *Logs) {
if event.Datetime.Before(r.Oldest) { if event.Datetime.Before(r.Oldest) {
return return
} }
r.Counts[typeAnomaly]++ // increase counter and append new log line. r.Counts[typeAnomaly]++ // increase counter and append new log line.
r.Streams = append(r.Streams, LogStream{
logs.Streams = append(logs.Streams, LogStream{
Entries: [][]string{{strconv.FormatInt(event.Datetime.UnixNano(), 10), event.Anomaly}}, Entries: [][]string{{strconv.FormatInt(event.Datetime.UnixNano(), 10), event.Anomaly}},
Labels: CleanLabels(map[string]string{ Labels: CleanLabels(map[string]string{
"application": "unifi_anomaly", "application": "unifi_anomaly",

View File

@ -6,16 +6,17 @@ import (
"github.com/unifi-poller/unifi" "github.com/unifi-poller/unifi"
) )
const typeEvent = "event" const typeEvent = "Event"
// Event stores a structured UniFi Event for batch sending to Loki. // Event stores a structured UniFi Event for batch sending to Loki.
func (r *Report) Event(event *unifi.Event) { func (r *Report) Event(event *unifi.Event, logs *Logs) {
if event.Datetime.Before(r.Oldest) { if event.Datetime.Before(r.Oldest) {
return return
} }
r.Counts[typeEvent]++ // increase counter and append new log line. r.Counts[typeEvent]++ // increase counter and append new log line.
r.Streams = append(r.Streams, LogStream{
logs.Streams = append(logs.Streams, LogStream{
Entries: [][]string{{strconv.FormatInt(event.Datetime.UnixNano(), 10), event.Msg}}, Entries: [][]string{{strconv.FormatInt(event.Datetime.UnixNano(), 10), event.Msg}},
Labels: CleanLabels(map[string]string{ Labels: CleanLabels(map[string]string{
"application": "unifi_event", "application": "unifi_event",

View File

@ -6,16 +6,17 @@ import (
"github.com/unifi-poller/unifi" "github.com/unifi-poller/unifi"
) )
const typeIDS = "ids" const typeIDS = "IDS"
// event stores a structured event Event for batch sending to Loki. // event stores a structured event Event for batch sending to Loki.
func (r *Report) IDS(event *unifi.IDS) { func (r *Report) IDS(event *unifi.IDS, logs *Logs) {
if event.Datetime.Before(r.Oldest) { if event.Datetime.Before(r.Oldest) {
return return
} }
r.Counts[typeIDS]++ // increase counter and append new log line. r.Counts[typeIDS]++ // increase counter and append new log line.
r.Streams = append(r.Streams, LogStream{
logs.Streams = append(logs.Streams, LogStream{
Entries: [][]string{{strconv.FormatInt(event.Datetime.UnixNano(), 10), event.Msg}}, Entries: [][]string{{strconv.FormatInt(event.Datetime.UnixNano(), 10), event.Msg}},
Labels: CleanLabels(map[string]string{ Labels: CleanLabels(map[string]string{
"application": "unifi_ids", "application": "unifi_ids",