Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a proof of concept of optimized pipe #511

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open

Add a proof of concept of optimized pipe #511

wants to merge 2 commits into from

Conversation

tyoshino
Copy link
Member

Related to #359, #325, #97, #321, #461

@tyoshino
Copy link
Member Author

This is a proof-of-concept code snippet for my answer to the optimized piping vision. Basically brain dump + a little validation of solidness.

@domenic
Copy link
Member

domenic commented Aug 30, 2016

Very, very interesting. Could you explain the vision a bit more? I think I see the outlines, but getting a high-level view would be nice.

I also think it would be ideal if we allowed JS strategies somehow. I guess they make things fairly observable though, as size() gets called all the time... that's tricky.

I'm going to work today on a requirements-based spec for pipeTo. Let's make sure it synchronizes with your vision.

@tyoshino
Copy link
Member Author

OK.

I forgot to note but this is intended to be used just for demonstrating that our API enables this kind of optimization. Not necessarily intended to be a part of the specification.

PipeRequest

First, an instance of the PipeRequest class corresponds to each pipeTo() invocation. It holds the instances and parameters given to the method:

  • destination writable stream instance
  • source readable stream instance
  • parameters such as preventClose given to pipeTo()

It also has

  • the pipe slot which points the Pipe instance which represents ongoing piping which is going to satisfy the PipeRequest (wholly or partially) if any.
  • the done flag which is initially false.

Every time a new pipeTo() is invoked, a PipeRequest instance describing the pipeTo() invocation is created and:

  • the PipeRequest is registered with the global pipe manager.
  • the PipeRequest retrieves current (skipped) piping candidates from the destination writable stream from .pipeCandidates and passes it to the source readable stream by calling .notifyPipeCandidates(). This is called pipe candidates propagation.
  • the PipeRequest subscribes to the destination writable stream to get notified of updates on the piping candidates of the destination writable stream by calling .waitPipeCandidatesChange(). Every time new candidates are received, it's propagated to the source readable stream.

The global pipe manager holds:

  • a list of all readable stream instances being piped.
  • lists representing piping candidates for each of the piped readable streams.
    • It's a list of references to PipeRequest instances. It's sorted in the order where ones went through the least number of "pipe candidates propagation" comes first. I.e. destinations closest to the readable stream comes first.

The global pipe manager chooses the best subset of triplets of:

  • a readable stream
  • pipe candidate (i.e. the destination writable stream)
  • the transfer method to use. E.g. between a TCP socket and a file descriptor, either of the default pipeTo() algorithm operating over the public ReadableStream._Reader/WritableStream._Writer interfaces, or offloading by sendfile() could be used. The global pipe manager lists up available transfer methods by checking the underlying sources/sinks of the readable/writable stream pair.

The subset must cover all the active PipeRequests. E.g. skipped piping to b in a list representing skipped piping candidates [a, b, c] can cover PipeRequests a and b. c must be covered by any other pipe.

Reorganizing pipes

Ongoing pipes can be stopped to be reorganized into longer or faster pipe. The duration (number of bytes to transfer) of a long piping is limited to the minimum of the requested number of bytes to transfer of all PipeRequests covered by the long piping.

Candidate propagation

The propagation of pipe candidates may happen asynchronously. E.g. IdentityTransformStream does that. This is useful when an IdentityTransformStream already have some chunks enqueued in it. It may exert backpressure to the source readable stream of an existing pipe whose destination is the writable side of the IdentityTransformStream so that it temporarily stops write()-ing new chunks. Once all the chunks are drained, it can announce the piping candidates received at the readable side to the writable side to choose the faster one. I think this backpressure is not normal backpressure signal but should be made by announcing an empty pipeCandidates to completely stop the piping (readable to the writable of the IdentityTransformStream) and wait for the ongoing piping to stop. We need some mechanism to realize this. Without strict backpressure like this, it's possible the ongoing pipe write()s new chunks before seeing the updates pipe candidates including long piping and switch to it before the queued chunks are processed.

@tyoshino
Copy link
Member Author

Extended the API a bit to address the flushing issue I discussed in the last comment.

@domenic
Copy link
Member

domenic commented Aug 31, 2016

Wow, perfect, thank you for the explanation! I understand this is not necessarily something for the spec, but I think it will be valuable to have written up somewhere that we can point people to as an example of how to use the spec's flexibility around piping to accomplish great things. It might be worth prototyping as well, if we have the time and energy; I'm not sure if that was your plan in this PR or if you were just using the .js code as a way to organize the thoughts.

So let me try to restate the actors in the system:

  • pipe requests represent a pipeTo call, i.e. (readable, writable, options) tuples
  • pipes represent actual data transfers between readable and writable streams
    • they include information on the transfer method, so they are roughly (readable, writable, transfer method) tuples.
  • the pipe manager is responsible for finding a series of pipes that satisfy all ongoing pipe requests
    • it probably also is responsible for ensuring that no matter what pipes are in play, the options for each pipe request are honored

With this in mind, the notifyPipeCandidates/waitPipeCandidatesChange mechanisms are all about coordinating to potentially update the set of pipes in action when the set of pipe requests in action change. This is what the reorganizing pipes section is about, I think?

I'm not sure I fully understand the candidate propagation section, or why it implies a new backpressure mechanism. My vision of identity transform streams (which is not fully thought out, so might be wrong) is that in a system like this, they get completely excluded from the list of pipes, i.e. rs.pipeThrough(id).pipeTo(ws) will create two pipe requests but only one pipe (from rs to ws). Then normal backpressure propagation would take place between rs and ws.

I guess you are concerned about cases like rs.pipeThrough(id); setTimeout(() => id.readable.pipeTo(ws), 10000), when you talk about asynchronocity? Or is it something else?

@tyoshino
Copy link
Member Author

tyoshino commented Sep 6, 2016

I'm not sure if that was your plan in this PR or if you were just using the .js code as a way to organize the thoughts.

Yeah, initially I attempted to just implement the API into the ReadableStream, but it looked it's going to complicate the reference impl so much, so I wrote it in a separate file. I want to evolve the code snippet to something working for verifying our ideas. Yes.

This is what the reorganizing pipes section is about, I think?

Right

I guess you are concerned about cases like rs.pipeThrough(id); setTimeout(() => id.readable.pipeTo(ws), 10000), when you talk about asynchronocity? Or is it something else?

Ah, yeah. Asynchronous invocation of pipeTo()s and pending chunks.

IdentityTransformStream's readable side and writable side themselves work as ReadableStream and WritableStream, and they can be separately passed around. Even pipeThrough() may be given some writable / readable pair which are build without considering the transform streams concept at all. I'd like to confirm that this understanding of mine is correct, first. Or, do you have some idea in your mind that we brand check that a given pair of writable / readable to pipeThrough is "transform stream"?

So, the writable side of an identity transform may get write()-en manually even when it's exerting backpressure. Then, some chunks are queued inside the transform stream.

Once pipeTo() is invoked on both the readable side and the writable side of the transform stream with pending chunks, we can consider optimizing this pair of pipeTo()-s. But we need to have the transform stream flush the chunks before establishing such skipped transfer.

Hmm, we can also resolve this by having a requirement enforced on all pipeTo() implementations to stop writing when the normal backpressure signal is exerted as well as the current pipeTo() reference implementation is doing. Yeah, it may work.

@domenic
Copy link
Member

domenic commented Sep 6, 2016

IdentityTransformStream's readable side and writable side themselves work as ReadableStream and WritableStream, and they can be separately passed around. Even pipeThrough() may be given some writable / readable pair which are build without considering the transform streams concept at all. I'd like to confirm that this understanding of mine is correct, first. Or, do you have some idea in your mind that we brand check that a given pair of writable / readable to pipeThrough is "transform stream"?

I haven't thought this through fully. But my idea was that yes, we would add an unobservable brand check and fast path if we know it's a transform stream created with the TransformStream constructor (or with TransformStream.identity() or whatever).

Once pipeTo() is invoked on both the readable side and the writable side of the transform stream with pending chunks, we can consider optimizing this pair of pipeTo()-s. But we need to have the transform stream flush the chunks before establishing such skipped transfer.

Got it. I understand the problem now.

Hmm, we can also resolve this by having a requirement enforced on all pipeTo() implementations to stop writing when the normal backpressure signal is exerted as well as the current pipeTo() reference implementation is doing. Yeah, it may work.

The draft in #512 says "While WritableStreamDefaultWriterGetDesiredSize(writer) is ≤ 0 or is null, the user agent must not read from reader." I think this is what we want, although I guess it is talking about reading, not writing. Maybe it should say both.

1 similar comment
@domenic
Copy link
Member

domenic commented Sep 6, 2016

IdentityTransformStream's readable side and writable side themselves work as ReadableStream and WritableStream, and they can be separately passed around. Even pipeThrough() may be given some writable / readable pair which are build without considering the transform streams concept at all. I'd like to confirm that this understanding of mine is correct, first. Or, do you have some idea in your mind that we brand check that a given pair of writable / readable to pipeThrough is "transform stream"?

I haven't thought this through fully. But my idea was that yes, we would add an unobservable brand check and fast path if we know it's a transform stream created with the TransformStream constructor (or with TransformStream.identity() or whatever).

Once pipeTo() is invoked on both the readable side and the writable side of the transform stream with pending chunks, we can consider optimizing this pair of pipeTo()-s. But we need to have the transform stream flush the chunks before establishing such skipped transfer.

Got it. I understand the problem now.

Hmm, we can also resolve this by having a requirement enforced on all pipeTo() implementations to stop writing when the normal backpressure signal is exerted as well as the current pipeTo() reference implementation is doing. Yeah, it may work.

The draft in #512 says "While WritableStreamDefaultWriterGetDesiredSize(writer) is ≤ 0 or is null, the user agent must not read from reader." I think this is what we want, although I guess it is talking about reading, not writing. Maybe it should say both.

@tyoshino
Copy link
Member Author

tyoshino commented Sep 7, 2016

we would add an unobservable brand check and fast path

I see. Anyway we need to perform some special interaction with the streams to realize the optimization. Even the propagation idea is requiring the streams to expose special signal / method.

One good thing of leaving the decision to the streams and have the global manager to fetch some predefined signal from them as done in the propagation idea above is that each transform stream can control the timing to announce longer piping. E.g. one could consume some initial header to do something special, and then pass through data to the next streams.

Maybe it should say both.

Yeah! I'm not yet sure about necessity of different backpressure signals. Let's just try.

@ricea ricea added the do not merge yet Pull request must not be merged per rationale in comment label Sep 6, 2018
Base automatically changed from master to main January 15, 2021 07:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
do not merge yet Pull request must not be merged per rationale in comment
Development

Successfully merging this pull request may close these issues.

3 participants