Skip to content

Commit bdc3d17

Browse files
committed
stream: implement fetch body mixin on Readable
Make Readable exposew the fetch boxy mixin API. Bypasses webstream glue when possible. Refs: https://fetch.spec.whatwg.org/#body-mixin
1 parent ab03ab4 commit bdc3d17

File tree

1 file changed

+135
-1
lines changed

1 file changed

+135
-1
lines changed

lib/internal/streams/readable.js

Lines changed: 135 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,15 @@ const {
2626
NumberIsInteger,
2727
NumberIsNaN,
2828
NumberParseInt,
29+
JSONParse,
2930
ObjectDefineProperties,
3031
ObjectKeys,
3132
ObjectSetPrototypeOf,
3233
Promise,
3334
SafeSet,
3435
SymbolAsyncIterator,
35-
Symbol
36+
Symbol,
37+
TypeError,
3638
} = primordials;
3739

3840
module.exports = Readable;
@@ -42,6 +44,8 @@ const EE = require('events');
4244
const { Stream, prependListener } = require('internal/streams/legacy');
4345
const { Buffer } = require('buffer');
4446

47+
let Blob;
48+
4549
const {
4650
addAbortSignal,
4751
} = require('internal/streams/add-abort-signal');
@@ -58,6 +62,7 @@ const {
5862
} = require('internal/streams/state');
5963

6064
const {
65+
AbortError,
6166
aggregateTwoErrors,
6267
codes: {
6368
ERR_INVALID_ARG_TYPE,
@@ -69,6 +74,7 @@ const {
6974
const { validateObject } = require('internal/validators');
7075

7176
const kPaused = Symbol('kPaused');
77+
const kConsume = Symbol('kConsume');
7278

7379
const { StringDecoder } = require('string_decoder');
7480
const from = require('internal/streams/from');
@@ -206,6 +212,8 @@ function Readable(options) {
206212
addAbortSignal(options.signal, this);
207213
}
208214

215+
this[kConsume] = null;
216+
209217
Stream.call(this, options);
210218

211219
destroyImpl.construct(this, () => {
@@ -1268,6 +1276,12 @@ ObjectDefineProperties(Readable.prototype, {
12681276

12691277
});
12701278

1279+
const kWebStreamType = 1;
1280+
const kTextType = 2;
1281+
const kBlobType = 3;
1282+
const kArrayBufferType = 4;
1283+
const kJSONType = 5;
1284+
12711285
ObjectDefineProperties(ReadableState.prototype, {
12721286
// Legacy getter for `pipesCount`.
12731287
pipesCount: {
@@ -1284,9 +1298,129 @@ ObjectDefineProperties(ReadableState.prototype, {
12841298
set(value) {
12851299
this[kPaused] = !!value;
12861300
}
1301+
},
1302+
1303+
// https://fetch.spec.whatwg.org/#dom-body-bodyused
1304+
bodyUsed: {
1305+
get() {
1306+
return isDisturbed(this);
1307+
}
1308+
},
1309+
1310+
body: {
1311+
get() {
1312+
if (this[kConsume]?.type === kWebStreamType) {
1313+
return this[kConsume].body;
1314+
}
1315+
1316+
return consume(this, kWebStreamType);
1317+
}
1318+
},
1319+
1320+
text: {
1321+
get() {
1322+
return consume(this, kTextType);
1323+
}
1324+
},
1325+
1326+
json: {
1327+
get() {
1328+
return consume(this, kJSONType);
1329+
}
1330+
},
1331+
1332+
blob: {
1333+
get() {
1334+
return consume(this, kBlobType);
1335+
}
1336+
},
1337+
1338+
arrayBuffer: {
1339+
get() {
1340+
return consume(this, kArrayBufferType);
1341+
}
12871342
}
12881343
});
12891344

1345+
function isLocked(self) {
1346+
return self[kConsume] &&
1347+
(self[kConsume].type !== kWebStreamType || self[kConsume].body.locked);
1348+
}
1349+
1350+
// https://streams.spec.whatwg.org/#readablestream-disturbed
1351+
function isDisturbed(self) {
1352+
return self.destroyed || self.readableDidRead;
1353+
}
1354+
1355+
// https://fetch.spec.whatwg.org/#body-unusable
1356+
function isUnusable(self) {
1357+
return isDisturbed(self) || isLocked(self);
1358+
}
1359+
1360+
function consume(self, type) {
1361+
if (isUnusable(self)) {
1362+
throw new TypeError('unusable');
1363+
}
1364+
1365+
if (type === kWebStreamType) {
1366+
self[kConsume] = {
1367+
type,
1368+
body: Readable.toWeb(self)
1369+
};
1370+
1371+
return self[kConsume].body;
1372+
}
1373+
1374+
return new Promise((resolve, reject) => {
1375+
self[kConsume] = {
1376+
type,
1377+
resolve,
1378+
reject,
1379+
body: type === kTextType || type === kJSONType ? '' : []
1380+
};
1381+
self
1382+
.on('error', reject)
1383+
.on('data', function(val) {
1384+
const { type } = this[kConsume];
1385+
1386+
if (type === kTextType || type === kJSONType) {
1387+
this[kConsume].body += val;
1388+
} else {
1389+
this[kConsume].body.push(val);
1390+
}
1391+
})
1392+
.on('end', function() {
1393+
const { type, resolve, body } = this[kConsume];
1394+
1395+
try {
1396+
if (type === kTextType) {
1397+
resolve(body);
1398+
} else if (type === kJSONType) {
1399+
resolve(JSONParse(body));
1400+
} else if (type === kArrayBufferType) {
1401+
resolve(Buffer.concat(body).buffer);
1402+
} else if (type === kBlobType) {
1403+
if (!Blob) {
1404+
Blob = require('buffer').Blob;
1405+
}
1406+
resolve(new Blob(body));
1407+
}
1408+
1409+
this[kConsume].body = null;
1410+
} catch (err) {
1411+
self.destroy(err);
1412+
}
1413+
})
1414+
.on('close', function() {
1415+
const { body, reject } = this[kConsume];
1416+
1417+
if (body !== null) {
1418+
reject(new AbortError());
1419+
}
1420+
});
1421+
});
1422+
}
1423+
12901424
// Exposed for testing purposes only.
12911425
Readable._fromList = fromList;
12921426

0 commit comments

Comments
 (0)