Skip to content

Commit

Permalink
fix(eventbus): Idempotent wildcardSub close
Browse files Browse the repository at this point in the history
The tests expects the wildcardSub .Close to be idempotent, but it never
checked this. The tests also weirdly asserted that there were orphaned
events on the channel. That was wrong as the events should be flushed on
close as to not indefinitely block an emitter.
  • Loading branch information
MarcoPolo committed Nov 18, 2024
1 parent 66e8db2 commit e5b739e
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 8 deletions.
12 changes: 8 additions & 4 deletions p2p/host/eventbus/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,21 @@ type wildcardSub struct {
w *wildcardNode
metricsTracer MetricsTracer
name string
closeOnce sync.Once
}

func (w *wildcardSub) Out() <-chan interface{} {
return w.ch
}

func (w *wildcardSub) Close() error {
w.w.removeSink(w.ch)
if w.metricsTracer != nil {
w.metricsTracer.RemoveSubscriber(reflect.TypeOf(event.WildcardSubscription))
}
w.closeOnce.Do(func() {
w.w.removeSink(w.ch)
if w.metricsTracer != nil {
w.metricsTracer.RemoveSubscriber(reflect.TypeOf(event.WildcardSubscription))
}
})

return nil
}

Expand Down
15 changes: 11 additions & 4 deletions p2p/host/eventbus/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,10 +394,13 @@ func TestManyWildcardSubscriptions(t *testing.T) {
require.NoError(t, em1.Emit(EventA{}))
require.NoError(t, em2.Emit(EventB(1)))

// the first five still have 2 events, while the other five have 4 events.
for _, s := range subs[:5] {
require.Len(t, s.Out(), 2)
}
// the first five 0 events because it was closed. The other five
// have 4 events.
require.EventuallyWithT(t, func(t *assert.CollectT) {
for _, s := range subs[:5] {
require.Len(t, s.Out(), 0, "expected closed subscription to have flushed events")
}
}, 2*time.Second, 100*time.Millisecond)

for _, s := range subs[5:] {
require.Len(t, s.Out(), 4)
Expand All @@ -407,6 +410,10 @@ func TestManyWildcardSubscriptions(t *testing.T) {
for _, s := range subs {
require.NoError(t, s.Close())
}

for _, s := range subs {
require.Zero(t, s.(*wildcardSub).w.nSinks.Load())
}
}

func TestWildcardValidations(t *testing.T) {
Expand Down

0 comments on commit e5b739e

Please sign in to comment.