Support stream multiplexing for websockets #2
This issue needs further specification based on the Q&A notes I'm copying over here.

The original design of our RPC system was based on the assumption that the underlying transport would handle multiplexing and encryption. It was supposed to give us 2 major constructs to work with: "connection" and "stream". RPC-wise, the only thing it needs to care about is the "stream" concept. When you make an RPC call to somewhere else, you need to open a stream. If you receive a stream, then you are handling an RPC call. That's it. The stream has to be bidirectional and full-duplex. This is achieved right now by both js-quic and the current WS implementation in PK.

However there's a key feature that's currently missing in the WS implementation. In QUIC, it is possible for the server which received a connection to open up new streams back to the client on the same connection. With WS, this is not currently possible. Right now we are treating each WS connection as the stream, meaning a 1 to 1 relationship between "connection" and "stream". Without this ability, there's no way to achieve a sort of RPC conversation context. We have some situations where this is useful, where the handling of an RPC call involves calling back to the client on a different handler.

Now with WS, it could work if both sides can just open new connections to each other. You can do this in QUIC too. However it's not as efficient as simply opening a new stream on the same connection. Furthermore, this cannot work at all if the client cannot run a server to accept new connections. When using QUIC, agents are both clients and servers, thus you can initiate connections from any host. When using WS for the client service, the PKC is only a client, and the PKA is only a server. It's not possible for the PKA to open a new WS connection to the PKC. And this is further complicated by the fact that PKC may run in a browser, which would not expose the underlying websocket server at all!
So in order to achieve a sort of RPC conversation, it would be necessary to add an additional multiplexing layer on top of the WS connection, thus creating WS streams on top of WS connections. Doing this would make our WS transport very similar to QUIC. It also means the WS protocol becomes more complicated. We leverage WS's natural message framing (using binary messages), and put a header in front. This header must supply 2 pieces of information: the stream ID and the message type. The stream ID can be a varint similar to QUIC's varint. The message type can be a single byte. The rest of the payload will then be plugged into the relevant WS stream for further processing. Note that JSON RPC has an `id` field. We can simply copy over the stream ID here, or use it for logging/auditing/identification. It's not really used for tracking our streams, but it could be useful for tracing.
For QUIC-style varints, the encoding is a bit different than Protocol Buffers varints. The size of the varint is determined by the two highest bits of the first byte:
For the numbers 0 through 10, only the 1-byte format is needed since they can all be represented in 6 bits or fewer:
Using printf:

```sh
printf "\x00"  # 0
printf "\x01"  # 1
printf "\x02"  # 2
# ... and so on
printf "\x0A"  # 10
```

As you can see, the representation for numbers 0 to 10 is identical between Protocol Buffers and QUIC-style varints. The difference arises when you go beyond 6-bit values, as the way the continuation bits work and the total length of the varint changes between the two methods.
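The encoding described above can be sketched in TypeScript. This is a minimal illustration of QUIC-style varints (RFC 9000 §16), not code from the actual implementation; the function names are made up.

```typescript
// Encode an unsigned integer (up to 62 bits) as a QUIC-style varint.
// The two high bits of the first byte encode the total length.
function encodeVarInt(value: bigint): Uint8Array {
  if (value < 0n || value > 2n ** 62n - 1n) {
    throw new RangeError('value out of 62-bit range');
  }
  let length: number;
  let prefix: bigint;
  if (value < 2n ** 6n) { length = 1; prefix = 0b00n; }
  else if (value < 2n ** 14n) { length = 2; prefix = 0b01n; }
  else if (value < 2n ** 30n) { length = 4; prefix = 0b10n; }
  else { length = 8; prefix = 0b11n; }
  const out = new Uint8Array(length);
  // Big-endian: most significant byte first, with the 2-bit length
  // prefix OR'd into the top of the first byte.
  let v = value | (prefix << BigInt(length * 8 - 2));
  for (let i = length - 1; i >= 0; i--) {
    out[i] = Number(v & 0xffn);
    v >>= 8n;
  }
  return out;
}

// Decode a QUIC-style varint from the start of a buffer, returning the
// value and how many bytes it consumed.
function decodeVarInt(data: Uint8Array): { value: bigint; length: number } {
  const length = 1 << (data[0] >> 6); // 0b00→1, 0b01→2, 0b10→4, 0b11→8
  let value = BigInt(data[0] & 0b0011_1111);
  for (let i = 1; i < length; i++) {
    value = (value << 8n) | BigInt(data[i]);
  }
  return { value, length };
}
```

Note that, as in QUIC, values 0 through 63 encode as a single byte whose top two bits are zero, which is why the bytes shown above match the Protocol Buffers output.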
The benefits of doing this are:
Once you have something like this, any WS connection can emit new streams. If you don't want to handle streams from the other side, you just need to close them as soon as you receive them. Streams will need 3 kinds of messages: a data message (the regular one), a close message, and finally an error message. We can use mechanisms similar to QUIC streams to achieve this. Another thing is that the stream ID has to be monotonic. Once a stream is closed or errored, its stream ID is not allowed to be re-used, so we must just keep increasing the stream ID. In this sense, it is necessary to consider it a protocol error if a stream ID appears again. See how QUIC deals with this and copy the behaviour.
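The monotonic stream ID rule could be enforced with a tracker like the following. This is a hypothetical sketch of the QUIC-like behaviour described above; the class and method names are not from the codebase.

```typescript
// Track peer-initiated stream IDs so a non-increasing, already-closed ID
// is treated as a connection-level protocol error, mirroring QUIC.
class StreamIdTracker {
  protected nextExpectedId = 0n;
  protected open = new Set<bigint>();

  // Returns 'new' when the ID opens a fresh stream, 'existing' when it
  // refers to a currently open stream; throws on a reused stale ID.
  public accept(streamId: bigint): 'new' | 'existing' {
    if (this.open.has(streamId)) return 'existing';
    if (streamId < this.nextExpectedId) {
      // ID below the high-water mark and not open means it was already
      // closed or errored; reuse is a protocol violation.
      throw new Error('protocol error: stream ID reused');
    }
    this.open.add(streamId);
    this.nextExpectedId = streamId + 1n;
    return 'new';
  }

  public close(streamId: bigint): void {
    this.open.delete(streamId);
  }
}
```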
Message framing of the different stream-related messages is easy due to websocket's guarantee that each message is transferred atomically even if the underlying connection does fragmentation. Therefore, this is all we really need.

If however message framing were not guaranteed, we would add a Message Length field after the type, and it would probably also be a QUIC-style varint. Note that QUIC-style varints have potentially different sizes, just like varints in general; however they are bounded by a maximum size, whereas protobuf's varints are not bounded and can keep going on forever. I think QUIC-style varints are a little more efficient even though they have a max bound.
This work should only be done during
As an aside, I was also exploring the idea of HTTP2 as the transport. Apparently HTTP2 streams could be used too. Based on this issue, it would also require additional multiplexing built on top. The browser apparently supports using a `ReadableStream` as a fetch request body:

```js
async function* generateDataChunks() {
  for (let i = 0; i < 5; i++) {
    yield new TextEncoder().encode(`Chunk ${i}\n`);
    await new Promise(resolve => setTimeout(resolve, 1000)); // Simulate delay between chunks
  }
}

const stream = new ReadableStream({
  async start(controller) {
    for await (const chunk of generateDataChunks()) {
      controller.enqueue(chunk);
    }
    controller.close();
  }
});

const response = await fetch('https://localhost:8443/echo-endpoint', {
  method: 'POST',
  body: stream,
  duplex: 'half', // required by browsers when streaming a request body
  headers: { 'Content-Type': 'text/plain' }
});

// Process the response headers
console.log('Status Code:', response.status);
console.log('Content-Type:', response.headers.get('content-type'));

// Process the data chunks
const reader = response.body.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  console.log('Received from server:', new TextDecoder().decode(value));
}
```

This however is more complex, as the HTTP2 stream doesn't have message framing and you'd have to do that yourself. Each HTTP2 stream would itself be multiplexed on top of a single HTTP2 connection. The need to layer an additional multiplexed stream on top of HTTP2 only arises because the browser doesn't expose the HTTP2 streams directly. If you didn't care about the browser, you could just use HTTP2 connections and HTTP2 streams exactly like QUIC connections and QUIC streams. For browser communication, I don't think we would bother doing this with HTTP2, as we already have WS for that. But it's just an interesting idea. Don't forget that this would be specific to HTTP2, as HTTP3 would be different again; the details would depend on the HTTP version. Anyway, this means these kinds of transports would be possible.
Moving this to
An extra control message needs to be introduced: ACK.
Also follow https://chat.openai.com/share/18a9bec1-adf4-425f-baf1-5645a47c24c3 for the conversation regarding ACK.
The backpressure mechanism, along with the muxing and demuxing of streams, are all tightly interconnected, so it has to be designed bit by bit. You'll need to start with just the message protocol first, using a binary header as per the first chatgpt link in https://chat.openai.com/share/18a9bec1-adf4-425f-baf1-5645a47c24c3. Also @amydevs can you please spec out the tasks above, just think high level tasks first.
Good work on the spec so far; there is a good amount of information on how the back-pressure and writing works.

Small note about the ack message: as you described above, I get the impression you're sending the available buffer each time you ack. Remember that this is a concurrent system. If you, for example, sent 500 bytes and then another 500 bytes quickly, and the receiving side processed the first 500 bytes and acked the 1024 bytes of available space, the sender could immediately send another 1024 bytes. At this point we'd have 1524 bytes in flight, well exceeding the available buffer, very likely resulting in an error. I think we should only ack the number of bytes processed and not the number of bytes available. On the sending side we keep a count of available bytes we can send: we subtract after writing and add after receiving the acks. Addition and subtraction here are commutative and atomic, so we can't butcher the value; overwriting this value concurrently would run into consistency problems.

The spec is also light on details for how the error and close messages will work, and it needs more detail on how the messages are structured as well. How does an error message look? What details are included in it? Will it have a code and a message? Same for the close message. If we consider each stream as a separate state machine, how do the state transitions work? The sending and receiving streams function independently, right? How does that work? We don't need to go into full implementation detail, but we do need enough of it described so that anyone coming to this cold can understand enough of the structure to start working on it.
Specification
To avoid the overhead of creating and negotiating separate websocket connections per RPC call, we need to add the ability to multiplex streams over a single websocket connection.
Since websockets are a message-based protocol, we can wrap each message in a header to add data for multiplexing. Worst case, we can serialize and deserialize JSON in a stream, the same way we do for the JSONRPC messages.
This will be abstracted on top of webstreams. We will have a multiplexing transform that implements the muxing and a demultiplexing transform that does the reverse.
Message structure
A streamed websocket message is constructed in three parts:
- A QUIC-style VarInt representing the monotonic ID of the stream, up to the limits of a 62-bit unsigned integer (stored as big-endian).
- The message type: a single byte representing the type of the message received.
- The payload, whose interpretation depends on the message type.
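The framing could look like the following sketch. To keep it self-contained it only handles stream IDs below 64 (a 1-byte varint); the enum values and function names are illustrative assumptions, not the actual protocol constants.

```typescript
// Hypothetical message types; the real numbering may differ.
enum StreamMessageType { Data = 0, Ack = 1, Error = 2, Close = 3 }

// Build a frame: [varint stream ID][type byte][payload...].
function frameMessage(streamId: number, type: StreamMessageType, payload: Uint8Array): Uint8Array {
  if (streamId < 0 || streamId > 0b0011_1111) {
    throw new RangeError('sketch only handles stream IDs that fit a 1-byte varint');
  }
  const out = new Uint8Array(2 + payload.length);
  out[0] = streamId; // 1-byte varint: the 0b00 length prefix is implicit
  out[1] = type;
  out.set(payload, 2);
  return out;
}

// Parse a frame back into its three parts.
function parseMessage(data: Uint8Array): { streamId: number; type: StreamMessageType; payload: Uint8Array } {
  return {
    streamId: data[0] & 0b0011_1111,
    type: data[1] as StreamMessageType,
    payload: data.subarray(2),
  };
}
```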
Variable-Length Integer Encoding
This is to be done similarly to QUIC. The Variable-Length Integer Encoding, or VarInt, is an unsigned integer represented by anywhere from 1 to 8 bytes. The first two bits of the initial byte determine how many bytes the VarInt uses. Hence the usable bits of an encoded VarInt are 2 fewer than those of a u8, u16, u32, or u64:
- `0b00`: 1 byte (6 usable bits)
- `0b01`: 2 bytes (14 usable bits)
- `0b10`: 4 bytes (30 usable bits)
- `0b11`: 8 bytes (62 usable bits)

As the maximum unsigned integer that can be stored is 62 bits, I am using a `bigint` to represent the decoded varint.

DATA Payload
In the case where the message is of type `DATA`, the payload is arbitrary data written through the sender's `WritableStream`.

ACK Payload
In the case where the message is of type `ACK`, the payload represents the remaining empty buffer space of the receiver. This is the `desiredSize`. It is represented as a 32-bit big-endian integer (4 bytes).

ERROR and CLOSE Payload
In the case where the message is of type `ERROR` or `CLOSE`, the initial byte will represent whether the `ReadableStream` or the `WritableStream` of the receiver should be closed. This is represented by this enum:

The remaining bytes in an `ERROR` frame should be a VarInt with QUIC-style Variable-Length Integer Encoding representing an error code. This error code should be converted by the user of the library into a useful error by passing a function into the constructor of `WebSocketStream`.

The handling of the payloads should be such that the closing/erroring of the sender's `WritableStream` closes the receiver's `ReadableStream` and vice versa. This will be facilitated by the `readableEnded` and `writableEnded` properties on the connection. For example, closing a sender's writable stream should set the sender's `writableEnded` to `true`, and then send an `ERROR`/`CLOSE` message with `StreamShutdown.Read` to the receiver. The receiver will then handle the message by erroring the `ReadableStream`'s controller and setting `readableEnded` to `true`.

All application-level errors produced by the user of the API will be converted by the injected `codeToReason` and `reasonToCode` functions.

Lifecycle
Error Codes
The following are non-application errors:

Non-application errors will not be handled by the injected `codeToReason` and `reasonToCode` functions.

Started States
Upon starting a WebSocketStream, either:
A peer-created WebSocketStream can only be created upon receiving an Ack message. This avoids race conditions where data is sent immediately after an Ack message, causing the receiver to have a half-open WebSocketStream. This also avoids a sender creating a stream and sending Data to it without knowing what the buffer size of the receiver is.
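The ACK-first rule above could be enforced at the demultiplexer like this. This is an illustrative sketch, with made-up names and a simplified stream table, not the actual implementation.

```typescript
// A stream may only be created by an ACK; any other message type on an
// unknown stream ID would produce a half-open stream, so reject it.
function handleFrame(
  streams: Map<bigint, object>,
  streamId: bigint,
  type: 'ACK' | 'DATA' | 'ERROR' | 'CLOSE',
): 'created' | 'dispatched' {
  if (!streams.has(streamId)) {
    if (type !== 'ACK') {
      throw new Error('protocol error: a new stream must be created by an ACK');
    }
    streams.set(streamId, {}); // placeholder for a real WebSocketStream
    return 'created';
  }
  return 'dispatched';
}
```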
Closing States
Upon closing a WritableStream, the WebSocketStream will pass through the following states:

The opposite is true for closing a ReadableStream.
It is important to ensure that any operation that involves a blocking promise checks whether the stream is closed both before and after the promise resolves. This makes sure that work blocked on those promises does not continue once the stream is closed.
Backpressure
Backpressure will be achieved by implementing a message type that represents the acknowledgement of a message received.
On initiation of the stream, the receiver will send an `ACK` with their buffer size to tell how much data the sender can send before blocking. Hence, a sender will send their `DATA` and the receiver will send an `ACK` when they have processed the data that the sender has sent. The `ACK` message will be resent accordingly as data is processed by the receiver.

After the first initial `ACK` sent by the receiver, the sender sets a byte counter to the size of the receiver's buffer. Each subsequent `ACK` should include the `readBytes` rather than the buffer size, so that the sender can add to the counter, which represents how many free bytes are available in the receiver's buffer. The sender should block when the counter reaches 0.

If the sender were to send a message bigger than the receiver's buffer, the message should be sent in chunks. The sending of each chunk after the first should block on awaiting an `ACK` from the recipient.
Additional context
Tasks