Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: Duplex autoDestroy with disabled readable/writable #32139

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 0 additions & 16 deletions lib/_stream_duplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ function Duplex(options) {

if (options.allowHalfOpen === false) {
this.allowHalfOpen = false;
this.once('end', onend);
}
}
}
Expand Down Expand Up @@ -128,18 +127,3 @@ ObjectDefineProperties(Duplex.prototype, {
}
}
});

// The no-half-open enforcer
function onend() {
// If the writable side ended, then we're ok.
if (this._writableState.ended)
return;

// No more data can be written.
// But allow more writes to happen in this tick.
process.nextTick(onEndNT, this);
}

function onEndNT(self) {
self.end();
}
21 changes: 19 additions & 2 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -1217,17 +1217,34 @@ function endReadableNT(state, stream) {
state.endEmitted = true;
stream.emit('end');

if (state.autoDestroy) {
if (stream.writable && stream.allowHalfOpen === false) {
process.nextTick(endWritableNT, state, stream);
} else if (state.autoDestroy) {
// In case of duplex streams we need a way to detect
// if the writable side is ready for autoDestroy as well
const wState = stream._writableState;
if (!wState || (wState.autoDestroy && wState.finished)) {
const autoDestroy = !wState || (
wState.autoDestroy &&
// We don't expect the writable to ever 'finish'
// if writable is explicitly set to false.
(wState.finished || wState.writable === false)
);

if (autoDestroy) {
stream.destroy();
}
}
}
}

function endWritableNT(state, stream) {
const writable = stream.writable && !stream.writableEnded &&
!stream.destroyed;
if (writable) {
stream.end();
}
}

Readable.from = function(iterable, opts) {
if (from === undefined) {
from = require('internal/streams/from');
Expand Down
10 changes: 8 additions & 2 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,13 @@ function finish(stream, state) {
// In case of duplex streams we need a way to detect
// if the readable side is ready for autoDestroy as well
const rState = stream._readableState;
if (!rState || (rState.autoDestroy && rState.endEmitted)) {
const autoDestroy = !rState || (
rState.autoDestroy &&
// We don't expect the readable to ever 'end'
// if readable is explicitly set to false.
(rState.endEmitted || rState.readable === false)
);
if (autoDestroy) {
stream.destroy();
}
}
Expand Down Expand Up @@ -748,7 +754,7 @@ ObjectDefineProperties(Writable.prototype, {
// Compat. The user might manually disable writable side through
// deprecated setter.
return !!w && w.writable !== false && !w.destroyed && !w.errored &&
!w.ending;
!w.ending && !w.ended;
},
set(val) {
// Backwards compatible.
Expand Down
44 changes: 44 additions & 0 deletions test/parallel/test-stream-duplex-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,47 @@ const assert = require('assert');

new MyDuplex();
}

{
const duplex = new Duplex({
writable: false,
autoDestroy: true,
write(chunk, enc, cb) { cb(); },
read() {},
});
duplex.push(null);
duplex.resume();
duplex.on('close', common.mustCall());
}

{
const duplex = new Duplex({
readable: false,
autoDestroy: true,
write(chunk, enc, cb) { cb(); },
read() {},
});
duplex.end();
duplex.on('close', common.mustCall());
}

{
const duplex = new Duplex({
allowHalfOpen: false,
autoDestroy: true,
write(chunk, enc, cb) { cb(); },
read() {},
});
duplex.push(null);
duplex.resume();
const orgEnd = duplex.end;
duplex.end = common.mustNotCall();
duplex.on('end', () => {
// Ensure end() is called in next tick to allow
// any pending writes to be invoked first.
process.nextTick(() => {
duplex.end = common.mustCall(orgEnd);
});
});
duplex.on('close', common.mustCall());
}
4 changes: 2 additions & 2 deletions test/parallel/test-stream-duplex-end.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const Duplex = require('stream').Duplex;
});
assert.strictEqual(stream.allowHalfOpen, false);
stream.on('finish', common.mustCall());
assert.strictEqual(stream.listenerCount('end'), 1);
assert.strictEqual(stream.listenerCount('end'), 0);
stream.resume();
stream.push(null);
}
Expand All @@ -35,7 +35,7 @@ const Duplex = require('stream').Duplex;
assert.strictEqual(stream.allowHalfOpen, false);
stream._writableState.ended = true;
stream.on('finish', common.mustNotCall());
assert.strictEqual(stream.listenerCount('end'), 1);
assert.strictEqual(stream.listenerCount('end'), 0);
stream.resume();
stream.push(null);
}