From 40f58e4aeec0088ee27c2410cf5b1a3fdf9a462d Mon Sep 17 00:00:00 2001 From: Nikolay Edigaryev Date: Wed, 27 Sep 2023 20:16:00 +0400 Subject: [PATCH] More RPC-related logs (#136) * More RPC-related logs * Notifier should be set before we use it in the scheduler --- internal/concurrentmap/concurrentmap.go | 10 +++++----- internal/controller/controller.go | 2 +- internal/controller/notifier/notifier.go | 7 ++++++- internal/controller/notifier/notifier_test.go | 3 ++- internal/worker/rpc.go | 6 ++++++ 5 files changed, 20 insertions(+), 8 deletions(-) diff --git a/internal/concurrentmap/concurrentmap.go b/internal/concurrentmap/concurrentmap.go index 5b9095b..badb823 100644 --- a/internal/concurrentmap/concurrentmap.go +++ b/internal/concurrentmap/concurrentmap.go @@ -5,17 +5,17 @@ import ( ) type ConcurrentMap[T any] struct { - nonConcurrentMap map[any]T + nonConcurrentMap map[string]T mtx sync.Mutex } func NewConcurrentMap[T any]() *ConcurrentMap[T] { return &ConcurrentMap[T]{ - nonConcurrentMap: map[any]T{}, + nonConcurrentMap: map[string]T{}, } } -func (cmap *ConcurrentMap[T]) Load(key any) (T, bool) { +func (cmap *ConcurrentMap[T]) Load(key string) (T, bool) { cmap.mtx.Lock() defer cmap.mtx.Unlock() @@ -24,14 +24,14 @@ func (cmap *ConcurrentMap[T]) Load(key any) (T, bool) { return result, ok } -func (cmap *ConcurrentMap[T]) Store(id any, value T) { +func (cmap *ConcurrentMap[T]) Store(id string, value T) { cmap.mtx.Lock() defer cmap.mtx.Unlock() cmap.nonConcurrentMap[id] = value } -func (cmap *ConcurrentMap[T]) Delete(key any) { +func (cmap *ConcurrentMap[T]) Delete(key string) { cmap.mtx.Lock() defer cmap.mtx.Unlock() diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 433463c..8b5e3d6 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -58,7 +58,6 @@ type Controller struct { func New(opts ...Option) (*Controller, error) { controller := &Controller{ - workerNotifier: notifier.NewNotifier(), proxy: proxy.NewProxy(), workerOfflineTimeout: 3 * time.Minute, maxWorkersPerLicense: maxWorkersPerDefaultLicense, @@ -101,6 +100,7 @@ func New(opts ...Option) (*Controller, error) { return nil, err } controller.store = store + controller.workerNotifier = notifier.NewNotifier(controller.logger.With("component", "rpc")) controller.scheduler = scheduler.NewScheduler(store, controller.workerNotifier, controller.workerOfflineTimeout, controller.logger) diff --git a/internal/controller/notifier/notifier.go b/internal/controller/notifier/notifier.go index 316a7fc..ab15675 100644 --- a/internal/controller/notifier/notifier.go +++ b/internal/controller/notifier/notifier.go @@ -6,12 +6,14 @@ import ( "fmt" "github.com/cirruslabs/orchard/internal/concurrentmap" "github.com/cirruslabs/orchard/rpc" + "go.uber.org/zap" ) var ErrNoWorker = errors.New("no worker registered with this name") type Notifier struct { workers *concurrentmap.ConcurrentMap[*WorkerSlot] + logger *zap.SugaredLogger } type WorkerSlot struct { @@ -19,9 +21,10 @@ type WorkerSlot struct { ch chan *rpc.WatchInstruction } -func NewNotifier() *Notifier { +func NewNotifier(logger *zap.SugaredLogger) *Notifier { return &Notifier{ workers: concurrentmap.NewConcurrentMap[*WorkerSlot](), + logger: logger, } } @@ -29,12 +32,14 @@ func (watcher *Notifier) Register(ctx context.Context, worker string) (chan *rpc subCtx, cancel := context.WithCancel(ctx) workerCh := make(chan *rpc.WatchInstruction) + watcher.logger.Debugf("registering worker %s", worker) watcher.workers.Store(worker, &WorkerSlot{ ctx: subCtx, ch: workerCh, }) return workerCh, func() { + watcher.logger.Debugf("deleting worker %s", worker) watcher.workers.Delete(worker) cancel() } diff --git a/internal/controller/notifier/notifier_test.go b/internal/controller/notifier/notifier_test.go index 0adeddd..f5392fc 100644 --- a/internal/controller/notifier/notifier_test.go +++ b/internal/controller/notifier/notifier_test.go @@ -6,6 +6,7 @@ import ( "github.com/cirruslabs/orchard/internal/controller/notifier" "github.com/google/uuid" "github.com/stretchr/testify/require" + "go.uber.org/zap" "sync" "testing" "time" @@ -14,7 +15,7 @@ import ( func TestNotifier(t *testing.T) { ctx := context.Background() - notifier := notifier.NewNotifier() + notifier := notifier.NewNotifier(zap.NewNop().Sugar()) var topic = uuid.New().String() diff --git a/internal/worker/rpc.go b/internal/worker/rpc.go index 98dd0f5..221237d 100644 --- a/internal/worker/rpc.go +++ b/internal/worker/rpc.go @@ -21,6 +21,8 @@ import ( ) func (worker *Worker) watchRPC(ctx context.Context) error { + worker.logger.Infof("connecting to %s over gRPC", worker.client.GRPCTarget()) + conn, err := grpc.Dial(worker.client.GRPCTarget(), grpc.WithTransportCredentials(worker.client.GRPCTransportCredentials()), grpc.WithKeepaliveParams(keepalive.ClientParameters{ @@ -31,6 +33,8 @@ func (worker *Worker) watchRPC(ctx context.Context) error { return err } + worker.logger.Infof("gRPC connection established, starting gRPC stream with the controller") + client := rpc.NewControllerClient(conn) ctxWithMetadata := metadata.NewOutgoingContext(ctx, worker.grpcMetadata()) @@ -40,6 +44,8 @@ func (worker *Worker) watchRPC(ctx context.Context) error { return err } + worker.logger.Infof("running gRPC stream with the controller") + for { watchFromController, err := stream.Recv() if err != nil {