56 lines
		
	
	
		
			1.6 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			56 lines
		
	
	
		
			1.6 KiB
		
	
	
	
		
			Go
		
	
	
	
package controllers
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
)
 | 
						|
 | 
						|
// worker is a worker that has a non-blocking bounded queue of scale targets, dequeues scale target and executes the scale operation one by one.
 | 
						|
type worker struct {
 | 
						|
	scaleTargetQueue chan *ScaleTarget
 | 
						|
	work             func(*ScaleTarget)
 | 
						|
	done             chan struct{}
 | 
						|
}
 | 
						|
 | 
						|
func newWorker(ctx context.Context, queueLimit int, work func(*ScaleTarget)) *worker {
 | 
						|
	w := &worker{
 | 
						|
		scaleTargetQueue: make(chan *ScaleTarget, queueLimit),
 | 
						|
		work:             work,
 | 
						|
		done:             make(chan struct{}),
 | 
						|
	}
 | 
						|
 | 
						|
	go func() {
 | 
						|
		defer close(w.done)
 | 
						|
 | 
						|
		for {
 | 
						|
			select {
 | 
						|
			case <-ctx.Done():
 | 
						|
				return
 | 
						|
			case t := <-w.scaleTargetQueue:
 | 
						|
				work(t)
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	return w
 | 
						|
}
 | 
						|
 | 
						|
// Add the scale target to the bounded queue, returning the result as a bool value. It returns true on successful enqueue, and returns false otherwise.
 | 
						|
// When returned false, the queue is already full so the enqueue operation must be retried later.
 | 
						|
// If the enqueue was triggered by an external source and there's no intermediate queue that we can use,
 | 
						|
// you must instruct the source to resend the original request later.
 | 
						|
// In case you're building a webhook server around this worker, this means that you must return a http error to the webhook server,
 | 
						|
// so that (hopefully) the sender can resend the webhook event later, or at least the human operator can notice or be notified about the
 | 
						|
// webhook develiery failure so that a manual retry can be done later.
 | 
						|
func (w *worker) Add(st *ScaleTarget) bool {
 | 
						|
	select {
 | 
						|
	case w.scaleTargetQueue <- st:
 | 
						|
		return true
 | 
						|
	default:
 | 
						|
		return false
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (w *worker) Done() chan struct{} {
 | 
						|
	return w.done
 | 
						|
}
 |