Skip to content

Commit

Permalink
stream: improve Readable.from error handling
Browse files Browse the repository at this point in the history
PR-URL: #37158
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
benjamingr authored and danielleadams committed Feb 16, 2021
1 parent e63b380 commit 1fea051
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 18 deletions.
33 changes: 16 additions & 17 deletions lib/internal/streams/from.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

const {
PromisePrototypeThen,
SymbolAsyncIterator,
SymbolIterator
} = primordials;
Expand Down Expand Up @@ -42,9 +43,6 @@ function from(Readable, iterable, opts) {
// being called before last iteration completion.
let reading = false;

// Flag for when iterator needs to be explicitly closed.
let needToClose = false;

readable._read = function() {
if (!reading) {
reading = true;
Expand All @@ -53,18 +51,23 @@ function from(Readable, iterable, opts) {
};

readable._destroy = function(error, cb) {
if (needToClose) {
needToClose = false;
close().then(
() => process.nextTick(cb, error),
(e) => process.nextTick(cb, error || e),
);
} else {
cb(error);
}
PromisePrototypeThen(
close(error),
() => process.nextTick(cb, error), // nextTick is here in case cb throws
(e) => process.nextTick(cb, e || error),
);
};

async function close() {
async function close(error) {
const hadError = (error !== undefined) && (error !== null);
const hasThrow = typeof iterator.throw === 'function';
if (hadError && hasThrow) {
const { value, done } = await iterator.throw(error);
await value;
if (done) {
return;
}
}
if (typeof iterator.return === 'function') {
const { value } = await iterator.return();
await value;
Expand All @@ -73,13 +76,9 @@ function from(Readable, iterable, opts) {

async function next() {
try {
needToClose = false;
const { value, done } = await iterator.next();
needToClose = !done;
if (done) {
readable.push(null);
} else if (readable.destroyed) {
await close();
} else {
const res = await value;
if (res === null) {
Expand Down
23 changes: 22 additions & 1 deletion test/parallel/test-readable-from.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const { mustCall } = require('../common');
const { once } = require('events');
const { Readable } = require('stream');
const { strictEqual, throws } = require('assert');
const common = require('../common');

{
throws(() => {
Expand Down Expand Up @@ -187,6 +188,25 @@ async function endWithError() {
}
}

async function destroyingStreamWithErrorThrowsInGenerator() {
const validateError = common.mustCall((e) => {
strictEqual(e, 'Boum');
});
async function* generate() {
try {
yield 1;
yield 2;
yield 3;
throw new Error();
} catch (e) {
validateError(e);
}
}
const stream = Readable.from(generate());
stream.read();
stream.once('error', common.mustCall());
stream.destroy('Boum');
}

Promise.all([
toReadableBasicSupport(),
Expand All @@ -198,5 +218,6 @@ Promise.all([
toReadableOnDataNonObject(),
destroysTheStreamWhenThrowing(),
asTransformStream(),
endWithError()
endWithError(),
destroyingStreamWithErrorThrowsInGenerator(),
]).then(mustCall());

0 comments on commit 1fea051

Please sign in to comment.