Skip to content

Commit

Permalink
fix yamux / relay bug
Browse files Browse the repository at this point in the history
  • Loading branch information
lchenut committed Nov 20, 2023
1 parent 5b6fb1a commit 324d044
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 98 deletions.
2 changes: 2 additions & 0 deletions libp2p/muxers/yamux/yamux.nim
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ proc remoteClosed(channel: YamuxChannel) {.async.} =
method closeImpl*(channel: YamuxChannel) {.async, gcsafe.} =
if not channel.closedLocally:
channel.closedLocally = true
channel.isEof = true

if channel.isReset == false and channel.sendQueue.len == 0:
await channel.conn.write(YamuxHeader.data(channel.id, 0, {Fin}))
Expand Down Expand Up @@ -249,6 +250,7 @@ method readOnce*(
await channel.closedRemotely or channel.receivedData.wait()
if channel.closedRemotely.done() and channel.recvQueue.len == 0:
channel.returnedEof = true
channel.isEof = true
return 0

let toRead = min(channel.recvQueue.len, nbytes)
Expand Down
197 changes: 99 additions & 98 deletions tests/testrelayv2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -37,99 +37,99 @@ proc createSwitch(r: Relay, useYamux: bool = false): Switch =

suite "Circuit Relay V2":

# suite "Reservation":
# asyncTeardown:
# await allFutures(src1.stop(), src2.stop(), rel.stop())
# checkTrackers()
# var
# ttl {.threadvar.}: int
# ldur {.threadvar.}: uint32
# ldata {.threadvar.}: uint64
# cl1 {.threadvar.}: RelayClient
# cl2 {.threadvar.}: RelayClient
# rv2 {.threadvar.}: Relay
# src1 {.threadvar.}: Switch
# src2 {.threadvar.}: Switch
# rel {.threadvar.}: Switch
# rsvp {.threadvar.}: Rsvp
# range {.threadvar.}: HSlice[times.DateTime, times.DateTime]
#
# asyncSetup:
# ttl = 3
# ldur = 60
# ldata = 2048
# cl1 = RelayClient.new()
# cl2 = RelayClient.new()
# rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl),
# limitDuration=ldur,
# limitData=ldata,
# maxCircuit=1)
# src1 = createSwitch(cl1)
# src2 = createSwitch(cl2)
# rel = createSwitch(rv2)
#
# await src1.start()
# await src2.start()
# await rel.start()
# await src1.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
# await src2.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
# rsvp = await cl1.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
# range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds
# check:
# rsvp.expire.int64.fromUnix.utc in range
# rsvp.limitDuration == ldur
# rsvp.limitData == ldata
#
# asyncTest "Too many reservations":
# let conn = await cl2.switch.dial(rel.peerInfo.peerId, rel.peerInfo.addrs, RelayV2HopCodec)
# let pb = encode(HopMessage(msgType: HopMessageType.Reserve))
# await conn.writeLp(pb.buffer)
# let msg = HopMessage.decode(await conn.readLp(RelayMsgSize)).get()
# check:
# msg.msgType == HopMessageType.Status
# msg.status == Opt.some(StatusV2.ReservationRefused)
#
# asyncTest "Too many reservations + Reconnect":
# expect(ReservationError):
# discard await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
# await rel.disconnect(src1.peerInfo.peerId)
# rsvp = await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
# range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds
# check:
# rsvp.expire.int64.fromUnix.utc in range
# rsvp.limitDuration == ldur
# rsvp.limitData == ldata
#
# asyncTest "Reservation ttl expires":
# await sleepAsync(chronos.timer.seconds(ttl + 1))
# rsvp = await cl1.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
# range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds
# check:
# rsvp.expire.int64.fromUnix.utc in range
# rsvp.limitDuration == ldur
# rsvp.limitData == ldata
#
# asyncTest "Reservation over relay":
# let
# rv2add = Relay.new()
# addrs = @[ MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" &
# $rel.peerInfo.peerId & "/p2p-circuit").get() ]
# rv2add.setup(src2)
# await rv2add.start()
# src2.mount(rv2add)
# rv2.maxCircuit.inc()
#
# rsvp = await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
# range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds
# check:
# rsvp.expire.int64.fromUnix.utc in range
# rsvp.limitDuration == ldur
# rsvp.limitData == ldata
# expect(ReservationError):
# discard await cl1.reserve(src2.peerInfo.peerId, addrs)

suite "Connection":
for useYamux in [false, true]:
suite "Reservation":
asyncTeardown:
await allFutures(src1.stop(), src2.stop(), rel.stop())
checkTrackers()
var
ttl {.threadvar.}: int
ldur {.threadvar.}: uint32
ldata {.threadvar.}: uint64
cl1 {.threadvar.}: RelayClient
cl2 {.threadvar.}: RelayClient
rv2 {.threadvar.}: Relay
src1 {.threadvar.}: Switch
src2 {.threadvar.}: Switch
rel {.threadvar.}: Switch
rsvp {.threadvar.}: Rsvp
range {.threadvar.}: HSlice[times.DateTime, times.DateTime]

asyncSetup:
ttl = 3
ldur = 60
ldata = 2048
cl1 = RelayClient.new()
cl2 = RelayClient.new()
rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl),
limitDuration=ldur,
limitData=ldata,
maxCircuit=1)
src1 = createSwitch(cl1)
src2 = createSwitch(cl2)
rel = createSwitch(rv2)

await src1.start()
await src2.start()
await rel.start()
await src1.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
await src2.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
rsvp = await cl1.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds
check:
rsvp.expire.int64.fromUnix.utc in range
rsvp.limitDuration == ldur
rsvp.limitData == ldata

asyncTest "Too many reservations":
let conn = await cl2.switch.dial(rel.peerInfo.peerId, rel.peerInfo.addrs, RelayV2HopCodec)
let pb = encode(HopMessage(msgType: HopMessageType.Reserve))
await conn.writeLp(pb.buffer)
let msg = HopMessage.decode(await conn.readLp(RelayMsgSize)).get()
check:
msg.msgType == HopMessageType.Status
msg.status == Opt.some(StatusV2.ReservationRefused)

asyncTest "Too many reservations + Reconnect":
expect(ReservationError):
discard await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
await rel.disconnect(src1.peerInfo.peerId)
rsvp = await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds
check:
rsvp.expire.int64.fromUnix.utc in range
rsvp.limitDuration == ldur
rsvp.limitData == ldata

asyncTest "Reservation ttl expires":
await sleepAsync(chronos.timer.seconds(ttl + 1))
rsvp = await cl1.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds
check:
rsvp.expire.int64.fromUnix.utc in range
rsvp.limitDuration == ldur
rsvp.limitData == ldata

asyncTest "Reservation over relay":
let
rv2add = Relay.new()
addrs = @[ MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" &
$rel.peerInfo.peerId & "/p2p-circuit").get() ]
rv2add.setup(src2)
await rv2add.start()
src2.mount(rv2add)
rv2.maxCircuit.inc()

rsvp = await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds
check:
rsvp.expire.int64.fromUnix.utc in range
rsvp.limitDuration == ldur
rsvp.limitData == ldata
expect(ReservationError):
discard await cl1.reserve(src2.peerInfo.peerId, addrs)

for (useYamux, muxName) in [(false, "Mplex"), (true, "Yamux")]:
suite "Circuit Relay V2 Connection using " & muxName:
asyncTeardown:
checkTrackers()
var
Expand Down Expand Up @@ -329,7 +329,7 @@ suite "Circuit Relay V2":
raise newException(CatchableError, "Should not be here")
let
rel2Cl = RelayClient.new(canHop = true)
rel2 = createSwitch(rel2Cl)
rel2 = createSwitch(rel2Cl, useYamux)
rv2 = Relay.new()
rv2.setup(rel)
rel.mount(rv2)
Expand All @@ -354,7 +354,8 @@ suite "Circuit Relay V2":

expect(DialFailedError):
conn = await src.dial(dst.peerInfo.peerId, addrs, customProtoCodec)
await allFutures(conn.close())
if not conn.isNil():
await allFutures(conn.close())
await allFutures(src.stop(), dst.stop(), rel.stop(), rel2.stop())

asyncTest "Connection using ClientRelay":
Expand Down Expand Up @@ -388,9 +389,9 @@ suite "Circuit Relay V2":
clientA = RelayClient.new(canHop = true)
clientB = RelayClient.new(canHop = true)
clientC = RelayClient.new(canHop = true)
switchA = createSwitch(clientA)
switchB = createSwitch(clientB)
switchC = createSwitch(clientC)
switchA = createSwitch(clientA, useYamux)
switchB = createSwitch(clientB, useYamux)
switchC = createSwitch(clientC, useYamux)

switchA.mount(protoBCA)
switchB.mount(protoCAB)
Expand Down

0 comments on commit 324d044

Please sign in to comment.