Skip to content

Commit

Permalink
set names for eventbus event subscriptions (#2057)
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann authored Feb 7, 2023
1 parent b1a6822 commit e847002
Show file tree
Hide file tree
Showing 8 changed files with 22 additions and 13 deletions.
5 changes: 4 additions & 1 deletion p2p/host/autonat/autonat.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ func New(h host.Host, options ...Option) (AutoNAT, error) {
}
as.status.Store(autoNATResult{network.ReachabilityUnknown, nil})

subscriber, err := as.host.EventBus().Subscribe([]interface{}{new(event.EvtLocalAddressesUpdated), new(event.EvtPeerIdentificationCompleted)})
subscriber, err := as.host.EventBus().Subscribe(
[]any{new(event.EvtLocalAddressesUpdated), new(event.EvtPeerIdentificationCompleted)},
eventbus.Name("autonat"),
)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion p2p/host/autorelay/autorelay.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
basic "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"

logging "github.com/ipfs/go-log/v2"
ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -57,7 +58,7 @@ func NewAutoRelay(bhost *basic.BasicHost, opts ...Option) (*AutoRelay, error) {
}

func (r *AutoRelay) background() {
subReachability, err := r.host.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged))
subReachability, err := r.host.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged), eventbus.Name("autorelay (background)"))
if err != nil {
log.Debug("failed to subscribe to the EvtLocalReachabilityChanged")
return
Expand Down
3 changes: 2 additions & 1 deletion p2p/host/autorelay/relay_finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
basic "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay"
circuitv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
circuitv2_proto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
Expand Down Expand Up @@ -115,7 +116,7 @@ func (rf *relayFinder) background(ctx context.Context) {
rf.handleNewCandidates(ctx)
}()

subConnectedness, err := rf.host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
subConnectedness, err := rf.host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged), eventbus.Name("autorelay (relay finder)"))
if err != nil {
log.Error("failed to subscribe to the EvtPeerConnectednessChanged")
return
Expand Down
3 changes: 2 additions & 1 deletion p2p/host/pstoremanager/pstoremanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"

logging "github.com/ipfs/go-log/v2"
)
Expand Down Expand Up @@ -68,7 +69,7 @@ func NewPeerstoreManager(pstore peerstore.Peerstore, eventBus event.Bus, opts ..
func (m *PeerstoreManager) Start() {
ctx, cancel := context.WithCancel(context.Background())
m.cancel = cancel
sub, err := m.eventBus.Subscribe(&event.EvtPeerConnectednessChanged{})
sub, err := m.eventBus.Subscribe(&event.EvtPeerConnectednessChanged{}, eventbus.Name("pstoremanager"))
if err != nil {
log.Warnf("subscription failed. Peerstore manager not activated. Error: %s", err)
return
Expand Down
6 changes: 3 additions & 3 deletions p2p/host/relaysvc/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"context"
"sync"

relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"

"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
)

type RelayManager struct {
Expand Down Expand Up @@ -44,7 +44,7 @@ func (m *RelayManager) background(ctx context.Context) {
m.mutex.Unlock()
}()

subReachability, _ := m.host.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged))
subReachability, _ := m.host.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged), eventbus.Name("relaysvc"))
defer subReachability.Close()

for {
Expand Down
4 changes: 3 additions & 1 deletion p2p/protocol/holepunch/svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch/pb"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-msgio/pbio"

ma "github.com/multiformats/go-multiaddr"
)

Expand Down Expand Up @@ -122,7 +124,7 @@ func (s *Service) watchForPublicAddr() {
}

// Only start the holePuncher if we're behind a NAT / firewall.
sub, err := s.host.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{})
sub, err := s.host.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{}, eventbus.Name("holepunch"))
if err != nil {
log.Debugf("failed to subscripe to Reachability event: %s", err)
return
Expand Down
9 changes: 5 additions & 4 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,11 @@ func (ids *idService) loop() {
defer ids.refCount.Done()

phs := make(map[peer.ID]*peerHandler)
sub, err := ids.Host.EventBus().Subscribe([]interface{}{
&event.EvtLocalProtocolsUpdated{},
&event.EvtLocalAddressesUpdated{},
}, eventbus.BufSize(256))
sub, err := ids.Host.EventBus().Subscribe(
[]any{&event.EvtLocalProtocolsUpdated{}, &event.EvtLocalAddressesUpdated{}},
eventbus.BufSize(256),
eventbus.Name("identify (loop)"),
)
if err != nil {
log.Errorf("failed to subscribe to events on the bus, err=%s", err)
return
Expand Down
2 changes: 1 addition & 1 deletion p2p/protocol/identify/obsaddr.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func NewObservedAddrManager(host host.Host) (*ObservedAddrManager, error) {
}
oas.ctx, oas.ctxCancel = context.WithCancel(context.Background())

reachabilitySub, err := host.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged))
reachabilitySub, err := host.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged), eventbus.Name("identify (obsaddr)"))
if err != nil {
return nil, fmt.Errorf("failed to subscribe to reachability event: %s", err)
}
Expand Down

0 comments on commit e847002

Please sign in to comment.