Skip to content

Commit 4a2bd69

Browse files
ronagTrott
authored andcommitted
stream: fix destroy() behavior
Ensure errorEmitted is always set. Only emit 'error' once. PR-URL: #29058 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Franziska Hinkelmann <franziska.hinkelmann@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com>
1 parent a890771 commit 4a2bd69

10 files changed

+153
-84
lines changed

doc/api/stream.md

+3
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,9 @@ The stream is not closed when the `'error'` event is emitted unless the
281281
[`autoDestroy`][writable-new] option was set to `true` when creating the
282282
stream.
283283

284+
After `'error'`, no further events other than `'close'` *should* be emitted
285+
(including `'error'` events).
286+
284287
##### Event: 'finish'
285288
<!-- YAML
286289
added: v0.9.4

lib/_stream_readable.js

+3
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,9 @@ function ReadableState(options, stream, isDuplex) {
117117
this.resumeScheduled = false;
118118
this.paused = true;
119119

120+
// True if the error was already emitted and should not be thrown again
121+
this.errorEmitted = false;
122+
120123
// Should close be emitted on destroy. Defaults to true.
121124
this.emitClose = options.emitClose !== false;
122125

lib/_stream_writable.js

-2
Original file line numberDiff line numberDiff line change
@@ -429,13 +429,11 @@ function onwriteError(stream, state, sync, er, cb) {
429429
// This can emit finish, and it will always happen
430430
// after error
431431
process.nextTick(finishMaybe, stream, state);
432-
stream._writableState.errorEmitted = true;
433432
errorOrDestroy(stream, er);
434433
} else {
435434
// The caller expect this to happen before if
436435
// it is async
437436
cb(er);
438-
stream._writableState.errorEmitted = true;
439437
errorOrDestroy(stream, er);
440438
// This can emit finish, but finish must
441439
// always follow error

lib/internal/streams/destroy.js

+60-47
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,37 @@
11
'use strict';
22

3+
function needError(stream, err) {
4+
if (!err) {
5+
return false;
6+
}
7+
8+
const r = stream._readableState;
9+
const w = stream._writableState;
10+
11+
if ((w && w.errorEmitted) || (r && r.errorEmitted)) {
12+
return false;
13+
}
14+
15+
if (w) {
16+
w.errorEmitted = true;
17+
}
18+
if (r) {
19+
r.errorEmitted = true;
20+
}
21+
22+
return true;
23+
}
24+
325
// Undocumented cb() API, needed for core, not for public API
426
function destroy(err, cb) {
5-
const readableDestroyed = this._readableState &&
6-
this._readableState.destroyed;
7-
const writableDestroyed = this._writableState &&
8-
this._writableState.destroyed;
27+
const r = this._readableState;
28+
const w = this._writableState;
929

10-
if (readableDestroyed || writableDestroyed) {
30+
if ((w && w.destroyed) || (r && r.destroyed)) {
1131
if (cb) {
1232
cb(err);
13-
} else if (err) {
14-
if (!this._writableState) {
15-
process.nextTick(emitErrorNT, this, err);
16-
} else if (!this._writableState.errorEmitted) {
17-
this._writableState.errorEmitted = true;
18-
process.nextTick(emitErrorNT, this, err);
19-
}
33+
} else if (needError(this, err)) {
34+
process.nextTick(emitErrorNT, this, err);
2035
}
2136

2237
return this;
@@ -25,28 +40,19 @@ function destroy(err, cb) {
2540
// We set destroyed to true before firing error callbacks in order
2641
// to make it re-entrance safe in case destroy() is called within callbacks
2742

28-
if (this._readableState) {
29-
this._readableState.destroyed = true;
43+
if (w) {
44+
w.destroyed = true;
3045
}
31-
32-
// If this is a duplex stream mark the writable part as destroyed as well
33-
if (this._writableState) {
34-
this._writableState.destroyed = true;
46+
if (r) {
47+
r.destroyed = true;
3548
}
3649

3750
this._destroy(err || null, (err) => {
38-
if (!cb && err) {
39-
if (!this._writableState) {
40-
process.nextTick(emitErrorAndCloseNT, this, err);
41-
} else if (!this._writableState.errorEmitted) {
42-
this._writableState.errorEmitted = true;
43-
process.nextTick(emitErrorAndCloseNT, this, err);
44-
} else {
45-
process.nextTick(emitCloseNT, this);
46-
}
47-
} else if (cb) {
51+
if (cb) {
4852
process.nextTick(emitCloseNT, this);
4953
cb(err);
54+
} else if (needError(this, err)) {
55+
process.nextTick(emitErrorAndCloseNT, this, err);
5056
} else {
5157
process.nextTick(emitCloseNT, this);
5258
}
@@ -61,29 +67,36 @@ function emitErrorAndCloseNT(self, err) {
6167
}
6268

6369
function emitCloseNT(self) {
64-
if (self._writableState && !self._writableState.emitClose)
70+
const r = self._readableState;
71+
const w = self._writableState;
72+
73+
if (w && !w.emitClose)
6574
return;
66-
if (self._readableState && !self._readableState.emitClose)
75+
if (r && !r.emitClose)
6776
return;
6877
self.emit('close');
6978
}
7079

7180
function undestroy() {
72-
if (this._readableState) {
73-
this._readableState.destroyed = false;
74-
this._readableState.reading = false;
75-
this._readableState.ended = false;
76-
this._readableState.endEmitted = false;
81+
const r = this._readableState;
82+
const w = this._writableState;
83+
84+
if (r) {
85+
r.destroyed = false;
86+
r.reading = false;
87+
r.ended = false;
88+
r.endEmitted = false;
89+
r.errorEmitted = false;
7790
}
7891

79-
if (this._writableState) {
80-
this._writableState.destroyed = false;
81-
this._writableState.ended = false;
82-
this._writableState.ending = false;
83-
this._writableState.finalCalled = false;
84-
this._writableState.prefinished = false;
85-
this._writableState.finished = false;
86-
this._writableState.errorEmitted = false;
92+
if (w) {
93+
w.destroyed = false;
94+
w.ended = false;
95+
w.ending = false;
96+
w.finalCalled = false;
97+
w.prefinished = false;
98+
w.finished = false;
99+
w.errorEmitted = false;
87100
}
88101
}
89102

@@ -98,12 +111,12 @@ function errorOrDestroy(stream, err) {
98111
// the error to be emitted nextTick. In a future
99112
// semver major update we should change the default to this.
100113

101-
const rState = stream._readableState;
102-
const wState = stream._writableState;
114+
const r = stream._readableState;
115+
const w = stream._writableState;
103116

104-
if ((rState && rState.autoDestroy) || (wState && wState.autoDestroy))
117+
if ((r && r.autoDestroy) || (w && w.autoDestroy))
105118
stream.destroy(err);
106-
else
119+
else if (needError(stream, err))
107120
stream.emit('error', err);
108121
}
109122

test/parallel/test-net-connect-buffer.js

+4-2
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,14 @@ tcp.listen(0, common.mustCall(function() {
6969
[],
7070
{}
7171
].forEach((value) => {
72-
common.expectsError(() => socket.write(value), {
72+
// We need to check the callback since 'error' will only
73+
// be emitted once per instance.
74+
socket.write(value, common.expectsError({
7375
code: 'ERR_INVALID_ARG_TYPE',
7476
type: TypeError,
7577
message: 'The "chunk" argument must be one of type string or Buffer. ' +
7678
`Received type ${typeof value}`
77-
});
79+
}));
7880
});
7981

8082
// Write a string that contains a multi-byte character sequence to test that
+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
'use strict';
2+
const common = require('../common');
3+
const { Writable, Readable } = require('stream');
4+
5+
{
6+
const writable = new Writable();
7+
writable.on('error', common.mustCall());
8+
writable.end();
9+
writable.write('h');
10+
writable.write('h');
11+
}
12+
13+
{
14+
const readable = new Readable();
15+
readable.on('error', common.mustCall());
16+
readable.push(null);
17+
readable.push('h');
18+
readable.push('h');
19+
}

test/parallel/test-stream-readable-invalid-chunk.js

+24-9
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,32 @@
33
const common = require('../common');
44
const stream = require('stream');
55

6-
const readable = new stream.Readable({
7-
read: () => {}
8-
});
9-
10-
function checkError(fn) {
11-
common.expectsError(fn, {
6+
function testPushArg(val) {
7+
const readable = new stream.Readable({
8+
read: () => {}
9+
});
10+
readable.on('error', common.expectsError({
1211
code: 'ERR_INVALID_ARG_TYPE',
1312
type: TypeError
13+
}));
14+
readable.push(val);
15+
}
16+
17+
testPushArg([]);
18+
testPushArg({});
19+
testPushArg(0);
20+
21+
function testUnshiftArg(val) {
22+
const readable = new stream.Readable({
23+
read: () => {}
1424
});
25+
readable.on('error', common.expectsError({
26+
code: 'ERR_INVALID_ARG_TYPE',
27+
type: TypeError
28+
}));
29+
readable.unshift(val);
1530
}
1631

17-
checkError(() => readable.push([]));
18-
checkError(() => readable.push({}));
19-
checkError(() => readable.push(0));
32+
testUnshiftArg([]);
33+
testUnshiftArg({});
34+
testUnshiftArg(0);

test/parallel/test-stream-readable-unshift.js

-17
Original file line numberDiff line numberDiff line change
@@ -112,23 +112,6 @@ const { Readable } = require('stream');
112112

113113
}
114114

115-
{
116-
// Check that error is thrown for invalid chunks
117-
118-
const readable = new Readable({ read() {} });
119-
function checkError(fn) {
120-
common.expectsError(fn, {
121-
code: 'ERR_INVALID_ARG_TYPE',
122-
type: TypeError
123-
});
124-
}
125-
126-
checkError(() => readable.unshift([]));
127-
checkError(() => readable.unshift({}));
128-
checkError(() => readable.unshift(0));
129-
130-
}
131-
132115
{
133116
// Check that ObjectMode works
134117
const readable = new Readable({ objectMode: true, read() {} });

test/parallel/test-stream-unshift-read-race.js

+1-7
Original file line numberDiff line numberDiff line change
@@ -86,13 +86,7 @@ w._write = function(chunk, encoding, cb) {
8686
};
8787

8888
r.on('end', common.mustCall(function() {
89-
common.expectsError(function() {
90-
r.unshift(Buffer.allocUnsafe(1));
91-
}, {
92-
code: 'ERR_STREAM_UNSHIFT_AFTER_END_EVENT',
93-
type: Error,
94-
message: 'stream.unshift() after end event'
95-
});
89+
r.unshift(Buffer.allocUnsafe(1));
9690
w.end();
9791
}));
9892

test/parallel/test-stream2-writable.js

+39
Original file line numberDiff line numberDiff line change
@@ -402,3 +402,42 @@ const helloWorldBuffer = Buffer.from('hello world');
402402
w.write(Buffer.allocUnsafe(1));
403403
w.end(Buffer.allocUnsafe(0));
404404
}
405+
406+
{
407+
// Verify that error is only emitted once when failing in _finish.
408+
const w = new W();
409+
410+
w._final = common.mustCall(function(cb) {
411+
cb(new Error('test'));
412+
});
413+
w.on('error', common.mustCall((err) => {
414+
assert.strictEqual(w._writableState.errorEmitted, true);
415+
assert.strictEqual(err.message, 'test');
416+
w.on('error', common.mustNotCall());
417+
w.destroy(new Error());
418+
}));
419+
w.end();
420+
}
421+
422+
{
423+
// Verify that error is only emitted once when failing in write.
424+
const w = new W();
425+
w.on('error', common.mustCall((err) => {
426+
assert.strictEqual(w._writableState.errorEmitted, true);
427+
assert.strictEqual(err.code, 'ERR_STREAM_NULL_VALUES');
428+
}));
429+
w.write(null);
430+
w.destroy(new Error());
431+
}
432+
433+
{
434+
// Verify that error is only emitted once when failing in write after end.
435+
const w = new W();
436+
w.on('error', common.mustCall((err) => {
437+
assert.strictEqual(w._writableState.errorEmitted, true);
438+
assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END');
439+
}));
440+
w.end();
441+
w.write('hello');
442+
w.destroy(new Error());
443+
}

0 commit comments

Comments
 (0)