diff --git a/libp2p/builders.nim b/libp2p/builders.nim index 9fd1a06939..12d2d8342e 100644 --- a/libp2p/builders.nim +++ b/libp2p/builders.nim @@ -122,8 +122,12 @@ proc withMplex*( b.muxers.add(MuxerProvider.new(newMuxer, MplexCodec)) b -proc withYamux*(b: SwitchBuilder, windowSize: int = YamuxDefaultWindowSize): SwitchBuilder = - proc newMuxer(conn: Connection): Muxer = Yamux.new(conn, windowSize) +proc withYamux*(b: SwitchBuilder, + windowSize: int = YamuxDefaultWindowSize, + inTimeout: Duration = 5.minutes, + outTimeout: Duration = 5.minutes): SwitchBuilder = + proc newMuxer(conn: Connection): Muxer = + Yamux.new(conn, windowSize, inTimeout = inTimeout, outTimeout = outTimeout) assert b.muxers.countIt(it.codec == YamuxCodec) == 0, "Yamux build multiple times" b.muxers.add(MuxerProvider.new(newMuxer, YamuxCodec)) diff --git a/libp2p/muxers/yamux/yamux.nim b/libp2p/muxers/yamux/yamux.nim index 4a7b39a7e7..2f91251a16 100644 --- a/libp2p/muxers/yamux/yamux.nim +++ b/libp2p/muxers/yamux/yamux.nim @@ -396,6 +396,8 @@ type maxChannCount: int windowSize: int maxSendQueueSize: int + inTimeout: Duration + outTimeout: Duration proc lenBySrc(m: Yamux, isSrc: bool): int = for v in m.channels.values(): @@ -416,7 +418,7 @@ proc createStream(m: Yamux, id: uint32, isSrc: bool, # that the initial recvWindow is 256k. # To solve this contradiction, no updateWindow will be sent until recvWindow is less # than maxRecvWindow - result = YamuxChannel( + var stream = YamuxChannel( id: id, maxRecvWindow: recvWindow, recvWindow: if recvWindow > YamuxDefaultWindowSize: recvWindow else: YamuxDefaultWindowSize, @@ -427,22 +429,28 @@ proc createStream(m: Yamux, id: uint32, isSrc: bool, receivedData: newAsyncEvent(), closedRemotely: newFuture[void]() ) - result.objName = "YamuxStream" - result.dir = if isSrc: Direction.Out else: Direction.In - result.timeoutHandler = proc(): Future[void] {.gcsafe.} = + stream.objName = "YamuxStream" + if isSrc: + stream.dir = Direction.Out + stream.timeout = m.outTimeout + else: + stream.dir = Direction.In + stream.timeout = m.inTimeout + stream.timeoutHandler = proc(): Future[void] {.gcsafe.} = trace "Idle timeout expired, resetting YamuxChannel" - result.reset() - result.initStream() - result.peerId = m.connection.peerId - result.observedAddr = m.connection.observedAddr - result.transportDir = m.connection.transportDir + stream.reset(true) + stream.initStream() + stream.peerId = m.connection.peerId + stream.observedAddr = m.connection.observedAddr + stream.transportDir = m.connection.transportDir when defined(libp2p_agents_metrics): - result.shortAgent = m.connection.shortAgent - m.channels[id] = result - asyncSpawn m.cleanupChannel(result) + stream.shortAgent = m.connection.shortAgent + m.channels[id] = stream + asyncSpawn m.cleanupChannel(stream) trace "created channel", id, pid=m.connection.peerId when defined(libp2p_yamux_metrics): - libp2p_yamux_channels.set(m.lenBySrc(isSrc).int64, [$isSrc, $result.peerId]) + libp2p_yamux_channels.set(m.lenBySrc(isSrc).int64, [$isSrc, $stream.peerId]) + return stream method close*(m: Yamux) {.async.} = if m.isClosed == true: @@ -567,11 +575,15 @@ method newStream*( proc new*(T: type[Yamux], conn: Connection, maxChannCount: int = MaxChannelCount, windowSize: int = YamuxDefaultWindowSize, - maxSendQueueSize: int = MaxSendQueueSize): T = + maxSendQueueSize: int = MaxSendQueueSize, + inTimeout: Duration = 5.minutes, + outTimeout: Duration = 5.minutes): T = T( connection: conn, currentId: if conn.dir == Out: 1 else: 2, maxChannCount: maxChannCount, windowSize: windowSize, - maxSendQueueSize: maxSendQueueSize + maxSendQueueSize: maxSendQueueSize, + inTimeout: inTimeout, + outTimeout: outTimeout ) diff --git a/tests/testyamux.nim b/tests/testyamux.nim index 06cc19b0d0..bb82ae07f0 100644 --- a/tests/testyamux.nim +++ b/tests/testyamux.nim @@ -22,12 +22,14 @@ suite "Yamux": teardown: checkTrackers() - template mSetup(ws: int = YamuxDefaultWindowSize) {.inject.} = + template mSetup(ws: int = YamuxDefaultWindowSize, + inTo: Duration = 5.minutes, + outTo: Duration = 5.minutes) {.inject.} = #TODO in a template to avoid threadvar let (conna {.inject.}, connb {.inject.}) = bridgedConnections() - yamuxa {.inject.} = Yamux.new(conna, windowSize = ws) - yamuxb {.inject.} = Yamux.new(connb, windowSize = ws) + yamuxa {.inject.} = Yamux.new(conna, windowSize = ws, inTimeout = inTo, outTimeout = outTo) + yamuxb {.inject.} = Yamux.new(connb, windowSize = ws, inTimeout = inTo, outTimeout = outTo) (handlera, handlerb) = (yamuxa.handle(), yamuxb.handle()) defer: @@ -237,6 +239,53 @@ suite "Yamux": await wait(thirdWriter, 1.seconds) await streamA.close() + suite "Timeout testing": + asyncTest "Check if InTimeout close both streams correctly": + mSetup(inTo = 1.seconds) + let blocker = newFuture[void]() + let connBlocker = newFuture[void]() + + yamuxb.streamHandler = proc(conn: Connection) {.async.} = + check (await conn.readLp(100)) == fromHex("1234") + await conn.writeLp(fromHex("5678")) + await blocker + check conn.isClosed + connBlocker.complete() + + let streamA = await yamuxa.newStream() + check streamA == yamuxa.getStreams()[0] + await streamA.writeLp(fromHex("1234")) + check (await streamA.readLp(100)) == fromHex("5678") + # wait for the timeout to happens, the sleep duration is set to 4 seconds + # as the timeout could be a bit long to trigger + await sleepAsync(4.seconds) + blocker.complete() + check streamA.isClosed + await connBlocker + + asyncTest "Check if OutTimeout close both streams correctly": + mSetup(outTo = 1.seconds) + let blocker = newFuture[void]() + let connBlocker = newFuture[void]() + + yamuxb.streamHandler = proc(conn: Connection) {.async.} = + check (await conn.readLp(100)) == fromHex("1234") + await conn.writeLp(fromHex("5678")) + await blocker + check conn.isClosed + connBlocker.complete() + + let streamA = await yamuxa.newStream() + check streamA == yamuxa.getStreams()[0] + await streamA.writeLp(fromHex("1234")) + check (await streamA.readLp(100)) == fromHex("5678") + # wait for the timeout to happens, the sleep duration is set to 4 seconds + # as the timeout could be a bit long to trigger + await sleepAsync(4.seconds) + blocker.complete() + check streamA.isClosed + await connBlocker + suite "Exception testing": asyncTest "Local & Remote close": mSetup() @@ -267,7 +316,7 @@ suite "Yamux": let streamA = await yamuxa.newStream() check streamA == yamuxa.getStreams()[0] - + await yamuxa.close() expect LPStreamClosedError: await streamA.writeLp(fromHex("1234")) expect LPStreamClosedError: discard await streamA.readLp(100)