Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Yamux timeout #1029

Merged
merged 3 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions libp2p/builders.nim
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,12 @@ proc withMplex*(
b.muxers.add(MuxerProvider.new(newMuxer, MplexCodec))
b

proc withYamux*(b: SwitchBuilder, windowSize: int = YamuxDefaultWindowSize): SwitchBuilder =
proc newMuxer(conn: Connection): Muxer = Yamux.new(conn, windowSize)
proc withYamux*(b: SwitchBuilder,
windowSize: int = YamuxDefaultWindowSize,
inTimeout: Duration = 5.minutes,
outTimeout: Duration = 5.minutes): SwitchBuilder =
proc newMuxer(conn: Connection): Muxer =
Yamux.new(conn, windowSize, inTimeout = inTimeout, outTimeout = outTimeout)

assert b.muxers.countIt(it.codec == YamuxCodec) == 0, "Yamux build multiple times"
b.muxers.add(MuxerProvider.new(newMuxer, YamuxCodec))
Expand Down
42 changes: 27 additions & 15 deletions libp2p/muxers/yamux/yamux.nim
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,8 @@ type
maxChannCount: int
windowSize: int
maxSendQueueSize: int
inTimeout: Duration
outTimeout: Duration

proc lenBySrc(m: Yamux, isSrc: bool): int =
for v in m.channels.values():
Expand All @@ -416,7 +418,7 @@ proc createStream(m: Yamux, id: uint32, isSrc: bool,
# that the initial recvWindow is 256k.
# To solve this contradiction, no updateWindow will be sent until recvWindow is less
# than maxRecvWindow
result = YamuxChannel(
var stream = YamuxChannel(
id: id,
maxRecvWindow: recvWindow,
recvWindow: if recvWindow > YamuxDefaultWindowSize: recvWindow else: YamuxDefaultWindowSize,
Expand All @@ -427,22 +429,28 @@ proc createStream(m: Yamux, id: uint32, isSrc: bool,
receivedData: newAsyncEvent(),
closedRemotely: newFuture[void]()
)
result.objName = "YamuxStream"
result.dir = if isSrc: Direction.Out else: Direction.In
result.timeoutHandler = proc(): Future[void] {.gcsafe.} =
stream.objName = "YamuxStream"
if isSrc:
stream.dir = Direction.Out
stream.timeout = m.outTimeout
else:
stream.dir = Direction.In
stream.timeout = m.inTimeout
stream.timeoutHandler = proc(): Future[void] {.gcsafe.} =
trace "Idle timeout expired, resetting YamuxChannel"
result.reset()
result.initStream()
result.peerId = m.connection.peerId
result.observedAddr = m.connection.observedAddr
result.transportDir = m.connection.transportDir
stream.reset(true)
stream.initStream()
stream.peerId = m.connection.peerId
stream.observedAddr = m.connection.observedAddr
stream.transportDir = m.connection.transportDir
when defined(libp2p_agents_metrics):
result.shortAgent = m.connection.shortAgent
m.channels[id] = result
asyncSpawn m.cleanupChannel(result)
stream.shortAgent = m.connection.shortAgent
m.channels[id] = stream
asyncSpawn m.cleanupChannel(stream)
trace "created channel", id, pid=m.connection.peerId
when defined(libp2p_yamux_metrics):
libp2p_yamux_channels.set(m.lenBySrc(isSrc).int64, [$isSrc, $result.peerId])
libp2p_yamux_channels.set(m.lenBySrc(isSrc).int64, [$isSrc, $stream.peerId])
return stream

method close*(m: Yamux) {.async.} =
if m.isClosed == true:
Expand Down Expand Up @@ -567,11 +575,15 @@ method newStream*(
proc new*(T: type[Yamux], conn: Connection,
maxChannCount: int = MaxChannelCount,
windowSize: int = YamuxDefaultWindowSize,
maxSendQueueSize: int = MaxSendQueueSize): T =
maxSendQueueSize: int = MaxSendQueueSize,
inTimeout: Duration = 5.minutes,
outTimeout: Duration = 5.minutes): T =
T(
connection: conn,
currentId: if conn.dir == Out: 1 else: 2,
maxChannCount: maxChannCount,
windowSize: windowSize,
maxSendQueueSize: maxSendQueueSize
maxSendQueueSize: maxSendQueueSize,
inTimeout: inTimeout,
outTimeout: outTimeout
)
57 changes: 53 additions & 4 deletions tests/testyamux.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ suite "Yamux":
teardown:
checkTrackers()

template mSetup(ws: int = YamuxDefaultWindowSize) {.inject.} =
template mSetup(ws: int = YamuxDefaultWindowSize,
inTo: Duration = 5.minutes,
outTo: Duration = 5.minutes) {.inject.} =
#TODO in a template to avoid threadvar
let
(conna {.inject.}, connb {.inject.}) = bridgedConnections()
yamuxa {.inject.} = Yamux.new(conna, windowSize = ws)
yamuxb {.inject.} = Yamux.new(connb, windowSize = ws)
yamuxa {.inject.} = Yamux.new(conna, windowSize = ws, inTimeout = inTo, outTimeout = outTo)
yamuxb {.inject.} = Yamux.new(connb, windowSize = ws, inTimeout = inTo, outTimeout = outTo)
(handlera, handlerb) = (yamuxa.handle(), yamuxb.handle())

defer:
Expand Down Expand Up @@ -237,6 +239,53 @@ suite "Yamux":
await wait(thirdWriter, 1.seconds)
await streamA.close()

suite "Timeout testing":
asyncTest "Check if InTimeout close both streams correctly":
mSetup(inTo = 1.seconds)
let blocker = newFuture[void]()
let connBlocker = newFuture[void]()

yamuxb.streamHandler = proc(conn: Connection) {.async.} =
check (await conn.readLp(100)) == fromHex("1234")
await conn.writeLp(fromHex("5678"))
await blocker
check conn.isClosed
connBlocker.complete()

let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
await streamA.writeLp(fromHex("1234"))
check (await streamA.readLp(100)) == fromHex("5678")
# wait for the timeout to happens, the sleep duration is set to 4 seconds
# as the timeout could be a bit long to trigger
await sleepAsync(4.seconds)
blocker.complete()
check streamA.isClosed
await connBlocker

asyncTest "Check if OutTimeout close both streams correctly":
mSetup(outTo = 1.seconds)
let blocker = newFuture[void]()
let connBlocker = newFuture[void]()

yamuxb.streamHandler = proc(conn: Connection) {.async.} =
check (await conn.readLp(100)) == fromHex("1234")
await conn.writeLp(fromHex("5678"))
await blocker
check conn.isClosed
connBlocker.complete()

let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
await streamA.writeLp(fromHex("1234"))
check (await streamA.readLp(100)) == fromHex("5678")
# wait for the timeout to happens, the sleep duration is set to 4 seconds
# as the timeout could be a bit long to trigger
await sleepAsync(4.seconds)
blocker.complete()
check streamA.isClosed
await connBlocker

suite "Exception testing":
asyncTest "Local & Remote close":
mSetup()
Expand Down Expand Up @@ -267,7 +316,7 @@ suite "Yamux":

let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]

await yamuxa.close()
expect LPStreamClosedError: await streamA.writeLp(fromHex("1234"))
expect LPStreamClosedError: discard await streamA.readLp(100)
Expand Down
Loading