rename and move a few things
This commit is contained in:
parent
ee90410d8f
commit
6ab0da0cf6
|
|
@ -40,7 +40,7 @@ func (l *Loki) httpClient() *Client {
|
|||
}
|
||||
|
||||
// Post marshals and posts a batch of log messages.
|
||||
func (c *Client) Post(logs LogStreams) error {
|
||||
func (c *Client) Post(logs interface{}) error {
|
||||
msg, err := json.Marshal(logs)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
|||
|
|
@ -94,25 +94,34 @@ func (l *Loki) PollController() {
|
|||
|
||||
ticker := time.NewTicker(interval)
|
||||
for start := range ticker.C {
|
||||
if err := l.pollController(start); err != nil {
|
||||
events, err := l.Events(&poller.Filter{Name: InputName})
|
||||
if err != nil {
|
||||
l.LogErrorf("event fetch for Loki failed: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
err = l.ProcessEvents(l.NewReport(), events, start)
|
||||
if err != nil {
|
||||
l.LogErrorf("%v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// pollController offloads the loop from PollController.
|
||||
func (l *Loki) pollController(start time.Time) error {
|
||||
events, err := l.Events(&poller.Filter{Name: InputName})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "event fetch for Loki failed")
|
||||
// ProcessEvents offloads some of the loop from PollController.
|
||||
func (l *Loki) ProcessEvents(report *Report, events *poller.Events, start time.Time) error {
|
||||
// Sometimes it gets stuck on old messages. This gets it past that.
|
||||
if time.Since(l.last) > 4*l.Interval.Duration {
|
||||
l.last = time.Now().Add(-4 * l.Interval.Duration)
|
||||
}
|
||||
|
||||
report := &Report{
|
||||
Start: start,
|
||||
Logger: l.Collect,
|
||||
Client: l.client,
|
||||
Last: &l.last,
|
||||
report.ProcessEventLogs(events)
|
||||
|
||||
if err := l.client.Post(report.Logs); err != nil {
|
||||
return errors.Wrap(err, "sending to Loki failed")
|
||||
}
|
||||
|
||||
return report.Execute(events, 4*l.Interval.Duration) // nolint: gomnd
|
||||
l.last = start
|
||||
report.LogOutput(l.last)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/unifi-poller/poller"
|
||||
"github.com/unifi-poller/unifi"
|
||||
)
|
||||
|
|
@ -16,41 +15,31 @@ type LogStream struct {
|
|||
Entries [][]string `json:"values"` // "the log lines"
|
||||
}
|
||||
|
||||
// LogStreams is the main logs-holding structure.
|
||||
type LogStreams struct {
|
||||
// Logs is the main logs-holding structure.
|
||||
type Logs struct {
|
||||
Streams []LogStream `json:"streams"` // "multiple files"
|
||||
}
|
||||
|
||||
// Report is the temporary data generated and sent to Loki at every interval.
|
||||
type Report struct {
|
||||
Logs
|
||||
Counts map[string]int
|
||||
Start time.Time
|
||||
Last *time.Time
|
||||
Client *Client
|
||||
LogStreams
|
||||
Oldest time.Time
|
||||
poller.Logger
|
||||
}
|
||||
|
||||
// Execute processes events, reports events to Loki, updates last check time, and prints a log message.
|
||||
func (r *Report) Execute(events *poller.Events, skipDur time.Duration) error {
|
||||
// Sometimes it gets stuck on old messages. This gets it past that.
|
||||
if time.Since(*r.Last) > skipDur {
|
||||
*r.Last = time.Now().Add(-skipDur)
|
||||
// NewReport makes a new report.
|
||||
func (l *Loki) NewReport() *Report {
|
||||
return &Report{
|
||||
Logger: l.Collect,
|
||||
Oldest: l.last,
|
||||
}
|
||||
}
|
||||
|
||||
r.ProcessEventLogs(events) // Compile report.
|
||||
|
||||
// Send report to Loki.
|
||||
if err := r.Client.Post(r.LogStreams); err != nil {
|
||||
return errors.Wrap(err, "sending to Loki failed")
|
||||
}
|
||||
|
||||
*r.Last = r.Start
|
||||
func (r *Report) LogOutput(start time.Time) {
|
||||
r.Logf("Events sent to Loki. Event: %d, IDS: %d, Alarm: %d, Anomaly: %d, Dur: %v",
|
||||
r.Counts[typeEvent], r.Counts[typeIDS], r.Counts[typeAlarm], r.Counts[typeAnomaly],
|
||||
time.Since(r.Start).Round(time.Millisecond))
|
||||
|
||||
return nil
|
||||
time.Since(start).Round(time.Millisecond))
|
||||
}
|
||||
|
||||
// ProcessEventLogs loops the event Logs, matches the interface
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ const typeAlarm = "alarm"
|
|||
|
||||
// Alarm stores a structured Alarm for batch sending to Loki.
|
||||
func (r *Report) Alarm(event *unifi.Alarm) {
|
||||
if event.Datetime.Before(*r.Last) {
|
||||
if event.Datetime.Before(r.Oldest) {
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ const typeAnomaly = "anomaly"
|
|||
|
||||
// Anomaly stores a structured Anomaly for batch sending to Loki.
|
||||
func (r *Report) Anomaly(event *unifi.Anomaly) {
|
||||
if event.Datetime.Before(*r.Last) {
|
||||
if event.Datetime.Before(r.Oldest) {
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ const typeEvent = "event"
|
|||
|
||||
// Event stores a structured UniFi Event for batch sending to Loki.
|
||||
func (r *Report) Event(event *unifi.Event) {
|
||||
if event.Datetime.Before(*r.Last) {
|
||||
if event.Datetime.Before(r.Oldest) {
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ const typeIDS = "ids"
|
|||
|
||||
// event stores a structured event Event for batch sending to Loki.
|
||||
func (r *Report) IDS(event *unifi.IDS) {
|
||||
if event.Datetime.Before(*r.Last) {
|
||||
if event.Datetime.Before(r.Oldest) {
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue