Skip to content

Commit

Permalink
Add BinaryChunk type
Browse files Browse the repository at this point in the history
This lets us have different environments operate on different types, such
as passing the raw typed array when possible.
  • Loading branch information
sebmarkbage committed Jun 15, 2023
1 parent 2d111f1 commit e5e3f54
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 28 deletions.
1 change: 1 addition & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ module.exports = {
$PropertyType: 'readonly',
$ReadOnly: 'readonly',
$ReadOnlyArray: 'readonly',
$ArrayBufferView: 'readonly',
$Shape: 'readonly',
AnimationFrameID: 'readonly',
// For Flow type annotation. Only `BigInt` is valid at runtime.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export interface Destination {

export opaque type PrecomputedChunk = string;
export opaque type Chunk = string;
export opaque type BinaryChunk = string;

export function scheduleWork(callback: () => void) {
callback();
Expand All @@ -25,14 +26,14 @@ export function beginWriting(destination: Destination) {}

export function writeChunk(
destination: Destination,
chunk: Chunk | PrecomputedChunk,
chunk: Chunk | PrecomputedChunk | BinaryChunk,
): void {
writeChunkAndReturn(destination, chunk);
}

export function writeChunkAndReturn(
destination: Destination,
chunk: Chunk | PrecomputedChunk,
chunk: Chunk | PrecomputedChunk | BinaryChunk,
): boolean {
return destination.push(chunk);
}
Expand All @@ -51,6 +52,12 @@ export function stringToPrecomputedChunk(content: string): PrecomputedChunk {
return content;
}

export function typedArrayToBinaryChunk(
content: $ArrayBufferView,
): BinaryChunk {
throw new Error('Not implemented.');
}

export function clonePrecomputedChunk(
chunk: PrecomputedChunk,
): PrecomputedChunk {
Expand All @@ -61,6 +68,10 @@ export function byteLengthOfChunk(chunk: Chunk | PrecomputedChunk): number {
throw new Error('Not implemented.');
}

export function byteLengthOfBinaryChunk(chunk: BinaryChunk): number {
throw new Error('Not implemented.');
}

export function closeWithError(destination: Destination, error: mixed): void {
// $FlowFixMe[incompatible-call]: This is an Error object or the destination accepts other types.
destination.destroy(error);
Expand Down
11 changes: 11 additions & 0 deletions packages/react-server-dom-fb/src/ReactServerStreamConfigFB.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export type Destination = {

export opaque type PrecomputedChunk = string;
export opaque type Chunk = string;
export type BinaryChunk = string;

export function scheduleWork(callback: () => void) {
// We don't schedule work in this model, and instead expect performWork to always be called repeatedly.
Expand Down Expand Up @@ -57,6 +58,12 @@ export function stringToPrecomputedChunk(content: string): PrecomputedChunk {
return content;
}

export function typedArrayToBinaryChunk(
content: $ArrayBufferView,
): BinaryChunk {
throw new Error('Not implemented.');
}

export function clonePrecomputedChunk(
chunk: PrecomputedChunk,
): PrecomputedChunk {
Expand All @@ -67,6 +74,10 @@ export function byteLengthOfChunk(chunk: Chunk | PrecomputedChunk): number {
throw new Error('Not implemented.');
}

export function byteLengthOfBinaryChunk(chunk: BinaryChunk): number {
throw new Error('Not implemented.');
}

export function closeWithError(destination: Destination, error: mixed): void {
destination.done = true;
destination.fatal = true;
Expand Down
6 changes: 3 additions & 3 deletions packages/react-server/src/ReactFlightServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* @flow
*/

import type {Chunk, Destination} from './ReactServerStreamConfig';
import type {Chunk, BinaryChunk, Destination} from './ReactServerStreamConfig';

import {
scheduleWork,
Expand Down Expand Up @@ -172,7 +172,7 @@ export type Request = {
pingedTasks: Array<Task>,
completedImportChunks: Array<Chunk>,
completedHintChunks: Array<Chunk>,
completedRegularChunks: Array<Chunk>,
completedRegularChunks: Array<Chunk | BinaryChunk>,
completedErrorChunks: Array<Chunk>,
writtenSymbols: Map<symbol, number>,
writtenClientReferences: Map<ClientReferenceKey, number>,
Expand Down Expand Up @@ -231,7 +231,7 @@ export function createRequest(
pingedTasks: pingedTasks,
completedImportChunks: ([]: Array<Chunk>),
completedHintChunks: ([]: Array<Chunk>),
completedRegularChunks: ([]: Array<Chunk>),
completedRegularChunks: ([]: Array<Chunk | BinaryChunk>),
completedErrorChunks: ([]: Array<Chunk>),
writtenSymbols: new Map(),
writtenClientReferences: new Map(),
Expand Down
40 changes: 32 additions & 8 deletions packages/react-server/src/ReactServerStreamConfigBrowser.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export type Destination = ReadableStreamController;

export type PrecomputedChunk = Uint8Array;
export opaque type Chunk = Uint8Array;
export type BinaryChunk = Uint8Array;

export function scheduleWork(callback: () => void) {
callback();
Expand All @@ -32,13 +33,13 @@ export function beginWriting(destination: Destination) {

export function writeChunk(
destination: Destination,
chunk: PrecomputedChunk | Chunk,
chunk: PrecomputedChunk | Chunk | BinaryChunk,
): void {
if (chunk.length === 0) {
if (chunk.byteLength === 0) {
return;
}

if (chunk.length > VIEW_SIZE) {
if (chunk.byteLength > VIEW_SIZE) {
if (__DEV__) {
if (precomputedChunkSet.has(chunk)) {
console.error(
Expand Down Expand Up @@ -68,7 +69,7 @@ export function writeChunk(

let bytesToWrite = chunk;
const allowableBytes = ((currentView: any): Uint8Array).length - writtenBytes;
if (allowableBytes < bytesToWrite.length) {
if (allowableBytes < bytesToWrite.byteLength) {
// this chunk would overflow the current view. We enqueue a full view
// and start a new view with the remaining chunk
if (allowableBytes === 0) {
Expand All @@ -89,12 +90,12 @@ export function writeChunk(
writtenBytes = 0;
}
((currentView: any): Uint8Array).set(bytesToWrite, writtenBytes);
writtenBytes += bytesToWrite.length;
writtenBytes += bytesToWrite.byteLength;
}

export function writeChunkAndReturn(
destination: Destination,
chunk: PrecomputedChunk | Chunk,
chunk: PrecomputedChunk | Chunk | BinaryChunk,
): boolean {
writeChunk(destination, chunk);
// in web streams there is no backpressure so we can alwas write more
Expand All @@ -119,7 +120,9 @@ export function stringToChunk(content: string): Chunk {
return textEncoder.encode(content);
}

const precomputedChunkSet: Set<Chunk> = __DEV__ ? new Set() : (null: any);
const precomputedChunkSet: Set<Chunk | BinaryChunk> = __DEV__
? new Set()
: (null: any);

export function stringToPrecomputedChunk(content: string): PrecomputedChunk {
const precomputedChunk = textEncoder.encode(content);
Expand All @@ -131,10 +134,27 @@ export function stringToPrecomputedChunk(content: string): PrecomputedChunk {
return precomputedChunk;
}

export function typedArrayToBinaryChunk(
content: $ArrayBufferView,
): BinaryChunk {
// Convert any non-Uint8Array array to Uint8Array. We could avoid this for Uint8Arrays.
// If we passed through this straight to enqueue we wouldn't have to convert it but since
// we need to copy the buffer in that case, we need to convert it to copy it.
// When we copy it into another array using set() it needs to be a Uint8Array.
const buffer = new Uint8Array(
content.buffer,
content.byteOffset,
content.byteLength,
);
// We clone large chunks so that we can transfer them when we write them.
// Others get copied into the target buffer.
return content.byteLength > VIEW_SIZE ? buffer.slice() : buffer;
}

export function clonePrecomputedChunk(
precomputedChunk: PrecomputedChunk,
): PrecomputedChunk {
return precomputedChunk.length > VIEW_SIZE
return precomputedChunk.byteLength > VIEW_SIZE
? precomputedChunk.slice()
: precomputedChunk;
}
Expand All @@ -143,6 +163,10 @@ export function byteLengthOfChunk(chunk: Chunk | PrecomputedChunk): number {
return chunk.byteLength;
}

export function byteLengthOfBinaryChunk(chunk: BinaryChunk): number {
return chunk.byteLength;
}

export function closeWithError(destination: Destination, error: mixed): void {
// $FlowFixMe[method-unbinding]
if (typeof destination.error === 'function') {
Expand Down
18 changes: 15 additions & 3 deletions packages/react-server/src/ReactServerStreamConfigBun.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@

type BunReadableStreamController = ReadableStreamController & {
end(): mixed,
write(data: Chunk): void,
write(data: Chunk | BinaryChunk): void,
error(error: Error): void,
};
export type Destination = BunReadableStreamController;

export type PrecomputedChunk = string;
export opaque type Chunk = string;
export type BinaryChunk = $ArrayBufferView;

export function scheduleWork(callback: () => void) {
callback();
Expand All @@ -30,7 +31,7 @@ export function beginWriting(destination: Destination) {}

export function writeChunk(
destination: Destination,
chunk: PrecomputedChunk | Chunk,
chunk: PrecomputedChunk | Chunk | BinaryChunk,
): void {
if (chunk.length === 0) {
return;
Expand All @@ -41,7 +42,7 @@ export function writeChunk(

export function writeChunkAndReturn(
destination: Destination,
chunk: PrecomputedChunk | Chunk,
chunk: PrecomputedChunk | Chunk | BinaryChunk,
): boolean {
return !!destination.write(chunk);
}
Expand All @@ -60,6 +61,13 @@ export function stringToPrecomputedChunk(content: string): PrecomputedChunk {
return content;
}

export function typedArrayToBinaryChunk(
content: $ArrayBufferView,
): BinaryChunk {
// TODO: Does this needs to be cloned if it's transferred in enqueue()?
return content;
}

export function clonePrecomputedChunk(
chunk: PrecomputedChunk,
): PrecomputedChunk {
Expand All @@ -70,6 +78,10 @@ export function byteLengthOfChunk(chunk: Chunk | PrecomputedChunk): number {
return Buffer.byteLength(chunk, 'utf8');
}

export function byteLengthOfBinaryChunk(chunk: BinaryChunk): number {
return chunk.byteLength;
}

export function closeWithError(destination: Destination, error: mixed): void {
if (typeof destination.error === 'function') {
// $FlowFixMe[incompatible-call]: This is an Error object or the destination accepts other types.
Expand Down
40 changes: 32 additions & 8 deletions packages/react-server/src/ReactServerStreamConfigEdge.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export type Destination = ReadableStreamController;

export type PrecomputedChunk = Uint8Array;
export opaque type Chunk = Uint8Array;
export type BinaryChunk = Uint8Array;

export function scheduleWork(callback: () => void) {
setTimeout(callback, 0);
Expand All @@ -32,13 +33,13 @@ export function beginWriting(destination: Destination) {

export function writeChunk(
destination: Destination,
chunk: PrecomputedChunk | Chunk,
chunk: PrecomputedChunk | Chunk | BinaryChunk,
): void {
if (chunk.length === 0) {
if (chunk.byteLength === 0) {
return;
}

if (chunk.length > VIEW_SIZE) {
if (chunk.byteLength > VIEW_SIZE) {
if (__DEV__) {
if (precomputedChunkSet.has(chunk)) {
console.error(
Expand Down Expand Up @@ -68,7 +69,7 @@ export function writeChunk(

let bytesToWrite = chunk;
const allowableBytes = ((currentView: any): Uint8Array).length - writtenBytes;
if (allowableBytes < bytesToWrite.length) {
if (allowableBytes < bytesToWrite.byteLength) {
// this chunk would overflow the current view. We enqueue a full view
// and start a new view with the remaining chunk
if (allowableBytes === 0) {
Expand All @@ -89,12 +90,12 @@ export function writeChunk(
writtenBytes = 0;
}
((currentView: any): Uint8Array).set(bytesToWrite, writtenBytes);
writtenBytes += bytesToWrite.length;
writtenBytes += bytesToWrite.byteLength;
}

export function writeChunkAndReturn(
destination: Destination,
chunk: PrecomputedChunk | Chunk,
chunk: PrecomputedChunk | Chunk | BinaryChunk,
): boolean {
writeChunk(destination, chunk);
// in web streams there is no backpressure so we can alwas write more
Expand All @@ -119,7 +120,9 @@ export function stringToChunk(content: string): Chunk {
return textEncoder.encode(content);
}

const precomputedChunkSet: Set<Chunk> = __DEV__ ? new Set() : (null: any);
const precomputedChunkSet: Set<Chunk | BinaryChunk> = __DEV__
? new Set()
: (null: any);

export function stringToPrecomputedChunk(content: string): PrecomputedChunk {
const precomputedChunk = textEncoder.encode(content);
Expand All @@ -131,10 +134,27 @@ export function stringToPrecomputedChunk(content: string): PrecomputedChunk {
return precomputedChunk;
}

export function typedArrayToBinaryChunk(
content: $ArrayBufferView,
): BinaryChunk {
// Convert any non-Uint8Array array to Uint8Array. We could avoid this for Uint8Arrays.
// If we passed through this straight to enqueue we wouldn't have to convert it but since
// we need to copy the buffer in that case, we need to convert it to copy it.
// When we copy it into another array using set() it needs to be a Uint8Array.
const buffer = new Uint8Array(
content.buffer,
content.byteOffset,
content.byteLength,
);
// We clone large chunks so that we can transfer them when we write them.
// Others get copied into the target buffer.
return content.byteLength > VIEW_SIZE ? buffer.slice() : buffer;
}

export function clonePrecomputedChunk(
precomputedChunk: PrecomputedChunk,
): PrecomputedChunk {
return precomputedChunk.length > VIEW_SIZE
return precomputedChunk.byteLength > VIEW_SIZE
? precomputedChunk.slice()
: precomputedChunk;
}
Expand All @@ -143,6 +163,10 @@ export function byteLengthOfChunk(chunk: Chunk | PrecomputedChunk): number {
return chunk.byteLength;
}

export function byteLengthOfBinaryChunk(chunk: BinaryChunk): number {
return chunk.byteLength;
}

export function closeWithError(destination: Destination, error: mixed): void {
// $FlowFixMe[method-unbinding]
if (typeof destination.error === 'function') {
Expand Down
Loading

0 comments on commit e5e3f54

Please sign in to comment.