Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GS: Relay messages to direct peers #949

Merged
merged 6 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@

for t in toSeq(g.gossipsub.keys):
g.gossipsub.removePeer(t, pubSubPeer)
# also try to remove from explicit table here
g.explicit.removePeer(t, pubSubPeer)
# also try to remove from direct peers table here
g.subscribedDirectPeers.removePeer(t, pubSubPeer)

for t in toSeq(g.fanout.keys):
g.fanout.removePeer(t, pubSubPeer)
Expand Down Expand Up @@ -245,7 +245,7 @@
# subscribe remote peer to the topic
discard g.gossipsub.addPeer(topic, peer)
if peer.peerId in g.parameters.directPeers:
discard g.explicit.addPeer(topic, peer)
discard g.subscribedDirectPeers.addPeer(topic, peer)
else:
trace "peer unsubscribed from topic"

Expand All @@ -259,7 +259,7 @@

g.fanout.removePeer(topic, peer)
if peer.peerId in g.parameters.directPeers:
g.explicit.removePeer(topic, peer)
g.subscribedDirectPeers.removePeer(topic, peer)

Check warning on line 262 in libp2p/protocols/pubsub/gossipsub.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/pubsub/gossipsub.nim#L262

Added line #L262 was not covered by tests

trace "gossip peers", peers = g.gossipsub.peers(topic), topic

Expand Down Expand Up @@ -338,6 +338,9 @@
g.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
g.mesh.withValue(t, peers): toSendPeers.incl(peers[])

# add direct peers
toSendPeers.incl(g.subscribedDirectPeers.getOrDefault(t))

# Don't send it to source peer, or peers that
# sent it during validation
toSendPeers.excl(peer)
Expand Down Expand Up @@ -522,7 +525,7 @@
var peers: HashSet[PubSubPeer]

# add always direct peers
peers.incl(g.explicit.getOrDefault(topic))
peers.incl(g.subscribedDirectPeers.getOrDefault(topic))

if topic in g.topics: # if we're subscribed use the mesh
peers.incl(g.mesh.getOrDefault(topic))
Expand Down Expand Up @@ -608,11 +611,13 @@
return peers.len

proc maintainDirectPeer(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.async.} =
let peer = g.peers.getOrDefault(id)
if isNil(peer):
if id notin g.peers:
trace "Attempting to dial a direct peer", peer = id
if g.switch.isConnected(id):
warn "We are connected to a direct peer, but it isn't a GossipSub peer!", id
return

Check warning on line 618 in libp2p/protocols/pubsub/gossipsub.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/pubsub/gossipsub.nim#L618

Added line #L618 was not covered by tests
try:
await g.switch.connect(id, addrs)
await g.switch.connect(id, addrs, forceDial = true)
# populate the peer after it's connected
discard g.getOrCreatePeer(id, g.codecs)
except CancelledError as exc:
Expand Down
11 changes: 6 additions & 5 deletions libp2p/protocols/pubsub/gossipsub/behavior.nim
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,11 @@
let topic = graft.topicId
trace "peer grafted topic", peer, topic

# It is an error to GRAFT on a explicit peer
# It is an error to GRAFT on a direct peer
if peer.peerId in g.parameters.directPeers:
# receiving a graft from a direct peer should yield a more prominent warning (protocol violation)
warn "an explicit peer attempted to graft us, peering agreements should be reciprocal",
# we are trusting direct peer not to abuse this
warn "a direct peer attempted to graft us, peering agreements should be reciprocal",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can cause a log spam issue if they keep doing this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are supposed to trust them somewhat
The other solution would be to remove them from the trusted set in that case
(descoring isn't an option as we will never kick them due to bad score)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm, right about the trust - let's make a comment about it

peer, topic
# and such an attempt should be logged and rejected with a PRUNE
prunes.add(ControlPrune(
Expand Down Expand Up @@ -352,7 +353,7 @@
# avoid negative score peers
it.score >= 0.0 and
it notin currentMesh[] and
# don't pick explicit peers
# don't pick direct peers
it.peerId notin g.parameters.directPeers and
# and avoid peers we are backing off
it.peerId notin backingOff:
Expand Down Expand Up @@ -392,7 +393,7 @@
it notin currentMesh[] and
# avoid negative score peers
it.score >= 0.0 and
# don't pick explicit peers
# don't pick direct peers
it.peerId notin g.parameters.directPeers and
# and avoid peers we are backing off
it.peerId notin backingOff:
Expand Down Expand Up @@ -494,7 +495,7 @@
# avoid negative score peers
it.score >= median.score and
it notin currentMesh[] and
# don't pick explicit peers
# don't pick direct peers

Check warning on line 498 in libp2p/protocols/pubsub/gossipsub/behavior.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/pubsub/gossipsub/behavior.nim#L498

Added line #L498 was not covered by tests
it.peerId notin g.parameters.directPeers and
# and avoid peers we are backing off
it.peerId notin backingOff:
Expand Down
2 changes: 1 addition & 1 deletion libp2p/protocols/pubsub/gossipsub/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ type
mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic
fanout*: PeerTable # peers that we send messages to when we're not subscribed to the topic
gossipsub*: PeerTable # peers that are subscribed to a topic
explicit*: PeerTable # directpeers that we keep alive explicitly
subscribedDirectPeers*: PeerTable # directpeers that we keep alive
backingOff*: BackoffTable # peers to backoff from when replenishing the mesh
lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics
gossip*: Table[string, seq[ControlIHave]] # pending gossip
Expand Down
18 changes: 13 additions & 5 deletions tests/pubsub/testgossipsub2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -167,36 +167,44 @@ suite "GossipSub":

asyncTest "GossipSub directPeers: always forward messages":
let
nodes = generateNodes(2, gossip = true)
nodes = generateNodes(3, gossip = true)

# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
nodes[2].switch.start(),
)

await GossipSub(nodes[0]).addDirectPeer(nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs)
await GossipSub(nodes[1]).addDirectPeer(nodes[0].switch.peerInfo.peerId, nodes[0].switch.peerInfo.addrs)
await GossipSub(nodes[1]).addDirectPeer(nodes[2].switch.peerInfo.peerId, nodes[2].switch.peerInfo.addrs)
await GossipSub(nodes[2]).addDirectPeer(nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs)

var handlerFut = newFuture[void]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
handlerFut.complete()
proc noop(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"

nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
nodes[0].subscribe("foobar", noop)
nodes[1].subscribe("foobar", noop)
nodes[2].subscribe("foobar", handler)

tryPublish await nodes[0].publish("foobar", toBytes("hellow")), 1

await handlerFut
await handlerFut.wait(2.seconds)

# peer shouldn't be in our mesh
check "foobar" notin GossipSub(nodes[0]).mesh
check "foobar" notin GossipSub(nodes[1]).mesh
check "foobar" notin GossipSub(nodes[2]).mesh

await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
nodes[1].switch.stop(),
nodes[2].switch.stop()
)

await allFuturesThrowing(nodesFut.concat())
Expand Down
Loading