Skip to content

Commit 9e38fc6

Browse files
ronagdanielleadams
authored andcommitted
stream: add readableDidRead if has been read from
Adds did read accessor used to determine whether a readable has been read from. PR-URL: #39589 Refs: nodejs/undici#907 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
1 parent 4df59bc commit 9e38fc6

File tree

3 files changed

+133
-1
lines changed

3 files changed

+133
-1
lines changed

doc/api/stream.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1259,6 +1259,17 @@ added: v11.4.0
12591259
Is `true` if it is safe to call [`readable.read()`][stream-read], which means
12601260
the stream has not been destroyed or emitted `'error'` or `'end'`.
12611261

1262+
##### `readable.readableDidRead`
1263+
<!-- YAML
1264+
added: REPLACEME
1265+
-->
1266+
1267+
* {boolean}
1268+
1269+
Allows determining if the stream has been or is about to be read.
1270+
Returns true if `'data'`, `'end'`, `'error'` or `'close'` has been
1271+
emitted.
1272+
12621273
##### `readable.readableEncoding`
12631274
<!-- YAML
12641275
added: v12.7.0

lib/internal/streams/readable.js

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,8 @@ function ReadableState(options, stream, isDuplex) {
167167
// If true, a maybeReadMore has been scheduled.
168168
this.readingMore = false;
169169

170+
this.dataEmitted = false;
171+
170172
this.decoder = null;
171173
this.encoding = null;
172174
if (options && options.encoding) {
@@ -309,6 +311,7 @@ function addChunk(stream, state, chunk, addToFront) {
309311
} else {
310312
state.awaitDrainWriters = null;
311313
}
314+
state.dataEmitted = true;
312315
stream.emit('data', chunk);
313316
} else {
314317
// Update the buffer info.
@@ -519,8 +522,10 @@ Readable.prototype.read = function(n) {
519522
endReadable(this);
520523
}
521524

522-
if (ret !== null)
525+
if (ret !== null) {
526+
state.dataEmitted = true;
523527
this.emit('data', ret);
528+
}
524529

525530
return ret;
526531
};
@@ -1183,6 +1188,18 @@ ObjectDefineProperties(Readable.prototype, {
11831188
}
11841189
},
11851190

1191+
readableDidRead: {
1192+
enumerable: false,
1193+
get: function() {
1194+
return (
1195+
this._readableState.dataEmitted ||
1196+
this._readableState.endEmitted ||
1197+
this._readableState.errorEmitted ||
1198+
this._readableState.closeEmitted
1199+
);
1200+
}
1201+
},
1202+
11861203
readableHighWaterMark: {
11871204
enumerable: false,
11881205
get: function() {
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const Readable = require('stream').Readable;
5+
6+
function noop() {}
7+
8+
function check(readable, data, fn) {
9+
assert.strictEqual(readable.readableDidRead, false);
10+
if (data === -1) {
11+
readable.on('error', common.mustCall());
12+
readable.on('data', common.mustNotCall());
13+
readable.on('end', common.mustNotCall());
14+
} else {
15+
readable.on('error', common.mustNotCall());
16+
if (data === -2) {
17+
readable.on('end', common.mustNotCall());
18+
} else {
19+
readable.on('end', common.mustCall());
20+
}
21+
if (data > 0) {
22+
readable.on('data', common.mustCallAtLeast(data));
23+
} else {
24+
readable.on('data', common.mustNotCall());
25+
}
26+
}
27+
readable.on('close', common.mustCall());
28+
fn();
29+
setImmediate(() => {
30+
assert.strictEqual(readable.readableDidRead, true);
31+
});
32+
}
33+
34+
{
35+
const readable = new Readable({
36+
read() {
37+
this.push(null);
38+
}
39+
});
40+
check(readable, 0, () => {
41+
readable.read();
42+
});
43+
}
44+
45+
{
46+
const readable = new Readable({
47+
read() {
48+
this.push(null);
49+
}
50+
});
51+
check(readable, 0, () => {
52+
readable.resume();
53+
});
54+
}
55+
56+
{
57+
const readable = new Readable({
58+
read() {
59+
this.push(null);
60+
}
61+
});
62+
check(readable, -2, () => {
63+
readable.destroy();
64+
});
65+
}
66+
67+
{
68+
const readable = new Readable({
69+
read() {
70+
this.push(null);
71+
}
72+
});
73+
74+
check(readable, -1, () => {
75+
readable.destroy(new Error());
76+
});
77+
}
78+
79+
{
80+
const readable = new Readable({
81+
read() {
82+
this.push('data');
83+
this.push(null);
84+
}
85+
});
86+
87+
check(readable, 1, () => {
88+
readable.on('data', noop);
89+
});
90+
}
91+
92+
{
93+
const readable = new Readable({
94+
read() {
95+
this.push('data');
96+
this.push(null);
97+
}
98+
});
99+
100+
check(readable, 1, () => {
101+
readable.on('data', noop);
102+
readable.off('data', noop);
103+
});
104+
}

0 commit comments

Comments
 (0)