Skip to content

Commit 41f2bfa

Browse files
committed
store blobs rather than texts
1 parent 2d435bf commit 41f2bfa

File tree

4 files changed

+87
-53
lines changed

4 files changed

+87
-53
lines changed

packages/open-next/src/adapters/composable-cache.ts

Lines changed: 31 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,28 @@
1-
import type { ComposableCacheEntry, ComposableCacheHandler } from "types/cache";
1+
import type {
2+
ComposableCacheEntry,
3+
ComposableCacheHandler,
4+
StoredComposableCacheEntry,
5+
} from "types/cache";
26
import { writeTags } from "utils/cache";
3-
import { fromReadableStream, toReadableStream } from "utils/stream";
47
import { debug } from "./logger";
58

6-
const pendingWritePromiseMap = new Map<string, Promise<ComposableCacheEntry>>();
9+
const pendingWritePromiseMap = new Map<
10+
string,
11+
Promise<StoredComposableCacheEntry>
12+
>();
713

814
export default {
915
async get(cacheKey: string) {
1016
try {
1117
// We first check if we have a pending write for this cache key
1218
// If we do, we return the pending promise instead of fetching the cache
1319
const stored = pendingWritePromiseMap.get(cacheKey);
14-
if (stored) return stored;
20+
if (stored) {
21+
return stored.then((val) => ({
22+
...val,
23+
value: val.value.stream(),
24+
}));
25+
}
1526

1627
const result = await globalThis.incrementalCache.get(
1728
cacheKey,
@@ -50,7 +61,7 @@ export default {
5061

5162
return {
5263
...result.value,
53-
value: toReadableStream(result.value.value),
64+
value: result.value.value.stream(),
5465
};
5566
} catch (e) {
5667
debug("Cannot read composable cache entry");
@@ -59,39 +70,35 @@ export default {
5970
},
6071

6172
async set(cacheKey: string, pendingEntry: Promise<ComposableCacheEntry>) {
62-
const teedPromise = pendingEntry.then((entry) => {
63-
// Optimization: We avoid consuming and stringifying the stream here,
64-
// because it creates double copies just to be discarded when this function
65-
// ends. This avoids unnecessary memory usage, and reduces GC pressure.
66-
const [stream1, stream2] = entry.value.tee();
67-
return [
68-
{ ...entry, value: stream1 },
69-
{ ...entry, value: stream2 },
70-
] as const;
73+
// Convert ReadableStream to Blob first
74+
const blobPromise = pendingEntry.then(async (entry) => {
75+
const reader = entry.value.getReader();
76+
const chunks: Uint8Array[] = [];
77+
let result: ReadableStreamReadResult<Uint8Array>;
78+
while (!(result = await reader.read()).done) {
79+
chunks.push(result.value);
80+
}
81+
reader.releaseLock();
82+
return { ...entry, value: new Blob(chunks) };
7183
});
7284

73-
pendingWritePromiseMap.set(
74-
cacheKey,
75-
teedPromise.then(([entry]) => entry),
76-
);
85+
// Store a stream from the blob in the pending map for concurrent get() calls
86+
pendingWritePromiseMap.set(cacheKey, blobPromise);
7787

78-
const [, entryForStorage] = await teedPromise.finally(() => {
88+
const entryWithBlob = await blobPromise.finally(() => {
7989
pendingWritePromiseMap.delete(cacheKey);
8090
});
8191

8292
await globalThis.incrementalCache.set(
8393
cacheKey,
84-
{
85-
...entryForStorage,
86-
value: await fromReadableStream(entryForStorage.value),
87-
},
94+
entryWithBlob,
8895
"composable",
8996
);
9097

9198
if (globalThis.tagCache.mode === "original") {
9299
const storedTags = await globalThis.tagCache.getByPath(cacheKey);
93100
const tagsToWrite = [];
94-
for (const tag of entryForStorage.tags) {
101+
for (const tag of entryWithBlob.tags) {
95102
if (!storedTags.includes(tag)) {
96103
tagsToWrite.push({ tag, path: cacheKey });
97104
}

packages/open-next/src/types/cache.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import type { ReadableStream } from "node:stream/web";
2-
31
interface CachedFetchValue {
42
kind: "FETCH";
53
data: {
@@ -152,7 +150,7 @@ export interface ComposableCacheEntry {
152150
}
153151

154152
export type StoredComposableCacheEntry = Omit<ComposableCacheEntry, "value"> & {
155-
value: string;
153+
value: Blob;
156154
};
157155

158156
export interface ComposableCacheHandler {

packages/open-next/src/utils/stream.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,13 @@ export async function fromReadableStream(
2020
return Buffer.from(chunks[0]).toString(base64 ? "base64" : "utf8");
2121
}
2222

23-
// Use Buffer.concat which is more efficient than manual allocation and copy
24-
// It handles the allocation and copy in optimized native code
25-
const buffer = Buffer.concat(chunks, totalLength);
23+
// Pre-allocate buffer with exact size to avoid reallocation
24+
const buffer = Buffer.alloc(totalLength);
25+
let offset = 0;
26+
for (const chunk of chunks) {
27+
buffer.set(chunk, offset);
28+
offset += chunk.length;
29+
}
2630

2731
return buffer.toString(base64 ? "base64" : "utf8");
2832
}

packages/tests-unit/tests/adapters/composable-cache.test.ts

Lines changed: 48 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
11
import ComposableCache from "@opennextjs/aws/adapters/composable-cache";
2-
import {
3-
fromReadableStream,
4-
toReadableStream,
5-
} from "@opennextjs/aws/utils/stream";
62
import { vi } from "vitest";
73

84
describe("Composable cache handler", () => {
@@ -19,7 +15,7 @@ describe("Composable cache handler", () => {
1915
timestamp: Date.now(),
2016
expire: Date.now() + 1000,
2117
revalidate: 3600,
22-
value: "test-value",
18+
value: new Blob(["test-value"]),
2319
},
2420
lastModified: Date.now(),
2521
}),
@@ -132,7 +128,7 @@ describe("Composable cache handler", () => {
132128
type: "route",
133129
body: "{}",
134130
tags: [],
135-
value: "test-value",
131+
value: new Blob(["test-value"]),
136132
},
137133
lastModified: Date.now(),
138134
});
@@ -185,7 +181,7 @@ describe("Composable cache handler", () => {
185181

186182
it("should return pending write promise if available", async () => {
187183
const pendingEntry = Promise.resolve({
188-
value: toReadableStream("pending-value"),
184+
value: new Blob(["pending-value"]).stream(),
189185
tags: ["tag1"],
190186
stale: 0,
191187
timestamp: Date.now(),
@@ -214,8 +210,9 @@ describe("Composable cache handler", () => {
214210

215211
it("should set cache entry and handle tags in original mode", async () => {
216212
tagCache.mode = "original";
213+
const blob = new Blob(["test-value"]);
217214
const entry = {
218-
value: toReadableStream("test-value"),
215+
value: blob.stream(),
219216
tags: ["tag1", "tag2"],
220217
stale: 0,
221218
timestamp: Date.now(),
@@ -229,7 +226,7 @@ describe("Composable cache handler", () => {
229226
"test-key",
230227
expect.objectContaining({
231228
tags: ["tag1", "tag2"],
232-
value: "test-value",
229+
value: expect.any(Blob),
233230
}),
234231
"composable",
235232
);
@@ -241,7 +238,7 @@ describe("Composable cache handler", () => {
241238
tagCache.getByPath.mockResolvedValueOnce(["tag1"]);
242239

243240
const entry = {
244-
value: toReadableStream("test-value"),
241+
value: new Blob(["test-value"]).stream(),
245242
tags: ["tag1", "tag2", "tag3"],
246243
stale: 0,
247244
timestamp: Date.now(),
@@ -262,7 +259,7 @@ describe("Composable cache handler", () => {
262259
tagCache.getByPath.mockResolvedValueOnce(["tag1", "tag2"]);
263260

264261
const entry = {
265-
value: toReadableStream("test-value"),
262+
value: new Blob(["test-value"]).stream(),
266263
tags: ["tag1", "tag2"],
267264
stale: 0,
268265
timestamp: Date.now(),
@@ -279,7 +276,7 @@ describe("Composable cache handler", () => {
279276
tagCache.mode = "nextMode";
280277

281278
const entry = {
282-
value: toReadableStream("test-value"),
279+
value: new Blob(["test-value"]).stream(),
283280
tags: ["tag1", "tag2"],
284281
stale: 0,
285282
timestamp: Date.now(),
@@ -293,9 +290,10 @@ describe("Composable cache handler", () => {
293290
expect(tagCache.writeTags).not.toHaveBeenCalled();
294291
});
295292

296-
it("should convert ReadableStream to string", async () => {
293+
it("should store Blob directly", async () => {
294+
const blob = new Blob(["test-content"]);
297295
const entry = {
298-
value: toReadableStream("test-content"),
296+
value: blob.stream(),
299297
tags: ["tag1"],
300298
stale: 0,
301299
timestamp: Date.now(),
@@ -308,7 +306,7 @@ describe("Composable cache handler", () => {
308306
expect(incrementalCache.set).toHaveBeenCalledWith(
309307
"test-key",
310308
expect.objectContaining({
311-
value: "test-content",
309+
value: expect.any(Blob),
312310
}),
313311
"composable",
314312
);
@@ -437,8 +435,9 @@ describe("Composable cache handler", () => {
437435
describe("integration tests", () => {
438436
it("should handle complete cache lifecycle", async () => {
439437
// Set a cache entry
438+
const blob = new Blob(["integration-test"]);
440439
const entry = {
441-
value: toReadableStream("integration-test"),
440+
value: blob.stream(),
442441
tags: ["integration-tag"],
443442
stale: 0,
444443
timestamp: Date.now(),
@@ -452,7 +451,7 @@ describe("Composable cache handler", () => {
452451
expect(incrementalCache.set).toHaveBeenCalledWith(
453452
"integration-key",
454453
expect.objectContaining({
455-
value: "integration-test",
454+
value: expect.any(Blob),
456455
tags: ["integration-tag"],
457456
}),
458457
"composable",
@@ -462,7 +461,7 @@ describe("Composable cache handler", () => {
462461
incrementalCache.get.mockResolvedValueOnce({
463462
value: {
464463
...entry,
465-
value: "integration-test",
464+
value: blob,
466465
},
467466
lastModified: Date.now(),
468467
});
@@ -474,13 +473,21 @@ describe("Composable cache handler", () => {
474473
expect(result?.tags).toEqual(["integration-tag"]);
475474

476475
// Convert the stream back to verify content
477-
const content = await fromReadableStream(result!.value);
476+
const reader = result!.value.getReader();
477+
const chunks: Uint8Array[] = [];
478+
let readResult: ReadableStreamReadResult<Uint8Array>;
479+
while (!(readResult = await reader.read()).done) {
480+
chunks.push(readResult.value);
481+
}
482+
const content = new TextDecoder().decode(
483+
new Uint8Array(chunks.flatMap((c) => Array.from(c))),
484+
);
478485
expect(content).toBe("integration-test");
479486
});
480487

481488
it("should handle concurrent get/set operations", async () => {
482489
const entry1 = {
483-
value: toReadableStream("concurrent-1"),
490+
value: new Blob(["concurrent-1"]).stream(),
484491
tags: ["tag1"],
485492
stale: 0,
486493
timestamp: Date.now(),
@@ -489,7 +496,7 @@ describe("Composable cache handler", () => {
489496
};
490497

491498
const entry2 = {
492-
value: toReadableStream("concurrent-2"),
499+
value: new Blob(["concurrent-2"]).stream(),
493500
tags: ["tag2"],
494501
stale: 0,
495502
timestamp: Date.now(),
@@ -513,10 +520,28 @@ describe("Composable cache handler", () => {
513520
expect(results[2]).toBeDefined();
514521
expect(results[3]).toBeDefined();
515522

516-
const content1 = await fromReadableStream(results[2]!.value);
523+
// Convert stream 1 to text
524+
const reader1 = results[2]!.value.getReader();
525+
const chunks1: Uint8Array[] = [];
526+
let readResult1: ReadableStreamReadResult<Uint8Array>;
527+
while (!(readResult1 = await reader1.read()).done) {
528+
chunks1.push(readResult1.value);
529+
}
530+
const content1 = new TextDecoder().decode(
531+
new Uint8Array(chunks1.flatMap((c) => Array.from(c))),
532+
);
517533
expect(content1).toBe("concurrent-1");
518534

519-
const content2 = await fromReadableStream(results[3]!.value);
535+
// Convert stream 2 to text
536+
const reader2 = results[3]!.value.getReader();
537+
const chunks2: Uint8Array[] = [];
538+
let readResult2: ReadableStreamReadResult<Uint8Array>;
539+
while (!(readResult2 = await reader2.read()).done) {
540+
chunks2.push(readResult2.value);
541+
}
542+
const content2 = new TextDecoder().decode(
543+
new Uint8Array(chunks2.flatMap((c) => Array.from(c))),
544+
);
520545
expect(content2).toBe("concurrent-2");
521546
});
522547
});

0 commit comments

Comments
 (0)