From 70d5efc2e901a2c419fe3f82d767f278b6d698fd Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Tue, 7 Nov 2023 16:53:39 +0000 Subject: [PATCH] fix: do not overwrite signal property of options (#2214) Where we set a default abort signal for an operation, copy the incoming options object instead of overwriting the property as sometimes a user will reuse the options object they have passed in and be surprised when a signal that they didn't set aborts a subsequent operation. --- packages/kad-dht/src/query/manager.ts | 9 +++++++-- packages/libp2p/src/connection/index.ts | 12 +++++++++--- packages/libp2p/src/identify/identify.ts | 10 +++++++++- packages/libp2p/src/ping/index.ts | 13 ++++++++++--- packages/libp2p/src/upgrader.ts | 10 ++++++++-- packages/protocol-perf/src/perf-service.ts | 2 +- packages/transport-tcp/src/socket-to-conn.ts | 9 ++++++++- .../transport-websockets/src/socket-to-conn.ts | 14 +++++++++++--- 8 files changed, 63 insertions(+), 16 deletions(-) diff --git a/packages/kad-dht/src/query/manager.ts b/packages/kad-dht/src/query/manager.ts index a163099c40..e797d1e823 100644 --- a/packages/kad-dht/src/query/manager.ts +++ b/packages/kad-dht/src/query/manager.ts @@ -113,11 +113,16 @@ export class QueryManager implements Startable { if (options.signal == null) { // don't let queries run forever - options.signal = AbortSignal.timeout(DEFAULT_QUERY_TIMEOUT) + const signal = AbortSignal.timeout(DEFAULT_QUERY_TIMEOUT) // this signal will get listened to for network requests, etc // so make sure we don't make a lot of noise in the logs - setMaxListeners(Infinity, options.signal) + setMaxListeners(Infinity, signal) + + options = { + ...options, + signal + } } const signal = anySignal([this.shutDownController.signal, options.signal]) diff --git a/packages/libp2p/src/connection/index.ts b/packages/libp2p/src/connection/index.ts index fbc62c72f1..ab0004256e 100644 --- a/packages/libp2p/src/connection/index.ts +++ b/packages/libp2p/src/connection/index.ts @@ -155,9 +155,15 @@ export class ConnectionImpl implements Connection { this.status = 'closing' - options.signal = options?.signal ?? AbortSignal.timeout(CLOSE_TIMEOUT) - - setMaxListeners(Infinity, options.signal) + if (options.signal == null) { + const signal = AbortSignal.timeout(CLOSE_TIMEOUT) + setMaxListeners(Infinity, signal) + + options = { + ...options, + signal + } + } try { this.#log.trace('closing all streams') diff --git a/packages/libp2p/src/identify/identify.ts b/packages/libp2p/src/identify/identify.ts index 8fef5c9949..300c1c7fbf 100644 --- a/packages/libp2p/src/identify/identify.ts +++ b/packages/libp2p/src/identify/identify.ts @@ -257,7 +257,15 @@ export class DefaultIdentifyService implements Startable, IdentifyService { async _identify (connection: Connection, options: AbortOptions = {}): Promise { let stream: Stream | undefined - options.signal = options.signal ?? AbortSignal.timeout(this.timeout) + if (options.signal == null) { + const signal = AbortSignal.timeout(this.timeout) + setMaxListeners(Infinity, signal) + + options = { + ...options, + signal + } + } try { stream = await connection.newStream([this.identifyProtocolStr], { diff --git a/packages/libp2p/src/ping/index.ts b/packages/libp2p/src/ping/index.ts index ea8d23db5e..1608f837b5 100644 --- a/packages/libp2p/src/ping/index.ts +++ b/packages/libp2p/src/ping/index.ts @@ -109,7 +109,14 @@ class DefaultPingService implements Startable, PingService { let stream: Stream | undefined let onAbort = (): void => {} - options.signal = options.signal ?? AbortSignal.timeout(this.timeout) + if (options.signal == null) { + const signal = AbortSignal.timeout(this.timeout) + + options = { + ...options, + signal + } + } try { stream = await connection.newStream(this.protocol, { @@ -122,7 +129,7 @@ class DefaultPingService implements Startable, PingService { } // make stream abortable - options.signal.addEventListener('abort', onAbort, { once: true }) + options.signal?.addEventListener('abort', onAbort, { once: true }) const result = await pipe( [data], @@ -150,7 +157,7 @@ class DefaultPingService implements Startable, PingService { throw err } finally { - options.signal.removeEventListener('abort', onAbort) + options.signal?.removeEventListener('abort', onAbort) if (stream != null) { await stream.close() } diff --git a/packages/libp2p/src/upgrader.ts b/packages/libp2p/src/upgrader.ts index f6b0bc574a..98d7326737 100644 --- a/packages/libp2p/src/upgrader.ts +++ b/packages/libp2p/src/upgrader.ts @@ -20,6 +20,8 @@ import type { ConnectionManager } from '@libp2p/interface-internal/connection-ma import type { Registrar } from '@libp2p/interface-internal/registrar' import type { Duplex, Source } from 'it-stream-types' +const DEFAULT_PROTOCOL_SELECT_TIMEOUT = 30000 + interface CreateConnectionOptions { cryptoProtocol: string direction: 'inbound' | 'outbound' @@ -439,9 +441,13 @@ export class DefaultUpgrader implements Upgrader { if (options.signal == null) { this.#log('No abort signal was passed while trying to negotiate protocols %s falling back to default timeout', protocols) - options.signal = AbortSignal.timeout(30000) + const signal = AbortSignal.timeout(DEFAULT_PROTOCOL_SELECT_TIMEOUT) + setMaxListeners(Infinity, signal) - setMaxListeners(Infinity, options.signal) + options = { + ...options, + signal + } } const { stream, protocol } = await mss.select(muxedStream, protocols, options) diff --git a/packages/protocol-perf/src/perf-service.ts b/packages/protocol-perf/src/perf-service.ts index 4f23cdbfe2..983490fc2f 100644 --- a/packages/protocol-perf/src/perf-service.ts +++ b/packages/protocol-perf/src/perf-service.ts @@ -199,7 +199,7 @@ export class PerfService implements Startable, PerfServiceInterface { log('performed %s to %p', this.protocol, connection.remotePeer) await stream.close() } catch (err: any) { - log('error sending %s bytes to %p: %s', totalBytesSent, connection.remotePeer, err) + log('error sending %d/%d bytes to %p: %s', totalBytesSent, sendBytes, connection.remotePeer, err) stream.abort(err) throw err } diff --git a/packages/transport-tcp/src/socket-to-conn.ts b/packages/transport-tcp/src/socket-to-conn.ts index fba2708330..c9093bde98 100644 --- a/packages/transport-tcp/src/socket-to-conn.ts +++ b/packages/transport-tcp/src/socket-to-conn.ts @@ -126,7 +126,14 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio return } - options.signal = options.signal ?? AbortSignal.timeout(closeTimeout) + if (options.signal == null) { + const signal = AbortSignal.timeout(closeTimeout) + + options = { + ...options, + signal + } + } try { log('%s closing socket', lOptsStr) diff --git a/packages/transport-websockets/src/socket-to-conn.ts b/packages/transport-websockets/src/socket-to-conn.ts index 91a44a5a44..b0498de33b 100644 --- a/packages/transport-websockets/src/socket-to-conn.ts +++ b/packages/transport-websockets/src/socket-to-conn.ts @@ -41,7 +41,15 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, async close (options: AbortOptions = {}) { const start = Date.now() - options.signal = options.signal ?? AbortSignal.timeout(CLOSE_TIMEOUT) + + if (options.signal == null) { + const signal = AbortSignal.timeout(CLOSE_TIMEOUT) + + options = { + ...options, + signal + } + } const listener = (): void => { const { host, port } = maConn.remoteAddr.toOptions() @@ -51,7 +59,7 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, this.abort(new CodeError('Socket close timeout', 'ERR_SOCKET_CLOSE_TIMEOUT')) } - options.signal.addEventListener('abort', listener) + options.signal?.addEventListener('abort', listener) try { await stream.close() @@ -59,7 +67,7 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, log.error('error closing WebSocket gracefully', err) this.abort(err) } finally { - options.signal.removeEventListener('abort', listener) + options.signal?.removeEventListener('abort', listener) maConn.timeline.close = Date.now() } },