From e847002522280395eb8923c1f6f446c06e24c56e Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 6 Feb 2023 16:27:49 -0800 Subject: [PATCH] set names for eventbus event subscriptions (#2057) --- p2p/host/autonat/autonat.go | 5 ++++- p2p/host/autorelay/autorelay.go | 3 ++- p2p/host/autorelay/relay_finder.go | 3 ++- p2p/host/pstoremanager/pstoremanager.go | 3 ++- p2p/host/relaysvc/relay.go | 6 +++--- p2p/protocol/holepunch/svc.go | 4 +++- p2p/protocol/identify/id.go | 9 +++++---- p2p/protocol/identify/obsaddr.go | 2 +- 8 files changed, 22 insertions(+), 13 deletions(-) diff --git a/p2p/host/autonat/autonat.go b/p2p/host/autonat/autonat.go index 0fd8b0ec49..7368cccfe4 100644 --- a/p2p/host/autonat/autonat.go +++ b/p2p/host/autonat/autonat.go @@ -119,7 +119,10 @@ func New(h host.Host, options ...Option) (AutoNAT, error) { } as.status.Store(autoNATResult{network.ReachabilityUnknown, nil}) - subscriber, err := as.host.EventBus().Subscribe([]interface{}{new(event.EvtLocalAddressesUpdated), new(event.EvtPeerIdentificationCompleted)}) + subscriber, err := as.host.EventBus().Subscribe( + []any{new(event.EvtLocalAddressesUpdated), new(event.EvtPeerIdentificationCompleted)}, + eventbus.Name("autonat"), + ) if err != nil { return nil, err } diff --git a/p2p/host/autorelay/autorelay.go b/p2p/host/autorelay/autorelay.go index e4e3568eff..58cebfb431 100644 --- a/p2p/host/autorelay/autorelay.go +++ b/p2p/host/autorelay/autorelay.go @@ -8,6 +8,7 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" basic "github.com/libp2p/go-libp2p/p2p/host/basic" + "github.com/libp2p/go-libp2p/p2p/host/eventbus" logging "github.com/ipfs/go-log/v2" ma "github.com/multiformats/go-multiaddr" @@ -57,7 +58,7 @@ func NewAutoRelay(bhost *basic.BasicHost, opts ...Option) (*AutoRelay, error) { } func (r *AutoRelay) background() { - subReachability, err := r.host.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged)) + subReachability, err := r.host.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged), eventbus.Name("autorelay (background)")) if err != nil { log.Debug("failed to subscribe to the EvtLocalReachabilityChanged") return diff --git a/p2p/host/autorelay/relay_finder.go b/p2p/host/autorelay/relay_finder.go index 20d71240b9..7d5f304bb3 100644 --- a/p2p/host/autorelay/relay_finder.go +++ b/p2p/host/autorelay/relay_finder.go @@ -14,6 +14,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" basic "github.com/libp2p/go-libp2p/p2p/host/basic" + "github.com/libp2p/go-libp2p/p2p/host/eventbus" relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay" circuitv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client" circuitv2_proto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" @@ -115,7 +116,7 @@ func (rf *relayFinder) background(ctx context.Context) { rf.handleNewCandidates(ctx) }() - subConnectedness, err := rf.host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged)) + subConnectedness, err := rf.host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged), eventbus.Name("autorelay (relay finder)")) if err != nil { log.Error("failed to subscribe to the EvtPeerConnectednessChanged") return diff --git a/p2p/host/pstoremanager/pstoremanager.go b/p2p/host/pstoremanager/pstoremanager.go index f8382c709d..da8f84c477 100644 --- a/p2p/host/pstoremanager/pstoremanager.go +++ b/p2p/host/pstoremanager/pstoremanager.go @@ -9,6 +9,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/p2p/host/eventbus" logging "github.com/ipfs/go-log/v2" ) @@ -68,7 +69,7 @@ func NewPeerstoreManager(pstore peerstore.Peerstore, eventBus event.Bus, opts .. func (m *PeerstoreManager) Start() { ctx, cancel := context.WithCancel(context.Background()) m.cancel = cancel - sub, err := m.eventBus.Subscribe(&event.EvtPeerConnectednessChanged{}) + sub, err := m.eventBus.Subscribe(&event.EvtPeerConnectednessChanged{}, eventbus.Name("pstoremanager")) if err != nil { log.Warnf("subscription failed. Peerstore manager not activated. Error: %s", err) return diff --git a/p2p/host/relaysvc/relay.go b/p2p/host/relaysvc/relay.go index a36e20e002..bea8aa73a4 100644 --- a/p2p/host/relaysvc/relay.go +++ b/p2p/host/relaysvc/relay.go @@ -4,11 +4,11 @@ import ( "context" "sync" - relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay" - "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/p2p/host/eventbus" + relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay" ) type RelayManager struct { @@ -44,7 +44,7 @@ func (m *RelayManager) background(ctx context.Context) { m.mutex.Unlock() }() - subReachability, _ := m.host.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged)) + subReachability, _ := m.host.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged), eventbus.Name("relaysvc")) defer subReachability.Close() for { diff --git a/p2p/protocol/holepunch/svc.go b/p2p/protocol/holepunch/svc.go index 8476686bc8..5de7c7cf30 100644 --- a/p2p/protocol/holepunch/svc.go +++ b/p2p/protocol/holepunch/svc.go @@ -13,9 +13,11 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/libp2p/go-libp2p/p2p/protocol/holepunch/pb" "github.com/libp2p/go-libp2p/p2p/protocol/identify" "github.com/libp2p/go-msgio/pbio" + ma "github.com/multiformats/go-multiaddr" ) @@ -122,7 +124,7 @@ func (s *Service) watchForPublicAddr() { } // Only start the holePuncher if we're behind a NAT / firewall. - sub, err := s.host.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{}) + sub, err := s.host.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{}, eventbus.Name("holepunch")) if err != nil { log.Debugf("failed to subscripe to Reachability event: %s", err) return diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 2a1af713c7..6f56931051 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -189,10 +189,11 @@ func (ids *idService) loop() { defer ids.refCount.Done() phs := make(map[peer.ID]*peerHandler) - sub, err := ids.Host.EventBus().Subscribe([]interface{}{ - &event.EvtLocalProtocolsUpdated{}, - &event.EvtLocalAddressesUpdated{}, - }, eventbus.BufSize(256)) + sub, err := ids.Host.EventBus().Subscribe( + []any{&event.EvtLocalProtocolsUpdated{}, &event.EvtLocalAddressesUpdated{}}, + eventbus.BufSize(256), + eventbus.Name("identify (loop)"), + ) if err != nil { log.Errorf("failed to subscribe to events on the bus, err=%s", err) return diff --git a/p2p/protocol/identify/obsaddr.go b/p2p/protocol/identify/obsaddr.go index bd72175dc1..9b20ee4f61 100644 --- a/p2p/protocol/identify/obsaddr.go +++ b/p2p/protocol/identify/obsaddr.go @@ -141,7 +141,7 @@ func NewObservedAddrManager(host host.Host) (*ObservedAddrManager, error) { } oas.ctx, oas.ctxCancel = context.WithCancel(context.Background()) - reachabilitySub, err := host.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged)) + reachabilitySub, err := host.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged), eventbus.Name("identify (obsaddr)")) if err != nil { return nil, fmt.Errorf("failed to subscribe to reachability event: %s", err) }