diff --git a/.changelog/v0.38.6/features/3472-p2p-has-channel-api.md b/.changelog/v0.38.6/features/3472-p2p-has-channel-api.md new file mode 100644 index 0000000000..b554a29ce1 --- /dev/null +++ b/.changelog/v0.38.6/features/3472-p2p-has-channel-api.md @@ -0,0 +1,3 @@ +- `[p2p]` `HasChannel(chID)` method added to the `Peer` interface, used by + reactors to check whether a peer implements/supports a given channel. + ([#3472](https://github.com/cometbft/cometbft/issues/3472)) diff --git a/CHANGELOG.md b/CHANGELOG.md index e8b53a0f19..14f044b994 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,11 @@ It also includes a few other bug fixes and performance improvements. * [#109](https://github.com/osmosis-labs/cometbft/pull/109) perf(p2p,mempool): Make mempool reactor receive not block. (Fixed by either #3209, #3230) * [#105](https://github.com/osmosis-labs/cometbft/pull/105) perf(p2p)!: Remove PeerSendBytesTotal metric #3184 * [#95](https://github.com/osmosis-labs/cometbft/pull/95) perf(types) Make a new method `GetByAddressMut` for `ValSet`, which does not copy the returned validator. (#3129) +* [#128](https://github.com/osmosis-labs/cometbft/pull/128) feat(p2p): render HasChannel(chID) is a public p2p.Peer method (#3510) +* [#126]() Remove p2p allocations for wrapping outbound packets +* [#125]() Fix marshalling and concurrency overhead within broadcast routines +* perf(p2p): Only update send monitor once per batch packet msg send (#3382) +* [#124]() Secret connection read buffer ## v0.38.10 diff --git a/consensus/reactor.go b/consensus/reactor.go index 4fa1ca3484..cd9cae4b0b 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -569,6 +569,10 @@ func (conR *Reactor) getRoundState() *cstypes.RoundState { func (conR *Reactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) { logger := conR.Logger.With("peer", peer) + if !peer.HasChannel(DataChannel) { + logger.Info("Peer does not implement DataChannel.") + return + } rng := cmtrand.NewStdlibRand() OUTER_LOOP: @@ -729,6 +733,10 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt func (conR *Reactor) gossipVotesRoutine(peer p2p.Peer, ps *PeerState) { logger := conR.Logger.With("peer", peer) + if !peer.HasChannel(VoteChannel) { + logger.Info("Peer does not implement VoteChannel.") + return + } rng := cmtrand.NewStdlibRand() // Simple hack to throttle logs upon sleep. diff --git a/evidence/reactor.go b/evidence/reactor.go index 10d3e53111..fbc25f63ec 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -64,7 +64,9 @@ func (evR *Reactor) GetChannels() []*p2p.ChannelDescriptor { // AddPeer implements Reactor. func (evR *Reactor) AddPeer(peer p2p.Peer) { - go evR.broadcastEvidenceRoutine(peer) + if peer.HasChannel(EvidenceChannel) { + go evR.broadcastEvidenceRoutine(peer) + } } // Receive implements Reactor. diff --git a/evidence/reactor_test.go b/evidence/reactor_test.go index 620c8fa7d5..d04c5f5d49 100644 --- a/evidence/reactor_test.go +++ b/evidence/reactor_test.go @@ -212,6 +212,7 @@ func TestReactorBroadcastEvidenceMemoryLeak(t *testing.T) { e, ok := i.(p2p.Envelope) return ok && e.ChannelID == evidence.EvidenceChannel })).Return(false) + p.On("HasChannel", evidence.EvidenceChannel).Maybe().Return(true) quitChan := make(<-chan struct{}) p.On("Quit").Return(quitChan) ps := peerState{2} diff --git a/mempool/reactor.go b/mempool/reactor.go index 3f40181bc3..88f7051d32 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -96,7 +96,7 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor { // AddPeer implements Reactor. // It starts a broadcast routine ensuring all txs are forwarded to the given peer. func (memR *Reactor) AddPeer(peer p2p.Peer) { - if memR.config.Broadcast { + if memR.config.Broadcast && peer.HasChannel(MempoolChannel) { go func() { // Always forward transactions to unconditional peers. if !memR.Switch.IsPeerUnconditional(peer.ID()) { diff --git a/p2p/mock/peer.go b/p2p/mock/peer.go index 90acb65aa5..4214550a12 100644 --- a/p2p/mock/peer.go +++ b/p2p/mock/peer.go @@ -45,6 +45,7 @@ func NewPeer(ip net.IP) *Peer { func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error func (mp *Peer) TrySend(_ p2p.Envelope) bool { return true } func (mp *Peer) TrySendMarshalled(e p2p.MarshalledEnvelope) bool { return true } +func (mp *Peer) HasChannel(_ byte) bool { return true } func (mp *Peer) Send(_ p2p.Envelope) bool { return true } func (mp *Peer) NodeInfo() p2p.NodeInfo { return p2p.DefaultNodeInfo{ diff --git a/p2p/mocks/peer.go b/p2p/mocks/peer.go index bc0ed10470..3358896967 100644 --- a/p2p/mocks/peer.go +++ b/p2p/mocks/peer.go @@ -67,6 +67,24 @@ func (_m *Peer) GetRemovalFailed() bool { return r0 } +// HasChannel provides a mock function with given fields: chID +func (_m *Peer) HasChannel(chID byte) bool { + ret := _m.Called(chID) + + if len(ret) == 0 { + panic("no return value specified for HasChannel") + } + + var r0 bool + if rf, ok := ret.Get(0).(func(byte) bool); ok { + r0 = rf(chID) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + // ID provides a mock function with given fields: func (_m *Peer) ID() p2p.ID { ret := _m.Called() diff --git a/p2p/peer.go b/p2p/peer.go index fabecc3a82..e1e371d810 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -39,6 +39,7 @@ type Peer interface { Status() cmtconn.ConnectionStatus SocketAddr() *NetAddress // actual address of the socket + HasChannel(chID byte) bool // Does the peer implement this channel? Send(Envelope) bool TrySend(Envelope) bool TrySendMarshalled(MarshalledEnvelope) bool @@ -117,7 +118,7 @@ type peer struct { // peer's node info and the channel it knows about // channels = nodeInfo.Channels - // cached to avoid copying nodeInfo in hasChannel + // cached to avoid copying nodeInfo in HasChannel nodeInfo NodeInfo channels []byte @@ -289,7 +290,7 @@ func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bo func (p *peer) sendMarshalled(chID byte, msgType reflect.Type, msgBytes []byte, sendFunc func(byte, []byte) bool) bool { if !p.IsRunning() { return false - } else if !p.hasChannel(chID) { + } else if !p.HasChannel(chID) { return false } res := sendFunc(chID, msgBytes) @@ -309,9 +310,8 @@ func (p *peer) Set(key string, data interface{}) { p.Data.Set(key, data) } -// hasChannel returns true if the peer reported -// knowing about the given chID. -func (p *peer) hasChannel(chID byte) bool { +// HasChannel returns whether the peer reported implementing this channel. +func (p *peer) HasChannel(chID byte) bool { for _, ch := range p.channels { if ch == chID { return true diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go index 5a7701a55b..58fe752057 100644 --- a/p2p/peer_set_test.go +++ b/p2p/peer_set_test.go @@ -19,6 +19,7 @@ type mockPeer struct { } func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error +func (mp *mockPeer) HasChannel(byte) bool { return true } func (mp *mockPeer) TrySend(Envelope) bool { return true } func (mp *mockPeer) TrySendMarshalled(MarshalledEnvelope) bool { return true } func (mp *mockPeer) Send(Envelope) bool { return true } diff --git a/p2p/types.go b/p2p/types.go index 51bdd6ad48..6a36940946 100644 --- a/p2p/types.go +++ b/p2p/types.go @@ -17,7 +17,7 @@ type Envelope struct { ChannelID byte } -// MarshalledEnvelope contains a proto message, its marshalled message, with sender routing info. +// MarshalledEnvelope contains a proto message, its marshaled message, with sender routing info. type MarshalledEnvelope struct { Envelope MarshalledMessage []byte diff --git a/spec/p2p/reactor-api/p2p-api.md b/spec/p2p/reactor-api/p2p-api.md index ad1fbff311..06386bdce5 100644 --- a/spec/p2p/reactor-api/p2p-api.md +++ b/spec/p2p/reactor-api/p2p-api.md @@ -185,15 +185,16 @@ From this point, reactors can use the methods of the new `Peer` instance. The table below summarizes the interaction of the standard reactors with connected peers, with the `Peer` methods used by them: -| `Peer` API method | consensus | block sync | state sync | mempool | evidence | PEX | -|--------------------------------------------|-----------|------------|------------|---------|-----------|-------| -| `ID() ID` | x | x | x | x | x | x | -| `IsRunning() bool` | x | | | x | x | | -| `Quit() <-chan struct{}` | | | | x | x | | -| `Get(string) interface{}` | x | | | x | x | | -| `Set(string, interface{})` | x | | | | | | -| `Send(Envelope) bool` | x | x | x | x | x | x | -| `TrySend(Envelope) bool` | x | x | | | | | +| `Peer` API method | consensus | block sync | state sync | mempool | evidence | PEX | +|----------------------------|-----------|------------|------------|---------|----------|-----| +| `ID() ID` | x | x | x | x | x | x | +| `IsRunning() bool` | x | | | x | x | | +| `Quit() <-chan struct{}` | | | | x | x | | +| `Get(string) interface{}` | x | | | x | x | | +| `Set(string, interface{})` | x | | | | | | +| `HasChannel(byte) bool` | x | | | x | x | | +| `Send(Envelope) bool` | x | x | x | x | x | x | +| `TrySend(Envelope) bool` | x | x | | | | | The above list is not exhaustive as it does not include all the `Peer` methods invoked by the PEX reactor, a special component that should be considered part @@ -269,8 +270,10 @@ Finally, a `Peer` instance allows a reactor to send messages to companion reactors running at that peer. This is ultimately the goal of the switch when it provides `Peer` instances to the registered reactors. -There are two methods for sending messages: +There are two methods for sending messages, and one auxiliary method to check +whether the peer supports a given channel: + func (p Peer) HasChannel(chID byte) bool func (p Peer) Send(e Envelope) bool func (p Peer) TrySend(e Envelope) bool @@ -279,6 +282,9 @@ set as follows: - `ChannelID`: the channel the message should be sent through, which defines the reactor that will process the message; + - The auxiliary `HasChannel()` method allows testing whether the remote peer + implements a channel; if it does not, both message-sending methods will + immediately return `false`, as sending always fails. - `Src`: this field represents the source of an incoming message, which is irrelevant for outgoing messages; - `Message`: the actual message's payload, which is marshalled using protocol buffers.