Skip to content

Commit

Permalink
refactor filter to react when the remote peer closes the stream
Browse files Browse the repository at this point in the history
Better control when the remote peer closes the WakuFilterPushCodec
stream remotely.
For example, go-waku closes the stream for every received message.
On the other hand, js-waku keeps the stream opened.
Therefore, we support both scenarios.
  • Loading branch information
Ivansete-status committed Feb 6, 2025
1 parent 81a19c3 commit e3cf52a
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 24 deletions.
36 changes: 36 additions & 0 deletions waku/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,42 @@ proc connectedPeers*(

return (inPeers, outPeers)

proc getStreamByPeerIdAndProtocol*(
pm: PeerManager, peerId: PeerId, protocol: string
): Future[Result[Connection, string]] {.async.} =
## Establishes a new stream to the given peer and protocol or returns the existing stream, if any.
## Notice that the "Connection" type represents a stream within a transport connection
## (we will need to adapt this term.)

let peerIdsMuxers: Table[PeerId, seq[Muxer]] = pm.switch.connManager.getConnections()
if not peerIdsMuxers.contains(peerId):
return err("peerId not found in connManager: " & $peerId)

let muxers = peerIdsMuxers[peerId]

var streams = newSeq[Connection](0)
for m in muxers:
for s in m.getStreams():
## getStreams is defined in nim-libp2p
streams.add(s)

## Try to get the opened streams for the given protocol
let streamsOfInterest = streams.filterIt(
it.protocol == protocol and not LPStream(it).isClosed and
not LPStream(it).isClosedRemotely
)

if streamsOfInterest.len > 0:
## In theory there should be one stream per protocol. Then we just pick up the 1st
return ok(streamsOfInterest[0])

## There isn't still a stream. Let's dial to create one
let streamRes = await pm.dialPeer(peerId, protocol)
if streamRes.isNone():
return err("getStreamByPeerIdProto no connection to peer: " & $peerId)

return ok(streamRes.get())

proc connectToRelayPeers*(pm: PeerManager) {.async.} =
var (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
Expand Down
32 changes: 9 additions & 23 deletions waku/waku_filter_v2/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -172,29 +172,15 @@ proc pushToPeer(
): Future[Result[void, string]] {.async.} =
debug "pushing message to subscribed peer", peerId = shortLog(peerId)

if not wf.peerManager.wakuPeerStore.hasPeer(peerId, WakuFilterPushCodec):
# Check that peer has not been removed from peer store
error "no addresses for peer", peerId = shortLog(peerId)
return err("no addresses for peer: " & $peerId)

let conn =
if wf.peerConnections.contains(peerId):
wf.peerConnections[peerId]
else:
## we never pushed a message before, let's dial then
let connRes = await wf.peerManager.dialPeer(peerId, WakuFilterPushCodec)
if connRes.isNone():
## We do not remove this peer, but allow the underlying peer manager
## to do so if it is deemed necessary
error "pushToPeer no connection to peer", peerId = shortLog(peerId)
return err("pushToPeer no connection to peer: " & shortLog(peerId))

let newConn = connRes.get()
wf.peerConnections[peerId] = newConn
newConn

await conn.writeLp(buffer)
debug "published successful", peerId = shortLog(peerId), conn
let stream = (
await wf.peerManager.getStreamByPeerIdAndProtocol(peerId, WakuFilterPushCodec)
).valueOr:
error "pushToPeer failed", error
return err("pushToPeer failed: " & $error)

await stream.writeLp(buffer)

debug "published successful", peerId = shortLog(peerId), stream
waku_service_network_bytes.inc(
amount = buffer.len().int64, labelValues = [WakuFilterPushCodec, "out"]
)
Expand Down

0 comments on commit e3cf52a

Please sign in to comment.