173 lines
		
	
	
		
			3.9 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			173 lines
		
	
	
		
			3.9 KiB
		
	
	
	
		
			Go
		
	
	
	
| package metrics
 | |
| 
 | |
| import (
 | |
| 	"bufio"
 | |
| 	"fmt"
 | |
| 	"log"
 | |
| 	"net"
 | |
| 	"net/url"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	// We force flush the statsite metrics after this period of
 | |
| 	// inactivity. Prevents stats from getting stuck in a buffer
 | |
| 	// forever.
 | |
| 	flushInterval = 100 * time.Millisecond
 | |
| )
 | |
| 
 | |
| // NewStatsiteSinkFromURL creates an StatsiteSink from a URL. It is used
 | |
| // (and tested) from NewMetricSinkFromURL.
 | |
| func NewStatsiteSinkFromURL(u *url.URL) (MetricSink, error) {
 | |
| 	return NewStatsiteSink(u.Host)
 | |
| }
 | |
| 
 | |
| // StatsiteSink provides a MetricSink that can be used with a
 | |
| // statsite metrics server
 | |
| type StatsiteSink struct {
 | |
| 	addr        string
 | |
| 	metricQueue chan string
 | |
| }
 | |
| 
 | |
| // NewStatsiteSink is used to create a new StatsiteSink
 | |
| func NewStatsiteSink(addr string) (*StatsiteSink, error) {
 | |
| 	s := &StatsiteSink{
 | |
| 		addr:        addr,
 | |
| 		metricQueue: make(chan string, 4096),
 | |
| 	}
 | |
| 	go s.flushMetrics()
 | |
| 	return s, nil
 | |
| }
 | |
| 
 | |
| // Close is used to stop flushing to statsite
 | |
| func (s *StatsiteSink) Shutdown() {
 | |
| 	close(s.metricQueue)
 | |
| }
 | |
| 
 | |
| func (s *StatsiteSink) SetGauge(key []string, val float32) {
 | |
| 	flatKey := s.flattenKey(key)
 | |
| 	s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
 | |
| }
 | |
| 
 | |
| func (s *StatsiteSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
 | |
| 	flatKey := s.flattenKeyLabels(key, labels)
 | |
| 	s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
 | |
| }
 | |
| 
 | |
| func (s *StatsiteSink) EmitKey(key []string, val float32) {
 | |
| 	flatKey := s.flattenKey(key)
 | |
| 	s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val))
 | |
| }
 | |
| 
 | |
| func (s *StatsiteSink) IncrCounter(key []string, val float32) {
 | |
| 	flatKey := s.flattenKey(key)
 | |
| 	s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
 | |
| }
 | |
| 
 | |
| func (s *StatsiteSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
 | |
| 	flatKey := s.flattenKeyLabels(key, labels)
 | |
| 	s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
 | |
| }
 | |
| 
 | |
| func (s *StatsiteSink) AddSample(key []string, val float32) {
 | |
| 	flatKey := s.flattenKey(key)
 | |
| 	s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
 | |
| }
 | |
| 
 | |
| func (s *StatsiteSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
 | |
| 	flatKey := s.flattenKeyLabels(key, labels)
 | |
| 	s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
 | |
| }
 | |
| 
 | |
| // Flattens the key for formatting, removes spaces
 | |
| func (s *StatsiteSink) flattenKey(parts []string) string {
 | |
| 	joined := strings.Join(parts, ".")
 | |
| 	return strings.Map(func(r rune) rune {
 | |
| 		switch r {
 | |
| 		case ':':
 | |
| 			fallthrough
 | |
| 		case ' ':
 | |
| 			return '_'
 | |
| 		default:
 | |
| 			return r
 | |
| 		}
 | |
| 	}, joined)
 | |
| }
 | |
| 
 | |
| // Flattens the key along with labels for formatting, removes spaces
 | |
| func (s *StatsiteSink) flattenKeyLabels(parts []string, labels []Label) string {
 | |
| 	for _, label := range labels {
 | |
| 		parts = append(parts, label.Value)
 | |
| 	}
 | |
| 	return s.flattenKey(parts)
 | |
| }
 | |
| 
 | |
| // Does a non-blocking push to the metrics queue
 | |
| func (s *StatsiteSink) pushMetric(m string) {
 | |
| 	select {
 | |
| 	case s.metricQueue <- m:
 | |
| 	default:
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Flushes metrics
 | |
| func (s *StatsiteSink) flushMetrics() {
 | |
| 	var sock net.Conn
 | |
| 	var err error
 | |
| 	var wait <-chan time.Time
 | |
| 	var buffered *bufio.Writer
 | |
| 	ticker := time.NewTicker(flushInterval)
 | |
| 	defer ticker.Stop()
 | |
| 
 | |
| CONNECT:
 | |
| 	// Attempt to connect
 | |
| 	sock, err = net.Dial("tcp", s.addr)
 | |
| 	if err != nil {
 | |
| 		log.Printf("[ERR] Error connecting to statsite! Err: %s", err)
 | |
| 		goto WAIT
 | |
| 	}
 | |
| 
 | |
| 	// Create a buffered writer
 | |
| 	buffered = bufio.NewWriter(sock)
 | |
| 
 | |
| 	for {
 | |
| 		select {
 | |
| 		case metric, ok := <-s.metricQueue:
 | |
| 			// Get a metric from the queue
 | |
| 			if !ok {
 | |
| 				goto QUIT
 | |
| 			}
 | |
| 
 | |
| 			// Try to send to statsite
 | |
| 			_, err := buffered.Write([]byte(metric))
 | |
| 			if err != nil {
 | |
| 				log.Printf("[ERR] Error writing to statsite! Err: %s", err)
 | |
| 				goto WAIT
 | |
| 			}
 | |
| 		case <-ticker.C:
 | |
| 			if err := buffered.Flush(); err != nil {
 | |
| 				log.Printf("[ERR] Error flushing to statsite! Err: %s", err)
 | |
| 				goto WAIT
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| WAIT:
 | |
| 	// Wait for a while
 | |
| 	wait = time.After(time.Duration(5) * time.Second)
 | |
| 	for {
 | |
| 		select {
 | |
| 		// Dequeue the messages to avoid backlog
 | |
| 		case _, ok := <-s.metricQueue:
 | |
| 			if !ok {
 | |
| 				goto QUIT
 | |
| 			}
 | |
| 		case <-wait:
 | |
| 			goto CONNECT
 | |
| 		}
 | |
| 	}
 | |
| QUIT:
 | |
| 	s.metricQueue = nil
 | |
| }
 |