Skip to content

Commit 9ab353a

Browse files
MattiasBuelenstargos
authored andcommitted
stream: implement min option for ReadableStreamBYOBReader.read
PR-URL: #50888 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Debadree Chatterjee <debadree333@gmail.com>
1 parent 595542e commit 9ab353a

15 files changed

+968
-114
lines changed

doc/api/webstreams.md

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ added: v16.5.0
492492
-->
493493

494494
* Returns: A promise fulfilled with an object:
495-
* `value` {ArrayBuffer}
495+
* `value` {any}
496496
* `done` {boolean}
497497

498498
Requests the next chunk of data from the underlying {ReadableStream}
@@ -617,15 +617,24 @@ added: v16.5.0
617617
{ReadableStream} is closed or rejected if the stream errors or the reader's
618618
lock is released before the stream finishes closing.
619619
620-
#### `readableStreamBYOBReader.read(view)`
620+
#### `readableStreamBYOBReader.read(view[, options])`
621621
622622
<!-- YAML
623623
added: v16.5.0
624+
changes:
625+
- version: REPLACEME
626+
pr-url: https://github.com/nodejs/node/pull/50888
627+
description: Added `min` option.
624628
-->
625629
626630
* `view` {Buffer|TypedArray|DataView}
631+
* `options` {Object}
632+
* `min` {number} When set, the returned promise will only be
633+
fulfilled as soon as `min` number of elements are available.
634+
When not set, the promise fulfills when at least one element
635+
is available.
627636
* Returns: A promise fulfilled with an object:
628-
* `value` {ArrayBuffer}
637+
* `value` {TypedArray|DataView}
629638
* `done` {boolean}
630639
631640
Requests the next chunk of data from the underlying {ReadableStream}

lib/internal/encoding.js

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,7 @@ const {
4747
const {
4848
validateString,
4949
validateObject,
50-
kValidateObjectAllowNullable,
51-
kValidateObjectAllowArray,
52-
kValidateObjectAllowFunction,
50+
kValidateObjectAllowObjectsAndNull,
5351
} = require('internal/validators');
5452
const binding = internalBinding('encoding_binding');
5553
const {
@@ -393,10 +391,6 @@ const TextDecoder =
393391
makeTextDecoderICU() :
394392
makeTextDecoderJS();
395393

396-
const kValidateObjectAllowObjectsAndNull = kValidateObjectAllowNullable |
397-
kValidateObjectAllowArray |
398-
kValidateObjectAllowFunction;
399-
400394
function makeTextDecoderICU() {
401395
const {
402396
decode: _decode,

lib/internal/validators.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,11 @@ const kValidateObjectNone = 0;
222222
const kValidateObjectAllowNullable = 1 << 0;
223223
const kValidateObjectAllowArray = 1 << 1;
224224
const kValidateObjectAllowFunction = 1 << 2;
225+
const kValidateObjectAllowObjects = kValidateObjectAllowArray |
226+
kValidateObjectAllowFunction;
227+
const kValidateObjectAllowObjectsAndNull = kValidateObjectAllowNullable |
228+
kValidateObjectAllowArray |
229+
kValidateObjectAllowFunction;
225230

226231
/**
227232
* @callback validateObject
@@ -583,6 +588,8 @@ module.exports = {
583588
kValidateObjectAllowNullable,
584589
kValidateObjectAllowArray,
585590
kValidateObjectAllowFunction,
591+
kValidateObjectAllowObjects,
592+
kValidateObjectAllowObjectsAndNull,
586593
validateOneOf,
587594
validatePlainFunction,
588595
validatePort,

lib/internal/webstreams/readablestream.js

Lines changed: 67 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ const {
2222
SymbolAsyncIterator,
2323
SymbolDispose,
2424
SymbolToStringTag,
25+
TypedArrayPrototypeGetLength,
2526
Uint8Array,
2627
} = primordials;
2728

@@ -33,6 +34,7 @@ const {
3334
ERR_INVALID_ARG_TYPE,
3435
ERR_INVALID_STATE,
3536
ERR_INVALID_THIS,
37+
ERR_OUT_OF_RANGE,
3638
},
3739
} = require('internal/errors');
3840

@@ -58,8 +60,8 @@ const {
5860
validateAbortSignal,
5961
validateBuffer,
6062
validateObject,
61-
kValidateObjectAllowNullable,
62-
kValidateObjectAllowFunction,
63+
kValidateObjectAllowObjects,
64+
kValidateObjectAllowObjectsAndNull,
6365
} = require('internal/validators');
6466

6567
const {
@@ -246,10 +248,10 @@ class ReadableStream {
246248
* @param {UnderlyingSource} [source]
247249
* @param {QueuingStrategy} [strategy]
248250
*/
249-
constructor(source = {}, strategy = kEmptyObject) {
251+
constructor(source = kEmptyObject, strategy = kEmptyObject) {
250252
markTransferMode(this, false, true);
251-
if (source === null)
252-
throw new ERR_INVALID_ARG_VALUE('source', 'Object', source);
253+
validateObject(source, 'source', kValidateObjectAllowObjects);
254+
validateObject(strategy, 'strategy', kValidateObjectAllowObjectsAndNull);
253255
this[kState] = createReadableStreamState();
254256

255257
this[kIsClosedPromise] = createDeferredPromise();
@@ -332,7 +334,7 @@ class ReadableStream {
332334
getReader(options = kEmptyObject) {
333335
if (!isReadableStream(this))
334336
throw new ERR_INVALID_THIS('ReadableStream');
335-
validateObject(options, 'options', kValidateObjectAllowNullable | kValidateObjectAllowFunction);
337+
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
336338
const mode = options?.mode;
337339

338340
if (mode === undefined)
@@ -370,6 +372,7 @@ class ReadableStream {
370372

371373
// The web platform tests require that these be handled one at a
372374
// time and in a specific order. options can be null or undefined.
375+
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
373376
const preventAbort = options?.preventAbort;
374377
const preventCancel = options?.preventCancel;
375378
const preventClose = options?.preventClose;
@@ -412,6 +415,7 @@ class ReadableStream {
412415
destination);
413416
}
414417

418+
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
415419
const preventAbort = options?.preventAbort;
416420
const preventCancel = options?.preventCancel;
417421
const preventClose = options?.preventClose;
@@ -456,10 +460,8 @@ class ReadableStream {
456460
values(options = kEmptyObject) {
457461
if (!isReadableStream(this))
458462
throw new ERR_INVALID_THIS('ReadableStream');
459-
validateObject(options, 'options');
460-
const {
461-
preventCancel = false,
462-
} = options;
463+
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
464+
const preventCancel = !!(options?.preventCancel);
463465

464466
// eslint-disable-next-line no-use-before-define
465467
const reader = new ReadableStreamDefaultReader(this);
@@ -929,47 +931,62 @@ class ReadableStreamBYOBReader {
929931

930932
/**
931933
* @param {ArrayBufferView} view
934+
* @param {{
935+
* min? : number
936+
* }} [options]
932937
* @returns {Promise<{
933-
* view : ArrayBufferView,
938+
* value : ArrayBufferView,
934939
* done : boolean,
935940
* }>}
936941
*/
937-
read(view) {
942+
async read(view, options = kEmptyObject) {
938943
if (!isReadableStreamBYOBReader(this))
939-
return PromiseReject(new ERR_INVALID_THIS('ReadableStreamBYOBReader'));
944+
throw new ERR_INVALID_THIS('ReadableStreamBYOBReader');
940945
if (!isArrayBufferView(view)) {
941-
return PromiseReject(
942-
new ERR_INVALID_ARG_TYPE(
943-
'view',
944-
[
945-
'Buffer',
946-
'TypedArray',
947-
'DataView',
948-
],
949-
view));
946+
throw new ERR_INVALID_ARG_TYPE(
947+
'view',
948+
[
949+
'Buffer',
950+
'TypedArray',
951+
'DataView',
952+
],
953+
view,
954+
);
950955
}
956+
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
951957

952958
const viewByteLength = ArrayBufferViewGetByteLength(view);
953959
const viewBuffer = ArrayBufferViewGetBuffer(view);
954960
const viewBufferByteLength = ArrayBufferPrototypeGetByteLength(viewBuffer);
955961

956962
if (viewByteLength === 0 || viewBufferByteLength === 0) {
957-
return PromiseReject(
958-
new ERR_INVALID_STATE.TypeError(
959-
'View or Viewed ArrayBuffer is zero-length or detached',
960-
),
961-
);
963+
throw new ERR_INVALID_STATE.TypeError(
964+
'View or Viewed ArrayBuffer is zero-length or detached');
962965
}
963966

964967
// Supposed to assert here that the view's buffer is not
965968
// detached, but there's no API available to use to check that.
969+
970+
const min = options?.min ?? 1;
971+
if (typeof min !== 'number')
972+
throw new ERR_INVALID_ARG_TYPE('options.min', 'number', min);
973+
if (!NumberIsInteger(min))
974+
throw new ERR_INVALID_ARG_VALUE('options.min', min, 'must be an integer');
975+
if (min <= 0)
976+
throw new ERR_INVALID_ARG_VALUE('options.min', min, 'must be greater than 0');
977+
if (!isDataView(view)) {
978+
if (min > TypedArrayPrototypeGetLength(view)) {
979+
throw new ERR_OUT_OF_RANGE('options.min', '<= view.length', min);
980+
}
981+
} else if (min > viewByteLength) {
982+
throw new ERR_OUT_OF_RANGE('options.min', '<= view.byteLength', min);
983+
}
984+
966985
if (this[kState].stream === undefined) {
967-
return PromiseReject(
968-
new ERR_INVALID_STATE.TypeError(
969-
'The reader is not attached to a stream'));
986+
throw new ERR_INVALID_STATE.TypeError('The reader is not attached to a stream');
970987
}
971988
const readIntoRequest = new ReadIntoRequest();
972-
readableStreamBYOBReaderRead(this, view, readIntoRequest);
989+
readableStreamBYOBReaderRead(this, view, min, readIntoRequest);
973990
return readIntoRequest.promise;
974991
}
975992

@@ -1883,7 +1900,7 @@ function readableByteStreamTee(stream) {
18831900
reading = false;
18841901
},
18851902
};
1886-
readableStreamBYOBReaderRead(reader, view, readIntoRequest);
1903+
readableStreamBYOBReaderRead(reader, view, 1, readIntoRequest);
18871904
}
18881905

18891906
function pull1Algorithm() {
@@ -2210,7 +2227,7 @@ function readableStreamReaderGenericRelease(reader) {
22102227
reader[kState].stream = undefined;
22112228
}
22122229

2213-
function readableStreamBYOBReaderRead(reader, view, readIntoRequest) {
2230+
function readableStreamBYOBReaderRead(reader, view, min, readIntoRequest) {
22142231
const {
22152232
stream,
22162233
} = reader[kState];
@@ -2223,6 +2240,7 @@ function readableStreamBYOBReaderRead(reader, view, readIntoRequest) {
22232240
readableByteStreamControllerPullInto(
22242241
stream[kState].controller,
22252242
view,
2243+
min,
22262244
readIntoRequest);
22272245
}
22282246

@@ -2495,7 +2513,7 @@ function readableByteStreamControllerClose(controller) {
24952513

24962514
if (pendingPullIntos.length) {
24972515
const firstPendingPullInto = pendingPullIntos[0];
2498-
if (firstPendingPullInto.bytesFilled > 0) {
2516+
if (firstPendingPullInto.bytesFilled % firstPendingPullInto.elementSize !== 0) {
24992517
const error = new ERR_INVALID_STATE.TypeError('Partial read');
25002518
readableByteStreamControllerError(controller, error);
25012519
throw error;
@@ -2512,7 +2530,7 @@ function readableByteStreamControllerCommitPullIntoDescriptor(stream, desc) {
25122530

25132531
let done = false;
25142532
if (stream[kState].state === 'closed') {
2515-
desc.bytesFilled = 0;
2533+
assert(desc.bytesFilled % desc.elementSize === 0);
25162534
done = true;
25172535
}
25182536

@@ -2601,6 +2619,7 @@ function readableByteStreamControllerHandleQueueDrain(controller) {
26012619
function readableByteStreamControllerPullInto(
26022620
controller,
26032621
view,
2622+
min,
26042623
readIntoRequest) {
26052624
const {
26062625
closeRequested,
@@ -2613,6 +2632,11 @@ function readableByteStreamControllerPullInto(
26132632
elementSize = view.constructor.BYTES_PER_ELEMENT;
26142633
ctor = view.constructor;
26152634
}
2635+
2636+
const minimumFill = min * elementSize;
2637+
assert(minimumFill >= elementSize && minimumFill <= view.byteLength);
2638+
assert(minimumFill % elementSize === 0);
2639+
26162640
const buffer = ArrayBufferViewGetBuffer(view);
26172641
const byteOffset = ArrayBufferViewGetByteOffset(view);
26182642
const byteLength = ArrayBufferViewGetByteLength(view);
@@ -2631,6 +2655,7 @@ function readableByteStreamControllerPullInto(
26312655
byteOffset,
26322656
byteLength,
26332657
bytesFilled: 0,
2658+
minimumFill,
26342659
elementSize,
26352660
ctor,
26362661
type: 'byob',
@@ -2718,7 +2743,7 @@ function readableByteStreamControllerRespond(controller, bytesWritten) {
27182743
}
27192744

27202745
function readableByteStreamControllerRespondInClosedState(controller, desc) {
2721-
assert(!desc.bytesFilled);
2746+
assert(desc.bytesFilled % desc.elementSize === 0);
27222747
if (desc.type === 'none') {
27232748
readableByteStreamControllerShiftPendingPullInto(controller);
27242749
}
@@ -2895,17 +2920,18 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
28952920
byteLength,
28962921
byteOffset,
28972922
bytesFilled,
2923+
minimumFill,
28982924
elementSize,
28992925
} = desc;
2900-
const currentAlignedBytes = bytesFilled - (bytesFilled % elementSize);
29012926
const maxBytesToCopy = MathMin(
29022927
controller[kState].queueTotalSize,
29032928
byteLength - bytesFilled);
29042929
const maxBytesFilled = bytesFilled + maxBytesToCopy;
29052930
const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize);
29062931
let totalBytesToCopyRemaining = maxBytesToCopy;
29072932
let ready = false;
2908-
if (maxAlignedBytes > currentAlignedBytes) {
2933+
assert(bytesFilled < minimumFill);
2934+
if (maxAlignedBytes >= minimumFill) {
29092935
totalBytesToCopyRemaining = maxAlignedBytes - bytesFilled;
29102936
ready = true;
29112937
}
@@ -2948,7 +2974,7 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
29482974
if (!ready) {
29492975
assert(!controller[kState].queueTotalSize);
29502976
assert(desc.bytesFilled > 0);
2951-
assert(desc.bytesFilled < elementSize);
2977+
assert(desc.bytesFilled < minimumFill);
29522978
}
29532979
return ready;
29542980
}
@@ -3004,7 +3030,7 @@ function readableByteStreamControllerRespondInReadableState(
30043030
return;
30053031
}
30063032

3007-
if (desc.bytesFilled < desc.elementSize)
3033+
if (desc.bytesFilled < desc.minimumFill)
30083034
return;
30093035

30103036
readableByteStreamControllerShiftPendingPullInto(controller);
@@ -3189,6 +3215,7 @@ function readableByteStreamControllerPullSteps(controller, readRequest) {
31893215
byteOffset: 0,
31903216
byteLength: autoAllocateChunkSize,
31913217
bytesFilled: 0,
3218+
minimumFill: 1,
31923219
elementSize: 1,
31933220
ctor: Uint8Array,
31943221
type: 'default',

lib/internal/webstreams/transformstream.js

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,12 @@ const {
2929
kEnumerableProperty,
3030
} = require('internal/util');
3131

32+
const {
33+
validateObject,
34+
kValidateObjectAllowObjects,
35+
kValidateObjectAllowObjectsAndNull,
36+
} = require('internal/validators');
37+
3238
const {
3339
kDeserialize,
3440
kTransfer,
@@ -119,10 +125,13 @@ class TransformStream {
119125
* @param {QueuingStrategy} [readableStrategy]
120126
*/
121127
constructor(
122-
transformer = null,
128+
transformer = kEmptyObject,
123129
writableStrategy = kEmptyObject,
124130
readableStrategy = kEmptyObject) {
125131
markTransferMode(this, false, true);
132+
validateObject(transformer, 'transformer', kValidateObjectAllowObjects);
133+
validateObject(writableStrategy, 'writableStrategy', kValidateObjectAllowObjectsAndNull);
134+
validateObject(readableStrategy, 'readableStrategy', kValidateObjectAllowObjectsAndNull);
126135
const readableType = transformer?.readableType;
127136
const writableType = transformer?.writableType;
128137
const start = transformer?.start;

0 commit comments

Comments
 (0)