From 60948e14fee9734e3a40e5d62e0ea1b1db29054e Mon Sep 17 00:00:00 2001 From: Nikolay Edigaryev Date: Fri, 8 Nov 2024 08:19:54 +0100 Subject: [PATCH] Rendezvous: use a buffered channel of size 1 (#219) * Rendezvous: use a buffered channel of size 1 * Fix spelling of "absence" in comment --- internal/controller/rendezvous/rendezvous.go | 9 +++- .../controller/rendezvous/rendezvous_test.go | 44 +++++++++++++++++++ 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/internal/controller/rendezvous/rendezvous.go b/internal/controller/rendezvous/rendezvous.go index 55e7695..dd175d8 100644 --- a/internal/controller/rendezvous/rendezvous.go +++ b/internal/controller/rendezvous/rendezvous.go @@ -26,7 +26,7 @@ func New[T any]() *Rendezvous[T] { func (rendezvous *Rendezvous[T]) Request(ctx context.Context, session string) (chan T, func()) { tokenSlot := &TokenSlot[T]{ ctx: ctx, - ch: make(chan T), + ch: make(chan T, 1), } rendezvous.sessions.Store(session, tokenSlot) @@ -42,7 +42,12 @@ func (rendezvous *Rendezvous[T]) Respond(session string, conn T) (context.Contex return nil, ErrInvalidToken } - tokenSlot.ch <- conn + select { + case tokenSlot.ch <- conn: + // first response + default: + // the receiving party still hadn't picked up the previous response, so discard the new one + } return tokenSlot.ctx, nil } diff --git a/internal/controller/rendezvous/rendezvous_test.go b/internal/controller/rendezvous/rendezvous_test.go index 3461360..772b727 100644 --- a/internal/controller/rendezvous/rendezvous_test.go +++ b/internal/controller/rendezvous/rendezvous_test.go @@ -37,3 +37,47 @@ func TestProxy(t *testing.T) { wg.Wait() } + +// TestProxyNonBlockingRespond ensures that the Respond() won't block +// the caller in the absence of the receiving party. +func TestProxyNonBlockingRespond(t *testing.T) { + ctx := context.Background() + + expectedConn, _ := net.Pipe() + + proxy := rendezvous.New[net.Conn]() + + token := uuid.New().String() + + connCh, cancel := proxy.Request(ctx, token) + defer cancel() + + // Call Respond() in the same goroutine as Request() + _, err := proxy.Respond(token, expectedConn) + require.NoError(t, err) + + actualConn := <-connCh + require.Equal(t, expectedConn, actualConn) +} + +// TestProxyDoubleRespond ensures that the Respond() can be +// safely called multiple times. +func TestProxyDoubleRespond(t *testing.T) { + ctx := context.Background() + + expectedConn, _ := net.Pipe() + + proxy := rendezvous.New[net.Conn]() + + token := uuid.New().String() + + _, cancel := proxy.Request(ctx, token) + defer cancel() + + // Call Respond() twice + _, err := proxy.Respond(token, expectedConn) + require.NoError(t, err) + + _, err = proxy.Respond(token, expectedConn) + require.NoError(t, err) +}