Skip to content

Commit 344dba7

Browse files
committed
stream: utility consumers for web and node.js streams
Signed-off-by: James M Snell <jasnell@gmail.com>
1 parent dd18795 commit 344dba7

File tree

3 files changed

+315
-0
lines changed

3 files changed

+315
-0
lines changed

doc/api/webstreams.md

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1270,5 +1270,78 @@ added: REPLACEME
12701270
12711271
* Type: {WritableStream}
12721272
1273+
### Utility Consumers
1274+
<!-- YAML
1275+
added: REPLACEME
1276+
-->
1277+
1278+
The utility consumer functions provide common options for consuming
1279+
streams.
1280+
1281+
They are accessed using:
1282+
1283+
```mjs
1284+
import {
1285+
arrayBuffer,
1286+
blob,
1287+
json,
1288+
text,
1289+
} from 'node:stream/consumers';
1290+
```
1291+
1292+
```cjs
1293+
const {
1294+
arrayBuffer,
1295+
blob,
1296+
json,
1297+
text,
1298+
} = require('stream/consumers');
1299+
```
1300+
1301+
#### `streamConsumers.arrayBuffer(stream)`
1302+
<!-- YAML
1303+
added: REPLACEME
1304+
-->
1305+
1306+
* `stream` {ReadableStream|stream.Readable|AsyncIterator}
1307+
* Returns: {Promise} Fulfills with an `ArrayBuffer` containing the full
1308+
contents of the stream.
1309+
1310+
#### `streamConsumers.blob(stream)`
1311+
<!-- YAML
1312+
added: REPLACEME
1313+
-->
1314+
1315+
* `stream` {ReadableStream|stream.Readable|AsyncIterator}
1316+
* Returns: {Promise} Fulfills with a {Blob} containing the full contents
1317+
of the stream.
1318+
1319+
#### `streamConsumers.buffer(stream)`
1320+
<!-- YAML
1321+
added: REPLACEME
1322+
-->
1323+
1324+
* `stream` {ReadableStream|stream.Readable|AsyncIterator}
1325+
* Returns: {Promise} Fulfills with a {Buffer} containing the full
1326+
contents of the stream.
1327+
1328+
#### `streamConsumers.json(stream)`
1329+
<!-- YAML
1330+
added: REPLACEME
1331+
-->
1332+
1333+
* `stream` {ReadableStream|stream.Readable|AsyncIterator}
1334+
* Returns: {Promise} Fulfills with the contents of the stream parsed as a
1335+
UTF-8 encoded string that is then passed through `JSON.parse()`.
1336+
1337+
#### `streamConsumers.text(stream)`
1338+
<!-- YAML
1339+
added: REPLACEME
1340+
-->
1341+
1342+
* `stream` {ReadableStream|stream.Readable|AsyncIterator}
1343+
* Returns: {Promise} Fulfills with the contents of the stream parsed as a
1344+
UTF-8 encoded string.
1345+
12731346
[Streams]: stream.md
12741347
[WHATWG Streams Standard]: https://streams.spec.whatwg.org/

lib/stream/consumers.js

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
'use strict';
2+
3+
const {
4+
JSONParse,
5+
} = primordials;
6+
7+
const {
8+
TextDecoder,
9+
} = require('internal/encoding');
10+
11+
const {
12+
Blob,
13+
} = require('internal/blob');
14+
15+
const {
16+
Buffer,
17+
} = require('buffer');
18+
19+
/**
20+
* @typedef {import('../internal/webstreams/readablestream').ReadableStream
21+
* } ReadableStream
22+
* @typedef {import('../internal/streams/readable')} Readable
23+
*/
24+
25+
/**
26+
* @param {AsyncIterable|ReadableStream|Readable} stream
27+
* @returns {Promise<Blob>}
28+
*/
29+
async function blob(stream) {
30+
const chunks = [];
31+
for await (const chunk of stream)
32+
chunks.push(chunk);
33+
return new Blob(chunks);
34+
}
35+
36+
/**
37+
* @param {AsyncIterable|ReadableStream|Readable} stream
38+
* @returns {Promise<ArrayBuffer>}
39+
*/
40+
async function arrayBuffer(stream) {
41+
const ret = await blob(stream);
42+
return ret.arrayBuffer();
43+
}
44+
45+
/**
46+
* @param {AsyncIterable|ReadableStream|Readable} stream
47+
* @returns {Promise<Buffer>}
48+
*/
49+
async function buffer(stream) {
50+
return Buffer.from(await arrayBuffer(stream));
51+
}
52+
53+
/**
54+
* @param {AsyncIterable|ReadableStream|Readable} stream
55+
* @returns {Promise<string>}
56+
*/
57+
async function text(stream) {
58+
const dec = new TextDecoder(undefined, { stream: true });
59+
let str = '';
60+
for await (const chunk of stream) {
61+
if (typeof chunk === 'string')
62+
str += chunk;
63+
else
64+
str += dec.decode(chunk);
65+
}
66+
return str;
67+
}
68+
69+
/**
70+
* @param {AsyncIterable|ReadableStream|Readable} stream
71+
* @returns {Promise<any>}
72+
*/
73+
async function json(stream) {
74+
const str = await text(stream);
75+
return JSONParse(str);
76+
}
77+
78+
module.exports = {
79+
arrayBuffer,
80+
blob,
81+
buffer,
82+
text,
83+
json,
84+
};
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
// Flags: --no-warnings
2+
'use strict';
3+
4+
const common = require('../common');
5+
const assert = require('assert');
6+
7+
const {
8+
arrayBuffer,
9+
blob,
10+
buffer,
11+
text,
12+
json,
13+
} = require('stream/consumers');
14+
15+
const {
16+
PassThrough
17+
} = require('stream');
18+
19+
const {
20+
TransformStream,
21+
} = require('stream/web');
22+
23+
const buf = Buffer.from('hellothere');
24+
const kArrayBuffer =
25+
buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength);
26+
27+
{
28+
const passthrough = new PassThrough();
29+
30+
blob(passthrough).then(common.mustCall(async (blob) => {
31+
assert.strictEqual(blob.size, 10);
32+
assert.deepStrictEqual(await blob.arrayBuffer(), kArrayBuffer);
33+
}));
34+
35+
passthrough.write('hello');
36+
setTimeout(() => passthrough.end('there'), 10);
37+
}
38+
39+
{
40+
const passthrough = new PassThrough();
41+
42+
arrayBuffer(passthrough).then(common.mustCall(async (ab) => {
43+
assert.strictEqual(ab.byteLength, 10);
44+
assert.deepStrictEqual(ab, kArrayBuffer);
45+
}));
46+
47+
passthrough.write('hello');
48+
setTimeout(() => passthrough.end('there'), 10);
49+
}
50+
51+
{
52+
const passthrough = new PassThrough();
53+
54+
buffer(passthrough).then(common.mustCall(async (buf) => {
55+
assert.strictEqual(buf.byteLength, 10);
56+
assert.deepStrictEqual(buf.buffer, kArrayBuffer);
57+
}));
58+
59+
passthrough.write('hello');
60+
setTimeout(() => passthrough.end('there'), 10);
61+
}
62+
63+
64+
{
65+
const passthrough = new PassThrough();
66+
67+
text(passthrough).then(common.mustCall(async (str) => {
68+
assert.strictEqual(str.length, 10);
69+
assert.deepStrictEqual(str, 'hellothere');
70+
}));
71+
72+
passthrough.write('hello');
73+
setTimeout(() => passthrough.end('there'), 10);
74+
}
75+
76+
{
77+
const passthrough = new PassThrough();
78+
79+
json(passthrough).then(common.mustCall(async (str) => {
80+
assert.strictEqual(str.length, 10);
81+
assert.deepStrictEqual(str, 'hellothere');
82+
}));
83+
84+
passthrough.write('"hello');
85+
setTimeout(() => passthrough.end('there"'), 10);
86+
}
87+
88+
{
89+
const { writable, readable } = new TransformStream();
90+
91+
blob(readable).then(common.mustCall(async (blob) => {
92+
assert.strictEqual(blob.size, 10);
93+
assert.deepStrictEqual(await blob.arrayBuffer(), kArrayBuffer);
94+
}));
95+
96+
const writer = writable.getWriter();
97+
writer.write('hello');
98+
setTimeout(() => {
99+
writer.write('there');
100+
writer.close();
101+
}, 10);
102+
103+
assert.rejects(blob(readable), { code: 'ERR_INVALID_STATE' });
104+
}
105+
106+
{
107+
const { writable, readable } = new TransformStream();
108+
109+
arrayBuffer(readable).then(common.mustCall(async (ab) => {
110+
assert.strictEqual(ab.byteLength, 10);
111+
assert.deepStrictEqual(ab, kArrayBuffer);
112+
}));
113+
114+
const writer = writable.getWriter();
115+
writer.write('hello');
116+
setTimeout(() => {
117+
writer.write('there');
118+
writer.close();
119+
}, 10);
120+
121+
assert.rejects(arrayBuffer(readable), { code: 'ERR_INVALID_STATE' });
122+
}
123+
124+
{
125+
const { writable, readable } = new TransformStream();
126+
127+
text(readable).then(common.mustCall(async (str) => {
128+
assert.strictEqual(str.length, 10);
129+
assert.deepStrictEqual(str, 'hellothere');
130+
}));
131+
132+
const writer = writable.getWriter();
133+
writer.write('hello');
134+
setTimeout(() => {
135+
writer.write('there');
136+
writer.close();
137+
}, 10);
138+
139+
assert.rejects(text(readable), { code: 'ERR_INVALID_STATE' });
140+
}
141+
142+
{
143+
const { writable, readable } = new TransformStream();
144+
145+
json(readable).then(common.mustCall(async (str) => {
146+
assert.strictEqual(str.length, 10);
147+
assert.deepStrictEqual(str, 'hellothere');
148+
}));
149+
150+
const writer = writable.getWriter();
151+
writer.write('"hello');
152+
setTimeout(() => {
153+
writer.write('there"');
154+
writer.close();
155+
}, 10);
156+
157+
assert.rejects(json(readable), { code: 'ERR_INVALID_STATE' });
158+
}

0 commit comments

Comments
 (0)