controller(notifier): avoid deleting newer slot from stale unregister
Make Register() cleanup delete only the slot created by that specific registration, so stale callbacks cannot remove a newer worker slot. Add concurrentmap.DeleteIf() to perform the check-and-delete atomically. Add tests for DeleteIf behavior.
This commit is contained in:
parent
42c6b723f6
commit
d0639f5cd5
|
|
@ -37,3 +37,20 @@ func (cmap *ConcurrentMap[T]) Delete(key string) {
|
|||
|
||||
delete(cmap.nonConcurrentMap, key)
|
||||
}
|
||||
|
||||
func (cmap *ConcurrentMap[T]) DeleteIf(key string, predicate func(T) bool) bool {
|
||||
cmap.mtx.Lock()
|
||||
defer cmap.mtx.Unlock()
|
||||
|
||||
value, ok := cmap.nonConcurrentMap[key]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
if !predicate(value) {
|
||||
return false
|
||||
}
|
||||
|
||||
delete(cmap.nonConcurrentMap, key)
|
||||
|
||||
return true
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,34 @@
|
|||
package concurrentmap
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestDeleteIf(t *testing.T) {
|
||||
cmap := NewConcurrentMap[int]()
|
||||
cmap.Store("a", 1)
|
||||
|
||||
deleted := cmap.DeleteIf("a", func(value int) bool {
|
||||
return value == 1
|
||||
})
|
||||
require.True(t, deleted)
|
||||
|
||||
_, ok := cmap.Load("a")
|
||||
require.False(t, ok)
|
||||
}
|
||||
|
||||
func TestDeleteIfPredicateFalse(t *testing.T) {
|
||||
cmap := NewConcurrentMap[int]()
|
||||
cmap.Store("a", 1)
|
||||
|
||||
deleted := cmap.DeleteIf("a", func(value int) bool {
|
||||
return value == 2
|
||||
})
|
||||
require.False(t, deleted)
|
||||
|
||||
value, ok := cmap.Load("a")
|
||||
require.True(t, ok)
|
||||
require.Equal(t, 1, value)
|
||||
}
|
||||
|
|
@ -31,16 +31,19 @@ func NewNotifier(logger *zap.SugaredLogger) *Notifier {
|
|||
func (watcher *Notifier) Register(ctx context.Context, worker string) (chan *rpc.WatchInstruction, func()) {
|
||||
subCtx, cancel := context.WithCancel(ctx)
|
||||
workerCh := make(chan *rpc.WatchInstruction)
|
||||
|
||||
watcher.logger.Debugf("registering worker %s", worker)
|
||||
watcher.workers.Store(worker, &WorkerSlot{
|
||||
slot := &WorkerSlot{
|
||||
ctx: subCtx,
|
||||
ch: workerCh,
|
||||
})
|
||||
}
|
||||
|
||||
watcher.logger.Debugf("registering worker %s", worker)
|
||||
watcher.workers.Store(worker, slot)
|
||||
|
||||
return workerCh, func() {
|
||||
watcher.logger.Debugf("deleting worker %s", worker)
|
||||
watcher.workers.Delete(worker)
|
||||
watcher.workers.DeleteIf(worker, func(current *WorkerSlot) bool {
|
||||
return current == slot
|
||||
})
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue