diff --git a/packages/libp2p/src/connection-manager/constants.defaults.ts b/packages/libp2p/src/connection-manager/constants.defaults.ts index f2c2dbcdd1..d9a6f6e3b6 100644 --- a/packages/libp2p/src/connection-manager/constants.defaults.ts +++ b/packages/libp2p/src/connection-manager/constants.defaults.ts @@ -57,3 +57,8 @@ export const MAX_INCOMING_PENDING_CONNECTIONS = 10 * failed to dial. */ export const LAST_DIAL_FAILURE_KEY = 'last-dial-failure' + +/** + * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxDialQueueLength + */ +export const MAX_DIAL_QUEUE_LENGTH = 500 diff --git a/packages/libp2p/src/connection-manager/dial-queue.ts b/packages/libp2p/src/connection-manager/dial-queue.ts index 01fa72af23..6b97917d09 100644 --- a/packages/libp2p/src/connection-manager/dial-queue.ts +++ b/packages/libp2p/src/connection-manager/dial-queue.ts @@ -13,7 +13,8 @@ import { DIAL_TIMEOUT, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, - LAST_DIAL_FAILURE_KEY + LAST_DIAL_FAILURE_KEY, + MAX_DIAL_QUEUE_LENGTH } from './constants.js' import { resolveMultiaddrs } from './utils.js' import type { AddressSorter, AbortOptions, ComponentLogger, Logger, Connection, ConnectionGater, Metrics, PeerId, Address, PeerStore, PeerRouting } from '@libp2p/interface' @@ -38,6 +39,7 @@ interface DialQueueJobOptions extends QueueAddOptions { interface DialerInit { addressSorter?: AddressSorter maxParallelDials?: number + maxDialQueueLength?: number maxPeerAddrsToDial?: number dialTimeout?: number resolvers?: Record @@ -47,6 +49,7 @@ interface DialerInit { const defaultOptions = { addressSorter: defaultAddressSort, maxParallelDials: MAX_PARALLEL_DIALS, + maxDialQueueLength: MAX_DIAL_QUEUE_LENGTH, maxPeerAddrsToDial: MAX_PEER_ADDRS_TO_DIAL, dialTimeout: DIAL_TIMEOUT, resolvers: { @@ -70,6 +73,7 @@ export class DialQueue { private readonly components: DialQueueComponents private readonly addressSorter: AddressSorter private readonly maxPeerAddrsToDial: number + private readonly maxDialQueueLength: number private readonly dialTimeout: number private shutDownController: AbortController private readonly connections: PeerMap @@ -78,6 +82,7 @@ export class DialQueue { constructor (components: DialQueueComponents, init: DialerInit = {}) { this.addressSorter = init.addressSorter ?? defaultOptions.addressSorter this.maxPeerAddrsToDial = init.maxPeerAddrsToDial ?? defaultOptions.maxPeerAddrsToDial + this.maxDialQueueLength = init.maxDialQueueLength ?? defaultOptions.maxDialQueueLength this.dialTimeout = init.dialTimeout ?? defaultOptions.dialTimeout this.connections = init.connections ?? new PeerMap() this.log = components.logger.forComponent('libp2p:connection-manager:dial-queue') @@ -185,6 +190,10 @@ export class DialQueue { return existingDial.join(options) } + if (this.queue.size >= this.maxDialQueueLength) { + throw new CodeError('Dial queue is full', 'ERR_DIAL_QUEUE_FULL') + } + this.log('creating dial target for %p', peerId, multiaddrs.map(ma => ma.toString())) return this.queue.add(async (options) => { diff --git a/packages/libp2p/src/connection-manager/index.ts b/packages/libp2p/src/connection-manager/index.ts index c48ca0e5aa..f859c70c1c 100644 --- a/packages/libp2p/src/connection-manager/index.ts +++ b/packages/libp2p/src/connection-manager/index.ts @@ -8,7 +8,7 @@ import { codes } from '../errors.js' import { getPeerAddress } from '../get-peer.js' import { AutoDial } from './auto-dial.js' import { ConnectionPruner } from './connection-pruner.js' -import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js' +import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_DIAL_QUEUE_LENGTH, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js' import { DialQueue } from './dial-queue.js' import type { PendingDial, AddressSorter, Libp2pEvents, AbortOptions, ComponentLogger, Logger, Connection, MultiaddrConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, Peer, PeerStore, Startable, PendingDialStatus, PeerRouting } from '@libp2p/interface' import type { ConnectionManager, OpenConnectionOptions, TransportManager } from '@libp2p/interface-internal' @@ -81,6 +81,15 @@ export interface ConnectionManagerInit { */ maxParallelDials?: number + /** + * The maximum size the dial queue is allowed to grow to. Promises returned + * when dialing peers after this limit is reached will not resolve until the + * queue size falls beneath this size. + * + * @default 500 + */ + maxDialQueueLength?: number + /** * Maximum number of addresses allowed for a given peer before giving up * @@ -238,6 +247,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { this.dialQueue = new DialQueue(components, { addressSorter: init.addressSorter ?? defaultAddressSort, maxParallelDials: init.maxParallelDials ?? MAX_PARALLEL_DIALS, + maxDialQueueLength: init.maxDialQueueLength ?? MAX_DIAL_QUEUE_LENGTH, maxPeerAddrsToDial: init.maxPeerAddrsToDial ?? MAX_PEER_ADDRS_TO_DIAL, dialTimeout: init.dialTimeout ?? DIAL_TIMEOUT, resolvers: init.resolvers ?? { diff --git a/packages/libp2p/test/connection-manager/direct.spec.ts b/packages/libp2p/test/connection-manager/direct.spec.ts index db5168dc2f..6c56f80873 100644 --- a/packages/libp2p/test/connection-manager/direct.spec.ts +++ b/packages/libp2p/test/connection-manager/direct.spec.ts @@ -27,7 +27,7 @@ import { DefaultConnectionManager } from '../../src/connection-manager/index.js' import { codes as ErrorCodes } from '../../src/errors.js' import { createLibp2p } from '../../src/index.js' import { DefaultTransportManager } from '../../src/transport-manager.js' -import type { Libp2p, Connection, PeerId } from '@libp2p/interface' +import type { Libp2p, Connection, PeerId, Transport } from '@libp2p/interface' import type { TransportManager } from '@libp2p/interface-internal' import type { Multiaddr } from '@multiformats/multiaddr' @@ -504,4 +504,31 @@ describe('libp2p.dialer (direct, WebSockets)', () => { .to.eventually.be.rejected() .and.to.have.property('code', ErrorCodes.ERR_DIALED_SELF) }) + + it('should limit the maximum dial queue size', async () => { + const transport = stubInterface({ + filter: (ma) => ma, + dial: async () => { + await delay(1000) + return stubInterface() + } + }) + + libp2p = await createLibp2p({ + peerId, + transports: [ + () => transport + ], + connectionManager: { + maxDialQueueLength: 1, + maxParallelDials: 1 + } + }) + + await expect(Promise.all([ + libp2p.dial(multiaddr('/ip4/127.0.0.1/tcp/1234')), + libp2p.dial(multiaddr('/ip4/127.0.0.1/tcp/1235')) + ])).to.eventually.be.rejected + .with.property('code', 'ERR_DIAL_QUEUE_FULL') + }) })