Skip to content

stream.pipeline does not wait for the last stream to flush before calling the final callback #34274

Closed
@aravindanve

Description

@aravindanve
  • Version: v12.8.0
  • Platform: macOS 10.15.4 (Catalina)
  • Subsystem: stream

What steps will reproduce the bug?

// reproduce.js

const util = require('util');
const stream = require('stream');

/**
 * Invokes `fn` with the given arguments from inside an async function, so the
 * caller always gets a promise back: a synchronous throw from `fn` surfaces
 * as a rejection, and a returned promise is adopted.
 *
 * @param {Function} fn - Function to invoke.
 * @param {...*} args - Arguments forwarded to `fn`.
 * @returns {Promise<*>} Promise for whatever `fn` returns.
 */
const call = async (fn, ...args) => {
  return fn(...args);
};

/**
 * Creates an object-mode Transform that replaces each chunk with the result
 * of `fn(chunk)`.
 *
 * `fn` is invoked through `call`, so it may return a plain value or a promise.
 * If it throws or rejects, the error is handed to the transform callback.
 *
 * @param {Function} fn - Mapping function applied to every chunk.
 * @returns {stream.Transform} The mapping stream.
 */
const map = (fn) => {
  const mapper = new stream.Transform({ objectMode: true });

  mapper._transform = (chunk, enc, cb) => {
    const emitResult = (modified) => cb(null, modified);
    const fail = (error) => cb(error);

    // The promise is returned exactly as the original arrow body did.
    return call(fn, chunk).then(emitResult, fail);
  };

  return mapper;
};

/**
 * Creates an object-mode Transform that runs `fn(chunk)` for its side effects
 * and then forwards the original chunk unchanged.
 *
 * `fn` is invoked through `call`; its result is ignored once it settles. If it
 * throws or rejects, the error is handed to the transform callback.
 *
 * @param {Function} fn - Observer invoked with every chunk.
 * @returns {stream.Transform} The observing stream.
 */
const tap = (fn) => {
  const observer = new stream.Transform({ objectMode: true });

  observer._transform = (chunk, enc, cb) => {
    const passAlong = () => cb(null, chunk);
    const fail = (error) => cb(error);

    // The promise is returned exactly as the original arrow body did.
    return call(fn, chunk).then(passAlong, fail);
  };

  return observer;
};

/**
 * Builds an object-mode Transform that forwards every chunk downstream
 * unchanged while also feeding it into a side pipeline made of an internal
 * PassThrough (`pt`) followed by the streams in `t`.
 *
 * The returned stream's `_flush` callback is held back until the side
 * pipeline's completion callback has run, and is then invoked with that
 * callback's error argument.
 *
 * @param {...*} t - Streams placed after the internal PassThrough in the side
 *   pipeline (the callers in this script pass Transform streams).
 * @returns {stream.Transform} The forking stream.
 */
const fork = (...t) => {
  // State shared between the side pipeline's callback and `_flush`; the two
  // can run in either order, so each one checks whether the other already ran.
  let done;
  let doneError;
  let flush;

  const tx = new stream.Transform({ objectMode: true });
  const pt = new stream.PassThrough({ objectMode: true });

  // Side pipeline: record its outcome and, if `_flush` has already stored its
  // callback, release it now.
  stream.pipeline(pt, ...t, (error) => {
    done = true;
    doneError = error;
    flush && flush(doneError);
  });

  tx._flush = (cb) => {
    // Signal end-of-input to the side pipeline.
    pt.push(null);
    flush = cb;
    // If the side pipeline has already completed, finish flushing right away.
    done && flush(doneError);
  };

  tx._transform = (chunk, enc, cb) => {
    // Inject the chunk via pt.push(); its return value is not checked.
    pt.push(chunk, enc);
    // Forward the same chunk downstream without waiting for the side pipeline.
    cb(null, chunk);
  };

  return tx;
};

// Source stream for the reproduction; objects are pushed into it further down.
const readableStream = new stream.PassThrough({ objectMode: true });
// Promise-returning wrapper around the callback-style stream.pipeline.
const pipeline = util.promisify(stream.pipeline);

/**
 * Reproduction driver: pipes `readableStream` through three `fork` stages and
 * logs 'done!' once the promisified `stream.pipeline` resolves.
 *
 * Each fork's side pipeline logs a start message, holds the object for a
 * timer delay (2000, 4000 and 6000 ms respectively), then logs a completion
 * message.
 */
async function run() {
  await pipeline(
    readableStream,
    fork(
      tap(() => console.log('fork 1: do something with obj for 2s')),
      map((obj) => new Promise((done) => setTimeout(() => done(obj), 2000))),
      tap(() => console.log('fork 1 done!')),
    ),
    fork(
      tap(() => console.log('fork 2: do something with obj for 4s')),
      map((obj) => new Promise((done) => setTimeout(() => done(obj), 4000))),
      tap(() => console.log('fork 2 done!')),
    ),
    fork(
      tap(() => console.log('fork 3: do something with obj for 6s')),
      map((obj) => new Promise((done) => setTimeout(() => done(obj), 6000))),
      tap(() => console.log('fork 3 done!')),
    ),
    // new stream.PassThrough({ objectMode: true }),
    // ^___ adding an extra stream in the pipeline seems to fix the problem
  );
  // Logged as soon as the awaited pipeline promise resolves.
  console.log('done!');
}

// Start the pipeline; a rejection from run() is reported via console.error.
run().catch(console.error);

// Feed a single object into the source stream, then signal end-of-stream.
readableStream.push({ name: 'test' });
readableStream.push(null);

How often does it reproduce? Is there a required condition?

always

What is the expected behavior?

Console output should look like:

fork 1: do something with obj for 2s
fork 2: do something with obj for 4s
fork 3: do something with obj for 6s
fork 1 done!
fork 2 done!
fork 3 done!
done!

What do you see instead?

Console output actually looks like:

fork 1: do something with obj for 2s
fork 2: do something with obj for 4s
fork 3: do something with obj for 6s
fork 1 done!
fork 2 done!
done!
fork 3 done!

Additional information

As noted in the code above, adding an extra stream at the end seems to mitigate the problem for now.

await pipeline(
    ...
    new stream.PassThrough({ objectMode: true }),
);

console.log('done!');

Metadata

Metadata

Assignees

No one assigned

    Labels

    stream: Issues and PRs related to the stream subsystem.

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions