orchard/pkg/client/events.go

76 lines
1.5 KiB
Go

package client
import (
"context"
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
"github.com/hashicorp/go-multierror"
"io"
"net/http"
)
type EventStreamer struct {
client *Client
endpoint string
eventsChannel chan v1.Event
sendErr error
io.Closer
}
func NewEventStreamer(client *Client, endpoint string) *EventStreamer {
streamer := &EventStreamer{
client: client,
endpoint: endpoint,
eventsChannel: make(chan v1.Event, 64),
}
go streamer.stream()
return streamer
}
func (streamer *EventStreamer) Stream(event v1.Event) {
streamer.eventsChannel <- event
}
func (streamer *EventStreamer) stream() {
ctx := context.Background()
for {
events, finished := streamer.readAvailableEvents()
err := streamer.client.request(ctx, http.MethodPost, streamer.endpoint, events, nil, nil)
if err != nil {
streamer.sendErr = multierror.Append(streamer.sendErr, err)
}
if finished {
break
}
}
}
func (streamer *EventStreamer) readAvailableEvents() ([]v1.Event, bool) {
var result []v1.Event
// blocking wait for at least one event
nextEvent, ok := <-streamer.eventsChannel
if !ok {
return result, true
}
result = append(result, nextEvent)
// non-blocking wait for more events, if any
for {
select {
case nextEvent, ok := <-streamer.eventsChannel:
if !ok {
return result, true
}
result = append(result, nextEvent)
default:
return result, false
}
}
}
func (streamer *EventStreamer) Close() error {
close(streamer.eventsChannel)
return streamer.sendErr
}