cleanups #1092

Merged: 6 commits, May 8, 2024
1 change: 0 additions & 1 deletion config.nims
@@ -7,7 +7,6 @@ if dirExists("nimbledeps/pkgs2"):
switch("warning", "CaseTransition:off")
switch("warning", "ObservableStores:off")
switch("warning", "LockLevel:off")
--define:chronosStrictException
--styleCheck:usages
switch("warningAsError", "UseBase:on")
--styleCheck:error
4 changes: 2 additions & 2 deletions libp2p/multiaddress.nim
@@ -13,8 +13,8 @@
{.push public.}

import pkg/chronos, chronicles
import std/[nativesockets, hashes]
import tables, strutils, sets, stew/shims/net
import std/[nativesockets, net, hashes]
import tables, strutils, sets
import multicodec, multihash, multibase, transcoder, vbuffer, peerid,
protobuf/minprotobuf, errors, utility
import stew/[base58, base32, endians2, results]
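The only change here swaps the stew/shims/net import for the standard library's net module. As a hedged illustration (assuming a Nim version where std/net exposes IpAddress and parseIpAddress directly, which is what the shim used to provide on older compilers), the same functionality comes straight from the stdlib:

import std/net

# Illustrative only, not part of the diff: parse and inspect an address
# using std/net instead of stew/shims/net.
let ip = parseIpAddress("127.0.0.1")
assert ip.family == IpAddressFamily.IPv4
echo ip   # prints 127.0.0.1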
4 changes: 2 additions & 2 deletions libp2p/multistream.nim
@@ -266,6 +266,7 @@ proc addHandler*[E](

proc start*(m: MultistreamSelect) {.async: (raises: [CancelledError]).} =
# Nim 1.6.18: Using `mapIt` results in a seq of `.Raising([])`
# TODO https://github.com/nim-lang/Nim/issues/23445
var futs = newSeqOfCap[Future[void].Raising([CancelledError])](m.handlers.len)
for it in m.handlers:
futs.add it.protocol.start()
@@ -278,15 +279,14 @@ proc start*(m: MultistreamSelect) {.async: (raises: [CancelledError]).} =
doAssert m.handlers.len == futs.len, "Handlers modified while starting"
for i, fut in futs:
if not fut.finished:
pending.add noCancel fut.cancelAndWait()
pending.add fut.cancelAndWait()
elif fut.completed:
pending.add m.handlers[i].protocol.stop()
else:
static: doAssert typeof(fut).E is (CancelledError,)
await noCancel allFutures(pending)
raise exc


proc stop*(m: MultistreamSelect) {.async: (raises: []).} =
# Nim 1.6.18: Using `mapIt` results in a seq of `.Raising([CancelledError])`
var futs = newSeqOfCap[Future[void].Raising([])](m.handlers.len)
35 changes: 16 additions & 19 deletions libp2p/protocols/pubsub/gossipsub/behavior.nim
@@ -30,7 +30,7 @@ declareGauge(libp2p_gossipsub_healthy_peers_topics, "number of topics in mesh wi
declareCounter(libp2p_gossipsub_above_dhigh_condition, "number of above dhigh pruning branches ran", labels = ["topic"])
declareGauge(libp2p_gossipsub_received_iwants, "received iwants", labels = ["kind"])

proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) {.raises: [].} =
proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) =
g.withPeerStats(p.peerId) do (stats: var PeerStats):
var info = stats.topicInfos.getOrDefault(topic)
info.graftTime = Moment.now()
@@ -46,7 +46,7 @@ proc pruned*(g: GossipSub,
p: PubSubPeer,
topic: string,
setBackoff: bool = true,
backoff = none(Duration)) {.raises: [].} =
backoff = none(Duration)) =
if setBackoff:
let
backoffDuration = backoff.get(g.parameters.pruneBackoff)
@@ -70,7 +70,7 @@ proc pruned*(g: GossipSub,

trace "pruned", peer=p, topic

proc handleBackingOff*(t: var BackoffTable, topic: string) {.raises: [].} =
proc handleBackingOff*(t: var BackoffTable, topic: string) =
let now = Moment.now()
var expired = toSeq(t.getOrDefault(topic).pairs())
expired.keepIf do (pair: tuple[peer: PeerId, expire: Moment]) -> bool:
@@ -79,7 +79,7 @@ proc handleBackingOff*(t: var BackoffTable, topic: string) {.raises: [].} =
t.withValue(topic, v):
v[].del(peer)

proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] {.raises: [].} =
proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] =
if not g.parameters.enablePX:
return @[]
var peers = g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()).toSeq()
@@ -100,7 +100,7 @@ proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] {.raises:

proc handleGraft*(g: GossipSub,
peer: PubSubPeer,
grafts: seq[ControlGraft]): seq[ControlPrune] = # {.raises: [Defect].} TODO chronicles exception on windows
grafts: seq[ControlGraft]): seq[ControlPrune] =
var prunes: seq[ControlPrune]
for graft in grafts:
let topic = graft.topicID
@@ -205,7 +205,7 @@ proc getPeers(prune: ControlPrune, peer: PubSubPeer): seq[(PeerId, Option[PeerRe
routingRecords


proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.raises: [].} =
proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
for prune in prunes:
let topic = prune.topicID

@@ -239,7 +239,7 @@ proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.r

proc handleIHave*(g: GossipSub,
peer: PubSubPeer,
ihaves: seq[ControlIHave]): ControlIWant {.raises: [].} =
ihaves: seq[ControlIHave]): ControlIWant =
var res: ControlIWant
if peer.score < g.parameters.gossipThreshold:
trace "ihave: ignoring low score peer", peer, score = peer.score
@@ -273,7 +273,7 @@ proc handleIDontWant*(g: GossipSub,

proc handleIWant*(g: GossipSub,
peer: PubSubPeer,
iwants: seq[ControlIWant]): seq[Message] {.raises: [].} =
iwants: seq[ControlIWant]): seq[Message] =
var
messages: seq[Message]
invalidRequests = 0
@@ -299,7 +299,7 @@ proc handleIWant*(g: GossipSub,
messages.add(msg)
return messages

proc commitMetrics(metrics: var MeshMetrics) {.raises: [].} =
proc commitMetrics(metrics: var MeshMetrics) =
libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics)
libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics)
libp2p_gossipsub_under_dout_topics.set(metrics.underDoutTopics)
@@ -308,7 +308,7 @@ proc commitMetrics(metrics: var MeshMetrics) {.raises: [].} =
libp2p_gossipsub_peers_per_topic_fanout.set(metrics.otherPeersPerTopicFanout, labelValues = ["other"])
libp2p_gossipsub_peers_per_topic_mesh.set(metrics.otherPeersPerTopicMesh, labelValues = ["other"])

proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) {.raises: [].} =
proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) =
logScope:
topic
mesh = g.mesh.peers(topic)
@@ -538,7 +538,7 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
g.broadcast(prunes, prune, isHighPriority = true)

proc dropFanoutPeers*(g: GossipSub) {.raises: [].} =
proc dropFanoutPeers*(g: GossipSub) =
# drop peers that we haven't published to in
# GossipSubFanoutTTL seconds
let now = Moment.now()
@@ -551,7 +551,7 @@ proc dropFanoutPeers*(g: GossipSub) {.raises: [].} =
for topic in drops:
g.lastFanoutPubSub.del topic

proc replenishFanout*(g: GossipSub, topic: string) {.raises: [].} =
proc replenishFanout*(g: GossipSub, topic: string) =
## get fanout peers for a topic
logScope: topic
trace "about to replenish fanout"
@@ -567,7 +567,7 @@ proc replenishFanout*(g: GossipSub, topic: string) {.raises: [].} =

trace "fanout replenished with peers", peers = g.fanout.peers(topic)

proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises: [].} =
proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] =
## gossip iHave messages to peers
##

@@ -620,17 +620,16 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises:
g.rng.shuffle(allPeers)
allPeers.setLen(target)

let msgIdsAsSet = ihave.messageIDs.toHashSet()

for peer in allPeers:
control.mgetOrPut(peer, ControlMessage()).ihave.add(ihave)
peer.sentIHaves[^1].incl(msgIdsAsSet)
for msgId in ihave.messageIDs:
peer.sentIHaves[^1].incl(msgId)

libp2p_gossipsub_cache_window_size.set(cacheWindowSize.int64)

return control

proc onHeartbeat(g: GossipSub) {.raises: [].} =
proc onHeartbeat(g: GossipSub) =
# reset IWANT budget
# reset IHAVE cap
block:
@@ -694,8 +693,6 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =

g.mcache.shift() # shift the cache

# {.pop.} # raises []

proc heartbeat*(g: GossipSub) {.async.} =
heartbeat "GossipSub", g.parameters.heartbeatInterval:
trace "running heartbeat", instance = cast[int](g)
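Most of the behavior.nim changes simply drop per-proc {.raises: [].} annotations, along with the leftover "# {.pop.} # raises []" comment. Those annotations are redundant when the module opens with {.push raises: [].}, as mcache.nim and messages.nim below do: the pushed pragma applies to every following proc until a matching {.pop.}. A minimal illustrative sketch, not taken from the library:

# Illustrative only: a pushed raises pragma covers all following procs,
# so repeating {.raises: [].} on each signature adds nothing.
{.push raises: [].}

proc double(x: int): int =
  x * 2                    # any tracked exception here would be rejected

proc clamp01(x: float): float =
  max(0.0, min(1.0, x))

{.pop.}                    # the pragma stops applying from here on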
2 changes: 0 additions & 2 deletions libp2p/protocols/pubsub/gossipsub/scoring.nim
@@ -87,8 +87,6 @@ proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 =
else:
0.0

{.pop.}

proc disconnectPeer*(g: GossipSub, peer: PubSubPeer) {.async.} =
try:
await g.switch.disconnect(peer.peerId)
37 changes: 22 additions & 15 deletions libp2p/protocols/pubsub/mcache.nim
@@ -9,50 +9,57 @@

{.push raises: [].}

import std/[sets, tables, options]
import std/[sets, tables]
import rpc/[messages]
import results

export sets, tables, messages, options
export sets, tables, messages, results

type
CacheEntry* = object
mid*: MessageId
msgId*: MessageId
topic*: string

MCache* = object of RootObj
msgs*: Table[MessageId, Message]
history*: seq[seq[CacheEntry]]
pos*: int
windowSize*: Natural

func get*(c: MCache, mid: MessageId): Option[Message] =
if mid in c.msgs:
try: some(c.msgs[mid])
func get*(c: MCache, msgId: MessageId): Opt[Message] =
if msgId in c.msgs:
try: Opt.some(c.msgs[msgId])
except KeyError: raiseAssert "checked"
else:
none(Message)
Opt.none(Message)

func contains*(c: MCache, mid: MessageId): bool =
mid in c.msgs
func contains*(c: MCache, msgId: MessageId): bool =
msgId in c.msgs

func put*(c: var MCache, msgId: MessageId, msg: Message) =
if not c.msgs.hasKeyOrPut(msgId, msg):
# Only add cache entry if the message was not already in the cache
c.history[0].add(CacheEntry(mid: msgId, topic: msg.topic))
c.history[c.pos].add(CacheEntry(msgId: msgId, topic: msg.topic))

func window*(c: MCache, topic: string): HashSet[MessageId] =
let
len = min(c.windowSize, c.history.len)

for i in 0..<len:
for entry in c.history[i]:
# Work backwards from `pos` in the circular buffer
for entry in c.history[(c.pos + c.history.len - i) mod c.history.len]:
if entry.topic == topic:
result.incl(entry.mid)
result.incl(entry.msgId)

func shift*(c: var MCache) =
for entry in c.history.pop():
c.msgs.del(entry.mid)
# Shift circular buffer to write to a new position, clearing it from past
# iterations
c.pos = (c.pos + 1) mod c.history.len

c.history.insert(@[])
for entry in c.history[c.pos]:
c.msgs.del(entry.msgId)

reset(c.history[c.pos])

func init*(T: type MCache, window, history: Natural): T =
T(
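The mcache rework does three things: renames mid to msgId, returns Opt[Message] from results instead of Option[Message], and turns the history into a fixed-size circular buffer where shift advances pos and clears the slot it lands on, while window walks backwards from pos, so no seq elements are moved on each heartbeat. A standalone sketch of the circular-buffer idea with simplified types (not the library code, and assuming a non-empty history):

type
  Ring = object
    history: seq[seq[string]]   # fixed-length ring of per-heartbeat entries
    pos: int                    # slot currently being written to

proc init(T: type Ring, length: int): T =
  T(history: newSeq[seq[string]](length), pos: 0)

proc put(r: var Ring, item: string) =
  r.history[r.pos].add(item)

proc shift(r: var Ring) =
  # advance to the next slot and drop whatever it held on the previous pass
  r.pos = (r.pos + 1) mod r.history.len
  r.history[r.pos].setLen(0)

proc window(r: Ring, size: int): seq[string] =
  # newest first: walk backwards from `pos` through at most `size` slots
  for i in 0 ..< min(size, r.history.len):
    for item in r.history[(r.pos + r.history.len - i) mod r.history.len]:
      result.add(item)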
1 change: 0 additions & 1 deletion libp2p/protocols/pubsub/pubsub.nim
@@ -30,7 +30,6 @@ import ./errors as pubsub_errors,
../../errors,
../../utility

import metrics
import stew/results
export results

4 changes: 2 additions & 2 deletions libp2p/protocols/pubsub/rpc/messages.nim
@@ -9,8 +9,8 @@

{.push raises: [].}

import options, sequtils, sugar
import "../../.."/[
import options, sequtils
import ../../../[
peerid,
routing_record,
utility
2 changes: 1 addition & 1 deletion libp2p/transports/tcptransport.nim
@@ -270,4 +270,4 @@ method dial*(
method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} =
if procCall Transport(t).handles(address):
if address.protocols.isOk:
return TCP.match(address)
return TCP.match(address)
5 changes: 2 additions & 3 deletions tests/pubsub/testmcache.nim
@@ -1,12 +1,11 @@
{.used.}

import unittest2, options, sets, sequtils
import unittest2, sequtils
import stew/byteutils
import ../../libp2p/[peerid,
crypto/crypto,
protocols/pubsub/mcache,
protocols/pubsub/rpc/messages]
import ./utils
protocols/pubsub/rpc/message]

var rng = newRng()
