Skip to content

Commit

Permalink
stream: add isDisturbed helper
Browse files Browse the repository at this point in the history
Adds a helper util used to determine whether a stream has been
disturbed (read or cancelled).

Refs: nodejs#39627

PR-URL: nodejs#39628
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
ronag committed Aug 20, 2021
1 parent 7410d41 commit ef6ccc1
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 11 deletions.
29 changes: 26 additions & 3 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1259,16 +1259,27 @@ added: v11.4.0
Is `true` if it is safe to call [`readable.read()`][stream-read], which means
the stream has not been destroyed or emitted `'error'` or `'end'`.

##### `readable.readableAborted`
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* {boolean}

Returns whether the stream was destroyed or errored before emitting `'end'`.

##### `readable.readableDidRead`
<!-- YAML
added: v16.7.0
-->

> Stability: 1 - Experimental
* {boolean}

Allows determining if the stream has been or is about to be read.
Returns true if `'data'`, `'end'`, `'error'` or `'close'` has been
emitted.
Returns whether `'data'` has been emitted.

##### `readable.readableEncoding`
<!-- YAML
Expand Down Expand Up @@ -1943,6 +1954,18 @@ Calling `Readable.from(string)` or `Readable.from(buffer)` will not have
the strings or buffers be iterated to match the other streams semantics
for performance reasons.

### `stream.Readable.isDisturbed(stream)`
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `stream` {stream.Readable|ReadableStream}
* Returns: `boolean`

Returns whether the stream has been read from or cancelled.

### `stream.addAbortSignal(signal, stream)`
<!-- YAML
added: v15.4.0
Expand Down
15 changes: 9 additions & 6 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -1191,12 +1191,15 @@ ObjectDefineProperties(Readable.prototype, {
readableDidRead: {
enumerable: false,
get: function() {
return (
this._readableState.dataEmitted ||
this._readableState.endEmitted ||
this._readableState.errorEmitted ||
this._readableState.closeEmitted
);
return this._readableState.dataEmitted;
}
},

readableAborted: {
enumerable: false,
get: function() {
return !!(this._readableState.destroyed || this._readableState.errored) &&
!this._readableState.endEmitted;
}
},

Expand Down
11 changes: 11 additions & 0 deletions lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ const {
SymbolIterator,
} = primordials;

const kIsDisturbed = Symbol('kIsDisturbed');

function isReadable(obj) {
return !!(obj && typeof obj.pipe === 'function' &&
typeof obj.on === 'function');
Expand All @@ -27,7 +29,16 @@ function isIterable(obj, isAsync) {
typeof obj[SymbolIterator] === 'function';
}

function isDisturbed(stream) {
return !!(stream && (
stream.readableDidRead ||
stream.readableAborted ||
stream[kIsDisturbed]
));
}

module.exports = {
isDisturbed,
isIterable,
isReadable,
isStream,
Expand Down
9 changes: 9 additions & 0 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ const {
queueMicrotask,
} = require('internal/process/task_queues');

const {
kIsDisturbed,
} = require('internal/streams/utils');

const {
ArrayBufferViewGetBuffer,
ArrayBufferViewGetByteLength,
Expand Down Expand Up @@ -200,6 +204,7 @@ class ReadableStream {
promise: undefined,
}
};

// The spec requires handling of the strategy first
// here. Specifically, if getting the size and
// highWaterMark from the strategy fail, that has
Expand Down Expand Up @@ -232,6 +237,10 @@ class ReadableStream {
return makeTransferable(this);
}

get [kIsDisturbed]() {
return this[kState].disturbed;
}

/**
* @readonly
* @type {boolean}
Expand Down
1 change: 1 addition & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const internalBuffer = require('internal/buffer');
const promises = require('stream/promises');

const Stream = module.exports = require('internal/streams/legacy').Stream;
Stream.isDisturbed = require('internal/streams/utils').isDisturbed;
Stream.Readable = require('internal/streams/readable');
Stream.Writable = require('internal/streams/writable');
Stream.Duplex = require('internal/streams/duplex');
Expand Down
57 changes: 57 additions & 0 deletions test/parallel/test-stream-readable-aborted.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const { Readable } = require('stream');

{
const readable = new Readable({
read() {
}
});
assert.strictEqual(readable.readableAborted, false);
readable.destroy();
assert.strictEqual(readable.readableAborted, true);
}

{
const readable = new Readable({
read() {
}
});
assert.strictEqual(readable.readableAborted, false);
readable.push(null);
readable.destroy();
assert.strictEqual(readable.readableAborted, true);
}

{
const readable = new Readable({
read() {
}
});
assert.strictEqual(readable.readableAborted, false);
readable.push('asd');
readable.destroy();
assert.strictEqual(readable.readableAborted, true);
}

{
const readable = new Readable({
read() {
}
});
assert.strictEqual(readable.readableAborted, false);
readable.push('asd');
readable.push(null);
assert.strictEqual(readable.readableAborted, false);
readable.on('end', common.mustCall(() => {
assert.strictEqual(readable.readableAborted, false);
readable.destroy();
assert.strictEqual(readable.readableAborted, false);
queueMicrotask(() => {
assert.strictEqual(readable.readableAborted, false);
});
}));
readable.resume();
}
8 changes: 6 additions & 2 deletions test/parallel/test-stream-readable-didRead.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const Readable = require('stream').Readable;
const { isDisturbed, Readable } = require('stream');

function noop() {}

function check(readable, data, fn) {
assert.strictEqual(readable.readableDidRead, false);
assert.strictEqual(isDisturbed(readable), false);
if (data === -1) {
readable.on('error', common.mustCall());
readable.on('data', common.mustNotCall());
Expand All @@ -27,7 +28,10 @@ function check(readable, data, fn) {
readable.on('close', common.mustCall());
fn();
setImmediate(() => {
assert.strictEqual(readable.readableDidRead, true);
assert.strictEqual(readable.readableDidRead, data > 0);
if (data > 0) {
assert.strictEqual(isDisturbed(readable), true);
}
});
}

Expand Down
50 changes: 50 additions & 0 deletions test/parallel/test-whatwg-readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
'use strict';

const common = require('../common');
const { isDisturbed } = require('stream');
const assert = require('assert');
const {
isPromise,
Expand Down Expand Up @@ -1520,3 +1521,52 @@ class Source {
readableByteStreamControllerClose(controller);
readableByteStreamControllerEnqueue(controller);
}

{
const stream = new ReadableStream({
start(controller) {
controller.enqueue('a');
controller.close();
},
pull: common.mustNotCall(),
});

const reader = stream.getReader();
(async () => {
isDisturbed(stream, false);
await reader.read();
isDisturbed(stream, true);
})().then(common.mustCall());
}

{
const stream = new ReadableStream({
start(controller) {
controller.close();
},
pull: common.mustNotCall(),
});

const reader = stream.getReader();
(async () => {
isDisturbed(stream, false);
await reader.read();
isDisturbed(stream, true);
})().then(common.mustCall());
}

{
const stream = new ReadableStream({
start(controller) {
},
pull: common.mustNotCall(),
});
stream.cancel();

const reader = stream.getReader();
(async () => {
isDisturbed(stream, false);
await reader.read();
isDisturbed(stream, true);
})().then(common.mustCall());
}

0 comments on commit ef6ccc1

Please sign in to comment.