|
| 1 | +'use strict'; |
| 2 | + |
| 3 | +const { |
| 4 | + Uint8Array, |
| 5 | +} = primordials; |
| 6 | + |
| 7 | +const { |
| 8 | + ReadableStream, |
| 9 | +} = require('internal/webstreams/readablestream'); |
| 10 | + |
| 11 | +const { |
| 12 | + WritableStream, |
| 13 | +} = require('internal/webstreams/writablestream'); |
| 14 | + |
| 15 | +const { |
| 16 | + isArrayBufferView, |
| 17 | +} = require('util/types'); |
| 18 | + |
| 19 | +const { |
| 20 | + codes: { |
| 21 | + ERR_INVALID_ARG_TYPE, |
| 22 | + }, |
| 23 | +} = require('internal/errors'); |
| 24 | + |
| 25 | +// QUIC datagrams are unordered, unreliable packets of data exchanged over |
| 26 | +// a session. They are unrelated to any specific QUIC stream. Our implementation |
| 27 | +// uses a ReadableStream to receive datagrams and a WritableStream to send them. |
| 28 | +// Any ArrayBufferView can be used when sending a datagram. Received datagrams |
| 29 | +// will always be Uint8Array. |
| 30 | +// The DatagramReadableStream and DatagramWritableStream instances are created |
| 31 | +// and held internally by the QUIC Session object. Only the readable and writable |
| 32 | +// properties will be exposed. |
| 33 | + |
| 34 | +class DatagramReadableStream { |
| 35 | + #readable = undefined; |
| 36 | + |
| 37 | + /** @type {ReadableStreamDefaultController} */ |
| 38 | + #controller = undefined; |
| 39 | + |
| 40 | + constructor() { |
| 41 | + let controller; |
| 42 | + this.#readable = new ReadableStream({ |
| 43 | + start(c) { controller = c; }, |
| 44 | + }); |
| 45 | + this.#controller = controller; |
| 46 | + } |
| 47 | + |
| 48 | + get readable() { return this.#readable; } |
| 49 | + |
| 50 | + // Close the ReadableStream. The underlying source will be closed. Any |
| 51 | + // datagrams already in the queue will be preserved and will be read. |
| 52 | + close() { |
| 53 | + this.#controller.close(); |
| 54 | + } |
| 55 | + |
| 56 | + // Errors the readable stream |
| 57 | + error(reason) { |
| 58 | + this.#controller.error(reason); |
| 59 | + } |
| 60 | + |
| 61 | + // Enqueue a datagram to be read by the stream. This will always be |
| 62 | + // a Uint8Array. |
| 63 | + enqueue(datagram) { |
| 64 | + this.#controller.enqueue(datagram); |
| 65 | + } |
| 66 | +} |
| 67 | + |
| 68 | +class DatagramWritableStream { |
| 69 | + #writable = undefined; |
| 70 | + /** @type {WritableStreamDefaultController} */ |
| 71 | + #controller = undefined; |
| 72 | + |
| 73 | + /** |
| 74 | + * @callback DatagramWrittenCallback |
| 75 | + * @param {Uint8Array} chunk |
| 76 | + * @returns {Promise<void>} |
| 77 | + */ |
| 78 | + |
| 79 | + /** |
| 80 | + * @callback DatagramClosedCallback |
| 81 | + * @returns {Promise<void>} |
| 82 | + */ |
| 83 | + |
| 84 | + /** |
| 85 | + * @callback DatagramAbortedCallback |
| 86 | + * @param {any} reason |
| 87 | + * @returns {Promise<void>} |
| 88 | + */ |
| 89 | + |
| 90 | + /** |
| 91 | + * @param {DatagramWrittenCallback} written |
| 92 | + * @param {DatagramClosedCallback} closed |
| 93 | + * @param {DatagramAbortedCallback} aborted |
| 94 | + */ |
| 95 | + constructor(written, closed, aborted) { |
| 96 | + let controller; |
| 97 | + this.#writable = new WritableStream({ |
| 98 | + start(c) { controller = c; }, |
| 99 | + async close(controller) { |
| 100 | + try { |
| 101 | + await closed(undefined); |
| 102 | + } catch (err) { |
| 103 | + controller.error(err); |
| 104 | + } |
| 105 | + }, |
| 106 | + async abort(reason) { |
| 107 | + try { |
| 108 | + await aborted(reason); |
| 109 | + } catch { |
| 110 | + // There's nothing to do in this case |
| 111 | + } |
| 112 | + }, |
| 113 | + async write(chunk, controller) { |
| 114 | + if (!isArrayBufferView(chunk)) { |
| 115 | + throw new ERR_INVALID_ARG_TYPE('chunk', ['ArrayBufferView'], chunk); |
| 116 | + } |
| 117 | + const { |
| 118 | + byteOffset, |
| 119 | + byteLength, |
| 120 | + } = chunk; |
| 121 | + chunk = new Uint8Array(chunk.buffer.transfer(), byteOffset, byteLength); |
| 122 | + try { |
| 123 | + await written(chunk); |
| 124 | + } catch (err) { |
| 125 | + controller.error(err); |
| 126 | + } |
| 127 | + }, |
| 128 | + }); |
| 129 | + this.#controller = controller; |
| 130 | + } |
| 131 | + |
| 132 | + get writable() { return this.#writable; } |
| 133 | + |
| 134 | + error(reason) { |
| 135 | + this.#controller.error(reason); |
| 136 | + } |
| 137 | +} |
| 138 | + |
| 139 | +module.exports = { |
| 140 | + DatagramReadableStream, |
| 141 | + DatagramWritableStream, |
| 142 | +}; |
0 commit comments