Skip to content

Commit bb275ef

Browse files
committed
stream: unify stream utils
Unify stream helps into utils. PR-URL: #39294 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent c4f8363 commit bb275ef

10 files changed

+235
-111
lines changed

lib/_http_client.js

-2
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ const {
5353
prepareError,
5454
} = require('_http_common');
5555
const { OutgoingMessage } = require('_http_outgoing');
56-
const { kDestroy } = require('internal/streams/destroy');
5756
const Agent = require('_http_agent');
5857
const { Buffer } = require('buffer');
5958
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
@@ -610,7 +609,6 @@ function parserOnIncomingClient(res, shouldKeepAlive) {
610609
DTRACE_HTTP_CLIENT_RESPONSE(socket, req);
611610
req.res = res;
612611
res.req = req;
613-
res[kDestroy] = null;
614612

615613
// Add our listener first, so that we guarantee socket cleanup
616614
res.on('end', responseOnEnd);

lib/_http_incoming.js

-6
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ const {
3131
} = primordials;
3232

3333
const { Readable, finished } = require('stream');
34-
const { kDestroy } = require('internal/streams/destroy');
3534

3635
const kHeaders = Symbol('kHeaders');
3736
const kHeadersCount = Symbol('kHeadersCount');
@@ -199,11 +198,6 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) {
199198
}
200199
};
201200

202-
IncomingMessage.prototype[kDestroy] = function(err) {
203-
this.socket = null;
204-
this.destroy(err);
205-
};
206-
207201
IncomingMessage.prototype._addHeaderLines = _addHeaderLines;
208202
function _addHeaderLines(headers, n) {
209203
if (headers && headers.length) {

lib/internal/streams/add-abort-signal.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@ const validateAbortSignal = (signal, name) => {
1818
}
1919
};
2020

21-
function isStream(obj) {
21+
function isNodeStream(obj) {
2222
return !!(obj && typeof obj.pipe === 'function');
2323
}
2424

2525
module.exports.addAbortSignal = function addAbortSignal(signal, stream) {
2626
validateAbortSignal(signal, 'signal');
27-
if (!isStream(stream)) {
27+
if (!isNodeStream(stream)) {
2828
throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream);
2929
}
3030
return module.exports.addAbortSignalNoValidate(signal, stream);

lib/internal/streams/destroy.js

+10-19
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,12 @@ const {
1010
const {
1111
Symbol,
1212
} = primordials;
13+
const {
14+
kDestroyed,
15+
isDestroyed,
16+
isFinished,
17+
isServerRequest
18+
} = require('internal/streams/utils');
1319

1420
const kDestroy = Symbol('kDestroy');
1521
const kConstruct = Symbol('kConstruct');
@@ -364,8 +370,6 @@ function isRequest(stream) {
364370
return stream && stream.setHeader && typeof stream.abort === 'function';
365371
}
366372

367-
const kDestroyed = Symbol('kDestroyed');
368-
369373
function emitCloseLegacy(stream) {
370374
stream.emit('close');
371375
}
@@ -375,31 +379,20 @@ function emitErrorCloseLegacy(stream, err) {
375379
process.nextTick(emitCloseLegacy, stream);
376380
}
377381

378-
function isDestroyed(stream) {
379-
return stream.destroyed || stream[kDestroyed];
380-
}
381-
382-
function isReadable(stream) {
383-
return stream.readable && !stream.readableEnded && !isDestroyed(stream);
384-
}
385-
386-
function isWritable(stream) {
387-
return stream.writable && !stream.writableEnded && !isDestroyed(stream);
388-
}
389-
390382
// Normalize destroy for legacy.
391383
function destroyer(stream, err) {
392384
if (isDestroyed(stream)) {
393385
return;
394386
}
395387

396-
if (!err && (isReadable(stream) || isWritable(stream))) {
388+
if (!err && !isFinished(stream)) {
397389
err = new AbortError();
398390
}
399391

400392
// TODO: Remove isRequest branches.
401-
if (typeof stream[kDestroy] === 'function') {
402-
stream[kDestroy](err);
393+
if (isServerRequest(stream)) {
394+
stream.socket = null;
395+
stream.destroy(err);
403396
} else if (isRequest(stream)) {
404397
stream.abort();
405398
} else if (isRequest(stream.req)) {
@@ -421,8 +414,6 @@ function destroyer(stream, err) {
421414
}
422415

423416
module.exports = {
424-
kDestroy,
425-
isDestroyed,
426417
construct,
427418
destroyer,
428419
destroy,

lib/internal/streams/end-of-stream.js

+29-61
Original file line numberDiff line numberDiff line change
@@ -17,48 +17,23 @@ const {
1717
validateObject,
1818
} = require('internal/validators');
1919

20+
const {
21+
isClosed,
22+
isReadable,
23+
isReadableNodeStream,
24+
isReadableFinished,
25+
isWritable,
26+
isWritableNodeStream,
27+
isWritableFinished,
28+
willEmitClose: _willEmitClose,
29+
} = require('internal/streams/utils');
30+
2031
function isRequest(stream) {
2132
return stream.setHeader && typeof stream.abort === 'function';
2233
}
2334

24-
function isServerResponse(stream) {
25-
return (
26-
typeof stream._sent100 === 'boolean' &&
27-
typeof stream._removedConnection === 'boolean' &&
28-
typeof stream._removedContLen === 'boolean' &&
29-
typeof stream._removedTE === 'boolean' &&
30-
typeof stream._closed === 'boolean'
31-
);
32-
}
33-
34-
function isReadable(stream) {
35-
return typeof stream.readable === 'boolean' ||
36-
typeof stream.readableEnded === 'boolean' ||
37-
!!stream._readableState;
38-
}
39-
40-
function isWritable(stream) {
41-
return typeof stream.writable === 'boolean' ||
42-
typeof stream.writableEnded === 'boolean' ||
43-
!!stream._writableState;
44-
}
45-
46-
function isWritableFinished(stream) {
47-
if (stream.writableFinished) return true;
48-
const wState = stream._writableState;
49-
if (!wState || wState.errored) return false;
50-
return wState.finished || (wState.ended && wState.length === 0);
51-
}
52-
5335
const nop = () => {};
5436

55-
function isReadableEnded(stream) {
56-
if (stream.readableEnded) return true;
57-
const rState = stream._readableState;
58-
if (!rState || rState.errored) return false;
59-
return rState.endEmitted || (rState.ended && rState.length === 0);
60-
}
61-
6237
function eos(stream, options, callback) {
6338
if (arguments.length === 2) {
6439
callback = options;
@@ -74,13 +49,12 @@ function eos(stream, options, callback) {
7449
callback = once(callback);
7550

7651
const readable = options.readable ||
77-
(options.readable !== false && isReadable(stream));
52+
(options.readable !== false && isReadableNodeStream(stream));
7853
const writable = options.writable ||
79-
(options.writable !== false && isWritable(stream));
54+
(options.writable !== false && isWritableNodeStream(stream));
8055

8156
const wState = stream._writableState;
8257
const rState = stream._readableState;
83-
const state = wState || rState;
8458

8559
const onlegacyfinish = () => {
8660
if (!stream.writable) onfinish();
@@ -89,16 +63,13 @@ function eos(stream, options, callback) {
8963
// TODO (ronag): Improve soft detection to include core modules and
9064
// common ecosystem modules that do properly emit 'close' but fail
9165
// this generic check.
92-
let willEmitClose = isServerResponse(stream) || (
93-
state &&
94-
state.autoDestroy &&
95-
state.emitClose &&
96-
state.closed === false &&
97-
isReadable(stream) === readable &&
98-
isWritable(stream) === writable
66+
let willEmitClose = (
67+
_willEmitClose(stream) &&
68+
isReadableNodeStream(stream) === readable &&
69+
isWritableNodeStream(stream) === writable
9970
);
10071

101-
let writableFinished = stream.writableFinished || wState?.finished;
72+
let writableFinished = isWritableFinished(stream, false);
10273
const onfinish = () => {
10374
writableFinished = true;
10475
// Stream should not be destroyed here. If it is that
@@ -107,12 +78,12 @@ function eos(stream, options, callback) {
10778
if (stream.destroyed) willEmitClose = false;
10879

10980
if (willEmitClose && (!stream.readable || readable)) return;
110-
if (!readable || readableEnded) callback.call(stream);
81+
if (!readable || readableFinished) callback.call(stream);
11182
};
11283

113-
let readableEnded = stream.readableEnded || rState?.endEmitted;
84+
let readableFinished = isReadableFinished(stream, false);
11485
const onend = () => {
115-
readableEnded = true;
86+
readableFinished = true;
11687
// Stream should not be destroyed here. If it is that
11788
// means that user space is doing something differently and
11889
// we cannot trust willEmitClose.
@@ -126,7 +97,7 @@ function eos(stream, options, callback) {
12697
callback.call(stream, err);
12798
};
12899

129-
let closed = wState?.closed || rState?.closed;
100+
let closed = isClosed(stream);
130101

131102
const onclose = () => {
132103
closed = true;
@@ -137,13 +108,13 @@ function eos(stream, options, callback) {
137108
return callback.call(stream, errored);
138109
}
139110

140-
if (readable && !readableEnded) {
141-
if (!isReadableEnded(stream))
111+
if (readable && !readableFinished) {
112+
if (!isReadableFinished(stream, false))
142113
return callback.call(stream,
143114
new ERR_STREAM_PREMATURE_CLOSE());
144115
}
145116
if (writable && !writableFinished) {
146-
if (!isWritableFinished(stream))
117+
if (!isWritableFinished(stream, false))
147118
return callback.call(stream,
148119
new ERR_STREAM_PREMATURE_CLOSE());
149120
}
@@ -185,19 +156,16 @@ function eos(stream, options, callback) {
185156
}
186157
} else if (
187158
!readable &&
188-
(!willEmitClose || stream.readable) &&
189-
writableFinished
159+
(!willEmitClose || isReadable(stream)) &&
160+
(writableFinished || !isWritable(stream))
190161
) {
191162
process.nextTick(onclose);
192163
} else if (
193164
!writable &&
194-
(!willEmitClose || stream.writable) &&
195-
readableEnded
165+
(!willEmitClose || isWritable(stream)) &&
166+
(readableFinished || !isReadable(stream))
196167
) {
197168
process.nextTick(onclose);
198-
} else if (!wState && !rState && stream._closed === true) {
199-
// _closed is for OutgoingMessage which is not a proper Writable.
200-
process.nextTick(onclose);
201169
} else if ((rState && stream.req && stream.aborted)) {
202170
process.nextTick(onclose);
203171
}

lib/internal/streams/pipeline.js

+7-7
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ const { validateCallback } = require('internal/validators');
2727

2828
const {
2929
isIterable,
30-
isReadable,
31-
isStream,
30+
isReadableNodeStream,
31+
isNodeStream,
3232
} = require('internal/streams/utils');
3333

3434
let PassThrough;
@@ -87,7 +87,7 @@ function popCallback(streams) {
8787
function makeAsyncIterable(val) {
8888
if (isIterable(val)) {
8989
return val;
90-
} else if (isReadable(val)) {
90+
} else if (isReadableNodeStream(val)) {
9191
// Legacy streams are not Iterable.
9292
return fromReadable(val);
9393
}
@@ -204,7 +204,7 @@ function pipeline(...streams) {
204204
const reading = i < streams.length - 1;
205205
const writing = i > 0;
206206

207-
if (isStream(stream)) {
207+
if (isNodeStream(stream)) {
208208
finishCount++;
209209
destroys.push(destroyer(stream, reading, writing, finish));
210210
}
@@ -216,7 +216,7 @@ function pipeline(...streams) {
216216
throw new ERR_INVALID_RETURN_VALUE(
217217
'Iterable, AsyncIterable or Stream', 'source', ret);
218218
}
219-
} else if (isIterable(stream) || isReadable(stream)) {
219+
} else if (isIterable(stream) || isReadableNodeStream(stream)) {
220220
ret = stream;
221221
} else {
222222
throw new ERR_INVALID_ARG_TYPE(
@@ -271,8 +271,8 @@ function pipeline(...streams) {
271271
finishCount++;
272272
destroys.push(destroyer(ret, false, true, finish));
273273
}
274-
} else if (isStream(stream)) {
275-
if (isReadable(ret)) {
274+
} else if (isNodeStream(stream)) {
275+
if (isReadableNodeStream(ret)) {
276276
ret.pipe(stream);
277277

278278
// Compat. Before node v10.12.0 stdio used to throw an error so

0 commit comments

Comments
 (0)