From 346abdd0606c66e696cc91150ac961d418c4c21d Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Tue, 19 Sep 2023 22:22:06 +0300 Subject: [PATCH] stream: improve readable webstream `pipeTo` PR-URL: https://github.com/nodejs/node/pull/49690 Reviewed-By: Matteo Collina Reviewed-By: Yagiz Nizipli Reviewed-By: Moshe Atlow --- lib/internal/webstreams/readablestream.js | 51 +++++++++++++++-------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index a2727d94b6da4b..4bdd1fa4862a8f 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -14,7 +14,6 @@ const { ObjectCreate, ObjectDefineProperties, ObjectSetPrototypeOf, - Promise, PromisePrototypeThen, PromiseResolve, PromiseReject, @@ -1351,7 +1350,9 @@ function readableStreamPipeTo( const promise = createDeferredPromise(); - let currentWrite = PromiseResolve(); + const state = { + currentWrite: PromiseResolve(), + }; // The error here can be undefined. The rejected arg // tells us that the promise must be rejected even @@ -1368,9 +1369,9 @@ function readableStreamPipeTo( } async function waitForCurrentWrite() { - const write = currentWrite; + const write = state.currentWrite; await write; - if (write !== currentWrite) + if (write !== state.currentWrite) await waitForCurrentWrite(); } @@ -1461,20 +1462,14 @@ function readableStreamPipeTo( async function step() { if (shuttingDown) return true; + await writer[kState].ready.promise; - return new Promise((resolve, reject) => { - readableStreamDefaultReaderRead( - reader, - { - [kChunk](chunk) { - currentWrite = writableStreamDefaultWriterWrite(writer, chunk); - setPromiseHandled(currentWrite); - resolve(false); - }, - [kClose]: () => resolve(true), - [kError]: reject, - }); - }); + + const promise = createDeferredPromise(); + // eslint-disable-next-line no-use-before-define + readableStreamDefaultReaderRead(reader, new PipeToReadableStreamReadRequest(writer, state, promise)); + + return promise.promise; } async function run() { @@ -1536,6 +1531,28 @@ function readableStreamPipeTo( return promise.promise; } +class PipeToReadableStreamReadRequest { + constructor(writer, state, promise) { + this.writer = writer; + this.state = state; + this.promise = promise; + } + + [kChunk](chunk) { + this.state.currentWrite = writableStreamDefaultWriterWrite(this.writer, chunk); + setPromiseHandled(this.state.currentWrite); + this.promise.resolve(false); + } + + [kClose]() { + this.promise.resolve(true); + } + + [kError](error) { + this.promise.reject(error); + } +} + function readableStreamTee(stream, cloneForBranch2) { if (isReadableByteStreamController(stream[kState].controller)) { return readableByteStreamTee(stream);