From 41c80f6abe3f2bb89f369e0e66a8a8179ea2c124 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 28 Jun 2020 18:38:20 +0200 Subject: [PATCH] stream: destroy wrapped streams on error Stream should be destroyed and update state accordingly when the wrapped stream emits error. Does some additional cleanup with future TODOs that might be worth looking into. PR-URL: https://github.com/nodejs/node/pull/34102 Reviewed-By: Matteo Collina Reviewed-By: Luigi Pinca Reviewed-By: Anna Henningsen --- lib/_stream_readable.js | 28 +++++++++++++--- .../test-stream2-readable-wrap-error.js | 32 +++++++++++++++++++ 2 files changed, 55 insertions(+), 5 deletions(-) create mode 100644 test/parallel/test-stream2-readable-wrap-error.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index fc0baca1b81235..a466011b682beb 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -66,7 +66,6 @@ ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); ObjectSetPrototypeOf(Readable, Stream); const { errorOrDestroy } = destroyImpl; -const kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume']; function prependListener(emitter, event, fn) { // Sadly this is not cacheable as some libraries bundle their own @@ -1034,10 +1033,29 @@ Readable.prototype.wrap = function(stream) { } } - // Proxy certain important events. - for (const kProxyEvent of kProxyEvents) { - stream.on(kProxyEvent, this.emit.bind(this, kProxyEvent)); - } + stream.on('error', (err) => { + errorOrDestroy(this, err); + }); + + stream.on('close', () => { + // TODO(ronag): Update readable state? + this.emit('close'); + }); + + stream.on('destroy', () => { + // TODO(ronag): this.destroy()? + this.emit('destroy'); + }); + + stream.on('pause', () => { + // TODO(ronag): this.pause()? + this.emit('pause'); + }); + + stream.on('resume', () => { + // TODO(ronag): this.resume()? + this.emit('resume'); + }); // When we try to consume some more bytes, simply unpause the // underlying stream. diff --git a/test/parallel/test-stream2-readable-wrap-error.js b/test/parallel/test-stream2-readable-wrap-error.js new file mode 100644 index 00000000000000..b56b9bc41c7527 --- /dev/null +++ b/test/parallel/test-stream2-readable-wrap-error.js @@ -0,0 +1,32 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const Readable = require('_stream_readable'); +const EE = require('events').EventEmitter; + +const oldStream = new EE(); +oldStream.pause = () => {}; +oldStream.resume = () => {}; + +{ + const r = new Readable({ autoDestroy: true }) + .wrap(oldStream) + .on('error', common.mustCall(() => { + assert.strictEqual(r._readableState.errorEmitted, true); + assert.strictEqual(r._readableState.errored, true); + assert.strictEqual(r.destroyed, true); + })); + oldStream.emit('error', new Error()); +} + +{ + const r = new Readable({ autoDestroy: false }) + .wrap(oldStream) + .on('error', common.mustCall(() => { + assert.strictEqual(r._readableState.errorEmitted, true); + assert.strictEqual(r._readableState.errored, true); + assert.strictEqual(r.destroyed, false); + })); + oldStream.emit('error', new Error()); +}