Skip to content

Commit

Permalink
stream: add stream.compose
Browse files Browse the repository at this point in the history
Refs: #32020

PR-URL: #39029
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Michaël Zasso <targos@protonmail.com>
Backport-PR-URL: #39563
  • Loading branch information
ronag authored and targos committed Sep 6, 2021
1 parent 9e782eb commit cb44781
Show file tree
Hide file tree
Showing 10 changed files with 829 additions and 44 deletions.
89 changes: 89 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1933,6 +1933,95 @@ run().catch(console.error);
after the `callback` has been invoked. In the case of reuse of streams after
failure, this can cause event listener leaks and swallowed errors.

### `stream.compose(...streams)`
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - `stream.compose` is experimental.
* `streams` {Stream[]|Iterable[]|AsyncIterable[]|Function[]}
* Returns: {stream.Duplex}

Combines two or more streams into a `Duplex` stream that writes to the
first stream and reads from the last. Each provided stream is piped into
the next, using `stream.pipeline`. If any of the streams error then all
are destroyed, including the outer `Duplex` stream.

Because `stream.compose` returns a new stream that in turn can (and
should) be piped into other streams, it enables composition. In contrast,
when passing streams to `stream.pipeline`, typically the first stream is
a readable stream and the last a writable stream, forming a closed
circuit.

If passed a `Function` it must be a factory method taking a `source`
`Iterable`.

```mjs
import { compose, Transform } from 'stream';

const removeSpaces = new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).replace(' ', ''));
}
});

async function* toUpper(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
}

let res = '';
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
res += buf;
}

console.log(res); // prints 'HELLOWORLD'
```

`stream.compose` can be used to convert async iterables, generators and
functions into streams.

* `AsyncIterable` converts into a readable `Duplex`. Cannot yield
`null`.
* `AsyncGeneratorFunction` converts into a readable/writable transform `Duplex`.
Must take a source `AsyncIterable` as first parameter. Cannot yield
`null`.
* `AsyncFunction` converts into a writable `Duplex`. Must return
either `null` or `undefined`.

```mjs
import { compose } from 'stream';
import { finished } from 'stream/promises';

// Convert AsyncIterable into readable Duplex.
const s1 = compose(async function*() {
yield 'Hello';
yield 'World';
}());

// Convert AsyncGenerator into transform Duplex.
const s2 = compose(async function*(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
});

let res = '';

// Convert AsyncFunction into writable Duplex.
const s3 = compose(async function(source) {
for await (const chunk of source) {
res += chunk;
}
});

await finished(compose(s1, s2, s3));

console.log(res); // prints 'HELLOWORLD'
```

### `stream.Readable.from(iterable, [options])`
<!-- YAML
added:
Expand Down
196 changes: 196 additions & 0 deletions lib/internal/streams/compose.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
'use strict';

const pipeline = require('internal/streams/pipeline');
const Duplex = require('internal/streams/duplex');
const { destroyer } = require('internal/streams/destroy');
const {
isNodeStream,
isReadable,
isWritable,
} = require('internal/streams/utils');
const {
AbortError,
codes: {
ERR_INVALID_ARG_VALUE,
ERR_MISSING_ARGS,
},
} = require('internal/errors');

// This is needed for pre node 17.
class ComposeDuplex extends Duplex {
constructor(options) {
super(options);

// https://github.com/nodejs/node/pull/34385

if (options?.readable === false) {
this._readableState.readable = false;
this._readableState.ended = true;
this._readableState.endEmitted = true;
}

if (options?.writable === false) {
this._writableState.writable = false;
this._writableState.ending = true;
this._writableState.ended = true;
this._writableState.finished = true;
}
}
}

module.exports = function compose(...streams) {
if (streams.length === 0) {
throw new ERR_MISSING_ARGS('streams');
}

if (streams.length === 1) {
return Duplex.from(streams[0]);
}

const orgStreams = [...streams];

if (typeof streams[0] === 'function') {
streams[0] = Duplex.from(streams[0]);
}

if (typeof streams[streams.length - 1] === 'function') {
const idx = streams.length - 1;
streams[idx] = Duplex.from(streams[idx]);
}

for (let n = 0; n < streams.length; ++n) {
if (!isNodeStream(streams[n])) {
// TODO(ronag): Add checks for non streams.
continue;
}
if (n < streams.length - 1 && !isReadable(streams[n])) {
throw new ERR_INVALID_ARG_VALUE(
`streams[${n}]`,
orgStreams[n],
'must be readable'
);
}
if (n > 0 && !isWritable(streams[n])) {
throw new ERR_INVALID_ARG_VALUE(
`streams[${n}]`,
orgStreams[n],
'must be writable'
);
}
}

let ondrain;
let onfinish;
let onreadable;
let onclose;
let d;

function onfinished(err) {
const cb = onclose;
onclose = null;

if (cb) {
cb(err);
} else if (err) {
d.destroy(err);
} else if (!readable && !writable) {
d.destroy();
}
}

const head = streams[0];
const tail = pipeline(streams, onfinished);

const writable = !!isWritable(head);
const readable = !!isReadable(tail);

// TODO(ronag): Avoid double buffering.
// Implement Writable/Readable/Duplex traits.
// See, https://github.com/nodejs/node/pull/33515.
d = new ComposeDuplex({
// TODO (ronag): highWaterMark?
writableObjectMode: !!head?.writableObjectMode,
readableObjectMode: !!tail?.writableObjectMode,
writable,
readable,
});

if (writable) {
d._write = function(chunk, encoding, callback) {
if (head.write(chunk, encoding)) {
callback();
} else {
ondrain = callback;
}
};

d._final = function(callback) {
head.end();
onfinish = callback;
};

head.on('drain', function() {
if (ondrain) {
const cb = ondrain;
ondrain = null;
cb();
}
});

tail.on('finish', function() {
if (onfinish) {
const cb = onfinish;
onfinish = null;
cb();
}
});
}

if (readable) {
tail.on('readable', function() {
if (onreadable) {
const cb = onreadable;
onreadable = null;
cb();
}
});

tail.on('end', function() {
d.push(null);
});

d._read = function() {
while (true) {
const buf = tail.read();

if (buf === null) {
onreadable = d._read;
return;
}

if (!d.push(buf)) {
return;
}
}
};
}

d._destroy = function(err, callback) {
if (!err && onclose !== null) {
err = new AbortError();
}

onreadable = null;
ondrain = null;
onfinish = null;

if (onclose === null) {
callback(err);
} else {
onclose = callback;
destroyer(tail, err);
}
};

return d;
};
19 changes: 2 additions & 17 deletions lib/internal/streams/duplexify.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@
const {
isIterable,
isNodeStream,
isDestroyed,
isReadable,
isReadableNodeStream,
isWritable,
isWritableNodeStream,
isDuplexNodeStream,
isReadableFinished,
isWritableEnded,
} = require('internal/streams/utils');
const eos = require('internal/streams/end-of-stream');
const {
Expand All @@ -32,20 +31,6 @@ const {
FunctionPrototypeCall
} = primordials;

function isReadable(stream) {
const r = isReadableNodeStream(stream);
if (r === null || typeof stream?.readable !== 'boolean') return null;
if (isDestroyed(stream)) return false;
return r && stream.readable && !isReadableFinished(stream);
}

function isWritable(stream) {
const r = isWritableNodeStream(stream);
if (r === null || typeof stream?.writable !== 'boolean') return null;
if (isDestroyed(stream)) return false;
return r && stream.writable && !isWritableEnded(stream);
}

// This is needed for pre node 17.
class Duplexify extends Duplex {
constructor(options) {
Expand Down
2 changes: 2 additions & 0 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ function eos(stream, options, callback) {
(rState && rState.errorEmitted) ||
(rState && stream.req && stream.aborted) ||
(
(!wState || !willEmitClose || typeof wState.closed !== 'boolean') &&
(!rState || !willEmitClose || typeof rState.closed !== 'boolean') &&
(!writable || (wState && wState.finished)) &&
(!readable || (rState && rState.endEmitted))
)
Expand Down
17 changes: 7 additions & 10 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ const { validateCallback } = require('internal/validators');

const {
isIterable,
isReadable,
isStream,
isReadableNodeStream,
isNodeStream,
} = require('internal/streams/utils');

let PassThrough;
Expand Down Expand Up @@ -87,7 +87,7 @@ function popCallback(streams) {
function makeAsyncIterable(val) {
if (isIterable(val)) {
return val;
} else if (isReadable(val)) {
} else if (isReadableNodeStream(val)) {
// Legacy streams are not Iterable.
return fromReadable(val);
}
Expand Down Expand Up @@ -204,7 +204,7 @@ function pipeline(...streams) {
const reading = i < streams.length - 1;
const writing = i > 0;

if (isStream(stream)) {
if (isNodeStream(stream)) {
finishCount++;
destroys.push(destroyer(stream, reading, writing, finish));
}
Expand All @@ -216,7 +216,7 @@ function pipeline(...streams) {
throw new ERR_INVALID_RETURN_VALUE(
'Iterable, AsyncIterable or Stream', 'source', ret);
}
} else if (isIterable(stream) || isReadable(stream)) {
} else if (isIterable(stream) || isReadableNodeStream(stream)) {
ret = stream;
} else {
ret = Duplex.from(stream);
Expand Down Expand Up @@ -269,8 +269,8 @@ function pipeline(...streams) {
finishCount++;
destroys.push(destroyer(ret, false, true, finish));
}
} else if (isStream(stream)) {
if (isReadable(ret)) {
} else if (isNodeStream(stream)) {
if (isReadableNodeStream(ret)) {
ret.pipe(stream);

// Compat. Before node v10.12.0 stdio used to throw an error so
Expand All @@ -291,9 +291,6 @@ function pipeline(...streams) {
}
}

// TODO(ronag): Consider returning a Duplex proxy if the first argument
// is a writable. Would improve composability.
// See, https://github.com/nodejs/node/issues/32020
return ret;
}

Expand Down
Loading

1 comment on commit cb44781

@JoakimCh
Copy link

@JoakimCh JoakimCh commented on cb44781 Oct 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const duplexStream = duplexer3(subprocess.stdin, subprocess.stdout) // this one works
const duplexStream =   compose(subprocess.stdin, subprocess.stdout) // this one doesn't

Is it not suppoed to do the same as duplexer3?
I ask here before creating an issue...

The error I get:
The argument 'streams[0]' must be readable.

Please sign in to comment.