Skip to content

Commit

Permalink
feat: Yamux timeout (#1029)
Browse files Browse the repository at this point in the history
  • Loading branch information
lchenut authored Feb 22, 2024
1 parent 7faa0fa commit 349496e
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 21 deletions.
8 changes: 6 additions & 2 deletions libp2p/builders.nim
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,12 @@ proc withMplex*(
b.muxers.add(MuxerProvider.new(newMuxer, MplexCodec))
b

proc withYamux*(b: SwitchBuilder, windowSize: int = YamuxDefaultWindowSize): SwitchBuilder =
proc newMuxer(conn: Connection): Muxer = Yamux.new(conn, windowSize)
proc withYamux*(b: SwitchBuilder,
windowSize: int = YamuxDefaultWindowSize,
inTimeout: Duration = 5.minutes,
outTimeout: Duration = 5.minutes): SwitchBuilder =
proc newMuxer(conn: Connection): Muxer =
Yamux.new(conn, windowSize, inTimeout = inTimeout, outTimeout = outTimeout)

assert b.muxers.countIt(it.codec == YamuxCodec) == 0, "Yamux build multiple times"
b.muxers.add(MuxerProvider.new(newMuxer, YamuxCodec))
Expand Down
42 changes: 27 additions & 15 deletions libp2p/muxers/yamux/yamux.nim
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,8 @@ type
maxChannCount: int
windowSize: int
maxSendQueueSize: int
inTimeout: Duration
outTimeout: Duration

proc lenBySrc(m: Yamux, isSrc: bool): int =
for v in m.channels.values():
Expand All @@ -416,7 +418,7 @@ proc createStream(m: Yamux, id: uint32, isSrc: bool,
# that the initial recvWindow is 256k.
# To solve this contradiction, no updateWindow will be sent until recvWindow is less
# than maxRecvWindow
result = YamuxChannel(
var stream = YamuxChannel(
id: id,
maxRecvWindow: recvWindow,
recvWindow: if recvWindow > YamuxDefaultWindowSize: recvWindow else: YamuxDefaultWindowSize,
Expand All @@ -427,22 +429,28 @@ proc createStream(m: Yamux, id: uint32, isSrc: bool,
receivedData: newAsyncEvent(),
closedRemotely: newFuture[void]()
)
result.objName = "YamuxStream"
result.dir = if isSrc: Direction.Out else: Direction.In
result.timeoutHandler = proc(): Future[void] {.gcsafe.} =
stream.objName = "YamuxStream"
if isSrc:
stream.dir = Direction.Out
stream.timeout = m.outTimeout
else:
stream.dir = Direction.In
stream.timeout = m.inTimeout
stream.timeoutHandler = proc(): Future[void] {.gcsafe.} =
trace "Idle timeout expired, resetting YamuxChannel"
result.reset()
result.initStream()
result.peerId = m.connection.peerId
result.observedAddr = m.connection.observedAddr
result.transportDir = m.connection.transportDir
stream.reset(true)
stream.initStream()
stream.peerId = m.connection.peerId
stream.observedAddr = m.connection.observedAddr
stream.transportDir = m.connection.transportDir
when defined(libp2p_agents_metrics):
result.shortAgent = m.connection.shortAgent
m.channels[id] = result
asyncSpawn m.cleanupChannel(result)
stream.shortAgent = m.connection.shortAgent
m.channels[id] = stream
asyncSpawn m.cleanupChannel(stream)
trace "created channel", id, pid=m.connection.peerId
when defined(libp2p_yamux_metrics):
libp2p_yamux_channels.set(m.lenBySrc(isSrc).int64, [$isSrc, $result.peerId])
libp2p_yamux_channels.set(m.lenBySrc(isSrc).int64, [$isSrc, $stream.peerId])
return stream

method close*(m: Yamux) {.async.} =
if m.isClosed == true:
Expand Down Expand Up @@ -567,11 +575,15 @@ method newStream*(
proc new*(T: type[Yamux], conn: Connection,
maxChannCount: int = MaxChannelCount,
windowSize: int = YamuxDefaultWindowSize,
maxSendQueueSize: int = MaxSendQueueSize): T =
maxSendQueueSize: int = MaxSendQueueSize,
inTimeout: Duration = 5.minutes,
outTimeout: Duration = 5.minutes): T =
T(
connection: conn,
currentId: if conn.dir == Out: 1 else: 2,
maxChannCount: maxChannCount,
windowSize: windowSize,
maxSendQueueSize: maxSendQueueSize
maxSendQueueSize: maxSendQueueSize,
inTimeout: inTimeout,
outTimeout: outTimeout
)
57 changes: 53 additions & 4 deletions tests/testyamux.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ suite "Yamux":
teardown:
checkTrackers()

template mSetup(ws: int = YamuxDefaultWindowSize) {.inject.} =
template mSetup(ws: int = YamuxDefaultWindowSize,
inTo: Duration = 5.minutes,
outTo: Duration = 5.minutes) {.inject.} =
#TODO in a template to avoid threadvar
let
(conna {.inject.}, connb {.inject.}) = bridgedConnections()
yamuxa {.inject.} = Yamux.new(conna, windowSize = ws)
yamuxb {.inject.} = Yamux.new(connb, windowSize = ws)
yamuxa {.inject.} = Yamux.new(conna, windowSize = ws, inTimeout = inTo, outTimeout = outTo)
yamuxb {.inject.} = Yamux.new(connb, windowSize = ws, inTimeout = inTo, outTimeout = outTo)
(handlera, handlerb) = (yamuxa.handle(), yamuxb.handle())

defer:
Expand Down Expand Up @@ -237,6 +239,53 @@ suite "Yamux":
await wait(thirdWriter, 1.seconds)
await streamA.close()

suite "Timeout testing":
asyncTest "Check if InTimeout close both streams correctly":
mSetup(inTo = 1.seconds)
let blocker = newFuture[void]()
let connBlocker = newFuture[void]()

yamuxb.streamHandler = proc(conn: Connection) {.async.} =
check (await conn.readLp(100)) == fromHex("1234")
await conn.writeLp(fromHex("5678"))
await blocker
check conn.isClosed
connBlocker.complete()

let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
await streamA.writeLp(fromHex("1234"))
check (await streamA.readLp(100)) == fromHex("5678")
# wait for the timeout to happens, the sleep duration is set to 4 seconds
# as the timeout could be a bit long to trigger
await sleepAsync(4.seconds)
blocker.complete()
check streamA.isClosed
await connBlocker

asyncTest "Check if OutTimeout close both streams correctly":
mSetup(outTo = 1.seconds)
let blocker = newFuture[void]()
let connBlocker = newFuture[void]()

yamuxb.streamHandler = proc(conn: Connection) {.async.} =
check (await conn.readLp(100)) == fromHex("1234")
await conn.writeLp(fromHex("5678"))
await blocker
check conn.isClosed
connBlocker.complete()

let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
await streamA.writeLp(fromHex("1234"))
check (await streamA.readLp(100)) == fromHex("5678")
# wait for the timeout to happens, the sleep duration is set to 4 seconds
# as the timeout could be a bit long to trigger
await sleepAsync(4.seconds)
blocker.complete()
check streamA.isClosed
await connBlocker

suite "Exception testing":
asyncTest "Local & Remote close":
mSetup()
Expand Down Expand Up @@ -267,7 +316,7 @@ suite "Yamux":

let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]

await yamuxa.close()
expect LPStreamClosedError: await streamA.writeLp(fromHex("1234"))
expect LPStreamClosedError: discard await streamA.readLp(100)
Expand Down

0 comments on commit 349496e

Please sign in to comment.