
pipeFrom to allow writable streams to "pull" at their own pace #146

Open
domenic opened this issue Jul 17, 2014 · 12 comments


@domenic
Member

domenic commented Jul 17, 2014

This spins out of the "Questions about the Fetch API" thread on the whatwg list, in particular @willchan's reply around here and the subsequent follow-ups. We had a video-chat conversation that helped clarify things and I want to capture them here.

Our current piping algorithm essentially says: whenever the source has data, and the dest is not exerting backpressure, read from the source and write to the dest. @willchan calls this "push", because the dest does not really get to decide how it consumes from the source. He explains that a "pull" model would be better, wherein you give the source to the underlying sink implementation (probably via the dest), which then grabs data out of it as it determines necessary.

This is most important for high-performance binary streams, which will allow reading specific amounts of bytes (#111), because the writable stream implementer (e.g., the UA) could then use smart algorithms to figure out exactly what size chunks they want to try transferring, depending on e.g. how well the streaming has gone so far, what type of network they are on, and other such factors.

This functionality will not be useful for most writable streams: only those that know how to be smart about consuming the data. Object streams in particular are unlikely to want to use this.

The tentative idea I had for solving this was something like the following:

  • Introduce WritableStream.prototype.pipeFrom, and move all of the existing code in ReadableStream.prototype.pipeTo into that. This is the default pipe-from implementation. This is possible since the pipe code does not depend on any internals, just the public API, and in fact it could be a standalone function.
  • Have ReadableStream.prototype.pipeTo(dest) become essentially dest.pipeFrom(this). So pipeTo just becomes a convenience so that authors can write things in left-to-right order (see the sketch after this list).
  • Allow writable streams that have advanced use cases to subclass WritableStream and override the pipeFrom method to include custom logic.
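
In code, that default could look something like the following minimal sketch, assuming the simplified state/wait()/read()/write() surface discussed elsewhere in this thread; closing, aborting, and error propagation are omitted:

```js
// Default "push" implementation, moved out of ReadableStream.prototype.pipeTo.
// This is a sketch against a hypothetical simplified API, not spec text.
WritableStream.prototype.pipeFrom = function (source) {
  const pump = () => {
    // Move data whenever the source has some and we can accept it.
    while (source.state === "readable" && this.state === "writable") {
      this.write(source.read());
    }
    if (source.state === "waiting") {
      source.wait().then(pump);
    } else if (this.state === "waiting") {
      this.wait().then(pump);
    }
    // (Close and error handling omitted.)
  };
  pump();
  return this;
};

// pipeTo becomes left-to-right sugar over pipeFrom.
ReadableStream.prototype.pipeTo = function (dest) {
  return dest.pipeFrom(this);
};
```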

This actually solves a number of other problems:

It helps give a framework for streams to "recognize" each other, e.g. for off-main-thread piping (#97) via things like splice: a writable stream that understands file descriptors can recognize that a readable stream representing a file descriptor is being piped to it, and do splicing instead of the usual algorithm, falling back to super(...args) if it does not recognize the stream.

It also allows dest streams to apply any other stream-specific logic. For example, a stream representing an HTTP request body to be sent out could recognize a file descriptor stream being piped to it, get the file's length, and then set that on its Content-Length header. The popular request package in Node.js uses these sorts of tricks extensively.

These could be done the other way around too, via an overridden pipeTo, but this kind of recognition seems more the dest's responsibility than the source's.
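
As a rough illustration of the dest-side recognition pattern (FileDescriptorReadableStream, fd, and spliceFromFd are hypothetical names used only for this sketch):

```js
class FileWritableStream extends WritableStream {
  pipeFrom(source, ...args) {
    if (source instanceof FileDescriptorReadableStream) {
      // Both ends are file descriptors: let the kernel move the bytes.
      return this.spliceFromFd(source.fd);
    }
    // Unrecognized source: fall back to the generic algorithm.
    return super.pipeFrom(source, ...args);
  }
}
```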

I am optimistic that this does not really add any complexity to the default case, while adding good flexibility for the fastest-possible implementations for high-performance binary streams.

@tyoshino
Member

What I planned to do is:

  • expose a method, say amountQueuable(), to the underlying source of the ReadableStream, to allow the source to synchronously probe how much data it may enqueue to the stream
  • have pull() be invoked every time the amount enqueuable to the [[queue]] increases (possibly due to read() or an update of highWaterMark), so that the underlying source is notified of that
  • expose a method, say amountWritable(), on the WritableStream, with which we can know how much data it can accept (without entering the "waiting" state)
  • modify wait(), or add some new method, to allow users of the WritableStream to get notified when the amount writable to the queue (without entering the "waiting" state) increases (possibly due to completion of write() on the underlying sink or an update of highWaterMark)
  • expose the ReadableStream's strategy's highWaterMark somehow to the user of the ReadableStream, so that the user can tell the stream how much data they are currently expecting
  • expose the WritableStream's strategy's highWaterMark somehow to its underlying sink, so that the sink can adjust the amount of data to accept into the queue (this will affect the result of amountWritable())
  • modify pipeTo() to update the strategy on the ReadableStream source, via the method above, based on the result of calling amountWritable() on the WritableStream dest

This way, only the amount of data needed/acceptable is communicated. We could add an option to pipeTo to turn this on/off, i.e. make it able to work as both push and pull. I'm still thinking about how we could keep the abstraction in good shape while keeping simple streams working without having to care about precise flow control like this.
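
A sketch of how a pipe loop might use these methods (amountWritable() and an updateStrategy() setter for the source's highWaterMark are the hypothetical additions from the list above; close/error handling omitted):

```js
function pullAwarePipe(source, dest) {
  const pump = () => {
    // Ask the dest how much it can accept without entering "waiting",
    // and propagate that to the source as its current highWaterMark,
    // so the underlying source produces only what is needed.
    source.updateStrategy({ highWaterMark: dest.amountWritable() });
    while (source.state === "readable" && dest.amountWritable() > 0) {
      dest.write(source.read());
    }
    // Re-run once the blocked side can make progress again.
    (source.state === "waiting" ? source : dest).wait().then(pump);
  };
  pump();
}
```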

This approach allows a WritableStream to adjust what amount the peer ReadableStream should produce, but it gives neither precise control over the size of chunks of read data nor control over the buffers into which generated data is saved, as proposed in #111.

This looks like it adds huge complexity, but I think some part of the above will still be needed even after revising. Please see below.


I think your pipeFrom() idea implies adding some methods to the ReadableStream so that we can tell it how much data is expected to be generated. Right? Without that, we're still only able to tell the ReadableStream to generate something by invoking pull via wait(), without specifying the amount. The WritableStream's underlying sink is already able to call wait() on the peer ReadableStream via the current pipeTo, by eating writeRecords to make the WritableStream return "writable" on a state call. (BTW, the change aedbf18 I proposed changed pipeTo() to call wait() on the ReadableStream even when the WritableStream is not yet "writable".)

If so, we can also address this issue by adding "some methods" on the WritableStream side instead of the ReadableStream, and having a (special) pipeTo() look at those methods to control the amount to pull from the underlying source. So, I'm wondering if pipeTo/pipeFrom really makes much difference.

@tyoshino
Member

BTW, the change aedbf18 I proposed changed pipeTo()

Filed a separate issue #153 to remember to give an answer to this.

@domenic
Member Author

domenic commented Jul 22, 2014

What I planned to do is:

Thanks for this. It is great to have it well thought out and written down. I'll try to process it all over the next couple of days of meetings :).

I think your pipeFrom() idea implies adding some methods to the ReadableStream so that we can tell it how much data is expected to be generated. Right? Without that, we're still only able to tell the ReadableStream to generate something by invoking pull via wait(), without specifying the amount. The WritableStream's underlying sink is already able to call wait() on the peer ReadableStream via the current pipeTo, by eating writeRecords to make the WritableStream return "writable" on a state call.

My current thinking (which may not be very good) is that we subsume all amount-specific negotiation into byte streams and their specific protocol. E.g., if we had ReadableByteStream.prototype.readInto(dest, offset, length), then WritableByteStream.prototype.pipeFrom(source) would notice that it was being connected to a ReadableByteStream (or, more generally, anything with a readInto method) and would use that to request the appropriate number of bytes directly from the readable byte stream.
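
Very roughly, and assuming a synchronous readInto() in the style of #111 plus a hypothetical desiredChunkSize() heuristic on the sink (EOF and error handling omitted):

```js
class WritableByteStream extends WritableStream {
  pipeFrom(source) {
    // Anything without readInto gets the generic algorithm.
    if (typeof source.readInto !== "function") {
      return super.pipeFrom(source);
    }
    const pump = () => {
      while (source.state === "readable" && this.state === "writable") {
        // The sink decides exactly how many bytes to pull right now,
        // based on whatever heuristics the UA wants to apply.
        const size = this.desiredChunkSize();
        const buffer = new ArrayBuffer(size);
        const bytesRead = source.readInto(buffer, 0, size);
        this.write(new Uint8Array(buffer, 0, bytesRead));
      }
      if (source.state === "waiting") source.wait().then(pump);
      else if (this.state === "waiting") this.wait().then(pump);
    };
    pump();
    return this;
  }
}
```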

What do you think? I am not sure it is a good design. On the one hand, it isolates the complexity into only a few places, and confines it to bytes streams. But on the other hand it seems like a weakness of our model that we are doing that kind of byte stream specialization, and not giving that kind of amount-negotiation capability to other streams. In which case something more general involving the kind of stuff you propose would be good. Or perhaps some combination of the stuff you propose and the current "strategy idea": the strategy idea was designed to abstract away all the byte-related stuff in one small place, but if it ends up leaking elsewhere anyway, then perhaps that should be reconsidered.

If so, we can also address this issue by adding "some methods" on the WritableStream side instead of the ReadableStream, and having a (special) pipeTo() look at those methods to control the amount to pull from the underlying source. So, I'm wondering if pipeTo/pipeFrom really makes much difference.

I agree it doesn't really make much of a difference. You can have specialized methods on either side. But somehow it seems more natural to specialize the pulling process than the pushing process. I wish I had a better argument.

@willchan

I haven't thought too much about how to achieve the goals I laid out to @domenic that are important for high-performance networking. Let me sketch out some ideas for what I would naively do:

  • Have lower-layer ReadableByteStream and WritableByteStream classes. These need to support byte-stream and piping APIs that mirror POSIX read()/write()/splice(). They do not contain any affordances for buffering (internal queues) or for reading/writing objects.
  • Have higher-layer ReadableStream and WritableStream classes with buffering affordances and APIs for controlling the amount of buffering and for reading and writing objects.
  • It's not clear to me whether these different stream classes should share a prototype hierarchy. I could imagine providing helper functions for constructing ReadableStream/WritableStream objects from ReadableByteStream/WritableByteStream objects, and implementing a default buffering strategy to serialize/deserialize objects to/from byte streams. I'm kinda liking not having them share the same public API, because when I talked to @domenic about the issues with things like partial write() calls, where you provide a buffer of size n to write and the call may return with <=n bytes written, that makes for an ugly unified interface between byte streams and object streams. Instead, all the low-level APIs would use byte streams, which consumers would wrap in the buffered object-stream versions for ease of use. This mirrors the typical strategy for Java reader/writer objects. Alternatively, the APIs could return a subclass of ReadableStream/WritableStream that exposes a getter for the underlying ReadableByteStream/WritableByteStream.
  • I agree that pipeFrom() can also be used to implement pipeTo(). That sounds like a good idea for the convenience of writing left to right.
  • For the byte-stream versions, I think pipeFrom()/pipeTo() should take a length/size parameter, mirroring what splice() does. This allows for efficiently piping large protocol frames, such as a WebSocket frame where the frame header specifies the payload length: you'd need to read the frame header into a buffer first and parse out the payload length, and then afterward you could pipe the entire frame payload (see the sketch after this list). Advanced implementations can replace the pipe implementation with a splice() system call for efficiency when they know the concrete source and destination streams and those correspond to file descriptors. When the implementation doesn't know the concrete streams, it can't use splice() and must read from the ReadableByteStream into a buffer and write to the WritableByteStream. The buffer size chosen will vary depending on implementation heuristics, trading off between memory (buffer size), CPU (fewer syscalls with larger buffers), throughput (better with larger buffers), and interactivity/responsiveness (some OSes will not allow partial reads/writes, which means that if the app tries to do overly large reads or writes, it loses the ability to reprioritize some of the data).
  • For the object stream versions, pipeTo() should just shove data to the destination stream as fast as possible. Advanced implementations with knowledge of both the source and destination streams can similarly use splice() for efficiency.
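
To make the frame case concrete, here is a sketch; HEADER_SIZE, readFully(), parsePayloadLength(), and the { length } option are all made-up names for illustration:

```js
async function pipeFrame(socket, sink) {
  // Read the fixed-size frame header into a buffer and parse it.
  const header = new ArrayBuffer(HEADER_SIZE);
  await readFully(socket, header);
  const payloadLength = parsePayloadLength(header);
  // Pipe exactly payloadLength bytes of payload. An implementation
  // that recognizes both ends as file descriptors could replace this
  // with a single splice() call.
  await socket.pipeTo(sink, { length: payloadLength });
}
```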

@tyoshino
Member

@willchan Many of your ideas are captured by the W3C streams spec I was co-authoring, as it initially aimed to provide a streaming interface only for bytes. I can update it to provide a prototype, incorporating the ideas established here and using the same identifiers and API surface as much as possible.

@domenic
Member Author

domenic commented Jul 23, 2014

@willchan

Have lower-layer ReadableByteStream and WritableByteStream classes. These need to support byte-stream and piping APIs that mirror POSIX read()/write()/splice(). They do not contain any affordances for buffering (internal queues) or for reading/writing objects.

I think this is too low-level to be useful in a web-exposed way. Lack of queuing is too big of a footgun to expose to users. To me it is fundamental to what we mean by "stream" in JS. If we think these kinds of things are valuable as potential building blocks, then they should be named something else.

Have higher-layer ReadableStream and WritableStream classes with buffering affordances and APIs for controlling the amount of buffering and for reading and writing objects.
It's not clear to me whether these different stream classes should share a prototype hierarchy. I could imagine providing helper functions for constructing ReadableStream/WritableStream objects from ReadableByteStream/WritableByteStream objects, and implementing a default buffering strategy to serialize/deserialize objects to/from byte streams. I'm kinda liking not having them share the same public API, because when I talked to @domenic about the issues with things like partial write() calls, where you provide a buffer of size n to write and the call may return with <=n bytes written, that makes for an ugly unified interface between byte streams and object streams. Instead, all the low-level APIs would use byte streams, which consumers would wrap in the buffered object-stream versions for ease of use. This mirrors the typical strategy for Java reader/writer objects. Alternatively, the APIs could return a subclass of ReadableStream/WritableStream that exposes a getter for the underlying ReadableByteStream/WritableByteStream.

To me the biggest concern is that if you get a byte stream it should interoperate with an object stream easily. E.g. you can write a transform from one to the other. Or if you have a generic stream-consuming mechanism, it should be able to consume a specialized byte stream, perhaps slightly slower or with less-controlled backpressure.

That is what is behind my current vision, wherein ReadableByteStream derives from ReadableStream. Then all of the ReadableStream methods have sane semantics when used by a consumer that expects a generic ReadableStream, but consumers who are willing to special-case for (or demand) a ReadableByteStream can use the more-specific functionality it exposes. One of those consumers would probably be WritableByteStream.

@tyoshino

Many of your ideas are captured by the W3C streams spec I was co-authoring, as it initially aimed to provide a streaming interface only for bytes. I can update it to provide a prototype, incorporating the ideas established here and using the same identifiers and API surface as much as possible.

Would you be up for doing it as a prototype in this repo? Maybe reference-implementation/lib/experimental/* or similar? I agree some concrete prototypes would help.

@tyoshino
Member

a prototype in this repo

will do.

@domenic
Member Author

domenic commented Jul 23, 2014

a prototype in this repo

will do.

Great! Let me give you commit access so you can work more freely in that area. We should both do pull requests and/or work in branches w.r.t. non-experimental changes, of course.

@willchan

Have lower-layer ReadableByteStream and WritableByteStream classes. These need to support byte-stream and piping APIs that mirror POSIX read()/write()/splice(). They do not contain any affordances for buffering (internal queues) or for reading/writing objects.

I think this is too low-level to be useful in a web-exposed way. Lack of queuing is too big of a footgun to expose to users. To me it is fundamental to what we mean by "stream" in JS. If we think these kinds of things are valuable as potential building blocks, then they should be named something else.

I'm fairly agnostic as to how they are exposed. I agree that it's a footgun that most users should not want to use. I'd be fine with them being renamed to something else if that's preferred. It raises the question, though, of whether these low-level APIs would be exposed in standard web APIs like fetch().

To me the biggest concern is that if you get a byte stream it should interoperate with an object stream easily. E.g. you can write a transform from one to the other. Or if you have a generic stream-consuming mechanism, it should be able to consume a specialized byte stream, perhaps slightly slower or with less-controlled backpressure.

I agree with this statement. The key part here is that you can transform from one to another. An object stream could wrap a byte stream and deserialize the bytes into objects and serialize the objects into bytes.

That is what is behind my current vision, wherein ReadableByteStream derives from ReadableStream. Then all of the ReadableStream methods have sane semantics when used by a consumer that expects a generic ReadableStream, but consumers who are willing to special-case for (or demand) a ReadableByteStream can use the more-specific functionality it exposes. One of those consumers would probably be WritableByteStream.

So being able to transform between an object stream and a byte stream doesn't necessarily mean they need to share the same interface. I'm OK with it if that's what folks want. There are some tradeoffs here, because sharing the same interface means either giving up some performance or complicating the API with a different I/O model from the traditional stream I/O model. I'm thinking in particular of partial read()/write() calls, which return a promise indicating the partial number of bytes read/written. This model differs substantially from the object reads/writes that use internal queueing.
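
For example, under a partial-write model a consumer has to loop until everything it offered has been consumed; the promise-of-bytes-written write() below is an assumption about such an API, not settled design:

```js
async function writeAll(byteSink, buffer) {
  // buffer is an ArrayBuffer; write() may consume fewer bytes than
  // offered and resolves with the number actually written.
  let offset = 0;
  while (offset < buffer.byteLength) {
    const written = await byteSink.write(
      new Uint8Array(buffer, offset, buffer.byteLength - offset)
    );
    offset += written;
  }
}
```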

@domenic
Member Author

domenic commented Jul 23, 2014

An object stream could wrap a byte stream and deserialize the bytes into objects and serialize the objects into bytes.

This API is more awkward than I would like, as it means you are no longer able to write code that does not care which kind it gets. It would be sad if the story were "to interface with any user code, wrap your stream-returned-from-fetch in an object stream." Or worse, "check if the code you're passing it to special-cases byte streams." It's better for the story to be "pass the stream-returned-from-fetch to the stream-consuming API," with the stream-consuming API able to be transparently upgraded in the future to specialize for byte streams.

But, I can see how it gives a clean separation.

There are some tradeoffs here, because sharing the same interface means either giving up some performance or complicating the API with a different I/O model from the traditional stream I/O model.

Right. Giving up performance is not OK. (Things like forcing asynchrony might be acceptable, but certainly not buffer copying or suboptimal backpressure negotiation.) So the question is: how complicated do things get? I remain hopeful that you can layer on a small addition for the partial read/write calls, but it may turn out to be not that small, in which case a wrapper-esque approach (ReadableStream.fromPipe or something) might be necessary.

@willchan

Tough API choices

Yeah, there's fundamentally a sucky choice to be made here. I can see both sides of the debate. I'm not as hopeful that you can layer on a small addition for the partial read/write calls, but we should flesh it out first to see. I look forward to seeing prototypes of the different options.

@domenic
Member Author

domenic commented Oct 21, 2014

In an offline thread @acolwell mentioned it would be useful for a writable stream to know whether it was being piped to. We got a bit off-topic here, but the framework outlined in the OP could be used to accomplish this, although in a kind of strange way. (You would subclass WritableStream, and the subclass would override pipeFrom to delegate to super.pipeFrom but also record whether a pipe was ongoing.)
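
Something like the following sketch, assuming pipeFrom returns (or can be wrapped in) a promise for the pipe's completion:

```js
class PipeAwareWritableStream extends WritableStream {
  constructor(...args) {
    super(...args);
    this.piping = false;
  }
  pipeFrom(source, ...rest) {
    this.piping = true;
    // Delegate to the default algorithm, but record when it finishes.
    return Promise.resolve(super.pipeFrom(source, ...rest))
      .finally(() => { this.piping = false; });
  }
}
```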

His concrete use case is backing MSE's SourceBuffer with a writable stream and disabling higher-level operations on the SourceBuffer while it is being piped to. We should probably figure out a way to address that use case. It could be as simple as a piping property, but that doesn't feel very integrated into the design...

The issue right now is essentially a conflict between designing piping as a higher-level algorithm that just uses the public APIs to transfer data, versus wanting to make it more specialized in certain cases (super-important for performance). I need to think harder about that.
