Skip to content

Commit

Permalink
Bandwidth estimate as a parameter (#941)
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomrsantos authored Aug 14, 2023
1 parent d6263bf commit f80ce31
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 28 deletions.
17 changes: 10 additions & 7 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
behaviourPenaltyWeight: -1.0,
behaviourPenaltyDecay: 0.999,
disconnectBadPeers: false,
enablePX: false
enablePX: false,
bandwidthEstimatebps: 100_000_000 # 100 Mbps or 12.5 MBps
)

proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
Expand Down Expand Up @@ -521,14 +522,16 @@ method publish*(g: GossipSub,
# With flood publishing enabled, the mesh is used when propagating messages from other peers,
# but a peer's own messages will always be published to all known peers in the topic, limited
# to the amount of peers we can send it to in one heartbeat
let
bandwidth = 12_500_000 div 1000 # 100 Mbps or 12.5 MBps TODO replace with bandwidth estimate
msToTransmit = max(data.len div bandwidth, 1)
maxPeersToFlod =
max(g.parameters.heartbeatInterval.milliseconds div msToTransmit, g.parameters.dLow)
var maxPeersToFlodOpt: Opt[int64]
if g.parameters.bandwidthEstimatebps > 0:
let
bandwidth = (g.parameters.bandwidthEstimatebps) div 8 div 1000 # Divisions are to convert it to Bytes per ms TODO replace with bandwidth estimate
msToTransmit = max(data.len div bandwidth, 1)
maxPeersToFlodOpt = Opt.some(max(g.parameters.heartbeatInterval.milliseconds div msToTransmit, g.parameters.dLow))

for peer in g.gossipsub.getOrDefault(topic):
if peers.len >= maxPeersToFlod: break
maxPeersToFlodOpt.withValue(maxPeersToFlod):
if peers.len >= maxPeersToFlod: break
if peer.score >= g.parameters.publishThreshold:
trace "publish: including flood/high score peer", peer
peers.incl(peer)
Expand Down
2 changes: 2 additions & 0 deletions libp2p/protocols/pubsub/gossipsub/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ type
disconnectBadPeers*: bool
enablePX*: bool

bandwidthEstimatebps*: int # This is currently used only for limting flood publishing. 0 disables flood-limiting completely

BackoffTable* = Table[string, Table[PeerId, Moment]]
ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]]

Expand Down
71 changes: 50 additions & 21 deletions tests/pubsub/testgossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -636,27 +636,31 @@ suite "GossipSub":

await allFuturesThrowing(nodesFut.concat())

asyncTest "e2e - GossipSub floodPublish limit":
var passed: Future[bool] = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"

let
nodes = generateNodes(
20,
gossip = true)
# Helper procedures to avoid repetition
proc setupNodes(count: int): seq[PubSub] =
generateNodes(count, gossip = true)

proc startNodes(nodes: seq[PubSub]) {.async.} =
await allFuturesThrowing(
nodes.mapIt(it.switch.start())
)

var gossip1: GossipSub = GossipSub(nodes[0])
gossip1.parameters.floodPublish = true
gossip1.parameters.heartbeatInterval = milliseconds(700)
proc stopNodes(nodes: seq[PubSub]) {.async.} =
await allFuturesThrowing(
nodes.mapIt(it.switch.stop())
)

proc connectNodes(nodes: seq[PubSub], target: PubSub) {.async.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"

for node in nodes[1..^1]:
for node in nodes:
node.subscribe("foobar", handler)
await node.switch.connect(nodes[0].peerInfo.peerId, nodes[0].peerInfo.addrs)
await node.switch.connect(target.peerInfo.peerId, target.peerInfo.addrs)

proc baseTestProcedure(nodes: seq[PubSub], gossip1: GossipSub, numPeersFirstMsg: int, numPeersSecondMsg: int) {.async.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"

block setup:
for i in 0..<50:
Expand All @@ -665,20 +669,45 @@ suite "GossipSub":
await sleepAsync(10.milliseconds)
check false

check (await nodes[0].publish("foobar", newSeq[byte](2_500_000))) == gossip1.parameters.dLow

check (await nodes[0].publish("foobar", newSeq[byte](500_001))) == 17
check (await nodes[0].publish("foobar", newSeq[byte](2_500_000))) == numPeersFirstMsg
check (await nodes[0].publish("foobar", newSeq[byte](500_001))) == numPeersSecondMsg

# Now try with a mesh
gossip1.subscribe("foobar", handler)
checkExpiring: gossip1.mesh.peers("foobar") > 5

# use a different length so that the message is not equal to the last
check (await nodes[0].publish("foobar", newSeq[byte](500_000))) == 17
check (await nodes[0].publish("foobar", newSeq[byte](500_000))) == numPeersSecondMsg

await allFuturesThrowing(
nodes.mapIt(it.switch.stop())
)
# Actual tests
asyncTest "e2e - GossipSub floodPublish limit":

let
nodes = setupNodes(20)
gossip1 = GossipSub(nodes[0])

gossip1.parameters.floodPublish = true
gossip1.parameters.heartbeatInterval = milliseconds(700)

await startNodes(nodes)
await connectNodes(nodes[1..^1], nodes[0])
await baseTestProcedure(nodes, gossip1, gossip1.parameters.dLow, 17)
await stopNodes(nodes)

asyncTest "e2e - GossipSub floodPublish limit with bandwidthEstimatebps = 0":

let
nodes = setupNodes(20)
gossip1 = GossipSub(nodes[0])

gossip1.parameters.floodPublish = true
gossip1.parameters.heartbeatInterval = milliseconds(700)
gossip1.parameters.bandwidthEstimatebps = 0

await startNodes(nodes)
await connectNodes(nodes[1..^1], nodes[0])
await baseTestProcedure(nodes, gossip1, nodes.len - 1, nodes.len - 1)
await stopNodes(nodes)

asyncTest "e2e - GossipSub with multiple peers":
var runs = 10
Expand Down

0 comments on commit f80ce31

Please sign in to comment.