Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for TransformStream "owning" writable/readable types #1274

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ jobs:
submodules: true
- uses: actions/setup-node@v1
with:
node-version: 14
node-version: 19
- run: npm install
- run: npm test
245 changes: 172 additions & 73 deletions index.bs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion reference-implementation/lib/ReadableStream-impl.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ exports.implementation = class ReadableStreamImpl {
this, underlyingSource, underlyingSourceDict, highWaterMark
);
} else {
assert(!('type' in underlyingSourceDict));
assert(!('type' in underlyingSourceDict) || underlyingSourceDict.type === 'owning');
const sizeAlgorithm = ExtractSizeAlgorithm(strategy);
const highWaterMark = ExtractHighWaterMark(strategy, 1);
aos.SetUpReadableStreamDefaultControllerFromUnderlyingSource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ exports.implementation = class ReadableStreamDefaultControllerImpl {
aos.ReadableStreamDefaultControllerClose(this);
}

enqueue(chunk) {
enqueue(chunk, options) {
const transferList = options ? options.transfer : undefined;
if (aos.ReadableStreamDefaultControllerCanCloseOrEnqueue(this) === false) {
throw new TypeError('The stream is not in a state that permits enqueue');
}

return aos.ReadableStreamDefaultControllerEnqueue(this, chunk);
return aos.ReadableStreamDefaultControllerEnqueue(this, chunk, transferList);
}

error(e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
dictionary StructuredSerializeOptions {
sequence<object> transfer = [];
};

[Exposed=(Window,Worker,Worklet)]
interface ReadableStreamDefaultController {
readonly attribute unrestricted double? desiredSize;

undefined close();
undefined enqueue(optional any chunk);
undefined enqueue(optional any chunk, optional StructuredSerializeOptions options = { });
undefined error(optional any e);
};
9 changes: 5 additions & 4 deletions reference-implementation/lib/TransformStream-impl.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ exports.implementation = class TransformStreamImpl {
transformer = null;
}
const transformerDict = Transformer.convert(transformer);
if ('readableType' in transformerDict) {
throw new RangeError('Invalid readableType specified');
if ('readableType' in transformerDict && transformerDict['readableType'] !== 'owning') {
throw new TypeError('Invalid readableType specified');
}
if ('writableType' in transformerDict) {
throw new RangeError('Invalid writableType specified');
assert(transformerDict['writableType'] !== 'owning');
}

const readableHighWaterMark = ExtractHighWaterMark(readableStrategy, 0);
Expand All @@ -27,7 +27,8 @@ exports.implementation = class TransformStreamImpl {
const startPromise = newPromise();

aos.InitializeTransformStream(
this, startPromise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm
this, startPromise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm,
transformerDict
);
aos.SetUpTransformStreamDefaultControllerFromTransformer(this, transformer, transformerDict);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ exports.implementation = class TransformStreamDefaultController {
return rsAOs.ReadableStreamDefaultControllerGetDesiredSize(readableController);
}

enqueue(chunk) {
aos.TransformStreamDefaultControllerEnqueue(this, chunk);
enqueue(chunk, options) {
const transferList = options ? options.transfer : undefined;
aos.TransformStreamDefaultControllerEnqueue(this, chunk, transferList);
}

error(reason) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
dictionary StructuredSerializeOptions {
sequence<object> transfer = [];
};

[Exposed=(Window,Worker,Worklet)]
interface TransformStreamDefaultController {
readonly attribute unrestricted double? desiredSize;

undefined enqueue(optional any chunk);
undefined enqueue(optional any chunk, optional StructuredSerializeOptions options = { });
undefined error(optional any reason);
undefined terminate();
};
4 changes: 2 additions & 2 deletions reference-implementation/lib/Transformer.webidl
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ dictionary Transformer {
TransformerStartCallback start;
TransformerTransformCallback transform;
TransformerFlushCallback flush;
any readableType;
any writableType;
ReadableStreamType readableType;
WritableStreamType writableType;
};

callback TransformerStartCallback = any (TransformStreamDefaultController controller);
Expand Down
4 changes: 3 additions & 1 deletion reference-implementation/lib/UnderlyingSink.webidl
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ dictionary UnderlyingSink {
UnderlyingSinkWriteCallback write;
UnderlyingSinkCloseCallback close;
UnderlyingSinkAbortCallback abort;
any type;
WritableStreamType type;
};

callback UnderlyingSinkStartCallback = any (WritableStreamDefaultController controller);
callback UnderlyingSinkWriteCallback = Promise<undefined> (any chunk, WritableStreamDefaultController controller);
callback UnderlyingSinkCloseCallback = Promise<undefined> ();
callback UnderlyingSinkAbortCallback = Promise<undefined> (optional any reason);

enum WritableStreamType { "owning" };
2 changes: 1 addition & 1 deletion reference-implementation/lib/UnderlyingSource.webidl
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ callback UnderlyingSourceStartCallback = any (ReadableStreamController controlle
callback UnderlyingSourcePullCallback = Promise<undefined> (ReadableStreamController controller);
callback UnderlyingSourceCancelCallback = Promise<undefined> (optional any reason);

enum ReadableStreamType { "bytes" };
enum ReadableStreamType { "bytes", "owning" };
2 changes: 1 addition & 1 deletion reference-implementation/lib/WritableStream-impl.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ exports.implementation = class WritableStreamImpl {
underlyingSink = null;
}
const underlyingSinkDict = UnderlyingSink.convert(underlyingSink);
if ('type' in underlyingSinkDict) {
if ('type' in underlyingSinkDict && underlyingSinkDict.type !== 'owning') {
throw new RangeError('Invalid type is specified');
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,13 @@ exports.implementation = class WritableStreamDefaultWriterImpl {
aos.WritableStreamDefaultWriterRelease(this);
}

write(chunk) {
write(chunk, options) {
const transferList = options ? options.transfer : undefined;
if (this._stream === undefined) {
return promiseRejectedWith(defaultWriterLockException('write to'));
}

return aos.WritableStreamDefaultWriterWrite(this, chunk);
return aos.WritableStreamDefaultWriterWrite(this, chunk, transferList);
}
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
dictionary StructuredSerializeOptions {
sequence<object> transfer = [];
};

[Exposed=(Window,Worker,Worklet)]
interface WritableStreamDefaultWriter {
constructor(WritableStream stream);
Expand All @@ -9,5 +13,5 @@ interface WritableStreamDefaultWriter {
Promise<undefined> abort(optional any reason);
Promise<undefined> close();
undefined releaseLock();
Promise<undefined> write(optional any chunk);
Promise<undefined> write(optional any chunk, optional StructuredSerializeOptions options = { });
};
15 changes: 15 additions & 0 deletions reference-implementation/lib/abstract-ops/miscellaneous.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,18 @@ exports.CloneAsUint8Array = O => {
const buffer = O.buffer.slice(O.byteOffset, O.byteOffset + O.byteLength);
return new Uint8Array(buffer);
};

exports.StructuredTransferOrClone = (value, transferList) => {
return globalThis.structuredClone(value, { transfer: transferList });
};

exports.RunCloseSteps = value => {
if (typeof value.close === 'function') {
return;
}
try {
value.close();
} catch (closeException) {
// Nothing to do.
}
};
14 changes: 11 additions & 3 deletions reference-implementation/lib/abstract-ops/queue-with-sizes.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';
const assert = require('assert');
const { IsNonNegativeNumber } = require('./miscellaneous.js');
const { IsNonNegativeNumber, RunCloseSteps, StructuredTransferOrClone } = require('./miscellaneous.js');

exports.DequeueValue = container => {
assert('_queue' in container && '_queueTotalSize' in container);
Expand All @@ -15,7 +15,7 @@ exports.DequeueValue = container => {
return pair.value;
};

exports.EnqueueValueWithSize = (container, value, size) => {
exports.EnqueueValueWithSize = (container, value, size, transferList) => {
assert('_queue' in container && '_queueTotalSize' in container);

if (!IsNonNegativeNumber(size)) {
Expand All @@ -24,7 +24,9 @@ exports.EnqueueValueWithSize = (container, value, size) => {
if (size === Infinity) {
throw new RangeError('Size must be a finite, non-NaN, non-negative number.');
}

if (container._isOwning && !container._isPipeToOptimizedTransfer) {
value = StructuredTransferOrClone(value, transferList);
}
container._queue.push({ value, size });
container._queueTotalSize += size;
};
Expand All @@ -40,6 +42,12 @@ exports.PeekQueueValue = container => {
exports.ResetQueue = container => {
assert('_queue' in container && '_queueTotalSize' in container);

if (container._isOwning) {
while (container._queue.length > 0) {
const value = exports.DequeueValue(container);
RunCloseSteps(value);
}
}
container._queue = [];
container._queueTotalSize = 0;
};
45 changes: 32 additions & 13 deletions reference-implementation/lib/abstract-ops/readable-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ const { promiseResolvedWith, promiseRejectedWith, newPromise, resolvePromise, re
require('../helpers/webidl.js');
const { CanTransferArrayBuffer, CopyDataBlockBytes, CreateArrayFromList, IsDetachedBuffer, TransferArrayBuffer } =
require('./ecmascript.js');
const { CloneAsUint8Array, IsNonNegativeNumber } = require('./miscellaneous.js');
const { CloneAsUint8Array, IsNonNegativeNumber, RunCloseSteps, StructuredTransferOrClone } =
require('./miscellaneous.js');
const { EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js');
const { AcquireWritableStreamDefaultWriter, IsWritableStreamLocked, WritableStreamAbort,
WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterRelease,
Expand Down Expand Up @@ -81,15 +82,17 @@ function AcquireReadableStreamDefaultReader(stream) {
}

function CreateReadableStream(startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark = 1,
sizeAlgorithm = () => 1) {
sizeAlgorithm = () => 1, type = undefined) {
assert(IsNonNegativeNumber(highWaterMark) === true);

const stream = ReadableStream.new(globalThis);
InitializeReadableStream(stream);

const controller = ReadableStreamDefaultController.new(globalThis);
const isOwning = type === 'owning';

SetUpReadableStreamDefaultController(
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm, isOwning
);

return stream;
Expand Down Expand Up @@ -136,6 +139,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC

const reader = AcquireReadableStreamDefaultReader(source);
const writer = AcquireWritableStreamDefaultWriter(dest);
writer._stream._controller._isPipeToOptimizedTransfer = source._controller._isOwning && dest._controller._isOwning;

source._disturbed = true;

Expand Down Expand Up @@ -206,7 +210,11 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
{
chunkSteps: chunk => {
currentWrite = transformPromiseWith(
WritableStreamDefaultWriterWrite(writer, chunk), undefined, () => {}
WritableStreamDefaultWriterWrite(writer, chunk), undefined, () => {
if (reader._stream._controller._isOwning) {
RunCloseSteps(chunk);
}
}
);
resolveRead(false);
},
Expand Down Expand Up @@ -319,6 +327,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
}

function finalize(isError, error) {
writer._stream._controller._isPipeToOptimizedTransfer = undefined;
WritableStreamDefaultWriterRelease(writer);
ReadableStreamDefaultReaderRelease(reader);

Expand All @@ -340,7 +349,7 @@ function ReadableStreamTee(stream, cloneForBranch2) {
if (ReadableByteStreamController.isImpl(stream._controller)) {
return ReadableByteStreamTee(stream);
}
return ReadableStreamDefaultTee(stream, cloneForBranch2);
return ReadableStreamDefaultTee(stream, stream._controller._isOwning ? true : cloneForBranch2);
}

function ReadableStreamDefaultTee(stream, cloneForBranch2) {
Expand Down Expand Up @@ -392,10 +401,10 @@ function ReadableStreamDefaultTee(stream, cloneForBranch2) {
// }

if (canceled1 === false) {
ReadableStreamDefaultControllerEnqueue(branch1._controller, chunk1);
ReadableStreamDefaultControllerEnqueue(branch1._controller, chunk1, undefined);
}
if (canceled2 === false) {
ReadableStreamDefaultControllerEnqueue(branch2._controller, chunk2);
ReadableStreamDefaultControllerEnqueue(branch2._controller, chunk2, undefined);
}

reading = false;
Expand Down Expand Up @@ -1074,14 +1083,22 @@ function ReadableStreamDefaultControllerClose(controller) {
}
}

function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
function ReadableStreamDefaultControllerEnqueue(controller, chunk, transferList) {
if (ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) === false) {
return;
}

const stream = controller._stream;

if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadRequests(stream) > 0) {
if (controller._isOwning) {
try {
chunk = StructuredTransferOrClone(chunk, transferList);
} catch (chunkCloneError) {
ReadableStreamDefaultControllerError(controller, chunkCloneError);
throw chunkCloneError;
}
}
ReadableStreamFulfillReadRequest(stream, chunk, false);
} else {
let chunkSize;
Expand All @@ -1093,7 +1110,7 @@ function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
}

try {
EnqueueValueWithSize(controller, chunk, chunkSize);
EnqueueValueWithSize(controller, chunk, chunkSize, transferList);
} catch (enqueueE) {
ReadableStreamDefaultControllerError(controller, enqueueE);
throw enqueueE;
Expand Down Expand Up @@ -1148,7 +1165,7 @@ function ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) {
}

function SetUpReadableStreamDefaultController(
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm) {
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm, isOwning) {
assert(stream._controller === undefined);

controller._stream = stream;
Expand All @@ -1169,6 +1186,8 @@ function SetUpReadableStreamDefaultController(
controller._pullAlgorithm = pullAlgorithm;
controller._cancelAlgorithm = cancelAlgorithm;

controller._isOwning = isOwning;

stream._controller = controller;

const startResult = startAlgorithm();
Expand All @@ -1195,7 +1214,7 @@ function SetUpReadableStreamDefaultControllerFromUnderlyingSource(
let startAlgorithm = () => undefined;
let pullAlgorithm = () => promiseResolvedWith(undefined);
let cancelAlgorithm = () => promiseResolvedWith(undefined);

const isOwning = underlyingSourceDict.type === 'owning';
if ('start' in underlyingSourceDict) {
startAlgorithm = () => underlyingSourceDict.start.call(underlyingSource, controller);
}
Expand All @@ -1207,8 +1226,8 @@ function SetUpReadableStreamDefaultControllerFromUnderlyingSource(
}

SetUpReadableStreamDefaultController(
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm
);
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm,
isOwning);
}

// Byte stream controllers
Expand Down
Loading