Skip to content

Commit e579acb

Browse files
committed
stream: add stream.compose
Refs: #32020 PR-URL: #39029 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Michaël Zasso <targos@protonmail.com>
1 parent 36bcc29 commit e579acb

File tree

7 files changed

+783
-4
lines changed

7 files changed

+783
-4
lines changed

doc/api/stream.md

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1856,6 +1856,48 @@ run().catch(console.error);
18561856
after the `callback` has been invoked. In the case of reuse of streams after
18571857
failure, this can cause event listener leaks and swallowed errors.
18581858

1859+
### `stream.compose(...streams)`
1860+
<!-- YAML
1861+
added: REPLACEME
1862+
-->
1863+
1864+
* `streams` {Stream[]}
1865+
* Returns: {stream.Duplex}
1866+
1867+
Combines two or more streams into a `Duplex` stream that writes to the
1868+
first stream and reads from the last. Each provided stream is piped into
1869+
the next, using `stream.pipeline`. If any of the streams error then all
1870+
are destroyed, including the outer `Duplex` stream.
1871+
1872+
Because `stream.compose` returns a new stream that in turn can (and
1873+
should) be piped into other streams, it enables composition. In contrast,
1874+
when passing streams to `stream.pipeline`, typically the first stream is
1875+
a readable stream and the last a writable stream, forming a closed
1876+
circuit.
1877+
1878+
```mjs
1879+
import { compose, Transform } from 'stream';
1880+
1881+
const removeSpaces = new Transform({
1882+
transform(chunk, encoding, callback) {
1883+
callback(null, String(chunk).replace(' ', ''));
1884+
}
1885+
});
1886+
1887+
const toUpper = new Transform({
1888+
transform(chunk, encoding, callback) {
1889+
callback(null, String(chunk).toUpperCase());
1890+
}
1891+
});
1892+
1893+
let res = '';
1894+
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
1895+
res += buf;
1896+
}
1897+
1898+
console.log(res); // prints 'HELLOWORLD'
1899+
```
1900+
18591901
### `stream.Readable.from(iterable, [options])`
18601902
<!-- YAML
18611903
added:

lib/internal/streams/compose.js

Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
'use strict';
2+
3+
const pipeline = require('internal/streams/pipeline');
4+
const Duplex = require('internal/streams/duplex');
5+
const { createDeferredPromise } = require('internal/util');
6+
const { destroyer } = require('internal/streams/destroy');
7+
const from = require('internal/streams/from');
8+
const {
9+
isNodeStream,
10+
isIterable,
11+
isReadable,
12+
isWritable,
13+
} = require('internal/streams/utils');
14+
const {
15+
PromiseResolve,
16+
} = primordials;
17+
const {
18+
AbortError,
19+
codes: {
20+
ERR_INVALID_ARG_TYPE,
21+
ERR_INVALID_ARG_VALUE,
22+
ERR_INVALID_RETURN_VALUE,
23+
ERR_MISSING_ARGS,
24+
},
25+
} = require('internal/errors');
26+
const assert = require('internal/assert');
27+
28+
// This is needed for pre node 17.
29+
class ComposeDuplex extends Duplex {
30+
constructor(options) {
31+
super(options);
32+
33+
// https://github.com/nodejs/node/pull/34385
34+
35+
if (options?.readable === false) {
36+
this._readableState.readable = false;
37+
this._readableState.ended = true;
38+
this._readableState.endEmitted = true;
39+
}
40+
41+
if (options?.writable === false) {
42+
this._writableState.writable = false;
43+
this._writableState.ending = true;
44+
this._writableState.ended = true;
45+
this._writableState.finished = true;
46+
}
47+
}
48+
}
49+
50+
module.exports = function compose(...streams) {
51+
if (streams.length === 0) {
52+
throw new ERR_MISSING_ARGS('streams');
53+
}
54+
55+
if (streams.length === 1) {
56+
return makeDuplex(streams[0], 'streams[0]');
57+
}
58+
59+
const orgStreams = [...streams];
60+
61+
if (typeof streams[0] === 'function') {
62+
streams[0] = makeDuplex(streams[0], 'streams[0]');
63+
}
64+
65+
if (typeof streams[streams.length - 1] === 'function') {
66+
const idx = streams.length - 1;
67+
streams[idx] = makeDuplex(streams[idx], `streams[${idx}]`);
68+
}
69+
70+
for (let n = 0; n < streams.length; ++n) {
71+
if (!isNodeStream(streams[n])) {
72+
// TODO(ronag): Add checks for non streams.
73+
continue;
74+
}
75+
if (n < streams.length - 1 && !isReadable(streams[n])) {
76+
throw new ERR_INVALID_ARG_VALUE(
77+
`streams[${n}]`,
78+
orgStreams[n],
79+
'must be readable'
80+
);
81+
}
82+
if (n > 0 && !isWritable(streams[n])) {
83+
throw new ERR_INVALID_ARG_VALUE(
84+
`streams[${n}]`,
85+
orgStreams[n],
86+
'must be writable'
87+
);
88+
}
89+
}
90+
91+
let ondrain;
92+
let onfinish;
93+
let onreadable;
94+
let onclose;
95+
let d;
96+
97+
function onfinished(err) {
98+
const cb = onclose;
99+
onclose = null;
100+
101+
if (cb) {
102+
cb(err);
103+
} else if (err) {
104+
d.destroy(err);
105+
} else if (!readable && !writable) {
106+
d.destroy();
107+
}
108+
}
109+
110+
const head = streams[0];
111+
const tail = pipeline(streams, onfinished);
112+
113+
const writable = !!isWritable(head);
114+
const readable = !!isReadable(tail);
115+
116+
// TODO(ronag): Avoid double buffering.
117+
// Implement Writable/Readable/Duplex traits.
118+
// See, https://github.com/nodejs/node/pull/33515.
119+
d = new ComposeDuplex({
120+
highWaterMark: 1,
121+
writableObjectMode: !!head?.writableObjectMode,
122+
readableObjectMode: !!tail?.writableObjectMode,
123+
writable,
124+
readable,
125+
});
126+
127+
if (writable) {
128+
d._write = function(chunk, encoding, callback) {
129+
if (head.write(chunk, encoding)) {
130+
callback();
131+
} else {
132+
ondrain = callback;
133+
}
134+
};
135+
136+
d._final = function(callback) {
137+
head.end();
138+
onfinish = callback;
139+
};
140+
141+
head.on('drain', function() {
142+
if (ondrain) {
143+
const cb = ondrain;
144+
ondrain = null;
145+
cb();
146+
}
147+
});
148+
149+
tail.on('finish', function() {
150+
if (onfinish) {
151+
const cb = onfinish;
152+
onfinish = null;
153+
cb();
154+
}
155+
});
156+
}
157+
158+
if (readable) {
159+
tail.on('readable', function() {
160+
if (onreadable) {
161+
const cb = onreadable;
162+
onreadable = null;
163+
cb();
164+
}
165+
});
166+
167+
tail.on('end', function() {
168+
d.push(null);
169+
});
170+
171+
d._read = function() {
172+
while (true) {
173+
const buf = tail.read();
174+
175+
if (buf === null) {
176+
onreadable = d._read;
177+
return;
178+
}
179+
180+
if (!d.push(buf)) {
181+
return;
182+
}
183+
}
184+
};
185+
}
186+
187+
d._destroy = function(err, callback) {
188+
if (!err && onclose !== null) {
189+
err = new AbortError();
190+
}
191+
192+
onreadable = null;
193+
ondrain = null;
194+
onfinish = null;
195+
196+
if (onclose === null) {
197+
callback(err);
198+
} else {
199+
onclose = callback;
200+
destroyer(tail, err);
201+
}
202+
};
203+
204+
return d;
205+
};
206+
207+
function makeDuplex(stream, name) {
208+
let ret;
209+
if (typeof stream === 'function') {
210+
assert(stream.length > 0);
211+
212+
const { value, write, final } = fromAsyncGen(stream);
213+
214+
if (isIterable(value)) {
215+
ret = from(ComposeDuplex, value, {
216+
objectMode: true,
217+
highWaterMark: 1,
218+
write,
219+
final
220+
});
221+
} else if (typeof value?.then === 'function') {
222+
const promise = PromiseResolve(value)
223+
.then((val) => {
224+
if (val != null) {
225+
throw new ERR_INVALID_RETURN_VALUE('nully', name, val);
226+
}
227+
})
228+
.catch((err) => {
229+
destroyer(ret, err);
230+
});
231+
232+
ret = new ComposeDuplex({
233+
objectMode: true,
234+
highWaterMark: 1,
235+
readable: false,
236+
write,
237+
final(cb) {
238+
final(() => promise.then(cb, cb));
239+
}
240+
});
241+
} else {
242+
throw new ERR_INVALID_RETURN_VALUE(
243+
'Iterable, AsyncIterable or AsyncFunction', name, value);
244+
}
245+
} else if (isNodeStream(stream)) {
246+
ret = stream;
247+
} else if (isIterable(stream)) {
248+
ret = from(ComposeDuplex, stream, {
249+
objectMode: true,
250+
highWaterMark: 1,
251+
writable: false
252+
});
253+
} else {
254+
throw new ERR_INVALID_ARG_TYPE(
255+
name,
256+
['Stream', 'Iterable', 'AsyncIterable', 'Function'],
257+
stream)
258+
;
259+
}
260+
return ret;
261+
}
262+
263+
function fromAsyncGen(fn) {
264+
let { promise, resolve } = createDeferredPromise();
265+
const value = fn(async function*() {
266+
while (true) {
267+
const { chunk, done, cb } = await promise;
268+
process.nextTick(cb);
269+
if (done) return;
270+
yield chunk;
271+
({ promise, resolve } = createDeferredPromise());
272+
}
273+
}());
274+
275+
return {
276+
value,
277+
write(chunk, encoding, cb) {
278+
resolve({ chunk, done: false, cb });
279+
},
280+
final(cb) {
281+
resolve({ done: true, cb });
282+
}
283+
};
284+
}

lib/internal/streams/pipeline.js

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -295,9 +295,6 @@ function pipeline(...streams) {
295295
}
296296
}
297297

298-
// TODO(ronag): Consider returning a Duplex proxy if the first argument
299-
// is a writable. Would improve composability.
300-
// See, https://github.com/nodejs/node/issues/32020
301298
return ret;
302299
}
303300

lib/internal/streams/utils.js

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,15 @@ function isWritableNodeStream(obj) {
2828
}
2929

3030
function isNodeStream(obj) {
31-
return isReadableNodeStream(obj) || isWritableNodeStream(obj);
31+
return (
32+
obj &&
33+
(
34+
obj._readableState ||
35+
obj._writableState ||
36+
(typeof obj.write === 'function' && typeof obj.on === 'function') ||
37+
(typeof obj.pipe === 'function' && typeof obj.on === 'function')
38+
)
39+
);
3240
}
3341

3442
function isIterable(obj, isAsync) {

lib/stream.js

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,16 @@ const {
3030
} = require('internal/util');
3131

3232
const pipeline = require('internal/streams/pipeline');
33+
const _compose = require('internal/streams/compose');
3334
const { destroyer } = require('internal/streams/destroy');
3435
const eos = require('internal/streams/end-of-stream');
3536
const internalBuffer = require('internal/buffer');
37+
const { isNodeStream } = require('internal/streams/utils');
38+
const {
39+
codes: {
40+
ERR_INVALID_ARG_VALUE,
41+
},
42+
} = require('internal/errors');
3643

3744
const promises = require('stream/promises');
3845

@@ -48,6 +55,21 @@ Stream.addAbortSignal = addAbortSignal;
4855
Stream.finished = eos;
4956
Stream.destroy = destroyer;
5057

58+
Stream.compose = function compose(...streams) {
59+
// TODO (ronag): Remove this once async function API
60+
// has been discussed.
61+
for (let n = 0; n < streams.length; ++n) {
62+
if (!isNodeStream(streams[n])) {
63+
throw new ERR_INVALID_ARG_VALUE(
64+
`streams[${n}]`,
65+
streams[n],
66+
'must be stream'
67+
);
68+
}
69+
}
70+
return _compose(...streams);
71+
};
72+
5173
ObjectDefineProperty(Stream, 'promises', {
5274
configurable: true,
5375
enumerable: true,

0 commit comments

Comments
 (0)