Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 37 additions & 56 deletions packages/react-client/src/ReactFlightClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import type {TemporaryReferenceSet} from './ReactFlightTemporaryReferences';

import {
enablePostpone,
enableFlightReadableStream,
enableOwnerStacks,
enableServerComponentLogs,
enableProfilerTimer,
Expand Down Expand Up @@ -406,14 +405,12 @@ function wakeChunkIfInitialized<T>(

function triggerErrorOnChunk<T>(chunk: SomeChunk<T>, error: mixed): void {
if (chunk.status !== PENDING && chunk.status !== BLOCKED) {
if (enableFlightReadableStream) {
// If we get more data to an already resolved ID, we assume that it's
// a stream chunk since any other row shouldn't have more than one entry.
const streamChunk: InitializedStreamChunk<any> = (chunk: any);
const controller = streamChunk.reason;
// $FlowFixMe[incompatible-call]: The error method should accept mixed.
controller.error(error);
}
// If we get more data to an already resolved ID, we assume that it's
// a stream chunk since any other row shouldn't have more than one entry.
const streamChunk: InitializedStreamChunk<any> = (chunk: any);
const controller = streamChunk.reason;
// $FlowFixMe[incompatible-call]: The error method should accept mixed.
controller.error(error);
return;
}
const listeners = chunk.reason;
Expand Down Expand Up @@ -512,13 +509,11 @@ function resolveModelChunk<T>(
value: UninitializedModel,
): void {
if (chunk.status !== PENDING) {
if (enableFlightReadableStream) {
// If we get more data to an already resolved ID, we assume that it's
// a stream chunk since any other row shouldn't have more than one entry.
const streamChunk: InitializedStreamChunk<any> = (chunk: any);
const controller = streamChunk.reason;
controller.enqueueModel(value);
}
// If we get more data to an already resolved ID, we assume that it's
// a stream chunk since any other row shouldn't have more than one entry.
const streamChunk: InitializedStreamChunk<any> = (chunk: any);
const controller = streamChunk.reason;
controller.enqueueModel(value);
return;
}
const resolveListeners = chunk.value;
Expand Down Expand Up @@ -1718,16 +1713,14 @@ function resolveModel(

function resolveText(response: Response, id: number, text: string): void {
const chunks = response._chunks;
if (enableFlightReadableStream) {
const chunk = chunks.get(id);
if (chunk && chunk.status !== PENDING) {
// If we get more data to an already resolved ID, we assume that it's
// a stream chunk since any other row shouldn't have more than one entry.
const streamChunk: InitializedStreamChunk<any> = (chunk: any);
const controller = streamChunk.reason;
controller.enqueueValue(text);
return;
}
const chunk = chunks.get(id);
if (chunk && chunk.status !== PENDING) {
// If we get more data to an already resolved ID, we assume that it's
// a stream chunk since any other row shouldn't have more than one entry.
const streamChunk: InitializedStreamChunk<any> = (chunk: any);
const controller = streamChunk.reason;
controller.enqueueValue(text);
return;
}
chunks.set(id, createInitializedTextChunk(response, text));
}
Expand All @@ -1738,16 +1731,14 @@ function resolveBuffer(
buffer: $ArrayBufferView | ArrayBuffer,
): void {
const chunks = response._chunks;
if (enableFlightReadableStream) {
const chunk = chunks.get(id);
if (chunk && chunk.status !== PENDING) {
// If we get more data to an already resolved ID, we assume that it's
// a stream chunk since any other row shouldn't have more than one entry.
const streamChunk: InitializedStreamChunk<any> = (chunk: any);
const controller = streamChunk.reason;
controller.enqueueValue(buffer);
return;
}
const chunk = chunks.get(id);
if (chunk && chunk.status !== PENDING) {
// If we get more data to an already resolved ID, we assume that it's
// a stream chunk since any other row shouldn't have more than one entry.
const streamChunk: InitializedStreamChunk<any> = (chunk: any);
const controller = streamChunk.reason;
controller.enqueueValue(buffer);
return;
}
chunks.set(id, createInitializedBufferChunk(response, buffer));
}
Expand Down Expand Up @@ -2967,38 +2958,28 @@ function processFullStringRow(
);
}
case 82 /* "R" */: {
if (enableFlightReadableStream) {
startReadableStream(response, id, undefined);
return;
}
startReadableStream(response, id, undefined);
return;
}
// Fallthrough
case 114 /* "r" */: {
if (enableFlightReadableStream) {
startReadableStream(response, id, 'bytes');
return;
}
startReadableStream(response, id, 'bytes');
return;
}
// Fallthrough
case 88 /* "X" */: {
if (enableFlightReadableStream) {
startAsyncIterable(response, id, false);
return;
}
startAsyncIterable(response, id, false);
return;
}
// Fallthrough
case 120 /* "x" */: {
if (enableFlightReadableStream) {
startAsyncIterable(response, id, true);
return;
}
startAsyncIterable(response, id, true);
return;
}
// Fallthrough
case 67 /* "C" */: {
if (enableFlightReadableStream) {
stopStream(response, id, row);
return;
}
stopStream(response, id, row);
return;
}
// Fallthrough
case 80 /* "P" */: {
Expand Down
37 changes: 16 additions & 21 deletions packages/react-client/src/ReactFlightReplyClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ import type {
import type {LazyComponent} from 'react/src/ReactLazy';
import type {TemporaryReferenceSet} from './ReactFlightTemporaryReferences';

import {
enableRenderableContext,
enableFlightReadableStream,
} from 'shared/ReactFeatureFlags';
import {enableRenderableContext} from 'shared/ReactFeatureFlags';

import {
REACT_ELEMENT_TYPE,
Expand Down Expand Up @@ -663,23 +660,21 @@ export function processReply(
return Array.from((iterator: any));
}

if (enableFlightReadableStream) {
// TODO: ReadableStream is not available in old Node. Remove the typeof check later.
if (
typeof ReadableStream === 'function' &&
value instanceof ReadableStream
) {
return serializeReadableStream(value);
}
const getAsyncIterator: void | (() => $AsyncIterator<any, any, any>) =
(value: any)[ASYNC_ITERATOR];
if (typeof getAsyncIterator === 'function') {
// We treat AsyncIterables as a Fragment and as such we might need to key them.
return serializeAsyncIterable(
(value: any),
getAsyncIterator.call((value: any)),
);
}
// TODO: ReadableStream is not available in old Node. Remove the typeof check later.
if (
typeof ReadableStream === 'function' &&
value instanceof ReadableStream
) {
return serializeReadableStream(value);
}
const getAsyncIterator: void | (() => $AsyncIterator<any, any, any>) =
(value: any)[ASYNC_ITERATOR];
if (typeof getAsyncIterator === 'function') {
// We treat AsyncIterables as a Fragment and as such we might need to key them.
return serializeAsyncIterable(
(value: any),
getAsyncIterator.call((value: any)),
);
}

// Verify that this is a simple plain object.
Expand Down
11 changes: 5 additions & 6 deletions packages/react-client/src/__tests__/ReactFlight-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2623,7 +2623,7 @@ describe('ReactFlight', () => {
);
});

// @gate enableFlightReadableStream && enableAsyncIterableChildren
// @gate enableAsyncIterableChildren
it('shares state when moving keyed Server Components that render async iterables', async () => {
function StatefulClient({name, initial}) {
const [state] = React.useState(initial);
Expand Down Expand Up @@ -2814,7 +2814,7 @@ describe('ReactFlight', () => {
);
});

// @gate enableFlightReadableStream && enableAsyncIterableChildren
// @gate enableAsyncIterableChildren
it('preserves debug info for server-to-server pass through of async iterables', async () => {
let resolve;
const iteratorPromise = new Promise(r => (resolve = r));
Expand Down Expand Up @@ -2849,10 +2849,9 @@ describe('ReactFlight', () => {
},
);

if (gate(flag => flag.enableFlightReadableStream)) {
// Wait for the iterator to finish
await iteratorPromise;
}
// Wait for the iterator to finish
await iteratorPromise;

await 0; // One more tick for the return value / closing.

const transport = ReactNoopFlightServer.render(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2079,7 +2079,6 @@ describe('ReactFlightDOMBrowser', () => {
});
}

// @gate enableFlightReadableStream
it('should supports streaming ReadableStream with objects', async () => {
const errors = [];
let controller1;
Expand Down Expand Up @@ -2161,7 +2160,6 @@ describe('ReactFlightDOMBrowser', () => {
expect(errors).toEqual(['rejected']);
});

// @gate enableFlightReadableStream
it('should cancels the underlying ReadableStream when we are cancelled', async () => {
let controller;
let cancelReason;
Expand Down Expand Up @@ -2194,7 +2192,6 @@ describe('ReactFlightDOMBrowser', () => {
expect(loggedReason).toBe(reason);
});

// @gate enableFlightReadableStream
it('should cancels the underlying ReadableStream when we abort', async () => {
const errors = [];
let controller;
Expand Down Expand Up @@ -2252,7 +2249,6 @@ describe('ReactFlightDOMBrowser', () => {
expect(errors).toEqual([reason]);
});

// @gate enableFlightReadableStream
it('should supports streaming AsyncIterables with objects', async () => {
let resolve;
const wait = new Promise(r => (resolve = r));
Expand Down Expand Up @@ -2369,7 +2365,6 @@ describe('ReactFlightDOMBrowser', () => {
);
});

// @gate enableFlightReadableStream
it('should cancels the underlying AsyncIterable when we are cancelled', async () => {
let resolve;
const wait = new Promise(r => (resolve = r));
Expand Down Expand Up @@ -2408,7 +2403,6 @@ describe('ReactFlightDOMBrowser', () => {
expect(loggedReason).toBe(reason);
});

// @gate enableFlightReadableStream
it('should cancels the underlying AsyncIterable when we abort', async () => {
const errors = [];
const abortController = new AbortController();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,6 @@ describe('ReactFlightDOMEdge', () => {
expect(result.get('value')).toBe('hello');
});

// @gate enableFlightReadableStream
it('can pass an async import to a ReadableStream while enqueuing in order', async () => {
let resolve;
const promise = new Promise(r => (resolve = r));
Expand Down Expand Up @@ -690,7 +689,6 @@ describe('ReactFlightDOMEdge', () => {
expect(await reader.read()).toEqual({value: undefined, done: true});
});

// @gate enableFlightReadableStream
it('can pass an async import a AsyncIterable while allowing peaking at future values', async () => {
let resolve;
const promise = new Promise(r => (resolve = r));
Expand Down Expand Up @@ -742,7 +740,6 @@ describe('ReactFlightDOMEdge', () => {
expect(await iterator.next()).toEqual({value: undefined, done: true});
});

// @gate enableFlightReadableStream
it('should ideally dedupe objects inside async iterables but does not yet', async () => {
const obj = {
this: {is: 'a large objected'},
Expand Down Expand Up @@ -820,7 +817,6 @@ describe('ReactFlightDOMEdge', () => {
);
});

// @gate enableFlightReadableStream
it('should supports ReadableStreams with typed arrays', async () => {
const buffer = new Uint8Array([
123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20,
Expand Down Expand Up @@ -879,7 +875,6 @@ describe('ReactFlightDOMEdge', () => {
expect(streamedBuffers).toEqual(buffers);
});

// @gate enableFlightReadableStream
it('should support BYOB binary ReadableStreams', async () => {
const buffer = new Uint8Array([
123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ describe('ReactFlightDOMNode', () => {
);
});

// @gate enableFlightReadableStream
it('should cancels the underlying ReadableStream when we are cancelled', async () => {
let controller;
let cancelReason;
Expand Down Expand Up @@ -336,7 +335,6 @@ describe('ReactFlightDOMNode', () => {
);
});

// @gate enableFlightReadableStream
it('should cancels the underlying ReadableStream when we abort', async () => {
const errors = [];
let controller;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,6 @@ describe('ReactFlightDOMReply', () => {
expect(response.obj).toBe(obj);
});

// @gate enableFlightReadableStream
it('should supports streaming ReadableStream with objects', async () => {
let controller1;
let controller2;
Expand Down Expand Up @@ -511,7 +510,6 @@ describe('ReactFlightDOMReply', () => {
});
});

// @gate enableFlightReadableStream
it('should supports streaming AsyncIterables with objects', async () => {
let resolve;
const wait = new Promise(r => (resolve = r));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ describe('ReactFlightDOMReplyEdge', () => {
expect(await resultBlob.arrayBuffer()).toEqual(await blob.arrayBuffer());
});

// @gate enableFlightReadableStream
it('should supports ReadableStreams with typed arrays', async () => {
const buffer = new Uint8Array([
123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20,
Expand Down Expand Up @@ -194,7 +193,6 @@ describe('ReactFlightDOMReplyEdge', () => {
expect(streamedBuffers).toEqual(buffers);
});

// @gate enableFlightReadableStream
it('should support BYOB binary ReadableStreams', async () => {
const buffer = new Uint8Array([
123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20,
Expand Down
Loading
Loading