diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 8bb88bfa3c..ef4cbafeae 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -74,7 +74,8 @@ proc init*(_: type[GossipSubParams]): GossipSubParams = behaviourPenaltyWeight: -1.0, behaviourPenaltyDecay: 0.999, disconnectBadPeers: false, - enablePX: false + enablePX: false, + bandwidthEstimatebps: 100_000_000 # 100 Mbps or 12.5 MBps ) proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] = @@ -521,14 +522,16 @@ method publish*(g: GossipSub, # With flood publishing enabled, the mesh is used when propagating messages from other peers, # but a peer's own messages will always be published to all known peers in the topic, limited # to the amount of peers we can send it to in one heartbeat - let - bandwidth = 12_500_000 div 1000 # 100 Mbps or 12.5 MBps TODO replace with bandwidth estimate - msToTransmit = max(data.len div bandwidth, 1) - maxPeersToFlod = - max(g.parameters.heartbeatInterval.milliseconds div msToTransmit, g.parameters.dLow) + var maxPeersToFlodOpt: Opt[int64] + if g.parameters.bandwidthEstimatebps > 0: + let + bandwidth = (g.parameters.bandwidthEstimatebps) div 8 div 1000 # Divisions are to convert it to Bytes per ms TODO replace with bandwidth estimate + msToTransmit = max(data.len div bandwidth, 1) + maxPeersToFlodOpt = Opt.some(max(g.parameters.heartbeatInterval.milliseconds div msToTransmit, g.parameters.dLow)) for peer in g.gossipsub.getOrDefault(topic): - if peers.len >= maxPeersToFlod: break + maxPeersToFlodOpt.withValue(maxPeersToFlod): + if peers.len >= maxPeersToFlod: break if peer.score >= g.parameters.publishThreshold: trace "publish: including flood/high score peer", peer peers.incl(peer) diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index 0f5fe55c70..333b9fae17 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -142,6 +142,8 @@ type disconnectBadPeers*: bool enablePX*: bool + bandwidthEstimatebps*: int # This is currently used only for limting flood publishing. 0 disables flood-limiting completely + BackoffTable* = Table[string, Table[PeerId, Moment]] ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]] diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 81b46f4d76..9a4d9aa450 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -636,27 +636,31 @@ suite "GossipSub": await allFuturesThrowing(nodesFut.concat()) - asyncTest "e2e - GossipSub floodPublish limit": - var passed: Future[bool] = newFuture[bool]() - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - check topic == "foobar" - - let - nodes = generateNodes( - 20, - gossip = true) + # Helper procedures to avoid repetition + proc setupNodes(count: int): seq[PubSub] = + generateNodes(count, gossip = true) + proc startNodes(nodes: seq[PubSub]) {.async.} = await allFuturesThrowing( nodes.mapIt(it.switch.start()) ) - var gossip1: GossipSub = GossipSub(nodes[0]) - gossip1.parameters.floodPublish = true - gossip1.parameters.heartbeatInterval = milliseconds(700) + proc stopNodes(nodes: seq[PubSub]) {.async.} = + await allFuturesThrowing( + nodes.mapIt(it.switch.stop()) + ) + + proc connectNodes(nodes: seq[PubSub], target: PubSub) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" - for node in nodes[1..^1]: + for node in nodes: node.subscribe("foobar", handler) - await node.switch.connect(nodes[0].peerInfo.peerId, nodes[0].peerInfo.addrs) + await node.switch.connect(target.peerInfo.peerId, target.peerInfo.addrs) + + proc baseTestProcedure(nodes: seq[PubSub], gossip1: GossipSub, numPeersFirstMsg: int, numPeersSecondMsg: int) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" block setup: for i in 0..<50: @@ -665,20 +669,45 @@ suite "GossipSub": await sleepAsync(10.milliseconds) check false - check (await nodes[0].publish("foobar", newSeq[byte](2_500_000))) == gossip1.parameters.dLow - - check (await nodes[0].publish("foobar", newSeq[byte](500_001))) == 17 + check (await nodes[0].publish("foobar", newSeq[byte](2_500_000))) == numPeersFirstMsg + check (await nodes[0].publish("foobar", newSeq[byte](500_001))) == numPeersSecondMsg # Now try with a mesh gossip1.subscribe("foobar", handler) checkExpiring: gossip1.mesh.peers("foobar") > 5 # use a different length so that the message is not equal to the last - check (await nodes[0].publish("foobar", newSeq[byte](500_000))) == 17 + check (await nodes[0].publish("foobar", newSeq[byte](500_000))) == numPeersSecondMsg - await allFuturesThrowing( - nodes.mapIt(it.switch.stop()) - ) + # Actual tests + asyncTest "e2e - GossipSub floodPublish limit": + + let + nodes = setupNodes(20) + gossip1 = GossipSub(nodes[0]) + + gossip1.parameters.floodPublish = true + gossip1.parameters.heartbeatInterval = milliseconds(700) + + await startNodes(nodes) + await connectNodes(nodes[1..^1], nodes[0]) + await baseTestProcedure(nodes, gossip1, gossip1.parameters.dLow, 17) + await stopNodes(nodes) + + asyncTest "e2e - GossipSub floodPublish limit with bandwidthEstimatebps = 0": + + let + nodes = setupNodes(20) + gossip1 = GossipSub(nodes[0]) + + gossip1.parameters.floodPublish = true + gossip1.parameters.heartbeatInterval = milliseconds(700) + gossip1.parameters.bandwidthEstimatebps = 0 + + await startNodes(nodes) + await connectNodes(nodes[1..^1], nodes[0]) + await baseTestProcedure(nodes, gossip1, nodes.len - 1, nodes.len - 1) + await stopNodes(nodes) asyncTest "e2e - GossipSub with multiple peers": var runs = 10