Skip to content

Commit 0d73c31

Browse files
committed
stream: add isDisturbed helper
Adds a helper util used to determine whether a stream has been disturbed (read or cancelled). Refs: #39627
1 parent 6ca23d7 commit 0d73c31

File tree

3 files changed

+21
-0
lines changed

3 files changed

+21
-0
lines changed

lib/internal/streams/utils.js

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const {
77
} = primordials;
88

99
const kDestroyed = Symbol('kDestroyed');
10+
const kIsDisturbed = Symbol('kIsDisturbed');
1011

1112
function isReadableNodeStream(obj) {
1213
return !!(
@@ -195,8 +196,18 @@ function willEmitClose(stream) {
195196
);
196197
}
197198

199+
function isDisturbed(stream) {
200+
return stream && (
201+
stream.readableDidRead ||
202+
stream.destroyed ||
203+
stream[kIsDisturbed]
204+
);
205+
}
206+
198207
module.exports = {
199208
kDestroyed,
209+
isDisturbed,
210+
kIsDisturbed,
200211
isClosed,
201212
isDestroyed,
202213
isFinished,

lib/internal/webstreams/readablestream.js

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ const {
8080
queueMicrotask,
8181
} = require('internal/process/task_queues');
8282

83+
const {
84+
kIsDisturbed,
85+
} = require('internal/stream/utils');
86+
8387
const {
8488
ArrayBufferViewGetBuffer,
8589
ArrayBufferViewGetByteLength,
@@ -200,6 +204,7 @@ class ReadableStream {
200204
promise: undefined,
201205
}
202206
};
207+
203208
// The spec requires handling of the strategy first
204209
// here. Specifically, if getting the size and
205210
// highWaterMark from the strategy fail, that has
@@ -232,6 +237,10 @@ class ReadableStream {
232237
return makeTransferable(this);
233238
}
234239

240+
get [kIsDisturbed]() {
241+
return this[kState].disturbed;
242+
}
243+
235244
/**
236245
* @readonly
237246
* @type {boolean}

lib/stream.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ const internalBuffer = require('internal/buffer');
3838
const promises = require('stream/promises');
3939

4040
const Stream = module.exports = require('internal/streams/legacy').Stream;
41+
Stream.isDisturbed = require('internal/streams/utils').isDisturbed;
4142
Stream.Readable = require('internal/streams/readable');
4243
Stream.Writable = require('internal/streams/writable');
4344
Stream.Duplex = require('internal/streams/duplex');

0 commit comments

Comments
 (0)