Skip to content

Commit 88a4819

Browse files
benjamingrronag
authored andcommitted
stream: add drop and take
This adds the `drop` and `take` methods to readable streams allowing users easily drop and take items from the stream. This continues the iterator-helper proposal alignment task. Co-Authored-By: Robert Nagy <ronagy@icloud.com> PR-URL: #41630 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent 205c018 commit 88a4819

File tree

3 files changed

+195
-0
lines changed

3 files changed

+195
-0
lines changed

doc/api/stream.md

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2021,6 +2021,50 @@ for await (const result of concatResult) {
20212021
}
20222022
```
20232023

2024+
### `readable.drop(limit[, options])`
2025+
2026+
<!-- YAML
2027+
added: REPLACEME
2028+
-->
2029+
2030+
> Stability: 1 - Experimental
2031+
2032+
* `limit` {number} the number of chunks to drop from the readable.
2033+
* `options` {Object}
2034+
* `signal` {AbortSignal} allows destroying the stream if the signal is
2035+
aborted.
2036+
* Returns: {Readable} a stream with `limit` chunks dropped.
2037+
2038+
This method returns a new stream with the first `limit` chunks dropped.
2039+
2040+
```mjs
2041+
import { Readable } from 'stream';
2042+
2043+
await Readable.from([1, 2, 3, 4]).drop(2).toArray(); // [3, 4]
2044+
```
2045+
2046+
### `readable.take(limit[, options])`
2047+
2048+
<!-- YAML
2049+
added: REPLACEME
2050+
-->
2051+
2052+
> Stability: 1 - Experimental
2053+
2054+
* `limit` {number} the number of chunks to take from the readable.
2055+
* `options` {Object}
2056+
* `signal` {AbortSignal} allows destroying the stream if the signal is
2057+
aborted.
2058+
* Returns: {Readable} a stream with `limit` chunks taken.
2059+
2060+
This method returns a new stream with the first `limit` chunks.
2061+
2062+
```mjs
2063+
import { Readable } from 'stream';
2064+
2065+
await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2]
2066+
```
2067+
20242068
### Duplex and transform streams
20252069

20262070
#### Class: `stream.Duplex`

lib/internal/streams/operators.js

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ const { Buffer } = require('buffer');
66
const {
77
codes: {
88
ERR_INVALID_ARG_TYPE,
9+
ERR_OUT_OF_RANGE,
910
},
1011
AbortError,
1112
} = require('internal/errors');
@@ -15,6 +16,8 @@ const { kWeakHandler } = require('internal/event_target');
1516
const {
1617
ArrayPrototypePush,
1718
MathFloor,
19+
Number,
20+
NumberIsNaN,
1821
Promise,
1922
PromiseReject,
2023
PromisePrototypeCatch,
@@ -236,10 +239,62 @@ async function* flatMap(fn, options) {
236239
}
237240
}
238241

242+
function toIntegerOrInfinity(number) {
243+
// We coerce here to align with the spec
244+
// https://github.com/tc39/proposal-iterator-helpers/issues/169
245+
number = Number(number);
246+
if (NumberIsNaN(number)) {
247+
return 0;
248+
}
249+
if (number < 0) {
250+
throw new ERR_OUT_OF_RANGE('number', '>= 0', number);
251+
}
252+
return number;
253+
}
254+
255+
function drop(number, options) {
256+
number = toIntegerOrInfinity(number);
257+
return async function* drop() {
258+
if (options?.signal?.aborted) {
259+
throw new AbortError();
260+
}
261+
for await (const val of this) {
262+
if (options?.signal?.aborted) {
263+
throw new AbortError();
264+
}
265+
if (number-- <= 0) {
266+
yield val;
267+
}
268+
}
269+
}.call(this);
270+
}
271+
272+
273+
function take(number, options) {
274+
number = toIntegerOrInfinity(number);
275+
return async function* take() {
276+
if (options?.signal?.aborted) {
277+
throw new AbortError();
278+
}
279+
for await (const val of this) {
280+
if (options?.signal?.aborted) {
281+
throw new AbortError();
282+
}
283+
if (number-- > 0) {
284+
yield val;
285+
} else {
286+
return;
287+
}
288+
}
289+
}.call(this);
290+
}
291+
239292
module.exports.streamReturningOperators = {
293+
drop,
240294
filter,
241295
flatMap,
242296
map,
297+
take,
243298
};
244299

245300
module.exports.promiseReturningOperators = {
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
Readable,
6+
} = require('stream');
7+
const { deepStrictEqual, rejects, throws } = require('assert');
8+
9+
const { from } = Readable;
10+
11+
const fromAsync = (...args) => from(...args).map(async (x) => x);
12+
13+
const naturals = () => from(async function*() {
14+
let i = 1;
15+
while (true) {
16+
yield i++;
17+
}
18+
}());
19+
20+
{
21+
// Synchronous streams
22+
(async () => {
23+
deepStrictEqual(await from([1, 2, 3]).drop(2).toArray(), [3]);
24+
deepStrictEqual(await from([1, 2, 3]).take(1).toArray(), [1]);
25+
deepStrictEqual(await from([]).drop(2).toArray(), []);
26+
deepStrictEqual(await from([]).take(1).toArray(), []);
27+
deepStrictEqual(await from([1, 2, 3]).drop(1).take(1).toArray(), [2]);
28+
deepStrictEqual(await from([1, 2]).drop(0).toArray(), [1, 2]);
29+
deepStrictEqual(await from([1, 2]).take(0).toArray(), []);
30+
})().then(common.mustCall());
31+
// Asynchronous streams
32+
(async () => {
33+
deepStrictEqual(await fromAsync([1, 2, 3]).drop(2).toArray(), [3]);
34+
deepStrictEqual(await fromAsync([1, 2, 3]).take(1).toArray(), [1]);
35+
deepStrictEqual(await fromAsync([]).drop(2).toArray(), []);
36+
deepStrictEqual(await fromAsync([]).take(1).toArray(), []);
37+
deepStrictEqual(await fromAsync([1, 2, 3]).drop(1).take(1).toArray(), [2]);
38+
deepStrictEqual(await fromAsync([1, 2]).drop(0).toArray(), [1, 2]);
39+
deepStrictEqual(await fromAsync([1, 2]).take(0).toArray(), []);
40+
})().then(common.mustCall());
41+
// Infinite streams
42+
// Asynchronous streams
43+
(async () => {
44+
deepStrictEqual(await naturals().take(1).toArray(), [1]);
45+
deepStrictEqual(await naturals().drop(1).take(1).toArray(), [2]);
46+
const next10 = [11, 12, 13, 14, 15, 16, 17, 18, 19, 20];
47+
deepStrictEqual(await naturals().drop(10).take(10).toArray(), next10);
48+
deepStrictEqual(await naturals().take(5).take(1).toArray(), [1]);
49+
})().then(common.mustCall());
50+
}
51+
52+
{
53+
// Coercion
54+
(async () => {
55+
// The spec made me do this ^^
56+
deepStrictEqual(await naturals().take('cat').toArray(), []);
57+
deepStrictEqual(await naturals().take('2').toArray(), [1, 2]);
58+
deepStrictEqual(await naturals().take(true).toArray(), [1]);
59+
})().then(common.mustCall());
60+
}
61+
62+
{
63+
// Support for AbortSignal
64+
const ac = new AbortController();
65+
rejects(
66+
Readable.from([1, 2, 3]).take(1, { signal: ac.signal }).toArray(), {
67+
name: 'AbortError',
68+
}).then(common.mustCall());
69+
rejects(
70+
Readable.from([1, 2, 3]).drop(1, { signal: ac.signal }).toArray(), {
71+
name: 'AbortError',
72+
}).then(common.mustCall());
73+
ac.abort();
74+
}
75+
76+
{
77+
// Support for AbortSignal, already aborted
78+
const signal = AbortSignal.abort();
79+
rejects(
80+
Readable.from([1, 2, 3]).take(1, { signal }).toArray(), {
81+
name: 'AbortError',
82+
}).then(common.mustCall());
83+
}
84+
85+
{
86+
// Error cases
87+
const invalidArgs = [
88+
-1,
89+
-Infinity,
90+
-40,
91+
];
92+
93+
for (const example of invalidArgs) {
94+
throws(() => from([]).take(example).toArray(), /ERR_OUT_OF_RANGE/);
95+
}
96+
}

0 commit comments

Comments
 (0)