Skip to content

Commit 8f86986

Browse files
committed
stream: use callback to properly propagate error
The stream will be destroyed upstream through the proper error flow. PR-URL: #29179 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com>
1 parent b9da063 commit 8f86986

File tree

7 files changed

+95
-15
lines changed

7 files changed

+95
-15
lines changed

lib/_stream_readable.js

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,10 @@ function ReadableState(options, stream, isDuplex) {
144144
// Has it been destroyed
145145
this.destroyed = false;
146146

147-
// Indicates whether the stream has errored.
147+
// Indicates whether the stream has errored. When true no further
148+
// _read calls, 'data' or 'readable' events should occur. This is needed
149+
// since when autoDestroy is disabled we need a way to tell whether the
150+
// stream has failed.
148151
this.errored = false;
149152

150153
// Indicates whether the stream has finished destroying.
@@ -258,7 +261,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
258261
addChunk(stream, state, chunk, true);
259262
} else if (state.ended) {
260263
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
261-
} else if (state.destroyed) {
264+
} else if (state.destroyed || state.errored) {
262265
return false;
263266
} else {
264267
state.reading = false;
@@ -453,9 +456,9 @@ Readable.prototype.read = function(n) {
453456
}
454457

455458
// However, if we've ended, then there's no point, if we're already
456-
// reading, then it's unnecessary, and if we're destroyed, then it's
457-
// not allowed.
458-
if (state.ended || state.reading || state.destroyed) {
459+
// reading, then it's unnecessary, and if we're destroyed or errored,
460+
// then it's not allowed.
461+
if (state.ended || state.reading || state.destroyed || state.errored) {
459462
doRead = false;
460463
debug('reading or ended', doRead);
461464
} else if (doRead) {
@@ -553,7 +556,7 @@ function emitReadable(stream) {
553556
function emitReadable_(stream) {
554557
const state = stream._readableState;
555558
debug('emitReadable_', state.destroyed, state.length, state.ended);
556-
if (!state.destroyed && (state.length || state.ended)) {
559+
if (!state.destroyed && !state.errored && (state.length || state.ended)) {
557560
stream.emit('readable');
558561
state.emittedReadable = false;
559562
}

lib/_stream_writable.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,13 @@ function onwrite(stream, er) {
416416

417417
if (er) {
418418
state.errored = true;
419+
420+
// In case of duplex streams we need to notify the readable side of the
421+
// error.
422+
if (stream._readableState) {
423+
stream._readableState.errored = true;
424+
}
425+
419426
if (sync) {
420427
process.nextTick(onwriteError, stream, state, er, cb);
421428
} else {

lib/internal/http2/core.js

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1995,10 +1995,19 @@ class Http2Stream extends Duplex {
19951995

19961996
let req;
19971997

1998+
// writeGeneric does not destroy on error and we cannot enable autoDestroy,
1999+
// so make sure to destroy on error.
2000+
const callback = (err) => {
2001+
if (err) {
2002+
this.destroy(err);
2003+
}
2004+
cb(err);
2005+
};
2006+
19982007
if (writev)
1999-
req = writevGeneric(this, data, cb);
2008+
req = writevGeneric(this, data, callback);
20002009
else
2001-
req = writeGeneric(this, data, encoding, cb);
2010+
req = writeGeneric(this, data, encoding, callback);
20022011

20032012
trackWriteState(this, req.bytes);
20042013
}

lib/internal/stream_base_commons.js

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,14 @@ function onWriteComplete(status) {
8888
return;
8989
}
9090

91+
// TODO (ronag): This should be moved before if(stream.destroyed)
92+
// in order to avoid swallowing error.
9193
if (status < 0) {
9294
const ex = errnoException(status, 'write', this.error);
93-
stream.destroy(ex, this.callback);
95+
if (typeof this.callback === 'function')
96+
this.callback(ex);
97+
else
98+
stream.destroy(ex);
9499
return;
95100
}
96101

@@ -134,24 +139,24 @@ function writevGeneric(self, data, cb) {
134139
// Retain chunks
135140
if (err === 0) req._chunks = chunks;
136141

137-
afterWriteDispatched(self, req, err, cb);
142+
afterWriteDispatched(req, err, cb);
138143
return req;
139144
}
140145

141146
function writeGeneric(self, data, encoding, cb) {
142147
const req = createWriteWrap(self[kHandle]);
143148
const err = handleWriteReq(req, data, encoding);
144149

145-
afterWriteDispatched(self, req, err, cb);
150+
afterWriteDispatched(req, err, cb);
146151
return req;
147152
}
148153

149-
function afterWriteDispatched(self, req, err, cb) {
154+
function afterWriteDispatched(req, err, cb) {
150155
req.bytes = streamBaseState[kBytesWritten];
151156
req.async = !!streamBaseState[kLastWriteWasAsync];
152157

153158
if (err !== 0)
154-
return self.destroy(errnoException(err, 'write', req.error), cb);
159+
return cb(errnoException(err, 'write', req.error));
155160

156161
if (!req.async) {
157162
cb();
@@ -264,7 +269,6 @@ function setStreamTimeout(msecs, callback) {
264269
}
265270

266271
module.exports = {
267-
createWriteWrap,
268272
writevGeneric,
269273
writeGeneric,
270274
onStreamRead,
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const net = require('net');
5+
6+
const tcp = net.Server(common.mustCall((s) => {
7+
tcp.close();
8+
9+
let buf = '';
10+
s.setEncoding('utf8');
11+
s.on('data', function(d) {
12+
buf += d;
13+
});
14+
15+
s.on('end', common.mustCall(function() {
16+
console.error('SERVER: end', buf);
17+
assert.strictEqual(buf, "L'État, c'est moi");
18+
s.end();
19+
}));
20+
}));
21+
22+
tcp.listen(0, common.mustCall(function() {
23+
const socket = net.Stream({ highWaterMark: 0 });
24+
25+
let connected = false;
26+
assert.strictEqual(socket.pending, true);
27+
socket.connect(this.address().port, common.mustCall(() => connected = true));
28+
29+
assert.strictEqual(socket.pending, true);
30+
assert.strictEqual(socket.connecting, true);
31+
assert.strictEqual(socket.readyState, 'opening');
32+
33+
// Write a string that contains a multi-byte character sequence to test that
34+
// `bytesWritten` is incremented with the # of bytes, not # of characters.
35+
const a = "L'État, c'est ";
36+
const b = 'moi';
37+
38+
// We're still connecting at this point so the datagram is first pushed onto
39+
// the connect queue. Make sure that it's not added to `bytesWritten` again
40+
// when the actual write happens.
41+
const r = socket.write(a, common.mustCall((er) => {
42+
console.error('write cb');
43+
assert.ok(connected);
44+
assert.strictEqual(socket.bytesWritten, Buffer.from(a + b).length);
45+
assert.strictEqual(socket.pending, false);
46+
}));
47+
socket.on('close', common.mustCall(() => {
48+
assert.strictEqual(socket.pending, true);
49+
}));
50+
51+
assert.strictEqual(socket.bytesWritten, Buffer.from(a).length);
52+
assert.strictEqual(r, false);
53+
socket.end(b);
54+
55+
assert.strictEqual(socket.readyState, 'opening');
56+
}));

test/parallel/test-net-write-arguments.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ assert.throws(() => {
2525
[],
2626
{}
2727
].forEach((value) => {
28+
const socket = net.Stream({ highWaterMark: 0 });
2829
// We need to check the callback since 'error' will only
2930
// be emitted once per instance.
3031
assert.throws(() => {

test/parallel/test-wrap-js-stream-exceptions.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ process.once('uncaughtException', common.mustCall((err) => {
1010
}));
1111

1212
const socket = new JSStreamWrap(new Duplex({
13-
read: common.mustNotCall(),
13+
read: common.mustCall(),
1414
write: common.mustCall((buffer, data, cb) => {
1515
throw new Error('exception!');
1616
})

0 commit comments

Comments
 (0)