Skip to content

Commit

Permalink
stream: initial approach to include strategy options on Readable.toWeb()
Browse files Browse the repository at this point in the history
PR-URL: #43515
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
Warkanlock authored and danielleadams committed Jul 26, 2022
1 parent f32aec8 commit a057510
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 12 deletions.
6 changes: 5 additions & 1 deletion doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -2789,7 +2789,7 @@ added:

Returns whether the stream is readable.

### `stream.Readable.toWeb(streamReadable)`
### `stream.Readable.toWeb(streamReadable[, options])`

<!-- YAML
added: v17.0.0
Expand All @@ -2798,6 +2798,10 @@ added: v17.0.0
> Stability: 1 - Experimental
* `streamReadable` {stream.Readable}
* `options` {Object}
* `strategy` {Object}
* `highWaterMark` {number}
* `size` {Function}
* Returns: {ReadableStream}

### `stream.Writable.fromWeb(writableStream[, options])`
Expand Down
6 changes: 4 additions & 2 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -1405,8 +1405,10 @@ Readable.fromWeb = function(readableStream, options) {
options);
};

Readable.toWeb = function(streamReadable) {
return lazyWebStreams().newReadableStreamFromStreamReadable(streamReadable);
Readable.toWeb = function(streamReadable, options) {
return lazyWebStreams().newReadableStreamFromStreamReadable(
streamReadable,
options);
};

Readable.wrap = function(src, options) {
Expand Down
34 changes: 25 additions & 9 deletions lib/internal/webstreams/adapters.js
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,14 @@ function newStreamWritableFromWritableStream(writableStream, options = kEmptyObj
}

/**
* @typedef {import('./queuingstrategies').QueuingStrategy} QueuingStrategy
* @param {Readable} streamReadable
* @param {{
* strategy : QueuingStrategy
* }} [options]
* @returns {ReadableStream}
*/
function newReadableStreamFromStreamReadable(streamReadable) {
function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObject) {
// Not using the internal/streams/utils isReadableNodeStream utility
// here because it will return false if streamReadable is a Duplex
// whose readable option is false. For a Duplex that is not readable,
Expand All @@ -382,14 +386,26 @@ function newReadableStreamFromStreamReadable(streamReadable) {

const objectMode = streamReadable.readableObjectMode;
const highWaterMark = streamReadable.readableHighWaterMark;
// When not running in objectMode explicitly, we just fall
// back to a minimal strategy that just specifies the highWaterMark
// and no size algorithm. Using a ByteLengthQueuingStrategy here
// is unnecessary.
const strategy =
objectMode ?
new CountQueuingStrategy({ highWaterMark }) :
{ highWaterMark };

const evaluateStrategyOrFallback = (strategy) => {
// If there is a strategy available, use it
if (strategy)
return strategy;

if (objectMode) {
// When running in objectMode explicitly but no strategy, we just fall
// back to CountQueuingStrategy
return new CountQueuingStrategy({ highWaterMark });
}

// When not running in objectMode explicitly, we just fall
// back to a minimal strategy that just specifies the highWaterMark
// and no size algorithm. Using a ByteLengthQueuingStrategy here
// is unnecessary.
return { highWaterMark };
};

const strategy = evaluateStrategyOrFallback(options?.strategy);

let controller;

Expand Down
75 changes: 75 additions & 0 deletions test/parallel/test-stream-readable-strategy-option.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
'use strict';
const common = require('../common');
const { Readable } = require('stream');
const assert = require('assert');
const { strictEqual } = require('assert');

{
// Strategy 2
const streamData = ['a', 'b', 'c', null];

// Fulfill a Readable object
const readable = new Readable({
read: common.mustCall(() => {
process.nextTick(() => {
readable.push(streamData.shift());
});
}, streamData.length),
});

// Use helper to convert it to a Web ReadableStream using ByteLength strategy
const readableStream = Readable.toWeb(readable, {
strategy: new ByteLengthQueuingStrategy({ highWaterMark: 1 }),
});

assert(!readableStream.locked);
readableStream.getReader().read().then(common.mustCall());
}

{
// Strategy 2
const streamData = ['a', 'b', 'c', null];

// Fulfill a Readable object
const readable = new Readable({
read: common.mustCall(() => {
process.nextTick(() => {
readable.push(streamData.shift());
});
}, streamData.length),
});

// Use helper to convert it to a Web ReadableStream using Count strategy
const readableStream = Readable.toWeb(readable, {
strategy: new CountQueuingStrategy({ highWaterMark: 1 }),
});

assert(!readableStream.locked);
readableStream.getReader().read().then(common.mustCall());
}

{
const desireSizeExpected = 2;

const stringStream = new ReadableStream(
{
start(controller) {
// Check if the strategy is being assigned on the init of the ReadableStream
strictEqual(controller.desiredSize, desireSizeExpected);
controller.enqueue('a');
controller.enqueue('b');
controller.close();
},
},
new CountQueuingStrategy({ highWaterMark: desireSizeExpected })
);

const reader = stringStream.getReader();

reader.read().then(common.mustCall());
reader.read().then(common.mustCall());
reader.read().then(({ value, done }) => {
strictEqual(value, undefined);
strictEqual(done, true);
});
}

0 comments on commit a057510

Please sign in to comment.