Skip to content

Support stream multiplexing for websockets #2

Closed
@tegefaulkes

Description

@tegefaulkes

Specification

To avoid the overhead of creating and negotiation separate websocket connections per RPC call. We need to add the ability to multiplex streams over a single websocket stream.

Since webstreams are a message based protocal, we can wrap each message in a header to add data for multiplexing. Worst case we can searalized and deseralize JSON in a stream, the same way we do for the JSONRPC messages.

This will be abstracted on top of webstreams. We will have a multiplexing transform that will implement the multiplexing and a demux transform that does the reverse.

Message structure

A streamed websocket message is constructed in three parts:

+----------------------+----------------+------------------------+
|   QUIC-style Varint  |  Message Type  |       Payload          |
+----------------------+----------------+------------------------+

The QUIC-style Varint is used to represent the monotonic ID of the up to the limits of a 62 bit unsigned integer (stored as big-endian).

The message type is a byte that represents the type of the message received:

enum MessageType {
  DATA = 0x00,
  ACK = 0x01,
  ERROR = 0x02,
  CLOSE = 0x03,
}
Variable-Length Integer Encoding

This is to be done similar to QUIC.
The Variable-Length Integer Encoding, or VarInt is an unsigned integer represented by anywhere from 1 to 8 bytes. It should use the first two bits of the initial byte to determine how many bytes the VarInt uses. Hence the usable bits of an encoded VarInt is 2 bits smaller than that of a u8, u16, u32, u64 number.

VarInt Prefix Size (bytes) Usable Bits
0b00 1 6
0b01 2 14
0b10 4 30
0b11 8 62

As the maximum unsigned integer that can be stored is 62 bits, I am using a bigint to represent the decoded varint.

DATA Payload

In the case where the message is of type DATA, the payload is arbitrary data written through the sender's WritableStream.

ACK Payload

In the case where the message is of type ACK, the payload represents the remaining empty buffer space of the receiver. This is the desiredSize. This is represented as a 32bit big-endian integer (4 bytes).

ERROR and CLOSE Payload

In the case where the message is of type ERROR or CLOSE. The initial byte will represent whether the ReadableStream or the WritableStream of the receiver should be closed. This is represented by this enum:

enum StreamShutdown {
  Read = 0,
  Write = 1,
}

The remaining bytes in an ERROR frame should be a Var-Int with QUIC-Style Variable-Length Integer Encoding to represent an error code. This error code should be converted by the user of the library into a useful error by passing in a function into the constructor of WebSocketStream.

image

The handling of the payloads should be so that the closing/erroring of the sender's WritableStream should close the receiver's ReadableStream and vice versa. This will be facilitated by the properties on the connection readableEnded and writableEnded. For example, closing of a sender's writable stream should set the sender's writableEnded to true, and then send an ERROR/CLOSE message with StreamShutdown.Read to the receiver. The receiver will then handle the message by erroring on the ReadableStream's controller, and setting readableEnded to true.

All application level errors produced by the user of the API will be converted by injected codeToReason and reasonToCode functions.

Lifecycle

Error Codes

These are the following non-application errors:

const enum StreamErrorCode {
  Unknown = 0,
  // an error parsing the structure of the message
  ErrorReadableStreamParse = 1,
  // an error occurring from the buffer of a readablestream going over the maximum size
  ErrorReadableStreamBufferOverflow = 2,
}

All non-application errors will not be handled by the injected codeToReason and reasonToCode functions.

Started States

Upon starting a WebSocketStream, either:

  • The WritableStream is blocked.
  • The WritableStream is immediately ready.
  • The WritableStream is unblocked.

image

A peer-created WebSocketStream can only be created upon receiving an Ack message. This avoids race conditions where data is sent immediately after an Ack message, causing the receiver to have a half-open WebSocketStream. This also avoids a sender creating a stream and sending Data to it without knowing what the buffer size of the receiver is.

Closing States

Upon closing a WritableStream, the WebSocketStream will pass by the following states:

  • WritableStream Open - Peer ReadableStream Open
  • WritableStream Closed - Peer ReadableStream Open (Half Closed)
  • WritableStream Closed - Peer Readable Stream Closed

This opposite is true for closing a ReadableStream.

It is important to ensure that any operations that involve a blocking promise check if a Stream is closed before and after the promise resolution. This will make sure that the operations that the promises were blocking will not continue if the Stream is closed.

Backpressure

Backpressure will be achieved by implementing a message type that represents the acknowledgement of a message received.

On initiation of the stream, the receiver will send an ACK with their buffer size to tell how much data the sender can send before blocking.

Hence, a sender will send their DATA and the receiver will send ACK when they have processed the data that the sender has sent.

The ACK message will be resent accordingly as data is processed by the receiver.

image

After the first initial ACK sent by the receiver, the sender sets a byte counter to the value of the size of the receiver's buffer. Each subsequent ACK should include the readBytes rather than the buffer size, so that the sender can add to the counter, representing how many free bytes are available on the receiver's buffer. The sender should block when the counter reaches 0.

If the sender were to send a message bigger than the receiver's buffer, the message should be sent in chunks. The sending of subsequent chunks after the first should be blocked on awaiting an ACK from the recipient.

!image

Additional context

Tasks

  1. Spec this out.
  2. Implement parsing messages and message types
  3. Implement backpressure
  4. Implement error handling

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions