Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Jul 26, 2021
1 parent cf96032 commit 5a8110f
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 13 deletions.
9 changes: 4 additions & 5 deletions lib/internal/streams/compose.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ const {
isReadable,
isWritable,
} = require('internal/streams/utils');
const duplexify = require('internal/streams/duplexify');
const {
AbortError,
codes: {
Expand All @@ -23,18 +22,18 @@ module.exports = function compose(...streams) {
}

if (streams.length === 1) {
return duplexify(streams[0], 'streams[0]');
return Duplex.from(streams[0]);
}

const orgStreams = [...streams];

if (typeof streams[0] === 'function') {
streams[0] = duplexify(streams[0], 'streams[0]');
streams[0] = Duplex.from(streams[0]);
}

if (typeof streams[streams.length - 1] === 'function') {
const idx = streams.length - 1;
streams[idx] = duplexify(streams[idx], `streams[${idx}]`);
streams[idx] = Duplex.from(streams[idx]);
}

for (let n = 0; n < streams.length; ++n) {
Expand Down Expand Up @@ -87,7 +86,7 @@ module.exports = function compose(...streams) {
// Implement Writable/Readable/Duplex traits.
// See, https://github.com/nodejs/node/pull/33515.
d = new Duplex({
highWaterMark: 1,
// TODO (ronag): highWaterMark?
writableObjectMode: !!head?.writableObjectMode,
readableObjectMode: !!tail?.writableObjectMode,
writable,
Expand Down
3 changes: 3 additions & 0 deletions lib/internal/streams/duplexify.js
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ function _duplexify(pair) {
onfinished(err);
});

// TODO(ronag): Avoid double buffering.
// Implement Writable/Readable/Duplex traits.
// See, https://github.com/nodejs/node/pull/33515.
d = new Duplex({
// TODO (ronag): highWaterMark?
readableObjectMode: !!r?.readableObjectMode,
Expand Down
8 changes: 3 additions & 5 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ const {
} = primordials;

const eos = require('internal/streams/end-of-stream');
const duplexify = require('internal/streams/duplexify');

const { once } = require('internal/util');
const destroyImpl = require('internal/streams/destroy');
const Duplex = require('internal/streams/duplex');
const {
aggregateTwoErrors,
codes: {
Expand Down Expand Up @@ -220,7 +219,7 @@ function pipeline(...streams) {
} else if (isIterable(stream) || isReadableNodeStream(stream)) {
ret = stream;
} else {
ret = duplexify(stream, 'source');
ret = Duplex.from(stream);
}
} else if (typeof stream === 'function') {
ret = makeAsyncIterable(ret);
Expand Down Expand Up @@ -288,8 +287,7 @@ function pipeline(...streams) {
}
ret = stream;
} else {
const name = reading ? `transform[${i - 1}]` : 'destination';
ret = duplexify(stream, name);
ret = Duplex.from(stream);
}
}

Expand Down
6 changes: 5 additions & 1 deletion lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ const compose = require('internal/streams/compose');
const { destroyer } = require('internal/streams/destroy');
const eos = require('internal/streams/end-of-stream');
const internalBuffer = require('internal/buffer');
const duplexify = require('internal/streams/duplexify');

let duplexify;

const promises = require('stream/promises');

Expand All @@ -43,6 +44,9 @@ Stream.Readable = require('internal/streams/readable');
Stream.Writable = require('internal/streams/writable');
Stream.Duplex = require('internal/streams/duplex');
Stream.Duplex.from = function from(body) {
if (!duplexify) {
duplexify = require('internal/streams/duplexify');
}
return duplexify(body, 'body');
};
Stream.Transform = require('internal/streams/transform');
Expand Down
2 changes: 0 additions & 2 deletions test/parallel/test-bootstrap-modules.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,8 @@ const expectedModules = new Set([
'NativeModule internal/stream_base_commons',
'NativeModule internal/streams/add-abort-signal',
'NativeModule internal/streams/buffer_list',
'NativeModule internal/streams/compose',
'NativeModule internal/streams/destroy',
'NativeModule internal/streams/duplex',
'NativeModule internal/streams/duplexify',
'NativeModule internal/streams/end-of-stream',
'NativeModule internal/streams/from',
'NativeModule internal/streams/legacy',
Expand Down

0 comments on commit 5a8110f

Please sign in to comment.