Skip to content

Commit 7ade6e3

Browse files
committed
stream: add isErrored helper
Refs: nodejs/undici#1134
1 parent 1fa507f commit 7ade6e3

File tree

6 files changed

+61
-7
lines changed

6 files changed

+61
-7
lines changed

doc/api/stream.md

+13
Original file line numberDiff line numberDiff line change
@@ -2225,6 +2225,19 @@ added: v16.8.0
22252225

22262226
Returns whether the stream has been read from or cancelled.
22272227

2228+
### `stream.Readable.isErrored(stream)`
2229+
2230+
<!-- YAML
2231+
added: v16.8.0
2232+
-->
2233+
2234+
> Stability: 1 - Experimental
2235+
2236+
* `stream` {stream.Readable|ReadableStream}
2237+
* Returns: `boolean`
2238+
2239+
Returns whether the stream has been errored.
2240+
22282241
### `stream.Readable.toWeb(streamReadable)`
22292242

22302243
<!-- YAML

lib/internal/streams/utils.js

+17-3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const {
77
} = primordials;
88

99
const kDestroyed = Symbol('kDestroyed');
10+
const kIsErrored = Symbol('kIsErrored');
1011
const kIsDisturbed = Symbol('kIsDisturbed');
1112

1213
function isReadableNodeStream(obj, strict = false) {
@@ -239,16 +240,29 @@ function willEmitClose(stream) {
239240

240241
function isDisturbed(stream) {
241242
return !!(stream && (
242-
stream.readableDidRead ||
243-
stream.readableAborted ||
244-
stream[kIsDisturbed]
243+
stream[kIsDisturbed] ??
244+
(stream.readableDidRead || stream.readableAborted)
245+
));
246+
}
247+
248+
function isErrored(stream) {
249+
return !!(stream && (
250+
stream[kIsErrored] ??
251+
stream.readableErrored ??
252+
stream.writableErrored ??
253+
stream._readableState?.errorEmitted ??
254+
stream._writableState?.errorEmitted ??
255+
stream._readableState?.errored ??
256+
stream._writableState?.errored
245257
));
246258
}
247259

248260
module.exports = {
249261
kDestroyed,
250262
isDisturbed,
263+
isErrored,
251264
kIsDisturbed,
265+
kIsErrored,
252266
isClosed,
253267
isDestroyed,
254268
isDuplexNodeStream,

lib/internal/webstreams/readablestream.js

+5
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ const {
8282

8383
const {
8484
kIsDisturbed,
85+
kIsErrored,
8586
} = require('internal/streams/utils');
8687

8788
const {
@@ -241,6 +242,10 @@ class ReadableStream {
241242
return this[kState].disturbed;
242243
}
243244

245+
get [kIsErrored]() {
246+
return this[kState].state === 'errored';
247+
}
248+
244249
/**
245250
* @readonly
246251
* @type {boolean}

lib/stream.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,11 @@ const eos = require('internal/streams/end-of-stream');
3636
const internalBuffer = require('internal/buffer');
3737

3838
const promises = require('stream/promises');
39+
const utils = require('internal/streams/utils');
3940

4041
const Stream = module.exports = require('internal/streams/legacy').Stream;
41-
Stream.isDisturbed = require('internal/streams/utils').isDisturbed;
42+
Stream.isDisturbed = utils.isDisturbed;
43+
Stream.isErrored = utils.isErrored;
4244
Stream.Readable = require('internal/streams/readable');
4345
Stream.Writable = require('internal/streams/writable');
4446
Stream.Duplex = require('internal/streams/duplex');

test/parallel/test-stream-readable-didRead.js

+5-2
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
'use strict';
22
const common = require('../common');
33
const assert = require('assert');
4-
const { isDisturbed, Readable } = require('stream');
4+
const { isDisturbed, isErrored, Readable } = require('stream');
55

66
function noop() {}
77

88
function check(readable, data, fn) {
99
assert.strictEqual(readable.readableDidRead, false);
1010
assert.strictEqual(isDisturbed(readable), false);
11+
assert.strictEqual(isErrored(readable), false);
1112
if (data === -1) {
12-
readable.on('error', common.mustCall());
13+
readable.on('error', common.mustCall(() => {
14+
assert.strictEqual(isErrored(readable), true);
15+
}));
1316
readable.on('data', common.mustNotCall());
1417
readable.on('end', common.mustNotCall());
1518
} else {

test/parallel/test-whatwg-readablestream.js

+18-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
'use strict';
33

44
const common = require('../common');
5-
const { isDisturbed } = require('stream');
5+
const { isDisturbed, isErrored } = require('stream');
66
const assert = require('assert');
77
const {
88
isPromise,
@@ -1572,3 +1572,20 @@ class Source {
15721572
isDisturbed(stream, true);
15731573
})().then(common.mustCall());
15741574
}
1575+
1576+
1577+
{
1578+
const stream = new ReadableStream({
1579+
start(controller) {
1580+
controller.error(new Error());
1581+
},
1582+
pull: common.mustNotCall(),
1583+
});
1584+
1585+
const reader = stream.getReader();
1586+
(async () => {
1587+
isErrored(stream, false);
1588+
await reader.read();
1589+
isErrored(stream, true);
1590+
})().then(common.mustCall());
1591+
}

0 commit comments

Comments
 (0)