Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GS: Relay messages to direct peers #949

Merged
merged 6 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@

for t in toSeq(g.gossipsub.keys):
g.gossipsub.removePeer(t, pubSubPeer)
# also try to remove from explicit table here
g.explicit.removePeer(t, pubSubPeer)
# also try to remove from direct peers table here
g.subscribedDirectPeers.removePeer(t, pubSubPeer)

for t in toSeq(g.fanout.keys):
g.fanout.removePeer(t, pubSubPeer)
Expand Down Expand Up @@ -245,7 +245,7 @@
# subscribe remote peer to the topic
discard g.gossipsub.addPeer(topic, peer)
if peer.peerId in g.parameters.directPeers:
discard g.explicit.addPeer(topic, peer)
discard g.subscribedDirectPeers.addPeer(topic, peer)
else:
trace "peer unsubscribed from topic"

Expand All @@ -259,7 +259,7 @@

g.fanout.removePeer(topic, peer)
if peer.peerId in g.parameters.directPeers:
g.explicit.removePeer(topic, peer)
g.subscribedDirectPeers.removePeer(topic, peer)

Check warning on line 262 in libp2p/protocols/pubsub/gossipsub.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/pubsub/gossipsub.nim#L262

Added line #L262 was not covered by tests

trace "gossip peers", peers = g.gossipsub.peers(topic), topic

Expand Down Expand Up @@ -359,6 +359,10 @@
break
toSendPeers.excl(seenPeers)

# add always direct peers
for topic in msg.topicIds:
toSendPeers.incl(g.subscribedDirectPeers.getOrDefault(topic))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll assume this gets filtered somewhere so we don't send to peers that have already seen the message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't, fixed



# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
Expand Down Expand Up @@ -522,7 +526,7 @@
var peers: HashSet[PubSubPeer]

# add always direct peers
peers.incl(g.explicit.getOrDefault(topic))
peers.incl(g.subscribedDirectPeers.getOrDefault(topic))

if topic in g.topics: # if we're subscribed use the mesh
peers.incl(g.mesh.getOrDefault(topic))
Expand Down Expand Up @@ -608,11 +612,13 @@
return peers.len

proc maintainDirectPeer(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.async.} =
let peer = g.peers.getOrDefault(id)
if isNil(peer):
if id notin g.peers:
trace "Attempting to dial a direct peer", peer = id
if g.switch.isConnected(id):
info "We are connected to a direct peer, but it isn't a GossipSub peer!", id
return

Check warning on line 619 in libp2p/protocols/pubsub/gossipsub.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/pubsub/gossipsub.nim#L619

Added line #L619 was not covered by tests
try:
await g.switch.connect(id, addrs)
await g.switch.connect(id, addrs, forceDial = true)
# populate the peer after it's connected
discard g.getOrCreatePeer(id, g.codecs)
except CancelledError as exc:
Expand Down
10 changes: 5 additions & 5 deletions libp2p/protocols/pubsub/gossipsub/behavior.nim
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@
let topic = graft.topicId
trace "peer grafted topic", peer, topic

# It is an error to GRAFT on a explicit peer
# It is an error to GRAFT on a direct peer
if peer.peerId in g.parameters.directPeers:
# receiving a graft from a direct peer should yield a more prominent warning (protocol violation)
warn "an explicit peer attempted to graft us, peering agreements should be reciprocal",
warn "a direct peer attempted to graft us, peering agreements should be reciprocal",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can cause a log spam issue if they keep doing this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are supposed to trust them somewhat
The other solution would be to remove them from the trusted set in that case
(descoring isn't an option as we will never kick them due to bad score)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm, right about the trust - let's make a comment about it

peer, topic
# and such an attempt should be logged and rejected with a PRUNE
prunes.add(ControlPrune(
Expand Down Expand Up @@ -352,7 +352,7 @@
# avoid negative score peers
it.score >= 0.0 and
it notin currentMesh[] and
# don't pick explicit peers
# don't pick direct peers
it.peerId notin g.parameters.directPeers and
# and avoid peers we are backing off
it.peerId notin backingOff:
Expand Down Expand Up @@ -392,7 +392,7 @@
it notin currentMesh[] and
# avoid negative score peers
it.score >= 0.0 and
# don't pick explicit peers
# don't pick direct peers
it.peerId notin g.parameters.directPeers and
# and avoid peers we are backing off
it.peerId notin backingOff:
Expand Down Expand Up @@ -494,7 +494,7 @@
# avoid negative score peers
it.score >= median.score and
it notin currentMesh[] and
# don't pick explicit peers
# don't pick direct peers

Check warning on line 497 in libp2p/protocols/pubsub/gossipsub/behavior.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/pubsub/gossipsub/behavior.nim#L497

Added line #L497 was not covered by tests
it.peerId notin g.parameters.directPeers and
# and avoid peers we are backing off
it.peerId notin backingOff:
Expand Down
2 changes: 1 addition & 1 deletion libp2p/protocols/pubsub/gossipsub/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ type
mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic
fanout*: PeerTable # peers that we send messages to when we're not subscribed to the topic
gossipsub*: PeerTable # peers that are subscribed to a topic
explicit*: PeerTable # directpeers that we keep alive explicitly
subscribedDirectPeers*: PeerTable # directpeers that we keep alive
backingOff*: BackoffTable # peers to backoff from when replenishing the mesh
lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics
gossip*: Table[string, seq[ControlIHave]] # pending gossip
Expand Down
18 changes: 13 additions & 5 deletions tests/pubsub/testgossipsub2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -167,36 +167,44 @@ suite "GossipSub":

asyncTest "GossipSub directPeers: always forward messages":
let
nodes = generateNodes(2, gossip = true)
nodes = generateNodes(3, gossip = true)

# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
nodes[2].switch.start(),
)

await GossipSub(nodes[0]).addDirectPeer(nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs)
await GossipSub(nodes[1]).addDirectPeer(nodes[0].switch.peerInfo.peerId, nodes[0].switch.peerInfo.addrs)
await GossipSub(nodes[1]).addDirectPeer(nodes[2].switch.peerInfo.peerId, nodes[2].switch.peerInfo.addrs)
await GossipSub(nodes[2]).addDirectPeer(nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs)

var handlerFut = newFuture[void]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
handlerFut.complete()
proc noop(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"

nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
nodes[0].subscribe("foobar", noop)
nodes[1].subscribe("foobar", noop)
nodes[2].subscribe("foobar", handler)

tryPublish await nodes[0].publish("foobar", toBytes("hellow")), 1

await handlerFut
await handlerFut.wait(2.seconds)

# peer shouldn't be in our mesh
check "foobar" notin GossipSub(nodes[0]).mesh
check "foobar" notin GossipSub(nodes[1]).mesh
check "foobar" notin GossipSub(nodes[2]).mesh

await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
nodes[1].switch.stop(),
nodes[2].switch.stop()
)

await allFuturesThrowing(nodesFut.concat())
Expand Down
Loading