Skip to content

Commit

Permalink
fix: handle more circuit relay refresh failures
Browse files Browse the repository at this point in the history
Handles some more instances where we don't remove old reservations
when refreshing them fails.
  • Loading branch information
achingbrain committed Oct 11, 2024
1 parent 1cbfd6c commit 9d7ec98
Show file tree
Hide file tree
Showing 14 changed files with 313 additions and 147 deletions.
34 changes: 1 addition & 33 deletions packages/integration-tests/test/circuit-relay.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -540,41 +540,9 @@ describe('circuit-relay', () => {
const ma = getRelayAddress(relay1).encapsulate(`/p2p-circuit/p2p/${remote.peerId.toString()}`)

await expect(local.dial(ma)).to.eventually.be.rejected
.with.property('name', 'DialError')
.with.property('name', 'NoValidAddressesError')
})
/*
it('should fail to open connection over relayed connection', async () => {
// relay1 dials relay2
await relay1.dial(relay2.getMultiaddrs()[0])
await usingAsRelay(relay1, relay2)
// remote dials relay2
await remote.dial(relay2.getMultiaddrs()[0])
await usingAsRelay(remote, relay2)
// local dials relay1 via relay2
const ma = getRelayAddress(relay1)

// open hop stream and try to connect to remote
const stream = await local.dialProtocol(ma, RELAY_V2_HOP_CODEC, {
runOnLimitedConnection: true
})
const hopStream = pbStream(stream).pb(HopMessage)
await hopStream.write({
type: HopMessage.Type.CONNECT,
peer: {
id: remote.peerId.toMultihash().bytes,
addrs: []
}
})
const response = await hopStream.read()
expect(response).to.have.property('type', HopMessage.Type.STATUS)
expect(response).to.have.property('status', Status.PERMISSION_DENIED)
})
*/
it('should emit connection:close when relay stops', async () => {
// discover relay and make reservation
await remote.dial(relay1.getMultiaddrs()[0])
Expand Down
5 changes: 3 additions & 2 deletions packages/transport-circuit-relay-v2/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,26 +52,27 @@
"doc-check": "aegir doc-check"
},
"dependencies": {
"@libp2p/crypto": "^5.0.5",
"@libp2p/interface": "^2.1.3",
"@libp2p/interface-internal": "^2.0.8",
"@libp2p/peer-collections": "^6.0.8",
"@libp2p/peer-id": "^5.0.5",
"@libp2p/peer-record": "^8.0.8",
"@libp2p/utils": "^6.1.1",
"@multiformats/mafmt": "^12.1.6",
"@multiformats/multiaddr": "^12.2.3",
"@multiformats/multiaddr-matcher": "^1.3.0",
"any-signal": "^4.1.1",
"it-protobuf-stream": "^1.1.3",
"it-stream-types": "^2.0.1",
"multiformats": "^13.1.0",
"progress-events": "^1.0.0",
"protons-runtime": "^5.4.0",
"race-signal": "^1.0.2",
"retimeable-signal": "^0.0.0",
"uint8arraylist": "^2.4.8",
"uint8arrays": "^5.1.0"
},
"devDependencies": {
"@libp2p/crypto": "^5.0.5",
"@libp2p/interface-compliance-tests": "^6.1.6",
"@libp2p/logger": "^5.1.1",
"aegir": "^44.0.1",
Expand Down
5 changes: 0 additions & 5 deletions packages/transport-circuit-relay-v2/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ export const CIRCUIT_PROTO_CODE = 290
*/
export const DEFAULT_MAX_RESERVATION_STORE_SIZE = 15

/**
* How often to check for reservation expiry
*/
export const DEFAULT_MAX_RESERVATION_CLEAR_INTERVAL = 300 * second

/**
* How often to check for reservation expiry
*/
Expand Down
20 changes: 19 additions & 1 deletion packages/transport-circuit-relay-v2/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,31 @@ import type { Limit } from './pb/index.js'
import type { TypedEventEmitter } from '@libp2p/interface'
import type { PeerMap } from '@libp2p/peer-collections'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { RetimeableAbortSignal } from 'retimeable-signal'

export type { Limit }

export interface RelayReservation {
expire: Date
/**
* When this reservation expires
*/
expiry: Date

/**
* The address of the relay client
*/
addr: Multiaddr

/**
* How much data can be transferred over each relayed connection and for how
* long before the underlying stream is reset
*/
limit?: Limit

/**
* This signal will fire it's "abort" event when the reservation expires
*/
signal: RetimeableAbortSignal
}

export interface CircuitRelayServiceEvents {
Expand Down
21 changes: 20 additions & 1 deletion packages/transport-circuit-relay-v2/src/pb/index.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ message Peer {
message Reservation {
uint64 expire = 1; // Unix expiration time (UTC)
repeated bytes addrs = 2; // relay addrs for reserving peer
optional bytes voucher = 3; // reservation voucher
optional Envelope voucher = 3; // reservation voucher
}

message Limit {
Expand All @@ -65,3 +65,22 @@ message ReservationVoucher {
bytes peer = 2;
uint64 expiration = 3;
}

// https://github.com/libp2p/specs/blob/master/RFC/0002-signed-envelopes.md
message Envelope {
// public_key is the public key of the keypair the enclosed payload was
// signed with.
bytes public_key = 1;

// payload_type encodes the type of payload, so that it can be deserialized
// deterministically.
bytes payload_type = 2;

// payload is the actual payload carried inside this envelope.
ReservationVoucher payload = 3;

// signature is the signature produced by the private key corresponding to
// the enclosed public key, over the payload, prefixing a domain string for
// additional security.
bytes signature = 5;
}
102 changes: 99 additions & 3 deletions packages/transport-circuit-relay-v2/src/pb/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ export namespace Peer {
export interface Reservation {
expire: bigint
addrs: Uint8Array[]
voucher?: Uint8Array
voucher?: Envelope
}

export namespace Reservation {
Expand All @@ -345,7 +345,7 @@ export namespace Reservation {

if (obj.voucher != null) {
w.uint32(26)
w.bytes(obj.voucher)
Envelope.codec().encode(obj.voucher, w)
}

if (opts.lengthDelimited !== false) {
Expand Down Expand Up @@ -376,7 +376,9 @@ export namespace Reservation {
break
}
case 3: {
obj.voucher = reader.bytes()
obj.voucher = Envelope.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.voucher
})
break
}
default: {
Expand Down Expand Up @@ -580,3 +582,97 @@ export namespace ReservationVoucher {
return decodeMessage(buf, ReservationVoucher.codec(), opts)
}
}

export interface Envelope {
publicKey: Uint8Array
payloadType: Uint8Array
payload?: ReservationVoucher
signature: Uint8Array
}

export namespace Envelope {
let _codec: Codec<Envelope>

export const codec = (): Codec<Envelope> => {
if (_codec == null) {
_codec = message<Envelope>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}

if ((obj.publicKey != null && obj.publicKey.byteLength > 0)) {
w.uint32(10)
w.bytes(obj.publicKey)
}

if ((obj.payloadType != null && obj.payloadType.byteLength > 0)) {
w.uint32(18)
w.bytes(obj.payloadType)
}

if (obj.payload != null) {
w.uint32(26)
ReservationVoucher.codec().encode(obj.payload, w)
}

if ((obj.signature != null && obj.signature.byteLength > 0)) {
w.uint32(42)
w.bytes(obj.signature)
}

if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length, opts = {}) => {
const obj: any = {
publicKey: uint8ArrayAlloc(0),
payloadType: uint8ArrayAlloc(0),
signature: uint8ArrayAlloc(0)
}

const end = length == null ? reader.len : reader.pos + length

while (reader.pos < end) {
const tag = reader.uint32()

switch (tag >>> 3) {
case 1: {
obj.publicKey = reader.bytes()
break
}
case 2: {
obj.payloadType = reader.bytes()
break
}
case 3: {
obj.payload = ReservationVoucher.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.payload
})
break
}
case 5: {
obj.signature = reader.bytes()
break
}
default: {
reader.skipType(tag & 7)
break
}

Check warning on line 660 in packages/transport-circuit-relay-v2/src/pb/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-circuit-relay-v2/src/pb/index.ts#L658-L660

Added lines #L658 - L660 were not covered by tests
}
}

return obj
})
}

return _codec
}

export const encode = (obj: Partial<Envelope>): Uint8Array => {
return encodeMessage(obj, Envelope.codec())
}

Check warning on line 673 in packages/transport-circuit-relay-v2/src/pb/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-circuit-relay-v2/src/pb/index.ts#L672-L673

Added lines #L672 - L673 were not covered by tests

export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions<Envelope>): Envelope => {
return decodeMessage(buf, Envelope.codec(), opts)
}

Check warning on line 677 in packages/transport-circuit-relay-v2/src/pb/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-circuit-relay-v2/src/pb/index.ts#L676-L677

Added lines #L676 - L677 were not covered by tests
}
32 changes: 21 additions & 11 deletions packages/transport-circuit-relay-v2/src/server/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { publicKeyToProtobuf } from '@libp2p/crypto/keys'
import { TypedEventEmitter, setMaxListeners } from '@libp2p/interface'
import { peerIdFromMultihash } from '@libp2p/peer-id'
import { RecordEnvelope } from '@libp2p/peer-record'
Expand Down Expand Up @@ -156,16 +157,14 @@ class CircuitRelayServer extends TypedEventEmitter<RelayServerEvents> implements
runOnLimitedConnection: true
})

this.reservationStore.start()

this.started = true
}

/**
* Stop Relay service
*/
async stop (): Promise<void> {
this.reservationStore.stop()
this.reservationStore.clear()
this.shutdownController.abort()
await this.registrar.unhandle(RELAY_V2_HOP_CODEC)

Expand Down Expand Up @@ -290,16 +289,25 @@ class CircuitRelayServer extends TypedEventEmitter<RelayServerEvents> implements
addrs.push(relayAddr.bytes)
}

const voucher = await RecordEnvelope.seal(new ReservationVoucherRecord({
const envelope = await RecordEnvelope.seal(new ReservationVoucherRecord({
peer: remotePeer,
relay: this.peerId,
expiration: Number(expire)
expiration: expire
}), this.privateKey)

return {
addrs,
expire,
voucher: voucher.marshal()
voucher: {
publicKey: publicKeyToProtobuf(envelope.publicKey),
payloadType: envelope.payloadType,
payload: {
peer: remotePeer.toMultihash().bytes,
relay: this.peerId.toMultihash().bytes,
expiration: expire
},
signature: envelope.signature
}
}
}

Expand Down Expand Up @@ -330,7 +338,9 @@ class CircuitRelayServer extends TypedEventEmitter<RelayServerEvents> implements
return
}

if (!this.reservationStore.hasReservation(dstPeer)) {
const reservation = this.reservationStore.get(dstPeer)

if (reservation == null) {
this.log.error('hop connect denied for destination peer %p not having a reservation for %p with status %s', dstPeer, connection.remotePeer, Status.NO_RESERVATION)
await hopstr.write({ type: HopMessage.Type.STATUS, status: Status.NO_RESERVATION }, options)
return
Expand All @@ -350,7 +360,6 @@ class CircuitRelayServer extends TypedEventEmitter<RelayServerEvents> implements
return
}

const limit = this.reservationStore.get(dstPeer)?.limit
const destinationConnection = connections[0]

const destinationStream = await this.stopHop({
Expand All @@ -361,7 +370,7 @@ class CircuitRelayServer extends TypedEventEmitter<RelayServerEvents> implements
id: connection.remotePeer.toMultihash().bytes,
addrs: []
},
limit
limit: reservation?.limit
}
}, options)

Expand All @@ -374,13 +383,14 @@ class CircuitRelayServer extends TypedEventEmitter<RelayServerEvents> implements
await hopstr.write({
type: HopMessage.Type.STATUS,
status: Status.OK,
limit
limit: reservation?.limit
}, options)
const sourceStream = stream.unwrap()

this.log('connection from %p to %p established - merging streams', connection.remotePeer, dstPeer)

// Short circuit the two streams to create the relayed connection
createLimitedRelay(sourceStream, destinationStream, this.shutdownController.signal, limit, {
createLimitedRelay(sourceStream, destinationStream, this.shutdownController.signal, reservation, {
log: this.log
})
}
Expand Down
Loading

0 comments on commit 9d7ec98

Please sign in to comment.