Add a proof of concept of optimized pipe #511
Conversation
This is a proof-of-concept code snippet for my answer to the optimized-piping vision. It's basically a brain dump plus a little validation that the idea is sound.
Very, very interesting. Could you explain the vision a bit more? I think I see the outlines, but getting a high-level view would be nice. I also think it would be ideal if we allowed JS strategies somehow. I guess they make things fairly observable, though, as size() gets called all the time... that's tricky. I'm going to work today on a requirements-based spec for pipeTo. Let's make sure it synchronizes with your vision.
OK. I forgot to note this, but it is intended just to demonstrate that our API enables this kind of optimization. It is not necessarily intended to be part of the specification.

PipeRequest

First, an instance of the PipeRequest class corresponds to each pipeTo() invocation. It holds the instances and parameters given to the method:
It also has
Every time a new pipeTo() is invoked, a PipeRequest instance describing the pipeTo() invocation is created and:
The global pipe manager holds:
The global pipe manager chooses the best subset of triplets of:
The subset must cover all the active PipeRequests. E.g. skipped piping to

Reorganizing pipes

Ongoing pipes can be stopped and reorganized into a longer or faster pipe. The duration (number of bytes to transfer) of a long piping is limited to the minimum of the requested number of bytes to transfer over all the PipeRequests covered by the long piping.

Candidate propagation

The propagation of pipe candidates may happen asynchronously; e.g. IdentityTransformStream does that. This is useful when an IdentityTransformStream already has some chunks enqueued in it. It may exert backpressure on the source readable stream of an existing pipe whose destination is the writable side of the IdentityTransformStream, so that the pipe temporarily stops write()-ing new chunks. Once all the chunks are drained, it can announce the piping candidates received at the readable side to the writable side, so that the faster one can be chosen.

I think this backpressure is not the normal backpressure signal; it should be made by announcing an empty pipeCandidates to completely stop the piping (from the readable to the writable side of the IdentityTransformStream) and waiting for the ongoing piping to stop. We need some mechanism to realize this. Without strict backpressure like this, it's possible that the ongoing pipe write()s new chunks before seeing the updated pipe candidates (including the long piping) and switches to the long piping before the queued chunks are processed.
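To make the shape of this concrete, here is a toy sketch of the global-pipe-manager idea. It is not the proof-of-concept code itself; all names (PipeManager, registerIdentityTransform, register, plan) and the string-id model of stream endpoints are illustrative assumptions. It shows how pipe requests chained through identity transforms can be merged into long pipes, with each long pipe capped at the minimum requested byte count among the requests it covers:

```javascript
class PipeManager {
  constructor() {
    this.requests = [];
    // Maps the writable side of an identity transform to its readable side.
    this.identityLinks = new Map();
  }
  registerIdentityTransform(writableSide, readableSide) {
    this.identityLinks.set(writableSide, readableSide);
  }
  // Called for every pipeTo() invocation; returns the current pipe plan.
  register(readable, writable, bytesRequested = Infinity) {
    this.requests.push({ readable, writable, bytesRequested });
    return this.plan();
  }
  // Choose pipes covering all active requests, merging chains that pass
  // through identity transforms into single long pipes.
  plan() {
    const bySource = new Map(this.requests.map(r => [r.readable, r]));
    // A request is a chain head unless another request feeds it
    // through an identity transform.
    const continuations = new Set();
    for (const r of this.requests) {
      const next = bySource.get(this.identityLinks.get(r.writable));
      if (next) continuations.add(next);
    }
    const pipes = [];
    for (const request of this.requests) {
      if (continuations.has(request)) continue;
      let tail = request;
      // A long pipe is limited to the minimum requested byte count
      // of all requests it covers.
      let bytes = request.bytesRequested;
      let next;
      while ((next = bySource.get(this.identityLinks.get(tail.writable))) !== undefined) {
        tail = next;
        bytes = Math.min(bytes, tail.bytesRequested);
      }
      pipes.push({ from: request.readable, to: tail.writable, bytes });
    }
    return pipes;
  }
}
```

For example, after `register('source', 'idW', 1000)` and `register('idR', 'sink', 500)` with `'idW'`/`'idR'` registered as an identity transform, `plan()` yields a single long pipe from `'source'` to `'sink'` limited to 500 bytes.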
Extended the API a bit to address the flushing issue I discussed in the last comment.
Wow, perfect, thank you for the explanation! I understand this is not necessarily something for the spec, but I think it will be valuable to have it written up somewhere that we can point people to as an example of how to use the spec's flexibility around piping to accomplish great things. It might be worth prototyping as well, if we have the time and energy; I'm not sure if that was your plan in this PR or if you were just using the .js code as a way to organize your thoughts. So let me try to restate the actors in the system:
With this in mind, the notifyPipeCandidates/waitPipeCandidatesChange mechanisms are all about coordinating to potentially update the set of pipes in action when the set of pipe requests in action changes. This is what the reorganizing-pipes section is about, I think? I'm not sure I fully understand the candidate propagation section, or why it implies a new backpressure mechanism. My vision of identity transform streams (which is not fully thought out, so might be wrong) is that in a system like this, they get completely excluded from the list of pipes, i.e. I guess you are concerned about cases like
Yeah, initially I attempted to just implement the API in ReadableStream, but it looked like it was going to complicate the reference implementation too much, so I wrote it in a separate file. I want to evolve the code snippet into something working, for verifying our ideas. Yes.
Right
Ah, yeah. Asynchronous invocation of pipeTo()s and pending chunks. IdentityTransformStream's readable side and writable side themselves work as a ReadableStream and a WritableStream, and they can be passed around separately. Even pipeThrough() may be given some writable/readable pair built without considering the transform stream concept at all. I'd like to confirm first that this understanding of mine is correct. Or do you have some idea in mind that we brand-check that a given writable/readable pair passed to pipeThrough() is a "transform stream"?

So, the writable side of an identity transform may get write()-en manually even while it's exerting backpressure. Then some chunks are queued inside the transform stream. Once pipeTo() is invoked on both the readable side and the writable side of a transform stream with pending chunks, we can consider optimizing this pair of pipeTo()s, but we need to have the transform stream flush the chunks before establishing such a skipped transfer.

Hmm, we could also resolve this by enforcing a requirement on all pipeTo() implementations to stop writing when the normal backpressure signal is exerted, as the current pipeTo() reference implementation does. Yeah, that may work.
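The flushing concern can be modeled with a toy sketch (all names here — IdentityTransformModel, candidatesReady, onDrain — are hypothetical, not the PoC API): an identity transform holding manually write()-en chunks must not announce a longer pipe candidate until its internal queue has drained, or the skipped pipe could reorder around the queued chunks.

```javascript
class IdentityTransformModel {
  constructor() {
    this.queue = [];     // chunks written before the optimization kicks in
    this.onDrain = null;
  }
  write(chunk) {
    this.queue.push(chunk);
  }
  // The current (unoptimized) pipe pulls queued chunks from here.
  read() {
    const chunk = this.queue.shift();
    if (this.queue.length === 0 && this.onDrain !== null) {
      this.onDrain(); // safe to propagate pipe candidates now
    }
    return chunk;
  }
  // Resolves once every pending chunk has been read, i.e. once a
  // skipped (long) pipe can be established without reordering chunks.
  candidatesReady() {
    if (this.queue.length === 0) return Promise.resolve();
    return new Promise(resolve => { this.onDrain = resolve; });
  }
}
```

A pipe manager would await `candidatesReady()` before splicing the transform out of the chain; until then the existing short pipes keep draining the queue.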
I haven't thought this through fully. But my idea was that yes, we would add an unobservable brand check and fast path if we know it's a transform stream created with the TransformStream constructor (or with
Got it. I understand the problem now.
The draft in #512 says "While WritableStreamDefaultWriterGetDesiredSize(writer) is ≤ 0 or is null, the user agent must not read from reader." I think this is what we want, although I guess it is talking about reading, not writing. Maybe it should say both.
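A minimal sketch of that rule, using the standard streams API (available as globals in Node 18+) — the pipe loop checks the writer's desiredSize and refrains from reading (and hence writing) while it is ≤ 0 or null. The function name is illustrative, not from the spec:

```javascript
async function backpressureAwarePipe(readable, writable) {
  const reader = readable.getReader();
  const writer = writable.getWriter();
  for (;;) {
    if (writer.desiredSize === null) {
      // The destination has errored; stop piping entirely.
      reader.releaseLock();
      writer.releaseLock();
      return;
    }
    if (writer.desiredSize <= 0) {
      // Backpressure: do not read from the source; wait instead.
      await writer.ready;
      continue;
    }
    const { value, done } = await reader.read();
    if (done) break;
    await writer.write(value);
  }
  await writer.close();
  reader.releaseLock();
  writer.releaseLock();
}
```

This is a simplified sketch (it awaits each write, and omits abort/cancel propagation), but it shows where the desiredSize check sits relative to the read.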
I see. Anyway, we need to perform some special interaction with the streams to realize the optimization. Even the propagation idea requires the streams to expose a special signal/method. One good thing about leaving the decision to the streams, and having the global manager fetch some predefined signal from them as done in the propagation idea above, is that each transform stream can control the timing of announcing longer piping. E.g. one could consume some initial header to do something special, and then pass through data to the next streams.
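The header example above can be sketched with the standard TransformStream (Node 18+ globals). The onPassthrough callback here is a stand-in for a hypothetical notifyPipeCandidates-style signal, not part of any real API: the transform consumes the first chunk as a header, then becomes a pure identity, at which point a pipe manager could splice it out of the chain.

```javascript
function headerThenPassthrough(onHeader, onPassthrough) {
  let headerSeen = false;
  return new TransformStream({
    transform(chunk, controller) {
      if (!headerSeen) {
        headerSeen = true;
        onHeader(chunk);   // do something special with the header
        onPassthrough();   // from now on the stream is an identity;
                           // a longer pipe candidate could now be announced
        return;            // the header itself is not forwarded
      }
      controller.enqueue(chunk);
    }
  });
}
```

Piping `['H', 'a', 'b']` through such a transform delivers `'H'` to onHeader, fires onPassthrough once, and forwards only `'a'` and `'b'` downstream.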
Yeah! I'm not yet sure about the necessity of different backpressure signals. Let's just try.
Related to #359, #325, #97, #321, #461