From 573410fb69580f677ece9f383b3f00c5c38782ef Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 10 Oct 2020 12:38:44 +0200 Subject: [PATCH] stream: multiple stream backports includes: * stream: simpler and faster Readable async iterator * stream: don't destroy on async iterator success * stream: async iterator stop read if destroyed PR-URL: https://github.com/nodejs/node/pull/34887 Refs: https://github.com/nodejs/node/pull/34035 Refs: https://github.com/nodejs/node/pull/35122 Refs: https://github.com/nodejs/node/pull/35640 Refs: https://github.com/nodejs/node/issues/34680 Reviewed-By: Luigi Pinca Reviewed-By: Matteo Collina --- lib/_stream_readable.js | 84 ++++++- lib/internal/streams/async_iterator.js | 221 ------------------ lib/internal/streams/destroy.js | 1 + lib/internal/streams/pipeline.js | 9 +- node.gyp | 1 - .../test-stream-readable-async-iterators.js | 192 +++++++++++---- 6 files changed, 237 insertions(+), 271 deletions(-) delete mode 100644 lib/internal/streams/async_iterator.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 1ac553154853c6..50f11bf970f23a 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -28,6 +28,7 @@ const { NumberParseInt, ObjectDefineProperties, ObjectSetPrototypeOf, + Promise, Set, SymbolAsyncIterator, Symbol @@ -60,11 +61,11 @@ const kPaused = Symbol('kPaused'); // Lazy loaded to improve the startup performance. let StringDecoder; -let createReadableStreamAsyncIterator; let from; ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); ObjectSetPrototypeOf(Readable, Stream); +function nop() {} const { errorOrDestroy } = destroyImpl; @@ -1076,13 +1077,86 @@ Readable.prototype.wrap = function(stream) { }; Readable.prototype[SymbolAsyncIterator] = function() { - if (createReadableStreamAsyncIterator === undefined) { - createReadableStreamAsyncIterator = - require('internal/streams/async_iterator'); + let stream = this; + + if (typeof stream.read !== 'function') { + // v1 stream + const src = stream; + stream = new Readable({ + objectMode: true, + destroy(err, callback) { + destroyImpl.destroyer(src, err); + callback(err); + } + }).wrap(src); } - return createReadableStreamAsyncIterator(this); + + const iter = createAsyncIterator(stream); + iter.stream = stream; + return iter; }; +async function* createAsyncIterator(stream) { + let callback = nop; + + function next(resolve) { + if (this === stream) { + callback(); + callback = nop; + } else { + callback = resolve; + } + } + + const state = stream._readableState; + + let error = state.errored; + let errorEmitted = state.errorEmitted; + let endEmitted = state.endEmitted; + let closeEmitted = state.closeEmitted; + + stream + .on('readable', next) + .on('error', function(err) { + error = err; + errorEmitted = true; + next.call(this); + }) + .on('end', function() { + endEmitted = true; + next.call(this); + }) + .on('close', function() { + closeEmitted = true; + next.call(this); + }); + + try { + while (true) { + const chunk = stream.destroyed ? null : stream.read(); + if (chunk !== null) { + yield chunk; + } else if (errorEmitted) { + throw error; + } else if (endEmitted) { + break; + } else if (closeEmitted) { + break; + } else { + await new Promise(next); + } + } + } catch (err) { + destroyImpl.destroyer(stream, err); + throw err; + } finally { + if (state.autoDestroy || !endEmitted) { + // TODO(ronag): ERR_PREMATURE_CLOSE? + destroyImpl.destroyer(stream, null); + } + } +} + // Making it explicit these properties are not enumerable // because otherwise some prototype manipulation in // userland will fail. diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js deleted file mode 100644 index 87b7f227d59070..00000000000000 --- a/lib/internal/streams/async_iterator.js +++ /dev/null @@ -1,221 +0,0 @@ -'use strict'; - -const { - ObjectCreate, - ObjectGetPrototypeOf, - ObjectSetPrototypeOf, - Promise, - PromiseReject, - PromiseResolve, - Symbol, -} = primordials; - -const finished = require('internal/streams/end-of-stream'); -const destroyImpl = require('internal/streams/destroy'); - -const kLastResolve = Symbol('lastResolve'); -const kLastReject = Symbol('lastReject'); -const kError = Symbol('error'); -const kEnded = Symbol('ended'); -const kLastPromise = Symbol('lastPromise'); -const kHandlePromise = Symbol('handlePromise'); -const kStream = Symbol('stream'); - -let Readable; - -function createIterResult(value, done) { - return { value, done }; -} - -function readAndResolve(iter) { - const resolve = iter[kLastResolve]; - if (resolve !== null) { - const data = iter[kStream].read(); - // We defer if data is null. We can be expecting either 'end' or 'error'. - if (data !== null) { - iter[kLastPromise] = null; - iter[kLastResolve] = null; - iter[kLastReject] = null; - resolve(createIterResult(data, false)); - } - } -} - -function onReadable(iter) { - // We wait for the next tick, because it might - // emit an error with `process.nextTick()`. - process.nextTick(readAndResolve, iter); -} - -function wrapForNext(lastPromise, iter) { - return (resolve, reject) => { - lastPromise.then(() => { - if (iter[kEnded]) { - resolve(createIterResult(undefined, true)); - return; - } - - iter[kHandlePromise](resolve, reject); - }, reject); - }; -} - -const AsyncIteratorPrototype = ObjectGetPrototypeOf( - ObjectGetPrototypeOf(async function* () {}).prototype); - -function finish(self, err) { - return new Promise((resolve, reject) => { - const stream = self[kStream]; - - finished(stream, (err) => { - if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { - reject(err); - } else { - resolve(createIterResult(undefined, true)); - } - }); - destroyImpl.destroyer(stream, err); - }); -} - -const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({ - get stream() { - return this[kStream]; - }, - - next() { - // If we have detected an error in the meanwhile - // reject straight away. - const error = this[kError]; - if (error !== null) { - return PromiseReject(error); - } - - if (this[kEnded]) { - return PromiseResolve(createIterResult(undefined, true)); - } - - if (this[kStream].destroyed) { - return new Promise((resolve, reject) => { - if (this[kError]) { - reject(this[kError]); - } else if (this[kEnded]) { - resolve(createIterResult(undefined, true)); - } else { - finished(this[kStream], (err) => { - if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { - reject(err); - } else { - resolve(createIterResult(undefined, true)); - } - }); - } - }); - } - - // If we have multiple next() calls we will wait for the previous Promise to - // finish. This logic is optimized to support for await loops, where next() - // is only called once at a time. - const lastPromise = this[kLastPromise]; - let promise; - - if (lastPromise) { - promise = new Promise(wrapForNext(lastPromise, this)); - } else { - // Fast path needed to support multiple this.push() - // without triggering the next() queue. - const data = this[kStream].read(); - if (data !== null) { - return PromiseResolve(createIterResult(data, false)); - } - - promise = new Promise(this[kHandlePromise]); - } - - this[kLastPromise] = promise; - - return promise; - }, - - return() { - return finish(this); - }, - - throw(err) { - return finish(this, err); - }, -}, AsyncIteratorPrototype); - -const createReadableStreamAsyncIterator = (stream) => { - if (typeof stream.read !== 'function') { - // v1 stream - - if (!Readable) { - Readable = require('_stream_readable'); - } - - const src = stream; - stream = new Readable({ objectMode: true }).wrap(src); - finished(stream, (err) => destroyImpl.destroyer(src, err)); - } - - const iterator = ObjectCreate(ReadableStreamAsyncIteratorPrototype, { - [kStream]: { value: stream, writable: true }, - [kLastResolve]: { value: null, writable: true }, - [kLastReject]: { value: null, writable: true }, - [kError]: { value: null, writable: true }, - [kEnded]: { - value: stream.readableEnded || stream._readableState.endEmitted, - writable: true - }, - // The function passed to new Promise is cached so we avoid allocating a new - // closure at every run. - [kHandlePromise]: { - value: (resolve, reject) => { - const data = iterator[kStream].read(); - if (data) { - iterator[kLastPromise] = null; - iterator[kLastResolve] = null; - iterator[kLastReject] = null; - resolve(createIterResult(data, false)); - } else { - iterator[kLastResolve] = resolve; - iterator[kLastReject] = reject; - } - }, - writable: true, - }, - }); - iterator[kLastPromise] = null; - - finished(stream, { writable: false }, (err) => { - if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { - const reject = iterator[kLastReject]; - // Reject if we are waiting for data in the Promise returned by next() and - // store the error. - if (reject !== null) { - iterator[kLastPromise] = null; - iterator[kLastResolve] = null; - iterator[kLastReject] = null; - reject(err); - } - iterator[kError] = err; - return; - } - - const resolve = iterator[kLastResolve]; - if (resolve !== null) { - iterator[kLastPromise] = null; - iterator[kLastResolve] = null; - iterator[kLastReject] = null; - resolve(createIterResult(undefined, true)); - } - iterator[kEnded] = true; - }); - - stream.on('readable', onReadable.bind(null, iterator)); - - return iterator; -}; - -module.exports = createReadableStreamAsyncIterator; diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 8cb7509ea66aa2..8b052cc3c61c63 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -123,6 +123,7 @@ function undestroy() { if (w) { w.closed = false; + w.closeEmitted = false; w.destroyed = false; w.errored = null; w.ended = false; diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 4786b906f4324c..33d47e52933afb 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -23,7 +23,7 @@ const { let EE; let PassThrough; -let createReadableStreamAsyncIterator; +let Readable; function destroyer(stream, reading, writing, callback) { callback = once(callback); @@ -113,11 +113,10 @@ function makeAsyncIterable(val) { } async function* fromReadable(val) { - if (!createReadableStreamAsyncIterator) { - createReadableStreamAsyncIterator = - require('internal/streams/async_iterator'); + if (!Readable) { + Readable = require('_stream_readable'); } - yield* createReadableStreamAsyncIterator(val); + yield* Readable.prototype[SymbolAsyncIterator].call(val); } async function pump(iterable, writable, finish) { diff --git a/node.gyp b/node.gyp index 2c919528490138..48ce65890c5017 100644 --- a/node.gyp +++ b/node.gyp @@ -222,7 +222,6 @@ 'lib/internal/worker/js_transferable.js', 'lib/internal/watchdog.js', 'lib/internal/streams/lazy_transform.js', - 'lib/internal/streams/async_iterator.js', 'lib/internal/streams/buffer_list.js', 'lib/internal/streams/duplexpair.js', 'lib/internal/streams/from.js', diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index 55d16a1c5d363e..db1a359b214399 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -9,19 +9,9 @@ const { pipeline } = require('stream'); const assert = require('assert'); +const http = require('http'); async function tests() { - { - const AsyncIteratorPrototype = Object.getPrototypeOf( - Object.getPrototypeOf(async function* () {}).prototype); - const rs = new Readable({ - read() {} - }); - assert.strictEqual( - Object.getPrototypeOf(Object.getPrototypeOf(rs[Symbol.asyncIterator]())), - AsyncIteratorPrototype); - } - { // v1 stream @@ -53,9 +43,11 @@ async function tests() { }); const iter = Readable.prototype[Symbol.asyncIterator].call(stream); - iter.next().catch(common.mustCall((err) => { - assert.strictEqual(err.message, 'asd'); - })); + await iter.next() + .then(common.mustNotCall()) + .catch(common.mustCall((err) => { + assert.strictEqual(err.message, 'asd'); + })); } { @@ -189,12 +181,19 @@ async function tests() { resolved.forEach(common.mustCall( (item, i) => assert.strictEqual(item.value, 'hello-' + i), max)); - errors.forEach((promise) => { + errors.slice(0, 1).forEach((promise) => { promise.catch(common.mustCall((err) => { assert.strictEqual(err.message, 'kaboom'); })); }); + errors.slice(1).forEach((promise) => { + promise.then(common.mustCall(({ done, value }) => { + assert.strictEqual(done, true); + assert.strictEqual(value, undefined); + })); + }); + readable.destroy(new Error('kaboom')); } @@ -284,28 +283,6 @@ async function tests() { assert.strictEqual(received, 1); } - { - // Iterator throw. - - const readable = new Readable({ - objectMode: true, - read() { - this.push('hello'); - } - }); - - readable.on('error', common.mustCall((err) => { - assert.strictEqual(err.message, 'kaboom'); - })); - - const it = readable[Symbol.asyncIterator](); - it.throw(new Error('kaboom')).catch(common.mustCall((err) => { - assert.strictEqual(err.message, 'kaboom'); - })); - - assert.strictEqual(readable.destroyed, true); - } - { console.log('destroyed by throw'); const readable = new Readable({ @@ -577,12 +554,15 @@ async function tests() { assert.strictEqual(e, err); })(), (async () => { let e; + let x; try { - await d; + x = await d; } catch (_e) { e = _e; } - assert.strictEqual(e, err); + assert.strictEqual(e, undefined); + assert.strictEqual(x.done, true); + assert.strictEqual(x.value, undefined); })()]); } @@ -602,6 +582,61 @@ async function tests() { assert.strictEqual(err, _err); })); } + + { + // Don't destroy if no auto destroy. + // https://github.com/nodejs/node/issues/35116 + + const r = new Readable({ + autoDestroy: false, + read() { + this.push('asd'); + this.push(null); + } + }); + + for await (const chunk of r) { + chunk; + } + assert.strictEqual(r.destroyed, false); + } + + { + // Destroy if no auto destroy and premature break. + // https://github.com/nodejs/node/pull/35122/files#r485678318 + + const r = new Readable({ + autoDestroy: false, + read() { + this.push('asd'); + } + }); + + for await (const chunk of r) { + chunk; + break; + } + assert.strictEqual(r.destroyed, true); + } + + { + // Don't destroy before 'end'. + + const r = new Readable({ + read() { + this.push('asd'); + this.push(null); + } + }).on('end', () => { + assert.strictEqual(r.destroyed, false); + }); + + for await (const chunk of r) { + chunk; + } + + assert.strictEqual(r.destroyed, true); + } } { @@ -664,5 +699,84 @@ async function tests() { }); } +{ + let _req; + const server = http.createServer((request, response) => { + response.statusCode = 404; + response.write('never ends'); + }); + + server.listen(() => { + _req = http.request(`http://localhost:${server.address().port}`) + .on('response', common.mustCall(async (res) => { + setTimeout(() => { + _req.destroy(new Error('something happened')); + }, 100); + + res.on('aborted', () => { + const err = new Error(); + err.code = 'ECONNRESET'; + res.emit('error', err); + }); + + res.on('error', common.mustCall()); + + let _err; + try { + for await (const chunk of res) { + chunk; + } + } catch (err) { + _err = err; + } + + assert.strictEqual(_err.code, 'ECONNRESET'); + server.close(); + })) + .on('error', common.mustCall()) + .end(); + }); +} + +{ + async function getParsedBody(request) { + let body = ''; + + for await (const data of request) { + body += data; + } + + try { + return JSON.parse(body); + } catch { + return {}; + } + } + + const str = JSON.stringify({ asd: true }); + const server = http.createServer(async (request, response) => { + const body = await getParsedBody(request); + response.statusCode = 200; + assert.strictEqual(JSON.stringify(body), str); + response.end(JSON.stringify(body)); + }).listen(() => { + http + .request({ + method: 'POST', + hostname: 'localhost', + port: server.address().port, + }) + .end(str) + .on('response', async (res) => { + let body = ''; + for await (const chunk of res) { + body += chunk; + } + assert.strictEqual(body, str); + server.close(); + }); + }); +} + // To avoid missing some tests if a promise does not resolve tests().then(common.mustCall());