diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 9a4d9aa450..f2251fbf92 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -10,8 +10,9 @@ {.used.} import sequtils, options, tables, sets, sugar -import chronos, stew/byteutils +import chronos, stew/byteutils, chronos/ratelimit import chronicles +import metrics import utils, ../../libp2p/[errors, peerid, peerinfo, @@ -20,6 +21,7 @@ import utils, ../../libp2p/[errors, crypto/crypto, protocols/pubsub/pubsub, protocols/pubsub/gossipsub, + protocols/pubsub/gossipsub/scoring, protocols/pubsub/pubsubpeer, protocols/pubsub/peertable, protocols/pubsub/timedcache, @@ -928,3 +930,66 @@ suite "GossipSub": await allFuturesThrowing(nodesFut.concat()) + proc initializeGossipTest(): Future[(seq[PubSub], GossipSub, GossipSub)] {.async.} = + let nodes = generateNodes( + 2, + gossip = true, + overheadRateLimit = Opt.some((20, 1.millis))) + + discard await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) + + await subscribeNodes(nodes) + + proc handle(topic: string, data: seq[byte]) {.async, gcsafe.} = discard + + let gossip0 = GossipSub(nodes[0]) + let gossip1 = GossipSub(nodes[1]) + + gossip0.subscribe("foobar", handle) + gossip1.subscribe("foobar", handle) + await waitSubGraph(nodes, "foobar") + + return (nodes, gossip0, gossip1) + + asyncTest "e2e - GossipSub should process valid messages": + let (nodes, gossip0, gossip1) = await initializeGossipTest() + + gossip0.broadcast(gossip1.mesh["foobar"], RPCMsg( + messages: @[Message(topicIDs: @["foobar"], data: "Valid data".toBytes)]) + ) + await sleepAsync(300.millis) + + expect(system.KeyError): + check libp2p_gossipsub_peers_rate_limit_disconnections.valueByName("libp2p_gossipsub_peers_rate_limit_disconnections_total", @["nim-libp2p"]) == 0 + + await stopNodes(nodes) + + asyncTest "e2e - GossipSub should rate limit undecodable messages above what is allowed": + let (nodes, gossip0, gossip1) = await initializeGossipTest() + + # Simulate sending an undecodable message + await gossip0.peers[gossip1.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](30, 1.byte)) + await sleepAsync(300.millis) + + check libp2p_gossipsub_peers_rate_limit_disconnections.valueByName("libp2p_gossipsub_peers_rate_limit_disconnections_total", @["nim-libp2p"]) == 1 + + await stopNodes(nodes) + + asyncTest "e2e - GossipSub should rate limit messages with excessive useless data": + let (nodes, gossip0, gossip1) = await initializeGossipTest() + + gossip0.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(prune: @[ + ControlPrune( + topicID: "foobar", + peers: @[PeerInfoMsg(peerId: PeerId(data: newSeq[byte](30)))], + backoff: 123'u64 + ) + ])))) + await sleepAsync(300.millis) + + check libp2p_gossipsub_peers_rate_limit_disconnections.valueByName("libp2p_gossipsub_peers_rate_limit_disconnections_total", @["nim-libp2p"]) == 1 + + await stopNodes(nodes) diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index 82209dcc15..b1b9d21449 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -5,7 +5,7 @@ const libp2p_pubsub_anonymize {.booldefine.} = false import hashes, random, tables, sets, sequtils -import chronos, stew/[byteutils, results] +import chronos, stew/[byteutils, results], chronos/ratelimit import ../../libp2p/[builders, protocols/pubsub/errors, protocols/pubsub/pubsub, @@ -67,7 +67,8 @@ proc generateNodes*( sendSignedPeerRecord = false, unsubscribeBackoff = 1.seconds, maxMessageSize: int = 1024 * 1024, - enablePX: bool = false): seq[PubSub] = + enablePX: bool = false, + overheadRateLimit: Opt[tuple[bytes: int, interval: Duration]] = Opt.none(tuple[bytes: int, interval: Duration])): seq[PubSub] = for i in 0..