Skip to content

Commit ccc94a9

Browse files
committed
stream: add auto-destroy mode
1 parent 6223236 commit ccc94a9

File tree

4 files changed

+128
-11
lines changed

4 files changed

+128
-11
lines changed

doc/api/stream.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1521,6 +1521,8 @@ changes:
15211521
[`stream._destroy()`][writable-_destroy] method.
15221522
* `final` {Function} Implementation for the
15231523
[`stream._final()`][stream-_final] method.
1524+
* `autoDestroy` {boolean} Whether this stream should automatically call
1525+
.destroy() on itself after ending.
15241526

15251527
```js
15261528
const { Writable } = require('stream');
@@ -1770,6 +1772,8 @@ constructor and implement the `readable._read()` method.
17701772
method.
17711773
* `destroy` {Function} Implementation for the
17721774
[`stream._destroy()`][readable-_destroy] method.
1775+
* `autoDestroy` {boolean} Whether this stream should automatically call
1776+
.destroy() on itself after ending.
17731777

17741778
```js
17751779
const { Readable } = require('stream');

lib/_stream_readable.js

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,9 @@ function ReadableState(options, stream, isDuplex) {
117117
// Should close be emitted on destroy. Defaults to true.
118118
this.emitClose = options.emitClose !== false;
119119

120+
// Should .destroy() be called after 'end' (and potentially 'finish')
121+
this.autoDestroy = !!options.autoDestroy;
122+
120123
// has it been destroyed
121124
this.destroyed = false;
122125

@@ -235,7 +238,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
235238
if (!skipChunkCheck)
236239
er = chunkInvalid(state, chunk);
237240
if (er) {
238-
stream.emit('error', er);
241+
errorOrDestroy(stream, er);
239242
} else if (state.objectMode || chunk && chunk.length > 0) {
240243
if (typeof chunk !== 'string' &&
241244
!state.objectMode &&
@@ -245,11 +248,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
245248

246249
if (addToFront) {
247250
if (state.endEmitted)
248-
stream.emit('error', new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
251+
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
249252
else
250253
addChunk(stream, state, chunk, true);
251254
} else if (state.ended) {
252-
stream.emit('error', new ERR_STREAM_PUSH_AFTER_EOF());
255+
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
253256
} else if (state.destroyed) {
254257
return false;
255258
} else {
@@ -581,7 +584,7 @@ function maybeReadMore_(stream, state) {
581584
// for virtual (non-string, non-buffer) streams, "length" is somewhat
582585
// arbitrary, and perhaps not very meaningful.
583586
Readable.prototype._read = function(n) {
584-
this.emit('error', new ERR_METHOD_NOT_IMPLEMENTED('_read()'));
587+
errorOrDestroy(this, new ERR_METHOD_NOT_IMPLEMENTED('_read()'));
585588
};
586589

587590
Readable.prototype.pipe = function(dest, pipeOpts) {
@@ -687,7 +690,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
687690
unpipe();
688691
dest.removeListener('error', onerror);
689692
if (EE.listenerCount(dest, 'error') === 0)
690-
dest.emit('error', er);
693+
errorOrDestroy(dest, er);
691694
}
692695

693696
// Make sure our error handler is attached before userland ones.
@@ -1084,6 +1087,28 @@ function endReadable(stream) {
10841087
}
10851088
}
10861089

1090+
function writableAutoDestroy(wState) {
1091+
// In case of duplex streams we need a way to detect
1092+
// if the writable side is ready for autoDestroy as well
1093+
return !wState || (wState.autoDestroy && wState.finished);
1094+
}
1095+
1096+
function errorOrDestroy(stream, err) {
1097+
// We have tests that rely on errors being emitted
1098+
// in the same tick, so changing this is semver major.
1099+
// For now when you opt-in to autoDestroy we allow
1100+
// the error to be emitted nextTick. In a future
1101+
// semver major update we should change the default to this.
1102+
1103+
const rState = stream._readableState;
1104+
const wState = stream._writableState;
1105+
1106+
if ((rState && rState.autoDestroy) || (wState && wState.autoDestroy))
1107+
stream.destroy(err);
1108+
else
1109+
stream.emit('error', err);
1110+
}
1111+
10871112
function endReadableNT(state, stream) {
10881113
debug('endReadableNT', state.endEmitted, state.length);
10891114

@@ -1092,5 +1117,9 @@ function endReadableNT(state, stream) {
10921117
state.endEmitted = true;
10931118
stream.readable = false;
10941119
stream.emit('end');
1120+
1121+
if (state.autoDestroy && writableAutoDestroy(stream._writableState)) {
1122+
stream.destroy();
1123+
}
10951124
}
10961125
}

lib/_stream_writable.js

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,9 @@ function WritableState(options, stream, isDuplex) {
147147
// Should close be emitted on destroy. Defaults to true.
148148
this.emitClose = options.emitClose !== false;
149149

150+
// Should .destroy() be called after 'finish' (and potentially 'end')
151+
this.autoDestroy = !!options.autoDestroy;
152+
150153
// count buffered requests
151154
this.bufferedRequestCount = 0;
152155

@@ -235,14 +238,14 @@ function Writable(options) {
235238

236239
// Otherwise people can pipe Writable streams, which is just wrong.
237240
Writable.prototype.pipe = function() {
238-
this.emit('error', new ERR_STREAM_CANNOT_PIPE());
241+
errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
239242
};
240243

241244

242245
function writeAfterEnd(stream, cb) {
243246
var er = new ERR_STREAM_WRITE_AFTER_END();
244247
// TODO: defer error events consistently everywhere, not just the cb
245-
stream.emit('error', er);
248+
errorOrDestroy(stream, er);
246249
process.nextTick(cb, er);
247250
}
248251

@@ -258,7 +261,7 @@ function validChunk(stream, state, chunk, cb) {
258261
er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
259262
}
260263
if (er) {
261-
stream.emit('error', er);
264+
errorOrDestroy(stream, er);
262265
process.nextTick(cb, er);
263266
return false;
264267
}
@@ -422,13 +425,13 @@ function onwriteError(stream, state, sync, er, cb) {
422425
// after error
423426
process.nextTick(finishMaybe, stream, state);
424427
stream._writableState.errorEmitted = true;
425-
stream.emit('error', er);
428+
errorOrDestroy(stream, er);
426429
} else {
427430
// the caller expect this to happen before if
428431
// it is async
429432
cb(er);
430433
stream._writableState.errorEmitted = true;
431-
stream.emit('error', er);
434+
errorOrDestroy(stream, er);
432435
// this can emit finish, but finish must
433436
// always follow error
434437
finishMaybe(stream, state);
@@ -612,7 +615,7 @@ function callFinal(stream, state) {
612615
stream._final((err) => {
613616
state.pendingcb--;
614617
if (err) {
615-
stream.emit('error', err);
618+
errorOrDestroy(stream, err);
616619
}
617620
state.prefinished = true;
618621
stream.emit('prefinish');
@@ -632,13 +635,39 @@ function prefinish(stream, state) {
632635
}
633636
}
634637

638+
function readableAutoDestroy(rState) {
639+
// In case of duplex streams we need a way to detect
640+
// if the readable side is ready for autoDestroy as well
641+
return !rState || (rState.autoDestroy && rState.endEmitted);
642+
}
643+
644+
function errorOrDestroy(stream, err) {
645+
// We have tests that rely on errors being emitted
646+
// in the same tick, so changing this is semver major.
647+
// For now when you opt-in to autoDestroy we allow
648+
// the error to be emitted nextTick. In a future
649+
// semver major update we should change the default to this.
650+
651+
const rState = stream._readableState;
652+
const wState = stream._writableState;
653+
654+
if ((rState && rState.autoDestroy) || (wState && wState.autoDestroy))
655+
stream.destroy(err);
656+
else
657+
stream.emit('error', err);
658+
}
659+
635660
function finishMaybe(stream, state) {
636661
var need = needFinish(state);
637662
if (need) {
638663
prefinish(stream, state);
639664
if (state.pendingcb === 0) {
640665
state.finished = true;
641666
stream.emit('finish');
667+
668+
if (state.autoDestroy && readableAutoDestroy(stream._readableState)) {
669+
stream.destroy();
670+
}
642671
}
643672
}
644673
return need;
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
'use strict';
2+
const common = require('../common');
3+
const stream = require('stream');
4+
5+
{
6+
const r = new stream.Readable({
7+
autoDestroy: true,
8+
read() {
9+
this.push('hello');
10+
this.push('world');
11+
this.push(null);
12+
},
13+
destroy: common.mustCall((err, cb) => cb())
14+
});
15+
16+
r.resume();
17+
r.on('end', common.mustCall());
18+
r.on('close', common.mustCall());
19+
}
20+
21+
{
22+
const w = new stream.Writable({
23+
autoDestroy: true,
24+
write(data, enc, cb) {
25+
cb(null);
26+
},
27+
destroy: common.mustCall((err, cb) => cb())
28+
});
29+
30+
w.write('hello');
31+
w.write('world');
32+
w.end();
33+
34+
w.on('finish', common.mustCall());
35+
w.on('close', common.mustCall());
36+
}
37+
38+
{
39+
const t = new stream.Transform({
40+
autoDestroy: true,
41+
transform(data, enc, cb) {
42+
cb(null, data);
43+
},
44+
destroy: common.mustCall((err, cb) => cb())
45+
});
46+
47+
t.write('hello');
48+
t.write('world');
49+
t.end();
50+
51+
t.resume();
52+
t.on('end', common.mustCall());
53+
t.on('finish', common.mustCall());
54+
t.on('close', common.mustCall());
55+
}

0 commit comments

Comments
 (0)