From 507240cec74f463fdff76922bb979e765f84555a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vadzim=20Zie=C5=84ka?= Date: Tue, 14 Apr 2020 16:25:55 +0300 Subject: [PATCH] stream: close iterator in Readable.from Call iterator.return() if not all of its values are consumed. Fixes: https://github.com/nodejs/node/issues/32842 PR-URL: https://github.com/nodejs/node/pull/32844 Reviewed-By: Robert Nagy Reviewed-By: Matteo Collina Reviewed-By: Zeyu Yang --- lib/internal/streams/from.js | 32 +++- .../test-readable-from-iterator-closing.js | 174 ++++++++++++++++++ 2 files changed, 205 insertions(+), 1 deletion(-) create mode 100644 test/parallel/test-readable-from-iterator-closing.js diff --git a/lib/internal/streams/from.js b/lib/internal/streams/from.js index ab6db00a125a0b..ca567914bbf0fe 100644 --- a/lib/internal/streams/from.js +++ b/lib/internal/streams/from.js @@ -34,21 +34,51 @@ function from(Readable, iterable, opts) { objectMode: true, ...opts }); + // Reading boolean to protect against _read // being called before last iteration completion. let reading = false; + + // needToClose boolean if iterator needs to be explicitly closed + let needToClose = false; + readable._read = function() { if (!reading) { reading = true; next(); } }; + + readable._destroy = function(error, cb) { + if (needToClose) { + needToClose = false; + close().then( + () => process.nextTick(cb, error), + (e) => process.nextTick(cb, error || e), + ); + } else { + cb(error); + } + }; + + async function close() { + if (typeof iterator.return === 'function') { + const { value } = await iterator.return(); + await value; + } + } + async function next() { try { + needToClose = false; const { value, done } = await iterator.next(); + needToClose = !done; + const resolved = await value; if (done) { readable.push(null); - } else if (readable.push(await value)) { + } else if (readable.destroyed) { + await close(); + } else if (readable.push(resolved)) { next(); } else { reading = false; diff --git a/test/parallel/test-readable-from-iterator-closing.js b/test/parallel/test-readable-from-iterator-closing.js new file mode 100644 index 00000000000000..2d42e3e14010d5 --- /dev/null +++ b/test/parallel/test-readable-from-iterator-closing.js @@ -0,0 +1,174 @@ +'use strict'; + +const { mustCall, mustNotCall } = require('../common'); +const { Readable } = require('stream'); +const { strictEqual } = require('assert'); + +async function asyncSupport() { + const finallyMustCall = mustCall(); + const bodyMustCall = mustCall(); + + async function* infiniteGenerate() { + try { + while (true) yield 'a'; + } finally { + finallyMustCall(); + } + } + + const stream = Readable.from(infiniteGenerate()); + + for await (const chunk of stream) { + bodyMustCall(); + strictEqual(chunk, 'a'); + break; + } +} + +async function syncSupport() { + const finallyMustCall = mustCall(); + const bodyMustCall = mustCall(); + + function* infiniteGenerate() { + try { + while (true) yield 'a'; + } finally { + finallyMustCall(); + } + } + + const stream = Readable.from(infiniteGenerate()); + + for await (const chunk of stream) { + bodyMustCall(); + strictEqual(chunk, 'a'); + break; + } +} + +async function syncPromiseSupport() { + const returnMustBeAwaited = mustCall(); + const bodyMustCall = mustCall(); + + function* infiniteGenerate() { + try { + while (true) yield Promise.resolve('a'); + } finally { + // eslint-disable-next-line no-unsafe-finally + return { then(cb) { + returnMustBeAwaited(); + cb(); + } }; + } + } + + const stream = Readable.from(infiniteGenerate()); + + for await (const chunk of stream) { + bodyMustCall(); + strictEqual(chunk, 'a'); + break; + } +} + +async function syncRejectedSupport() { + const returnMustBeAwaited = mustCall(); + const bodyMustNotCall = mustNotCall(); + const catchMustCall = mustCall(); + const secondNextMustNotCall = mustNotCall(); + + function* generate() { + try { + yield Promise.reject('a'); + secondNextMustNotCall(); + } finally { + // eslint-disable-next-line no-unsafe-finally + return { then(cb) { + returnMustBeAwaited(); + cb(); + } }; + } + } + + const stream = Readable.from(generate()); + + try { + for await (const chunk of stream) { + bodyMustNotCall(chunk); + } + } catch { + catchMustCall(); + } +} + +async function noReturnAfterThrow() { + const returnMustNotCall = mustNotCall(); + const bodyMustNotCall = mustNotCall(); + const catchMustCall = mustCall(); + const nextMustCall = mustCall(); + + const stream = Readable.from({ + [Symbol.asyncIterator]() { return this; }, + async next() { + nextMustCall(); + throw new Error('a'); + }, + async return() { + returnMustNotCall(); + return { done: true }; + }, + }); + + try { + for await (const chunk of stream) { + bodyMustNotCall(chunk); + } + } catch { + catchMustCall(); + } +} + +async function closeStreamWhileNextIsPending() { + const finallyMustCall = mustCall(); + const dataMustCall = mustCall(); + + let resolveDestroy; + const destroyed = + new Promise((resolve) => { resolveDestroy = mustCall(resolve); }); + let resolveYielded; + const yielded = + new Promise((resolve) => { resolveYielded = mustCall(resolve); }); + + async function* infiniteGenerate() { + try { + while (true) { + yield 'a'; + resolveYielded(); + await destroyed; + } + } finally { + finallyMustCall(); + } + } + + const stream = Readable.from(infiniteGenerate()); + + stream.on('data', (data) => { + dataMustCall(); + strictEqual(data, 'a'); + }); + + yielded.then(() => { + stream.destroy(); + resolveDestroy(); + }); +} + +Promise.all([ + asyncSupport(), + syncSupport(), + syncPromiseSupport(), + syncRejectedSupport(), + noReturnAfterThrow(), + closeStreamWhileNextIsPending(), +]).then(mustCall());