Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement circuit v2 #1533

Merged
merged 60 commits into from
Mar 2, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
084042c
Squashed commit of the following:
ckousik Jan 5, 2023
d4e6436
remove extra test command
ckousik Jan 5, 2023
7961199
fix linter
ckousik Jan 5, 2023
290dc0a
remove extraneous files
ckousik Jan 5, 2023
16fd3f6
fixes
ckousik Jan 24, 2023
f02275d
address review comments
ckousik Jan 24, 2023
7c6df6d
fix
ckousik Jan 24, 2023
b9d5168
more fixes
ckousik Jan 24, 2023
12c4775
start/stop reservation store
ckousik Jan 24, 2023
09a48cf
flakey test
ckousik Jan 24, 2023
0ceaa6e
await protocol handling
ckousik Jan 24, 2023
4b307ab
fix linting
ckousik Jan 24, 2023
fcc0f54
testing changes
ckousik Jan 24, 2023
b8499de
more console logs
ckousik Jan 24, 2023
036569b
more console logs
ckousik Jan 24, 2023
89fc003
debug
ckousik Jan 24, 2023
9eb58e5
remove sink
ckousik Jan 24, 2023
231eb1f
temporarily skip flaky test
ckousik Jan 25, 2023
2dd3b4a
remove duplicated interfaces
ckousik Jan 25, 2023
3f547d7
remove unused fields
ckousik Jan 25, 2023
9a35bab
fix tests expecting hop to be enabled
ckousik Jan 25, 2023
1601189
remove circuit v1 support
ckousik Jan 25, 2023
fedf97e
cleanup
ckousik Jan 27, 2023
d3226ec
set type as optional in proto
ckousik Jan 30, 2023
c9bc1d3
address review from Alex
ckousik Feb 1, 2023
545fe51
remove streamhandler
ckousik Feb 1, 2023
3afc8b9
fix lint
ckousik Feb 1, 2023
7901093
fix onError callback
ckousik Feb 1, 2023
996eed2
deny reservation over relayed connection
ckousik Feb 3, 2023
dfcb610
disable relay for tests
ckousik Feb 3, 2023
881d1e9
fix start
ckousik Feb 3, 2023
530f428
revert interop
ckousik Feb 3, 2023
5e1d3e0
remove all circuitv1 code
ckousik Feb 8, 2023
840e3e4
handle peer reconnect in circuit v2
ckousik Feb 8, 2023
ccc2894
handle peer reconnect
ckousik Feb 8, 2023
9a3c092
remove circuitv1 protobuf
ckousik Feb 8, 2023
e590eed
updated interop tests
ckousik Jan 31, 2023
c28b278
remove relay
ckousik Feb 1, 2023
0cff5a0
fix
ckousik Feb 3, 2023
b2092f3
point to remote interop tests (should fail)
ckousik Feb 17, 2023
5b00e5f
chore: update versions, fix up tests and linting
achingbrain Feb 22, 2023
ca59818
Merge remote-tracking branch 'origin/master' into ckousik/feat/circui…
achingbrain Feb 22, 2023
54ac1c3
chore: remove casting, update interop suite to be more reliable
achingbrain Feb 23, 2023
37a5a0e
temporary
ckousik Feb 23, 2023
a3e527a
transfer limits
ckousik Feb 23, 2023
69a7e68
remove commented code
ckousik Feb 24, 2023
de764b7
chore: fix interop
achingbrain Feb 24, 2023
48094f1
Merge remote-tracking branch 'origin/master' into ckousik/feat/circui…
achingbrain Feb 28, 2023
7e2b848
remove pipe from utils
ckousik Mar 1, 2023
ed86c65
spawn microtasks to handle circuit connections
ckousik Mar 1, 2023
ee77185
reset stream on exceeding relay limits
ckousik Mar 1, 2023
edad075
replace reset with abort
ckousik Mar 1, 2023
a5dc1ab
abort takes error param
ckousik Mar 1, 2023
8885cad
fix dep-check
ckousik Mar 1, 2023
15696d8
Fix abortable
ckousik Mar 1, 2023
e9cbdd9
chore: remove unecessary types as we can now unwrap the original Stre…
achingbrain Mar 2, 2023
72b695a
fix tests
ckousik Mar 2, 2023
5d7afa3
add test for tagging peer
ckousik Mar 2, 2023
38eeb31
apply suggestion
ckousik Mar 2, 2023
70e1b49
fix: allow configuring limits, actually apply limits, update docs, re…
achingbrain Mar 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/circuit/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,11 @@ export class RelayReservationManager extends EventEmitter<RelayReservationManage
}

const timeout = setTimeout(
(peerId: PeerId) => { void refreshReservation(peerId) },
(peerId: PeerId) => {
void refreshReservation(peerId).catch(err => {
log.error('error refreshing reservation for %p', peerId, err)
})
},
Math.max(getExpiration(reservation.expire) - 100, 0),
peerId
)
Expand Down
4 changes: 2 additions & 2 deletions src/circuit/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ export class Circuit implements Transport, Startable {
hopTimeoutPromise.reject('timed out')
}, this._init.hop.timeout)
const pbstr = pbStream(stream)
const abortable = { value: pbstr, abort: (err: Error) => stream.abort(err) }

try {
const request: CircuitV2.HopMessage = await Promise.race([
pbstr.pb(CircuitV2.HopMessage).read(),
Expand All @@ -140,7 +140,7 @@ export class Circuit implements Transport, Startable {
await Promise.race([
CircuitV2Handler.handleHopProtocol({
connection,
stream: abortable,
stream: pbstr,
connectionManager: this.components.connectionManager,
relayPeer: this.components.peerId,
relayAddrs: this.components.addressManager.getListenAddrs(),
Expand Down
15 changes: 7 additions & 8 deletions src/circuit/v2/hop.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import type { PeerId } from '@libp2p/interface-peer-id'
import { RecordEnvelope } from '@libp2p/peer-record'
import { logger } from '@libp2p/logger'
import type { Connection } from '@libp2p/interface-connection'
import type { Connection, Stream } from '@libp2p/interface-connection'
import { HopMessage, Limit, Reservation, Status, StopMessage } from './pb/index.js'
import type { Multiaddr } from '@multiformats/multiaddr'
import { multiaddr } from '@multiformats/multiaddr'
import type { Acl, ReservationStore, Abortable } from './interfaces.js'
import type { Acl, ReservationStore } from './interfaces.js'
import { RELAY_V2_HOP_CODEC } from '../multicodec.js'
import { stop } from './stop.js'
import { ReservationVoucherRecord } from './reservation-voucher.js'
Expand All @@ -14,7 +14,6 @@ import type { ConnectionManager } from '@libp2p/interface-connection-manager'
import type { ProtobufStream } from 'it-pb-stream'
import { pbStream } from 'it-pb-stream'
import { CIRCUIT_PROTO_CODE } from '../constants.js'
import type { Uint8ArrayList } from 'uint8arraylist'
import type { PeerStore } from '@libp2p/interface-peer-store'
import { createLimitedRelay } from './util.js'
import type { CodeError } from '@libp2p/interfaces/errors'
Expand All @@ -26,7 +25,7 @@ const RELAYED = 'relayed'
export interface HopProtocolOptions {
connection: Connection
request: HopMessage
stream: Abortable<ProtobufStream<Uint8ArrayList | Uint8Array>>
stream: ProtobufStream<Stream>
relayPeer: PeerId
relayAddrs: Multiaddr[]
limit?: Limit
Expand All @@ -44,7 +43,7 @@ export async function handleHopProtocol (options: HopProtocolOptions): Promise<v
case HopMessage.Type.CONNECT: await handleConnect(options); break
default: {
log.error('invalid hop request type %s via peer %s', options.request.type, options.connection.remotePeer)
stream.value.pb(HopMessage).write({ type: HopMessage.Type.STATUS, status: Status.UNEXPECTED_MESSAGE })
stream.pb(HopMessage).write({ type: HopMessage.Type.STATUS, status: Status.UNEXPECTED_MESSAGE })
}
}
}
Expand Down Expand Up @@ -75,7 +74,7 @@ export async function reserve (connection: Connection): Promise<Reservation> {

const isRelayAddr = (ma: Multiaddr): boolean => ma.protoCodes().includes(CIRCUIT_PROTO_CODE)

async function handleReserve ({ connection, stream: { value: pbstr }, relayPeer, relayAddrs, limit, acl, reservationStore, peerStore }: HopProtocolOptions): Promise<void> {
async function handleReserve ({ connection, stream: pbstr, relayPeer, relayAddrs, limit, acl, reservationStore, peerStore }: HopProtocolOptions): Promise<void> {
const hopstr = pbstr.pb(HopMessage)
log('hop reserve request from %s', connection.remotePeer)

Expand Down Expand Up @@ -126,7 +125,7 @@ async function handleReserve ({ connection, stream: { value: pbstr }, relayPeer,

async function handleConnect (options: HopProtocolOptions): Promise<void> {
const { connection, stream, request, reservationStore, connectionManager, acl } = options
const hopstr = stream.value.pb(HopMessage)
const hopstr = stream.pb(HopMessage)

log('hop connect request from %s', connection.remotePeer)

Expand Down Expand Up @@ -186,7 +185,7 @@ async function handleConnect (options: HopProtocolOptions): Promise<void> {
}

hopstr.write({ type: HopMessage.Type.STATUS, status: Status.OK })
const sourceStream = { ...stream, value: stream.value.unwrap() }
const sourceStream = stream.unwrap()

log('connection to destination established, short circuiting streams...')
const limit = (await reservationStore.get(dstPeer))?.limit
Expand Down
8 changes: 0 additions & 8 deletions src/circuit/v2/interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import type { PeerId } from '@libp2p/interface-peer-id'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Duplex } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'
import type { Limit, Status } from './pb/index.js'

export type ReservationStatus = Status.OK | Status.PERMISSION_DENIED | Status.RESERVATION_REFUSED
Expand All @@ -28,9 +26,3 @@ export interface Acl {
*/
allowConnect: (src: PeerId, addr: Multiaddr, dst: PeerId) => Promise<AclStatus>
}

export type DuplexStream = Duplex<Uint8ArrayList, Uint8ArrayList | Uint8Array>
export interface Abortable<T> {
value: T
abort: (err: Error) => void
}
13 changes: 4 additions & 9 deletions src/circuit/v2/stop.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@

import { Status, StopMessage } from './pb/index.js'
import type { Connection } from '@libp2p/interface-connection'
import type { Connection, Stream } from '@libp2p/interface-connection'

import { logger } from '@libp2p/logger'
import { RELAY_V2_STOP_CODEC } from '../multicodec.js'
import { multiaddr } from '@multiformats/multiaddr'
import { pbStream, ProtobufStream } from 'it-pb-stream'
import type { Uint8ArrayList } from 'uint8arraylist'
import type { DuplexStream, Abortable } from './interfaces.js'

const log = logger('libp2p:circuit:v2:stop')

export interface HandleStopOptions {
connection: Connection
request: StopMessage
pbstr: ProtobufStream<Uint8ArrayList | Uint8Array>
pbstr: ProtobufStream<Stream>
}

const isValidStop = (request: StopMessage): boolean => {
Expand Down Expand Up @@ -66,7 +64,7 @@ export interface StopOptions {
export async function stop ({
connection,
request
}: StopOptions): Promise<Abortable<DuplexStream> | undefined> {
}: StopOptions): Promise<Stream | undefined> {
const stream = await connection.newStream([RELAY_V2_STOP_CODEC])
log('starting circuit relay v2 stop request to %s', connection.remotePeer)
const pbstr = pbStream(stream)
Expand All @@ -86,10 +84,7 @@ export async function stop ({
}
if (response.status === Status.OK) {
log('stop request to %s was successful', connection.remotePeer)
return {
value: pbstr.unwrap(),
abort: (err: Error) => stream.abort(err)
}
return pbstr.unwrap()
}

log('stop request failed with code %d', response.status)
Expand Down
113 changes: 65 additions & 48 deletions src/circuit/v2/util.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import type { Sink, Source } from 'it-stream-types'
import { Uint8ArrayList } from 'uint8arraylist'
import type { Source } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'
import type { Limit } from './pb/index.js'
import { logger } from '@libp2p/logger'
import type { DuplexStream, Abortable } from './interfaces.js'
import type { Stream } from '@libp2p/interface-connection'

const log = logger('libp2p:circuit:v2:util')

const doRelay = (src: DuplexStream, dst: DuplexStream) => {
const doRelay = (src: Stream, dst: Stream) => {
queueMicrotask(() => {
achingbrain marked this conversation as resolved.
Show resolved Hide resolved
void dst.sink(src.source).catch(err => log.error('error while relating streams:', err))
})
Expand All @@ -16,85 +16,102 @@ const doRelay = (src: DuplexStream, dst: DuplexStream) => {
})
}

export function createLimitedRelay (source: Abortable<DuplexStream>, destination: Abortable<DuplexStream>, limit?: Limit) {
export function createLimitedRelay (source: Stream, destination: Stream, limit?: Limit) {
// trivial case
if (limit == null) {
doRelay(source.value, destination.value)
doRelay(source, destination)
return
}
const dataLimit = limit.data ?? BigInt(0)

const dataLimit = limit.data ?? 0n
const durationLimit = limit.duration ?? 0
const src = durationLimitDuplex(dataLimitDuplex(source, dataLimit), durationLimit)
const dst = durationLimitDuplex(dataLimitDuplex(destination, dataLimit), durationLimit)
doRelay(src.value, dst.value)

doRelay(src, dst)
}

const dataLimitSource = (source: Source<Uint8ArrayList>, abort: (err: Error) => void, limit: bigint): Source<Uint8ArrayList> => {
if (limit === BigInt(0)) {
return source
const dataLimitSource = (stream: Stream, limit: bigint): Stream => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this safe? While the implementation of it-pb-stream returns the underlying stream, the return type only guarantees returning a Duplex. The stream functions could be erased from the returned value in a future release without changing the API, which would cause this to break.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3.0.1 changes the unwrap return type to be the type of the wrapped stream, so it should be safe.

if (limit === 0n) {
return stream
}

return (async function * (): Source<Uint8ArrayList> {
let total = BigInt(0)
const source = stream.source

stream.source = (async function * (): Source<Uint8ArrayList> {
let total = 0n

for await (const buf of source) {
const len = BigInt(buf.length)
const len = BigInt(buf.byteLength)
if (total + len > limit) {
log.error('attempted to send more data than limit: %s, resetting stream', limit.toString())
abort(new Error('exceeded connection data limit'))
stream.abort(new Error('exceeded connection data limit'))
return
}
total += len

yield buf

total += len
}
})()

return stream
}

const adaptSource = (source: Source<Uint8ArrayList | Uint8Array>): Source<Uint8ArrayList> => (async function * () {
for await (const buf of source) {
if (buf instanceof Uint8Array) {
yield Uint8ArrayList.fromUint8Arrays([buf])
} else {
yield buf
}
const dataLimitSink = (stream: Stream, limit: bigint): Stream => {
if (limit === 0n) {
return stream
}
})()

const dataLimitSink = (sink: Sink<Uint8ArrayList | Uint8Array>, abort: (err: Error) => void, limit: bigint): Sink<Uint8ArrayList | Uint8Array> => {
return async (source: Source<Uint8ArrayList | Uint8Array>) => await sink(
dataLimitSource(
adaptSource(source),
abort,
limit
)
)
}

const dataLimitDuplex = (duplex: Abortable<DuplexStream>, limit: bigint): Abortable<DuplexStream> => {
return {
...duplex,
value: {
...duplex.value,
source: dataLimitSource(duplex.value.source, duplex.abort, limit),
sink: dataLimitSink(duplex.value.sink, duplex.abort, limit)
}
const sink = stream.sink

stream.sink = async (source: Source<Uint8ArrayList | Uint8Array>) => {
await sink((async function * (): Source<Uint8ArrayList | Uint8Array> {
let total = 0n

for await (const buf of source) {
const len = BigInt(buf.byteLength)
if (total + len > limit) {
log.error('attempted to send more data than limit: %s, resetting stream', limit.toString())
stream.abort(new Error('exceeded connection data limit'))
return
}

total += len
yield buf
}
})())
}

return stream
}

const dataLimitDuplex = (stream: Stream, limit: bigint): Stream => {
dataLimitSource(stream, limit)
dataLimitSink(stream, limit)

return stream
}

const durationLimitDuplex = (duplex: Abortable<DuplexStream>, limit: number): Abortable<DuplexStream> => {
const durationLimitDuplex = (stream: Stream, limit: number): Stream => {
if (limit === 0) {
return duplex
return stream
}

let timedOut = false
const timeout = setTimeout(
() => {
timedOut = true
duplex.abort(new Error('exceeded connection duration limit'))
stream.abort(new Error('exceeded connection duration limit'))
},
limit
)
const source = (async function * (): Source<Uint8ArrayList> {

const source = stream.source

stream.source = (async function * (): Source<Uint8ArrayList> {
try {
for await (const buf of duplex.value.source) {
for await (const buf of source) {
if (timedOut) {
return
}
Expand All @@ -105,5 +122,5 @@ const durationLimitDuplex = (duplex: Abortable<DuplexStream>, limit: number): Ab
}
})()

return { ...duplex, value: { ...duplex.value, source } }
return stream
}
Loading