From e3cf52af29c5cd90e75517412bcb964e3386e979 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Thu, 6 Feb 2025 15:19:27 +0100 Subject: [PATCH] refactor filter to react when the remote peer closes the stream Better control when the remote peer closes the WakuFilterPushCodec stream remotely. For example, go-waku closes the stream for every received message. On the other hand, js-waku keeps the stream opened. Therefore, we support both scenarios. --- vendor/nim-libp2p | 2 +- waku/node/peer_manager/peer_manager.nim | 36 +++++++++++++++++++++++++ waku/waku_filter_v2/protocol.nim | 32 +++++++--------------- 3 files changed, 46 insertions(+), 24 deletions(-) diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p index c5aa3736f9..a4f0a638e7 160000 --- a/vendor/nim-libp2p +++ b/vendor/nim-libp2p @@ -1 +1 @@ -Subproject commit c5aa3736f96e4d66f6aa653a2351ded74b7d21a9 +Subproject commit a4f0a638e718f05ecec01ae3a6ad2838714e7e40 diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 6894f55781..ba04b6b004 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -518,6 +518,42 @@ proc connectedPeers*( return (inPeers, outPeers) +proc getStreamByPeerIdAndProtocol*( + pm: PeerManager, peerId: PeerId, protocol: string +): Future[Result[Connection, string]] {.async.} = + ## Establishes a new stream to the given peer and protocol or returns the existing stream, if any. + ## Notice that the "Connection" type represents a stream within a transport connection + ## (we will need to adapt this term.) + + let peerIdsMuxers: Table[PeerId, seq[Muxer]] = pm.switch.connManager.getConnections() + if not peerIdsMuxers.contains(peerId): + return err("peerId not found in connManager: " & $peerId) + + let muxers = peerIdsMuxers[peerId] + + var streams = newSeq[Connection](0) + for m in muxers: + for s in m.getStreams(): + ## getStreams is defined in nim-libp2p + streams.add(s) + + ## Try to get the opened streams for the given protocol + let streamsOfInterest = streams.filterIt( + it.protocol == protocol and not LPStream(it).isClosed and + not LPStream(it).isClosedRemotely + ) + + if streamsOfInterest.len > 0: + ## In theory there should be one stream per protocol. Then we just pick up the 1st + return ok(streamsOfInterest[0]) + + ## There isn't still a stream. Let's dial to create one + let streamRes = await pm.dialPeer(peerId, protocol) + if streamRes.isNone(): + return err("getStreamByPeerIdProto no connection to peer: " & $peerId) + + return ok(streamRes.get()) + proc connectToRelayPeers*(pm: PeerManager) {.async.} = var (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec) let totalRelayPeers = inRelayPeers.len + outRelayPeers.len diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim index 22504488e0..d8b79ab670 100644 --- a/waku/waku_filter_v2/protocol.nim +++ b/waku/waku_filter_v2/protocol.nim @@ -172,29 +172,15 @@ proc pushToPeer( ): Future[Result[void, string]] {.async.} = debug "pushing message to subscribed peer", peerId = shortLog(peerId) - if not wf.peerManager.wakuPeerStore.hasPeer(peerId, WakuFilterPushCodec): - # Check that peer has not been removed from peer store - error "no addresses for peer", peerId = shortLog(peerId) - return err("no addresses for peer: " & $peerId) - - let conn = - if wf.peerConnections.contains(peerId): - wf.peerConnections[peerId] - else: - ## we never pushed a message before, let's dial then - let connRes = await wf.peerManager.dialPeer(peerId, WakuFilterPushCodec) - if connRes.isNone(): - ## We do not remove this peer, but allow the underlying peer manager - ## to do so if it is deemed necessary - error "pushToPeer no connection to peer", peerId = shortLog(peerId) - return err("pushToPeer no connection to peer: " & shortLog(peerId)) - - let newConn = connRes.get() - wf.peerConnections[peerId] = newConn - newConn - - await conn.writeLp(buffer) - debug "published successful", peerId = shortLog(peerId), conn + let stream = ( + await wf.peerManager.getStreamByPeerIdAndProtocol(peerId, WakuFilterPushCodec) + ).valueOr: + error "pushToPeer failed", error + return err("pushToPeer failed: " & $error) + + await stream.writeLp(buffer) + + debug "published successful", peerId = shortLog(peerId), stream waku_service_network_bytes.inc( amount = buffer.len().int64, labelValues = [WakuFilterPushCodec, "out"] )