Skip to content

Commit 20d0a0e

Browse files
ronagcodebytere
authored andcommitted
stream: add async iterator support for v1 streams
PR-URL: #31316 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com> Reviewed-By: Minwoo Jung <nodecorelab@gmail.com>
1 parent f75fe9a commit 20d0a0e

File tree

2 files changed

+61
-1
lines changed

2 files changed

+61
-1
lines changed

lib/internal/streams/async_iterator.js

+18
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ const kLastPromise = Symbol('lastPromise');
2020
const kHandlePromise = Symbol('handlePromise');
2121
const kStream = Symbol('stream');
2222

23+
let Readable;
24+
2325
function createIterResult(value, done) {
2426
return { value, done };
2527
}
@@ -145,6 +147,22 @@ const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({
145147
}, AsyncIteratorPrototype);
146148

147149
const createReadableStreamAsyncIterator = (stream) => {
150+
if (typeof stream.read !== 'function') {
151+
// v1 stream
152+
153+
if (!Readable) {
154+
Readable = require('_stream_readable');
155+
}
156+
157+
const src = stream;
158+
stream = new Readable({ objectMode: true }).wrap(src);
159+
finished(stream, (err) => {
160+
if (typeof src.destroy === 'function') {
161+
src.destroy(err);
162+
}
163+
});
164+
}
165+
148166
const iterator = ObjectCreate(ReadableStreamAsyncIteratorPrototype, {
149167
[kStream]: { value: stream, writable: true },
150168
[kLastResolve]: { value: null, writable: true },

test/parallel/test-stream-readable-async-iterators.js

+43-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
'use strict';
22

33
const common = require('../common');
4-
const { Readable, Transform, PassThrough, pipeline } = require('stream');
4+
const {
5+
Stream,
6+
Readable,
7+
Transform,
8+
PassThrough,
9+
pipeline
10+
} = require('stream');
511
const assert = require('assert');
612

713
async function tests() {
@@ -14,6 +20,42 @@ async function tests() {
1420
AsyncIteratorPrototype);
1521
}
1622

23+
{
24+
// v1 stream
25+
26+
const stream = new Stream();
27+
stream.destroy = common.mustCall();
28+
process.nextTick(() => {
29+
stream.emit('data', 'hello');
30+
stream.emit('data', 'world');
31+
stream.emit('end');
32+
});
33+
34+
let res = '';
35+
stream[Symbol.asyncIterator] = Readable.prototype[Symbol.asyncIterator];
36+
for await (const d of stream) {
37+
res += d;
38+
}
39+
assert.strictEqual(res, 'helloworld');
40+
}
41+
42+
{
43+
// v1 stream error
44+
45+
const stream = new Stream();
46+
stream.close = common.mustCall();
47+
process.nextTick(() => {
48+
stream.emit('data', 0);
49+
stream.emit('data', 1);
50+
stream.emit('error', new Error('asd'));
51+
});
52+
53+
const iter = Readable.prototype[Symbol.asyncIterator].call(stream);
54+
iter.next().catch(common.mustCall((err) => {
55+
assert.strictEqual(err.message, 'asd');
56+
}));
57+
}
58+
1759
{
1860
const readable = new Readable({ objectMode: true, read() {} });
1961
readable.push(0);

0 commit comments

Comments
 (0)