Skip to content

Commit 5a95fa4

Browse files
ronagcodebytere
authored andcommitted
stream: normalize async iterator stream destroy
PR-URL: #31316 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com> Reviewed-By: Minwoo Jung <nodecorelab@gmail.com>
1 parent 20d0a0e commit 5a95fa4

File tree

2 files changed

+30
-6
lines changed

2 files changed

+30
-6
lines changed

lib/internal/streams/async_iterator.js

+11-6
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,15 @@ const kStream = Symbol('stream');
2222

2323
let Readable;
2424

25+
function destroy(stream, err) {
26+
// request.destroy just do .end - .abort is what we want
27+
if (typeof stream.abort === 'function') return stream.abort();
28+
if (stream.req &&
29+
typeof stream.req.abort === 'function') return stream.req.abort();
30+
if (typeof stream.destroy === 'function') return stream.destroy(err);
31+
if (typeof stream.close === 'function') return stream.close();
32+
}
33+
2534
function createIterResult(value, done) {
2635
return { value, done };
2736
}
@@ -141,7 +150,7 @@ const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({
141150
resolve(createIterResult(undefined, true));
142151
}
143152
});
144-
stream.destroy();
153+
destroy(stream);
145154
});
146155
},
147156
}, AsyncIteratorPrototype);
@@ -156,11 +165,7 @@ const createReadableStreamAsyncIterator = (stream) => {
156165

157166
const src = stream;
158167
stream = new Readable({ objectMode: true }).wrap(src);
159-
finished(stream, (err) => {
160-
if (typeof src.destroy === 'function') {
161-
src.destroy(err);
162-
}
163-
});
168+
finished(stream, (err) => destroy(src, err));
164169
}
165170

166171
const iterator = ObjectCreate(ReadableStreamAsyncIteratorPrototype, {

test/parallel/test-stream-readable-async-iterators.js

+19
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,25 @@ async function tests() {
5656
}));
5757
}
5858

59+
{
60+
// Non standard stream cleanup
61+
62+
const readable = new Readable({ autoDestroy: false, read() {} });
63+
readable.push('asd');
64+
readable.push('asd');
65+
readable.destroy = null;
66+
readable.close = common.mustCall(() => {
67+
readable.emit('close');
68+
});
69+
70+
await (async () => {
71+
for await (const d of readable) {
72+
d;
73+
return;
74+
}
75+
})();
76+
}
77+
5978
{
6079
const readable = new Readable({ objectMode: true, read() {} });
6180
readable.push(0);

0 commit comments

Comments
 (0)