
Readable.push calling _read #3203

Closed
MrHacky opened this issue Oct 6, 2015 · 30 comments
Labels
stream Issues and PRs related to the stream subsystem.

Comments

@MrHacky commented Oct 6, 2015

In the documentation for stream.Readable, under 'API for Stream Implementors', it states that push should continue to be called until it returns false. However, when push is called from an async callback, each push call triggers a _read call, leading to unexpected behaviour.

Example:

var stream = require('stream');
var util = require('util');

util.inherits(TestStream, stream.Readable);
function TestStream(options) {
    stream.Readable.call(this, options);
}

TestStream.prototype._read = function(n) {
    var mthis = this;
    setTimeout(function() {
        for (var i = 0; i < 5; ++i)
            if (!mthis.push('x'))
                break;
    }, 2000);
};

var s = new TestStream();
s.pipe(process.stdout);

I expect this to output up to 5 'x' characters every 2 seconds. However, because each push call also triggers a _read call, after 2 seconds 5 callbacks will be registered, and after 2 more seconds 25 'x' characters will be output.

I think push should not call _read like this, or at least it should return false after it has done so.

@ChALkeR ChALkeR added the stream Issues and PRs related to the stream subsystem. label Oct 6, 2015
@MrHacky MrHacky changed the title Readable.push calling _doread Readable.push calling _read Oct 6, 2015
@brendanashworth (Contributor)

I think you're mistaken about the way your readable stream is working. Because you're piping it into a writable stream (process.stdout), it will pull all the data it can out of your stream as long as the highWaterMark of your writable stream is not hit. This means that it will call ._read repeatedly, not because .push('x') triggers more reads.

For example, replacing 5 with n will read out the requested amount of data: 16384 bytes.

@MrHacky (Author) commented Oct 13, 2015

Replacing '5' with 'n' will still have each push call trigger a _read call. If you pipe the output to a file, you will see it grow much faster than 16 KB every 2 seconds.

I understand _read will keep getting called because I am doing an 'infinite' pipe. But the docs say _read will not be called again until I call push. The docs also say I should keep calling push until it returns false. From this I expect _read to be called only after push has returned false. It doesn't make sense to start requesting more data by calling _read while at the same time indicating, by returning true from push, that I should call push again.

Note: this is exactly how it works when not using async calls, but the non-direct push call somehow behaves differently...

@brendanashworth (Contributor)

the docs say _read will not be called until i call push

Could you point me to where it says this? I don't see this anywhere.

The docs also say i should keep calling push until it returns false.

Yes, but this is with regard to the highWaterMark, which isn't really relevant to this use case.

Is this an actual issue for you? If your stream needs to load data, you just shouldn't push anything when there isn't data available.

@MrHacky (Author) commented Oct 20, 2015

Could you point me to where it says this? I don't see this anywhere.

Quoted from https://nodejs.org/api/stream.html#stream_readable_read_size_1
Note: once the _read() method is called, it will not be called again until the push method is called.

Is this an actual issue for you? If your stream needs to load data, you just shouldn't push anything when there isn't data available.

Yeah, I'm trying to wrap a source of data in a readable stream. A less abstract example:

MyStream.prototype._read = function(n) {
    this.myread();
};

MyStream.prototype.myread = function() {
    var data;
    while ((data = this.source.getdata())) { // while the source has data available
        if (!this.push(data))
            return; // highWaterMark reached; Readable will call _read again when it needs more data
    }

    // The source ran out of data but Readable wants more (highWaterMark was
    // not reached); attach a one-shot listener to resume pushing when data arrives.
    var mthis = this;
    this.source.once('dataready', function() { mthis.myread(); });
};

The problem happens when the dataready event triggers an async call to myread, and thus to push. push then starts to behave differently: each async push call triggers _read, which triggers more push calls and/or event listener registrations. In this case the only observable effect is that everything gets slower and slower as more and more callbacks are registered and called.

Also, what is wrong with the code in my first post? Why shouldn't it be a stream that produces 5 x's every 2 seconds?

@Trott (Member) commented May 26, 2016

/cc @nodejs/streams Is this a bug? Or a misunderstanding? Maybe a documentation update is in order?

@chrisdickinson (Contributor) commented May 26, 2016

.push can trigger ._read (via maybeReadMore) in order to fill the readable stream's buffer up to the highWaterMark, which might be what's going on here.

@mself commented Feb 25, 2017

I'm having the same problem. When _read() is called, I fetch data using an async API and then start pushing the results using push(). But as soon as I push the first object into the stream, the 'data' event fires (consuming the object), which causes _read() to be called again even though I'm still in the middle of pushing the previous results. The API then gets called again even though I still have data to push from the last call.

@mcollina (Member)

The current behaviour guarantees maximum performance and throughput.
The reason Readable behaves this way is to start all the I/O necessary to fill the highWaterMark before any other processing. Changing this behaviour would slow things down.

@MrHacky (Author) commented Feb 25, 2017

I'm sure my examples failed to communicate the problem properly, but defending buggy behavior with performance claims seems weird to me...

It's fine to call _read when new data is actually needed. But doing so inside the push() call doesn't make sense, because the push return value already tells the caller whether more data is needed.

In other words, if I'm calling push and its return value indicates I should keep doing so, I don't need or want _read being called to ask for even more data (I'm already push()-ing it as fast as I can).

The thing is, it behaves exactly the way I want as long as you call push directly from _read, but once you do so indirectly through an async callback it behaves completely differently. (This is at the very least a documentation issue, as the difference is never mentioned anywhere, but IMHO it just doesn't make any sense...)

@mcollina (Member)

I am not "defending buggy behavior", I am just explaining why things are done in a certain way. Let me know if you need some more discussion on why it is faster this way.

Even though I might even agree with you that we should change this behavior, there is a massive risk of breakage in the ecosystem. Anyway, I do not think this is a road to pursue, but I am very happy to review any proposed changes to streams.

Improving the docs is certainly something we need! Feel free to send a PR for that!

@mself commented Feb 27, 2017

One thing that might help is a way to push() a set of objects in a single call. Right now, after you push the first item (from an async function), there is an immediate call to _read(), which is what can cause issues.

Instead, I was thinking there could be a pushObjects() API that would let you push all of the new objects at once. This call would also generate a call to _read(), but only after the call to pushObjects() is done. I think this would avoid the race condition and allow for simpler code.

The alternative I ended up having to build was to keep my own buffer of objects. After the async call adds more objects to this array, it shift()s the first one, push()es it, and then returns. After this, _read() will be called again, and it push()es the rest of the objects synchronously from the buffer.

This worked fine, but was somewhat complicated. It also requires an additional buffer of objects, which seems silly since the point of the stream is to hold the buffer.

I think this change might be possible in a backwards-compatible way.

@mself commented Mar 9, 2017

I found a good solution for this problem, which is to pause the stream while pushing in a set of objects that come from an async API call. Here is the code that I ended up with:

MyStream.prototype._read = function() {
	const self = this;

	// Get a set of objects from an API.
	this.fetchObjects(function(err, objects) {
		// Pause the stream to avoid race conditions while pushing in the new objects.
		// Without this, _read() would be called again from inside each push(),
		// resulting in multiple parallel calls to fetchObjects().
		const wasPaused = self.isPaused();
		self.pause();

		// Push all of the objects into the stream.
		objects.forEach(function(object) {
			self.push(object);
		});

		if (!wasPaused) {
			// This will deliver the objects and trigger the next call to _read() once they have been consumed.
			self.resume();
		}
	});
};

@mself commented Mar 9, 2017

As I mentioned above, it would be great if the Readable class included a pushObjects() method that essentially encapsulated this. It wouldn't necessarily need to pause the stream, but it would need to ensure that no calls to _read() are made until all of the objects have been pushed into the stream's buffer. Then the code would be:

MyStream.prototype._read = function() {
	const self = this;

	// Get a set of objects from an API.
	this.fetchObjects(function(err, objects) {
		// Push all of the objects into the stream.
		self.pushObjects(objects);
	});
};

Much simpler and less error-prone.

@mself commented Mar 9, 2017

@mcollina ^^^

@mcollina (Member)

@mself I think streams behave just fine, but they are probably not the right tool for your use case: check out https://www.npmjs.com/package/pull-stream. Data in streams is supposed to wait until it's ready to be processed. _read can (and should) be called as many times as the machinery needs to guarantee the best speed and throughput: it happens that you want less throughput than the minimum. At a minimum, Readable aims to buffer 1 object with objectMode: true, highWaterMark: 1.
Buffering ensures that a chunk is ready whenever the destination needs it.

What you seem to want is a cork() for Readable. A PR might be cool, but I am afraid it might turn out to be too complex to implement without a performance hit. pushObjects would require the same effort, and we do not want to increase the API surface of streams.

@Trott @nodejs/streams I think we can close this. Since this was opened, a good amount of documentation has been added to explain how streams work. And there is nodejs/nodejs.org#1109, which might clarify this further.

@mself feel free to review that document, and maybe suggest some ways we can improve the current docs.

cc @jessicaquynh @lrlna

@Trott Trott closed this as completed Mar 10, 2017
@Trott (Member) commented Mar 10, 2017

Closing. Feel free to re-open, or comment asking that this be re-opened, if I'm in error. Thanks.

@davidmdm commented Nov 9, 2017

@mcollina, I have been wondering about this issue lately. To my mind, the way readable streams are implemented right now leads to two sets of best practices (one for sync implementations of _read and one for async implementations), and I would like to better understand why readable streams are like this.

The current issue with this.push() triggering calls to _read() leads to multiple calls working concurrently, and makes it precarious to manage state if your stream needs it.

I have not found a safe way to use asynchronous implementations of _read() where I can follow what the docs say to do ('push until it returns false'), since doing so sets many concurrent reads in motion.

So it seems there are two best practices:

- If _read is synchronous: push until push() returns false, then await a drain event or something similar to resume you.
- If _read is asynchronous: push only once, as the last operation of your _read method, so state is settled before a new _read() begins; or abstract the state away inside a closure, like a generator function. (A sketch of this second pattern follows.)

But this is not clear in the docs; worse, the docs seem to tell you to keep pushing as long as you have data, while _read will be called whenever you push. This causes all kinds of confusion once you start using async functions.

Suppose the simple case where I am reading from an API that sends me an array of items: I cannot push each item individually, since every push would cause another request to that API.

I may be rambling. I want to hear your thoughts.

P.S. My solution would be to include a cb argument to _read(), which would signal that the current _read is done and the next one is ready to be called, similar to a transform stream, where you can call this.push as many times as you want before invoking the callback.

Thanks.

@mself commented Nov 9, 2017

I like the idea of the cb argument to _read(). That seems cleaner than the pause()/resume() workaround I ended up using (so that I could push multiple times without concurrent calls to _read() before I was ready). But wouldn't that be a breaking change?

@calvinmetcalf (Contributor)

@mself noms is a package that does what you want, I assume.

@davidmdm a callback to read is something that has been discussed in the past; the main issues were performance regressions for some of the core pieces of Node and making an already complex API more complex.

Another thing you can do when doing async stuff is something like:

MyStream.prototype._read = function() {
    const self = this;
    if (this._inProgress) {
        return;
    }
    this._inProgress = true;
    // Get a set of objects from an API.
    this.fetchObjects(function(err, objects) {
        // Push all of the objects into the stream.
        for (let object of objects) {
            this.push(object);
        }
        this._inProgress = false;
    });
};

I agree it's kinda confusing, but we're probably stuck with it for legacy reasons. If the documentation sucks, pulls improving the documentation are always appreciated. 😉

@davidmdm commented Nov 9, 2017

@mself To be fair, the way you did it is fine too. I know it will work; it's just that since the new streams implementation with pipe(), I don't like calling pause and resume, because pipe() calls resume on drains and such. I am scared of that one-in-a-million chance they coincide.

Does anybody know why this.push() immediately invokes a new _read in async reads but not in synchronous ones?

_read() {
    Promise.resolve().then(() => {
        let ok = true;
        while (ok) {
            ok = this.push(this.i);
            this.i += 1;
        }
    });
}

will start a new read as soon as this.push is called, before this.i += 1 even has a chance to execute. Meanwhile,

_read() {
    let ok = true;
    while(ok) {
        ok = this.push(this.i);
        this.i += 1;
    }
}

in this case the while loop keeps control until its condition breaks...

@calvinmetcalf Thanks for your solution. Unfortunately I think it doesn't work, since all of the reads triggered by your pushes would be swallowed by the return statement, and no further _read would be called, hence killing your stream... unless you called it yourself:

this._inProgress = false;
this._read();

which is recommended against. Or you could pop() the last item and push it after setting _inProgress to false, but to me it just seems we are hacking around the problem, and the problem is either no real support for asynchronous reads, or a total misunderstanding of how to implement them.

However, wouldn't it be possible to add the cb without breaking changes if it were introduced as an option? I think people don't know about this difference and get mixed results. It just leads them to think they don't understand how to use streams, regardless of the simplicity or complexity of the API.

@calvinmetcalf (Contributor)

@davidmdm whoops, the full code should be:

MyStream.prototype._read = function() {
    const self = this;
    if (this._inProgress) {
        return;
    }
    this._inProgress = true;
    // Get a set of objects from an API.
    this.fetchObjects(function(err, objects) {
        // Push all of the objects into the stream.
        var moar;
        for (let object of objects) {
            moar = self.push(object);
        }
        self._inProgress = false;
        if (moar) {
            self._read();
        }
    });
};

@psxcode commented Jan 2, 2018

According to #4878, this.push is a callback for _read, so you should invoke push once per _read.

@mcollina (Member) commented Jan 2, 2018

@psxcode there is no need to limit yourself to a single push().

@psxcode commented Jan 3, 2018

@mcollina, you are right. Reading the documentation very carefully, I found the answer:

When readable._read() is called, if data is available from the resource, the implementation should begin pushing that data into the read queue using the this.push(dataChunk) method. _read() should continue reading from the resource and pushing data until readable.push() returns false. Only when _read() is called again after it has stopped should it resume pushing additional data onto the queue.

Note: Once the readable._read() method has been called, it will not be called again until the readable.push() method is called.

@davidmdm commented Jan 3, 2018

@psxcode @mcollina
This is the whole contention of this thread.
When running synchronously you can push as many times as you want.
As soon as this.push is called within a promise or an async context, it triggers other calls to _read.
This behaviour causes all sorts of mayhem.

Suppose your source is an API that returns JSON arrays: [{...}, {...}, ...], and you want to process each item. You cannot push once per object in the array, or you will trigger that many concurrent reads against your API, and god knows what could happen to your internal state, supposing you extended Readable and built your own custom stream.

The only solutions I have found are the one proposed by @calvinmetcalf, pushing the whole array and having a transform deconstruct it into its separate objects (see the sketch below), or simply having each part of your stream process the array of data.

Otherwise it is NOT safe to push multiple times in an asynchronous _read operation.

mcollina added a commit to mcollina/node that referenced this issue Jan 5, 2018
@mcollina (Member)

mcollina commented Jan 9, 2018

Reopening, as this is indeed fixable. See #17979

@mcollina mcollina reopened this Jan 9, 2018
@davidmdm commented Jan 9, 2018

@mcollina I read through your PR; I really like it.
Also, do you have resources on becoming a Node.js contributor? I would have loved to help rather than only complain about the issue.
That being said, good job. I am very happy this issue may be solved.

@offero commented Nov 24, 2018

Was this applied to the 8.x release as well? I'm testing with async pushes to the readable stream and I'm not noticing anything strange (Node 8.11.3).

Also, is it a problem to push before _read is called for the first time?

@mcollina (Member)

This was applied to Node 10. The problem is indeed there, it’s just hard to stumble upon it.
