Skip to content

Commit 39027d9

Browse files
committed
stream: introduce Body
This introduce a new stream primitive called Body which helps with performance, ergonomics and compatibility when working with different types of data producers and consumers. Using Body it will be possible to delay converting streamlike objects as long as possible and enable some optimizations where we can avoid e.g. intermediate node streams.
1 parent f1d3533 commit 39027d9

File tree

8 files changed

+564
-109
lines changed

8 files changed

+564
-109
lines changed

lib/internal/streams/body.js

Lines changed: 326 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,326 @@
1+
'use strict';
2+
3+
const { Buffer, Blob } = require('buffer');
4+
const Duplex = require('internal/streams/duplex');
5+
const Readable = require('internal/streams/readable');
6+
const Writable = require('internal/streams/writable');
7+
const duplexify = require('internal/streams/duplexify');
8+
const { createDeferredPromise } = require('internal/util');
9+
const { destroyer } = require('internal/streams/destroy');
10+
const from = require('internal/streams/from');
11+
const assert = require('internal/assert');
12+
13+
const {
14+
isBlob
15+
} = require('internal/blob');
16+
17+
const {
18+
isBrandCheck,
19+
} = require('internal/webstreams/util');
20+
21+
const isReadableStream =
22+
isBrandCheck('ReadableStream');
23+
const isWritableStream =
24+
isBrandCheck('WritableStream');
25+
26+
const {
27+
isIterable,
28+
isDuplexNodeStream,
29+
isReadableNodeStream,
30+
isWritableNodeStream,
31+
} = require('internal/streams/utils');
32+
33+
const {
34+
JSONParse,
35+
PromiseResolve,
36+
Symbol,
37+
SymbolAsyncIterator
38+
} = primordials;
39+
40+
const {
41+
codes: {
42+
ERR_INVALID_ARG_TYPE,
43+
ERR_INVALID_RETURN_VALUE,
44+
ERR_INVALID_STATE,
45+
},
46+
} = require('internal/errors');
47+
const console = require('console');
48+
49+
const kState = Symbol('kState');
50+
51+
// This is needed for pre node 17.
52+
class BodyDuplex extends Duplex {
53+
constructor(options) {
54+
super(options);
55+
56+
// https://github.com/nodejs/node/pull/34385
57+
58+
if (options?.readable === false) {
59+
this._readableState.readable = false;
60+
this._readableState.ended = true;
61+
this._readableState.endEmitted = true;
62+
}
63+
64+
if (options?.writable === false) {
65+
this._writableState.writable = false;
66+
this._writableState.ending = true;
67+
this._writableState.ended = true;
68+
this._writableState.finished = true;
69+
}
70+
}
71+
}
72+
73+
class Body {
74+
constructor(body, options) {
75+
// TODO (ronag): What about TransformStream?
76+
77+
if (body[kState]) {
78+
this[kState] = body[kState];
79+
} else if (
80+
isReadableStream(body?.readable) &&
81+
isWritableStream(body?.writable)
82+
) {
83+
// TODO (ronag): Optimize. Delay conversion.
84+
const d = Duplex.fromWeb(body, options);
85+
this[kState] = { readable: d, writable: d };
86+
} else if (isWritableStream(body?.writable)) {
87+
// TODO (ronag): Optimize. Delay conversion.
88+
this[kState] = {
89+
readable: undefined,
90+
writable: Writable.fromWeb(body, options)
91+
};
92+
} else if (isReadableStream(body?.readable)) {
93+
// TODO (ronag): Optimize. Delay conversion.
94+
this[kState] = {
95+
readable: Readable.fromWeb(body, options),
96+
writable: undefined
97+
};
98+
} else if (isDuplexNodeStream(body)) {
99+
this[kState] = { readable: body, writable: body };
100+
} else if (isReadableNodeStream(body)) {
101+
this[kState] = { readable: body, writable: undefined };
102+
} else if (isWritableNodeStream(body)) {
103+
this[kState] = { readable: undefined, writable: body };
104+
} else if (isReadableStream(body)) {
105+
// TODO (ronag): Optimize. Delay conversion.
106+
this[kState] = {
107+
readable: Readable.fromWeb(body, options),
108+
writable: undefined
109+
};
110+
} else if (isWritableStream(body)) {
111+
// TODO (ronag): Optimize. Delay conversion.
112+
this[kState] = {
113+
readable: undefined,
114+
writable: Writable.fromWeb(body, options)
115+
};
116+
} else if (typeof body === 'function') {
117+
// TODO (ronag): Optimize. Delay conversion.
118+
assert(body.length > 0);
119+
120+
const { value, write, final } = fromAsyncGen(body);
121+
122+
if (isIterable(value)) {
123+
const d = from(BodyDuplex, value, {
124+
objectMode: true,
125+
highWaterMark: 1,
126+
...options,
127+
write,
128+
final
129+
});
130+
131+
this[kState] = { readable: d, writable: d };
132+
} else if (typeof value?.then === 'function') {
133+
let d;
134+
135+
const promise = PromiseResolve(value)
136+
.then((val) => {
137+
if (val != null) {
138+
throw new ERR_INVALID_RETURN_VALUE('nully', 'body', val);
139+
}
140+
})
141+
.catch((err) => {
142+
destroyer(d, err);
143+
});
144+
145+
d = new BodyDuplex({
146+
objectMode: true,
147+
highWaterMark: 1,
148+
...options,
149+
readable: false,
150+
write,
151+
final(cb) {
152+
final(() => promise.then(cb, cb));
153+
}
154+
});
155+
156+
this[kState] = { readable: d, writable: d };
157+
} else {
158+
throw new ERR_INVALID_RETURN_VALUE(
159+
'Iterable, AsyncIterable or AsyncFunction', 'body', value);
160+
}
161+
} else if (isBlob(body)) {
162+
// TODO (ronag): Optimize. Delay conversion.
163+
const d = new Body(async function* () {
164+
yield await body.arrayBuffer();
165+
}()).nodeStream();
166+
167+
this[kState] = { readable: d, writable: d };
168+
} else if (isIterable(body)) {
169+
// TODO (ronag): Optimize. Delay conversion.
170+
const d = from(BodyDuplex, body, {
171+
objectMode: true,
172+
highWaterMark: 1,
173+
...options,
174+
writable: false
175+
});
176+
177+
this[kState] = { readable: d, writable: d };
178+
} else if (
179+
typeof body?.writable === 'object' ||
180+
typeof body?.readable === 'object'
181+
) {
182+
// TODO (ronag): Optimize. Delay conversion.
183+
const readable = body?.readable ?
184+
isReadableNodeStream(body?.readable) ? body?.readable :
185+
new Body(body.readable).readableNodeStream() : undefined;
186+
const writable = body?.writable ?
187+
isWritableNodeStream(body?.writable) ? body?.writable :
188+
new Body(body.writable).writableNodeStream() : undefined;
189+
190+
this[kState] = { readable, writable };
191+
} else {
192+
throw new ERR_INVALID_ARG_TYPE(
193+
'stream',
194+
['Blob', 'ReadableStream', 'WritableStream', 'Stream', 'Iterable',
195+
'AsyncIterable', 'Function', '{ readable, writable } pair'],
196+
body)
197+
;
198+
}
199+
}
200+
201+
get writable() {
202+
return !!this[kState].writable;
203+
}
204+
205+
get readable() {
206+
return !!this[kState].readable;
207+
}
208+
209+
readableNodeStream() {
210+
const { readable } = this[kState];
211+
212+
if (readable === null) {
213+
throw new ERR_INVALID_STATE('read lock');
214+
}
215+
216+
this[kState].readable = null;
217+
218+
// TODO (ronag): Hide Writable interface.
219+
return readable ?? new BodyDuplex({ readable: false, writable: false });
220+
}
221+
222+
writableNodeStream() {
223+
const { writable } = this[kState];
224+
225+
if (writable === null) {
226+
throw new ERR_INVALID_STATE('write lock');
227+
}
228+
229+
this[kState].writable = null;
230+
231+
// TODO (ronag): Hide Readable interface.
232+
return writable ?? new BodyDuplex({ readable: false, writable: false });
233+
}
234+
235+
nodeStream() {
236+
if (this.readable === null) {
237+
throw new ERR_INVALID_STATE('read lock');
238+
}
239+
240+
if (this.writable === null) {
241+
throw new ERR_INVALID_STATE('write lock');
242+
}
243+
244+
if (this[kState].readable === this[kState].writable) {
245+
const d = this[kState].readable;
246+
this[kState].readable = null;
247+
this[kState].writable = null;
248+
return d;
249+
}
250+
251+
return duplexify({
252+
readable: this.readableNodeStream(),
253+
writable: this.writableNodeStream(),
254+
});
255+
}
256+
257+
readableWebStream() {
258+
return this.readableWebStream().asWeb();
259+
}
260+
261+
writableWebStream() {
262+
return this.writableNodeStream().asWeb();
263+
}
264+
265+
[SymbolAsyncIterator]() {
266+
return this.readableNodeStream()[SymbolAsyncIterator]();
267+
}
268+
269+
async blob() {
270+
const sources = [];
271+
for await (const chunk of this.readableNodeStream()) {
272+
sources.push(chunk);
273+
}
274+
return new Blob(sources);
275+
}
276+
277+
async buffer() {
278+
const sources = [];
279+
for await (const chunk of this.readableNodeStream()) {
280+
sources.push(chunk);
281+
}
282+
return Buffer.concat(sources);
283+
}
284+
285+
async arrayBuffer() {
286+
const blob = await this.blob();
287+
return blob.arrayBuffer();
288+
}
289+
290+
async text() {
291+
let ret = '';
292+
for await (const chunk of this.readableNodeStream()) {
293+
ret += chunk;
294+
}
295+
return ret;
296+
}
297+
298+
async json() {
299+
return JSONParse(await this.text());
300+
}
301+
}
302+
303+
function fromAsyncGen(fn) {
304+
let { promise, resolve } = createDeferredPromise();
305+
const value = fn(async function*() {
306+
while (true) {
307+
const { chunk, done, cb } = await promise;
308+
process.nextTick(cb);
309+
if (done) return;
310+
yield chunk;
311+
({ promise, resolve } = createDeferredPromise());
312+
}
313+
}());
314+
315+
return {
316+
value,
317+
write(chunk, encoding, cb) {
318+
resolve({ chunk, done: false, cb });
319+
},
320+
final(cb) {
321+
resolve({ done: true, cb });
322+
}
323+
};
324+
}
325+
326+
module.exports = Body;

0 commit comments

Comments
 (0)