Make streams default pause mode apply for non data events (such as 'error') #39722

Closed
@eyalroth

Description

Is your feature request related to a problem? Please describe.

I am trying to build a pipeline of streams alongside promise-based/async code that orchestrates the pipeline:

const util = require('util');
const stream = require('stream');
const fs = require('fs');

const pipeline = util.promisify(stream.pipeline);

async function queryToFile(querySql, path) {
  // promise that resolves on first result from the DB or rejects on SQL error
  const queryResults = await dbClient.query(querySql).promiseFirstResult();
  const dbStream = queryResults.stream();

  const formattingStream = new stream.Transform(...);
  return pipeline(dbStream, formattingStream, fs.createWriteStream(path));
}

This should work without a problem.

However, one might decide to split this function into multiple ones:

async function queryStream(querySql) {
  const queryResults = await dbClient.query(querySql).promiseFirstResult();
  return queryResults.stream();
}

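// path is passed in explicitly; it is not in scope otherwise
async function writeStream(inputStream, path) {
  const formattingStream = new stream.Transform(...);
  return pipeline(inputStream, formattingStream, fs.createWriteStream(path));
}

async function queryToFile(querySql, path) {
  // queryStream is async, so its result must be awaited to get the stream
  const dbStream = await queryStream(querySql);
  return writeStream(dbStream, path);
}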

Now, though, the code isn't safe.

If I understand correctly, events may be emitted on the stream between the time queryStream produces it and the time writeStream subscribes to its events (via pipeline). These functions do not run synchronously one after the other, so other work -- such as I/O callbacks -- can run in between.

This is fine for incoming data: the stream starts in paused mode and buffers data until pipeline pipes it, which switches it to flowing mode. In contrast, other events such as error or close are not buffered. They are emitted as soon as they occur, so the subscribing stream in writeStream never learns about them, which is a problem.
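The gap described above can be reproduced with a minimal sketch. It assumes a plain `Readable` as a stand-in for the database stream; `makeSource`, `nextTurn`, and `demo` are hypothetical names used only for illustration. The buffered row stays available while the stream is paused, but an error raised before the second listener attaches is delivered only to the listener that was already there.

```javascript
const { Readable } = require('stream');

// Stand-in for the DB stream (hypothetical): one row buffered, no consumer yet.
function makeSource() {
  const source = new Readable({ read() {} });
  source.push('row 1\n');
  return source;
}

// Yields to the event loop, as an `await` between two async functions may do.
const nextTurn = () => new Promise((resolve) => setImmediate(resolve));

async function demo() {
  const source = makeSource();
  // Data pushed before any consumer exists is held in the internal buffer.
  const seen = { bufferedBytes: source.readableLength, early: [], late: [] };

  // Listener present at the time of the failure.
  source.on('error', (err) => seen.early.push(err.message));
  source.destroy(new Error('connection lost'));
  await nextTurn();

  // Listener attached after the failure: the error is not replayed to it.
  source.on('error', (err) => seen.late.push(err.message));
  await nextTurn();
  return seen;
}

demo().then((seen) => console.log(seen));
```

Without the early listener, the same error would have no handler at all and would surface as an uncaught exception rather than reaching the later subscriber.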

Describe the solution you'd like

I would love for the default paused mode -- i.e., buffering events until the first "subscriber" -- to apply to non-data events as well (at least the built-in ones). Whether it should be the default behavior is a separate question, but an option that could be passed to the stream at construction would be extremely useful.

As a first step though, it might be helpful to add this scenario to the documentation on streams.

Describe alternatives you've considered

I tried a workaround where, before returning a stream from an async function, I added listeners to these events to buffer them, and then overrode each of the event-registration methods (on, once, etc.) to push the buffered events to any new subscriber. I encountered two problems:

  1. Using pipeline, which in turn uses pipe, means that if the stream already has listeners for these events before pipeline is called, the events are not propagated further on (I guess it assumes that a handler is already in place).
  2. The order in which pipeline/pipe registers its listeners is not necessarily (and often is not) the order in which the events were emitted; for instance, error is usually emitted before close, while the listeners for these events are registered in the opposite order.
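A different shape of workaround, sketched here only as an assumption and not as what was tried above, avoids replaying events altogether: record an early error with a temporary listener, then remove that listener and rethrow synchronously just before the stream is handed to its consumer. `guardStream` and `release` are hypothetical names, not part of the Node.js API.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: remembers an error emitted before a consumer attaches.
function guardStream(source) {
  let earlyError = null;
  const onError = (err) => { earlyError = err; };
  source.once('error', onError);

  return {
    source,
    // Call synchronously right before passing the stream to pipeline(),
    // so that no event can slip in between release and subscription.
    release() {
      source.removeListener('error', onError);
      if (earlyError) throw earlyError;
      return source;
    },
  };
}

const nextTurn = () => new Promise((resolve) => setImmediate(resolve));

async function demo() {
  // Case 1: the stream fails while nobody is consuming it.
  const failing = guardStream(new Readable({ read() {} }));
  failing.source.destroy(new Error('connection lost'));
  await nextTurn();
  let caught = null;
  try {
    failing.release();
  } catch (err) {
    caught = err.message;
  }

  // Case 2: nothing went wrong, so the original stream comes back untouched.
  const healthySource = new Readable({ read() {} });
  const healthy = guardStream(healthySource);
  await nextTurn();
  const released = healthy.release();

  return {
    caught,
    sameStream: released === healthySource,
    leftoverListeners: released.listenerCount('error'),
  };
}

demo().then((result) => console.log(result));
```

Because the temporary listener is removed before the consumer subscribes, the stream reaches pipeline with no pre-existing handlers, and because only the error is tracked, the ordering of error versus close does not matter. It does not cover other events, and the caller has to remember to call `release()` in the same tick as the subscription.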

Labels: stream (Issues and PRs related to the stream subsystem)
