diff --git a/go.mod b/go.mod index abf55453..7db8e45f 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/DataDog/datadog-go v4.8.3+incompatible github.com/gorilla/mux v1.8.0 github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab + github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.14.0 github.com/prometheus/common v0.38.0 github.com/spf13/pflag v1.0.6-0.20201009195203-85dd5c8bc61c @@ -21,7 +22,6 @@ require ( github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/deepmap/oapi-codegen v1.8.2 // indirect github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect - github.com/pkg/errors v0.9.1 // indirect golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect golang.org/x/net v0.3.0 // indirect golang.org/x/tools v0.1.12 // indirect diff --git a/pkg/poller/inputs.go b/pkg/poller/inputs.go index 942e70b5..1bb8bced 100644 --- a/pkg/poller/inputs.go +++ b/pkg/poller/inputs.go @@ -1,9 +1,12 @@ package poller import ( + "fmt" "strings" "sync" "time" + + "github.com/pkg/errors" ) var ( @@ -61,17 +64,53 @@ func (u *UnifiPoller) InitializeInputs() error { inputSync.RLock() defer inputSync.RUnlock() + errChan := make(chan error, len(inputs)) + wg := &sync.WaitGroup{} + + // parallelize startup + u.LogDebugf("initializing %d inputs", len(inputs)) for _, input := range inputs { - // This must return, or the app locks up here. - u.LogDebugf("inititalizing input... %s", input.Name) - if err := input.Initialize(u); err != nil { - u.LogDebugf("error initializing input ... %s", input.Name) - return err + wg.Add(1) + go func(input *InputPlugin) { + defer wg.Done() + // This must return, or the app locks up here. + u.LogDebugf("inititalizing input... %s", input.Name) + if err := input.Initialize(u); err != nil { + u.LogDebugf("error initializing input ... %s", input.Name) + errChan <- err + return + } + u.LogDebugf("input successfully initialized ... %s", input.Name) + errChan <- nil + }(input) + } + wg.Wait() + close(errChan) + u.LogDebugf("collecting input errors...") + + // collect errors if any. + errs := make([]error, 0) + for err := range errChan { + if err != nil { + errs = append(errs, err) } - u.LogDebugf("input successfully initialized ... %s", input.Name) } - return nil + var err error + if len(errs) > 0 { + err = fmt.Errorf("error initializing inputs") + for _, e := range errs { + err = errors.Wrap(err, e.Error()) + } + } + u.LogDebugf("returning error: %w", err) + + return err +} + +type eventInputResult struct { + logs []any + err error } // Events aggregates log messages (events) from one or more sources. @@ -79,25 +118,56 @@ func (u *UnifiPoller) Events(filter *Filter) (*Events, error) { inputSync.RLock() defer inputSync.RUnlock() - events := Events{} + resultChan := make(chan eventInputResult, len(inputs)) + wg := &sync.WaitGroup{} for _, input := range inputs { - if filter != nil && - filter.Name != "" && - !strings.EqualFold(input.Name, filter.Name) { - continue - } + wg.Add(1) + go func(input *InputPlugin) { + defer wg.Done() + if filter != nil && + filter.Name != "" && + !strings.EqualFold(input.Name, filter.Name) { + resultChan <- eventInputResult{} + return + } - e, err := input.Events(filter) - if err != nil { - return &events, err - } + e, err := input.Events(filter) + if err != nil { + resultChan <- eventInputResult{err: err} + return + } + resultChan <- eventInputResult{logs: e.Logs} - // Logs is the only member to extend at this time. - events.Logs = append(events.Logs, e.Logs...) + }(input) + } + wg.Wait() + close(resultChan) + + events := Events{} + errs := make([]error, 0) + for result := range resultChan { + if result.err != nil { + errs = append(errs, result.err) + } else if result.logs != nil { + // Logs is the only member to extend at this time. + events.Logs = append(events.Logs, result.logs...) + } + } + var err error + if len(errs) > 0 { + err = fmt.Errorf("error initializing inputs") + for _, e := range errs { + err = errors.Wrap(err, e.Error()) + } } - return &events, nil + return &events, err +} + +type metricInputResult struct { + metric *Metrics + err error } // Metrics aggregates all the measurements from filtered inputs and returns them. @@ -106,24 +176,46 @@ func (u *UnifiPoller) Metrics(filter *Filter) (*Metrics, error) { inputSync.RLock() defer inputSync.RUnlock() - metrics := &Metrics{} + resultChan := make(chan metricInputResult, len(inputs)) + wg := &sync.WaitGroup{} for _, input := range inputs { - if filter != nil && - filter.Name != "" && - !strings.EqualFold(input.Name, filter.Name) { - continue - } + wg.Add(1) + go func(input *InputPlugin) { + defer wg.Done() + if filter != nil && + filter.Name != "" && + !strings.EqualFold(input.Name, filter.Name) { + resultChan <- metricInputResult{} + return + } - m, err := input.Metrics(filter) - if err != nil { - return metrics, err - } + m, err := input.Metrics(filter) + resultChan <- metricInputResult{metric: m, err: err} + }(input) + } + wg.Wait() + close(resultChan) - metrics = AppendMetrics(metrics, m) + errs := make([]error, 0) + metrics := &Metrics{} + for result := range resultChan { + if result.err != nil { + errs = append(errs, result.err) + } else if result.metric != nil { + metrics = AppendMetrics(metrics, result.metric) + } } - return metrics, nil + var err error + if len(errs) > 0 { + err = fmt.Errorf("error initializing inputs") + for _, e := range errs { + err = errors.Wrap(err, e.Error()) + } + } + + return metrics, err } // AppendMetrics combines the metrics from two sources.