Skip to content

Commit

Permalink
perf: optimize Readable.dump (#2402)
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag authored Nov 6, 2023
1 parent deccde6 commit 7b5c851
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 34 deletions.
65 changes: 40 additions & 25 deletions lib/api/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ const kBody = Symbol('kBody')
const kAbort = Symbol('abort')
const kContentType = Symbol('kContentType')

const noop = () => {}

module.exports = class BodyReadable extends Readable {
constructor ({
resume,
Expand Down Expand Up @@ -149,37 +151,50 @@ module.exports = class BodyReadable extends Readable {
return this[kBody]
}

async dump (opts) {
dump (opts) {
let limit = opts && Number.isFinite(opts.limit) ? opts.limit : 262144
const signal = opts && opts.signal
const abortFn = () => {
this.destroy()
}
let signalListenerCleanup

if (signal) {
if (typeof signal !== 'object' || !('aborted' in signal)) {
throw new InvalidArgumentError('signal must be an AbortSignal')
}
util.throwIfAborted(signal)
signalListenerCleanup = util.addAbortListener(signal, abortFn)
}
try {
for await (const chunk of this) {
util.throwIfAborted(signal)
limit -= Buffer.byteLength(chunk)
if (limit < 0) {
return
try {
if (typeof signal !== 'object' || !('aborted' in signal)) {
throw new InvalidArgumentError('signal must be an AbortSignal')
}
util.throwIfAborted(signal)
} catch (err) {
return Promise.reject(err)
}
} catch {
util.throwIfAborted(signal)
} finally {
if (typeof signalListenerCleanup === 'function') {
signalListenerCleanup()
} else if (signalListenerCleanup) {
signalListenerCleanup[Symbol.dispose]()
}
}

if (this.closed) {
return Promise.resolve(null)
}

return new Promise((resolve, reject) => {
const signalListenerCleanup = signal
? util.addAbortListener(signal, () => {
this.destroy()
})
: noop

this
.on('close', function () {
signalListenerCleanup()
if (signal?.aborted) {
reject(signal.reason || Object.assign(new Error('The operation was aborted'), { name: 'AbortError' }))
} else {
resolve(null)
}
})
.on('error', noop)
.on('data', function (chunk) {
limit -= chunk.length
if (limit <= 0) {
this.destroy()
}
})
.resume()
})
}
}

Expand Down
9 changes: 0 additions & 9 deletions lib/core/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -431,16 +431,7 @@ function throwIfAborted (signal) {
}
}

let events
function addAbortListener (signal, listener) {
if (typeof Symbol.dispose === 'symbol') {
if (!events) {
events = require('events')
}
if (typeof events.addAbortListener === 'function' && 'aborted' in signal) {
return events.addAbortListener(signal, listener)
}
}
if ('addEventListener' in signal) {
signal.addEventListener('abort', listener, { once: true })
return () => signal.removeEventListener('abort', listener)
Expand Down

0 comments on commit 7b5c851

Please sign in to comment.