Skip to content

Commit

Permalink
fix: do not overwrite signal property of options (#2214)
Browse files Browse the repository at this point in the history
Where we set a default abort signal for an operation, copy the incoming
options object instead of overwriting the property as sometimes a
user will reuse the options object they have passed in and be surprised
when a signal that they didn't set aborts a subsequent operation.
  • Loading branch information
achingbrain authored Nov 7, 2023
1 parent fb8a6f1 commit 70d5efc
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 16 deletions.
9 changes: 7 additions & 2 deletions packages/kad-dht/src/query/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,16 @@ export class QueryManager implements Startable {

if (options.signal == null) {
// don't let queries run forever
options.signal = AbortSignal.timeout(DEFAULT_QUERY_TIMEOUT)
const signal = AbortSignal.timeout(DEFAULT_QUERY_TIMEOUT)

// this signal will get listened to for network requests, etc
// so make sure we don't make a lot of noise in the logs
setMaxListeners(Infinity, options.signal)
setMaxListeners(Infinity, signal)

options = {
...options,
signal
}
}

const signal = anySignal([this.shutDownController.signal, options.signal])
Expand Down
12 changes: 9 additions & 3 deletions packages/libp2p/src/connection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,15 @@ export class ConnectionImpl implements Connection {

this.status = 'closing'

options.signal = options?.signal ?? AbortSignal.timeout(CLOSE_TIMEOUT)

setMaxListeners(Infinity, options.signal)
if (options.signal == null) {
const signal = AbortSignal.timeout(CLOSE_TIMEOUT)
setMaxListeners(Infinity, signal)

options = {
...options,
signal
}
}

try {
this.#log.trace('closing all streams')
Expand Down
10 changes: 9 additions & 1 deletion packages/libp2p/src/identify/identify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,15 @@ export class DefaultIdentifyService implements Startable, IdentifyService {
async _identify (connection: Connection, options: AbortOptions = {}): Promise<Identify> {
let stream: Stream | undefined

options.signal = options.signal ?? AbortSignal.timeout(this.timeout)
if (options.signal == null) {
const signal = AbortSignal.timeout(this.timeout)
setMaxListeners(Infinity, signal)

options = {
...options,
signal
}
}

try {
stream = await connection.newStream([this.identifyProtocolStr], {
Expand Down
13 changes: 10 additions & 3 deletions packages/libp2p/src/ping/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,14 @@ class DefaultPingService implements Startable, PingService {
let stream: Stream | undefined
let onAbort = (): void => {}

options.signal = options.signal ?? AbortSignal.timeout(this.timeout)
if (options.signal == null) {
const signal = AbortSignal.timeout(this.timeout)

options = {
...options,
signal
}
}

try {
stream = await connection.newStream(this.protocol, {
Expand All @@ -122,7 +129,7 @@ class DefaultPingService implements Startable, PingService {
}

// make stream abortable
options.signal.addEventListener('abort', onAbort, { once: true })
options.signal?.addEventListener('abort', onAbort, { once: true })

const result = await pipe(
[data],
Expand Down Expand Up @@ -150,7 +157,7 @@ class DefaultPingService implements Startable, PingService {

throw err
} finally {
options.signal.removeEventListener('abort', onAbort)
options.signal?.removeEventListener('abort', onAbort)
if (stream != null) {
await stream.close()
}
Expand Down
10 changes: 8 additions & 2 deletions packages/libp2p/src/upgrader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import type { ConnectionManager } from '@libp2p/interface-internal/connection-ma
import type { Registrar } from '@libp2p/interface-internal/registrar'
import type { Duplex, Source } from 'it-stream-types'

const DEFAULT_PROTOCOL_SELECT_TIMEOUT = 30000

interface CreateConnectionOptions {
cryptoProtocol: string
direction: 'inbound' | 'outbound'
Expand Down Expand Up @@ -439,9 +441,13 @@ export class DefaultUpgrader implements Upgrader {
if (options.signal == null) {
this.#log('No abort signal was passed while trying to negotiate protocols %s falling back to default timeout', protocols)

options.signal = AbortSignal.timeout(30000)
const signal = AbortSignal.timeout(DEFAULT_PROTOCOL_SELECT_TIMEOUT)
setMaxListeners(Infinity, signal)

setMaxListeners(Infinity, options.signal)
options = {
...options,
signal
}
}

const { stream, protocol } = await mss.select(muxedStream, protocols, options)
Expand Down
2 changes: 1 addition & 1 deletion packages/protocol-perf/src/perf-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ export class PerfService implements Startable, PerfServiceInterface {
log('performed %s to %p', this.protocol, connection.remotePeer)
await stream.close()
} catch (err: any) {
log('error sending %s bytes to %p: %s', totalBytesSent, connection.remotePeer, err)
log('error sending %d/%d bytes to %p: %s', totalBytesSent, sendBytes, connection.remotePeer, err)
stream.abort(err)
throw err
}
Expand Down
9 changes: 8 additions & 1 deletion packages/transport-tcp/src/socket-to-conn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,14 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio
return
}

options.signal = options.signal ?? AbortSignal.timeout(closeTimeout)
if (options.signal == null) {
const signal = AbortSignal.timeout(closeTimeout)

options = {
...options,
signal
}
}

try {
log('%s closing socket', lOptsStr)
Expand Down
14 changes: 11 additions & 3 deletions packages/transport-websockets/src/socket-to-conn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,15 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr,

async close (options: AbortOptions = {}) {
const start = Date.now()
options.signal = options.signal ?? AbortSignal.timeout(CLOSE_TIMEOUT)

if (options.signal == null) {
const signal = AbortSignal.timeout(CLOSE_TIMEOUT)

options = {
...options,
signal
}
}

const listener = (): void => {
const { host, port } = maConn.remoteAddr.toOptions()
Expand All @@ -51,15 +59,15 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr,
this.abort(new CodeError('Socket close timeout', 'ERR_SOCKET_CLOSE_TIMEOUT'))
}

options.signal.addEventListener('abort', listener)
options.signal?.addEventListener('abort', listener)

try {
await stream.close()
} catch (err: any) {
log.error('error closing WebSocket gracefully', err)
this.abort(err)
} finally {
options.signal.removeEventListener('abort', listener)
options.signal?.removeEventListener('abort', listener)
maConn.timeline.close = Date.now()
}
},
Expand Down

0 comments on commit 70d5efc

Please sign in to comment.