Skip to content

Commit f4609bd

Browse files
ronagdnlup
authored andcommitted
stream: bypass legacy destroy for pipeline and async iteration
PR-URL: #38505 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent c0becbc commit f4609bd

File tree

6 files changed

+188
-12
lines changed

6 files changed

+188
-12
lines changed

lib/_http_client.js

+2
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ const {
5353
prepareError,
5454
} = require('_http_common');
5555
const { OutgoingMessage } = require('_http_outgoing');
56+
const { kDestroy } = require('internal/streams/destroy');
5657
const Agent = require('_http_agent');
5758
const { Buffer } = require('buffer');
5859
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
@@ -609,6 +610,7 @@ function parserOnIncomingClient(res, shouldKeepAlive) {
609610
DTRACE_HTTP_CLIENT_RESPONSE(socket, req);
610611
req.res = res;
611612
res.req = req;
613+
res[kDestroy] = null;
612614

613615
// Add our listener first, so that we guarantee socket cleanup
614616
res.on('end', responseOnEnd);

lib/_http_incoming.js

+8-2
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ const {
3131
} = primordials;
3232

3333
const { Readable, finished } = require('stream');
34+
const { kDestroy } = require('internal/streams/destroy');
3435

3536
const kHeaders = Symbol('kHeaders');
3637
const kHeadersCount = Symbol('kHeadersCount');
@@ -188,13 +189,18 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) {
188189
this.socket.destroy(err);
189190
const cleanup = finished(this.socket, (e) => {
190191
cleanup();
191-
onError(this, e || err, cb);
192+
process.nextTick(onError, this, e || err, cb);
192193
});
193194
} else {
194-
onError(this, err, cb);
195+
process.nextTick(onError, this, err, cb);
195196
}
196197
};
197198

199+
IncomingMessage.prototype[kDestroy] = function(err) {
200+
this.socket = null;
201+
this.destroy(err);
202+
};
203+
198204
IncomingMessage.prototype._addHeaderLines = _addHeaderLines;
199205
function _addHeaderLines(headers, n) {
200206
if (headers && headers.length) {

lib/_http_server.js

+6-6
Original file line numberDiff line numberDiff line change
@@ -231,9 +231,7 @@ function onServerResponseClose() {
231231
// where the ServerResponse object has already been deconstructed.
232232
// Fortunately, that requires only a single if check. :-)
233233
if (this._httpMessage) {
234-
this._httpMessage.destroyed = true;
235-
this._httpMessage._closed = true;
236-
this._httpMessage.emit('close');
234+
emitCloseNT(this._httpMessage);
237235
}
238236
}
239237

@@ -837,9 +835,11 @@ function resOnFinish(req, res, socket, state, server) {
837835
}
838836

839837
function emitCloseNT(self) {
840-
self.destroyed = true;
841-
self._closed = true;
842-
self.emit('close');
838+
if (!self.destroyed) {
839+
self.destroyed = true;
840+
self._closed = true;
841+
self.emit('close');
842+
}
843843
}
844844

845845
// The following callback is issued after the headers have been read on a

lib/internal/streams/destroy.js

+55-4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ const {
55
codes: {
66
ERR_MULTIPLE_CALLBACK,
77
},
8+
AbortError,
89
} = require('internal/errors');
910
const {
1011
Symbol,
@@ -363,15 +364,65 @@ function isRequest(stream) {
363364
return stream && stream.setHeader && typeof stream.abort === 'function';
364365
}
365366

367+
const kDestroyed = Symbol('kDestroyed');
368+
369+
function emitCloseLegacy(stream) {
370+
stream.emit('close');
371+
}
372+
373+
function emitErrorCloseLegacy(stream, err) {
374+
stream.emit('error', err);
375+
process.nextTick(emitCloseLegacy, stream);
376+
}
377+
378+
function isDestroyed(stream) {
379+
return stream.destroyed || stream[kDestroyed];
380+
}
381+
382+
function isReadable(stream) {
383+
return stream.readable && !stream.readableEnded && !isDestroyed(stream);
384+
}
385+
386+
function isWritable(stream) {
387+
return stream.writable && !stream.writableEnded && !isDestroyed(stream);
388+
}
389+
366390
// Normalize destroy for legacy.
367391
function destroyer(stream, err) {
368-
if (isRequest(stream)) return stream.abort();
369-
if (isRequest(stream.req)) return stream.req.abort();
370-
if (typeof stream.destroy === 'function') return stream.destroy(err);
371-
if (typeof stream.close === 'function') return stream.close();
392+
if (isDestroyed(stream)) {
393+
return;
394+
}
395+
396+
if (!err && (isReadable(stream) || isWritable(stream))) {
397+
err = new AbortError();
398+
}
399+
400+
// TODO: Remove isRequest branches.
401+
if (typeof stream[kDestroy] === 'function') {
402+
stream[kDestroy](err);
403+
} else if (isRequest(stream)) {
404+
stream.abort();
405+
} else if (isRequest(stream.req)) {
406+
stream.req.abort();
407+
} else if (typeof stream.destroy === 'function') {
408+
stream.destroy(err);
409+
} else if (typeof stream.close === 'function') {
410+
// TODO: Don't lose err?
411+
stream.close();
412+
} else if (err) {
413+
process.nextTick(emitErrorCloseLegacy, stream);
414+
} else {
415+
process.nextTick(emitCloseLegacy, stream);
416+
}
417+
418+
if (!stream.destroyed) {
419+
stream[kDestroyed] = true;
420+
}
372421
}
373422

374423
module.exports = {
424+
kDestroy,
425+
isDestroyed,
375426
construct,
376427
destroyer,
377428
destroy,

lib/stream.js

+2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const {
3030
} = require('internal/util');
3131

3232
const pipeline = require('internal/streams/pipeline');
33+
const { destroyer } = require('internal/streams/destroy');
3334
const eos = require('internal/streams/end-of-stream');
3435
const internalBuffer = require('internal/buffer');
3536

@@ -45,6 +46,7 @@ Stream.pipeline = pipeline;
4546
const { addAbortSignal } = require('internal/streams/add-abort-signal');
4647
Stream.addAbortSignal = addAbortSignal;
4748
Stream.finished = eos;
49+
Stream.destroy = destroyer;
4850

4951
ObjectDefineProperty(Stream, 'promises', {
5052
configurable: true,

test/parallel/test-stream-destroy.js

+115
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
Writable,
6+
Readable,
7+
destroy
8+
} = require('stream');
9+
const assert = require('assert');
10+
const http = require('http');
11+
12+
{
13+
const r = new Readable({ read() {} });
14+
destroy(r);
15+
assert.strictEqual(r.destroyed, true);
16+
r.on('error', common.mustCall((err) => {
17+
assert.strictEqual(err.name, 'AbortError');
18+
}));
19+
r.on('close', common.mustCall());
20+
}
21+
22+
{
23+
const r = new Readable({ read() {} });
24+
destroy(r, new Error('asd'));
25+
assert.strictEqual(r.destroyed, true);
26+
r.on('error', common.mustCall((err) => {
27+
assert.strictEqual(err.message, 'asd');
28+
}));
29+
r.on('close', common.mustCall());
30+
}
31+
32+
{
33+
const w = new Writable({ write() {} });
34+
destroy(w);
35+
assert.strictEqual(w.destroyed, true);
36+
w.on('error', common.mustCall((err) => {
37+
assert.strictEqual(err.name, 'AbortError');
38+
}));
39+
w.on('close', common.mustCall());
40+
}
41+
42+
{
43+
const w = new Writable({ write() {} });
44+
destroy(w, new Error('asd'));
45+
assert.strictEqual(w.destroyed, true);
46+
w.on('error', common.mustCall((err) => {
47+
assert.strictEqual(err.message, 'asd');
48+
}));
49+
w.on('close', common.mustCall());
50+
}
51+
52+
{
53+
const server = http.createServer((req, res) => {
54+
destroy(req);
55+
req.on('error', common.mustCall((err) => {
56+
assert.strictEqual(err.name, 'AbortError');
57+
}));
58+
req.on('close', common.mustCall(() => {
59+
res.end('hello');
60+
}));
61+
});
62+
63+
server.listen(0, () => {
64+
const req = http.request({
65+
port: server.address().port
66+
});
67+
68+
req.write('asd');
69+
req.on('response', (res) => {
70+
const buf = [];
71+
res.on('data', (data) => buf.push(data));
72+
res.on('end', common.mustCall(() => {
73+
assert.deepStrictEqual(
74+
Buffer.concat(buf),
75+
Buffer.from('hello')
76+
);
77+
server.close();
78+
}));
79+
});
80+
});
81+
}
82+
83+
{
84+
const server = http.createServer((req, res) => {
85+
req
86+
.resume()
87+
.on('end', () => {
88+
destroy(req);
89+
})
90+
.on('error', common.mustNotCall());
91+
92+
req.on('close', common.mustCall(() => {
93+
res.end('hello');
94+
}));
95+
});
96+
97+
server.listen(0, () => {
98+
const req = http.request({
99+
port: server.address().port
100+
});
101+
102+
req.write('asd');
103+
req.on('response', (res) => {
104+
const buf = [];
105+
res.on('data', (data) => buf.push(data));
106+
res.on('end', common.mustCall(() => {
107+
assert.deepStrictEqual(
108+
Buffer.concat(buf),
109+
Buffer.from('hello')
110+
);
111+
server.close();
112+
}));
113+
});
114+
});
115+
}

0 commit comments

Comments
 (0)