Skip to content

Commit

Permalink
fix: limit max dial queue size (#2472)
Browse files Browse the repository at this point in the history
Add an upper bound to the maximum length of the dial queue.

Any attempts to dial when the queue is full will throw.
  • Loading branch information
achingbrain authored Apr 9, 2024
1 parent 936dbba commit 0cfcc4d
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 3 deletions.
5 changes: 5 additions & 0 deletions packages/libp2p/src/connection-manager/constants.defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,8 @@ export const MAX_INCOMING_PENDING_CONNECTIONS = 10
* failed to dial.
*/
export const LAST_DIAL_FAILURE_KEY = 'last-dial-failure'

/**
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxDialQueueLength
*/
export const MAX_DIAL_QUEUE_LENGTH = 500
11 changes: 10 additions & 1 deletion packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import {
DIAL_TIMEOUT,
MAX_PARALLEL_DIALS,
MAX_PEER_ADDRS_TO_DIAL,
LAST_DIAL_FAILURE_KEY
LAST_DIAL_FAILURE_KEY,
MAX_DIAL_QUEUE_LENGTH
} from './constants.js'
import { resolveMultiaddrs } from './utils.js'
import type { AddressSorter, AbortOptions, ComponentLogger, Logger, Connection, ConnectionGater, Metrics, PeerId, Address, PeerStore, PeerRouting } from '@libp2p/interface'
Expand All @@ -38,6 +39,7 @@ interface DialQueueJobOptions extends QueueAddOptions {
interface DialerInit {
addressSorter?: AddressSorter
maxParallelDials?: number
maxDialQueueLength?: number
maxPeerAddrsToDial?: number
dialTimeout?: number
resolvers?: Record<string, Resolver>
Expand All @@ -47,6 +49,7 @@ interface DialerInit {
const defaultOptions = {
addressSorter: defaultAddressSort,
maxParallelDials: MAX_PARALLEL_DIALS,
maxDialQueueLength: MAX_DIAL_QUEUE_LENGTH,
maxPeerAddrsToDial: MAX_PEER_ADDRS_TO_DIAL,
dialTimeout: DIAL_TIMEOUT,
resolvers: {
Expand All @@ -70,6 +73,7 @@ export class DialQueue {
private readonly components: DialQueueComponents
private readonly addressSorter: AddressSorter
private readonly maxPeerAddrsToDial: number
private readonly maxDialQueueLength: number
private readonly dialTimeout: number
private shutDownController: AbortController
private readonly connections: PeerMap<Connection[]>
Expand All @@ -78,6 +82,7 @@ export class DialQueue {
constructor (components: DialQueueComponents, init: DialerInit = {}) {
this.addressSorter = init.addressSorter ?? defaultOptions.addressSorter
this.maxPeerAddrsToDial = init.maxPeerAddrsToDial ?? defaultOptions.maxPeerAddrsToDial
this.maxDialQueueLength = init.maxDialQueueLength ?? defaultOptions.maxDialQueueLength
this.dialTimeout = init.dialTimeout ?? defaultOptions.dialTimeout
this.connections = init.connections ?? new PeerMap()
this.log = components.logger.forComponent('libp2p:connection-manager:dial-queue')
Expand Down Expand Up @@ -185,6 +190,10 @@ export class DialQueue {
return existingDial.join(options)
}

if (this.queue.size >= this.maxDialQueueLength) {
throw new CodeError('Dial queue is full', 'ERR_DIAL_QUEUE_FULL')
}

this.log('creating dial target for %p', peerId, multiaddrs.map(ma => ma.toString()))

return this.queue.add(async (options) => {
Expand Down
12 changes: 11 additions & 1 deletion packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { codes } from '../errors.js'
import { getPeerAddress } from '../get-peer.js'
import { AutoDial } from './auto-dial.js'
import { ConnectionPruner } from './connection-pruner.js'
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js'
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_DIAL_QUEUE_LENGTH, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js'
import { DialQueue } from './dial-queue.js'
import type { PendingDial, AddressSorter, Libp2pEvents, AbortOptions, ComponentLogger, Logger, Connection, MultiaddrConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, Peer, PeerStore, Startable, PendingDialStatus, PeerRouting } from '@libp2p/interface'
import type { ConnectionManager, OpenConnectionOptions, TransportManager } from '@libp2p/interface-internal'
Expand Down Expand Up @@ -81,6 +81,15 @@ export interface ConnectionManagerInit {
*/
maxParallelDials?: number

/**
* The maximum size the dial queue is allowed to grow to. Promises returned
* when dialing peers after this limit is reached will not resolve until the
* queue size falls beneath this size.
*
* @default 500
*/
maxDialQueueLength?: number

/**
* Maximum number of addresses allowed for a given peer before giving up
*
Expand Down Expand Up @@ -238,6 +247,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
this.dialQueue = new DialQueue(components, {
addressSorter: init.addressSorter ?? defaultAddressSort,
maxParallelDials: init.maxParallelDials ?? MAX_PARALLEL_DIALS,
maxDialQueueLength: init.maxDialQueueLength ?? MAX_DIAL_QUEUE_LENGTH,
maxPeerAddrsToDial: init.maxPeerAddrsToDial ?? MAX_PEER_ADDRS_TO_DIAL,
dialTimeout: init.dialTimeout ?? DIAL_TIMEOUT,
resolvers: init.resolvers ?? {
Expand Down
29 changes: 28 additions & 1 deletion packages/libp2p/test/connection-manager/direct.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
import { codes as ErrorCodes } from '../../src/errors.js'
import { createLibp2p } from '../../src/index.js'
import { DefaultTransportManager } from '../../src/transport-manager.js'
import type { Libp2p, Connection, PeerId } from '@libp2p/interface'
import type { Libp2p, Connection, PeerId, Transport } from '@libp2p/interface'
import type { TransportManager } from '@libp2p/interface-internal'
import type { Multiaddr } from '@multiformats/multiaddr'

Expand Down Expand Up @@ -504,4 +504,31 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
.to.eventually.be.rejected()
.and.to.have.property('code', ErrorCodes.ERR_DIALED_SELF)
})

it('should limit the maximum dial queue size', async () => {
const transport = stubInterface<Transport>({
filter: (ma) => ma,
dial: async () => {
await delay(1000)
return stubInterface<Connection>()
}
})

libp2p = await createLibp2p({
peerId,
transports: [
() => transport
],
connectionManager: {
maxDialQueueLength: 1,
maxParallelDials: 1
}
})

await expect(Promise.all([
libp2p.dial(multiaddr('/ip4/127.0.0.1/tcp/1234')),
libp2p.dial(multiaddr('/ip4/127.0.0.1/tcp/1235'))
])).to.eventually.be.rejected
.with.property('code', 'ERR_DIAL_QUEUE_FULL')
})
})

0 comments on commit 0cfcc4d

Please sign in to comment.