Description
Specification
To avoid the overhead of creating and negotiating a separate websocket connection per RPC call, we need to add the ability to multiplex streams over a single websocket connection.
Since WebSocket is a message-based protocol, we can wrap each message in a header to add data for multiplexing. In the worst case we can serialize and deserialize JSON in a stream, the same way we do for the JSONRPC messages.
This will be abstracted on top of webstreams. We will have a multiplexing transform that will implement the multiplexing and a demux transform that does the reverse.
Message structure
A streamed websocket message is constructed in three parts:
+-------------------+--------------+---------+
| QUIC-style Varint | Message Type | Payload |
+-------------------+--------------+---------+
The QUIC-style Varint is used to represent the monotonic ID of the stream, up to the limit of a 62-bit unsigned integer (stored as big-endian).
The message type is a byte that represents the type of the message received:
enum MessageType {
  DATA = 0x00,
  ACK = 0x01,
  ERROR = 0x02,
  CLOSE = 0x03,
}
Variable-Length Integer Encoding
This is done similarly to QUIC.
The Variable-Length Integer Encoding, or VarInt, is an unsigned integer represented by 1, 2, 4, or 8 bytes. The first two bits of the initial byte determine how many bytes the VarInt uses. Hence an encoded VarInt has 2 fewer usable bits than the corresponding u8, u16, u32, or u64 number.
| VarInt Prefix | Size (bytes) | Usable Bits |
|---|---|---|
| 0b00 | 1 | 6 |
| 0b01 | 2 | 14 |
| 0b10 | 4 | 30 |
| 0b11 | 8 | 62 |
As the maximum unsigned integer that can be stored is 62 bits, I am using a bigint to represent the decoded varint.
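The encoding in the table above can be sketched as follows. This is a minimal illustration, not the library's implementation: the names encodeVarInt and decodeVarInt are hypothetical, and the encoder is assumed to always pick the smallest size that fits the value.

```typescript
// Hypothetical helper names; the spec only fixes the wire format.
const VARINT_MAX = (1n << 62n) - 1n;

function encodeVarInt(value: bigint): Uint8Array {
  if (value < 0n || value > VARINT_MAX) {
    throw new RangeError("VarInt must fit in 62 bits");
  }
  // Assumption: choose the smallest size whose usable bits hold the value.
  let size: number;
  let prefix: number;
  if (value < 1n << 6n) {
    size = 1;
    prefix = 0b00;
  } else if (value < 1n << 14n) {
    size = 2;
    prefix = 0b01;
  } else if (value < 1n << 30n) {
    size = 4;
    prefix = 0b10;
  } else {
    size = 8;
    prefix = 0b11;
  }
  const out = new Uint8Array(size);
  let rest = value;
  // Big-endian: fill from the last byte towards the first.
  for (let i = size - 1; i >= 0; i--) {
    out[i] = Number(rest & 0xffn);
    rest >>= 8n;
  }
  // The two most significant bits of the first byte carry the size prefix.
  out[0] |= prefix << 6;
  return out;
}

function decodeVarInt(bytes: Uint8Array): { value: bigint; size: number } {
  if (bytes.length === 0) throw new RangeError("empty buffer");
  // Prefix 0b00, 0b01, 0b10, 0b11 maps to 1, 2, 4, 8 bytes.
  const size = 1 << (bytes[0] >> 6);
  if (bytes.length < size) throw new RangeError("truncated VarInt");
  // Mask off the prefix bits, then accumulate the remaining bytes.
  let value = BigInt(bytes[0] & 0x3f);
  for (let i = 1; i < size; i++) {
    value = (value << 8n) | BigInt(bytes[i]);
  }
  return { value, size };
}
```

Returning the consumed size alongside the value lets a frame parser know where the message type byte starts.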
DATA Payload
In the case where the message is of type DATA, the payload is arbitrary data written through the sender's WritableStream.
ACK Payload
In the case where the message is of type ACK, the payload represents the remaining empty buffer space of the receiver, i.e. its desiredSize. This is represented as a 32-bit big-endian integer (4 bytes).
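As a rough sketch of the ACK payload layout, the 4 bytes can be written and read with a DataView. The function names are hypothetical, and the integer is assumed to be unsigned, which the text above does not state explicitly.

```typescript
// Hypothetical helpers; assumes the 32-bit value is unsigned.
function encodeAckPayload(desiredSize: number): Uint8Array {
  if (!Number.isInteger(desiredSize) || desiredSize < 0 || desiredSize > 0xffffffff) {
    throw new RangeError("desiredSize must fit in an unsigned 32-bit integer");
  }
  const payload = new Uint8Array(4);
  // The third argument `false` selects big-endian byte order.
  new DataView(payload.buffer).setUint32(0, desiredSize, false);
  return payload;
}

function decodeAckPayload(payload: Uint8Array): number {
  if (payload.byteLength !== 4) {
    throw new RangeError("ACK payload must be exactly 4 bytes");
  }
  // Respect byteOffset in case the payload is a view into a larger frame.
  const view = new DataView(payload.buffer, payload.byteOffset, payload.byteLength);
  return view.getUint32(0, false);
}
```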
ERROR and CLOSE Payload
In the case where the message is of type ERROR or CLOSE, the initial byte represents whether the ReadableStream or the WritableStream of the receiver should be closed. This is represented by this enum:
enum StreamShutdown {
  Read = 0,
  Write = 1,
}
The remaining bytes in an ERROR frame should be a VarInt, using the QUIC-style Variable-Length Integer Encoding, that represents an error code. The user of the library converts this error code into a useful error by passing a function into the constructor of WebSocketStream.
The payloads should be handled so that closing or erroring the sender's WritableStream closes the receiver's ReadableStream, and vice versa. This is facilitated by the readableEnded and writableEnded properties on the connection. For example, closing a sender's WritableStream should set the sender's writableEnded to true, and then send an ERROR/CLOSE message with StreamShutdown.Read to the receiver. The receiver then handles the message by erroring the ReadableStream's controller and setting readableEnded to true.
All application-level errors produced by the user of the API will be converted by the injected codeToReason and reasonToCode functions.
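To illustrate what an injected pair of converters might look like, here is a small sketch. The signatures, the StreamDirection type, the error name, and the code values are all assumptions made for illustration; the spec above only says that the functions are injected and that the code travels as a VarInt.

```typescript
// Assumed parameter: which side of the stream the error relates to.
type StreamDirection = "read" | "write";

// Hypothetical application-level codes chosen by the user of the API.
const TIMEOUT_CODE = 1000n;
const UNKNOWN_APP_CODE = 0n;

// Maps an application error (the "reason") to the code sent in an ERROR frame.
function reasonToCode(_type: StreamDirection, reason?: unknown): bigint {
  if (reason instanceof Error && reason.name === "RpcTimeoutError") {
    return TIMEOUT_CODE;
  }
  return UNKNOWN_APP_CODE;
}

// Maps a received code back to an error the application can handle.
function codeToReason(type: StreamDirection, code: bigint): Error {
  if (code === TIMEOUT_CODE) {
    const error = new Error(`${type} side timed out`);
    error.name = "RpcTimeoutError";
    return error;
  }
  return new Error(`${type} side failed with application code ${code}`);
}
```

Codes are typed as bigint here because a decoded VarInt can exceed the safe integer range of a number.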
Lifecycle
Error Codes
The following are the non-application errors:
const enum StreamErrorCode {
  Unknown = 0,
  // an error parsing the structure of the message
  ErrorReadableStreamParse = 1,
  // an error caused by the buffer of a ReadableStream exceeding its maximum size
  ErrorReadableStreamBufferOverflow = 2,
}
Non-application errors will not be handled by the injected codeToReason and reasonToCode functions.
Started States
Upon starting a WebSocketStream, either:
- The WritableStream is blocked.
- The WritableStream is immediately ready.
- The WritableStream is unblocked.
A peer-created WebSocketStream can only be created upon receiving an ACK message. This avoids race conditions where data is sent immediately after an ACK message, causing the receiver to have a half-open WebSocketStream. It also prevents a sender from creating a stream and sending DATA to it without knowing the buffer size of the receiver.
Closing States
Upon closing a WritableStream, the WebSocketStream will pass through the following states:
- WritableStream Open - Peer ReadableStream Open
- WritableStream Closed - Peer ReadableStream Open (Half Closed)
- WritableStream Closed - Peer ReadableStream Closed
The opposite is true for closing a ReadableStream.
It is important to ensure that any operations that involve a blocking promise check if a Stream is closed before and after the promise resolution. This will make sure that the operations that the promises were blocking will not continue if the Stream is closed.
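The check-before-and-after pattern described above might look like the following sketch. SketchStream, its closed flag, and the ready parameter are hypothetical stand-ins; ready represents any blocking promise, such as waiting for an ACK.

```typescript
// Hypothetical minimal stream; only the closed flag matters for this sketch.
class SketchStream {
  public closed = false;
  public sent: Uint8Array[] = [];

  // Resolves to true if the chunk was sent, false if the stream was closed.
  async write(chunk: Uint8Array, ready: Promise<void>): Promise<boolean> {
    // Check before blocking: do not wait at all on a closed stream.
    if (this.closed) return false;
    await ready;
    // Check again: the stream may have been closed while we were waiting.
    if (this.closed) return false;
    this.sent.push(chunk);
    return true;
  }
}
```

Without the second check, a write that was blocked when the stream closed would still go out once the promise resolved.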
Backpressure
Backpressure will be achieved by implementing a message type that represents the acknowledgement of a message received.
On initiation of the stream, the receiver will send an ACK with its buffer size to tell the sender how much data it can send before blocking.
Hence, a sender will send its DATA and the receiver will send an ACK when it has processed the data that the sender has sent.
The ACK message will be resent accordingly as data is processed by the receiver.
After the initial ACK sent by the receiver, the sender sets a byte counter to the size of the receiver's buffer. Each subsequent ACK should include the readBytes rather than the buffer size, so that the sender can add it to the counter, which represents how many free bytes are available in the receiver's buffer. The sender should block when the counter reaches 0.
If the sender were to send a message bigger than the receiver's buffer, the message should be sent in chunks. The sending of subsequent chunks after the first should be blocked on awaiting an ACK from the recipient.
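The sender-side counter and chunking described in this section can be sketched roughly as below. SendWindow, onAck, and nextChunk are hypothetical names, and the sketch is synchronous for brevity: a real sender would await the next ACK where this one returns undefined.

```typescript
// Hypothetical sender-side byte counter for the ACK-based backpressure.
class SendWindow {
  private credit = 0;
  private initialised = false;

  // First ACK carries the receiver's buffer size; later ACKs carry readBytes.
  onAck(value: number): void {
    if (!this.initialised) {
      this.credit = value;
      this.initialised = true;
    } else {
      this.credit += value;
    }
  }

  // Returns the part of `pending` that may be sent now,
  // or undefined if the sender must block until the next ACK.
  nextChunk(pending: Uint8Array): Uint8Array | undefined {
    if (this.credit === 0 || pending.byteLength === 0) return undefined;
    const length = Math.min(this.credit, pending.byteLength);
    this.credit -= length;
    // subarray creates a view, so no bytes are copied.
    return pending.subarray(0, length);
  }
}
```

For example, with an initial ACK of 4 and a 10-byte message, the first call yields a 4-byte chunk and the next call blocks until another ACK adds credit.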
Additional context
- Chat GPT discussion https://chat.openai.com/share/18a9bec1-adf4-425f-baf1-5645a47c24c3
Tasks
- Spec this out.
- Implement parsing messages and message types
- Implement backpressure
- Implement error handling