Skip to content

Commit

Permalink
fix: stream error timings
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Dec 6, 2023
1 parent 107664a commit 1fa32f2
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 31 deletions.
69 changes: 46 additions & 23 deletions lib/api/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,18 @@ module.exports = class BodyReadable extends Readable {
return super.destroy(err)
}

_destroy (err, callback) {
// Workaround for Node "bug". If the stream is destroyed in same
// tick as it is created, then a user who is waiting for a
// promise (i.e micro tick) for installing a 'error' listener will
// never get a chance and will always encounter an unhandled exception.
// - tick => process.nextTick(fn)
// - micro tick => queueMicrotask(fn)
queueMicrotask(() => {
callback(err)
})
}

emit (ev, ...args) {
if (ev === 'data') {
// Node < 16.7
Expand Down Expand Up @@ -166,7 +178,7 @@ module.exports = class BodyReadable extends Readable {
}
}

if (this.closed) {
if (this._readableState.closeEmitted) {
return Promise.resolve(null)
}

Expand Down Expand Up @@ -210,33 +222,44 @@ function isUnusable (self) {
}

async function consume (stream, type) {
if (isUnusable(stream)) {
throw new TypeError('unusable')
}

assert(!stream[kConsume])

return new Promise((resolve, reject) => {
stream[kConsume] = {
type,
stream,
resolve,
reject,
length: 0,
body: []
}
if (isUnusable(stream)) {
const rState = stream._readableState
if (rState.destroyed && rState.closeEmitted === false) {
stream
.on('error', err => {
reject(err)
})
.on('close', () => {
reject(new TypeError('unusable'))
})
} else {
reject(rState.errored ?? new TypeError('unusable'))
}
} else {
stream[kConsume] = {
type,
stream,
resolve,
reject,
length: 0,
body: []
}

stream
.on('error', function (err) {
consumeFinish(this[kConsume], err)
})
.on('close', function () {
if (this[kConsume].body !== null) {
consumeFinish(this[kConsume], new RequestAbortedError())
}
})
stream
.on('error', function (err) {
consumeFinish(this[kConsume], err)
})
.on('close', function () {
if (this[kConsume].body !== null) {
consumeFinish(this[kConsume], new RequestAbortedError())
}
})

process.nextTick(consumeStart, stream[kConsume])
queueMicrotask(() => consumeStart(stream[kConsume]))
}
})
}

Expand Down
22 changes: 14 additions & 8 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -1963,12 +1963,19 @@ function writeStream ({ h2stream, body, client, request, socket, contentLength,
body.resume()
}
}
const onAbort = function () {
if (finished) {
return
const onClose = function () {
// 'close' might be emitted *before* 'error' for
// broken streams. Wait a tick to avoid this case.
queueMicrotask(() => {
// It's only safe to remove 'error' listener after
// 'close'.
body.removeListener('error', onFinished)
})

if (!finished) {
const err = new RequestAbortedError()
queueMicrotask(() => onFinished(err))
}
const err = new RequestAbortedError()
queueMicrotask(() => onFinished(err))
}
const onFinished = function (err) {
if (finished) {
Expand All @@ -1986,8 +1993,7 @@ function writeStream ({ h2stream, body, client, request, socket, contentLength,
body
.removeListener('data', onData)
.removeListener('end', onFinished)
.removeListener('error', onFinished)
.removeListener('close', onAbort)
.removeListener('close', onClose)

if (!err) {
try {
Expand All @@ -2010,7 +2016,7 @@ function writeStream ({ h2stream, body, client, request, socket, contentLength,
.on('data', onData)
.on('end', onFinished)
.on('error', onFinished)
.on('close', onAbort)
.on('close', onClose)

if (body.resume) {
body.resume()
Expand Down
37 changes: 37 additions & 0 deletions test/readable.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,40 @@ test('avoid body reordering', async function (t) {

t.equal(text, 'helloworld')
})

test('destroy timing text', async function (t) {
t.plan(1)

function resume () {
}
function abort () {
}
const _err = new Error('kaboom')
const r = new Readable({ resume, abort })
r.destroy(_err)
try {
await r.text()
} catch (err) {
t.same(err, _err)
}
})

test('destroy timing promise', async function (t) {
t.plan(1)

function resume () {
}
function abort () {
}
const r = await new Promise(resolve => {
const r = new Readable({ resume, abort })
r.destroy(new Error('kaboom'))
resolve(r)
})
await new Promise(resolve => {
r.on('error', err => {
t.ok(err)
resolve(null)
})
})
})

0 comments on commit 1fa32f2

Please sign in to comment.