Merge pull request #469 from unpoller/parallize-inputs

Faster initialization & Collection for large number of sites
This commit is contained in:
Cody Lee 2022-12-18 09:18:33 -06:00 committed by GitHub
commit 9a13fdce4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 125 additions and 33 deletions

2
go.mod
View File

@ -6,6 +6,7 @@ require (
github.com/DataDog/datadog-go v4.8.3+incompatible
github.com/gorilla/mux v1.8.0
github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/common v0.38.0
github.com/spf13/pflag v1.0.6-0.20201009195203-85dd5c8bc61c
@ -21,7 +22,6 @@ require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/deepmap/oapi-codegen v1.8.2 // indirect
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect
github.com/pkg/errors v0.9.1 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.3.0 // indirect
golang.org/x/tools v0.1.12 // indirect

View File

@ -1,9 +1,12 @@
package poller
import (
"fmt"
"strings"
"sync"
"time"
"github.com/pkg/errors"
)
var (
@ -61,17 +64,53 @@ func (u *UnifiPoller) InitializeInputs() error {
inputSync.RLock()
defer inputSync.RUnlock()
errChan := make(chan error, len(inputs))
wg := &sync.WaitGroup{}
// parallelize startup
u.LogDebugf("initializing %d inputs", len(inputs))
for _, input := range inputs {
wg.Add(1)
go func(input *InputPlugin) {
defer wg.Done()
// This must return, or the app locks up here.
u.LogDebugf("inititalizing input... %s", input.Name)
if err := input.Initialize(u); err != nil {
u.LogDebugf("error initializing input ... %s", input.Name)
return err
errChan <- err
return
}
u.LogDebugf("input successfully initialized ... %s", input.Name)
errChan <- nil
}(input)
}
wg.Wait()
close(errChan)
u.LogDebugf("collecting input errors...")
// collect errors if any.
errs := make([]error, 0)
for err := range errChan {
if err != nil {
errs = append(errs, err)
}
}
return nil
var err error
if len(errs) > 0 {
err = fmt.Errorf("error initializing inputs")
for _, e := range errs {
err = errors.Wrap(err, e.Error())
}
}
u.LogDebugf("returning error: %w", err)
return err
}
type eventInputResult struct {
logs []any
err error
}
// Events aggregates log messages (events) from one or more sources.
@ -79,25 +118,56 @@ func (u *UnifiPoller) Events(filter *Filter) (*Events, error) {
inputSync.RLock()
defer inputSync.RUnlock()
events := Events{}
resultChan := make(chan eventInputResult, len(inputs))
wg := &sync.WaitGroup{}
for _, input := range inputs {
wg.Add(1)
go func(input *InputPlugin) {
defer wg.Done()
if filter != nil &&
filter.Name != "" &&
!strings.EqualFold(input.Name, filter.Name) {
continue
resultChan <- eventInputResult{}
return
}
e, err := input.Events(filter)
if err != nil {
resultChan <- eventInputResult{err: err}
return
}
resultChan <- eventInputResult{logs: e.Logs}
}(input)
}
wg.Wait()
close(resultChan)
events := Events{}
errs := make([]error, 0)
for result := range resultChan {
if result.err != nil {
errs = append(errs, result.err)
} else if result.logs != nil {
// Logs is the only member to extend at this time.
events.Logs = append(events.Logs, result.logs...)
}
}
var err error
if len(errs) > 0 {
err = fmt.Errorf("error initializing inputs")
for _, e := range errs {
err = errors.Wrap(err, e.Error())
}
}
return &events, err
}
// Logs is the only member to extend at this time.
events.Logs = append(events.Logs, e.Logs...)
}
return &events, nil
type metricInputResult struct {
metric *Metrics
err error
}
// Metrics aggregates all the measurements from filtered inputs and returns them.
@ -106,26 +176,48 @@ func (u *UnifiPoller) Metrics(filter *Filter) (*Metrics, error) {
inputSync.RLock()
defer inputSync.RUnlock()
metrics := &Metrics{}
resultChan := make(chan metricInputResult, len(inputs))
wg := &sync.WaitGroup{}
for _, input := range inputs {
wg.Add(1)
go func(input *InputPlugin) {
defer wg.Done()
if filter != nil &&
filter.Name != "" &&
!strings.EqualFold(input.Name, filter.Name) {
continue
resultChan <- metricInputResult{}
return
}
m, err := input.Metrics(filter)
if err != nil {
resultChan <- metricInputResult{metric: m, err: err}
}(input)
}
wg.Wait()
close(resultChan)
errs := make([]error, 0)
metrics := &Metrics{}
for result := range resultChan {
if result.err != nil {
errs = append(errs, result.err)
} else if result.metric != nil {
metrics = AppendMetrics(metrics, result.metric)
}
}
var err error
if len(errs) > 0 {
err = fmt.Errorf("error initializing inputs")
for _, e := range errs {
err = errors.Wrap(err, e.Error())
}
}
return metrics, err
}
metrics = AppendMetrics(metrics, m)
}
return metrics, nil
}
// AppendMetrics combines the metrics from two sources.
func AppendMetrics(existing *Metrics, m *Metrics) *Metrics {
if existing == nil {