Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 47 additions & 15 deletions packages/next/src/server/app-render/app-render-prerender-utils.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,46 @@
import type { Readable } from 'node:stream'
import { InvariantError } from '../../shared/lib/invariant-error'

export type StreamLike = ReadableStream<Uint8Array> | Readable

function isWebStream(stream: StreamLike): stream is ReadableStream<Uint8Array> {
return typeof (stream as ReadableStream).tee === 'function'
}

// React's RSC prerender function will emit an incomplete flight stream when using `prerender`. If the connection
// closes then whatever hanging chunks exist will be errored. This is because prerender (an experimental feature)
// has not yet implemented a concept of resume. For now we will simulate a paused connection by wrapping the stream
// in one that doesn't close even when the underlying is complete.
export class ReactServerResult {
private _stream: null | ReadableStream<Uint8Array>
private _stream: null | StreamLike

constructor(stream: ReadableStream<Uint8Array>) {
constructor(stream: StreamLike) {
this._stream = stream
}

tee() {
tee(): StreamLike {
if (this._stream === null) {
throw new Error(
'Cannot tee a ReactServerResult that has already been consumed'
)
}
const tee = this._stream.tee()
this._stream = tee[0]
return tee[1]
if (isWebStream(this._stream)) {
const tee = this._stream.tee()
this._stream = tee[0]
return tee[1]
}
// Node.js Readable: pipe to two PassThrough streams
const { PassThrough } =
require('node:stream') as typeof import('node:stream')
const pt1 = new PassThrough()
const pt2 = new PassThrough()
this._stream.pipe(pt1)
this._stream.pipe(pt2)
this._stream = pt1
return pt2
}

consume() {
consume(): StreamLike {
if (this._stream === null) {
throw new Error(
'Cannot consume a ReactServerResult that has already been consumed'
Expand Down Expand Up @@ -55,18 +73,32 @@ export async function createReactServerPrerenderResult(
}

export async function createReactServerPrerenderResultFromRender(
underlying: ReadableStream<Uint8Array>
underlying: StreamLike
): Promise<ReactServerPrerenderResult> {
const chunks: Array<Uint8Array> = []
const reader = underlying.getReader()
while (true) {
const { done, value } = await reader.read()
if (done) {
break
} else {
chunks.push(value)

if (isWebStream(underlying)) {
const reader = underlying.getReader()
while (true) {
const { done, value } = await reader.read()
if (done) {
break
} else {
chunks.push(value)
}
}
} else {
// Node.js Readable stream
const readable: Readable = underlying
await new Promise<void>((resolve, reject) => {
readable.on('data', (chunk: Buffer | Uint8Array) => {
chunks.push(chunk instanceof Uint8Array ? chunk : new Uint8Array(chunk))
})
readable.on('end', resolve)
readable.on('error', reject)
})
}

return new ReactServerPrerenderResult(chunks)
}
export class ReactServerPrerenderResult {
Expand Down
85 changes: 48 additions & 37 deletions packages/next/src/server/app-render/app-render.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ import {
getClientPrerender,
processPrelude as processPreludeOp,
createDocumentClosingStream,
teeStream,
toReadableStream,
} from './stream-ops'
import type { AnyStream } from './stream-ops'
import { stripInternalQueries } from '../internal-utils'
import {
NEXT_HMR_REFRESH_HEADER,
Expand Down Expand Up @@ -894,7 +897,7 @@ async function generateDynamicFlightRenderResultWithStagesInDev(
}

let debugChannel: DebugChannelPair | undefined
let stream: ReadableStream<Uint8Array>
let stream: AnyStream

if (
// We only do this flow if we can safely recreate the store from scratch
Expand Down Expand Up @@ -1797,7 +1800,7 @@ function ErrorApp<T>({
// certain object shape. The generic type is not used directly in the type so it
// requires a disabling of the eslint rule disallowing unused vars
// eslint-disable-next-line @typescript-eslint/no-unused-vars
export type BinaryStreamOf<T> = ReadableStream<Uint8Array>
export type BinaryStreamOf<T> = AnyStream

async function renderToHTMLOrFlightImpl(
req: BaseNextRequest,
Expand Down Expand Up @@ -2488,7 +2491,7 @@ async function renderToStream(
metadata: AppPageRenderResultMetadata,
createRequestStore: (() => RequestStore) | undefined,
devFallbackParams: OpaqueFallbackRouteParams | null
): Promise<ReadableStream<Uint8Array>> {
): Promise<AnyStream> {
/* eslint-disable @next/internal/no-ambiguous-jsx -- React Client */
const {
assetPrefix,
Expand Down Expand Up @@ -3192,20 +3195,22 @@ async function renderWithRestartOnCacheMissInDev(
initialStageController.advanceStage(RenderStage.EarlyStatic)
startTime = performance.now() + performance.timeOrigin

const streamPair = renderToFlightStream(
ComponentMod,
initialRscPayload,
clientModules,
{
onError,
environmentName,
startTime,
filterStackFrame,
debugChannel: debugChannel?.serverSide,
signal: initialReactController.signal,
},
(fn) => workUnitAsyncStorage.run(requestStore, fn)
).tee()
const streamPair = teeStream(
renderToFlightStream(
ComponentMod,
initialRscPayload,
clientModules,
{
onError,
environmentName,
startTime,
filterStackFrame,
debugChannel: debugChannel?.serverSide,
signal: initialReactController.signal,
},
(fn) => workUnitAsyncStorage.run(requestStore, fn)
)
)

// If we abort the render, we want to reject the stage-dependent promises as well.
// Note that we want to install this listener after the render is started
Expand All @@ -3216,14 +3221,18 @@ async function renderWithRestartOnCacheMissInDev(

const stream = streamPair[0]
const accumulatedChunksPromise = accumulateStreamChunks(
streamPair[1],
toReadableStream(streamPair[1]),
initialStageController,
initialDataController.signal
)

initialDataController.signal.addEventListener('abort', () => {
accumulatedChunksPromise.catch(() => {})
stream.cancel()
if (stream instanceof ReadableStream) {
stream.cancel()
} else {
stream.destroy()
}
})

return {
Expand Down Expand Up @@ -3339,24 +3348,26 @@ async function renderWithRestartOnCacheMissInDev(
finalStageController.advanceStage(RenderStage.EarlyStatic)
startTime = performance.now() + performance.timeOrigin

const streamPair = renderToFlightStream(
ComponentMod,
finalRscPayload,
clientModules,
{
onError,
environmentName,
startTime,
filterStackFrame,
debugChannel: debugChannel?.serverSide,
},
(fn) => workUnitAsyncStorage.run(requestStore, fn)
).tee()
const streamPair = teeStream(
renderToFlightStream(
ComponentMod,
finalRscPayload,
clientModules,
{
onError,
environmentName,
startTime,
filterStackFrame,
debugChannel: debugChannel?.serverSide,
},
(fn) => workUnitAsyncStorage.run(requestStore, fn)
)
)

return {
stream: streamPair[0],
accumulatedChunksPromise: accumulateStreamChunks(
streamPair[1],
toReadableStream(streamPair[1]),
finalStageController,
null
),
Expand Down Expand Up @@ -3585,7 +3596,7 @@ async function logMessagesAndSendErrorsToBrowser(
{ filterStackFrame }
)

sendErrorsToBrowser(errorsFlightStream, htmlRequestId)
sendErrorsToBrowser(toReadableStream(errorsFlightStream), htmlRequestId)
}
}

Expand Down Expand Up @@ -4426,7 +4437,7 @@ async function validateInstantConfigNavigation(
}

type PrerenderToStreamResult = {
stream: ReadableStream<Uint8Array>
stream: AnyStream
digestErrorsMap: Map<string, DigestedError>
ssrErrors: Array<unknown>
dynamicAccess?: null | Array<DynamicAccess>
Expand Down Expand Up @@ -5252,7 +5263,7 @@ async function prerenderToStream(
)
}

let htmlStream: ReadableStream<Uint8Array> = prelude
let htmlStream: AnyStream = prelude
if (postponed != null) {
// We postponed but nothing dynamic was used. We resume the render now and immediately abort it
// so we can set all the postponed boundaries to client render mode before we store the HTML response
Expand Down Expand Up @@ -5540,7 +5551,7 @@ async function prerenderToStream(
)
}

let htmlStream: ReadableStream<Uint8Array> = prelude
let htmlStream: AnyStream = prelude
if (postponed != null) {
// We postponed but nothing dynamic was used. We resume the render now and immediately abort it
// so we can set all the postponed boundaries to client render mode before we store the HTML response
Expand Down
3 changes: 2 additions & 1 deletion packages/next/src/server/app-render/flight-render-result.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { Readable } from 'node:stream'
import { RSC_CONTENT_TYPE_HEADER } from '../../client/components/app-router-headers'
import RenderResult, { type RenderResultMetadata } from '../render-result'

Expand All @@ -6,7 +7,7 @@ import RenderResult, { type RenderResultMetadata } from '../render-result'
*/
export class FlightRenderResult extends RenderResult {
constructor(
response: string | ReadableStream<Uint8Array>,
response: string | ReadableStream<Uint8Array> | Readable,
metadata: RenderResultMetadata = {},
waitUntil?: Promise<unknown>
) {
Expand Down
Loading
Loading