Skip to content

Commit 6a0d4fc

Browse files
committed
The patched fetch function should not buffer a streamed response
When our patched `fetch` function comes to the conclusion that it should cache a response, it currently buffers the full response body before returning a pseudo-cloned `Response` instance. This is especially a problem in chat applications, where LLM responses need to be streamed to the client immediately, without being buffered. Since those chat requests are usually POSTs though, the buffering in `createPatchedFetcher` did not create a problem because this was only applied to GET requests. Although use cases where GET requests are streamed do also exist, most prominently RSC requests. Those would have been already affected by the buffering. With the introduction of the Server Components HMR cache in #67527 (enabled per default in #67800), the patched `fetch` function was also buffering POST response bodies, so that they can be stored in the HMR cache. This made the buffering behaviour obvious because now Next.js applications using the AI SDK to stream responses were affected, see vercel/ai#2480 for example. With this PR, we are now returning the original response immediately, thus allowing streaming again, and cache a cloned response in the background. As an alternative, I considered to not cache POST requests in the Server Components HMR cache. But I dismissed this solution, because I still think that caching those requests is useful when editing server components. In addition, this solution would not have addressed the buffering issue for GET requests.
1 parent 6c522df commit 6a0d4fc

File tree

2 files changed

+145
-50
lines changed

2 files changed

+145
-50
lines changed
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
import { AsyncLocalStorage } from 'node:async_hooks'
2+
import type { RequestStore } from '../../client/components/request-async-storage.external'
3+
import type { StaticGenerationStore } from '../../client/components/static-generation-async-storage.external'
4+
import type { IncrementalCache } from './incremental-cache'
5+
import { createPatchedFetcher } from './patch-fetch'
6+
7+
describe('createPatchedFetcher', () => {
8+
it('should not buffer a streamed response', async () => {
9+
const mockFetch: jest.MockedFunction<typeof fetch> = jest.fn()
10+
let streamChunk: () => void
11+
12+
const readableStream = new ReadableStream({
13+
start(controller) {
14+
controller.enqueue(new TextEncoder().encode('stream start'))
15+
streamChunk = () => {
16+
controller.enqueue(new TextEncoder().encode('stream end'))
17+
controller.close()
18+
}
19+
},
20+
})
21+
22+
mockFetch.mockResolvedValue(new Response(readableStream))
23+
24+
const staticGenerationAsyncStorage =
25+
new AsyncLocalStorage<StaticGenerationStore>()
26+
27+
const patchedFetch = createPatchedFetcher(mockFetch, {
28+
// requestAsyncStorage does not need to provide a store for this test.
29+
requestAsyncStorage: new AsyncLocalStorage<RequestStore>(),
30+
staticGenerationAsyncStorage,
31+
})
32+
33+
let resolveIncrementalCacheSet: () => void
34+
35+
const incrementalCacheSetPromise = new Promise<void>((resolve) => {
36+
resolveIncrementalCacheSet = resolve
37+
})
38+
39+
const incrementalCache = {
40+
get: jest.fn(),
41+
set: jest.fn(() => resolveIncrementalCacheSet()),
42+
generateCacheKey: jest.fn(() => 'test-cache-key'),
43+
lock: jest.fn(),
44+
} as unknown as IncrementalCache
45+
46+
// We only need to provide a few of the StaticGenerationStore properties.
47+
const staticGenerationStore: Partial<StaticGenerationStore> = {
48+
page: '/',
49+
route: '/',
50+
incrementalCache,
51+
}
52+
53+
await staticGenerationAsyncStorage.run(
54+
staticGenerationStore as StaticGenerationStore,
55+
async () => {
56+
const response = await patchedFetch('https://example.com', {
57+
cache: 'force-cache',
58+
})
59+
60+
if (!response.body) {
61+
throw new Error(`Response body is ${JSON.stringify(response.body)}.`)
62+
}
63+
64+
const reader = response.body.getReader()
65+
let result = await reader.read()
66+
const textDecoder = new TextDecoder()
67+
expect(textDecoder.decode(result.value)).toBe('stream start')
68+
streamChunk()
69+
result = await reader.read()
70+
expect(textDecoder.decode(result.value)).toBe('stream end')
71+
72+
await incrementalCacheSetPromise
73+
74+
expect(incrementalCache.set).toHaveBeenCalledWith(
75+
'test-cache-key',
76+
{
77+
data: {
78+
body: btoa('stream startstream end'),
79+
headers: {},
80+
status: 200,
81+
url: '', // the mocked response does not have a URL
82+
},
83+
kind: 'FETCH',
84+
revalidate: 31536000, // default of one year
85+
},
86+
{
87+
fetchCache: true,
88+
fetchIdx: 1,
89+
fetchUrl: 'https://example.com/',
90+
revalidate: false,
91+
tags: [],
92+
}
93+
)
94+
}
95+
)
96+
// Setting a lower timeout than default, because the test will fail with a
97+
// timeout when we regress and buffer the response.
98+
}, 1000)
99+
})

packages/next/src/server/lib/patch-fetch.ts

Lines changed: 46 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ interface PatchableModule {
200200
requestAsyncStorage: RequestAsyncStorage
201201
}
202202

203-
function createPatchedFetcher(
203+
export function createPatchedFetcher(
204204
originFetch: Fetcher,
205205
{ staticGenerationAsyncStorage, requestAsyncStorage }: PatchableModule
206206
): PatchedFetcher {
@@ -487,17 +487,17 @@ function createPatchedFetcher(
487487
finalRevalidate === false
488488

489489
let cacheKey: string | undefined
490+
const { incrementalCache } = staticGenerationStore
490491

491492
if (
492-
staticGenerationStore.incrementalCache &&
493+
incrementalCache &&
493494
(isCacheableRevalidate || requestStore?.serverComponentsHmrCache)
494495
) {
495496
try {
496-
cacheKey =
497-
await staticGenerationStore.incrementalCache.generateCacheKey(
498-
fetchUrl,
499-
isRequestInput ? (input as RequestInit) : init
500-
)
497+
cacheKey = await incrementalCache.generateCacheKey(
498+
fetchUrl,
499+
isRequestInput ? (input as RequestInit) : init
500+
)
501501
} catch (err) {
502502
console.error(`Failed to generate cache key for`, input)
503503
}
@@ -575,52 +575,49 @@ function createPatchedFetcher(
575575
}
576576
if (
577577
res.status === 200 &&
578-
staticGenerationStore.incrementalCache &&
578+
incrementalCache &&
579579
cacheKey &&
580580
(isCacheableRevalidate || requestStore?.serverComponentsHmrCache)
581581
) {
582-
const bodyBuffer = Buffer.from(await res.arrayBuffer())
583-
584-
const cachedFetchData = {
585-
headers: Object.fromEntries(res.headers.entries()),
586-
body: bodyBuffer.toString('base64'),
587-
status: res.status,
588-
url: res.url,
589-
}
590-
591-
requestStore?.serverComponentsHmrCache?.set(
592-
cacheKey,
593-
cachedFetchData
594-
)
582+
res
583+
.clone()
584+
.arrayBuffer()
585+
.then(async (arrayBuffer) => {
586+
const bodyBuffer = Buffer.from(arrayBuffer)
587+
588+
const cachedFetchData = {
589+
headers: Object.fromEntries(res.headers.entries()),
590+
body: bodyBuffer.toString('base64'),
591+
status: res.status,
592+
url: res.url,
593+
}
595594

596-
if (isCacheableRevalidate) {
597-
try {
598-
await staticGenerationStore.incrementalCache.set(
595+
requestStore?.serverComponentsHmrCache?.set(
599596
cacheKey,
600-
{
601-
kind: 'FETCH',
602-
data: cachedFetchData,
603-
revalidate: normalizedRevalidate,
604-
},
605-
{
606-
fetchCache: true,
607-
revalidate: finalRevalidate,
608-
fetchUrl,
609-
fetchIdx,
610-
tags,
611-
}
597+
cachedFetchData
612598
)
613-
} catch (err) {
614-
console.warn(`Failed to set fetch cache`, input, err)
615-
}
616-
}
617599

618-
const response = new Response(bodyBuffer, {
619-
headers: new Headers(res.headers),
620-
status: res.status,
621-
})
622-
Object.defineProperty(response, 'url', { value: res.url })
623-
return response
600+
if (isCacheableRevalidate) {
601+
await incrementalCache.set(
602+
cacheKey,
603+
{
604+
kind: 'FETCH',
605+
data: cachedFetchData,
606+
revalidate: normalizedRevalidate,
607+
},
608+
{
609+
fetchCache: true,
610+
revalidate: finalRevalidate,
611+
fetchUrl,
612+
fetchIdx,
613+
tags,
614+
}
615+
)
616+
}
617+
})
618+
.catch((error) =>
619+
console.warn(`Failed to set fetch cache`, input, error)
620+
)
624621
}
625622
return res
626623
})
@@ -631,7 +628,7 @@ function createPatchedFetcher(
631628
let isForegroundRevalidate = false
632629
let isHmrRefreshCache = false
633630

634-
if (cacheKey && staticGenerationStore.incrementalCache) {
631+
if (cacheKey && incrementalCache) {
635632
let cachedFetchData: CachedFetchData | undefined
636633

637634
if (
@@ -645,12 +642,11 @@ function createPatchedFetcher(
645642
}
646643

647644
if (isCacheableRevalidate && !cachedFetchData) {
648-
handleUnlock =
649-
await staticGenerationStore.incrementalCache.lock(cacheKey)
645+
handleUnlock = await incrementalCache.lock(cacheKey)
650646

651647
const entry = staticGenerationStore.isOnDemandRevalidate
652648
? null
653-
: await staticGenerationStore.incrementalCache.get(cacheKey, {
649+
: await incrementalCache.get(cacheKey, {
654650
kindHint: 'fetch',
655651
revalidate: finalRevalidate,
656652
fetchUrl,

0 commit comments

Comments
 (0)