Skip to content

Commit

Permalink
PipeTo initial implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
youennf committed Apr 17, 2023
1 parent 5993860 commit c585a6c
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 11 deletions.
11 changes: 11 additions & 0 deletions reference-implementation/lib/abstract-ops/miscellaneous.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,14 @@ exports.CloneAsUint8Array = O => {
exports.StructuredTransferOrClone = (value, transferList) => {
return globalThis.structuredClone(value, { transfer: transferList });
};

exports.RunCloseSteps = value => {
if (typeof value.close === 'function') {
return;
}
try {
value.close();
} catch (closeException) {
// Nothing to do.
}
};
12 changes: 3 additions & 9 deletions reference-implementation/lib/abstract-ops/queue-with-sizes.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';
const assert = require('assert');
const { IsNonNegativeNumber, StructuredTransferOrClone } = require('./miscellaneous.js');
const { IsNonNegativeNumber, RunCloseSteps, StructuredTransferOrClone } = require('./miscellaneous.js');

exports.DequeueValue = container => {
assert('_queue' in container && '_queueTotalSize' in container);
Expand All @@ -24,7 +24,7 @@ exports.EnqueueValueWithSize = (container, value, size, transferList) => {
if (size === Infinity) {
throw new RangeError('Size must be a finite, non-NaN, non-negative number.');
}
if (container._isOwning) {
if (container._isOwning && !container._isPipeToOptimizedTransfer) {
value = StructuredTransferOrClone(value, transferList);
}
container._queue.push({ value, size });
Expand All @@ -45,13 +45,7 @@ exports.ResetQueue = container => {
if (container._isOwning) {
while (container._queue.length > 0) {
const value = exports.DequeueValue(container);
if (typeof value.close === 'function') {
try {
value.close();
} catch (closeException) {
// Nothing to do.
}
}
RunCloseSteps(value);
}
}
container._queue = [];
Expand Down
11 changes: 9 additions & 2 deletions reference-implementation/lib/abstract-ops/readable-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ const { promiseResolvedWith, promiseRejectedWith, newPromise, resolvePromise, re
require('../helpers/webidl.js');
const { CanTransferArrayBuffer, CopyDataBlockBytes, CreateArrayFromList, IsDetachedBuffer, TransferArrayBuffer } =
require('./ecmascript.js');
const { CloneAsUint8Array, IsNonNegativeNumber, StructuredTransferOrClone } = require('./miscellaneous.js');
const { CloneAsUint8Array, IsNonNegativeNumber, RunCloseSteps, StructuredTransferOrClone } =
require('./miscellaneous.js');
const { EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js');
const { AcquireWritableStreamDefaultWriter, IsWritableStreamLocked, WritableStreamAbort,
WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterRelease,
Expand Down Expand Up @@ -136,6 +137,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC

const reader = AcquireReadableStreamDefaultReader(source);
const writer = AcquireWritableStreamDefaultWriter(dest);
writer._stream._controller._isPipeToOptimizedTransfer = source._controller._isOwning && dest._controller._isOwning;

source._disturbed = true;

Expand Down Expand Up @@ -206,7 +208,11 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
{
chunkSteps: chunk => {
currentWrite = transformPromiseWith(
WritableStreamDefaultWriterWrite(writer, chunk), undefined, () => {}
WritableStreamDefaultWriterWrite(writer, chunk), undefined, () => {
if (reader._stream._controller._isOwning) {
RunCloseSteps(chunk);
}
}
);
resolveRead(false);
},
Expand Down Expand Up @@ -319,6 +325,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
}

function finalize(isError, error) {
writer._stream._controller._isPipeToOptimizedTransfer = undefined;
WritableStreamDefaultWriterRelease(writer);
ReadableStreamDefaultReaderRelease(reader);

Expand Down

0 comments on commit c585a6c

Please sign in to comment.