Skip to content

Commit 125ba87

Browse files
committed
stream: close iterator in Readable.from
Use for-of loop to traverse iterator and properly close it if not all of its values are consumed. Fixes: #32842
1 parent cbe955c commit 125ba87

File tree

2 files changed

+177
-28
lines changed

2 files changed

+177
-28
lines changed

lib/internal/streams/from.js

Lines changed: 60 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22

33
const {
44
SymbolAsyncIterator,
5-
SymbolIterator
5+
SymbolIterator,
6+
Promise
67
} = primordials;
78
const { Buffer } = require('buffer');
89

@@ -11,7 +12,6 @@ const {
1112
} = require('internal/errors').codes;
1213

1314
function from(Readable, iterable, opts) {
14-
let iterator;
1515
if (typeof iterable === 'string' || iterable instanceof Buffer) {
1616
return new Readable({
1717
objectMode: true,
@@ -23,41 +23,73 @@ function from(Readable, iterable, opts) {
2323
});
2424
}
2525

26-
if (iterable && iterable[SymbolAsyncIterator])
27-
iterator = iterable[SymbolAsyncIterator]();
28-
else if (iterable && iterable[SymbolIterator])
29-
iterator = iterable[SymbolIterator]();
30-
else
31-
throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
26+
let onDataNeeded;
3227

3328
const readable = new Readable({
3429
objectMode: true,
35-
...opts
30+
...opts,
31+
read() {
32+
onDataNeeded && onDataNeeded();
33+
},
34+
async destroy(error, cb) {
35+
onDataNeeded && onDataNeeded();
36+
try {
37+
await pumping;
38+
} catch (e) {
39+
// Do not hide present error
40+
if (!error) error = e;
41+
}
42+
cb(error);
43+
},
3644
});
37-
// Reading boolean to protect against _read
38-
// being called before last iteration completion.
39-
let reading = false;
40-
readable._read = function() {
41-
if (!reading) {
42-
reading = true;
43-
next();
44-
}
45-
};
46-
async function next() {
45+
46+
if (!iterable[SymbolAsyncIterator] && !iterable[SymbolIterator])
47+
throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
48+
49+
const pumping = pump();
50+
51+
return readable;
52+
53+
async function pump() {
54+
/*
55+
We're iterating over sync or async iterator with the appropriate sync
56+
or async version of the `for-of` loop.
57+
58+
`for-await-of` loop has an edge case when looping over synchronous
59+
iterator.
60+
61+
It does not close synchronous iterator with .return() if that iterator
62+
yields rejected Promise, so finally blocks within such an iterator are
63+
never executed.
64+
65+
In the application code developers can choose between async and sync
66+
forms of the loop depending on their needs, but in the library code we
67+
have to handle such edge cases properly and close iterators anyway.
68+
*/
4769
try {
48-
const { value, done } = await iterator.next();
49-
if (done) {
50-
readable.push(null);
51-
} else if (readable.push(await value)) {
52-
next();
70+
if (iterable[SymbolAsyncIterator]) {
71+
for await (const data of iterable) {
72+
if (readable.destroyed) return;
73+
if (!readable.push(data)) {
74+
await new Promise((resolve) => { onDataNeeded = resolve; });
75+
if (readable.destroyed) return;
76+
}
77+
}
5378
} else {
54-
reading = false;
79+
for (const data of iterable) {
80+
const value = await data;
81+
if (readable.destroyed) return;
82+
if (!readable.push(value)) {
83+
await new Promise((resolve) => { onDataNeeded = resolve; });
84+
if (readable.destroyed) return;
85+
}
86+
}
5587
}
56-
} catch (err) {
57-
readable.destroy(err);
88+
if (!readable.destroyed) readable.push(null);
89+
} catch (error) {
90+
if (!readable.destroyed) readable.destroy(error);
5891
}
5992
}
60-
return readable;
6193
}
6294

6395
module.exports = from;
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
'use strict';
2+
3+
const { mustCall, mustNotCall } = require('../common');
4+
const { Readable } = require('stream');
5+
const { strictEqual } = require('assert');
6+
7+
async function asyncSupport() {
8+
const finallyMustCall = mustCall();
9+
async function* generate() {
10+
try {
11+
yield 'a';
12+
mustNotCall('only first item is read');
13+
} finally {
14+
finallyMustCall();
15+
}
16+
}
17+
18+
const stream = Readable.from(generate());
19+
20+
for await (const chunk of stream) {
21+
strictEqual(chunk, 'a');
22+
break;
23+
}
24+
}
25+
26+
asyncSupport().then(mustCall());
27+
28+
async function syncSupport() {
29+
const finallyMustCall = mustCall();
30+
function* generate() {
31+
try {
32+
yield 'a';
33+
mustNotCall('only first item is read');
34+
} finally {
35+
finallyMustCall();
36+
}
37+
}
38+
39+
const stream = Readable.from(generate());
40+
41+
for await (const chunk of stream) {
42+
strictEqual(chunk, 'a');
43+
break;
44+
}
45+
}
46+
47+
syncSupport().then(mustCall());
48+
49+
async function syncPromiseSupport() {
50+
const finallyMustCall = mustCall();
51+
function* generate() {
52+
try {
53+
yield Promise.resolve('a');
54+
mustNotCall('only first item is read');
55+
} finally {
56+
finallyMustCall();
57+
}
58+
}
59+
60+
const stream = Readable.from(generate());
61+
62+
for await (const chunk of stream) {
63+
strictEqual(chunk, 'a');
64+
break;
65+
}
66+
}
67+
68+
syncPromiseSupport().then(mustCall());
69+
70+
async function syncRejectedSupport() {
71+
const finallyMustCall = mustCall();
72+
const noBodyCall = mustNotCall();
73+
const catchMustCall = mustCall();
74+
75+
function* generate() {
76+
try {
77+
yield Promise.reject('a');
78+
mustNotCall();
79+
} finally {
80+
finallyMustCall();
81+
}
82+
}
83+
84+
const stream = Readable.from(generate());
85+
86+
try {
87+
for await (const chunk of stream) {
88+
noBodyCall(chunk);
89+
}
90+
} catch {
91+
catchMustCall();
92+
}
93+
}
94+
95+
syncRejectedSupport().then(mustCall());
96+
97+
async function noReturnAfterThrow() {
98+
const returnMustNotCall = mustNotCall();
99+
const noBodyCall = mustNotCall();
100+
const catchMustCall = mustCall();
101+
102+
const stream = Readable.from({
103+
[Symbol.asyncIterator]() { return this; },
104+
async next() { throw new Error('a'); },
105+
async return() { returnMustNotCall(); return { done: true }; },
106+
});
107+
108+
try {
109+
for await (const chunk of stream) {
110+
noBodyCall(chunk);
111+
}
112+
} catch {
113+
catchMustCall();
114+
}
115+
}
116+
117+
noReturnAfterThrow().then(mustCall());

0 commit comments

Comments
 (0)