extend output interface. fix mutexes

This commit is contained in:
davidnewhall2 2020-06-26 01:29:58 -07:00
parent d1a83892c2
commit 0858dc865d
3 changed files with 49 additions and 7 deletions

View File

@ -43,6 +43,7 @@ type Metrics struct {
Devices []interface{}
}
// Events defines the type for log entries.
type Events struct {
Logs []interface{}
}

View File

@ -9,7 +9,7 @@ import (
var (
// These are used ot keep track of loaded input plugins.
inputs []*InputPlugin // nolint: gochecknoglobals
inputSync sync.Mutex // nolint: gochecknoglobals
inputSync sync.RWMutex // nolint: gochecknoglobals
)
// Input plugins must implement this interface.
@ -58,8 +58,8 @@ func NewInput(i *InputPlugin) {
// InitializeInputs runs the passed-in initializer method for each input plugin.
func (u *UnifiPoller) InitializeInputs() error {
inputSync.Lock()
defer inputSync.Unlock()
inputSync.RLock()
defer inputSync.RUnlock()
for _, input := range inputs {
// This must return, or the app locks up here.
@ -73,6 +73,9 @@ func (u *UnifiPoller) InitializeInputs() error {
// Events aggregates log messages (events) from one or more sources.
func (u *UnifiPoller) Events(filter *Filter) (*Events, error) {
inputSync.RLock()
defer inputSync.RUnlock()
events := Events{}
for _, input := range inputs {
@ -97,6 +100,9 @@ func (u *UnifiPoller) Events(filter *Filter) (*Events, error) {
// Metrics aggregates all the measurements from filtered inputs and returns them.
// Passing a null filter returns everything!
func (u *UnifiPoller) Metrics(filter *Filter) (*Metrics, error) {
inputSync.RLock()
defer inputSync.RUnlock()
metrics := &Metrics{}
for _, input := range inputs {
@ -135,3 +141,15 @@ func AppendMetrics(existing *Metrics, m *Metrics) *Metrics {
return existing
}
// Inputs allows output plugins to see the list of loaded input plugins.
func (u *UnifiPoller) Inputs() (names []string) {
inputSync.RLock()
defer inputSync.RUnlock()
for i := range inputs {
names = append(names, inputs[i].Name)
}
return names
}

View File

@ -7,18 +7,21 @@ import (
var (
// These are used to keep track of loaded output plugins.
outputs []*Output // nolint: gochecknoglobals
outputSync sync.Mutex // nolint: gochecknoglobals
outputs []*Output // nolint: gochecknoglobals
outputSync sync.RWMutex // nolint: gochecknoglobals
errNoOutputPlugins = fmt.Errorf("no output plugins imported")
errAllOutputStopped = fmt.Errorf("all output plugins have stopped, or none enabled")
)
// Collect is passed into output packages so they may collect metrics to output.
// Output packages must implement this interface.
type Collect interface {
Logger
Metrics(*Filter) (*Metrics, error)
Events(*Filter) (*Events, error)
Logger
// These get used by the webserver output plugin.
Poller() Poller
Inputs() []string
Outputs() []string
}
// Output defines the output data for a metric exporter like influx or prometheus.
@ -41,9 +44,17 @@ func NewOutput(o *Output) {
outputs = append(outputs, o)
}
// Poller returns the poller config.
func (u *UnifiPoller) Poller() Poller {
return *u.Config.Poller
}
// InitializeOutputs runs all the configured output plugins.
// If none exist, or they all exit an error is returned.
func (u *UnifiPoller) InitializeOutputs() error {
outputSync.RLock()
defer outputSync.RUnlock()
v := make(chan error)
defer close(v)
@ -73,3 +84,15 @@ func (u *UnifiPoller) InitializeOutputs() error {
return nil
}
// Outputs allows other output plugins to see the list of loaded output plugins.
func (u *UnifiPoller) Outputs() (names []string) {
outputSync.RLock()
defer outputSync.RUnlock()
for i := range outputs {
names = append(names, outputs[i].Name)
}
return names
}