Skip to content

Commit 3444617

Browse files
committed
fixup: faster version
1 parent 1d0a7a5 commit 3444617

File tree

2 files changed

+36
-25
lines changed

2 files changed

+36
-25
lines changed

lib/internal/streams/pipeline.js

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -146,37 +146,43 @@ async function* fromReadable(val) {
146146
yield* createReadableStreamAsyncIterator(val);
147147
}
148148

149+
function pumpStdio(iterable, writable, finish) {
150+
let iterator;
151+
if (iterable[SymbolAsyncIterator])
152+
iterator = iterable[SymbolAsyncIterator]();
153+
else if (iterable[SymbolIterator])
154+
iterator = iterable[SymbolIterator]();
155+
156+
async function _next(err) {
157+
if (err) {
158+
return finish(err);
159+
}
160+
try {
161+
const { value, done } = await iterator.next();
162+
if (done) {
163+
return finish();
164+
}
165+
writable.write(value, _next);
166+
} catch (err) {
167+
finish(err);
168+
}
169+
}
170+
_next();
171+
}
172+
149173
async function pump(iterable, writable, finish) {
150174
if (!EE) {
151175
EE = require('events');
152176
}
153177
let error;
154178
try {
155-
if (!isStdio(writable)) {
156-
for await (const chunk of iterable) {
157-
if (!writable.write(chunk)) {
158-
if (writable.destroyed) return;
159-
await EE.once(writable, 'drain');
160-
}
161-
}
162-
writable.end();
163-
} else {
164-
const errorPromise = new Promise((resolve, reject) => {
165-
writable.on('error', reject);
166-
});
167-
// Don't propagate to unhandledRejection
168-
errorPromise.catch(() => {});
169-
for await (const chunk of iterable) {
170-
await Promise.race([
171-
errorPromise,
172-
new Promise((resolve, reject) => {
173-
writable.write(chunk, null, (err) => {
174-
err ? reject(err) : resolve();
175-
});
176-
})
177-
]);
179+
for await (const chunk of iterable) {
180+
if (!writable.write(chunk)) {
181+
if (writable.destroyed) return;
182+
await EE.once(writable, 'drain');
178183
}
179184
}
185+
writable.end();
180186
} catch (err) {
181187
error = err;
182188
} finally {
@@ -292,7 +298,11 @@ function pipeline(...streams) {
292298
ret = makeAsyncIterable(ret);
293299

294300
finishCount++;
295-
pump(ret, stream, finish);
301+
if (isStdio(stream)) {
302+
pumpStdio(ret, stream, finish);
303+
} else {
304+
pump(ret, stream, finish);
305+
}
296306
}
297307
ret = stream;
298308
} else {

test/parallel/test-stream-pipeline-process.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ if (process.argv[2] === 'child') {
2121
`"${process.execPath}"`,
2222
`"${__filename}"`,
2323
'child'
24-
].join(' '), common.mustCall((err) => {
24+
].join(' '), common.mustCall((err, stdout) => {
2525
assert.ifError(err);
26+
assert.strictEqual(stdout, 'hello');
2627
}));
2728
}

0 commit comments

Comments
 (0)