Closed
Description
- Version: v12.8.0
- Platform: macOS 10.15.4 (Catalina)
- Subsystem: stream
What steps will reproduce the bug?
// reproduce.js
const util = require('util');
const stream = require('stream');
const call = async (fn, ...args) => fn(...args);
const map = (fn) => {
const tx = new stream.Transform({ objectMode: true });
tx._transform = (chunk, enc, cb) =>
call(fn, chunk).then(
(modified) => cb(null, modified),
(error) => cb(error),
);
return tx;
};
const tap = (fn) => {
const tx = new stream.Transform({ objectMode: true });
tx._transform = (chunk, enc, cb) =>
call(fn, chunk).then(
() => cb(null, chunk),
(error) => cb(error),
);
return tx;
};
const fork = (...t) => {
let done;
let doneError;
let flush;
const tx = new stream.Transform({ objectMode: true });
const pt = new stream.PassThrough({ objectMode: true });
stream.pipeline(pt, ...t, (error) => {
done = true;
doneError = error;
flush && flush(doneError);
});
tx._flush = (cb) => {
pt.push(null);
flush = cb;
done && flush(doneError);
};
tx._transform = (chunk, enc, cb) => {
pt.push(chunk, enc);
cb(null, chunk);
};
return tx;
};
const readableStream = new stream.PassThrough({ objectMode: true });
const pipeline = util.promisify(stream.pipeline);
async function run() {
await pipeline(
readableStream,
fork(
tap(() => console.log('fork 1: do something with obj for 2s')),
map((obj) => new Promise((done) => setTimeout(() => done(obj), 2000))),
tap(() => console.log('fork 1 done!')),
),
fork(
tap(() => console.log('fork 2: do something with obj for 4s')),
map((obj) => new Promise((done) => setTimeout(() => done(obj), 4000))),
tap(() => console.log('fork 2 done!')),
),
fork(
tap(() => console.log('fork 3: do something with obj for 6s')),
map((obj) => new Promise((done) => setTimeout(() => done(obj), 6000))),
tap(() => console.log('fork 3 done!')),
),
// new stream.PassThrough({ objectMode: true }),
// ^___ adding an extra stream in the pipeline seems to fix the problem
);
console.log('done!');
}
run().catch(console.error);
readableStream.push({ name: 'test' });
readableStream.push(null);
How often does it reproduce? Is there a required condition?
always
What is the expected behavior?
Console output should look like:
fork 1: do something with obj for 2s
fork 2: do something with obj for 4s
fork 3: do something with obj for 6s
fork 1 done!
fork 2 done!
fork 3 done!
done!
What do you see instead?
Console output actually looks like:
fork 1: do something with obj for 2s
fork 2: do something with obj for 4s
fork 3: do something with obj for 6s
fork 1 done!
fork 2 done!
done!
fork 3 done!
Additional information
As noted in the code above, adding an extra stream at the end seems to mitigate the problem for now.
await pipeline(
...
new stream.PassThrough({ objectMode: true }),
);
console.log('done!);