diff --git a/integrations/lokiunifi/client.go b/integrations/lokiunifi/client.go index 4ce6f30d..4672faa7 100644 --- a/integrations/lokiunifi/client.go +++ b/integrations/lokiunifi/client.go @@ -40,7 +40,7 @@ func (l *Loki) httpClient() *Client { } // Post marshals and posts a batch of log messages. -func (c *Client) Post(logs LogStreams) error { +func (c *Client) Post(logs interface{}) error { msg, err := json.Marshal(logs) if err != nil { return err diff --git a/integrations/lokiunifi/loki.go b/integrations/lokiunifi/loki.go index 13e70d23..2b8aea2e 100644 --- a/integrations/lokiunifi/loki.go +++ b/integrations/lokiunifi/loki.go @@ -94,25 +94,34 @@ func (l *Loki) PollController() { ticker := time.NewTicker(interval) for start := range ticker.C { - if err := l.pollController(start); err != nil { + events, err := l.Events(&poller.Filter{Name: InputName}) + if err != nil { + l.LogErrorf("event fetch for Loki failed: %v", err) + continue + } + + err = l.ProcessEvents(l.NewReport(), events, start) + if err != nil { l.LogErrorf("%v", err) } } } -// pollController offloads the loop from PollController. -func (l *Loki) pollController(start time.Time) error { - events, err := l.Events(&poller.Filter{Name: InputName}) - if err != nil { - return errors.Wrap(err, "event fetch for Loki failed") +// ProcessEvents offloads some of the loop from PollController. +func (l *Loki) ProcessEvents(report *Report, events *poller.Events, start time.Time) error { + // Sometimes it gets stuck on old messages. This gets it past that. + if time.Since(l.last) > 4*l.Interval.Duration { + l.last = time.Now().Add(-4 * l.Interval.Duration) } - report := &Report{ - Start: start, - Logger: l.Collect, - Client: l.client, - Last: &l.last, + report.ProcessEventLogs(events) + + if err := l.client.Post(report.Logs); err != nil { + return errors.Wrap(err, "sending to Loki failed") } - return report.Execute(events, 4*l.Interval.Duration) // nolint: gomnd + l.last = start + report.LogOutput(l.last) + + return nil } diff --git a/integrations/lokiunifi/report.go b/integrations/lokiunifi/report.go index c7db5514..af21aa81 100644 --- a/integrations/lokiunifi/report.go +++ b/integrations/lokiunifi/report.go @@ -4,7 +4,6 @@ import ( "strings" "time" - "github.com/pkg/errors" "github.com/unifi-poller/poller" "github.com/unifi-poller/unifi" ) @@ -16,41 +15,31 @@ type LogStream struct { Entries [][]string `json:"values"` // "the log lines" } -// LogStreams is the main logs-holding structure. -type LogStreams struct { +// Logs is the main logs-holding structure. +type Logs struct { Streams []LogStream `json:"streams"` // "multiple files" } // Report is the temporary data generated and sent to Loki at every interval. type Report struct { + Logs Counts map[string]int - Start time.Time - Last *time.Time - Client *Client - LogStreams + Oldest time.Time poller.Logger } -// Execute processes events, reports events to Loki, updates last check time, and prints a log message. -func (r *Report) Execute(events *poller.Events, skipDur time.Duration) error { - // Sometimes it gets stuck on old messages. This gets it past that. - if time.Since(*r.Last) > skipDur { - *r.Last = time.Now().Add(-skipDur) +// NewReport makes a new report. +func (l *Loki) NewReport() *Report { + return &Report{ + Logger: l.Collect, + Oldest: l.last, } +} - r.ProcessEventLogs(events) // Compile report. - - // Send report to Loki. - if err := r.Client.Post(r.LogStreams); err != nil { - return errors.Wrap(err, "sending to Loki failed") - } - - *r.Last = r.Start +func (r *Report) LogOutput(start time.Time) { r.Logf("Events sent to Loki. Event: %d, IDS: %d, Alarm: %d, Anomaly: %d, Dur: %v", r.Counts[typeEvent], r.Counts[typeIDS], r.Counts[typeAlarm], r.Counts[typeAnomaly], - time.Since(r.Start).Round(time.Millisecond)) - - return nil + time.Since(start).Round(time.Millisecond)) } // ProcessEventLogs loops the event Logs, matches the interface diff --git a/integrations/lokiunifi/report_alarm.go b/integrations/lokiunifi/report_alarm.go index e69e04e7..c7c98931 100644 --- a/integrations/lokiunifi/report_alarm.go +++ b/integrations/lokiunifi/report_alarm.go @@ -10,7 +10,7 @@ const typeAlarm = "alarm" // Alarm stores a structured Alarm for batch sending to Loki. func (r *Report) Alarm(event *unifi.Alarm) { - if event.Datetime.Before(*r.Last) { + if event.Datetime.Before(r.Oldest) { return } diff --git a/integrations/lokiunifi/report_anomaly.go b/integrations/lokiunifi/report_anomaly.go index 3286db9d..6c8c9269 100644 --- a/integrations/lokiunifi/report_anomaly.go +++ b/integrations/lokiunifi/report_anomaly.go @@ -10,7 +10,7 @@ const typeAnomaly = "anomaly" // Anomaly stores a structured Anomaly for batch sending to Loki. func (r *Report) Anomaly(event *unifi.Anomaly) { - if event.Datetime.Before(*r.Last) { + if event.Datetime.Before(r.Oldest) { return } diff --git a/integrations/lokiunifi/report_event.go b/integrations/lokiunifi/report_event.go index b65f7dbf..e3a2db53 100644 --- a/integrations/lokiunifi/report_event.go +++ b/integrations/lokiunifi/report_event.go @@ -10,7 +10,7 @@ const typeEvent = "event" // Event stores a structured UniFi Event for batch sending to Loki. func (r *Report) Event(event *unifi.Event) { - if event.Datetime.Before(*r.Last) { + if event.Datetime.Before(r.Oldest) { return } diff --git a/integrations/lokiunifi/report_ids.go b/integrations/lokiunifi/report_ids.go index ee9c9b48..0ea92b37 100644 --- a/integrations/lokiunifi/report_ids.go +++ b/integrations/lokiunifi/report_ids.go @@ -10,7 +10,7 @@ const typeIDS = "ids" // event stores a structured event Event for batch sending to Loki. func (r *Report) IDS(event *unifi.IDS) { - if event.Datetime.Before(*r.Last) { + if event.Datetime.Before(r.Oldest) { return }