Rendezvous: use a buffered channel of size 1 (#219)
* Rendezvous: use a buffered channel of size 1 * Fix spelling of "absence" in comment
This commit is contained in:
parent
d66d667f1e
commit
60948e14fe
|
|
@ -26,7 +26,7 @@ func New[T any]() *Rendezvous[T] {
|
|||
func (rendezvous *Rendezvous[T]) Request(ctx context.Context, session string) (chan T, func()) {
|
||||
tokenSlot := &TokenSlot[T]{
|
||||
ctx: ctx,
|
||||
ch: make(chan T),
|
||||
ch: make(chan T, 1),
|
||||
}
|
||||
|
||||
rendezvous.sessions.Store(session, tokenSlot)
|
||||
|
|
@ -42,7 +42,12 @@ func (rendezvous *Rendezvous[T]) Respond(session string, conn T) (context.Contex
|
|||
return nil, ErrInvalidToken
|
||||
}
|
||||
|
||||
tokenSlot.ch <- conn
|
||||
select {
|
||||
case tokenSlot.ch <- conn:
|
||||
// first response
|
||||
default:
|
||||
// the receiving party still hadn't picked up the previous response, so discard the new one
|
||||
}
|
||||
|
||||
return tokenSlot.ctx, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,3 +37,47 @@ func TestProxy(t *testing.T) {
|
|||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// TestProxyNonBlockingRespond ensures that the Respond() won't block
|
||||
// the caller in the absence of the receiving party.
|
||||
func TestProxyNonBlockingRespond(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
expectedConn, _ := net.Pipe()
|
||||
|
||||
proxy := rendezvous.New[net.Conn]()
|
||||
|
||||
token := uuid.New().String()
|
||||
|
||||
connCh, cancel := proxy.Request(ctx, token)
|
||||
defer cancel()
|
||||
|
||||
// Call Respond() in the same goroutine as Request()
|
||||
_, err := proxy.Respond(token, expectedConn)
|
||||
require.NoError(t, err)
|
||||
|
||||
actualConn := <-connCh
|
||||
require.Equal(t, expectedConn, actualConn)
|
||||
}
|
||||
|
||||
// TestProxyDoubleRespond ensures that the Respond() can be
|
||||
// safely called multiple times.
|
||||
func TestProxyDoubleRespond(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
expectedConn, _ := net.Pipe()
|
||||
|
||||
proxy := rendezvous.New[net.Conn]()
|
||||
|
||||
token := uuid.New().String()
|
||||
|
||||
_, cancel := proxy.Request(ctx, token)
|
||||
defer cancel()
|
||||
|
||||
// Call Respond() twice
|
||||
_, err := proxy.Respond(token, expectedConn)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = proxy.Respond(token, expectedConn)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue