From 67ed526ab03c3e9b2f41290b7cd6e9c2e7d9b4dd Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 8 Dec 2019 09:12:14 +0100 Subject: [PATCH] stream: error state cleanup Clean up end simplify errored state. - errorEmitted should be set in the same tick as 'error' is emitted. - errored should be set as soon as an error occurs. - errored should exist on Readable as well. - refactor destroy logic and make it easier to follow. PR-URL: https://github.com/nodejs/node/pull/30851 Reviewed-By: Matteo Collina Reviewed-By: Anna Henningsen --- lib/_stream_readable.js | 3 + lib/internal/streams/destroy.js | 96 +++++++++++-------- test/parallel/test-stream-readable-destroy.js | 57 ++++++++++- test/parallel/test-stream-writable-destroy.js | 34 +++++-- 4 files changed, 140 insertions(+), 50 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 6226bbf5eb4063..1e42b15363fb36 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -140,6 +140,9 @@ function ReadableState(options, stream, isDuplex) { // Has it been destroyed this.destroyed = false; + // Indicates whether the stream has errored. + this.errored = false; + // Crypto is kind of old and crusty. Historically, its default string // encoding is 'binary' so we have to make this configurable. // Everything else in the universe uses 'utf8', though. diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index ded75b255934c4..fb206c6c83d0a0 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -1,41 +1,25 @@ 'use strict'; -function needError(stream, err) { - if (!err) { - return false; - } - - const r = stream._readableState; - const w = stream._writableState; - - if ((w && w.errorEmitted) || (r && r.errorEmitted)) { - return false; - } - - if (w) { - w.errorEmitted = true; - } - if (r) { - r.errorEmitted = true; - } - - return true; -} - // Undocumented cb() API, needed for core, not for public API. // The cb() will be invoked synchronously if _destroy is synchronous. +// If cb is passed no 'error' event will be emitted. function destroy(err, cb) { const r = this._readableState; const w = this._writableState; - if (w && err) { - w.errored = true; + if (err) { + if (w) { + w.errored = true; + } + if (r) { + r.errored = true; + } } if ((w && w.destroyed) || (r && r.destroyed)) { if (cb) { cb(err); - } else if (needError(this, err)) { + } else if (err) { process.nextTick(emitErrorNT, this, err); } @@ -53,17 +37,24 @@ function destroy(err, cb) { } this._destroy(err || null, (err) => { - const emitClose = (w && w.emitClose) || (r && r.emitClose); + if (err) { + if (w) { + w.errored = true; + } + if (r) { + r.errored = true; + } + } + if (cb) { // Invoke callback before scheduling emitClose so that callback // can schedule before. cb(err); - if (emitClose) { - process.nextTick(emitCloseNT, this); - } - } else if (needError(this, err)) { - process.nextTick(emitClose ? emitErrorCloseNT : emitErrorNT, this, err); - } else if (emitClose) { + // Don't emit 'error' if passed a callback. + process.nextTick(emitCloseNT, this); + } else if (err) { + process.nextTick(emitErrorCloseNT, this, err); + } else { process.nextTick(emitCloseNT, this); } }); @@ -72,15 +63,34 @@ function destroy(err, cb) { } function emitErrorCloseNT(self, err) { - self.emit('error', err); - self.emit('close'); + emitErrorNT(self, err); + emitCloseNT(self); } function emitCloseNT(self) { - self.emit('close'); + const r = self._readableState; + const w = self._writableState; + + if ((w && w.emitClose) || (r && r.emitClose)) { + self.emit('close'); + } } function emitErrorNT(self, err) { + const r = self._readableState; + const w = self._writableState; + + if ((w && w.errorEmitted) || (r && r.errorEmitted)) { + return; + } + + if (w) { + w.errorEmitted = true; + } + if (r) { + r.errorEmitted = true; + } + self.emit('error', err); } @@ -90,6 +100,7 @@ function undestroy() { if (r) { r.destroyed = false; + r.errored = false; r.reading = false; r.ended = false; r.endEmitted = false; @@ -118,14 +129,17 @@ function errorOrDestroy(stream, err) { const r = stream._readableState; const w = stream._writableState; - if (w & err) { - w.errored = true; - } - if ((r && r.autoDestroy) || (w && w.autoDestroy)) stream.destroy(err); - else if (needError(stream, err)) - stream.emit('error', err); + else if (err) { + if (w) { + w.errored = true; + } + if (r) { + r.errored = true; + } + emitErrorNT(stream, err); + } } diff --git a/test/parallel/test-stream-readable-destroy.js b/test/parallel/test-stream-readable-destroy.js index 7687ea90cc82d8..d446dd3611781e 100644 --- a/test/parallel/test-stream-readable-destroy.js +++ b/test/parallel/test-stream-readable-destroy.js @@ -129,13 +129,20 @@ const assert = require('assert'); cb(expected); }); + let ticked = false; read.on('end', common.mustNotCall('no end event')); read.on('error', common.mustCall((err) => { + assert.strictEqual(ticked, true); + assert.strictEqual(read._readableState.errorEmitted, true); + assert.strictEqual(read._readableState.errored, true); assert.strictEqual(err, expected); })); read.destroy(); + assert.strictEqual(read._readableState.errorEmitted, false); + assert.strictEqual(read._readableState.errored, true); assert.strictEqual(read.destroyed, true); + ticked = true; } { @@ -174,10 +181,58 @@ const assert = require('assert'); const expected = new Error('kaboom'); - read.on('close', common.mustCall()); + let ticked = false; + read.on('close', common.mustCall(() => { + assert.strictEqual(read._readableState.errorEmitted, false); + assert.strictEqual(ticked, true); + })); + // 'error' should not be emitted since a callback is passed to + // destroy(err, callback); + read.on('error', common.mustNotCall()); + + assert.strictEqual(read._readableState.errored, false); + assert.strictEqual(read._readableState.errorEmitted, false); + read.destroy(expected, common.mustCall(function(err) { + assert.strictEqual(read._readableState.errored, true); assert.strictEqual(err, expected); })); + assert.strictEqual(read._readableState.errorEmitted, false); + assert.strictEqual(read._readableState.errored, true); + ticked = true; +} + +{ + const readable = new Readable({ + destroy: common.mustCall(function(err, cb) { + process.nextTick(cb, new Error('kaboom 1')); + }), + read() {} + }); + + let ticked = false; + readable.on('close', common.mustCall(() => { + assert.strictEqual(ticked, true); + assert.strictEqual(readable._readableState.errorEmitted, true); + })); + readable.on('error', common.mustCall((err) => { + assert.strictEqual(ticked, true); + assert.strictEqual(err.message, 'kaboom 2'); + assert.strictEqual(readable._readableState.errorEmitted, true); + })); + + readable.destroy(); + assert.strictEqual(readable.destroyed, true); + assert.strictEqual(readable._readableState.errored, false); + assert.strictEqual(readable._readableState.errorEmitted, false); + + // Test case where `readable.destroy()` is called again with an error before + // the `_destroy()` callback is called. + readable.destroy(new Error('kaboom 2')); + assert.strictEqual(readable._readableState.errorEmitted, false); + assert.strictEqual(readable._readableState.errored, true); + + ticked = true; } { diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index 1186c618634cab..d321d808199d56 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -157,13 +157,22 @@ const assert = require('assert'); write(chunk, enc, cb) { cb(); } }); - write.on('close', common.mustCall()); - write.on('error', common.mustCall()); + let ticked = false; + write.on('close', common.mustCall(() => { + assert.strictEqual(ticked, true); + })); + write.on('error', common.mustCall((err) => { + assert.strictEqual(ticked, true); + assert.strictEqual(err.message, 'kaboom 1'); + assert.strictEqual(write._writableState.errorEmitted, true); + })); write.destroy(new Error('kaboom 1')); write.destroy(new Error('kaboom 2')); - assert.strictEqual(write._writableState.errorEmitted, true); + assert.strictEqual(write._writableState.errored, true); + assert.strictEqual(write._writableState.errorEmitted, false); assert.strictEqual(write.destroyed, true); + ticked = true; } { @@ -176,20 +185,29 @@ const assert = require('assert'); } }); - writable.on('close', common.mustCall()); - writable.on('error', common.expectsError({ - type: Error, - message: 'kaboom 2' + let ticked = false; + writable.on('close', common.mustCall(() => { + assert.strictEqual(ticked, true); + assert.strictEqual(writable._writableState.errorEmitted, true); + })); + writable.on('error', common.mustCall((err) => { + assert.strictEqual(ticked, true); + assert.strictEqual(err.message, 'kaboom 2'); + assert.strictEqual(writable._writableState.errorEmitted, true); })); writable.destroy(); assert.strictEqual(writable.destroyed, true); + assert.strictEqual(writable._writableState.errored, false); assert.strictEqual(writable._writableState.errorEmitted, false); // Test case where `writable.destroy()` is called again with an error before // the `_destroy()` callback is called. writable.destroy(new Error('kaboom 2')); - assert.strictEqual(writable._writableState.errorEmitted, true); + assert.strictEqual(writable._writableState.errorEmitted, false); + assert.strictEqual(writable._writableState.errored, true); + + ticked = true; } {