diff --git a/integrations/lokiunifi/loki.go b/integrations/lokiunifi/loki.go index 2b8aea2e..b6e2fb92 100644 --- a/integrations/lokiunifi/loki.go +++ b/integrations/lokiunifi/loki.go @@ -1,6 +1,7 @@ package lokiunifi import ( + "io/ioutil" "strings" "time" @@ -10,7 +11,7 @@ import ( ) const ( - maxInterval = time.Hour + maxInterval = 10 * time.Minute minInterval = 10 * time.Second defaultTimeout = 10 * time.Second defaultInterval = 2 * time.Minute @@ -20,7 +21,7 @@ const ( // InputName is the name of plugin that gives us data. InputName = "unifi" // PluginName is the name of this plugin. - PluginName = "loki" + PluginName = "Loki" ) // Config is the plugin's input data. @@ -81,6 +82,15 @@ func (l *Loki) ValidateConfig() { l.Interval.Duration = minInterval } + if strings.HasPrefix(l.Password, "file://") { + pass, err := ioutil.ReadFile(strings.TrimPrefix(l.Password, "file://")) + if err != nil { + l.LogErrorf("Reading Loki Password File: %v", err) + } + + l.Password = strings.TrimSpace(string(pass)) + } + l.last = time.Now().Add(-l.Interval.Duration) l.client = l.httpClient() l.URL = strings.TrimRight(l.URL, "/") // gets a path appended to it later. @@ -100,7 +110,7 @@ func (l *Loki) PollController() { continue } - err = l.ProcessEvents(l.NewReport(), events, start) + err = l.ProcessEvents(l.NewReport(start), events) if err != nil { l.LogErrorf("%v", err) } @@ -108,20 +118,19 @@ func (l *Loki) PollController() { } // ProcessEvents offloads some of the loop from PollController. -func (l *Loki) ProcessEvents(report *Report, events *poller.Events, start time.Time) error { +func (l *Loki) ProcessEvents(report *Report, events *poller.Events) error { // Sometimes it gets stuck on old messages. This gets it past that. if time.Since(l.last) > 4*l.Interval.Duration { l.last = time.Now().Add(-4 * l.Interval.Duration) } - report.ProcessEventLogs(events) - - if err := l.client.Post(report.Logs); err != nil { + logs := report.ProcessEventLogs(events) + if err := l.client.Post(logs); err != nil { return errors.Wrap(err, "sending to Loki failed") } - l.last = start - report.LogOutput(l.last) + l.last = report.Start + l.Logf("Events sent to Loki. %v", report) return nil } diff --git a/integrations/lokiunifi/report.go b/integrations/lokiunifi/report.go index af21aa81..a0b622e0 100644 --- a/integrations/lokiunifi/report.go +++ b/integrations/lokiunifi/report.go @@ -1,6 +1,7 @@ package lokiunifi import ( + "fmt" "strings" "time" @@ -15,51 +16,58 @@ type LogStream struct { Entries [][]string `json:"values"` // "the log lines" } -// Logs is the main logs-holding structure. +// Logs is the main logs-holding structure. This is the Loki-output format. type Logs struct { Streams []LogStream `json:"streams"` // "multiple files" } -// Report is the temporary data generated and sent to Loki at every interval. +// Report is the temporary data generated by processing events. type Report struct { - Logs Counts map[string]int Oldest time.Time + Start time.Time poller.Logger } // NewReport makes a new report. -func (l *Loki) NewReport() *Report { +func (l *Loki) NewReport(start time.Time) *Report { return &Report{ - Logger: l.Collect, + Counts: make(map[string]int), Oldest: l.last, + Start: start, + Logger: l.Collect, } } -func (r *Report) LogOutput(start time.Time) { - r.Logf("Events sent to Loki. Event: %d, IDS: %d, Alarm: %d, Anomaly: %d, Dur: %v", - r.Counts[typeEvent], r.Counts[typeIDS], r.Counts[typeAlarm], r.Counts[typeAnomaly], - time.Since(start).Round(time.Millisecond)) -} - -// ProcessEventLogs loops the event Logs, matches the interface -// type, calls the appropriate method for the data, and compiles the report. +// ProcessEventLogs loops the event Logs, matches the interface type, calls the +// appropriate method for the data, and compiles the Logs into a Loki format. // This runs once per interval, if there was no collection error. -func (r *Report) ProcessEventLogs(events *poller.Events) { +func (r *Report) ProcessEventLogs(events *poller.Events) *Logs { + logs := &Logs{} + for _, e := range events.Logs { switch event := e.(type) { case *unifi.IDS: - r.IDS(event) + r.IDS(event, logs) case *unifi.Event: - r.Event(event) + r.Event(event, logs) case *unifi.Alarm: - r.Alarm(event) + r.Alarm(event, logs) case *unifi.Anomaly: - r.Anomaly(event) + r.Anomaly(event, logs) default: // unlikely. r.LogErrorf("unknown event type: %T", e) } } + + return logs +} + +func (r *Report) String() string { + return fmt.Sprintf("%s: %d, %s: %d, %s: %d, %s: %d, Dur: %v", + typeEvent, r.Counts[typeEvent], typeIDS, r.Counts[typeIDS], + typeAlarm, r.Counts[typeAlarm], typeAnomaly, r.Counts[typeAnomaly], + time.Since(r.Start).Round(time.Millisecond)) } // CleanLabels removes any tag that is empty. diff --git a/integrations/lokiunifi/report_alarm.go b/integrations/lokiunifi/report_alarm.go index c7c98931..39fef0ae 100644 --- a/integrations/lokiunifi/report_alarm.go +++ b/integrations/lokiunifi/report_alarm.go @@ -6,16 +6,17 @@ import ( "github.com/unifi-poller/unifi" ) -const typeAlarm = "alarm" +const typeAlarm = "Alarm" // Alarm stores a structured Alarm for batch sending to Loki. -func (r *Report) Alarm(event *unifi.Alarm) { +func (r *Report) Alarm(event *unifi.Alarm, logs *Logs) { if event.Datetime.Before(r.Oldest) { return } r.Counts[typeAlarm]++ // increase counter and append new log line. - r.Streams = append(r.Streams, LogStream{ + + logs.Streams = append(logs.Streams, LogStream{ Entries: [][]string{{strconv.FormatInt(event.Datetime.UnixNano(), 10), event.Msg}}, Labels: CleanLabels(map[string]string{ "application": "unifi_alarm", diff --git a/integrations/lokiunifi/report_anomaly.go b/integrations/lokiunifi/report_anomaly.go index 6c8c9269..2c782551 100644 --- a/integrations/lokiunifi/report_anomaly.go +++ b/integrations/lokiunifi/report_anomaly.go @@ -6,16 +6,17 @@ import ( "github.com/unifi-poller/unifi" ) -const typeAnomaly = "anomaly" +const typeAnomaly = "Anomaly" // Anomaly stores a structured Anomaly for batch sending to Loki. -func (r *Report) Anomaly(event *unifi.Anomaly) { +func (r *Report) Anomaly(event *unifi.Anomaly, logs *Logs) { if event.Datetime.Before(r.Oldest) { return } r.Counts[typeAnomaly]++ // increase counter and append new log line. - r.Streams = append(r.Streams, LogStream{ + + logs.Streams = append(logs.Streams, LogStream{ Entries: [][]string{{strconv.FormatInt(event.Datetime.UnixNano(), 10), event.Anomaly}}, Labels: CleanLabels(map[string]string{ "application": "unifi_anomaly", diff --git a/integrations/lokiunifi/report_event.go b/integrations/lokiunifi/report_event.go index e3a2db53..901cb86e 100644 --- a/integrations/lokiunifi/report_event.go +++ b/integrations/lokiunifi/report_event.go @@ -6,16 +6,17 @@ import ( "github.com/unifi-poller/unifi" ) -const typeEvent = "event" +const typeEvent = "Event" // Event stores a structured UniFi Event for batch sending to Loki. -func (r *Report) Event(event *unifi.Event) { +func (r *Report) Event(event *unifi.Event, logs *Logs) { if event.Datetime.Before(r.Oldest) { return } r.Counts[typeEvent]++ // increase counter and append new log line. - r.Streams = append(r.Streams, LogStream{ + + logs.Streams = append(logs.Streams, LogStream{ Entries: [][]string{{strconv.FormatInt(event.Datetime.UnixNano(), 10), event.Msg}}, Labels: CleanLabels(map[string]string{ "application": "unifi_event", diff --git a/integrations/lokiunifi/report_ids.go b/integrations/lokiunifi/report_ids.go index 0ea92b37..ae9a13b1 100644 --- a/integrations/lokiunifi/report_ids.go +++ b/integrations/lokiunifi/report_ids.go @@ -6,16 +6,17 @@ import ( "github.com/unifi-poller/unifi" ) -const typeIDS = "ids" +const typeIDS = "IDS" // event stores a structured event Event for batch sending to Loki. -func (r *Report) IDS(event *unifi.IDS) { +func (r *Report) IDS(event *unifi.IDS, logs *Logs) { if event.Datetime.Before(r.Oldest) { return } r.Counts[typeIDS]++ // increase counter and append new log line. - r.Streams = append(r.Streams, LogStream{ + + logs.Streams = append(logs.Streams, LogStream{ Entries: [][]string{{strconv.FormatInt(event.Datetime.UnixNano(), 10), event.Msg}}, Labels: CleanLabels(map[string]string{ "application": "unifi_ids",