@@ -20,6 +20,8 @@ import type {Thenable} from 'shared/ReactTypes';
2020
2121import { Readable } from 'stream' ;
2222
23+ import { ASYNC_ITERATOR } from 'shared/ReactSymbols' ;
24+
2325import {
2426 createRequest ,
2527 createPrerenderRequest ,
@@ -34,6 +36,7 @@ import {
3436 reportGlobalError ,
3537 close ,
3638 resolveField ,
39+ resolveFile ,
3740 resolveFileInfo ,
3841 resolveFileChunk ,
3942 resolveFileComplete ,
@@ -51,6 +54,8 @@ export {
5154 createClientModuleProxy ,
5255} from '../ReactFlightTurbopackReferences' ;
5356
57+ import { textEncoder } from 'react-server/src/ReactServerStreamConfigNode' ;
58+
5459import type { TemporaryReferenceSet } from 'react-server/src/ReactFlightServerTemporaryReferences' ;
5560
5661export { createTemporaryReferenceSet } from 'react-server/src/ReactFlightServerTemporaryReferences' ;
@@ -128,11 +133,91 @@ function renderToPipeableStream(
128133 } ;
129134}
130135
131- function createFakeWritable ( readable : any ) : Writable {
136+ function createFakeWritableFromReadableStreamController (
137+ controller : ReadableStreamController ,
138+ ) : Writable {
132139 // The current host config expects a Writable so we create
133140 // a fake writable for now to push into the Readable.
134141 return ( {
135- write ( chunk ) {
142+ write ( chunk : string | Uint8Array ) {
143+ if ( typeof chunk === 'string' ) {
144+ chunk = textEncoder . encode ( chunk ) ;
145+ }
146+ controller . enqueue ( chunk ) ;
147+ // in web streams there is no backpressure so we can alwas write more
148+ return true ;
149+ } ,
150+ end ( ) {
151+ controller . close ( ) ;
152+ } ,
153+ destroy ( error ) {
154+ // $FlowFixMe[method-unbinding]
155+ if ( typeof controller . error === 'function' ) {
156+ // $FlowFixMe[incompatible-call]: This is an Error object or the destination accepts other types.
157+ controller . error ( error ) ;
158+ } else {
159+ controller . close ( ) ;
160+ }
161+ } ,
162+ } : any ) ;
163+ }
164+
165+ function renderToReadableStream (
166+ model : ReactClientValue ,
167+ turbopackMap : ClientManifest ,
168+ options ?: Options & {
169+ signal ?: AbortSignal ,
170+ } ,
171+ ) : ReadableStream {
172+ const request = createRequest (
173+ model ,
174+ turbopackMap ,
175+ options ? options . onError : undefined ,
176+ options ? options . identifierPrefix : undefined ,
177+ options ? options . onPostpone : undefined ,
178+ options ? options . temporaryReferences : undefined ,
179+ __DEV__ && options ? options . environmentName : undefined ,
180+ __DEV__ && options ? options . filterStackFrame : undefined ,
181+ ) ;
182+ if ( options && options . signal ) {
183+ const signal = options . signal ;
184+ if ( signal . aborted ) {
185+ abort ( request , ( signal : any ) . reason ) ;
186+ } else {
187+ const listener = ( ) => {
188+ abort ( request , ( signal : any ) . reason ) ;
189+ signal . removeEventListener ( 'abort' , listener ) ;
190+ } ;
191+ signal . addEventListener ( 'abort' , listener ) ;
192+ }
193+ }
194+ let writable : Writable ;
195+ const stream = new ReadableStream (
196+ {
197+ type : 'bytes' ,
198+ start : ( controller ) : ?Promise < void > => {
199+ writable = createFakeWritableFromReadableStreamController ( controller ) ;
200+ startWork ( request ) ;
201+ } ,
202+ pull : ( controller ) : ?Promise < void > => {
203+ startFlowing ( request , writable ) ;
204+ } ,
205+ cancel : ( reason ) : ?Promise < void > => {
206+ stopFlowing ( request ) ;
207+ abort ( request , reason ) ;
208+ } ,
209+ } ,
210+ // $FlowFixMe[prop-missing] size() methods are not allowed on byte streams.
211+ { highWaterMark : 0 } ,
212+ ) ;
213+ return stream ;
214+ }
215+
216+ function createFakeWritableFromNodeReadable ( readable : any ) : Writable {
217+ // The current host config expects a Writable so we create
218+ // a fake writable for now to push into the Readable.
219+ return ( {
220+ write ( chunk : string | Uint8Array ) {
136221 return readable . push ( chunk ) ;
137222 } ,
138223 end ( ) {
@@ -171,7 +256,7 @@ function prerenderToNodeStream(
171256 startFlowing ( request , writable ) ;
172257 } ,
173258 } ) ;
174- const writable = createFakeWritable ( readable ) ;
259+ const writable = createFakeWritableFromNodeReadable ( readable ) ;
175260 resolve ( { prelude : readable } ) ;
176261 }
177262
@@ -205,6 +290,69 @@ function prerenderToNodeStream(
205290 } ) ;
206291}
207292
293+ function prerender (
294+ model : ReactClientValue ,
295+ turbopackMap : ClientManifest ,
296+ options ?: Options & {
297+ signal ?: AbortSignal ,
298+ } ,
299+ ) : Promise < {
300+ prelude : ReadableStream ,
301+ } > {
302+ return new Promise ( ( resolve , reject ) => {
303+ const onFatalError = reject ;
304+ function onAllReady ( ) {
305+ let writable : Writable ;
306+ const stream = new ReadableStream (
307+ {
308+ type : 'bytes' ,
309+ start : ( controller ) : ?Promise < void > => {
310+ writable =
311+ createFakeWritableFromReadableStreamController ( controller ) ;
312+ } ,
313+ pull : ( controller ) : ?Promise < void > => {
314+ startFlowing ( request , writable ) ;
315+ } ,
316+ cancel : ( reason ) : ?Promise < void > => {
317+ stopFlowing ( request ) ;
318+ abort ( request , reason ) ;
319+ } ,
320+ } ,
321+ // $FlowFixMe[prop-missing] size() methods are not allowed on byte streams.
322+ { highWaterMark : 0 } ,
323+ ) ;
324+ resolve ( { prelude : stream } ) ;
325+ }
326+ const request = createPrerenderRequest (
327+ model ,
328+ turbopackMap ,
329+ onAllReady ,
330+ onFatalError ,
331+ options ? options . onError : undefined ,
332+ options ? options . identifierPrefix : undefined ,
333+ options ? options . onPostpone : undefined ,
334+ options ? options . temporaryReferences : undefined ,
335+ __DEV__ && options ? options . environmentName : undefined ,
336+ __DEV__ && options ? options . filterStackFrame : undefined ,
337+ ) ;
338+ if ( options && options . signal ) {
339+ const signal = options . signal ;
340+ if ( signal . aborted ) {
341+ const reason = ( signal : any ) . reason ;
342+ abort ( request , reason ) ;
343+ } else {
344+ const listener = ( ) => {
345+ const reason = ( signal : any ) . reason ;
346+ abort ( request , reason ) ;
347+ signal . removeEventListener ( 'abort' , listener ) ;
348+ } ;
349+ signal . addEventListener ( 'abort' , listener ) ;
350+ }
351+ }
352+ startWork ( request ) ;
353+ } ) ;
354+ }
355+
208356function decodeReplyFromBusboy < T > (
209357 busboyStream : Busboy ,
210358 turbopackMap : ServerManifest ,
@@ -286,11 +434,59 @@ function decodeReply<T>(
286434 return root ;
287435}
288436
437+ function decodeReplyFromAsyncIterable < T > (
438+ iterable: AsyncIterable< [ string , string | File ] > ,
439+ turbopackMap : ServerManifest ,
440+ options ?: { temporaryReferences ?: TemporaryReferenceSet } ,
441+ ) : Thenable < T > {
442+ const iterator : AsyncIterator < [ string , string | File ] > =
443+ iterable [ ASYNC_ITERATOR ] ( ) ;
444+
445+ const response = createResponse (
446+ turbopackMap ,
447+ '' ,
448+ options ? options . temporaryReferences : undefined ,
449+ ) ;
450+
451+ function progress (
452+ entry :
453+ | { done : false , + value : [ string , string | File ] , ...}
454+ | { done : true , + value : void , ...} ,
455+ ) {
456+ if ( entry . done ) {
457+ close ( response ) ;
458+ } else {
459+ const [ name , value ] = entry . value ;
460+ if ( typeof value === 'string' ) {
461+ resolveField ( response , name , value ) ;
462+ } else {
463+ resolveFile ( response , name , value ) ;
464+ }
465+ iterator . next ( ) . then ( progress , error ) ;
466+ }
467+ }
468+ function error ( reason : Error ) {
469+ reportGlobalError ( response , reason ) ;
470+ if ( typeof ( iterator : any ) . throw === 'function' ) {
471+ // The iterator protocol doesn't necessarily include this but a generator do.
472+ // $FlowFixMe should be able to pass mixed
473+ iterator . throw ( reason ) . then ( error , error ) ;
474+ }
475+ }
476+
477+ iterator . next ( ) . then ( progress , error ) ;
478+
479+ return getRoot ( response ) ;
480+ }
481+
289482export {
483+ renderToReadableStream ,
290484 renderToPipeableStream,
485+ prerender,
291486 prerenderToNodeStream,
292- decodeReplyFromBusboy ,
293487 decodeReply,
488+ decodeReplyFromBusboy,
489+ decodeReplyFromAsyncIterable,
294490 decodeAction,
295491 decodeFormState,
296492} ;
0 commit comments