Skip to content

Commit b84f101

Browse files
Mesteerytargos
authored andcommitted
stream: support array of streams in promises pipeline
Fixes: #40191 PR-URL: #40193 Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 3f03355 commit b84f101

File tree

2 files changed

+45
-7
lines changed

2 files changed

+45
-7
lines changed

lib/internal/streams/pipeline.js

+4-7
Original file line numberDiff line numberDiff line change
@@ -166,17 +166,14 @@ async function pump(iterable, writable, finish) {
166166
}
167167

168168
function pipeline(...streams) {
169-
const callback = once(popCallback(streams));
169+
return pipelineImpl(streams, once(popCallback(streams)));
170+
}
170171

171-
// stream.pipeline(streams, callback)
172-
if (ArrayIsArray(streams[0]) && streams.length === 1) {
172+
function pipelineImpl(streams, callback, opts) {
173+
if (streams.length === 1 && ArrayIsArray(streams[0])) {
173174
streams = streams[0];
174175
}
175176

176-
return pipelineImpl(streams, callback);
177-
}
178-
179-
function pipelineImpl(streams, callback, opts) {
180177
if (streams.length < 2) {
181178
throw new ERR_MISSING_ARGS('streams');
182179
}

test/parallel/test-stream-pipeline.js

+41
Original file line numberDiff line numberDiff line change
@@ -1406,3 +1406,44 @@ const tsp = require('timers/promises');
14061406
}));
14071407
ac.abort();
14081408
}
1409+
1410+
{
1411+
async function run() {
1412+
let finished = false;
1413+
let text = '';
1414+
const write = new Writable({
1415+
write(data, enc, cb) {
1416+
text += data;
1417+
cb();
1418+
}
1419+
});
1420+
write.on('finish', () => {
1421+
finished = true;
1422+
});
1423+
1424+
await pipelinep([Readable.from('Hello World!'), write]);
1425+
assert(finished);
1426+
assert.strictEqual(text, 'Hello World!');
1427+
}
1428+
1429+
run();
1430+
}
1431+
1432+
{
1433+
let finished = false;
1434+
let text = '';
1435+
const write = new Writable({
1436+
write(data, enc, cb) {
1437+
text += data;
1438+
cb();
1439+
}
1440+
});
1441+
write.on('finish', () => {
1442+
finished = true;
1443+
});
1444+
1445+
pipeline([Readable.from('Hello World!'), write], common.mustSucceed(() => {
1446+
assert(finished);
1447+
assert.strictEqual(text, 'Hello World!');
1448+
}));
1449+
}

0 commit comments

Comments
 (0)