From ab901790810d8ce59724af1706c9a9e74341b8ee Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Thu, 15 Aug 2024 09:24:06 +0100 Subject: [PATCH] fix!: remove autodialer (#2639) The autodialer is a feature from an older time when the DHT was less reliable and we didn't have things like the random walk component. There's not a lot of benefit in opening connections to any old peer, instead protocols now have better ways of targetting the kind of peers they require. Actively dialing peers harms platforms where connections are extremely expensive such as react-native so this feature has been removed. Closes #2621 Fixes #2418 BREAKING CHANGE: the autodialer has been removed as well as the corresponding config keys --------- Co-authored-by: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> --- doc/CONFIGURATION.md | 3 +- doc/LIMITS.md | 6 - interop/test/fixtures/get-libp2p.ts | 3 - packages/integration-tests/.aegir.js | 3 +- .../integration-tests/test/bootstrap.spec.ts | 51 +++ .../test/circuit-relay.node.ts | 3 - packages/integration-tests/test/fetch.spec.ts | 3 - packages/integration-tests/test/interop.ts | 5 +- packages/integration-tests/test/peers.spec.ts | 59 ++++ packages/libp2p/.aegir.js | 3 +- packages/libp2p/package.json | 1 + .../src/connection-manager/auto-dial.ts | 285 ----------------- .../connection-manager/constants.browser.ts | 10 - .../connection-manager/constants.defaults.ts | 41 +-- .../src/connection-manager/constants.ts | 10 - .../src/connection-manager/dial-queue.ts | 19 +- .../libp2p/src/connection-manager/index.ts | 191 +++++------- .../src/connection-manager/reconnect-queue.ts | 134 ++++++++ .../test/connection-manager/auto-dial.spec.ts | 293 ------------------ .../test/connection-manager/direct.node.ts | 1 - .../test/connection-manager/direct.spec.ts | 1 - .../test/connection-manager/index.node.ts | 139 --------- .../test/connection-manager/index.spec.ts | 15 +- .../reconnect-queue.spec.ts | 117 +++++++ packages/libp2p/test/registrar/errors.spec.ts | 1 - .../peer-discovery-bootstrap/package.json | 1 + .../peer-discovery-bootstrap/src/index.ts | 18 +- .../test/bootstrap.spec.ts | 42 ++- .../test/compliance.spec.ts | 4 +- packages/transport-webrtc/.aegir.js | 1 - packages/transport-webrtc/test/basics.spec.ts | 3 - .../transport-webtransport/test/browser.ts | 3 - 32 files changed, 532 insertions(+), 937 deletions(-) create mode 100644 packages/integration-tests/test/peers.spec.ts delete mode 100644 packages/libp2p/src/connection-manager/auto-dial.ts create mode 100644 packages/libp2p/src/connection-manager/reconnect-queue.ts delete mode 100644 packages/libp2p/test/connection-manager/auto-dial.spec.ts create mode 100644 packages/libp2p/test/connection-manager/reconnect-queue.spec.ts diff --git a/doc/CONFIGURATION.md b/doc/CONFIGURATION.md index 2ef97839c1..b5debc6e4d 100644 --- a/doc/CONFIGURATION.md +++ b/doc/CONFIGURATION.md @@ -626,8 +626,7 @@ const node = await createLibp2p({ noise() ], connectionManager: { - maxConnections: Infinity, - minConnections: 0 + maxConnections: Infinity } }) ``` diff --git a/doc/LIMITS.md b/doc/LIMITS.md index 5d5efd523e..2187ffabc9 100644 --- a/doc/LIMITS.md +++ b/doc/LIMITS.md @@ -38,12 +38,6 @@ const node = await createLibp2p({ */ maxConnections: 100, - /** - * If the number of open connections goes below this number, the node - * will try to connect to randomly selected peers from the peer store - */ - minConnections: 50, - /** * How many connections can be open but not yet upgraded */ diff --git a/interop/test/fixtures/get-libp2p.ts b/interop/test/fixtures/get-libp2p.ts index f5196366f2..069fc90326 100644 --- a/interop/test/fixtures/get-libp2p.ts +++ b/interop/test/fixtures/get-libp2p.ts @@ -25,9 +25,6 @@ const IP = process.env.ip ?? '0.0.0.0' export async function getLibp2p (): Promise> { const options: Libp2pOptions<{ ping: PingService, identify: Identify }> = { start: true, - connectionManager: { - minConnections: 0 - }, connectionGater: { denyDialMultiaddr: async () => false }, diff --git a/packages/integration-tests/.aegir.js b/packages/integration-tests/.aegir.js index 078dd47d7d..22a2d0a630 100644 --- a/packages/integration-tests/.aegir.js +++ b/packages/integration-tests/.aegir.js @@ -24,8 +24,7 @@ export default { const peerId = await createEd25519PeerId() const libp2p = await createLibp2p({ connectionManager: { - inboundConnectionThreshold: Infinity, - minConnections: 0 + inboundConnectionThreshold: Infinity }, addresses: { listen: [ diff --git a/packages/integration-tests/test/bootstrap.spec.ts b/packages/integration-tests/test/bootstrap.spec.ts index 3334ccf8ac..e9f80a169a 100644 --- a/packages/integration-tests/test/bootstrap.spec.ts +++ b/packages/integration-tests/test/bootstrap.spec.ts @@ -2,8 +2,11 @@ import { bootstrap } from '@libp2p/bootstrap' import { TypedEventEmitter, peerDiscoverySymbol } from '@libp2p/interface' +import { mplex } from '@libp2p/mplex' import { createEd25519PeerId } from '@libp2p/peer-id-factory' +import { plaintext } from '@libp2p/plaintext' import { webSockets } from '@libp2p/websockets' +import * as Filter from '@libp2p/websockets/filters' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' import { createLibp2p } from 'libp2p' @@ -103,4 +106,52 @@ describe('bootstrap', () => { return deferred.promise }) + + it('bootstrap should dial all peers in the list', async () => { + const deferred = defer() + + const bootstrappers = [ + `${process.env.RELAY_MULTIADDR}` + ] + + libp2p = await createLibp2p({ + connectionEncryption: [ + plaintext() + ], + transports: [ + webSockets({ + filter: Filter.all + }) + ], + streamMuxers: [ + mplex() + ], + peerDiscovery: [ + bootstrap({ + list: bootstrappers + }) + ], + connectionGater: { + denyDialMultiaddr: () => false + } + }) + + const expectedPeers = new Set( + bootstrappers.map(ma => multiaddr(ma).getPeerId()) + ) + + libp2p.addEventListener('connection:open', (evt) => { + const { remotePeer } = evt.detail + + expectedPeers.delete(remotePeer.toString()) + if (expectedPeers.size === 0) { + libp2p.removeEventListener('connection:open') + deferred.resolve() + } + }) + + await libp2p.start() + + return deferred.promise + }) }) diff --git a/packages/integration-tests/test/circuit-relay.node.ts b/packages/integration-tests/test/circuit-relay.node.ts index 162fec3e41..e0647b9ce4 100644 --- a/packages/integration-tests/test/circuit-relay.node.ts +++ b/packages/integration-tests/test/circuit-relay.node.ts @@ -42,9 +42,6 @@ async function createClient (options: Libp2pOptions = {}): Promise { connectionEncryption: [ plaintext() ], - connectionManager: { - minConnections: 0 - }, services: { identify: identify() }, diff --git a/packages/integration-tests/test/fetch.spec.ts b/packages/integration-tests/test/fetch.spec.ts index c8ca47ade2..ad9c6569ac 100644 --- a/packages/integration-tests/test/fetch.spec.ts +++ b/packages/integration-tests/test/fetch.spec.ts @@ -11,9 +11,6 @@ async function createNode (): Promise> { return createLibp2p(createBaseOptions({ services: { fetch: fetch() - }, - connectionManager: { - minConnections: 0 } })) } diff --git a/packages/integration-tests/test/interop.ts b/packages/integration-tests/test/interop.ts index a768481179..69f85de94e 100644 --- a/packages/integration-tests/test/interop.ts +++ b/packages/integration-tests/test/interop.ts @@ -135,10 +135,7 @@ async function createJsPeer (options: SpawnOptions): Promise { }, transports: [tcp(), circuitRelayTransport()], streamMuxers: [], - connectionEncryption: [noise()], - connectionManager: { - minConnections: 0 - } + connectionEncryption: [noise()] } if (options.noListen !== true) { diff --git a/packages/integration-tests/test/peers.spec.ts b/packages/integration-tests/test/peers.spec.ts new file mode 100644 index 0000000000..2992de118d --- /dev/null +++ b/packages/integration-tests/test/peers.spec.ts @@ -0,0 +1,59 @@ +/* eslint-env mocha */ + +import { KEEP_ALIVE, type Libp2p } from '@libp2p/interface' +import { expect } from 'aegir/chai' +import { createLibp2p } from 'libp2p' +import pWaitFor from 'p-wait-for' +import { createBaseOptions } from './fixtures/base-options.js' + +describe('peers', () => { + let nodes: Libp2p[] + + beforeEach(async () => { + nodes = await Promise.all([ + createLibp2p(createBaseOptions()), + createLibp2p(createBaseOptions()), + createLibp2p(createBaseOptions()) + ]) + }) + + afterEach(async () => Promise.all(nodes.map(async n => { await n.stop() }))) + + it('should redial a peer tagged with KEEP_ALIVE', async () => { + await nodes[0].dial(nodes[1].getMultiaddrs()) + + expect(nodes[0].getConnections(nodes[1].peerId)).to.not.be.empty() + + await nodes[0].peerStore.merge(nodes[1].peerId, { + tags: { + [KEEP_ALIVE]: { + value: 1 + } + } + }) + + await Promise.all( + nodes[0].getConnections(nodes[1].peerId).map(async conn => conn.close()) + ) + + await pWaitFor(async () => { + return nodes[0].getConnections(nodes[1].peerId).length > 0 + }, { + interval: 100, + timeout: { + milliseconds: 5000, + message: 'Did not reconnect to peer tagged with KEEP_ALIVE' + } + }) + }) + + it('should store the multiaddr for a peer after a successful dial', async () => { + await nodes[0].dial(nodes[1].getMultiaddrs()) + + expect(nodes[0].getConnections(nodes[1].peerId)).to.not.be.empty() + + const peer = await nodes[0].peerStore.get(nodes[1].peerId) + expect(peer.addresses).to.not.be.empty() + expect(peer.metadata.get('last-dial-success')).to.be.ok() + }) +}) diff --git a/packages/libp2p/.aegir.js b/packages/libp2p/.aegir.js index a89fba7ad6..a7250fb938 100644 --- a/packages/libp2p/.aegir.js +++ b/packages/libp2p/.aegir.js @@ -20,8 +20,7 @@ export default { const peerId = await createEd25519PeerId() const libp2p = await createLibp2p({ connectionManager: { - inboundConnectionThreshold: Infinity, - minConnections: 0 + inboundConnectionThreshold: Infinity }, addresses: { listen: [ diff --git a/packages/libp2p/package.json b/packages/libp2p/package.json index b20d3ef6d1..1f38a9ad90 100644 --- a/packages/libp2p/package.json +++ b/packages/libp2p/package.json @@ -107,6 +107,7 @@ "merge-options": "^3.0.4", "multiformats": "^13.1.0", "p-defer": "^4.0.1", + "p-retry": "^6.2.0", "progress-events": "^1.0.0", "race-event": "^1.3.0", "race-signal": "^1.0.2", diff --git a/packages/libp2p/src/connection-manager/auto-dial.ts b/packages/libp2p/src/connection-manager/auto-dial.ts deleted file mode 100644 index 16fada9221..0000000000 --- a/packages/libp2p/src/connection-manager/auto-dial.ts +++ /dev/null @@ -1,285 +0,0 @@ -import { PeerMap, PeerSet } from '@libp2p/peer-collections' -import { PeerQueue } from '@libp2p/utils/peer-queue' -import { toString as uint8ArrayToString } from 'uint8arrays/to-string' -import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_DISCOVERED_PEERS_DEBOUNCE, AUTO_DIAL_INTERVAL, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PEER_RETRY_THRESHOLD, AUTO_DIAL_PRIORITY, LAST_DIAL_FAILURE_KEY, MIN_CONNECTIONS } from './constants.js' -import type { Libp2pEvents, Logger, ComponentLogger, TypedEventTarget, PeerStore, Startable, Metrics } from '@libp2p/interface' -import type { ConnectionManager } from '@libp2p/interface-internal' - -interface AutoDialInit { - minConnections?: number - maxQueueLength?: number - autoDialConcurrency?: number - autoDialPriority?: number - autoDialInterval?: number - autoDialPeerRetryThreshold?: number - autoDialDiscoveredPeersDebounce?: number -} - -interface AutoDialComponents { - connectionManager: ConnectionManager - peerStore: PeerStore - events: TypedEventTarget - logger: ComponentLogger - metrics?: Metrics -} - -const defaultOptions = { - minConnections: MIN_CONNECTIONS, - maxQueueLength: AUTO_DIAL_MAX_QUEUE_LENGTH, - autoDialConcurrency: AUTO_DIAL_CONCURRENCY, - autoDialPriority: AUTO_DIAL_PRIORITY, - autoDialInterval: AUTO_DIAL_INTERVAL, - autoDialPeerRetryThreshold: AUTO_DIAL_PEER_RETRY_THRESHOLD, - autoDialDiscoveredPeersDebounce: AUTO_DIAL_DISCOVERED_PEERS_DEBOUNCE -} - -export class AutoDial implements Startable { - private readonly connectionManager: ConnectionManager - private readonly peerStore: PeerStore - private readonly queue: PeerQueue - private readonly minConnections: number - private readonly autoDialPriority: number - private readonly autoDialIntervalMs: number - private readonly autoDialMaxQueueLength: number - private readonly autoDialPeerRetryThresholdMs: number - private readonly autoDialDiscoveredPeersDebounce: number - private autoDialInterval?: ReturnType - private started: boolean - private running: boolean - private readonly log: Logger - - /** - * Proactively tries to connect to known peers stored in the PeerStore. - * It will keep the number of connections below the upper limit and sort - * the peers to connect based on whether we know their keys and protocols. - */ - constructor (components: AutoDialComponents, init: AutoDialInit) { - this.connectionManager = components.connectionManager - this.peerStore = components.peerStore - this.minConnections = init.minConnections ?? defaultOptions.minConnections - this.autoDialPriority = init.autoDialPriority ?? defaultOptions.autoDialPriority - this.autoDialIntervalMs = init.autoDialInterval ?? defaultOptions.autoDialInterval - this.autoDialMaxQueueLength = init.maxQueueLength ?? defaultOptions.maxQueueLength - this.autoDialPeerRetryThresholdMs = init.autoDialPeerRetryThreshold ?? defaultOptions.autoDialPeerRetryThreshold - this.autoDialDiscoveredPeersDebounce = init.autoDialDiscoveredPeersDebounce ?? defaultOptions.autoDialDiscoveredPeersDebounce - this.log = components.logger.forComponent('libp2p:connection-manager:auto-dial') - this.started = false - this.running = false - this.queue = new PeerQueue({ - concurrency: init.autoDialConcurrency ?? defaultOptions.autoDialConcurrency, - metricName: 'libp2p_autodial_queue', - metrics: components.metrics - }) - this.queue.addEventListener('error', (evt) => { - this.log.error('error during auto-dial', evt.detail) - }) - - // check the min connection limit whenever a peer disconnects - components.events.addEventListener('connection:close', () => { - this.autoDial() - .catch(err => { - this.log.error(err) - }) - }) - - // sometimes peers are discovered in quick succession so add a small - // debounce to ensure all eligible peers are autodialed - let debounce: ReturnType - - // when new peers are discovered, dial them if we don't have - // enough connections - components.events.addEventListener('peer:discovery', () => { - clearTimeout(debounce) - debounce = setTimeout(() => { - this.autoDial() - .catch(err => { - this.log.error(err) - }) - }, this.autoDialDiscoveredPeersDebounce) - }) - } - - isStarted (): boolean { - return this.started - } - - start (): void { - this.started = true - } - - afterStart (): void { - this.autoDial() - .catch(err => { - this.log.error('error while autodialing', err) - }) - } - - stop (): void { - // clear the queue - this.queue.clear() - clearTimeout(this.autoDialInterval) - this.started = false - this.running = false - } - - async autoDial (): Promise { - if (!this.started || this.running) { - return - } - - const connections = this.connectionManager.getConnectionsMap() - const numConnections = connections.size - - // already have enough connections - if (numConnections >= this.minConnections) { - if (this.minConnections > 0) { - this.log.trace('have enough connections %d/%d', numConnections, this.minConnections) - } - - // no need to schedule next autodial as it will be run when on - // connection:close event - return - } - - if (this.queue.size > this.autoDialMaxQueueLength) { - this.log('not enough connections %d/%d but auto dial queue is full', numConnections, this.minConnections) - this.sheduleNextAutodial() - return - } - - this.running = true - - this.log('not enough connections %d/%d - will dial peers to increase the number of connections', numConnections, this.minConnections) - - const dialQueue = new PeerSet( - // @ts-expect-error boolean filter removes falsy peer IDs - this.connectionManager.getDialQueue() - .map(queue => queue.peerId) - .filter(Boolean) - ) - - // sort peers on whether we know protocols or public keys for them - const peers = await this.peerStore.all({ - filters: [ - // remove some peers - (peer) => { - // remove peers without addresses - if (peer.addresses.length === 0) { - this.log.trace('not autodialing %p because they have no addresses', peer.id) - return false - } - - // remove peers we are already connected to - if (connections.has(peer.id)) { - this.log.trace('not autodialing %p because they are already connected', peer.id) - return false - } - - // remove peers we are already dialling - if (dialQueue.has(peer.id)) { - this.log.trace('not autodialing %p because they are already being dialed', peer.id) - return false - } - - // remove peers already in the autodial queue - if (this.queue.has(peer.id)) { - this.log.trace('not autodialing %p because they are already being autodialed', peer.id) - return false - } - - return true - } - ] - }) - - // shuffle the peers - this is so peers with the same tag values will be - // dialled in a different order each time - const shuffledPeers = peers.sort(() => Math.random() > 0.5 ? 1 : -1) - - // sort shuffled peers by tag value - const peerValues = new PeerMap() - for (const peer of shuffledPeers) { - if (peerValues.has(peer.id)) { - continue - } - - // sum all tag values - peerValues.set(peer.id, [...peer.tags.values()].reduce((acc, curr) => { - return acc + curr.value - }, 0)) - } - - // sort by value, highest to lowest - const sortedPeers = shuffledPeers.sort((a, b) => { - const peerAValue = peerValues.get(a.id) ?? 0 - const peerBValue = peerValues.get(b.id) ?? 0 - - if (peerAValue > peerBValue) { - return -1 - } - - if (peerAValue < peerBValue) { - return 1 - } - - return 0 - }) - - const peersThatHaveNotFailed = sortedPeers.filter(peer => { - const lastDialFailure = peer.metadata.get(LAST_DIAL_FAILURE_KEY) - - if (lastDialFailure == null) { - return true - } - - const lastDialFailureTimestamp = parseInt(uint8ArrayToString(lastDialFailure)) - - if (isNaN(lastDialFailureTimestamp)) { - return true - } - - // only dial if the time since the last failure is above the retry threshold - return Date.now() - lastDialFailureTimestamp > this.autoDialPeerRetryThresholdMs - }) - - this.log('selected %d/%d peers to dial', peersThatHaveNotFailed.length, peers.length) - - for (const peer of peersThatHaveNotFailed) { - this.queue.add(async () => { - const numConnections = this.connectionManager.getConnectionsMap().size - - // Check to see if we still need to auto dial - if (numConnections >= this.minConnections) { - this.log('got enough connections now %d/%d', numConnections, this.minConnections) - this.queue.clear() - return - } - - this.log('connecting to a peerStore stored peer %p', peer.id) - await this.connectionManager.openConnection(peer.id, { - priority: this.autoDialPriority - }) - }, { - peerId: peer.id - }).catch(err => { - this.log.error('could not connect to peerStore stored peer', err) - }) - } - - this.running = false - this.sheduleNextAutodial() - } - - private sheduleNextAutodial (): void { - if (!this.started) { - return - } - - this.autoDialInterval = setTimeout(() => { - this.autoDial() - .catch(err => { - this.log.error('error while autodialing', err) - }) - }, this.autoDialIntervalMs) - } -} diff --git a/packages/libp2p/src/connection-manager/constants.browser.ts b/packages/libp2p/src/connection-manager/constants.browser.ts index 2c369c1245..cdafdbcdb2 100644 --- a/packages/libp2p/src/connection-manager/constants.browser.ts +++ b/packages/libp2p/src/connection-manager/constants.browser.ts @@ -1,10 +1,5 @@ export * from './constants.defaults.js' -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#minConnections - */ -export const MIN_CONNECTIONS = 5 - /** * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxConnections */ @@ -14,8 +9,3 @@ export const MAX_CONNECTIONS = 100 * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxParallelDials */ export const MAX_PARALLEL_DIALS = 50 - -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/libp2p.index.unknown.ConnectionManagerInit.html#autoDialPeerRetryThreshold - */ -export const AUTO_DIAL_PEER_RETRY_THRESHOLD = 1000 * 60 * 7 diff --git a/packages/libp2p/src/connection-manager/constants.defaults.ts b/packages/libp2p/src/connection-manager/constants.defaults.ts index d9a6f6e3b6..628088e8ed 100644 --- a/packages/libp2p/src/connection-manager/constants.defaults.ts +++ b/packages/libp2p/src/connection-manager/constants.defaults.ts @@ -13,31 +13,6 @@ export const INBOUND_UPGRADE_TIMEOUT = 2e3 */ export const MAX_PEER_ADDRS_TO_DIAL = 25 -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#autoDialInterval - */ -export const AUTO_DIAL_INTERVAL = 5000 - -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#autoDialConcurrency - */ -export const AUTO_DIAL_CONCURRENCY = 25 - -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#autoDialPriority - */ -export const AUTO_DIAL_PRIORITY = 0 - -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#autoDialMaxQueueLength - */ -export const AUTO_DIAL_MAX_QUEUE_LENGTH = 100 - -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/libp2p.index.unknown.ConnectionManagerInit.html#autoDialDiscoveredPeersDebounce - */ -export const AUTO_DIAL_DISCOVERED_PEERS_DEBOUNCE = 10 - /** * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#inboundConnectionThreshold */ @@ -48,16 +23,28 @@ export const INBOUND_CONNECTION_THRESHOLD = 5 */ export const MAX_INCOMING_PENDING_CONNECTIONS = 10 +/** + * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxParallelReconnects + */ +export const MAX_PARALLEL_RECONNECTS = 5 + /** * Store as part of the peer store metadata for a given peer, the value for this - * key is a timestamp of the last time a dial attempted failed with the relevant - * peer stored as a string. + * key is a timestamp of the last time a dial attempt failed with the timestamp + * stored as a string. * * Used to insure we do not endlessly try to auto dial peers we have recently * failed to dial. */ export const LAST_DIAL_FAILURE_KEY = 'last-dial-failure' +/** + * Store as part of the peer store metadata for a given peer, the value for this + * key is a timestamp of the last time a dial attempt succeeded with the + * timestamp stored as a string. + */ +export const LAST_DIAL_SUCCESS_KEY = 'last-dial-success' + /** * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxDialQueueLength */ diff --git a/packages/libp2p/src/connection-manager/constants.ts b/packages/libp2p/src/connection-manager/constants.ts index a6a6c486f4..422074f57c 100644 --- a/packages/libp2p/src/connection-manager/constants.ts +++ b/packages/libp2p/src/connection-manager/constants.ts @@ -1,10 +1,5 @@ export * from './constants.defaults.js' -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#minConnections - */ -export const MIN_CONNECTIONS = 50 - /** * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxConnections */ @@ -14,8 +9,3 @@ export const MAX_CONNECTIONS = 300 * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxParallelDials */ export const MAX_PARALLEL_DIALS = 100 - -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/libp2p.index.unknown.ConnectionManagerInit.html#autoDialPeerRetryThreshold - */ -export const AUTO_DIAL_PEER_RETRY_THRESHOLD = 1000 * 60 diff --git a/packages/libp2p/src/connection-manager/dial-queue.ts b/packages/libp2p/src/connection-manager/dial-queue.ts index 9d927e4c51..d88b04d254 100644 --- a/packages/libp2p/src/connection-manager/dial-queue.ts +++ b/packages/libp2p/src/connection-manager/dial-queue.ts @@ -16,7 +16,8 @@ import { MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, LAST_DIAL_FAILURE_KEY, - MAX_DIAL_QUEUE_LENGTH + MAX_DIAL_QUEUE_LENGTH, + LAST_DIAL_SUCCESS_KEY } from './constants.js' import { resolveMultiaddrs } from './utils.js' import { DEFAULT_DIAL_PRIORITY } from './index.js' @@ -244,6 +245,20 @@ export class DialQueue { this.log('dial to %a succeeded', address.multiaddr) + // record the successful dial and the address + try { + await this.components.peerStore.merge(conn.remotePeer, { + multiaddrs: [ + conn.remoteAddr + ], + metadata: { + [LAST_DIAL_SUCCESS_KEY]: uint8ArrayFromString(Date.now().toString()) + } + }) + } catch (err: any) { + this.log.error('could not update last dial failure key for %p', peerId, err) + } + return conn } catch (err: any) { this.log.error('dial failed to %a', address.multiaddr, err) @@ -251,7 +266,7 @@ export class DialQueue { if (peerId != null) { // record the failed dial try { - await this.components.peerStore.patch(peerId, { + await this.components.peerStore.merge(peerId, { metadata: { [LAST_DIAL_FAILURE_KEY]: uint8ArrayFromString(Date.now().toString()) } diff --git a/packages/libp2p/src/connection-manager/index.ts b/packages/libp2p/src/connection-manager/index.ts index 72972f060e..5b2e59e778 100644 --- a/packages/libp2p/src/connection-manager/index.ts +++ b/packages/libp2p/src/connection-manager/index.ts @@ -1,4 +1,4 @@ -import { InvalidParametersError, KEEP_ALIVE, NotStartedError } from '@libp2p/interface' +import { InvalidParametersError, NotStartedError, start, stop } from '@libp2p/interface' import { PeerMap } from '@libp2p/peer-collections' import { defaultAddressSort } from '@libp2p/utils/address-sort' import { RateLimiter } from '@libp2p/utils/rate-limiter' @@ -6,11 +6,11 @@ import { type Multiaddr, type Resolver, multiaddr } from '@multiformats/multiadd import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers' import { CustomProgressEvent } from 'progress-events' import { getPeerAddress } from '../get-peer.js' -import { AutoDial } from './auto-dial.js' import { ConnectionPruner } from './connection-pruner.js' -import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_DISCOVERED_PEERS_DEBOUNCE, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PEER_RETRY_THRESHOLD, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_DIAL_QUEUE_LENGTH, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js' +import { DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_DIAL_QUEUE_LENGTH, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL } from './constants.js' import { DialQueue } from './dial-queue.js' -import type { PendingDial, AddressSorter, Libp2pEvents, AbortOptions, ComponentLogger, Logger, Connection, MultiaddrConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, Peer, PeerStore, Startable, PendingDialStatus, PeerRouting, IsDialableOptions } from '@libp2p/interface' +import { ReconnectQueue } from './reconnect-queue.js' +import type { PendingDial, AddressSorter, Libp2pEvents, AbortOptions, ComponentLogger, Logger, Connection, MultiaddrConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, PeerStore, Startable, PendingDialStatus, PeerRouting, IsDialableOptions } from '@libp2p/interface' import type { ConnectionManager, OpenConnectionOptions, TransportManager } from '@libp2p/interface-internal' import type { JobStatus } from '@libp2p/utils/queue' @@ -18,57 +18,13 @@ export const DEFAULT_DIAL_PRIORITY = 50 export interface ConnectionManagerInit { /** - * The maximum number of connections libp2p is willing to have before it starts - * pruning connections to reduce resource usage. (default: 300, 100 in browsers) + * The maximum number of connections libp2p is willing to have before it + * starts pruning connections to reduce resource usage. + * + * @default 300/100 */ maxConnections?: number - /** - * The minimum number of connections below which libp2p will start to dial peers - * from the peer book. Setting this to 0 effectively disables this behaviour. - * (default: 50, 5 in browsers) - */ - minConnections?: number - - /** - * How long to wait between attempting to keep our number of concurrent connections - * above minConnections (default: 5000) - */ - autoDialInterval?: number - - /** - * When dialling peers from the peer book to keep the number of open connections - * above `minConnections`, add dials for this many peers to the dial queue - * at once. (default: 25) - */ - autoDialConcurrency?: number - - /** - * To allow user dials to take priority over auto dials, use this value as the - * dial priority. (default: 0) - */ - autoDialPriority?: number - - /** - * Limit the maximum number of peers to dial when trying to keep the number of - * open connections above `minConnections`. (default: 100) - */ - autoDialMaxQueueLength?: number - - /** - * When we've failed to dial a peer, do not autodial them again within this - * number of ms. (default: 1 minute, 7 minutes in browsers) - */ - autoDialPeerRetryThreshold?: number - - /** - * Newly discovered peers may be auto-dialed to increase the number of open - * connections, but they can be discovered in quick succession so add a small - * delay before attempting to dial them in case more peers have been - * discovered. (default: 10ms) - */ - autoDialDiscoveredPeersDebounce?: number - /** * Sort the known addresses of a peer before trying to dial, By default public * addresses will be dialled before private (e.g. loopback or LAN) addresses. @@ -77,7 +33,8 @@ export interface ConnectionManagerInit { /** * The maximum number of dials across all peers to execute in parallel. - * (default: 100, 50 in browsers) + * + * @default 100/50 */ maxParallelDials?: number @@ -105,7 +62,9 @@ export interface ConnectionManagerInit { /** * When a new inbound connection is opened, the upgrade process (e.g. protect, - * encrypt, multiplex etc) must complete within this number of ms. (default: 30s) + * encrypt, multiplex etc) must complete within this number of ms. + * + * @default 30000 */ inboundUpgradeTimeout?: number @@ -116,7 +75,8 @@ export interface ConnectionManagerInit { /** * A list of multiaddrs that will always be allowed (except if they are in the - * deny list) to open connections to this node even if we've reached maxConnections + * deny list) to open connections to this node even if we've reached + * maxConnections */ allow?: string[] @@ -133,23 +93,52 @@ export interface ConnectionManagerInit { inboundConnectionThreshold?: number /** - * The maximum number of parallel incoming connections allowed that have yet to - * complete the connection upgrade - e.g. choosing connection encryption, muxer, etc. - * (default: 10) + * The maximum number of parallel incoming connections allowed that have yet + * to complete the connection upgrade - e.g. choosing connection encryption, + * muxer, etc. + * + * @default 10 */ maxIncomingPendingConnections?: number + + /** + * When a peer tagged with `KEEP_ALIVE` disconnects, attempt to redial them + * this many times. + * + * @default 5 + */ + reconnectRetries?: number + + /** + * When a peer tagged with `KEEP_ALIVE` disconnects, wait this long between + * each retry. Note this will be multiplied by `reconnectFactor` to create an + * increasing retry backoff. + * + * @default 1000 + */ + reconnectRetryInterval?: number + + /** + * When a peer tagged with `KEEP_ALIVE` disconnects, apply this multiplication + * factor to the time interval between each retry. + * + * @default 2 + */ + reconnectBackoffFactor?: number + + /** + * When a peers tagged with `KEEP_ALIVE` disconnect, reconnect to this many at + * once. + * + * @default 5 + */ + maxParallelReconnects?: number } const defaultOptions = { - minConnections: MIN_CONNECTIONS, maxConnections: MAX_CONNECTIONS, inboundConnectionThreshold: INBOUND_CONNECTION_THRESHOLD, - maxIncomingPendingConnections: MAX_INCOMING_PENDING_CONNECTIONS, - autoDialConcurrency: AUTO_DIAL_CONCURRENCY, - autoDialPriority: AUTO_DIAL_PRIORITY, - autoDialMaxQueueLength: AUTO_DIAL_MAX_QUEUE_LENGTH, - autoDialPeerRetryThreshold: AUTO_DIAL_PEER_RETRY_THRESHOLD, - autoDialDiscoveredPeersDebounce: AUTO_DIAL_DISCOVERED_PEERS_DEBOUNCE + maxIncomingPendingConnections: MAX_INCOMING_PENDING_CONNECTIONS } export interface DefaultConnectionManagerComponents { @@ -176,7 +165,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { private readonly maxConnections: number public readonly dialQueue: DialQueue - public readonly autoDial: AutoDial + public readonly reconnectQueue: ReconnectQueue public readonly connectionPruner: ConnectionPruner private readonly inboundConnectionRateLimiter: RateLimiter private readonly peerStore: PeerStore @@ -186,10 +175,9 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { constructor (components: DefaultConnectionManagerComponents, init: ConnectionManagerInit = {}) { this.maxConnections = init.maxConnections ?? defaultOptions.maxConnections - const minConnections = init.minConnections ?? defaultOptions.minConnections - if (this.maxConnections < minConnections) { - throw new InvalidParametersError('Connection Manager maxConnections must be greater than minConnections') + if (this.maxConnections < 1) { + throw new InvalidParametersError('Connection Manager maxConnections must be greater than 0') } /** @@ -221,21 +209,6 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { duration: 1 }) - // controls what happens when we don't have enough connections - this.autoDial = new AutoDial({ - connectionManager: this, - peerStore: components.peerStore, - events: components.events, - logger: components.logger - }, { - minConnections, - autoDialConcurrency: init.autoDialConcurrency ?? defaultOptions.autoDialConcurrency, - autoDialPriority: init.autoDialPriority ?? defaultOptions.autoDialPriority, - autoDialPeerRetryThreshold: init.autoDialPeerRetryThreshold ?? defaultOptions.autoDialPeerRetryThreshold, - autoDialDiscoveredPeersDebounce: init.autoDialDiscoveredPeersDebounce ?? defaultOptions.autoDialDiscoveredPeersDebounce, - maxQueueLength: init.autoDialMaxQueueLength ?? defaultOptions.autoDialMaxQueueLength - }) - // controls what happens when we have too many connections this.connectionPruner = new ConnectionPruner({ connectionManager: this, @@ -258,6 +231,18 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { }, connections: this.connections }) + + this.reconnectQueue = new ReconnectQueue({ + events: components.events, + peerStore: components.peerStore, + logger: components.logger, + connectionManager: this + }, { + retries: init.reconnectRetries, + retryInterval: init.reconnectRetryInterval, + backoffFactor: init.reconnectBackoffFactor, + maxParallelReconnects: init.maxParallelReconnects + }) } readonly [Symbol.toStringTag] = '@libp2p/connection-manager' @@ -349,45 +334,23 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { } }) - this.dialQueue.start() - this.autoDial.start() + await start( + this.dialQueue, + this.reconnectQueue + ) this.started = true this.log('started') } - async afterStart (): Promise { - // re-connect to any peers with the KEEP_ALIVE tag - void Promise.resolve() - .then(async () => { - const keepAlivePeers: Peer[] = await this.peerStore.all({ - filters: [(peer) => { - return peer.tags.has(KEEP_ALIVE) - }] - }) - - await Promise.all( - keepAlivePeers.map(async peer => { - await this.openConnection(peer.id) - .catch(err => { - this.log.error(err) - }) - }) - ) - }) - .catch(err => { - this.log.error(err) - }) - - this.autoDial.afterStart() - } - /** * Stops the Connection Manager */ async stop (): Promise { - this.dialQueue.stop() - this.autoDial.stop() + await stop( + this.reconnectQueue, + this.dialQueue + ) // Close all connections we're tracking const tasks: Array> = [] diff --git a/packages/libp2p/src/connection-manager/reconnect-queue.ts b/packages/libp2p/src/connection-manager/reconnect-queue.ts new file mode 100644 index 0000000000..75929fe6db --- /dev/null +++ b/packages/libp2p/src/connection-manager/reconnect-queue.ts @@ -0,0 +1,134 @@ +import { KEEP_ALIVE } from '@libp2p/interface' +import { PeerQueue } from '@libp2p/utils/peer-queue' +import pRetry from 'p-retry' +import { MAX_PARALLEL_RECONNECTS } from './constants.js' +import type { ComponentLogger, Libp2pEvents, Logger, Metrics, Peer, PeerId, PeerStore, Startable, TypedEventTarget } from '@libp2p/interface' +import type { ConnectionManager } from '@libp2p/interface-internal' + +export interface ReconnectQueueComponents { + connectionManager: ConnectionManager + events: TypedEventTarget + peerStore: PeerStore + logger: ComponentLogger + metrics?: Metrics +} + +export interface ReconnectQueueInit { + retries?: number + retryInterval?: number + backoffFactor?: number + maxParallelReconnects?: number +} + +/** + * When peers tagged with `KEEP_ALIVE` disconnect, this component attempts to + * redial them + */ +export class ReconnectQueue implements Startable { + private readonly log: Logger + private readonly queue: PeerQueue + private started: boolean + private readonly peerStore: PeerStore + private readonly retries: number + private readonly retryInterval?: number + private readonly backoffFactor?: number + private readonly connectionManager: ConnectionManager + + constructor (components: ReconnectQueueComponents, init: ReconnectQueueInit = {}) { + this.log = components.logger.forComponent('libp2p:reconnect-queue') + this.peerStore = components.peerStore + this.connectionManager = components.connectionManager + this.queue = new PeerQueue({ + concurrency: init.maxParallelReconnects ?? MAX_PARALLEL_RECONNECTS, + metricName: 'libp2p_reconnect_queue', + metrics: components.metrics + }) + this.started = false + this.retries = init.retries ?? 5 + this.backoffFactor = init.backoffFactor + this.retryInterval = init.retryInterval + + components.events.addEventListener('peer:disconnect', (evt) => { + this.maybeReconnect(evt.detail) + .catch(err => { + this.log.error('failed to maybe reconnect to %p', evt.detail, err) + }) + }) + } + + private async maybeReconnect (peerId: PeerId): Promise { + if (!this.started) { + return + } + + const peer = await this.peerStore.get(peerId) + + if (!peer.tags.has(KEEP_ALIVE)) { + return + } + + if (this.queue.has(peerId)) { + return + } + + this.queue.add(async (options) => { + await pRetry(async (attempt) => { + if (!this.started) { + return + } + + try { + await this.connectionManager.openConnection(peerId, { + signal: options?.signal + }) + } catch (err) { + this.log('reconnecting to %p attempt %d of %d failed', peerId, attempt, this.retries, err) + throw err + } + }, { + signal: options?.signal, + retries: this.retries, + factor: this.backoffFactor, + minTimeout: this.retryInterval + }) + }, { + peerId + }) + .catch(err => { + this.log.error('failed to reconnect to %p', peerId, err) + }) + } + + start (): void { + this.started = true + } + + async afterStart (): Promise { + // re-connect to any peers with the KEEP_ALIVE tag + void Promise.resolve() + .then(async () => { + const keepAlivePeers: Peer[] = await this.peerStore.all({ + filters: [(peer) => { + return peer.tags.has(KEEP_ALIVE) + }] + }) + + await Promise.all( + keepAlivePeers.map(async peer => { + await this.connectionManager.openConnection(peer.id) + .catch(err => { + this.log.error(err) + }) + }) + ) + }) + .catch(err => { + this.log.error(err) + }) + } + + stop (): void { + this.started = false + this.queue.abort() + } +} diff --git a/packages/libp2p/test/connection-manager/auto-dial.spec.ts b/packages/libp2p/test/connection-manager/auto-dial.spec.ts deleted file mode 100644 index 2d7bdcec03..0000000000 --- a/packages/libp2p/test/connection-manager/auto-dial.spec.ts +++ /dev/null @@ -1,293 +0,0 @@ -/* eslint-env mocha */ - -import { TypedEventEmitter, type TypedEventTarget, type Libp2pEvents, type Connection, type PeerId, type PeerStore, type Peer } from '@libp2p/interface' -import { matchPeerId } from '@libp2p/interface-compliance-tests/matchers' -import { defaultLogger } from '@libp2p/logger' -import { PeerMap } from '@libp2p/peer-collections' -import { createEd25519PeerId } from '@libp2p/peer-id-factory' -import { PersistentPeerStore } from '@libp2p/peer-store' -import { multiaddr } from '@multiformats/multiaddr' -import { expect } from 'aegir/chai' -import { MemoryDatastore } from 'datastore-core' -import delay from 'delay' -import pWaitFor from 'p-wait-for' -import Sinon from 'sinon' -import { stubInterface } from 'sinon-ts' -import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' -import { defaultComponents } from '../../src/components.js' -import { AutoDial } from '../../src/connection-manager/auto-dial.js' -import { LAST_DIAL_FAILURE_KEY } from '../../src/connection-manager/constants.js' -import type { ConnectionManager } from '@libp2p/interface-internal' - -describe('auto-dial', () => { - let autoDialer: AutoDial - let events: TypedEventTarget - let peerStore: PeerStore - let peerId: PeerId - - beforeEach(async () => { - peerId = await createEd25519PeerId() - events = new TypedEventEmitter() - peerStore = new PersistentPeerStore({ - datastore: new MemoryDatastore(), - events, - peerId, - logger: defaultLogger() - }) - }) - - afterEach(() => { - if (autoDialer != null) { - autoDialer.stop() - } - }) - - it('should not dial peers without multiaddrs', async () => { - // peers with protocols are dialled before peers without protocols - const peerWithAddress: Peer = { - id: await createEd25519PeerId(), - protocols: [ - '/foo/bar' - ], - addresses: [{ - multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4001'), - isCertified: true - }], - metadata: new Map(), - tags: new Map() - } - const peerWithoutAddress: Peer = { - id: await createEd25519PeerId(), - protocols: [], - addresses: [], - metadata: new Map(), - tags: new Map() - } - - await peerStore.save(peerWithAddress.id, peerWithAddress) - await peerStore.save(peerWithoutAddress.id, peerWithoutAddress) - - const connectionManager = stubInterface({ - getConnectionsMap: Sinon.stub().returns(new PeerMap()), - getDialQueue: Sinon.stub().returns([]) - }) - - autoDialer = new AutoDial(defaultComponents({ - peerStore, - connectionManager, - events - }), { - minConnections: 10, - autoDialInterval: 10000 - }) - autoDialer.start() - void autoDialer.autoDial() - - await pWaitFor(() => { - return connectionManager.openConnection.callCount === 1 - }) - await delay(1000) - - expect(connectionManager.openConnection.callCount).to.equal(1) - expect(connectionManager.openConnection.calledWith(matchPeerId(peerWithAddress.id))).to.be.true() - expect(connectionManager.openConnection.calledWith(matchPeerId(peerWithoutAddress.id))).to.be.false() - }) - - it('should not dial connected peers', async () => { - const connectedPeer: Peer = { - id: await createEd25519PeerId(), - protocols: [], - addresses: [{ - multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4001'), - isCertified: true - }], - metadata: new Map(), - tags: new Map() - } - const unConnectedPeer: Peer = { - id: await createEd25519PeerId(), - protocols: [], - addresses: [{ - multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4002'), - isCertified: true - }], - metadata: new Map(), - tags: new Map() - } - - await peerStore.save(connectedPeer.id, connectedPeer) - await peerStore.save(unConnectedPeer.id, unConnectedPeer) - - const connectionMap = new PeerMap() - connectionMap.set(connectedPeer.id, [stubInterface()]) - - const connectionManager = stubInterface({ - getConnectionsMap: Sinon.stub().returns(connectionMap), - getDialQueue: Sinon.stub().returns([]) - }) - - autoDialer = new AutoDial(defaultComponents({ - peerStore, - connectionManager, - events - }), { - minConnections: 10 - }) - autoDialer.start() - await autoDialer.autoDial() - - await pWaitFor(() => connectionManager.openConnection.callCount === 1) - await delay(1000) - - expect(connectionManager.openConnection.callCount).to.equal(1) - expect(connectionManager.openConnection.calledWith(matchPeerId(unConnectedPeer.id))).to.be.true() - expect(connectionManager.openConnection.calledWith(matchPeerId(connectedPeer.id))).to.be.false() - }) - - it('should not dial peers already in the dial queue', async () => { - // peers with protocols are dialled before peers without protocols - const peerInDialQueue: Peer = { - id: await createEd25519PeerId(), - protocols: [], - addresses: [{ - multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4001'), - isCertified: true - }], - metadata: new Map(), - tags: new Map() - } - const peerNotInDialQueue: Peer = { - id: await createEd25519PeerId(), - protocols: [], - addresses: [{ - multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4002'), - isCertified: true - }], - metadata: new Map(), - tags: new Map() - } - - await peerStore.save(peerInDialQueue.id, peerInDialQueue) - await peerStore.save(peerNotInDialQueue.id, peerNotInDialQueue) - - const connectionManager = stubInterface({ - getConnectionsMap: Sinon.stub().returns(new PeerMap()), - getDialQueue: Sinon.stub().returns([{ - id: 'foo', - peerId: peerInDialQueue.id, - multiaddrs: [], - status: 'queued' - }]) - }) - - autoDialer = new AutoDial(defaultComponents({ - peerStore, - connectionManager, - events - }), { - minConnections: 10 - }) - autoDialer.start() - await autoDialer.autoDial() - - await pWaitFor(() => connectionManager.openConnection.callCount === 1) - await delay(1000) - - expect(connectionManager.openConnection.callCount).to.equal(1) - expect(connectionManager.openConnection.calledWith(matchPeerId(peerNotInDialQueue.id))).to.be.true() - expect(connectionManager.openConnection.calledWith(matchPeerId(peerInDialQueue.id))).to.be.false() - }) - - it('should not start parallel autodials', async () => { - const peerStoreAllSpy = Sinon.spy(peerStore, 'all') - - const connectionManager = stubInterface({ - getConnectionsMap: Sinon.stub().returns(new PeerMap()), - getDialQueue: Sinon.stub().returns([]) - }) - - autoDialer = new AutoDial(defaultComponents({ - peerStore, - connectionManager, - events - }), { - minConnections: 10, - autoDialInterval: 10000 - }) - autoDialer.start() - - // call autodial twice - await Promise.all([ - autoDialer.autoDial(), - autoDialer.autoDial() - ]) - - // should only have queried peer store once - expect(peerStoreAllSpy.callCount).to.equal(1) - }) - - it('should not re-dial peers we have recently failed to dial', async () => { - const peerWithAddress: Peer = { - id: await createEd25519PeerId(), - protocols: [], - addresses: [{ - multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4001'), - isCertified: true - }], - metadata: new Map(), - tags: new Map() - } - const undialablePeer: Peer = { - id: await createEd25519PeerId(), - protocols: [], - addresses: [{ - multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4002'), - isCertified: true - }], - // we failed to dial them recently - metadata: new Map([[LAST_DIAL_FAILURE_KEY, uint8ArrayFromString(`${Date.now() - 10}`)]]), - tags: new Map() - } - - await peerStore.save(peerWithAddress.id, peerWithAddress) - await peerStore.save(undialablePeer.id, undialablePeer) - - const connectionManager = stubInterface({ - getConnectionsMap: Sinon.stub().returns(new PeerMap()), - getDialQueue: Sinon.stub().returns([]) - }) - - autoDialer = new AutoDial(defaultComponents({ - peerStore, - connectionManager, - events - }), { - minConnections: 10, - autoDialPeerRetryThreshold: 2000 - }) - autoDialer.start() - - void autoDialer.autoDial() - - await pWaitFor(() => { - return connectionManager.openConnection.callCount === 1 - }) - - expect(connectionManager.openConnection.callCount).to.equal(1) - expect(connectionManager.openConnection.calledWith(matchPeerId(peerWithAddress.id))).to.be.true() - expect(connectionManager.openConnection.calledWith(matchPeerId(undialablePeer.id))).to.be.false() - - // pass the retry threshold - await delay(2000) - - // autodial again - void autoDialer.autoDial() - - await pWaitFor(() => { - return connectionManager.openConnection.callCount === 3 - }) - - // should have retried the unreachable peer - expect(connectionManager.openConnection.calledWith(matchPeerId(undialablePeer.id))).to.be.true() - }) -}) diff --git a/packages/libp2p/test/connection-manager/direct.node.ts b/packages/libp2p/test/connection-manager/direct.node.ts index 653d6ba174..a71fe105d2 100644 --- a/packages/libp2p/test/connection-manager/direct.node.ts +++ b/packages/libp2p/test/connection-manager/direct.node.ts @@ -90,7 +90,6 @@ describe('dialing (direct, TCP)', () => { localComponents.peerStore = new PersistentPeerStore(localComponents) localComponents.connectionManager = new DefaultConnectionManager(localComponents, { maxConnections: 100, - minConnections: 50, inboundUpgradeTimeout: 1000 }) localComponents.addressManager = new DefaultAddressManager(localComponents) diff --git a/packages/libp2p/test/connection-manager/direct.spec.ts b/packages/libp2p/test/connection-manager/direct.spec.ts index f1a86da40a..a02ece928d 100644 --- a/packages/libp2p/test/connection-manager/direct.spec.ts +++ b/packages/libp2p/test/connection-manager/direct.spec.ts @@ -55,7 +55,6 @@ describe('dialing (direct, WebSockets)', () => { }) localComponents.connectionManager = new DefaultConnectionManager(localComponents, { maxConnections: 100, - minConnections: 50, inboundUpgradeTimeout: 1000 }) diff --git a/packages/libp2p/test/connection-manager/index.node.ts b/packages/libp2p/test/connection-manager/index.node.ts index 473279bfab..e4cdc1746b 100644 --- a/packages/libp2p/test/connection-manager/index.node.ts +++ b/packages/libp2p/test/connection-manager/index.node.ts @@ -6,10 +6,8 @@ import { createEd25519PeerId } from '@libp2p/peer-id-factory' import { dns } from '@multiformats/dns' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' -import delay from 'delay' import all from 'it-all' import { pipe } from 'it-pipe' -import pWaitFor from 'p-wait-for' import sinon from 'sinon' import { stubInterface } from 'sinon-ts' import { defaultComponents } from '../../src/components.js' @@ -59,7 +57,6 @@ describe('Connection Manager', () => { }) const connectionManager = new DefaultConnectionManager(components, { maxConnections: 1000, - minConnections: 50, inboundUpgradeTimeout: 1000 }) @@ -97,7 +94,6 @@ describe('Connection Manager', () => { }) const connectionManager = new DefaultConnectionManager(components, { maxConnections: 1000, - minConnections: 50, inboundUpgradeTimeout: 1000 }) @@ -198,141 +194,6 @@ describe('libp2p.connections', () => { sinon.reset() }) - it('should connect to all the peers stored in the PeerStore, if their number is below minConnections', async () => { - libp2p = await createNode({ - started: false, - config: { - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0/ws'] - }, - connectionManager: { - minConnections: 3 - } - } - }) - - // Populate PeerStore before starting - await libp2p.peerStore.patch(nodes[0].peerId, { - multiaddrs: nodes[0].getMultiaddrs() - }) - await libp2p.peerStore.patch(nodes[1].peerId, { - multiaddrs: nodes[1].getMultiaddrs() - }) - - await libp2p.start() - - // Wait for peers to connect - await pWaitFor(() => libp2p.getConnections().length === 2) - - await libp2p.stop() - }) - - it('should connect to all the peers stored in the PeerStore until reaching the minConnections', async () => { - const minConnections = 1 - libp2p = await createNode({ - started: false, - config: { - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0/ws'] - }, - connectionManager: { - minConnections, - maxConnections: 1 - } - } - }) - - // Populate PeerStore before starting - await libp2p.peerStore.patch(nodes[0].peerId, { - multiaddrs: nodes[0].getMultiaddrs() - }) - await libp2p.peerStore.patch(nodes[1].peerId, { - multiaddrs: nodes[1].getMultiaddrs() - }) - - await libp2p.start() - - // Wait for peer to connect - await pWaitFor(() => libp2p.components.connectionManager.getConnections().length === minConnections) - - // Wait more time to guarantee no other connection happened - await delay(200) - expect(libp2p.components.connectionManager.getConnections().length).to.eql(minConnections) - - await libp2p.stop() - }) - - // flaky - it.skip('should connect to all the peers stored in the PeerStore until reaching the minConnections sorted', async () => { - const minConnections = 1 - libp2p = await createNode({ - started: false, - config: { - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0/ws'] - }, - connectionManager: { - minConnections, - maxConnections: 1 - } - } - }) - - // Populate PeerStore before starting - await libp2p.peerStore.patch(nodes[0].peerId, { - multiaddrs: nodes[0].getMultiaddrs() - }) - await libp2p.peerStore.patch(nodes[1].peerId, { - multiaddrs: nodes[1].getMultiaddrs(), - protocols: ['/protocol-min-conns'] - }) - - await libp2p.start() - - // Wait for peer to connect - await pWaitFor(() => libp2p.components.connectionManager.getConnections().length === minConnections) - - // Should have connected to the peer with protocols - expect(libp2p.components.connectionManager.getConnections(nodes[0].peerId)).to.be.empty() - expect(libp2p.components.connectionManager.getConnections(nodes[1].peerId)).to.not.be.empty() - - await libp2p.stop() - }) - - it('should connect to peers in the PeerStore when a peer disconnected', async () => { - const minConnections = 1 - - libp2p = await createNode({ - config: { - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0/ws'] - }, - connectionManager: { - minConnections - } - } - }) - - // Populate PeerStore after starting (discovery) - await libp2p.peerStore.patch(nodes[0].peerId, { - multiaddrs: nodes[0].getMultiaddrs() - }) - - // Wait for peer to connect - const conn = await libp2p.dial(nodes[0].peerId) - expect(libp2p.components.connectionManager.getConnections(nodes[0].peerId)).to.not.be.empty() - - await conn.close() - // Closed - await pWaitFor(() => libp2p.components.connectionManager.getConnections().length === 0) - // Connected - await pWaitFor(() => libp2p.components.connectionManager.getConnections().length === 1) - - expect(libp2p.components.connectionManager.getConnections(nodes[0].peerId)).to.not.be.empty() - - await libp2p.stop() - }) - it('should be closed status once immediately stopping', async () => { libp2p = await createNode({ config: createBaseOptions({ diff --git a/packages/libp2p/test/connection-manager/index.spec.ts b/packages/libp2p/test/connection-manager/index.spec.ts index eb0380fae8..9b52ee8a74 100644 --- a/packages/libp2p/test/connection-manager/index.spec.ts +++ b/packages/libp2p/test/connection-manager/index.spec.ts @@ -19,8 +19,6 @@ import type { TransportManager } from '@libp2p/interface-internal' const defaultOptions = { maxConnections: 10, - minConnections: 1, - autoDialInterval: Infinity, inboundUpgradeTimeout: 10000 } @@ -85,8 +83,7 @@ describe('Connection Manager', () => { libp2p = await createNode({ config: createBaseOptions({ connectionManager: { - maxConnections: max, - minConnections: 2 + maxConnections: max } }), started: false @@ -144,8 +141,7 @@ describe('Connection Manager', () => { libp2p = await createNode({ config: createBaseOptions({ connectionManager: { - maxConnections: max, - minConnections: 2 + maxConnections: max } }), started: false @@ -210,7 +206,6 @@ describe('Connection Manager', () => { config: createBaseOptions({ connectionManager: { maxConnections: max, - minConnections: 0, allow: [ '/ip4/83.13.55.32' ] @@ -294,8 +289,7 @@ describe('Connection Manager', () => { libp2p = await createNode({ config: createBaseOptions({ connectionManager: { - maxConnections: max, - minConnections: 0 + maxConnections: max } }), started: false @@ -327,8 +321,7 @@ describe('Connection Manager', () => { await expect(createNode({ config: createBaseOptions({ connectionManager: { - maxConnections: 5, - minConnections: 6 + maxConnections: -1 } }), started: false diff --git a/packages/libp2p/test/connection-manager/reconnect-queue.spec.ts b/packages/libp2p/test/connection-manager/reconnect-queue.spec.ts new file mode 100644 index 0000000000..d952587466 --- /dev/null +++ b/packages/libp2p/test/connection-manager/reconnect-queue.spec.ts @@ -0,0 +1,117 @@ +/* eslint-env mocha */ + +import { KEEP_ALIVE, TypedEventEmitter, start, stop } from '@libp2p/interface' +import { peerLogger } from '@libp2p/logger' +import { createEd25519PeerId } from '@libp2p/peer-id-factory' +import { expect } from 'aegir/chai' +import delay from 'delay' +import pRetry from 'p-retry' +import sinon from 'sinon' +import { type StubbedInstance, stubInterface } from 'sinon-ts' +import { ReconnectQueue } from '../../src/connection-manager/reconnect-queue.js' +import type { ComponentLogger, Libp2pEvents, PeerStore, TypedEventTarget, Peer } from '@libp2p/interface' +import type { ConnectionManager } from '@libp2p/interface-internal' + +describe('reconnect queue', () => { + let components: { + connectionManager: StubbedInstance + events: TypedEventTarget + peerStore: StubbedInstance + logger: ComponentLogger + } + let queue: ReconnectQueue + + beforeEach(async () => { + const peerId = await createEd25519PeerId() + + components = { + connectionManager: stubInterface(), + events: new TypedEventEmitter(), + peerStore: stubInterface(), + logger: peerLogger(peerId) + } + }) + + afterEach(async () => { + await stop(queue) + + sinon.reset() + }) + + it('should reconnect to KEEP_ALIVE peers on startup', async () => { + queue = new ReconnectQueue(components) + + const keepAlivePeer = await createEd25519PeerId() + + components.peerStore.all.resolves([ + stubInterface({ + id: keepAlivePeer, + tags: new Map([[KEEP_ALIVE, { + value: 1 + }]]) + }) + ]) + + await start(queue) + + await pRetry(() => { + expect(components.connectionManager.openConnection.calledWith(keepAlivePeer)).to.be.true() + }, { + retries: 5, + factor: 1 + }) + }) + + it('should reconnect to KEEP_ALIVE peers on disconnect', async () => { + queue = new ReconnectQueue(components) + + const keepAlivePeer = await createEd25519PeerId() + + components.peerStore.all.resolves([]) + components.peerStore.get.withArgs(keepAlivePeer).resolves( + stubInterface({ + id: keepAlivePeer, + tags: new Map([[KEEP_ALIVE, { + value: 1 + }]]) + }) + ) + + await start(queue) + + components.events.safeDispatchEvent('peer:disconnect', new CustomEvent('peer:disconnect', { + detail: keepAlivePeer + })) + + await pRetry(() => { + expect(components.connectionManager.openConnection.calledWith(keepAlivePeer)).to.be.true() + }, { + retries: 5, + factor: 1 + }) + }) + + it('should not reconnect to non-KEEP_ALIVE peers on disconnect', async () => { + queue = new ReconnectQueue(components) + + const nonKeepAlivePeer = await createEd25519PeerId() + + components.peerStore.all.resolves([]) + components.peerStore.get.withArgs(nonKeepAlivePeer).resolves( + stubInterface({ + id: nonKeepAlivePeer, + tags: new Map() + }) + ) + + await start(queue) + + components.events.safeDispatchEvent('peer:disconnect', new CustomEvent('peer:disconnect', { + detail: nonKeepAlivePeer + })) + + await delay(1000) + + expect(components.connectionManager.openConnection.calledWith(nonKeepAlivePeer)).to.be.false() + }) +}) diff --git a/packages/libp2p/test/registrar/errors.spec.ts b/packages/libp2p/test/registrar/errors.spec.ts index ff78d2097d..4ed601ab81 100644 --- a/packages/libp2p/test/registrar/errors.spec.ts +++ b/packages/libp2p/test/registrar/errors.spec.ts @@ -31,7 +31,6 @@ describe('registrar errors', () => { }) components.peerStore = new PersistentPeerStore(components) components.connectionManager = new DefaultConnectionManager(components, { - minConnections: 50, maxConnections: 1000, inboundUpgradeTimeout: 1000 }) diff --git a/packages/peer-discovery-bootstrap/package.json b/packages/peer-discovery-bootstrap/package.json index f37f005e1d..8c79d45d68 100644 --- a/packages/peer-discovery-bootstrap/package.json +++ b/packages/peer-discovery-bootstrap/package.json @@ -55,6 +55,7 @@ }, "dependencies": { "@libp2p/interface": "^1.7.0", + "@libp2p/interface-internal": "^1.3.3", "@libp2p/peer-id": "^4.2.4", "@multiformats/mafmt": "^12.1.6", "@multiformats/multiaddr": "^12.2.3" diff --git a/packages/peer-discovery-bootstrap/src/index.ts b/packages/peer-discovery-bootstrap/src/index.ts index eb09310505..140a2078cd 100644 --- a/packages/peer-discovery-bootstrap/src/index.ts +++ b/packages/peer-discovery-bootstrap/src/index.ts @@ -37,10 +37,10 @@ import { peerIdFromString } from '@libp2p/peer-id' import { P2P } from '@multiformats/mafmt' import { multiaddr } from '@multiformats/multiaddr' import type { ComponentLogger, Logger, PeerDiscovery, PeerDiscoveryEvents, PeerInfo, PeerStore, Startable } from '@libp2p/interface' +import type { ConnectionManager } from '@libp2p/interface-internal' const DEFAULT_BOOTSTRAP_TAG_NAME = 'bootstrap' const DEFAULT_BOOTSTRAP_TAG_VALUE = 50 -const DEFAULT_BOOTSTRAP_TAG_TTL = 120000 const DEFAULT_BOOTSTRAP_DISCOVERY_TIMEOUT = 1000 export interface BootstrapInit { @@ -55,7 +55,9 @@ export interface BootstrapInit { timeout?: number /** - * Tag a bootstrap peer with this name before "discovering" it (default: 'bootstrap') + * Tag a bootstrap peer with this name before "discovering" it + * + * @default 'bootstrap' */ tagName?: string @@ -65,7 +67,7 @@ export interface BootstrapInit { tagValue?: number /** - * Cause the bootstrap peer tag to be removed after this number of ms (default: 2 minutes) + * Cause the bootstrap peer tag to be removed after this number of ms */ tagTTL?: number } @@ -73,6 +75,7 @@ export interface BootstrapInit { export interface BootstrapComponents { peerStore: PeerStore logger: ComponentLogger + connectionManager: ConnectionManager } /** @@ -166,9 +169,10 @@ class Bootstrap extends TypedEventEmitter implements PeerDi tags: { [this._init.tagName ?? DEFAULT_BOOTSTRAP_TAG_NAME]: { value: this._init.tagValue ?? DEFAULT_BOOTSTRAP_TAG_VALUE, - ttl: this._init.tagTTL ?? DEFAULT_BOOTSTRAP_TAG_TTL + ttl: this._init.tagTTL } - } + }, + multiaddrs: peerData.multiaddrs }) // check we are still running @@ -177,6 +181,10 @@ class Bootstrap extends TypedEventEmitter implements PeerDi } this.safeDispatchEvent('peer', { detail: peerData }) + this.components.connectionManager.openConnection(peerData.id) + .catch(err => { + this.log.error('could not dial bootstrap peer %p', peerData.id, err) + }) } } diff --git a/packages/peer-discovery-bootstrap/test/bootstrap.spec.ts b/packages/peer-discovery-bootstrap/test/bootstrap.spec.ts index 8549f71012..7429053fb4 100644 --- a/packages/peer-discovery-bootstrap/test/bootstrap.spec.ts +++ b/packages/peer-discovery-bootstrap/test/bootstrap.spec.ts @@ -7,13 +7,20 @@ import { IPFS } from '@multiformats/mafmt' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' import { type StubbedInstance, stubInterface } from 'sinon-ts' -import { bootstrap, type BootstrapComponents } from '../src/index.js' +import { bootstrap } from '../src/index.js' import peerList from './fixtures/default-peers.js' import partialValidPeerList from './fixtures/some-invalid-peers.js' -import type { PeerStore } from '@libp2p/interface' +import type { ComponentLogger, PeerStore } from '@libp2p/interface' +import type { ConnectionManager } from '@libp2p/interface-internal' + +export interface StubbedBootstrapComponents { + peerStore: PeerStore + logger: ComponentLogger + connectionManager: StubbedInstance +} describe('bootstrap', () => { - let components: BootstrapComponents + let components: StubbedBootstrapComponents let peerStore: StubbedInstance beforeEach(async () => { @@ -21,7 +28,8 @@ describe('bootstrap', () => { components = { peerStore, - logger: defaultLogger() + logger: defaultLogger(), + connectionManager: stubInterface() } }) @@ -49,6 +57,27 @@ describe('bootstrap', () => { await stop(r) }) + it('should dial bootstrap peers', async function () { + this.timeout(5 * 1000) + const r = bootstrap({ + list: peerList, + timeout: 100 + })(components) + + await start(r) + + await new Promise(resolve => { + const interval = setInterval(() => { + if (components.connectionManager.openConnection.callCount === 1) { + clearInterval(interval) + resolve() + } + }, 100) + }) + + await stop(r) + }) + it('should tag bootstrap peers', async function () { this.timeout(5 * 1000) @@ -92,7 +121,10 @@ describe('bootstrap', () => { value: tagValue, ttl: tagTTL } - } + }, + multiaddrs: [ + bootstrapper0ma + ] }) await stop(r) diff --git a/packages/peer-discovery-bootstrap/test/compliance.spec.ts b/packages/peer-discovery-bootstrap/test/compliance.spec.ts index 9a5a46cb95..78c80419df 100644 --- a/packages/peer-discovery-bootstrap/test/compliance.spec.ts +++ b/packages/peer-discovery-bootstrap/test/compliance.spec.ts @@ -6,13 +6,15 @@ import { stubInterface } from 'sinon-ts' import { bootstrap } from '../src/index.js' import peerList from './fixtures/default-peers.js' import type { PeerStore } from '@libp2p/interface' +import type { ConnectionManager } from '@libp2p/interface-internal' describe('compliance tests', () => { tests({ async setup () { const components = { peerStore: stubInterface(), - logger: defaultLogger() + logger: defaultLogger(), + connectionManager: stubInterface() } return bootstrap({ diff --git a/packages/transport-webrtc/.aegir.js b/packages/transport-webrtc/.aegir.js index e95fcc9e5e..db2ccf46cb 100644 --- a/packages/transport-webrtc/.aegir.js +++ b/packages/transport-webrtc/.aegir.js @@ -35,7 +35,6 @@ export default { }) }, connectionManager: { - minConnections: 0, inboundConnectionThreshold: Infinity } }) diff --git a/packages/transport-webrtc/test/basics.spec.ts b/packages/transport-webrtc/test/basics.spec.ts index e8df7d08ba..22b3667e8b 100644 --- a/packages/transport-webrtc/test/basics.spec.ts +++ b/packages/transport-webrtc/test/basics.spec.ts @@ -44,9 +44,6 @@ async function createNode (): Promise { connectionGater: { denyDialMultiaddr: () => false }, - connectionManager: { - minConnections: 0 - }, services: { identify: identify() } diff --git a/packages/transport-webtransport/test/browser.ts b/packages/transport-webtransport/test/browser.ts index b2c7e10691..7af761bed3 100644 --- a/packages/transport-webtransport/test/browser.ts +++ b/packages/transport-webtransport/test/browser.ts @@ -21,9 +21,6 @@ describe('libp2p-webtransport', () => { connectionGater: { denyDialMultiaddr: async () => false }, - connectionManager: { - minConnections: 0 - }, services: { ping: ping() }