diff --git a/lib/_stream_duplex.js b/lib/_stream_duplex.js index b832c973a1ee18..fe2281df471dc7 100644 --- a/lib/_stream_duplex.js +++ b/lib/_stream_duplex.js @@ -66,7 +66,6 @@ function Duplex(options) { if (options.allowHalfOpen === false) { this.allowHalfOpen = false; - this.once('end', onend); } } } @@ -128,18 +127,3 @@ ObjectDefineProperties(Duplex.prototype, { } } }); - -// The no-half-open enforcer -function onend() { - // If the writable side ended, then we're ok. - if (this._writableState.ended) - return; - - // No more data can be written. - // But allow more writes to happen in this tick. - process.nextTick(onEndNT, this); -} - -function onEndNT(self) { - self.end(); -} diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index dfbd023d24e185..1df50ba200fe0f 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -1217,17 +1217,34 @@ function endReadableNT(state, stream) { state.endEmitted = true; stream.emit('end'); - if (state.autoDestroy) { + if (stream.writable && stream.allowHalfOpen === false) { + process.nextTick(endWritableNT, state, stream); + } else if (state.autoDestroy) { // In case of duplex streams we need a way to detect // if the writable side is ready for autoDestroy as well const wState = stream._writableState; - if (!wState || (wState.autoDestroy && wState.finished)) { + const autoDestroy = !wState || ( + wState.autoDestroy && + // We don't expect the writable to ever 'finish' + // if writable is explicitly set to false. + (wState.finished || wState.writable === false) + ); + + if (autoDestroy) { stream.destroy(); } } } } +function endWritableNT(state, stream) { + const writable = stream.writable && !stream.writableEnded && + !stream.destroyed; + if (writable) { + stream.end(); + } +} + Readable.from = function(iterable, opts) { if (from === undefined) { from = require('internal/streams/from'); diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index b24192101c71a3..c3a7a35d2b3f6f 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -675,7 +675,13 @@ function finish(stream, state) { // In case of duplex streams we need a way to detect // if the readable side is ready for autoDestroy as well const rState = stream._readableState; - if (!rState || (rState.autoDestroy && rState.endEmitted)) { + const autoDestroy = !rState || ( + rState.autoDestroy && + // We don't expect the readable to ever 'end' + // if readable is explicitly set to false. + (rState.endEmitted || rState.readable === false) + ); + if (autoDestroy) { stream.destroy(); } } @@ -748,7 +754,7 @@ ObjectDefineProperties(Writable.prototype, { // Compat. The user might manually disable writable side through // deprecated setter. return !!w && w.writable !== false && !w.destroyed && !w.errored && - !w.ending; + !w.ending && !w.ended; }, set(val) { // Backwards compatible. diff --git a/test/parallel/test-stream-duplex-destroy.js b/test/parallel/test-stream-duplex-destroy.js index 3c38d2c364051c..e7c91ec797beb3 100644 --- a/test/parallel/test-stream-duplex-destroy.js +++ b/test/parallel/test-stream-duplex-destroy.js @@ -194,3 +194,47 @@ const assert = require('assert'); new MyDuplex(); } + +{ + const duplex = new Duplex({ + writable: false, + autoDestroy: true, + write(chunk, enc, cb) { cb(); }, + read() {}, + }); + duplex.push(null); + duplex.resume(); + duplex.on('close', common.mustCall()); +} + +{ + const duplex = new Duplex({ + readable: false, + autoDestroy: true, + write(chunk, enc, cb) { cb(); }, + read() {}, + }); + duplex.end(); + duplex.on('close', common.mustCall()); +} + +{ + const duplex = new Duplex({ + allowHalfOpen: false, + autoDestroy: true, + write(chunk, enc, cb) { cb(); }, + read() {}, + }); + duplex.push(null); + duplex.resume(); + const orgEnd = duplex.end; + duplex.end = common.mustNotCall(); + duplex.on('end', () => { + // Ensure end() is called in next tick to allow + // any pending writes to be invoked first. + process.nextTick(() => { + duplex.end = common.mustCall(orgEnd); + }); + }); + duplex.on('close', common.mustCall()); +} diff --git a/test/parallel/test-stream-duplex-end.js b/test/parallel/test-stream-duplex-end.js index 8ee19346d3abe5..2c7706146eb882 100644 --- a/test/parallel/test-stream-duplex-end.js +++ b/test/parallel/test-stream-duplex-end.js @@ -22,7 +22,7 @@ const Duplex = require('stream').Duplex; }); assert.strictEqual(stream.allowHalfOpen, false); stream.on('finish', common.mustCall()); - assert.strictEqual(stream.listenerCount('end'), 1); + assert.strictEqual(stream.listenerCount('end'), 0); stream.resume(); stream.push(null); } @@ -35,7 +35,7 @@ const Duplex = require('stream').Duplex; assert.strictEqual(stream.allowHalfOpen, false); stream._writableState.ended = true; stream.on('finish', common.mustNotCall()); - assert.strictEqual(stream.listenerCount('end'), 1); + assert.strictEqual(stream.listenerCount('end'), 0); stream.resume(); stream.push(null); }