Skip to content

Commit e9e4444

Browse files
committed
fixup: unify
1 parent 3444617 commit e9e4444

File tree

2 files changed

+25
-35
lines changed

2 files changed

+25
-35
lines changed

lib/internal/streams/pipeline.js

Lines changed: 24 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -146,43 +146,37 @@ async function* fromReadable(val) {
146146
yield* createReadableStreamAsyncIterator(val);
147147
}
148148

149-
function pumpStdio(iterable, writable, finish) {
150-
let iterator;
151-
if (iterable[SymbolAsyncIterator])
152-
iterator = iterable[SymbolAsyncIterator]();
153-
else if (iterable[SymbolIterator])
154-
iterator = iterable[SymbolIterator]();
155-
156-
async function _next(err) {
157-
if (err) {
158-
return finish(err);
159-
}
160-
try {
161-
const { value, done } = await iterator.next();
162-
if (done) {
163-
return finish();
164-
}
165-
writable.write(value, _next);
166-
} catch (err) {
167-
finish(err);
168-
}
169-
}
170-
_next();
171-
}
172-
173149
async function pump(iterable, writable, finish) {
174150
if (!EE) {
175151
EE = require('events');
176152
}
177153
let error;
154+
writable.on('error', (err) => {
155+
error = err;
156+
});
178157
try {
179-
for await (const chunk of iterable) {
180-
if (!writable.write(chunk)) {
181-
if (writable.destroyed) return;
182-
await EE.once(writable, 'drain');
158+
let prev;
159+
for await (const next of iterable) {
160+
if (prev != null) {
161+
if (!writable.write(prev)) {
162+
if (writable.destroyed) return;
163+
await EE.once(writable, 'drain');
164+
}
183165
}
166+
prev = next;
167+
}
168+
169+
if (prev != null) {
170+
await new Promise((resolve, reject) => {
171+
writable.write(prev, (err) => {
172+
err ? reject(err) : resolve();
173+
});
174+
});
175+
}
176+
177+
if (!isStdio(writable)) {
178+
writable.end();
184179
}
185-
writable.end();
186180
} catch (err) {
187181
error = err;
188182
} finally {
@@ -298,11 +292,7 @@ function pipeline(...streams) {
298292
ret = makeAsyncIterable(ret);
299293

300294
finishCount++;
301-
if (isStdio(stream)) {
302-
pumpStdio(ret, stream, finish);
303-
} else {
304-
pump(ret, stream, finish);
305-
}
295+
pump(ret, stream, finish);
306296
}
307297
ret = stream;
308298
} else {

test/parallel/test-stream-pipeline-process.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,6 @@ if (process.argv[2] === 'child') {
2323
'child'
2424
].join(' '), common.mustCall((err, stdout) => {
2525
assert.ifError(err);
26-
assert.strictEqual(stdout, 'hello');
26+
assert.strictEqual(stdout, 'hello\n');
2727
}));
2828
}

0 commit comments

Comments
 (0)