Readable.push calling _read #3203
I think you're mistaken about the way your readable stream is working, because you're piping it into a writable stream. For example, replacing the '5' with 'n' ...
Replacing '5' with 'n' will still have each push call do a _read call. If you pipe the output to a file you will see it grow much faster than 16k every 2 seconds. I understand _read will keep getting called because I am doing an 'infinite' pipe. But the docs say _read will not be called until I call push. The docs also say I should keep calling push until it returns false. From this I expect _read should only be called after push has returned false. It doesn't make sense to start requesting more data by calling _read while at the same time indicating I should call push again by returning true from push. Note: this is exactly how it works when not using async calls, but the non-direct push call somehow behaves differently...
Could you point me to where it says this? I don't see this anywhere.
Yes, but this is with regard to the highWaterMark, which isn't really relevant to this use case. Is this an actual issue for you? If your stream needs to load data, you just shouldn't push anything when there isn't data available.
Quoted from https://nodejs.org/api/stream.html#stream_readable_read_size_1
Yeah, I'm trying to wrap a source of data into a readable stream. A less abstract example:

MyStream.prototype._read = function (n) {
  this.myread();
};

MyStream.prototype.myread = function () {
  var data;
  while ((data = this.source.getdata())) { /* source has data available */
    if (!this.push(data)) {
      return; /* highWaterMark is reached; Stream.Readable will call _read again when it needs more data */
    }
  }
  /* Source ran out of data but Stream.Readable wants more (highWaterMark was
     not reached); attach an event to continue pushing data when it arrives. */
  var mthis = this;
  this.source.once('dataready', function () { mthis.myread(); });
};

The problem happens when the dataready event triggers an async call to myread and thus push. push starts to behave differently: each async call to push calls _read, which triggers more push calls and/or event callback registrations. In this case the only observable effect is that everything gets slower and slower as more and more callbacks are registered and called.

Also, what is wrong with the code in my first post? Why shouldn't it be a stream that produces 5 x-es every 2 seconds?
/cc @nodejs/streams Is this a bug? Or a misunderstanding? Maybe a documentation update is in order?
I'm having the same problem. When _read() is called, I fetch data using an async API. I then start pushing the results using push(). But as soon as I push the first object to the stream, the stream's 'data' event fires (consuming the object), which then causes _read() to get called again even though I'm still in the middle of pushing the previous results. This results in the API getting called again even though I still have data to push from the last call.
The current behaviour guarantees the maximum performance and throughput.
I'm sure my examples failed to communicate the problem properly, but defending buggy behavior with performance claims seems weird to me... It's fine to call _read when new data is actually needed. But doing this inside the push() call doesn't make sense, as the push return value already indicates to the caller whether more data is needed. In other words, if I'm calling push, and its return value is indicating I should keep doing so, I don't need/want _read being called asking for even more data (I'm already push()-ing it as fast as I can).

The thing is, it behaves exactly the way I want as long as you call push directly from _read, but once you do so indirectly through an async callback it behaves completely differently. (This is at the very least a documentation issue, as this difference is never mentioned anywhere, but IMHO it just doesn't make any sense...)
I am not "defending buggy behavior", I am just explaining why things are done in a certain way. Let me know if you need some more discussion on why it is faster this way. Even though I might even agree with you that we should change this behavior, there is a massive risk of breakage in the ecosystem. Anyway, I do no think this a road to pursue, but I am very happy in reviewing any proposed changes in streams. Improving the docs is certainly something we need! Feel free to send a PR for that! |
One thing that might help is if there was a way to push() a set of objects in a single call. Right now, after you push the first item (from an async function) there will be an immediate call to _read(), which is what can cause issues. Instead, I was thinking there could be a pushObjects() API that would let you push in all of the new objects at once. This call would also generate a call to _read(), but only after the call to pushObjects() is done. I think this would avoid the race condition and allow for simpler code.

The alternative I ended up having to build was to keep my own buffer of objects, as sketched below. After the async call adds more objects to this array, it shift()s the first one, push()es it, and then returns. After this, _read() will be called, and it push()es the rest of the objects synchronously from the buffer. This worked fine, but was somewhat complicated. It also requires an additional buffer of objects, which seems silly since the point of the stream is to hold the buffer.

I think this change might be possible in a backwards-compatible way.
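(A minimal sketch of that buffering workaround, assuming a hypothetical fetchObjects(callback) API and a this._buffer array initialized to [] in the constructor — not the poster's actual code:)

MyStream.prototype._read = function () {
  var self = this;
  if (this._buffer.length > 0) {
    // Drain the rest of our private buffer synchronously, one _read at a time.
    while (this._buffer.length > 0 && this.push(this._buffer.shift())) {}
    return;
  }
  // Get a fresh batch of objects from the async API (assumed name).
  this.fetchObjects(function (err, objects) {
    if (err) return self.emit('error', err);
    self._buffer = objects;
    // Push just the first object, then return; the _read() this triggers
    // drains the rest of the buffer synchronously via the branch above.
    self.push(self._buffer.shift());
  });
};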
I found a good solution for this problem, which is to pause the stream while pushing in a set of objects that come from an async API call. Here is the code that I ended up with:
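(The snippet itself was lost from this copy of the thread; a minimal sketch of the pause-while-pushing approach described, with assumed names, might look like this:)

MyStream.prototype._read = function () {
  var self = this;
  this.fetchObjects(function (err, objects) { // fetchObjects is a made-up async API
    if (err) return self.emit('error', err);
    self.pause(); // stop the flow of 'data' events so the batch queues up
                  // instead of draining and re-triggering _read mid-push
    for (var i = 0; i < objects.length; i++) {
      self.push(objects[i]);
    }
    self.resume(); // let consumers drain the whole batch
  });
};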
As I mentioned above, it would be great if the Readable class included a pushObjects() API built in. Much simpler and less error-prone.
@mcollina ^^^
@mself I think streams behave just fine, but probably are not the right tool for your use case: check out https://www.npmjs.com/package/pull-stream. Data in streams is supposed to wait until it's ready to be processed; from what you describe, it seems you want a pull-based model.

@Trott @nodejs/streams I think we can close this. Since this was opened, a good bunch of docs has been added to explain how streams work. And there is nodejs/nodejs.org#1109, which might clarify this further.

@mself feel free to review that document, and maybe suggest some ways we can improve the current docs.

cc @jessicaquynh @lrlna
Closing. Feel free to re-open or comment asking that this be re-opened if I'm in error to do so. Thanks. |
@mcollina, I have been wondering about this issue lately. To my mind, the way readable streams are implemented right now leads to two sets of best practices (one for sync implementations of _read and one for async implementations), and I would like to better understand why read streams are like this.

The current issue with this.push() triggering calls to _read() leads to multiple calls working concurrently, and makes it precarious to manage state if your stream needs it. I have not found a safe way to use asynchronous implementations of _read() where I can follow what the docs say to do ("push until it returns false"), since doing so sets many concurrent reads in motion. So it seems there really are two best practices, but this is not clear in the docs; worse, the docs seem to tell you to keep pushing as long as you have data. Suppose the simple case where I am reading from an API that sends me an array of items: I cannot push each item with its own push() call, since that would cause a request to that API for every push. I may be rambling. I want to hear your thoughts.

ps. My solution would be to include a cb argument to _read() (sketched below), which would signal that the _read is done and the next is ready to be called, similar to a transform stream, where you can use this.push as many times as you want before invoking the callback. Thanks.
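(To illustrate the proposal, a hypothetical callback-style _read modeled on Transform#_transform — this is not an actual Node.js API, and fetchObjects is a made-up async call:)

MyStream.prototype._read = function (n, callback) {
  var self = this;
  this.fetchObjects(function (err, objects) {
    if (err) return callback(err);
    for (var i = 0; i < objects.length; i++) {
      self.push(objects[i]); // buffer the whole batch; no re-entrant _read yet
    }
    callback(); // signal this _read is done and the next may be scheduled
  });
};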
I like the idea of the cb argument to _read().
@mself noms is a package that does what you want, I assume.

@davidmdm a callback to read is something that's been discussed in the past; the main issue was performance regressions for some of the core pieces of Node, and making an already complex API more complex. Another thing you can do when doing async stuff is something like:

MyStream.prototype._read = function() {
  const self = this;
  if (this._inProgress) {
    return;
  }
  this._inProgress = true;
  // Get a set of objects from an API.
  this.fetchObjects(function(err, objects) {
    // Push all of the objects into the stream.
    for (let object of objects) {
      this.push(object);
    }
    this._inProgress = false;
  });
};

I agree it's kinda confusing, but we're probably stuck with it for legacy reasons. If the documentation sucks, pulls improving the documentation are always appreciated. 😉
@mself To be fair, the way you did it was fine too. I know it will work; it's just that since the new streams implementation with pipe(), I don't like calling pause and resume, since pipe() calls resume on drains and such. I am scared of that one-in-a-million chance they coincide.

Does anybody know why this.push() immediately invokes a new _read in async but not synchronous reads? An async implementation will start a new read as soon as this.push is called, before this.i += 1 even has a chance to execute, whereas in the synchronous case the while loop keeps control until the condition breaks (see the reconstruction after this comment).

@calvinmetcalf Thanks for your solution. Unfortunately I think it doesn't work, since all of the reads triggered by your pushes would be swallowed by the return statement, and no further _read would be called, hence killing your stream... unless you called it yourself, which is recommended not to do. Or you could pop() the last item and push it after setting _inProgress to false, but to me it just seems we are hacking around the problem, and the problem is either no real support for asynchronous reads, or a total misunderstanding of how to implement them.

However, wouldn't it be possible to add the cb without breaking changes if it was introduced as an option? I think people don't know of this difference and they get mixed results. It just leads to them thinking they don't understand how to use streams, regardless of the simplicity or complexity of the API.
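(The two snippets this comment referred to were lost from this copy of the thread; a minimal reconstruction of the contrast being described, with this.i assumed initialized to 0 in the constructor, might look like:)

// Synchronous _read: the while loop keeps control until the condition
// breaks; push() returning false is what stops it.
SyncStream.prototype._read = function () {
  while (this.i < 5 && this.push('x')) {
    this.i += 1;
  }
};

// Asynchronous _read: as soon as push() is called from the callback, a
// new _read() is scheduled, before this.i += 1 even has a chance to run.
AsyncStream.prototype._read = function () {
  var self = this;
  setTimeout(function () {
    self.push('x');
    self.i += 1; // by now another _read may already be in flight
  }, 0);
};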
@davidmdm woops, full code should be:

MyStream.prototype._read = function() {
  const self = this;
  if (this._inProgress) {
    return;
  }
  this._inProgress = true;
  // Get a set of objects from an API.
  this.fetchObjects(function(err, objects) {
    // Push all of the objects into the stream.
    var moar;
    for (let object of objects) {
      moar = self.push(object);
    }
    self._inProgress = false;
    if (moar) {
      self._read();
    }
  });
};
According to #4878 |
@psxcode there is no need to limit to only once |
@mcollina, you are right
@psxcode @mcollina Suppose your source is an API that returns JSON arrays: [{...}, {...}, ...]. The only solutions I have found are either the solution proposed by @calvinmetcalf, or to push the array and have a transform deconstruct it into its separate objects (see the sketch below), or just to have each part of your stream process the array of data. Otherwise it is NOT safe to push multiple times in an asynchronous _read operation.
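(A minimal sketch of that push-the-array-and-deconstruct approach, with an assumed fetchPage API; both streams must be in objectMode:)

const { Readable, Transform } = require('stream');

// Stand-in for a real paginated API (assumption for this sketch).
function fetchPage(cb) {
  setTimeout(function () { cb(null, [{ a: 1 }, { a: 2 }]); }, 100);
}

// Emits one whole JSON array per _read; a single push per _read is safe.
const arrays = new Readable({
  objectMode: true,
  read() {
    fetchPage((err, page) => {
      if (err) return this.destroy(err);
      this.push(page);
    });
  }
});

// Deconstructs each array into its individual items downstream.
const deconstruct = new Transform({
  objectMode: true,
  transform(arr, encoding, callback) {
    for (const item of arr) this.push(item); // multiple pushes are fine here
    callback();
  }
});

arrays.pipe(deconstruct);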
Reopening, as this is indeed fixable. See #17979 |
@mcollina |
@davidmdm you should follow https://github.com/nodejs/code-and-learn#getting-started or attend a code-and-learn. Also, read https://github.com/nodejs/node/blob/master/CONTRIBUTING.md. |
Was this applied to the 8.x release also? I'm testing with async push to the readable stream and I'm not noticing anything very strange (node 8.11.3). Also, is it a problem to push before _read is called for the first time?
This was applied to Node 10. The problem is indeed there, it's just hard to stumble upon it.
The 'API for Stream Implementors' section of the stream.Readable documentation states that push should continue to be called until it returns false. However, when calling push from an async callback, each push call will call _read, leading to unexpected behaviour.
Example:
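(The original snippet was collapsed out of this copy of the thread; a reconstruction consistent with the description below, using assumed names, would be:)

var stream = require('stream');
var util = require('util');

function MyStream(opt) {
  stream.Readable.call(this, opt);
}
util.inherits(MyStream, stream.Readable);

MyStream.prototype._read = function (n) {
  var self = this;
  // Fetch data asynchronously; a timer stands in for a real source here.
  setTimeout(function () {
    for (var i = 0; i < 5; i++) {
      if (!self.push('x')) {
        break; // per the docs: stop pushing once push() returns false
      }
    }
  }, 2000);
};

new MyStream().pipe(process.stdout);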
I expect this to output up to 5 'x' characters every 2 seconds. However, because each push call also does a _read call, after 2 seconds 5 callbacks will be registered and after 2 more seconds 25 'x' characters will be output.
I think push should not call _read like this, or at least return false after it has done so.