From b266fffde6d3cc752af290c84c1115485ffbc419 Mon Sep 17 00:00:00 2001 From: Daeyeon Jeong Date: Fri, 9 Sep 2022 07:06:06 +0900 Subject: [PATCH] stream: add `ReadableByteStream.tee()` This supports teeing readable byte streams to meet the latest web streams standards. Signed-off-by: Daeyeon Jeong daeyeon.dev@gmail.com PR-URL: https://github.com/nodejs/node/pull/44505 Refs: https://streams.spec.whatwg.org/#readable-stream-tee Reviewed-By: Matteo Collina Reviewed-By: Benjamin Gruenbaum Reviewed-By: Minwoo Jung Reviewed-By: Antoine du Hamel --- doc/api/webstreams.md | 4 + lib/internal/webstreams/readablestream.js | 302 ++++++++++++++++++- lib/internal/webstreams/util.js | 12 + test/parallel/test-whatwg-readablestream.js | 2 +- test/parallel/test-whatwg-readablestream.mjs | 34 +++ 5 files changed, 342 insertions(+), 12 deletions(-) diff --git a/doc/api/webstreams.md b/doc/api/webstreams.md index 0fe9aeeba93383..78df370979e2ef 100644 --- a/doc/api/webstreams.md +++ b/doc/api/webstreams.md @@ -303,6 +303,10 @@ is active. * Returns: {ReadableStream\[]} diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 26879ad861efe7..28f71293767ecc 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -93,6 +93,7 @@ const { ArrayBufferViewGetByteOffset, ArrayBufferGetByteLength, AsyncIterator, + cloneAsUint8Array, copyArrayBuffer, customInspect, dequeueValue, @@ -211,6 +212,7 @@ class ReadableStream { throw new ERR_INVALID_ARG_VALUE('source', 'Object', source); this[kState] = { disturbed: false, + reader: undefined, state: 'readable', storedError: undefined, stream: undefined, @@ -1103,7 +1105,6 @@ class ReadableByteStreamController { chunk); } const chunkByteLength = ArrayBufferViewGetByteLength(chunk); - const chunkByteOffset = ArrayBufferViewGetByteOffset(chunk); const chunkBuffer = ArrayBufferViewGetBuffer(chunk); const chunkBufferByteLength = ArrayBufferGetByteLength(chunkBuffer); if (chunkByteLength === 0 || chunkBufferByteLength === 0) { @@ -1114,11 +1115,7 @@ class ReadableByteStreamController { throw new ERR_INVALID_STATE.TypeError('Controller is already closed'); if (this[kState].stream[kState].state !== 'readable') throw new ERR_INVALID_STATE.TypeError('ReadableStream is already closed'); - readableByteStreamControllerEnqueue( - this, - chunkBuffer, - chunkByteLength, - chunkByteOffset); + readableByteStreamControllerEnqueue(this, chunk); } /** @@ -1416,6 +1413,13 @@ function readableStreamPipeTo( } function readableStreamTee(stream, cloneForBranch2) { + if (isReadableByteStreamController(stream[kState].controller)) { + return readableByteStreamTee(stream); + } + return readableStreamDefaultTee(stream, cloneForBranch2); +} + +function readableStreamDefaultTee(stream, cloneForBranch2) { const reader = new ReadableStreamDefaultReader(stream); let reading = false; let canceled1 = false; @@ -1510,6 +1514,282 @@ function readableStreamTee(stream, cloneForBranch2) { return [branch1, branch2]; } +function readableByteStreamTee(stream) { + assert(isReadableStream(stream)); + assert(isReadableByteStreamController(stream[kState].controller)); + + let reader = new ReadableStreamDefaultReader(stream); + let reading = false; + let readAgainForBranch1 = false; + let readAgainForBranch2 = false; + let canceled1 = false; + let canceled2 = false; + let reason1; + let reason2; + let branch1; + let branch2; + const cancelDeferred = createDeferredPromise(); + + function forwardReaderError(thisReader) { + PromisePrototypeThen( + thisReader[kState].close.promise, + undefined, + (error) => { + if (thisReader !== reader) { + return; + } + readableStreamDefaultControllerError(branch1[kState].controller, error); + readableStreamDefaultControllerError(branch2[kState].controller, error); + if (!canceled1 || !canceled2) { + cancelDeferred.resolve(); + } + } + ); + } + + function pullWithDefaultReader() { + if (isReadableStreamBYOBReader(reader)) { + reader = new ReadableStreamDefaultReader(stream); + forwardReaderError(reader); + } + + const readRequest = { + [kChunk](chunk) { + queueMicrotask(() => { + readAgainForBranch1 = false; + readAgainForBranch2 = false; + const chunk1 = chunk; + let chunk2 = chunk; + + if (!canceled1 && !canceled2) { + try { + chunk2 = cloneAsUint8Array(chunk); + } catch (error) { + readableByteStreamControllerError( + branch1[kState].controller, + error + ); + readableByteStreamControllerError( + branch2[kState].controller, + error + ); + cancelDeferred.resolve(readableStreamCancel(stream, error)); + return; + } + } + if (!canceled1) { + readableByteStreamControllerEnqueue( + branch1[kState].controller, + chunk1 + ); + } + if (!canceled2) { + readableByteStreamControllerEnqueue( + branch2[kState].controller, + chunk2 + ); + } + reading = false; + + if (readAgainForBranch1) { + pull1Algorithm(); + } else if (readAgainForBranch2) { + pull2Algorithm(); + } + }); + }, + [kClose]() { + reading = false; + + if (!canceled1) { + readableByteStreamControllerClose(branch1[kState].controller); + } + if (!canceled2) { + readableByteStreamControllerClose(branch2[kState].controller); + } + if (branch1[kState].controller[kState].pendingPullIntos.length > 0) { + readableByteStreamControllerRespond(branch1[kState].controller, 0); + } + if (branch2[kState].controller[kState].pendingPullIntos.length > 0) { + readableByteStreamControllerRespond(branch2[kState].controller, 0); + } + if (!canceled1 || !canceled2) { + cancelDeferred.resolve(); + } + }, + [kError]() { + reading = false; + }, + }; + + readableStreamDefaultReaderRead(reader, readRequest); + } + + function pullWithBYOBReader(view, forBranch2) { + if (isReadableStreamDefaultReader(reader)) { + reader = new ReadableStreamBYOBReader(stream); + forwardReaderError(reader); + } + + const byobBranch = forBranch2 === true ? branch2 : branch1; + const otherBranch = forBranch2 === false ? branch2 : branch1; + const readIntoRequest = { + [kChunk](chunk) { + queueMicrotask(() => { + readAgainForBranch1 = false; + readAgainForBranch2 = false; + const byobCanceled = forBranch2 === true ? canceled2 : canceled1; + const otherCanceled = forBranch2 === false ? canceled2 : canceled1; + + if (!otherCanceled) { + let clonedChunk; + + try { + clonedChunk = cloneAsUint8Array(chunk); + } catch (error) { + readableByteStreamControllerError( + byobBranch[kState].controller, + error + ); + readableByteStreamControllerError( + otherBranch[kState].controller, + error + ); + cancelDeferred.resolve(readableStreamCancel(stream, error)); + return; + } + if (!byobCanceled) { + readableByteStreamControllerRespondWithNewView( + byobBranch[kState].controller, + chunk + ); + } + + readableByteStreamControllerEnqueue( + otherBranch[kState].controller, + clonedChunk + ); + } else if (!byobCanceled) { + readableByteStreamControllerRespondWithNewView( + byobBranch[kState].controller, + chunk + ); + } + reading = false; + + if (readAgainForBranch1) { + pull1Algorithm(); + } else if (readAgainForBranch2) { + pull2Algorithm(); + } + }); + }, + [kClose](chunk) { + reading = false; + + const byobCanceled = forBranch2 === true ? canceled2 : canceled1; + const otherCanceled = forBranch2 === false ? canceled2 : canceled1; + + if (!byobCanceled) { + readableByteStreamControllerClose(byobBranch[kState].controller); + } + if (!otherCanceled) { + readableByteStreamControllerClose(otherBranch[kState].controller); + } + if (chunk !== undefined) { + if (!byobCanceled) { + readableByteStreamControllerRespondWithNewView( + byobBranch[kState].controller, + chunk + ); + } + if ( + !otherCanceled && + otherBranch[kState].controller[kState].pendingPullIntos.length > 0 + ) { + readableByteStreamControllerRespond( + otherBranch[kState].controller, + 0 + ); + } + } + if (!byobCanceled || !otherCanceled) { + cancelDeferred.resolve(); + } + }, + [kError]() { + reading = false; + }, + }; + readableStreamBYOBReaderRead(reader, view, readIntoRequest); + } + + function pull1Algorithm() { + if (reading) { + readAgainForBranch1 = true; + return PromiseResolve(); + } + reading = true; + + const byobRequest = branch1[kState].controller.byobRequest; + if (byobRequest === null) { + pullWithDefaultReader(); + } else { + pullWithBYOBReader(byobRequest[kState].view, false); + } + return PromiseResolve(); + } + + function pull2Algorithm() { + if (reading) { + readAgainForBranch2 = true; + return PromiseResolve(); + } + reading = true; + + const byobRequest = branch2[kState].controller.byobRequest; + if (byobRequest === null) { + pullWithDefaultReader(); + } else { + pullWithBYOBReader(byobRequest[kState].view, true); + } + return PromiseResolve(); + } + + function cancel1Algorithm(reason) { + canceled1 = true; + reason1 = reason; + if (canceled2) { + cancelDeferred.resolve(readableStreamCancel(stream, [reason1, reason2])); + } + return cancelDeferred.promise; + } + + function cancel2Algorithm(reason) { + canceled2 = true; + reason2 = reason; + if (canceled1) { + cancelDeferred.resolve(readableStreamCancel(stream, [reason1, reason2])); + } + return cancelDeferred.promise; + } + + branch1 = new ReadableStream({ + type: 'bytes', + pull: pull1Algorithm, + cancel: cancel1Algorithm, + }); + branch2 = new ReadableStream({ + type: 'bytes', + pull: pull2Algorithm, + cancel: cancel2Algorithm, + }); + + forwardReaderError(reader); + + return [branch1, branch2]; +} + function readableByteStreamControllerConvertPullIntoDescriptor(desc) { const { buffer, @@ -2273,11 +2553,7 @@ function readableByteStreamControllerFillHeadPullIntoDescriptor( desc.bytesFilled += size; } -function readableByteStreamControllerEnqueue( - controller, - buffer, - byteLength, - byteOffset) { +function readableByteStreamControllerEnqueue(controller, chunk) { const { closeRequested, pendingPullIntos, @@ -2285,6 +2561,10 @@ function readableByteStreamControllerEnqueue( stream, } = controller[kState]; + const buffer = ArrayBufferViewGetBuffer(chunk); + const byteOffset = ArrayBufferViewGetByteOffset(chunk); + const byteLength = ArrayBufferViewGetByteLength(chunk); + if (closeRequested || stream[kState].state !== 'readable') return; diff --git a/lib/internal/webstreams/util.js b/lib/internal/webstreams/util.js index 524093dc74df66..3dd50827f29aa5 100644 --- a/lib/internal/webstreams/util.js +++ b/lib/internal/webstreams/util.js @@ -2,6 +2,7 @@ const { ArrayBufferPrototype, + ArrayBufferPrototypeSlice, ArrayPrototypePush, ArrayPrototypeShift, AsyncIteratorPrototype, @@ -14,6 +15,7 @@ const { PromiseReject, ReflectGet, Symbol, + Uint8Array, } = primordials; const { @@ -111,6 +113,15 @@ function ArrayBufferGetByteLength(view) { return ReflectGet(ArrayBufferPrototype, 'byteLength', view); } +function cloneAsUint8Array(view) { + const buffer = ArrayBufferViewGetBuffer(view); + const byteOffset = ArrayBufferViewGetByteOffset(view); + const byteLength = ArrayBufferViewGetByteLength(view); + return new Uint8Array( + ArrayBufferPrototypeSlice(buffer, byteOffset, byteOffset + byteLength) + ); +} + function isBrandCheck(brand) { return (value) => { return value != null && @@ -215,6 +226,7 @@ module.exports = { ArrayBufferViewGetByteOffset, ArrayBufferGetByteLength, AsyncIterator, + cloneAsUint8Array, copyArrayBuffer, customInspect, dequeueValue, diff --git a/test/parallel/test-whatwg-readablestream.js b/test/parallel/test-whatwg-readablestream.js index 9f1cfc60e9a28b..c798c73500f021 100644 --- a/test/parallel/test-whatwg-readablestream.js +++ b/test/parallel/test-whatwg-readablestream.js @@ -1571,7 +1571,7 @@ class Source { assert(!readableStreamDefaultControllerCanCloseOrEnqueue(controller)); readableStreamDefaultControllerEnqueue(controller); readableByteStreamControllerClose(controller); - readableByteStreamControllerEnqueue(controller); + readableByteStreamControllerEnqueue(controller, new Uint8Array(1)); } { diff --git a/test/parallel/test-whatwg-readablestream.mjs b/test/parallel/test-whatwg-readablestream.mjs index a3693f62439ce7..57ebed604542a3 100644 --- a/test/parallel/test-whatwg-readablestream.mjs +++ b/test/parallel/test-whatwg-readablestream.mjs @@ -34,3 +34,37 @@ import assert from 'assert'; assert.strictEqual(dataReader2, 'foobar'); })().then(mustCall()); } + +{ + // Test ReadableByteStream.tee() with close in the nextTick after enqueue + async function read(stream) { + const chunks = []; + for await (const chunk of stream) + chunks.push(chunk); + return Buffer.concat(chunks).toString(); + } + + const [r1, r2] = new ReadableStream({ + type: 'bytes', + start(controller) { + process.nextTick(() => { + controller.enqueue(new Uint8Array([102, 111, 111, 98, 97, 114])); + + process.nextTick(() => { + controller.close(); + }); + }); + } + }).tee(); + + (async () => { + const [dataReader1, dataReader2] = await Promise.all([ + read(r1), + read(r2), + ]); + + assert.strictEqual(dataReader1, dataReader2); + assert.strictEqual(dataReader1, 'foobar'); + assert.strictEqual(dataReader2, 'foobar'); + })().then(mustCall()); +}