From 7909d8eabb7a702618f51e16a351df41aa8da88e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Markb=C3=A5ge?= Date: Tue, 16 Apr 2024 12:20:07 -0400 Subject: [PATCH] [Flight] Encode ReadableStream and AsyncIterables (#28847) This adds support in Flight for serializing four kinds of streams: - `ReadableStream` with objects as a model. This is a single shot iterator so you can read it only once. It can contain any value including Server Components. Chunks are encoded as is so if you send in 10 typed arrays, you get the same typed arrays out on the other side. - Binary `ReadableStream` with `type: 'bytes'` option. This supports the BYOB protocol. In this mode, the receiving side just gets `Uint8Array`s and they can be split across any single byte boundary into arbitrary chunks. - `AsyncIterable` where the `AsyncIterator` function is different than the `AsyncIterable` itself. In this case we assume that this might be a multi-shot iterable and so we buffer its value and you can iterate it multiple times on the other side. We support the `return` value as a value in the single completion slot, but you can't pass values in `next()`. If you want single-shot, return the AsyncIterator instead. - `AsyncIterator`. These gets serialized as a single-shot as it's just an iterator. `AsyncIterable`/`AsyncIterator` yield Promises that are instrumented with our `.status`/`.value` convention so that they can be synchronously looped over if available. They are also lazily parsed upon read. We can't do this with `ReadableStream` because we use the native implementation of `ReadableStream` which owns the promises. The format is a leading row that indicates which type of stream it is. Then a new row with the same ID is emitted for every chunk. Followed by either an error or close row. `AsyncIterable`s can also be returned as children of Server Components and then they're conceptually the same as fragment arrays/iterables. They can't actually be used as children in Fizz/Fiber but there's a separate plan for that. Only `AsyncIterable` not `AsyncIterator` will be valid as children - just like sync `Iterable` is already supported but single-shot `Iterator` is not. Notably, neither of these streams represent updates over time to a value. They represent multiple values in a list. When the server stream is aborted we also close the underlying stream. However, closing a stream on the client, doesn't close the underlying stream. A couple of possible follow ups I'm not planning on doing right now: - [ ] Free memory by releasing the buffer if an Iterator has been exhausted. Single shots could be optimized further to release individual items as you go. - [ ] We could clean up the underlying stream if the only pending data that's still flowing is from streams and all the streams have cleaned up. It's not very reliable though. It's better to do cancellation for the whole stream - e.g. at the framework level. - [ ] Implement smarter Binary Stream chunk handling. Currently we wait until we've received a whole row for binary chunks and copy them into consecutive memory. We need this to preserve semantics when passing typed arrays. However, for binary streams we don't need that. We can just send whatever pieces we have so far. --- .eslintrc.js | 6 + .../react-client/src/ReactFlightClient.js | 430 +++++++++++++- .../src/__tests__/ReactFlight-test.js | 261 ++++++++- .../__tests__/ReactFlightDOMBrowser-test.js | 381 +++++++++++++ .../src/__tests__/ReactFlightDOMEdge-test.js | 236 ++++++++ .../src/__tests__/ReactFlightDOMNode-test.js | 94 ++++ .../__tests__/ReactFlightDOMReplyEdge-test.js | 58 +- .../react-server/src/ReactFizzThenable.js | 23 +- .../react-server/src/ReactFlightServer.js | 523 ++++++++++++++++-- .../react-server/src/ReactFlightThenable.js | 23 +- packages/shared/ReactFeatureFlags.js | 1 + .../forks/ReactFeatureFlags.native-fb.js | 1 + .../forks/ReactFeatureFlags.native-oss.js | 1 + .../forks/ReactFeatureFlags.test-renderer.js | 1 + ...actFeatureFlags.test-renderer.native-fb.js | 1 + .../ReactFeatureFlags.test-renderer.www.js | 1 + .../shared/forks/ReactFeatureFlags.www.js | 2 + scripts/error-codes/codes.json | 4 +- 18 files changed, 1937 insertions(+), 110 deletions(-) diff --git a/.eslintrc.js b/.eslintrc.js index 005727b5e14d4..75c0b9fe8aa38 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -499,7 +499,12 @@ module.exports = { DOMHighResTimeStamp: 'readonly', EventListener: 'readonly', Iterable: 'readonly', + AsyncIterable: 'readonly', + $AsyncIterable: 'readonly', + $AsyncIterator: 'readonly', Iterator: 'readonly', + AsyncIterator: 'readonly', + IteratorResult: 'readonly', JSONValue: 'readonly', JSResourceReference: 'readonly', MouseEventHandler: 'readonly', @@ -520,6 +525,7 @@ module.exports = { React$Portal: 'readonly', React$Ref: 'readonly', ReadableStreamController: 'readonly', + ReadableStreamReader: 'readonly', RequestInfo: 'readonly', RequestOptions: 'readonly', StoreAsGlobal: 'readonly', diff --git a/packages/react-client/src/ReactFlightClient.js b/packages/react-client/src/ReactFlightClient.js index b4739f4d720e3..1fb6ca6c2b0d1 100644 --- a/packages/react-client/src/ReactFlightClient.js +++ b/packages/react-client/src/ReactFlightClient.js @@ -42,6 +42,7 @@ import { enableBinaryFlight, enablePostpone, enableRefAsProp, + enableFlightReadableStream, } from 'shared/ReactFeatureFlags'; import { @@ -68,6 +69,13 @@ import { export type {CallServerCallback, EncodeFormActionCallback}; +interface FlightStreamController { + enqueueValue(value: any): void; + enqueueModel(json: UninitializedModel): void; + close(json: UninitializedModel): void; + error(error: Error): void; +} + type UninitializedModel = string; export type JSONValue = @@ -100,7 +108,7 @@ type PendingChunk = { reason: null | Array<(mixed) => mixed>, _response: Response, _debugInfo?: null | ReactDebugInfo, - then(resolve: (T) => mixed, reject: (mixed) => mixed): void, + then(resolve: (T) => mixed, reject?: (mixed) => mixed): void, }; type BlockedChunk = { status: 'blocked', @@ -108,7 +116,7 @@ type BlockedChunk = { reason: null | Array<(mixed) => mixed>, _response: Response, _debugInfo?: null | ReactDebugInfo, - then(resolve: (T) => mixed, reject: (mixed) => mixed): void, + then(resolve: (T) => mixed, reject?: (mixed) => mixed): void, }; type CyclicChunk = { status: 'cyclic', @@ -116,7 +124,7 @@ type CyclicChunk = { reason: null | Array<(mixed) => mixed>, _response: Response, _debugInfo?: null | ReactDebugInfo, - then(resolve: (T) => mixed, reject: (mixed) => mixed): void, + then(resolve: (T) => mixed, reject?: (mixed) => mixed): void, }; type ResolvedModelChunk = { status: 'resolved_model', @@ -124,7 +132,7 @@ type ResolvedModelChunk = { reason: null, _response: Response, _debugInfo?: null | ReactDebugInfo, - then(resolve: (T) => mixed, reject: (mixed) => mixed): void, + then(resolve: (T) => mixed, reject?: (mixed) => mixed): void, }; type ResolvedModuleChunk = { status: 'resolved_module', @@ -132,15 +140,24 @@ type ResolvedModuleChunk = { reason: null, _response: Response, _debugInfo?: null | ReactDebugInfo, - then(resolve: (T) => mixed, reject: (mixed) => mixed): void, + then(resolve: (T) => mixed, reject?: (mixed) => mixed): void, }; type InitializedChunk = { status: 'fulfilled', value: T, - reason: null, + reason: null | FlightStreamController, _response: Response, _debugInfo?: null | ReactDebugInfo, - then(resolve: (T) => mixed, reject: (mixed) => mixed): void, + then(resolve: (T) => mixed, reject?: (mixed) => mixed): void, +}; +type InitializedStreamChunk< + T: ReadableStream | $AsyncIterable, +> = { + status: 'fulfilled', + value: T, + reason: FlightStreamController, + _response: Response, + then(resolve: (ReadableStream) => mixed, reject?: (mixed) => mixed): void, }; type ErroredChunk = { status: 'rejected', @@ -148,7 +165,7 @@ type ErroredChunk = { reason: mixed, _response: Response, _debugInfo?: null | ReactDebugInfo, - then(resolve: (T) => mixed, reject: (mixed) => mixed): void, + then(resolve: (T) => mixed, reject?: (mixed) => mixed): void, }; type SomeChunk = | PendingChunk @@ -175,7 +192,7 @@ Chunk.prototype = (Object.create(Promise.prototype): any); Chunk.prototype.then = function ( this: SomeChunk, resolve: (value: T) => mixed, - reject: (reason: mixed) => mixed, + reject?: (reason: mixed) => mixed, ) { const chunk: SomeChunk = this; // If we have resolved content, we try to initialize it first which @@ -210,7 +227,9 @@ Chunk.prototype.then = function ( } break; default: - reject(chunk.reason); + if (reject) { + reject(chunk.reason); + } break; } }; @@ -312,7 +331,14 @@ function wakeChunkIfInitialized( function triggerErrorOnChunk(chunk: SomeChunk, error: mixed): void { if (chunk.status !== PENDING && chunk.status !== BLOCKED) { - // We already resolved. We didn't expect to see this. + if (enableFlightReadableStream) { + // If we get more data to an already resolved ID, we assume that it's + // a stream chunk since any other row shouldn't have more than one entry. + const streamChunk: InitializedStreamChunk = (chunk: any); + const controller = streamChunk.reason; + // $FlowFixMe[incompatible-call]: The error method should accept mixed. + controller.error(error); + } return; } const listeners = chunk.reason; @@ -356,12 +382,63 @@ function createInitializedBufferChunk( return new Chunk(INITIALIZED, value, null, response); } +function createInitializedIteratorResultChunk( + response: Response, + value: T, + done: boolean, +): InitializedChunk> { + // $FlowFixMe[invalid-constructor] Flow doesn't support functions as constructors + return new Chunk(INITIALIZED, {done: done, value: value}, null, response); +} + +function createInitializedStreamChunk< + T: ReadableStream | $AsyncIterable, +>( + response: Response, + value: T, + controller: FlightStreamController, +): InitializedChunk { + // We use the reason field to stash the controller since we already have that + // field. It's a bit of a hack but efficient. + // $FlowFixMe[invalid-constructor] Flow doesn't support functions as constructors + return new Chunk(INITIALIZED, value, controller, response); +} + +function createResolvedIteratorResultChunk( + response: Response, + value: UninitializedModel, + done: boolean, +): ResolvedModelChunk> { + // To reuse code as much code as possible we add the wrapper element as part of the JSON. + const iteratorResultJSON = + (done ? '{"done":true,"value":' : '{"done":false,"value":') + value + '}'; + // $FlowFixMe[invalid-constructor] Flow doesn't support functions as constructors + return new Chunk(RESOLVED_MODEL, iteratorResultJSON, null, response); +} + +function resolveIteratorResultChunk( + chunk: SomeChunk>, + value: UninitializedModel, + done: boolean, +): void { + // To reuse code as much code as possible we add the wrapper element as part of the JSON. + const iteratorResultJSON = + (done ? '{"done":true,"value":' : '{"done":false,"value":') + value + '}'; + resolveModelChunk(chunk, iteratorResultJSON); +} + function resolveModelChunk( chunk: SomeChunk, value: UninitializedModel, ): void { if (chunk.status !== PENDING) { - // We already resolved. We didn't expect to see this. + if (enableFlightReadableStream) { + // If we get more data to an already resolved ID, we assume that it's + // a stream chunk since any other row shouldn't have more than one entry. + const streamChunk: InitializedStreamChunk = (chunk: any); + const controller = streamChunk.reason; + controller.enqueueModel(value); + } return; } const resolveListeners = chunk.value; @@ -685,6 +762,7 @@ function getOutlinedModel( typeof chunkValue === 'object' && chunkValue !== null && (Array.isArray(chunkValue) || + typeof chunkValue[ASYNC_ITERATOR] === 'function' || chunkValue.$$typeof === REACT_ELEMENT_TYPE) && !chunkValue._debugInfo ) { @@ -966,8 +1044,17 @@ function resolveModel( function resolveText(response: Response, id: number, text: string): void { const chunks = response._chunks; - // We assume that we always reference large strings after they've been - // emitted. + if (enableFlightReadableStream) { + const chunk = chunks.get(id); + if (chunk && chunk.status !== PENDING) { + // If we get more data to an already resolved ID, we assume that it's + // a stream chunk since any other row shouldn't have more than one entry. + const streamChunk: InitializedStreamChunk = (chunk: any); + const controller = streamChunk.reason; + controller.enqueueValue(text); + return; + } + } chunks.set(id, createInitializedTextChunk(response, text)); } @@ -977,7 +1064,17 @@ function resolveBuffer( buffer: $ArrayBufferView | ArrayBuffer, ): void { const chunks = response._chunks; - // We assume that we always reference buffers after they've been emitted. + if (enableFlightReadableStream) { + const chunk = chunks.get(id); + if (chunk && chunk.status !== PENDING) { + // If we get more data to an already resolved ID, we assume that it's + // a stream chunk since any other row shouldn't have more than one entry. + const streamChunk: InitializedStreamChunk = (chunk: any); + const controller = streamChunk.reason; + controller.enqueueValue(buffer); + return; + } + } chunks.set(id, createInitializedBufferChunk(response, buffer)); } @@ -1035,6 +1132,268 @@ function resolveModule( } } +function resolveStream>( + response: Response, + id: number, + stream: T, + controller: FlightStreamController, +): void { + const chunks = response._chunks; + const chunk = chunks.get(id); + if (!chunk) { + chunks.set(id, createInitializedStreamChunk(response, stream, controller)); + return; + } + if (chunk.status !== PENDING) { + // We already resolved. We didn't expect to see this. + return; + } + const resolveListeners = chunk.value; + const resolvedChunk: InitializedStreamChunk = (chunk: any); + resolvedChunk.status = INITIALIZED; + resolvedChunk.value = stream; + resolvedChunk.reason = controller; + if (resolveListeners !== null) { + wakeChunk(resolveListeners, chunk.value); + } +} + +function startReadableStream( + response: Response, + id: number, + type: void | 'bytes', +): void { + let controller: ReadableStreamController = (null: any); + const stream = new ReadableStream({ + type: type, + start(c) { + controller = c; + }, + }); + let previousBlockedChunk: SomeChunk | null = null; + const flightController = { + enqueueValue(value: T): void { + if (previousBlockedChunk === null) { + controller.enqueue(value); + } else { + // We're still waiting on a previous chunk so we can't enqueue quite yet. + previousBlockedChunk.then(function () { + controller.enqueue(value); + }); + } + }, + enqueueModel(json: UninitializedModel): void { + if (previousBlockedChunk === null) { + // If we're not blocked on any other chunks, we can try to eagerly initialize + // this as a fast-path to avoid awaiting them. + const chunk: ResolvedModelChunk = createResolvedModelChunk( + response, + json, + ); + initializeModelChunk(chunk); + const initializedChunk: SomeChunk = chunk; + if (initializedChunk.status === INITIALIZED) { + controller.enqueue(initializedChunk.value); + } else { + chunk.then( + v => controller.enqueue(v), + e => controller.error((e: any)), + ); + previousBlockedChunk = chunk; + } + } else { + // We're still waiting on a previous chunk so we can't enqueue quite yet. + const blockedChunk = previousBlockedChunk; + const chunk: SomeChunk = createPendingChunk(response); + chunk.then( + v => controller.enqueue(v), + e => controller.error((e: any)), + ); + previousBlockedChunk = chunk; + blockedChunk.then(function () { + if (previousBlockedChunk === chunk) { + // We were still the last chunk so we can now clear the queue and return + // to synchronous emitting. + previousBlockedChunk = null; + } + resolveModelChunk(chunk, json); + }); + } + }, + close(json: UninitializedModel): void { + if (previousBlockedChunk === null) { + controller.close(); + } else { + const blockedChunk = previousBlockedChunk; + // We shouldn't get any more enqueues after this so we can set it back to null. + previousBlockedChunk = null; + blockedChunk.then(() => controller.close()); + } + }, + error(error: mixed): void { + if (previousBlockedChunk === null) { + // $FlowFixMe[incompatible-call] + controller.error(error); + } else { + const blockedChunk = previousBlockedChunk; + // We shouldn't get any more enqueues after this so we can set it back to null. + previousBlockedChunk = null; + blockedChunk.then(() => controller.error((error: any))); + } + }, + }; + resolveStream(response, id, stream, flightController); +} + +const ASYNC_ITERATOR = Symbol.asyncIterator; + +function asyncIterator(this: $AsyncIterator) { + // Self referencing iterator. + return this; +} + +function createIterator( + next: (arg: void) => SomeChunk>, +): $AsyncIterator { + const iterator: any = { + next: next, + // TODO: Add return/throw as options for aborting. + }; + // TODO: The iterator could inherit the AsyncIterator prototype which is not exposed as + // a global but exists as a prototype of an AsyncGenerator. However, it's not needed + // to satisfy the iterable protocol. + (iterator: any)[ASYNC_ITERATOR] = asyncIterator; + return iterator; +} + +function startAsyncIterable( + response: Response, + id: number, + iterator: boolean, +): void { + const buffer: Array>> = []; + let closed = false; + let nextWriteIndex = 0; + const flightController = { + enqueueValue(value: T): void { + if (nextWriteIndex === buffer.length) { + buffer[nextWriteIndex] = createInitializedIteratorResultChunk( + response, + value, + false, + ); + } else { + const chunk: PendingChunk> = (buffer[ + nextWriteIndex + ]: any); + const resolveListeners = chunk.value; + const rejectListeners = chunk.reason; + const initializedChunk: InitializedChunk> = + (chunk: any); + initializedChunk.status = INITIALIZED; + initializedChunk.value = {done: false, value: value}; + if (resolveListeners !== null) { + wakeChunkIfInitialized(chunk, resolveListeners, rejectListeners); + } + } + nextWriteIndex++; + }, + enqueueModel(value: UninitializedModel): void { + if (nextWriteIndex === buffer.length) { + buffer[nextWriteIndex] = createResolvedIteratorResultChunk( + response, + value, + false, + ); + } else { + resolveIteratorResultChunk(buffer[nextWriteIndex], value, false); + } + nextWriteIndex++; + }, + close(value: UninitializedModel): void { + closed = true; + if (nextWriteIndex === buffer.length) { + buffer[nextWriteIndex] = createResolvedIteratorResultChunk( + response, + value, + true, + ); + } else { + resolveIteratorResultChunk(buffer[nextWriteIndex], value, true); + } + nextWriteIndex++; + while (nextWriteIndex < buffer.length) { + // In generators, any extra reads from the iterator have the value undefined. + resolveIteratorResultChunk( + buffer[nextWriteIndex++], + '"$undefined"', + true, + ); + } + }, + error(error: Error): void { + closed = true; + if (nextWriteIndex === buffer.length) { + buffer[nextWriteIndex] = + createPendingChunk>(response); + } + while (nextWriteIndex < buffer.length) { + triggerErrorOnChunk(buffer[nextWriteIndex++], error); + } + }, + }; + const iterable: $AsyncIterable = { + [ASYNC_ITERATOR](): $AsyncIterator { + let nextReadIndex = 0; + return createIterator(arg => { + if (arg !== undefined) { + throw new Error( + 'Values cannot be passed to next() of AsyncIterables passed to Client Components.', + ); + } + if (nextReadIndex === buffer.length) { + if (closed) { + // $FlowFixMe[invalid-constructor] Flow doesn't support functions as constructors + return new Chunk( + INITIALIZED, + {done: true, value: undefined}, + null, + response, + ); + } + buffer[nextReadIndex] = + createPendingChunk>(response); + } + return buffer[nextReadIndex++]; + }); + }, + }; + // TODO: If it's a single shot iterator we can optimize memory by cleaning up the buffer after + // reading through the end, but currently we favor code size over this optimization. + resolveStream( + response, + id, + iterator ? iterable[ASYNC_ITERATOR]() : iterable, + flightController, + ); +} + +function stopStream( + response: Response, + id: number, + row: UninitializedModel, +): void { + const chunks = response._chunks; + const chunk = chunks.get(id); + if (!chunk || chunk.status !== INITIALIZED) { + // We didn't expect not to have an existing stream; + return; + } + const streamChunk: InitializedStreamChunk = (chunk: any); + const controller = streamChunk.reason; + controller.close(row === '' ? '"$undefined"' : row); +} + type ErrorWithDigest = Error & {digest?: string}; function resolveErrorProd( response: Response, @@ -1362,6 +1721,41 @@ function processFullRow( 'matching versions on the server and the client.', ); } + case 82 /* "R" */: { + if (enableFlightReadableStream) { + startReadableStream(response, id, undefined); + return; + } + } + // Fallthrough + case 114 /* "r" */: { + if (enableFlightReadableStream) { + startReadableStream(response, id, 'bytes'); + return; + } + } + // Fallthrough + case 88 /* "X" */: { + if (enableFlightReadableStream) { + startAsyncIterable(response, id, false); + return; + } + } + // Fallthrough + case 120 /* "x" */: { + if (enableFlightReadableStream) { + startAsyncIterable(response, id, true); + return; + } + } + // Fallthrough + case 67 /* "C" */: { + if (enableFlightReadableStream) { + stopStream(response, id, row); + return; + } + } + // Fallthrough case 80 /* "P" */: { if (enablePostpone) { if (__DEV__) { @@ -1433,7 +1827,11 @@ export function processBinaryChunk( rowTag = resolvedRowTag; rowState = ROW_LENGTH; i++; - } else if (resolvedRowTag > 64 && resolvedRowTag < 91 /* "A"-"Z" */) { + } else if ( + (resolvedRowTag > 64 && resolvedRowTag < 91) /* "A"-"Z" */ || + resolvedRowTag === 114 /* "r" */ || + resolvedRowTag === 120 /* "x" */ + ) { rowTag = resolvedRowTag; rowState = ROW_CHUNK_BY_NEWLINE; i++; diff --git a/packages/react-client/src/__tests__/ReactFlight-test.js b/packages/react-client/src/__tests__/ReactFlight-test.js index 2164ca0b2781a..4b8fb5c3c43c9 100644 --- a/packages/react-client/src/__tests__/ReactFlight-test.js +++ b/packages/react-client/src/__tests__/ReactFlight-test.js @@ -2068,6 +2068,149 @@ describe('ReactFlight', () => { expect(ReactNoop).toMatchRenderedOutput(
Ba
); }); + it('shares state when moving keyed Server Components that render fragments', async () => { + function StatefulClient({name, initial}) { + const [state] = React.useState(initial); + return {state}; + } + const Stateful = clientReference(StatefulClient); + + function ServerComponent({item, initial}) { + return [ + , + , + ]; + } + + const transport = ReactNoopFlightServer.render( +
+ + +
, + ); + + await act(async () => { + ReactNoop.render(await ReactNoopFlightClient.read(transport)); + }); + + expect(ReactNoop).toMatchRenderedOutput( +
+ a1 + b1 + a2 + b2 +
, + ); + + // We swap the Server Components and the state of each child inside each fragment should move. + // Really the Fragment itself moves. + const transport2 = ReactNoopFlightServer.render( +
+ + +
, + ); + + await act(async () => { + ReactNoop.render(await ReactNoopFlightClient.read(transport2)); + }); + + expect(ReactNoop).toMatchRenderedOutput( +
+ a2 + b2 + a1 + b1 +
, + ); + }); + + // @gate enableFlightReadableStream + it('shares state when moving keyed Server Components that render async iterables', async () => { + function StatefulClient({name, initial}) { + const [state] = React.useState(initial); + return {state}; + } + const Stateful = clientReference(StatefulClient); + + function ServerComponent({item, initial}) { + // While the ServerComponent itself could be an async generator, single-shot iterables + // are not supported as React children since React might need to re-map them based on + // state updates. So we create an AsyncIterable instead. + return { + async *[Symbol.asyncIterator]() { + yield ; + yield ; + }, + }; + } + + function ListClient({children}) { + // TODO: Unwrap AsyncIterables natively in React. For now we do it in this wrapper. + const resolvedChildren = []; + // eslint-disable-next-line no-for-of-loops/no-for-of-loops + for (const fragment of children) { + // We should've wrapped each child in a keyed Fragment. + expect(fragment.type).toBe(React.Fragment); + const fragmentChildren = []; + const iterator = fragment.props.children[Symbol.asyncIterator](); + for (let entry; !(entry = React.use(iterator.next())).done; ) { + fragmentChildren.push(entry.value); + } + resolvedChildren.push( + + {fragmentChildren} + , + ); + } + return
{resolvedChildren}
; + } + + const List = clientReference(ListClient); + + const transport = ReactNoopFlightServer.render( + + + + , + ); + + await act(async () => { + ReactNoop.render(await ReactNoopFlightClient.read(transport)); + }); + + expect(ReactNoop).toMatchRenderedOutput( +
+ a1 + b1 + a2 + b2 +
, + ); + + // We swap the Server Components and the state of each child inside each fragment should move. + // Really the Fragment itself moves. + const transport2 = ReactNoopFlightServer.render( + + + + , + ); + + await act(async () => { + ReactNoop.render(await ReactNoopFlightClient.read(transport2)); + }); + + expect(ReactNoop).toMatchRenderedOutput( +
+ a2 + b2 + a1 + b1 +
, + ); + }); + it('preserves debug info for server-to-server pass through', async () => { function ThirdPartyLazyComponent() { return !; @@ -2081,6 +2224,10 @@ describe('ReactFlight', () => { return stranger; } + function ThirdPartyFragmentComponent() { + return [Who, ' ', dis?]; + } + function ServerComponent({transport}) { // This is a Server Component that receives other Server Components from a third party. const children = ReactNoopFlightClient.read(transport); @@ -2090,7 +2237,7 @@ describe('ReactFlight', () => { const promiseComponent = Promise.resolve(); const thirdPartyTransport = ReactNoopFlightServer.render( - [promiseComponent, lazy], + [promiseComponent, lazy, ], { environmentName: 'third-party', }, @@ -2123,6 +2270,17 @@ describe('ReactFlight', () => { ? [{name: 'ThirdPartyLazyComponent', env: 'third-party', owner: null}] : undefined, ); + expect(thirdPartyChildren[2]._debugInfo).toEqual( + __DEV__ + ? [ + { + name: 'ThirdPartyFragmentComponent', + env: 'third-party', + owner: null, + }, + ] + : undefined, + ); ReactNoop.render(result); }); @@ -2130,6 +2288,107 @@ describe('ReactFlight', () => {
Hello, stranger ! + Who dis? +
, + ); + }); + + // @gate enableFlightReadableStream + it('preserves debug info for server-to-server pass through of async iterables', async () => { + let resolve; + const iteratorPromise = new Promise(r => (resolve = r)); + + function ThirdPartyAsyncIterableComponent({item, initial}) { + // While the ServerComponent itself could be an async generator, single-shot iterables + // are not supported as React children since React might need to re-map them based on + // state updates. So we create an AsyncIterable instead. + return { + async *[Symbol.asyncIterator]() { + yield Who; + yield dis?; + resolve(); + }, + }; + } + + function ListClient({children: fragment}) { + // TODO: Unwrap AsyncIterables natively in React. For now we do it in this wrapper. + const resolvedChildren = []; + const iterator = fragment.props.children[Symbol.asyncIterator](); + for (let entry; !(entry = React.use(iterator.next())).done; ) { + resolvedChildren.push(entry.value); + } + return
{resolvedChildren}
; + } + + const List = clientReference(ListClient); + + function Keyed({children}) { + // Keying this should generate a fragment. + return children; + } + + function ServerComponent({transport}) { + // This is a Server Component that receives other Server Components from a third party. + const children = ReactServer.use( + ReactNoopFlightClient.read(transport), + ).root; + return ( + + {children} + + ); + } + + const thirdPartyTransport = ReactNoopFlightServer.render( + {root: }, + { + environmentName: 'third-party', + }, + ); + + if (gate(flag => flag.enableFlightReadableStream)) { + // Wait for the iterator to finish + await iteratorPromise; + } + await 0; // One more tick for the return value / closing. + + const transport = ReactNoopFlightServer.render( + , + ); + + await act(async () => { + const promise = ReactNoopFlightClient.read(transport); + expect(promise._debugInfo).toEqual( + __DEV__ + ? [{name: 'ServerComponent', env: 'Server', owner: null}] + : undefined, + ); + const result = await promise; + const thirdPartyFragment = await result.props.children; + expect(thirdPartyFragment._debugInfo).toEqual( + __DEV__ ? [{name: 'Keyed', env: 'Server', owner: null}] : undefined, + ); + // We expect the debug info to be transferred from the inner stream to the outer. + expect(thirdPartyFragment.props.children._debugInfo).toEqual( + __DEV__ + ? [ + { + name: 'ThirdPartyAsyncIterableComponent', + env: 'third-party', + owner: null, + }, + ] + : undefined, + ); + + ReactNoop.render(result); + }); + + expect(ReactNoop).toMatchRenderedOutput( +
+ Who + dis?
, ); }); diff --git a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMBrowser-test.js b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMBrowser-test.js index 1f5034f4bf985..c92f7799bb6e8 100644 --- a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMBrowser-test.js +++ b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMBrowser-test.js @@ -1406,4 +1406,385 @@ describe('ReactFlightDOMBrowser', () => { expect(postponed).toBe('testing postpone'); expect(error).toBe(null); }); + + function passThrough(stream) { + // Simulate more realistic network by splitting up and rejoining some chunks. + // This lets us test that we don't accidentally rely on particular bounds of the chunks. + return new ReadableStream({ + async start(controller) { + const reader = stream.getReader(); + function push() { + reader.read().then(({done, value}) => { + if (done) { + controller.close(); + return; + } + controller.enqueue(value); + push(); + return; + }); + } + push(); + }, + }); + } + + // @gate enableFlightReadableStream + it('should supports streaming ReadableStream with objects', async () => { + const errors = []; + let controller1; + let controller2; + const s1 = new ReadableStream({ + start(c) { + controller1 = c; + }, + }); + const s2 = new ReadableStream({ + start(c) { + controller2 = c; + }, + }); + const rscStream = ReactServerDOMServer.renderToReadableStream( + { + s1, + s2, + }, + {}, + { + onError(x) { + errors.push(x); + return x; + }, + }, + ); + const result = await ReactServerDOMClient.createFromReadableStream( + passThrough(rscStream), + ); + const reader1 = result.s1.getReader(); + const reader2 = result.s2.getReader(); + + controller1.enqueue({hello: 'world'}); + controller2.enqueue({hi: 'there'}); + expect(await reader1.read()).toEqual({ + value: {hello: 'world'}, + done: false, + }); + expect(await reader2.read()).toEqual({ + value: {hi: 'there'}, + done: false, + }); + + controller1.enqueue('text1'); + controller2.enqueue('text2'); + controller1.close(); + controller2.error('rejected'); + + expect(await reader1.read()).toEqual({ + value: 'text1', + done: false, + }); + expect(await reader1.read()).toEqual({ + value: undefined, + done: true, + }); + expect(await reader2.read()).toEqual({ + value: 'text2', + done: false, + }); + let error = null; + try { + await reader2.read(); + } catch (x) { + error = x; + } + expect(error.digest).toBe('rejected'); + expect(errors).toEqual(['rejected']); + }); + + // @gate enableFlightReadableStream + it('should cancels the underlying ReadableStream when we are cancelled', async () => { + let controller; + let cancelReason; + const s = new ReadableStream({ + start(c) { + controller = c; + }, + cancel(r) { + cancelReason = r; + }, + }); + let loggedReason; + const rscStream = ReactServerDOMServer.renderToReadableStream( + s, + {}, + { + onError(reason) { + loggedReason = reason; + }, + }, + ); + const reader = rscStream.getReader(); + controller.enqueue('hi'); + const reason = new Error('aborted'); + reader.cancel(reason); + await reader.read(); + expect(cancelReason).toBe(reason); + expect(loggedReason).toBe(reason); + }); + + // @gate enableFlightReadableStream + it('should cancels the underlying ReadableStream when we abort', async () => { + const errors = []; + let controller; + let cancelReason; + const abortController = new AbortController(); + const s = new ReadableStream({ + start(c) { + controller = c; + }, + cancel(r) { + cancelReason = r; + }, + }); + const rscStream = ReactServerDOMServer.renderToReadableStream( + s, + {}, + { + signal: abortController.signal, + onError(x) { + errors.push(x); + return x.message; + }, + }, + ); + const result = await ReactServerDOMClient.createFromReadableStream( + passThrough(rscStream), + ); + const reader = result.getReader(); + controller.enqueue('hi'); + + await 0; + + const reason = new Error('aborted'); + abortController.abort(reason); + + // We should be able to read the part we already emitted before the abort + expect(await reader.read()).toEqual({ + value: 'hi', + done: false, + }); + + expect(cancelReason).toBe(reason); + + let error = null; + try { + await reader.read(); + } catch (x) { + error = x; + } + expect(error.digest).toBe('aborted'); + expect(errors).toEqual([reason]); + }); + + // @gate enableFlightReadableStream + it('should supports streaming AsyncIterables with objects', async () => { + let resolve; + const wait = new Promise(r => (resolve = r)); + const errors = []; + const multiShotIterable = { + async *[Symbol.asyncIterator]() { + const next = yield {hello: 'A'}; + expect(next).toBe(undefined); + await wait; + yield {hi: 'B'}; + return 'C'; + }, + }; + const singleShotIterator = (async function* () { + const next = yield {hello: 'D'}; + expect(next).toBe(undefined); + await wait; + yield {hi: 'E'}; + // eslint-disable-next-line no-throw-literal + throw 'F'; + })(); + + const rscStream = ReactServerDOMServer.renderToReadableStream( + { + multiShotIterable, + singleShotIterator, + }, + {}, + { + onError(x) { + errors.push(x); + return x; + }, + }, + ); + const result = await ReactServerDOMClient.createFromReadableStream( + passThrough(rscStream), + ); + + const iterator1 = result.multiShotIterable[Symbol.asyncIterator](); + const iterator2 = result.singleShotIterator[Symbol.asyncIterator](); + + expect(iterator1).not.toBe(result.multiShotIterable); + expect(iterator2).toBe(result.singleShotIterator); + + expect(await iterator1.next()).toEqual({ + value: {hello: 'A'}, + done: false, + }); + expect(await iterator2.next()).toEqual({ + value: {hello: 'D'}, + done: false, + }); + + await resolve(); + + expect(await iterator1.next()).toEqual({ + value: {hi: 'B'}, + done: false, + }); + expect(await iterator2.next()).toEqual({ + value: {hi: 'E'}, + done: false, + }); + expect(await iterator1.next()).toEqual({ + value: 'C', // Return value + done: true, + }); + expect(await iterator1.next()).toEqual({ + value: undefined, + done: true, + }); + + let error = null; + try { + await iterator2.next(); + } catch (x) { + error = x; + } + expect(error.digest).toBe('F'); + expect(errors).toEqual(['F']); + + // Multi-shot iterables should be able to do the same thing again + const iterator3 = result.multiShotIterable[Symbol.asyncIterator](); + + expect(iterator3).not.toBe(iterator1); + + // We should be able to iterate over the iterable again and it should be + // synchronously available using instrumented promises so that React can + // rerender it synchronously. + expect(iterator3.next().value).toEqual({ + value: {hello: 'A'}, + done: false, + }); + expect(iterator3.next().value).toEqual({ + value: {hi: 'B'}, + done: false, + }); + expect(iterator3.next().value).toEqual({ + value: 'C', // Return value + done: true, + }); + expect(iterator3.next().value).toEqual({ + value: undefined, + done: true, + }); + + expect(() => iterator3.next('this is not allowed')).toThrow( + 'Values cannot be passed to next() of AsyncIterables passed to Client Components.', + ); + }); + + // @gate enableFlightReadableStream + it('should cancels the underlying AsyncIterable when we are cancelled', async () => { + let resolve; + const wait = new Promise(r => (resolve = r)); + let thrownReason; + const iterator = (async function* () { + try { + await wait; + yield 'a'; + yield 'b'; + } catch (x) { + thrownReason = x; + } + yield 'c'; + })(); + let loggedReason; + const rscStream = ReactServerDOMServer.renderToReadableStream( + iterator, + {}, + { + onError(reason) { + loggedReason = reason; + }, + }, + ); + const reader = rscStream.getReader(); + const reason = new Error('aborted'); + reader.cancel(reason); + await resolve(); + await reader.read(); + expect(thrownReason).toBe(reason); + expect(loggedReason).toBe(reason); + }); + + // @gate enableFlightReadableStream + it('should cancels the underlying AsyncIterable when we abort', async () => { + const errors = []; + const abortController = new AbortController(); + let resolve; + const wait = new Promise(r => (resolve = r)); + let thrownReason; + const iterator = (async function* () { + try { + yield 'a'; + await wait; + yield 'b'; + } catch (x) { + thrownReason = x; + } + yield 'c'; + })(); + const rscStream = ReactServerDOMServer.renderToReadableStream( + iterator, + {}, + { + signal: abortController.signal, + onError(x) { + errors.push(x); + return x.message; + }, + }, + ); + const result = await ReactServerDOMClient.createFromReadableStream( + passThrough(rscStream), + ); + + const reason = new Error('aborted'); + abortController.abort(reason); + + await resolve(); + + // We should be able to read the part we already emitted before the abort + expect(await result.next()).toEqual({ + value: 'a', + done: false, + }); + + expect(thrownReason).toBe(reason); + + let error = null; + try { + await result.next(); + } catch (x) { + error = x; + } + expect(error.digest).toBe('aborted'); + expect(errors).toEqual([reason]); + }); }); diff --git a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js index e7b4e06f49f87..6e2e02047bbe6 100644 --- a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js +++ b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js @@ -80,6 +80,7 @@ describe('ReactFlightDOMEdge', () => { reader.read().then(({done, value}) => { if (done) { controller.enqueue(prevChunk); + prevChunk = new Uint8Array(0); controller.close(); return; } @@ -90,7 +91,18 @@ describe('ReactFlightDOMEdge', () => { controller.enqueue(chunk.subarray(0, chunk.length - 50)); prevChunk = chunk.subarray(chunk.length - 50); } else { + // Wait to see if we get some more bytes to join in. prevChunk = chunk; + // Flush if we don't get any more. + (async function flushAfterAFewTasks() { + for (let i = 0; i < 10; i++) { + await i; + } + if (prevChunk.byteLength > 0) { + controller.enqueue(prevChunk); + } + prevChunk = new Uint8Array(0); + })(); } push(); }); @@ -112,6 +124,18 @@ describe('ReactFlightDOMEdge', () => { } } + async function readByteLength(stream) { + const reader = stream.getReader(); + let length = 0; + while (true) { + const {done, value} = await reader.read(); + if (done) { + return length; + } + length += value.byteLength; + } + } + it('should allow an alternative module mapping to be used for SSR', async () => { function ClientComponent() { return Client Component; @@ -430,6 +454,97 @@ describe('ReactFlightDOMEdge', () => { expect(result.get('value')).toBe('hello'); }); + // @gate enableFlightReadableStream + it('can pass an async import to a ReadableStream while enqueuing in order', async () => { + let resolve; + const promise = new Promise(r => (resolve = r)); + + const asyncClient = clientExports(promise); + + // We await the value on the servers so it's an async value that the client should wait for + const awaitedValue = await asyncClient; + + const s = new ReadableStream({ + start(c) { + c.enqueue('hello'); + c.enqueue(awaitedValue); + c.enqueue('!'); + c.close(); + }, + }); + + const stream = passThrough( + ReactServerDOMServer.renderToReadableStream(s, webpackMap), + ); + + const result = await ReactServerDOMClient.createFromReadableStream(stream, { + ssrManifest: { + moduleMap: null, + moduleLoading: null, + }, + }); + + const reader = result.getReader(); + + expect(await reader.read()).toEqual({value: 'hello', done: false}); + + const readPromise = reader.read(); + // We resolve this after we've already received the '!' row. + await resolve('world'); + + expect(await readPromise).toEqual({value: 'world', done: false}); + expect(await reader.read()).toEqual({value: '!', done: false}); + expect(await reader.read()).toEqual({value: undefined, done: true}); + }); + + // @gate enableFlightReadableStream + it('can pass an async import a AsyncIterable while allowing peaking at future values', async () => { + let resolve; + const promise = new Promise(r => (resolve = r)); + + const asyncClient = clientExports(promise); + + const multiShotIterable = { + async *[Symbol.asyncIterator]() { + yield 'hello'; + // We await the value on the servers so it's an async value that the client should wait for + yield await asyncClient; + yield '!'; + }, + }; + + const stream = passThrough( + ReactServerDOMServer.renderToReadableStream( + multiShotIterable, + webpackMap, + ), + ); + + // Parsing the root blocks because the module hasn't loaded yet + const result = await ReactServerDOMClient.createFromReadableStream(stream, { + ssrManifest: { + moduleMap: null, + moduleLoading: null, + }, + }); + + const iterator = result[Symbol.asyncIterator](); + + expect(await iterator.next()).toEqual({value: 'hello', done: false}); + + const readPromise = iterator.next(); + + // While the previous promise didn't resolve yet, we should be able to peak at the next value + // by iterating past it. + expect(await iterator.next()).toEqual({value: '!', done: false}); + + // We resolve the previous row after we've already received the '!' row. + await resolve('world'); + expect(await readPromise).toEqual({value: 'world', done: false}); + + expect(await iterator.next()).toEqual({value: undefined, done: true}); + }); + it('warns if passing a this argument to bind() of a server reference', async () => { const ServerModule = serverExports({ greet: function () {}, @@ -456,4 +571,125 @@ describe('ReactFlightDOMEdge', () => { {withoutStack: true}, ); }); + + // @gate enableFlightReadableStream && enableBinaryFlight + it('should supports ReadableStreams with typed arrays', async () => { + const buffer = new Uint8Array([ + 123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20, + ]).buffer; + const buffers = [ + buffer, + new Int8Array(buffer, 1), + new Uint8Array(buffer, 2), + new Uint8ClampedArray(buffer, 2), + new Int16Array(buffer, 2), + new Uint16Array(buffer, 2), + new Int32Array(buffer, 4), + new Uint32Array(buffer, 4), + new Float32Array(buffer, 4), + new Float64Array(buffer, 0), + new BigInt64Array(buffer, 0), + new BigUint64Array(buffer, 0), + new DataView(buffer, 3), + ]; + + // This is not a binary stream, it's a stream that contain binary chunks. + const s = new ReadableStream({ + start(c) { + for (let i = 0; i < buffers.length; i++) { + c.enqueue(buffers[i]); + } + c.close(); + }, + }); + + const stream = ReactServerDOMServer.renderToReadableStream(s, {}); + + const [stream1, stream2] = passThrough(stream).tee(); + + const result = await ReactServerDOMClient.createFromReadableStream( + stream1, + { + ssrManifest: { + moduleMap: null, + moduleLoading: null, + }, + }, + ); + + expect(await readByteLength(stream2)).toBeLessThan(300); + + const streamedBuffers = []; + const reader = result.getReader(); + let entry; + while (!(entry = await reader.read()).done) { + streamedBuffers.push(entry.value); + } + + expect(streamedBuffers).toEqual(buffers); + }); + + // @gate enableFlightReadableStream && enableBinaryFlight + it('should support BYOB binary ReadableStreams', async () => { + const buffer = new Uint8Array([ + 123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20, + ]).buffer; + const buffers = [ + new Int8Array(buffer, 1), + new Uint8Array(buffer, 2), + new Uint8ClampedArray(buffer, 2), + new Int16Array(buffer, 2), + new Uint16Array(buffer, 2), + new Int32Array(buffer, 4), + new Uint32Array(buffer, 4), + new Float32Array(buffer, 4), + new Float64Array(buffer, 0), + new BigInt64Array(buffer, 0), + new BigUint64Array(buffer, 0), + new DataView(buffer, 3), + ]; + + // This a binary stream where each chunk ends up as Uint8Array. + const s = new ReadableStream({ + type: 'bytes', + start(c) { + for (let i = 0; i < buffers.length; i++) { + c.enqueue(buffers[i]); + } + c.close(); + }, + }); + + const stream = ReactServerDOMServer.renderToReadableStream(s, {}); + + const [stream1, stream2] = passThrough(stream).tee(); + + const result = await ReactServerDOMClient.createFromReadableStream( + stream1, + { + ssrManifest: { + moduleMap: null, + moduleLoading: null, + }, + }, + ); + + expect(await readByteLength(stream2)).toBeLessThan(300); + + const streamedBuffers = []; + const reader = result.getReader({mode: 'byob'}); + let entry; + while (!(entry = await reader.read(new Uint8Array(10))).done) { + expect(entry.value instanceof Uint8Array).toBe(true); + streamedBuffers.push(entry.value); + } + + // The streamed buffers might be in different chunks and in Uint8Array form but + // the concatenated bytes should be the same. + expect(streamedBuffers.flatMap(t => Array.from(t))).toEqual( + buffers.flatMap(c => + Array.from(new Uint8Array(c.buffer, c.byteOffset, c.byteLength)), + ), + ); + }); }); diff --git a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMNode-test.js b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMNode-test.js index 87fc83360018e..df1850896d827 100644 --- a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMNode-test.js +++ b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMNode-test.js @@ -9,6 +9,9 @@ 'use strict'; +global.ReadableStream = + require('web-streams-polyfill/ponyfill/es6').ReadableStream; + // Don't wait before processing work on the server. // TODO: we can replace this with FlightServer.act(). global.setImmediate = cb => cb(); @@ -258,4 +261,95 @@ describe('ReactFlightDOMNode', () => { 'Client Component', ); }); + + // @gate enableFlightReadableStream + it('should cancels the underlying ReadableStream when we are cancelled', async () => { + let controller; + let cancelReason; + const s = new ReadableStream({ + start(c) { + controller = c; + }, + cancel(r) { + cancelReason = r; + }, + }); + + const rscStream = ReactServerDOMServer.renderToPipeableStream( + s, + {}, + { + onError(error) { + return error.message; + }, + }, + ); + + const writable = new Stream.PassThrough(); + rscStream.pipe(writable); + + controller.enqueue('hi'); + + const reason = new Error('aborted'); + writable.destroy(reason); + + await new Promise(resolve => { + writable.on('error', () => { + resolve(); + }); + }); + + expect(cancelReason.message).toBe( + 'The destination stream errored while writing data.', + ); + }); + + // @gate enableFlightReadableStream + it('should cancels the underlying ReadableStream when we abort', async () => { + const errors = []; + let controller; + let cancelReason; + const s = new ReadableStream({ + start(c) { + controller = c; + }, + cancel(r) { + cancelReason = r; + }, + }); + const rscStream = ReactServerDOMServer.renderToPipeableStream( + s, + {}, + { + onError(x) { + errors.push(x); + return x.message; + }, + }, + ); + + const readable = new Stream.PassThrough(); + rscStream.pipe(readable); + + const result = await ReactServerDOMClient.createFromNodeStream(readable, { + moduleMap: {}, + moduleLoading: webpackModuleLoading, + }); + const reader = result.getReader(); + controller.enqueue('hi'); + + const reason = new Error('aborted'); + rscStream.abort(reason); + + expect(cancelReason).toBe(reason); + + let error = null; + try { + await reader.read(); + } catch (x) { + error = x; + } + expect(error.digest).toBe('aborted'); + expect(errors).toEqual([reason]); + }); }); diff --git a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js index d7000de7f3526..00a53a590c5e1 100644 --- a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js +++ b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js @@ -109,33 +109,35 @@ describe('ReactFlightDOMReplyEdge', () => { expect(await result.arrayBuffer()).toEqual(await blob.arrayBuffer()); }); - it('can transport FormData (blobs)', async () => { - const bytes = new Uint8Array([ - 123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20, - ]); - const blob = new Blob([bytes, bytes], { - type: 'application/x-test', + if (typeof FormData !== 'undefined' && typeof File !== 'undefined') { + it('can transport FormData (blobs)', async () => { + const bytes = new Uint8Array([ + 123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20, + ]); + const blob = new Blob([bytes, bytes], { + type: 'application/x-test', + }); + + const formData = new FormData(); + formData.append('hi', 'world'); + formData.append('file', blob, 'filename.test'); + + expect(formData.get('file') instanceof File).toBe(true); + expect(formData.get('file').name).toBe('filename.test'); + + const body = await ReactServerDOMClient.encodeReply(formData); + const result = await ReactServerDOMServer.decodeReply( + body, + webpackServerMap, + ); + + expect(result instanceof FormData).toBe(true); + expect(result.get('hi')).toBe('world'); + const resultBlob = result.get('file'); + expect(resultBlob instanceof Blob).toBe(true); + expect(resultBlob.name).toBe('filename.test'); // In this direction we allow file name to pass through but not other direction. + expect(resultBlob.size).toBe(bytes.length * 2); + expect(await resultBlob.arrayBuffer()).toEqual(await blob.arrayBuffer()); }); - - const formData = new FormData(); - formData.append('hi', 'world'); - formData.append('file', blob, 'filename.test'); - - expect(formData.get('file') instanceof File).toBe(true); - expect(formData.get('file').name).toBe('filename.test'); - - const body = await ReactServerDOMClient.encodeReply(formData); - const result = await ReactServerDOMServer.decodeReply( - body, - webpackServerMap, - ); - - expect(result instanceof FormData).toBe(true); - expect(result.get('hi')).toBe('world'); - const resultBlob = result.get('file'); - expect(resultBlob instanceof Blob).toBe(true); - expect(resultBlob.name).toBe('filename.test'); // In this direction we allow file name to pass through but not other direction. - expect(resultBlob.size).toBe(bytes.length * 2); - expect(await resultBlob.arrayBuffer()).toEqual(await blob.arrayBuffer()); - }); + } }); diff --git a/packages/react-server/src/ReactFizzThenable.js b/packages/react-server/src/ReactFizzThenable.js index b863d8b6f9f99..1991b9de1ca61 100644 --- a/packages/react-server/src/ReactFizzThenable.js +++ b/packages/react-server/src/ReactFizzThenable.js @@ -82,6 +82,9 @@ export function trackUsedThenable( // Only instrument the thenable if the status if not defined. If // it's defined, but an unknown value, assume it's been instrumented by // some custom userspace implementation. We treat it as "pending". + // Attach a dummy listener, to ensure that any lazy initialization can + // happen. Flight lazily parses JSON when the value is actually awaited. + thenable.then(noop, noop); } else { const pendingThenable: PendingThenable = (thenable: any); pendingThenable.status = 'pending'; @@ -101,17 +104,17 @@ export function trackUsedThenable( } }, ); + } - // Check one more time in case the thenable resolved synchronously - switch (thenable.status) { - case 'fulfilled': { - const fulfilledThenable: FulfilledThenable = (thenable: any); - return fulfilledThenable.value; - } - case 'rejected': { - const rejectedThenable: RejectedThenable = (thenable: any); - throw rejectedThenable.reason; - } + // Check one more time in case the thenable resolved synchronously + switch (thenable.status) { + case 'fulfilled': { + const fulfilledThenable: FulfilledThenable = (thenable: any); + return fulfilledThenable.value; + } + case 'rejected': { + const rejectedThenable: RejectedThenable = (thenable: any); + throw rejectedThenable.reason; } } diff --git a/packages/react-server/src/ReactFlightServer.js b/packages/react-server/src/ReactFlightServer.js index 3c8ac8c1469a8..254e26e675bd0 100644 --- a/packages/react-server/src/ReactFlightServer.js +++ b/packages/react-server/src/ReactFlightServer.js @@ -20,6 +20,8 @@ import { enableServerComponentLogs, } from 'shared/ReactFeatureFlags'; +import {enableFlightReadableStream} from 'shared/ReactFeatureFlags'; + import { scheduleWork, flushBuffered, @@ -199,6 +201,8 @@ if ( const ObjectPrototype = Object.prototype; +const ASYNC_ITERATOR = Symbol.asyncIterator; + type JSONValue = | string | boolean @@ -236,6 +240,8 @@ export type ReactClientValue = | null | void | bigint + | ReadableStream + | $AsyncIterable | Iterable | Array | Map @@ -282,6 +288,7 @@ export type Request = { nextChunkId: number, pendingChunks: number, hints: Hints, + abortListeners: Set<(reason: mixed) => void>, abortableTasks: Set, pingedTasks: Array, completedImportChunks: Array, @@ -378,6 +385,7 @@ export function createRequest( nextChunkId: 0, pendingChunks: 0, hints, + abortListeners: new Set(), abortableTasks: abortSet, pingedTasks: pingedTasks, completedImportChunks: ([]: Array), @@ -509,15 +517,220 @@ function serializeThenable( emitErrorChunk(request, newTask.id, digest, reason); } request.abortableTasks.delete(newTask); - if (request.destination !== null) { - flushCompletedChunks(request, request.destination); - } + enqueueFlush(request); }, ); return newTask.id; } +function serializeReadableStream( + request: Request, + task: Task, + stream: ReadableStream, +): string { + // Detect if this is a BYOB stream. BYOB streams should be able to be read as bytes on the + // receiving side. It also implies that different chunks can be split up or merged as opposed + // to a readable stream that happens to have Uint8Array as the type which might expect it to be + // received in the same slices. + // $FlowFixMe: This is a Node.js extension. + let supportsBYOB: void | boolean = stream.supportsBYOB; + if (supportsBYOB === undefined) { + try { + // $FlowFixMe[extra-arg]: This argument is accepted. + stream.getReader({mode: 'byob'}).releaseLock(); + supportsBYOB = true; + } catch (x) { + supportsBYOB = false; + } + } + + const reader = stream.getReader(); + + // This task won't actually be retried. We just use it to attempt synchronous renders. + const streamTask = createTask( + request, + task.model, + task.keyPath, + task.implicitSlot, + request.abortableTasks, + ); + request.abortableTasks.delete(streamTask); + + request.pendingChunks++; // The task represents the Start row. This adds a Stop row. + + const startStreamRow = + streamTask.id.toString(16) + ':' + (supportsBYOB ? 'r' : 'R') + '\n'; + request.completedRegularChunks.push(stringToChunk(startStreamRow)); + + // There's a race condition between when the stream is aborted and when the promise + // resolves so we track whether we already aborted it to avoid writing twice. + let aborted = false; + function progress(entry: {done: boolean, value: ReactClientValue, ...}) { + if (aborted) { + return; + } + + if (entry.done) { + request.abortListeners.delete(error); + const endStreamRow = streamTask.id.toString(16) + ':C\n'; + request.completedRegularChunks.push(stringToChunk(endStreamRow)); + enqueueFlush(request); + aborted = true; + } else { + try { + streamTask.model = entry.value; + request.pendingChunks++; + tryStreamTask(request, streamTask); + enqueueFlush(request); + reader.read().then(progress, error); + } catch (x) { + error(x); + } + } + } + function error(reason: mixed) { + if (aborted) { + return; + } + aborted = true; + request.abortListeners.delete(error); + if ( + enablePostpone && + typeof reason === 'object' && + reason !== null && + (reason: any).$$typeof === REACT_POSTPONE_TYPE + ) { + const postponeInstance: Postpone = (reason: any); + logPostpone(request, postponeInstance.message); + emitPostponeChunk(request, streamTask.id, postponeInstance); + } else { + const digest = logRecoverableError(request, reason); + emitErrorChunk(request, streamTask.id, digest, reason); + } + enqueueFlush(request); + // $FlowFixMe should be able to pass mixed + reader.cancel(reason).then(error, error); + } + request.abortListeners.add(error); + reader.read().then(progress, error); + return serializeByValueID(streamTask.id); +} + +function serializeAsyncIterable( + request: Request, + task: Task, + iterable: $AsyncIterable, + iterator: $AsyncIterator, +): string { + // Generators/Iterators are Iterables but they're also their own iterator + // functions. If that's the case, we treat them as single-shot. Otherwise, + // we assume that this iterable might be a multi-shot and allow it to be + // iterated more than once on the client. + const isIterator = iterable === iterator; + + // This task won't actually be retried. We just use it to attempt synchronous renders. + const streamTask = createTask( + request, + task.model, + task.keyPath, + task.implicitSlot, + request.abortableTasks, + ); + request.abortableTasks.delete(streamTask); + + request.pendingChunks++; // The task represents the Start row. This adds a Stop row. + + const startStreamRow = + streamTask.id.toString(16) + ':' + (isIterator ? 'x' : 'X') + '\n'; + request.completedRegularChunks.push(stringToChunk(startStreamRow)); + + if (__DEV__) { + const debugInfo: ?ReactDebugInfo = (iterable: any)._debugInfo; + if (debugInfo) { + forwardDebugInfo(request, streamTask.id, debugInfo); + } + } + + // There's a race condition between when the stream is aborted and when the promise + // resolves so we track whether we already aborted it to avoid writing twice. + let aborted = false; + function progress( + entry: + | {done: false, +value: ReactClientValue, ...} + | {done: true, +value: ReactClientValue, ...}, + ) { + if (aborted) { + return; + } + + if (entry.done) { + request.abortListeners.delete(error); + let endStreamRow; + if (entry.value === undefined) { + endStreamRow = streamTask.id.toString(16) + ':C\n'; + } else { + // Unlike streams, the last value may not be undefined. If it's not + // we outline it and encode a reference to it in the closing instruction. + try { + const chunkId = outlineModel(request, entry.value); + endStreamRow = + streamTask.id.toString(16) + + ':C' + + stringify(serializeByValueID(chunkId)) + + '\n'; + } catch (x) { + error(x); + return; + } + } + request.completedRegularChunks.push(stringToChunk(endStreamRow)); + enqueueFlush(request); + aborted = true; + } else { + try { + streamTask.model = entry.value; + request.pendingChunks++; + tryStreamTask(request, streamTask); + enqueueFlush(request); + iterator.next().then(progress, error); + } catch (x) { + error(x); + return; + } + } + } + function error(reason: mixed) { + if (aborted) { + return; + } + aborted = true; + request.abortListeners.delete(error); + if ( + enablePostpone && + typeof reason === 'object' && + reason !== null && + (reason: any).$$typeof === REACT_POSTPONE_TYPE + ) { + const postponeInstance: Postpone = (reason: any); + logPostpone(request, postponeInstance.message); + emitPostponeChunk(request, streamTask.id, postponeInstance); + } else { + const digest = logRecoverableError(request, reason); + emitErrorChunk(request, streamTask.id, digest, reason); + } + enqueueFlush(request); + if (typeof (iterator: any).throw === 'function') { + // The iterator protocol doesn't necessarily include this but a generator do. + // $FlowFixMe should be able to pass mixed + iterator.throw(reason).then(error, error); + } + } + request.abortListeners.add(error); + iterator.next().then(progress, error); + return serializeByValueID(streamTask.id); +} + export function emitHint( request: Request, code: Code, @@ -691,6 +904,37 @@ function renderFragment( task: Task, children: $ReadOnlyArray, ): ReactJSONValue { + if (enableServerComponentKeys && task.keyPath !== null) { + // We have a Server Component that specifies a key but we're now splitting + // the tree using a fragment. + const fragment = [ + REACT_ELEMENT_TYPE, + REACT_FRAGMENT_TYPE, + task.keyPath, + {children}, + ]; + if (!task.implicitSlot) { + // If this was keyed inside a set. I.e. the outer Server Component was keyed + // then we need to handle reorders of the whole set. To do this we need to wrap + // this array in a keyed Fragment. + return fragment; + } + // If the outer Server Component was implicit but then an inner one had a key + // we don't actually need to be able to move the whole set around. It'll always be + // in an implicit slot. The key only exists to be able to reset the state of the + // children. We could achieve the same effect by passing on the keyPath to the next + // set of components inside the fragment. This would also allow a keyless fragment + // reconcile against a single child. + // Unfortunately because of JSON.stringify, we can't call the recursive loop for + // each child within this context because we can't return a set with already resolved + // values. E.g. a string would get double encoded. Returning would pop the context. + // So instead, we wrap it with an unkeyed fragment and inner keyed fragment. + return [fragment]; + } + // Since we're yielding here, that implicitly resets the keyPath context on the + // way up. Which is what we want since we've consumed it. If this changes to + // be recursive serialization, we need to reset the keyPath and implicitSlot, + // before recursing here. if (__DEV__) { const debugInfo: ?ReactDebugInfo = (children: any)._debugInfo; if (debugInfo) { @@ -705,12 +949,21 @@ function renderFragment( // from the server by the time we emit it. forwardDebugInfo(request, debugID, debugInfo); } + // Since we're rendering this array again, create a copy that doesn't + // have the debug info so we avoid outlining or emitting debug info again. + children = Array.from(children); } } - if (!enableServerComponentKeys) { - return children; - } - if (task.keyPath !== null) { + return children; +} + +function renderAsyncFragment( + request: Request, + task: Task, + children: $AsyncIterable, + getAsyncIterator: () => $AsyncIterator, +): ReactJSONValue { + if (enableServerComponentKeys && task.keyPath !== null) { // We have a Server Component that specifies a key but we're now splitting // the tree using a fragment. const fragment = [ @@ -737,11 +990,13 @@ function renderFragment( // So instead, we wrap it with an unkeyed fragment and inner keyed fragment. return [fragment]; } + // Since we're yielding here, that implicitly resets the keyPath context on the // way up. Which is what we want since we've consumed it. If this changes to // be recursive serialization, we need to reset the keyPath and implicitSlot, // before recursing here. - return children; + const asyncIterator = getAsyncIterator.call(children); + return serializeAsyncIterable(request, task, children, asyncIterator); } function renderClientElement( @@ -1156,13 +1411,9 @@ function serializeTemporaryReference( } function serializeLargeTextString(request: Request, text: string): string { - request.pendingChunks += 2; + request.pendingChunks++; const textId = request.nextChunkId++; - const textChunk = stringToChunk(text); - const binaryLength = byteLengthOfChunk(textChunk); - const row = textId.toString(16) + ':T' + binaryLength.toString(16) + ','; - const headerChunk = stringToChunk(row); - request.completedRegularChunks.push(headerChunk, textChunk); + emitTextChunk(request, textId, text); return serializeByValueID(textId); } @@ -1212,27 +1463,9 @@ function serializeTypedArray( tag: string, typedArray: $ArrayBufferView, ): string { - if (enableTaint) { - if (TaintRegistryByteLengths.has(typedArray.byteLength)) { - // If we have had any tainted values of this length, we check - // to see if these bytes matches any entries in the registry. - const tainted = TaintRegistryValues.get( - binaryToComparableString(typedArray), - ); - if (tainted !== undefined) { - throwTaintViolation(tainted.message); - } - } - } - request.pendingChunks += 2; + request.pendingChunks++; const bufferId = request.nextChunkId++; - // TODO: Convert to little endian if that's not the server default. - const binaryChunk = typedArrayToBinaryChunk(typedArray); - const binaryLength = byteLengthOfBinaryChunk(binaryChunk); - const row = - bufferId.toString(16) + ':' + tag + binaryLength.toString(16) + ','; - const headerChunk = stringToChunk(row); - request.completedRegularChunks.push(headerChunk, binaryChunk); + emitTypedArrayChunk(request, bufferId, tag, typedArray); return serializeByValueID(bufferId); } @@ -1248,10 +1481,16 @@ function serializeBlob(request: Request, blob: Blob): string { const reader = blob.stream().getReader(); + let aborted = false; function progress( entry: {done: false, value: Uint8Array} | {done: true, value: void}, ): Promise | void { + if (aborted) { + return; + } if (entry.done) { + request.abortListeners.delete(error); + aborted = true; pingTask(request, newTask); return; } @@ -1262,13 +1501,21 @@ function serializeBlob(request: Request, blob: Blob): string { } function error(reason: mixed) { + if (aborted) { + return; + } + aborted = true; + request.abortListeners.delete(error); const digest = logRecoverableError(request, reason); emitErrorChunk(request, newTask.id, digest, reason); request.abortableTasks.delete(newTask); - if (request.destination !== null) { - flushCompletedChunks(request, request.destination); - } + enqueueFlush(request); + // $FlowFixMe should be able to pass mixed + reader.cancel(reason).then(error, error); } + + request.abortListeners.add(error); + // $FlowFixMe[incompatible-call] reader.read().then(progress).catch(error); @@ -1667,6 +1914,27 @@ function renderModelDestructive( return renderFragment(request, task, Array.from((value: any))); } + if (enableFlightReadableStream) { + // TODO: Blob is not available in old Node. Remove the typeof check later. + if ( + typeof ReadableStream === 'function' && + value instanceof ReadableStream + ) { + return serializeReadableStream(request, task, value); + } + const getAsyncIterator: void | (() => $AsyncIterator) = + (value: any)[ASYNC_ITERATOR]; + if (typeof getAsyncIterator === 'function') { + // We treat AsyncIterables as a Fragment and as such we might need to key them. + return renderAsyncFragment( + request, + task, + (value: any), + getAsyncIterator, + ); + } + } + // Verify that this is a simple plain object. const proto = getPrototypeOf(value); if ( @@ -2031,6 +2299,42 @@ function emitDebugChunk( request.completedRegularChunks.push(processedChunk); } +function emitTypedArrayChunk( + request: Request, + id: number, + tag: string, + typedArray: $ArrayBufferView, +): void { + if (enableTaint) { + if (TaintRegistryByteLengths.has(typedArray.byteLength)) { + // If we have had any tainted values of this length, we check + // to see if these bytes matches any entries in the registry. + const tainted = TaintRegistryValues.get( + binaryToComparableString(typedArray), + ); + if (tainted !== undefined) { + throwTaintViolation(tainted.message); + } + } + } + request.pendingChunks++; // Extra chunk for the header. + // TODO: Convert to little endian if that's not the server default. + const binaryChunk = typedArrayToBinaryChunk(typedArray); + const binaryLength = byteLengthOfBinaryChunk(binaryChunk); + const row = id.toString(16) + ':' + tag + binaryLength.toString(16) + ','; + const headerChunk = stringToChunk(row); + request.completedRegularChunks.push(headerChunk, binaryChunk); +} + +function emitTextChunk(request: Request, id: number, text: string): void { + request.pendingChunks++; // Extra chunk for the header. + const textChunk = stringToChunk(text); + const binaryLength = byteLengthOfChunk(textChunk); + const row = id.toString(16) + ':T' + binaryLength.toString(16) + ','; + const headerChunk = stringToChunk(row); + request.completedRegularChunks.push(headerChunk, textChunk); +} + function serializeEval(source: string): string { if (!__DEV__) { // These errors should never make it into a build so we don't need to encode them in codes.json @@ -2391,6 +2695,96 @@ function forwardDebugInfo( } } +function emitChunk( + request: Request, + task: Task, + value: ReactClientValue, +): void { + const id = task.id; + // For certain types we have special types, we typically outlined them but + // we can emit them directly for this row instead of through an indirection. + if (typeof value === 'string') { + if (enableTaint) { + const tainted = TaintRegistryValues.get(value); + if (tainted !== undefined) { + throwTaintViolation(tainted.message); + } + } + emitTextChunk(request, id, value); + return; + } + if (enableBinaryFlight) { + if (value instanceof ArrayBuffer) { + emitTypedArrayChunk(request, id, 'A', new Uint8Array(value)); + return; + } + if (value instanceof Int8Array) { + // char + emitTypedArrayChunk(request, id, 'O', value); + return; + } + if (value instanceof Uint8Array) { + // unsigned char + emitTypedArrayChunk(request, id, 'o', value); + return; + } + if (value instanceof Uint8ClampedArray) { + // unsigned clamped char + emitTypedArrayChunk(request, id, 'U', value); + return; + } + if (value instanceof Int16Array) { + // sort + emitTypedArrayChunk(request, id, 'S', value); + return; + } + if (value instanceof Uint16Array) { + // unsigned short + emitTypedArrayChunk(request, id, 's', value); + return; + } + if (value instanceof Int32Array) { + // long + emitTypedArrayChunk(request, id, 'L', value); + return; + } + if (value instanceof Uint32Array) { + // unsigned long + emitTypedArrayChunk(request, id, 'l', value); + return; + } + if (value instanceof Float32Array) { + // float + emitTypedArrayChunk(request, id, 'G', value); + return; + } + if (value instanceof Float64Array) { + // double + emitTypedArrayChunk(request, id, 'g', value); + return; + } + if (value instanceof BigInt64Array) { + // number + emitTypedArrayChunk(request, id, 'M', value); + return; + } + if (value instanceof BigUint64Array) { + // unsigned number + // We use "m" instead of "n" since JSON can start with "null" + emitTypedArrayChunk(request, id, 'm', value); + return; + } + if (value instanceof DataView) { + emitTypedArrayChunk(request, id, 'V', value); + return; + } + } + // For anything else we need to try to serialize it using JSON. + // $FlowFixMe[incompatible-type] stringify can return null for undefined but we never do + const json: string = stringify(value, task.toJSON); + emitModelChunk(request, task.id, json); +} + const emptyRoot = {}; function retryTask(request: Request, task: Task): void { @@ -2435,19 +2829,17 @@ function retryTask(request: Request, task: Task): void { task.keyPath = null; task.implicitSlot = false; - let json: string; if (typeof resolvedModel === 'object' && resolvedModel !== null) { // Object might contain unresolved values like additional elements. // This is simulating what the JSON loop would do if this was part of it. - // $FlowFixMe[incompatible-type] stringify can return null for undefined but we never do - json = stringify(resolvedModel, task.toJSON); + emitChunk(request, task, resolvedModel); } else { // If the value is a string, it means it's a terminal value and we already escaped it // We don't need to escape it again so it's not passed the toJSON replacer. // $FlowFixMe[incompatible-type] stringify can return null for undefined but we never do - json = stringify(resolvedModel); + const json: string = stringify(resolvedModel); + emitModelChunk(request, task.id, json); } - emitModelChunk(request, task.id, json); request.abortableTasks.delete(task); task.status = COMPLETED; @@ -2489,6 +2881,24 @@ function retryTask(request: Request, task: Task): void { } } +function tryStreamTask(request: Request, task: Task): void { + // This is used to try to emit something synchronously but if it suspends, + // we emit a reference to a new outlined task immediately instead. + const prevDebugID = debugID; + if (__DEV__) { + // We don't use the id of the stream task for debugID. Instead we leave it null + // so that we instead outline the row to get a new debugID if needed. + debugID = null; + } + try { + emitChunk(request, task, task.model); + } finally { + if (__DEV__) { + debugID = prevDebugID; + } + } +} + function performWork(request: Request): void { const prevDispatcher = ReactSharedInternals.H; ReactSharedInternals.H = HooksDispatcher; @@ -2603,6 +3013,7 @@ function flushCompletedChunks( cleanupTaintQueue(request); } close(destination); + request.destination = null; } } @@ -2660,9 +3071,9 @@ export function stopFlowing(request: Request): void { export function abort(request: Request, reason: mixed): void { try { const abortableTasks = request.abortableTasks; + // We have tasks to abort. We'll emit one error row and then emit a reference + // to that row from every row that's still remaining. if (abortableTasks.size > 0) { - // We have tasks to abort. We'll emit one error row and then emit a reference - // to that row from every row that's still remaining. request.pendingChunks++; const errorId = request.nextChunkId++; if ( @@ -2687,6 +3098,30 @@ export function abort(request: Request, reason: mixed): void { abortableTasks.forEach(task => abortTask(task, request, errorId)); abortableTasks.clear(); } + const abortListeners = request.abortListeners; + if (abortListeners.size > 0) { + let error; + if ( + enablePostpone && + typeof reason === 'object' && + reason !== null && + (reason: any).$$typeof === REACT_POSTPONE_TYPE + ) { + // We aborted with a Postpone but since we're passing this to an + // external handler, passing this object would leak it outside React. + // We create an alternative reason for it instead. + error = new Error('The render was aborted due to being postponed.'); + } else { + error = + reason === undefined + ? new Error( + 'The render was aborted by the server without a reason.', + ) + : reason; + } + abortListeners.forEach(callback => callback(error)); + abortListeners.clear(); + } if (request.destination !== null) { flushCompletedChunks(request, request.destination); } diff --git a/packages/react-server/src/ReactFlightThenable.js b/packages/react-server/src/ReactFlightThenable.js index 852c13b2be4e4..cfda818f19ffc 100644 --- a/packages/react-server/src/ReactFlightThenable.js +++ b/packages/react-server/src/ReactFlightThenable.js @@ -82,6 +82,9 @@ export function trackUsedThenable( // Only instrument the thenable if the status if not defined. If // it's defined, but an unknown value, assume it's been instrumented by // some custom userspace implementation. We treat it as "pending". + // Attach a dummy listener, to ensure that any lazy initialization can + // happen. Flight lazily parses JSON when the value is actually awaited. + thenable.then(noop, noop); } else { const pendingThenable: PendingThenable = (thenable: any); pendingThenable.status = 'pending'; @@ -101,17 +104,17 @@ export function trackUsedThenable( } }, ); + } - // Check one more time in case the thenable resolved synchronously - switch (thenable.status) { - case 'fulfilled': { - const fulfilledThenable: FulfilledThenable = (thenable: any); - return fulfilledThenable.value; - } - case 'rejected': { - const rejectedThenable: RejectedThenable = (thenable: any); - throw rejectedThenable.reason; - } + // Check one more time in case the thenable resolved synchronously + switch (thenable.status) { + case 'fulfilled': { + const fulfilledThenable: FulfilledThenable = (thenable: any); + return fulfilledThenable.value; + } + case 'rejected': { + const rejectedThenable: RejectedThenable = (thenable: any); + throw rejectedThenable.reason; } } diff --git a/packages/shared/ReactFeatureFlags.js b/packages/shared/ReactFeatureFlags.js index 700e85ea6de9e..02c4abc3a85de 100644 --- a/packages/shared/ReactFeatureFlags.js +++ b/packages/shared/ReactFeatureFlags.js @@ -81,6 +81,7 @@ export const enableLegacyCache = __EXPERIMENTAL__; export const enableFetchInstrumentation = true; export const enableBinaryFlight = __EXPERIMENTAL__; +export const enableFlightReadableStream = __EXPERIMENTAL__; export const enableTaint = __EXPERIMENTAL__; diff --git a/packages/shared/forks/ReactFeatureFlags.native-fb.js b/packages/shared/forks/ReactFeatureFlags.native-fb.js index c3aee2ab41bee..5c29c8b9a71d8 100644 --- a/packages/shared/forks/ReactFeatureFlags.native-fb.js +++ b/packages/shared/forks/ReactFeatureFlags.native-fb.js @@ -44,6 +44,7 @@ export const enableCache = true; export const enableLegacyCache = false; export const enableFetchInstrumentation = false; export const enableBinaryFlight = true; +export const enableFlightReadableStream = true; export const enableTaint = true; export const enablePostpone = false; export const debugRenderPhaseSideEffectsForStrictMode = __DEV__; diff --git a/packages/shared/forks/ReactFeatureFlags.native-oss.js b/packages/shared/forks/ReactFeatureFlags.native-oss.js index 6f5293398b8c8..e1064dfa1bf59 100644 --- a/packages/shared/forks/ReactFeatureFlags.native-oss.js +++ b/packages/shared/forks/ReactFeatureFlags.native-oss.js @@ -52,6 +52,7 @@ export const enableTaint = __NEXT_RN_MAJOR__; export const enableUnifiedSyncLane = __NEXT_RN_MAJOR__; export const enableFizzExternalRuntime = __NEXT_RN_MAJOR__; // DOM-only export const enableBinaryFlight = __NEXT_RN_MAJOR__; // DOM-only +export const enableFlightReadableStream = __NEXT_RN_MAJOR__; // DOM-only export const enableServerComponentKeys = __NEXT_RN_MAJOR__; export const enableServerComponentLogs = __NEXT_RN_MAJOR__; diff --git a/packages/shared/forks/ReactFeatureFlags.test-renderer.js b/packages/shared/forks/ReactFeatureFlags.test-renderer.js index 1fe4a6e2d86d9..b0a0c7f5d5342 100644 --- a/packages/shared/forks/ReactFeatureFlags.test-renderer.js +++ b/packages/shared/forks/ReactFeatureFlags.test-renderer.js @@ -22,6 +22,7 @@ export const enableCache = true; export const enableLegacyCache = __EXPERIMENTAL__; export const enableFetchInstrumentation = true; export const enableBinaryFlight = true; +export const enableFlightReadableStream = true; export const enableTaint = true; export const enablePostpone = false; export const disableCommentsAsDOMContainers = true; diff --git a/packages/shared/forks/ReactFeatureFlags.test-renderer.native-fb.js b/packages/shared/forks/ReactFeatureFlags.test-renderer.native-fb.js index e78740263ff90..40297ad5fa620 100644 --- a/packages/shared/forks/ReactFeatureFlags.test-renderer.native-fb.js +++ b/packages/shared/forks/ReactFeatureFlags.test-renderer.native-fb.js @@ -22,6 +22,7 @@ export const enableCache = true; export const enableLegacyCache = false; export const enableFetchInstrumentation = false; export const enableBinaryFlight = true; +export const enableFlightReadableStream = true; export const enableTaint = true; export const enablePostpone = false; export const disableCommentsAsDOMContainers = true; diff --git a/packages/shared/forks/ReactFeatureFlags.test-renderer.www.js b/packages/shared/forks/ReactFeatureFlags.test-renderer.www.js index a222f31812407..0aa61e8b9bd4e 100644 --- a/packages/shared/forks/ReactFeatureFlags.test-renderer.www.js +++ b/packages/shared/forks/ReactFeatureFlags.test-renderer.www.js @@ -22,6 +22,7 @@ export const enableCache = true; export const enableLegacyCache = true; export const enableFetchInstrumentation = false; export const enableBinaryFlight = true; +export const enableFlightReadableStream = true; export const enableTaint = true; export const enablePostpone = false; export const disableCommentsAsDOMContainers = true; diff --git a/packages/shared/forks/ReactFeatureFlags.www.js b/packages/shared/forks/ReactFeatureFlags.www.js index d05926e29bafa..82e3b08c7ecd0 100644 --- a/packages/shared/forks/ReactFeatureFlags.www.js +++ b/packages/shared/forks/ReactFeatureFlags.www.js @@ -69,6 +69,8 @@ export const enableLegacyCache = true; export const enableFetchInstrumentation = false; export const enableBinaryFlight = false; +export const enableFlightReadableStream = false; + export const enableTaint = false; export const enablePostpone = false; diff --git a/scripts/error-codes/codes.json b/scripts/error-codes/codes.json index 2ccd0ee9f9a3e..1fef8eb97413f 100644 --- a/scripts/error-codes/codes.json +++ b/scripts/error-codes/codes.json @@ -507,5 +507,7 @@ "519": "Hydration Mismatch Exception: This is not a real error, and should not leak into userspace. If you're seeing this, it's likely a bug in React.", "520": "There was an error during concurrent rendering but React was able to recover by instead synchronously rendering the entire root.", "521": "flushSyncWork should not be called from builds that support legacy mode. This is a bug in React.", - "522": "Invalid form element. requestFormReset must be passed a form that was rendered by React." + "522": "Invalid form element. requestFormReset must be passed a form that was rendered by React.", + "523": "The render was aborted due to being postponed.", + "524": "Values cannot be passed to next() of AsyncIterables passed to Client Components." }