diff --git a/benchmark/streams/readable-async-iterator.js b/benchmark/streams/readable-async-iterator.js new file mode 100644 index 00000000000000..dbe335ab98b651 --- /dev/null +++ b/benchmark/streams/readable-async-iterator.js @@ -0,0 +1,38 @@ +'use strict'; + +const common = require('../common'); +const Readable = require('stream').Readable; + +const bench = common.createBenchmark(main, { + n: [1e5], + sync: ['yes', 'no'], +}); + +async function main({ n, sync }) { + sync = sync === 'yes'; + + const s = new Readable({ + objectMode: true, + read() { + if (sync) { + this.push(1); + } else { + process.nextTick(() => { + this.push(1); + }); + } + } + }); + + bench.start(); + + let x = 0; + for await (const chunk of s) { + x += chunk; + if (x > n) { + break; + } + } + + bench.end(n); +} diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index af8786198674cb..a8e104588f1028 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -27,6 +27,7 @@ const { NumberIsNaN, ObjectDefineProperties, ObjectSetPrototypeOf, + Promise, Set, SymbolAsyncIterator, Symbol @@ -59,11 +60,11 @@ const kPaused = Symbol('kPaused'); // Lazy loaded to improve the startup performance. let StringDecoder; -let createReadableStreamAsyncIterator; let from; ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); ObjectSetPrototypeOf(Readable, Stream); +function nop() {} const { errorOrDestroy } = destroyImpl; @@ -1075,13 +1076,68 @@ Readable.prototype.wrap = function(stream) { }; Readable.prototype[SymbolAsyncIterator] = function() { - if (createReadableStreamAsyncIterator === undefined) { - createReadableStreamAsyncIterator = - require('internal/streams/async_iterator'); + let stream = this; + + if (typeof stream.read !== 'function') { + // v1 stream + const src = stream; + stream = new Readable({ + objectMode: true, + destroy(err, callback) { + destroyImpl.destroyer(src, err); + callback(); + } + }).wrap(src); } - return createReadableStreamAsyncIterator(this); + + const iter = createAsyncIterator(stream); + iter.stream = stream; + return iter; }; +async function* createAsyncIterator(stream) { + let callback = nop; + + function next(resolve) { + if (this === stream) { + callback(); + callback = nop; + } else { + callback = resolve; + } + } + + stream + .on('readable', next) + .on('error', next) + .on('end', next) + .on('close', next); + + try { + const state = stream._readableState; + while (true) { + const chunk = stream.read(); + if (chunk !== null) { + yield chunk; + } else if (state.errored) { + throw state.errored; + } else if (state.ended) { + break; + } else if (state.closed) { + // TODO(ronag): ERR_PREMATURE_CLOSE? + break; + } else { + await new Promise(next); + } + } + } catch (err) { + destroyImpl.destroyer(stream, err); + throw err; + } finally { + destroyImpl.destroyer(stream, null); + } +} + // Making it explicit these properties are not enumerable // because otherwise some prototype manipulation in // userland will fail. diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js deleted file mode 100644 index 87b7f227d59070..00000000000000 --- a/lib/internal/streams/async_iterator.js +++ /dev/null @@ -1,221 +0,0 @@ -'use strict'; - -const { - ObjectCreate, - ObjectGetPrototypeOf, - ObjectSetPrototypeOf, - Promise, - PromiseReject, - PromiseResolve, - Symbol, -} = primordials; - -const finished = require('internal/streams/end-of-stream'); -const destroyImpl = require('internal/streams/destroy'); - -const kLastResolve = Symbol('lastResolve'); -const kLastReject = Symbol('lastReject'); -const kError = Symbol('error'); -const kEnded = Symbol('ended'); -const kLastPromise = Symbol('lastPromise'); -const kHandlePromise = Symbol('handlePromise'); -const kStream = Symbol('stream'); - -let Readable; - -function createIterResult(value, done) { - return { value, done }; -} - -function readAndResolve(iter) { - const resolve = iter[kLastResolve]; - if (resolve !== null) { - const data = iter[kStream].read(); - // We defer if data is null. We can be expecting either 'end' or 'error'. - if (data !== null) { - iter[kLastPromise] = null; - iter[kLastResolve] = null; - iter[kLastReject] = null; - resolve(createIterResult(data, false)); - } - } -} - -function onReadable(iter) { - // We wait for the next tick, because it might - // emit an error with `process.nextTick()`. - process.nextTick(readAndResolve, iter); -} - -function wrapForNext(lastPromise, iter) { - return (resolve, reject) => { - lastPromise.then(() => { - if (iter[kEnded]) { - resolve(createIterResult(undefined, true)); - return; - } - - iter[kHandlePromise](resolve, reject); - }, reject); - }; -} - -const AsyncIteratorPrototype = ObjectGetPrototypeOf( - ObjectGetPrototypeOf(async function* () {}).prototype); - -function finish(self, err) { - return new Promise((resolve, reject) => { - const stream = self[kStream]; - - finished(stream, (err) => { - if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { - reject(err); - } else { - resolve(createIterResult(undefined, true)); - } - }); - destroyImpl.destroyer(stream, err); - }); -} - -const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({ - get stream() { - return this[kStream]; - }, - - next() { - // If we have detected an error in the meanwhile - // reject straight away. - const error = this[kError]; - if (error !== null) { - return PromiseReject(error); - } - - if (this[kEnded]) { - return PromiseResolve(createIterResult(undefined, true)); - } - - if (this[kStream].destroyed) { - return new Promise((resolve, reject) => { - if (this[kError]) { - reject(this[kError]); - } else if (this[kEnded]) { - resolve(createIterResult(undefined, true)); - } else { - finished(this[kStream], (err) => { - if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { - reject(err); - } else { - resolve(createIterResult(undefined, true)); - } - }); - } - }); - } - - // If we have multiple next() calls we will wait for the previous Promise to - // finish. This logic is optimized to support for await loops, where next() - // is only called once at a time. - const lastPromise = this[kLastPromise]; - let promise; - - if (lastPromise) { - promise = new Promise(wrapForNext(lastPromise, this)); - } else { - // Fast path needed to support multiple this.push() - // without triggering the next() queue. - const data = this[kStream].read(); - if (data !== null) { - return PromiseResolve(createIterResult(data, false)); - } - - promise = new Promise(this[kHandlePromise]); - } - - this[kLastPromise] = promise; - - return promise; - }, - - return() { - return finish(this); - }, - - throw(err) { - return finish(this, err); - }, -}, AsyncIteratorPrototype); - -const createReadableStreamAsyncIterator = (stream) => { - if (typeof stream.read !== 'function') { - // v1 stream - - if (!Readable) { - Readable = require('_stream_readable'); - } - - const src = stream; - stream = new Readable({ objectMode: true }).wrap(src); - finished(stream, (err) => destroyImpl.destroyer(src, err)); - } - - const iterator = ObjectCreate(ReadableStreamAsyncIteratorPrototype, { - [kStream]: { value: stream, writable: true }, - [kLastResolve]: { value: null, writable: true }, - [kLastReject]: { value: null, writable: true }, - [kError]: { value: null, writable: true }, - [kEnded]: { - value: stream.readableEnded || stream._readableState.endEmitted, - writable: true - }, - // The function passed to new Promise is cached so we avoid allocating a new - // closure at every run. - [kHandlePromise]: { - value: (resolve, reject) => { - const data = iterator[kStream].read(); - if (data) { - iterator[kLastPromise] = null; - iterator[kLastResolve] = null; - iterator[kLastReject] = null; - resolve(createIterResult(data, false)); - } else { - iterator[kLastResolve] = resolve; - iterator[kLastReject] = reject; - } - }, - writable: true, - }, - }); - iterator[kLastPromise] = null; - - finished(stream, { writable: false }, (err) => { - if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { - const reject = iterator[kLastReject]; - // Reject if we are waiting for data in the Promise returned by next() and - // store the error. - if (reject !== null) { - iterator[kLastPromise] = null; - iterator[kLastResolve] = null; - iterator[kLastReject] = null; - reject(err); - } - iterator[kError] = err; - return; - } - - const resolve = iterator[kLastResolve]; - if (resolve !== null) { - iterator[kLastPromise] = null; - iterator[kLastResolve] = null; - iterator[kLastReject] = null; - resolve(createIterResult(undefined, true)); - } - iterator[kEnded] = true; - }); - - stream.on('readable', onReadable.bind(null, iterator)); - - return iterator; -}; - -module.exports = createReadableStreamAsyncIterator; diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 4786b906f4324c..aaee0d24992af8 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -23,7 +23,7 @@ const { let EE; let PassThrough; -let createReadableStreamAsyncIterator; +let Readable; function destroyer(stream, reading, writing, callback) { callback = once(callback); @@ -113,11 +113,11 @@ function makeAsyncIterable(val) { } async function* fromReadable(val) { - if (!createReadableStreamAsyncIterator) { - createReadableStreamAsyncIterator = - require('internal/streams/async_iterator'); + if (!Readable) { + Readable = require('_stream_readable'); } - yield* createReadableStreamAsyncIterator(val); + + yield* Readable.prototype[SymbolAsyncIterator].call(val); } async function pump(iterable, writable, finish) { diff --git a/node.gyp b/node.gyp index 88942393ff3671..dd832ffc2adc12 100644 --- a/node.gyp +++ b/node.gyp @@ -222,7 +222,6 @@ 'lib/internal/worker/js_transferable.js', 'lib/internal/watchdog.js', 'lib/internal/streams/lazy_transform.js', - 'lib/internal/streams/async_iterator.js', 'lib/internal/streams/buffer_list.js', 'lib/internal/streams/duplexpair.js', 'lib/internal/streams/from.js', diff --git a/test/parallel/test-readline-async-iterators-destroy.js b/test/parallel/test-readline-async-iterators-destroy.js index e96f831432b1eb..7c14a667062fec 100644 --- a/test/parallel/test-readline-async-iterators-destroy.js +++ b/test/parallel/test-readline-async-iterators-destroy.js @@ -75,6 +75,7 @@ async function testMutualDestroy() { break; } assert.deepStrictEqual(iteratedLines, expectedLines); + break; } assert.deepStrictEqual(iteratedLines, expectedLines); diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index 55d16a1c5d363e..604ba3afb47fe7 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -11,17 +11,6 @@ const { const assert = require('assert'); async function tests() { - { - const AsyncIteratorPrototype = Object.getPrototypeOf( - Object.getPrototypeOf(async function* () {}).prototype); - const rs = new Readable({ - read() {} - }); - assert.strictEqual( - Object.getPrototypeOf(Object.getPrototypeOf(rs[Symbol.asyncIterator]())), - AsyncIteratorPrototype); - } - { // v1 stream @@ -53,7 +42,9 @@ async function tests() { }); const iter = Readable.prototype[Symbol.asyncIterator].call(stream); - iter.next().catch(common.mustCall((err) => { + await iter.next(); + await iter.next(); + await iter.next().catch(common.mustCall((err) => { assert.strictEqual(err.message, 'asd'); })); } @@ -189,12 +180,19 @@ async function tests() { resolved.forEach(common.mustCall( (item, i) => assert.strictEqual(item.value, 'hello-' + i), max)); - errors.forEach((promise) => { + errors.slice(0, 1).forEach((promise) => { promise.catch(common.mustCall((err) => { assert.strictEqual(err.message, 'kaboom'); })); }); + errors.slice(1).forEach((promise) => { + promise.then(common.mustCall(({ done, value }) => { + assert.strictEqual(done, true); + assert.strictEqual(value, undefined); + })); + }); + readable.destroy(new Error('kaboom')); } @@ -284,28 +282,6 @@ async function tests() { assert.strictEqual(received, 1); } - { - // Iterator throw. - - const readable = new Readable({ - objectMode: true, - read() { - this.push('hello'); - } - }); - - readable.on('error', common.mustCall((err) => { - assert.strictEqual(err.message, 'kaboom'); - })); - - const it = readable[Symbol.asyncIterator](); - it.throw(new Error('kaboom')).catch(common.mustCall((err) => { - assert.strictEqual(err.message, 'kaboom'); - })); - - assert.strictEqual(readable.destroyed, true); - } - { console.log('destroyed by throw'); const readable = new Readable({ @@ -577,12 +553,15 @@ async function tests() { assert.strictEqual(e, err); })(), (async () => { let e; + let x; try { - await d; + x = await d; } catch (_e) { e = _e; } - assert.strictEqual(e, err); + assert.strictEqual(e, undefined); + assert.strictEqual(x.done, true); + assert.strictEqual(x.value, undefined); })()]); }