Description
Is your feature request related to a problem? Please describe.
I am trying to build a pipeline of streams, with promise-based/async code in the middle of the pipeline's orchestration:
```js
const fs = require('fs');
const stream = require('stream');
const util = require('util');

const pipeline = util.promisify(stream.pipeline);

async function queryToFile(querySql, path) {
  // promise that resolves on first result from the DB or rejects on SQL error
  const queryResults = await dbClient.query(querySql).promiseFirstResult();
  const dbStream = queryResults.stream();
  const formattingStream = new stream.Transform(...);
  return pipeline(dbStream, formattingStream, fs.createWriteStream(path));
}
```
This should work without a problem.
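To try the single-function shape without a database, here is a minimal in-memory sketch. `fakeQuery` and its `promiseFirstResult()`/`stream()` methods are hypothetical stand-ins for the DB client, and `collectingSink` (also hypothetical) replaces the file stream. The point it illustrates is that the stream is created and handed to `pipeline` in the same synchronous run, with no `await` in between.

```js
const stream = require('stream');
const util = require('util');

const pipeline = util.promisify(stream.pipeline);

// Hypothetical stand-in for dbClient.query(sql): resolves to an object whose
// stream() method returns a fresh object-mode Readable of rows.
function fakeQuery(rows) {
  return {
    promiseFirstResult: async () => ({ stream: () => stream.Readable.from(rows) }),
  };
}

async function queryToSink(rows, sink) {
  const queryResults = await fakeQuery(rows).promiseFirstResult();
  // Created and piped in the same synchronous run: no await between these lines.
  const dbStream = queryResults.stream();
  const formattingStream = new stream.Transform({
    objectMode: true,
    transform(row, _encoding, callback) {
      callback(null, `${row.id},${row.name}\n`);
    },
  });
  return pipeline(dbStream, formattingStream, sink);
}

// Collects every chunk written to it, in place of fs.createWriteStream(path).
function collectingSink(out) {
  return new stream.Writable({
    write(chunk, _encoding, callback) {
      out.push(chunk.toString());
      callback();
    },
  });
}
```

For example, `const out = []; await queryToSink([{ id: 1, name: 'a' }], collectingSink(out));` leaves `out` equal to `['1,a\n']`.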
However, one might decide to split this function into multiple ones:
```js
async function queryStream(querySql) {
  const queryResults = await dbClient.query(querySql).promiseFirstResult();
  return queryResults.stream();
}

async function writeStream(inputStream, path) {
  const formattingStream = new stream.Transform(...);
  return pipeline(inputStream, formattingStream, fs.createWriteStream(path));
}

async function queryToFile(querySql, path) {
  const dbStream = await queryStream(querySql);
  return writeStream(dbStream, path);
}
```
Now, though, the code isn't safe.
If I understand correctly, between the time the `queryStream` function produces a stream and the time the `writeStream` function subscribes to that stream's events (via `pipeline`), the stream might emit events: these functions do not run synchronously, so they leave room for other work, such as I/O, to run in between.
This is fine for incoming data, since a readable stream starts in paused mode and buffers data until `pipeline` pipes it and switches it to flowing mode. In contrast, other events such as `error` or `close` are not buffered; they are emitted as soon as they occur, which means the subscribing stream in `writeStream` will not be aware of them. That is the problem.
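A small sketch of the asymmetry described above, using only the built-in `stream` module: data pushed into a `Readable` before anyone reads it is still there later, but a `close` event emitted before a listener is attached is not replayed; only the stream's state flags (here `destroyed`) still reflect what happened. The `setImmediate` pause is an assumption standing in for the gap between two async functions, and `demo` is a hypothetical name.

```js
const { Readable } = require('stream');

// Minimal sketch: data is buffered until a consumer arrives, events are not.
async function demo() {
  // Stream A: data pushed before anyone subscribes stays in the internal buffer.
  const a = new Readable({ read() {} });
  a.push('row 1');
  a.push(null);

  // Stream B: destroyed before anyone subscribes; 'close' is emitted on a later tick.
  const b = new Readable({ read() {} });
  b.destroy();

  // Yield to the event loop, standing in for the gap between two async functions.
  await new Promise((resolve) => setImmediate(resolve));

  // The buffered data is still readable.
  const chunks = [];
  for await (const chunk of a) chunks.push(chunk.toString());

  // A listener attached after 'close' was emitted never fires.
  let lateCloseSeen = false;
  b.on('close', () => { lateCloseSeen = true; });
  await new Promise((resolve) => setImmediate(resolve));

  return { chunks, lateCloseSeen, bDestroyed: b.destroyed };
}
```

Calling `demo()` resolves to `{ chunks: ['row 1'], lateCloseSeen: false, bDestroyed: true }`.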
Describe the solution you'd like
I would love for the default paused-mode behavior (i.e., buffering events until the first "subscriber") to also apply to non-data events (at least the built-in ones). Whether this should be the default behavior is a separate question, but the ability to enable it through an option at stream construction could prove extremely useful.
As a first step, though, it might be helpful to add this scenario to the streams documentation.
Describe alternatives you've considered
I tried a workaround where, before returning a stream from an async function, I added listeners for these events to buffer them, and then overrode each of the event-registration methods (`on`, `once`, etc.) to replay the buffered events to any new subscriber. I encountered two problems:
- `pipeline`, which in turn uses `pipe`, does not propagate these events further if the stream already has listeners for them before the call (I guess it assumes that a handler is already in place).
- The order in which `pipeline`/`pipe` registers event listeners is not necessarily (and often is not) the same as the order in which the events were emitted; for instance, `error` is usually emitted before `close`, while the listeners for these events are registered in the opposite order.
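The kind of workaround described above can be sketched roughly as follows. This is a hypothetical helper (`bufferEvents` is not part of Node.js, and this is not the author's actual code): it records the listed events while only its own recorders are attached, and replays them to the first real listener registered for each event name, using the standard `'newListener'` event instead of overriding `on`/`once`. Because each event name is replayed independently when its first listener arrives, it reproduces the second problem listed above: if `close` gets a listener before `error` does, the replay order no longer matches the emission order.

```js
const { EventEmitter } = require('events');

// Hypothetical helper: buffers the named events until a real listener shows up.
function bufferEvents(emitter, names) {
  const pending = new Map(names.map((name) => [name, []]));
  const recorders = new Map();

  for (const name of names) {
    // Recorder listener: stores the arguments of each early emission.
    const record = (...args) => pending.get(name)?.push(args);
    recorders.set(name, record);
    emitter.on(name, record);
  }

  emitter.on('newListener', (name, listener) => {
    const queue = pending.get(name);
    if (!queue || listener === recorders.get(name)) return;
    // First real listener for this event: stop recording it.
    pending.delete(name);
    emitter.removeListener(name, recorders.get(name));
    // 'newListener' fires before the listener is added, so replay on a later tick.
    // Listeners added with once() are not handled specially here.
    process.nextTick(() => {
      for (const args of queue) listener.apply(emitter, args);
    });
  });

  return emitter;
}
```

For example, emitting `error` and then `close` on `bufferEvents(new EventEmitter(), ['error', 'close'])` before any listener exists, and then attaching a `close` listener followed by an `error` listener, replays `close` first and `error` second.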