Skip to content

Commit 11cd12a

Browse files
daeyeonjuanarbol
authored andcommitted
stream: add ReadableByteStream.tee()
This supports teeing readable byte streams to meet the latest web streams standards. Signed-off-by: Daeyeon Jeong daeyeon.dev@gmail.com PR-URL: #44505 Refs: https://streams.spec.whatwg.org/#readable-stream-tee Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Minwoo Jung <nodecorelab@gmail.com> Reviewed-By: Antoine du Hamel <duhamelantoine1995@gmail.com>
1 parent 404dad2 commit 11cd12a

File tree

5 files changed

+343
-12
lines changed

5 files changed

+343
-12
lines changed

doc/api/webstreams.md

+4
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,10 @@ is active.
303303

304304
<!-- YAML
305305
added: v16.5.0
306+
changes:
307+
- version: REPLACEME
308+
pr-url: https://github.com/nodejs/node/pull/44505
309+
description: Support teeing a readable byte stream.
306310
-->
307311

308312
* Returns: {ReadableStream\[]}

lib/internal/webstreams/readablestream.js

+293-11
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ const {
9393
ArrayBufferViewGetByteOffset,
9494
ArrayBufferGetByteLength,
9595
AsyncIterator,
96+
cloneAsUint8Array,
9697
copyArrayBuffer,
9798
customInspect,
9899
dequeueValue,
@@ -211,6 +212,7 @@ class ReadableStream {
211212
throw new ERR_INVALID_ARG_VALUE('source', 'Object', source);
212213
this[kState] = {
213214
disturbed: false,
215+
reader: undefined,
214216
state: 'readable',
215217
storedError: undefined,
216218
stream: undefined,
@@ -1103,7 +1105,6 @@ class ReadableByteStreamController {
11031105
chunk);
11041106
}
11051107
const chunkByteLength = ArrayBufferViewGetByteLength(chunk);
1106-
const chunkByteOffset = ArrayBufferViewGetByteOffset(chunk);
11071108
const chunkBuffer = ArrayBufferViewGetBuffer(chunk);
11081109
const chunkBufferByteLength = ArrayBufferGetByteLength(chunkBuffer);
11091110
if (chunkByteLength === 0 || chunkBufferByteLength === 0) {
@@ -1114,11 +1115,7 @@ class ReadableByteStreamController {
11141115
throw new ERR_INVALID_STATE.TypeError('Controller is already closed');
11151116
if (this[kState].stream[kState].state !== 'readable')
11161117
throw new ERR_INVALID_STATE.TypeError('ReadableStream is already closed');
1117-
readableByteStreamControllerEnqueue(
1118-
this,
1119-
chunkBuffer,
1120-
chunkByteLength,
1121-
chunkByteOffset);
1118+
readableByteStreamControllerEnqueue(this, chunk);
11221119
}
11231120

11241121
/**
@@ -1416,6 +1413,13 @@ function readableStreamPipeTo(
14161413
}
14171414

14181415
function readableStreamTee(stream, cloneForBranch2) {
1416+
if (isReadableByteStreamController(stream[kState].controller)) {
1417+
return readableByteStreamTee(stream);
1418+
}
1419+
return readableStreamDefaultTee(stream, cloneForBranch2);
1420+
}
1421+
1422+
function readableStreamDefaultTee(stream, cloneForBranch2) {
14191423
const reader = new ReadableStreamDefaultReader(stream);
14201424
let reading = false;
14211425
let canceled1 = false;
@@ -1510,6 +1514,284 @@ function readableStreamTee(stream, cloneForBranch2) {
15101514
return [branch1, branch2];
15111515
}
15121516

1517+
function readableByteStreamTee(stream) {
1518+
assert(isReadableStream(stream));
1519+
assert(isReadableByteStreamController(stream[kState].controller));
1520+
1521+
let reader = new ReadableStreamDefaultReader(stream);
1522+
let reading = false;
1523+
let readAgainForBranch1 = false;
1524+
let readAgainForBranch2 = false;
1525+
let canceled1 = false;
1526+
let canceled2 = false;
1527+
let reason1;
1528+
let reason2;
1529+
let branch1;
1530+
let branch2;
1531+
const cancelDeferred = createDeferredPromise();
1532+
1533+
function forwardReaderError(thisReader) {
1534+
PromisePrototypeThen(
1535+
thisReader[kState].close.promise,
1536+
undefined,
1537+
(error) => {
1538+
if (thisReader !== reader) {
1539+
return;
1540+
}
1541+
readableStreamDefaultControllerError(branch1[kState].controller, error);
1542+
readableStreamDefaultControllerError(branch2[kState].controller, error);
1543+
if (!canceled1 || !canceled2) {
1544+
cancelDeferred.resolve();
1545+
}
1546+
}
1547+
);
1548+
}
1549+
1550+
function pullWithDefaultReader() {
1551+
if (isReadableStreamBYOBReader(reader)) {
1552+
readableStreamBYOBReaderRelease(reader);
1553+
reader = new ReadableStreamDefaultReader(stream);
1554+
forwardReaderError(reader);
1555+
}
1556+
1557+
const readRequest = {
1558+
[kChunk](chunk) {
1559+
queueMicrotask(() => {
1560+
readAgainForBranch1 = false;
1561+
readAgainForBranch2 = false;
1562+
const chunk1 = chunk;
1563+
let chunk2 = chunk;
1564+
1565+
if (!canceled1 && !canceled2) {
1566+
try {
1567+
chunk2 = cloneAsUint8Array(chunk);
1568+
} catch (error) {
1569+
readableByteStreamControllerError(
1570+
branch1[kState].controller,
1571+
error
1572+
);
1573+
readableByteStreamControllerError(
1574+
branch2[kState].controller,
1575+
error
1576+
);
1577+
cancelDeferred.resolve(readableStreamCancel(stream, error));
1578+
return;
1579+
}
1580+
}
1581+
if (!canceled1) {
1582+
readableByteStreamControllerEnqueue(
1583+
branch1[kState].controller,
1584+
chunk1
1585+
);
1586+
}
1587+
if (!canceled2) {
1588+
readableByteStreamControllerEnqueue(
1589+
branch2[kState].controller,
1590+
chunk2
1591+
);
1592+
}
1593+
reading = false;
1594+
1595+
if (readAgainForBranch1) {
1596+
pull1Algorithm();
1597+
} else if (readAgainForBranch2) {
1598+
pull2Algorithm();
1599+
}
1600+
});
1601+
},
1602+
[kClose]() {
1603+
reading = false;
1604+
1605+
if (!canceled1) {
1606+
readableByteStreamControllerClose(branch1[kState].controller);
1607+
}
1608+
if (!canceled2) {
1609+
readableByteStreamControllerClose(branch2[kState].controller);
1610+
}
1611+
if (branch1[kState].controller[kState].pendingPullIntos.length > 0) {
1612+
readableByteStreamControllerRespond(branch1[kState].controller, 0);
1613+
}
1614+
if (branch2[kState].controller[kState].pendingPullIntos.length > 0) {
1615+
readableByteStreamControllerRespond(branch2[kState].controller, 0);
1616+
}
1617+
if (!canceled1 || !canceled2) {
1618+
cancelDeferred.resolve();
1619+
}
1620+
},
1621+
[kError]() {
1622+
reading = false;
1623+
},
1624+
};
1625+
1626+
readableStreamDefaultReaderRead(reader, readRequest);
1627+
}
1628+
1629+
function pullWithBYOBReader(view, forBranch2) {
1630+
if (isReadableStreamDefaultReader(reader)) {
1631+
readableStreamDefaultReaderRelease(reader);
1632+
reader = new ReadableStreamBYOBReader(stream);
1633+
forwardReaderError(reader);
1634+
}
1635+
1636+
const byobBranch = forBranch2 === true ? branch2 : branch1;
1637+
const otherBranch = forBranch2 === false ? branch2 : branch1;
1638+
const readIntoRequest = {
1639+
[kChunk](chunk) {
1640+
queueMicrotask(() => {
1641+
readAgainForBranch1 = false;
1642+
readAgainForBranch2 = false;
1643+
const byobCanceled = forBranch2 === true ? canceled2 : canceled1;
1644+
const otherCanceled = forBranch2 === false ? canceled2 : canceled1;
1645+
1646+
if (!otherCanceled) {
1647+
let clonedChunk;
1648+
1649+
try {
1650+
clonedChunk = cloneAsUint8Array(chunk);
1651+
} catch (error) {
1652+
readableByteStreamControllerError(
1653+
byobBranch[kState].controller,
1654+
error
1655+
);
1656+
readableByteStreamControllerError(
1657+
otherBranch[kState].controller,
1658+
error
1659+
);
1660+
cancelDeferred.resolve(readableStreamCancel(stream, error));
1661+
return;
1662+
}
1663+
if (!byobCanceled) {
1664+
readableByteStreamControllerRespondWithNewView(
1665+
byobBranch[kState].controller,
1666+
chunk
1667+
);
1668+
}
1669+
1670+
readableByteStreamControllerEnqueue(
1671+
otherBranch[kState].controller,
1672+
clonedChunk
1673+
);
1674+
} else if (!byobCanceled) {
1675+
readableByteStreamControllerRespondWithNewView(
1676+
byobBranch[kState].controller,
1677+
chunk
1678+
);
1679+
}
1680+
reading = false;
1681+
1682+
if (readAgainForBranch1) {
1683+
pull1Algorithm();
1684+
} else if (readAgainForBranch2) {
1685+
pull2Algorithm();
1686+
}
1687+
});
1688+
},
1689+
[kClose](chunk) {
1690+
reading = false;
1691+
1692+
const byobCanceled = forBranch2 === true ? canceled2 : canceled1;
1693+
const otherCanceled = forBranch2 === false ? canceled2 : canceled1;
1694+
1695+
if (!byobCanceled) {
1696+
readableByteStreamControllerClose(byobBranch[kState].controller);
1697+
}
1698+
if (!otherCanceled) {
1699+
readableByteStreamControllerClose(otherBranch[kState].controller);
1700+
}
1701+
if (chunk !== undefined) {
1702+
if (!byobCanceled) {
1703+
readableByteStreamControllerRespondWithNewView(
1704+
byobBranch[kState].controller,
1705+
chunk
1706+
);
1707+
}
1708+
if (
1709+
!otherCanceled &&
1710+
otherBranch[kState].controller[kState].pendingPullIntos.length > 0
1711+
) {
1712+
readableByteStreamControllerRespond(
1713+
otherBranch[kState].controller,
1714+
0
1715+
);
1716+
}
1717+
}
1718+
if (!byobCanceled || !otherCanceled) {
1719+
cancelDeferred.resolve();
1720+
}
1721+
},
1722+
[kError]() {
1723+
reading = false;
1724+
},
1725+
};
1726+
readableStreamBYOBReaderRead(reader, view, readIntoRequest);
1727+
}
1728+
1729+
function pull1Algorithm() {
1730+
if (reading) {
1731+
readAgainForBranch1 = true;
1732+
return PromiseResolve();
1733+
}
1734+
reading = true;
1735+
1736+
const byobRequest = branch1[kState].controller.byobRequest;
1737+
if (byobRequest === null) {
1738+
pullWithDefaultReader();
1739+
} else {
1740+
pullWithBYOBReader(byobRequest[kState].view, false);
1741+
}
1742+
return PromiseResolve();
1743+
}
1744+
1745+
function pull2Algorithm() {
1746+
if (reading) {
1747+
readAgainForBranch2 = true;
1748+
return PromiseResolve();
1749+
}
1750+
reading = true;
1751+
1752+
const byobRequest = branch2[kState].controller.byobRequest;
1753+
if (byobRequest === null) {
1754+
pullWithDefaultReader();
1755+
} else {
1756+
pullWithBYOBReader(byobRequest[kState].view, true);
1757+
}
1758+
return PromiseResolve();
1759+
}
1760+
1761+
function cancel1Algorithm(reason) {
1762+
canceled1 = true;
1763+
reason1 = reason;
1764+
if (canceled2) {
1765+
cancelDeferred.resolve(readableStreamCancel(stream, [reason1, reason2]));
1766+
}
1767+
return cancelDeferred.promise;
1768+
}
1769+
1770+
function cancel2Algorithm(reason) {
1771+
canceled2 = true;
1772+
reason2 = reason;
1773+
if (canceled1) {
1774+
cancelDeferred.resolve(readableStreamCancel(stream, [reason1, reason2]));
1775+
}
1776+
return cancelDeferred.promise;
1777+
}
1778+
1779+
branch1 = new ReadableStream({
1780+
type: 'bytes',
1781+
pull: pull1Algorithm,
1782+
cancel: cancel1Algorithm,
1783+
});
1784+
branch2 = new ReadableStream({
1785+
type: 'bytes',
1786+
pull: pull2Algorithm,
1787+
cancel: cancel2Algorithm,
1788+
});
1789+
1790+
forwardReaderError(reader);
1791+
1792+
return [branch1, branch2];
1793+
}
1794+
15131795
function readableByteStreamControllerConvertPullIntoDescriptor(desc) {
15141796
const {
15151797
buffer,
@@ -2273,18 +2555,18 @@ function readableByteStreamControllerFillHeadPullIntoDescriptor(
22732555
desc.bytesFilled += size;
22742556
}
22752557

2276-
function readableByteStreamControllerEnqueue(
2277-
controller,
2278-
buffer,
2279-
byteLength,
2280-
byteOffset) {
2558+
function readableByteStreamControllerEnqueue(controller, chunk) {
22812559
const {
22822560
closeRequested,
22832561
pendingPullIntos,
22842562
queue,
22852563
stream,
22862564
} = controller[kState];
22872565

2566+
const buffer = ArrayBufferViewGetBuffer(chunk);
2567+
const byteOffset = ArrayBufferViewGetByteOffset(chunk);
2568+
const byteLength = ArrayBufferViewGetByteLength(chunk);
2569+
22882570
if (closeRequested || stream[kState].state !== 'readable')
22892571
return;
22902572

0 commit comments

Comments
 (0)