Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: address memory leak caused by too many event listeners on AbortSignal #49

Merged
merged 1 commit into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/turbo-stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ test("should encode and decode NaN", async () => {
expect(output).toEqual(input);
});

test("should encode and decode Number.NaN", async () => {
const input = Number.NaN;
const output = await quickDecode(encode(input));
expect(output).toEqual(input);
});

test("should encode and decode Infinity", async () => {
const input = Infinity;
const output = await quickDecode(encode(input));
Expand Down Expand Up @@ -546,3 +552,23 @@ test("should encode and decode objects with multiple promises rejecting to the s
);
expect(Array.from(encoded.matchAll(/"baz"/g))).toHaveLength(1);
});

test("should allow many nested promises without a memory leak", async () => {
const depth = 2000;
type Nested = { i: number; next: Promise<Nested> | null };
const input: Nested = { i: 0, next: null };
let current: Nested = input;
for (let i = 1; i < depth; i++) {
const next = { i, next: null };
current.next = Promise.resolve(next);
current = next;
}

const decoded = await decode(encode(input));
let currentDecoded: Nested = decoded.value as Nested;
while (currentDecoded.next) {
currentDecoded = await currentDecoded.next;
}
expect(currentDecoded.i).toBe(depth - 1);
await decoded.done;
});
191 changes: 100 additions & 91 deletions src/turbo-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async function decodeInitial(
throw new SyntaxError();
}

let line;
let line: unknown;
try {
line = JSON.parse(read.value);
} catch (reason) {
Expand Down Expand Up @@ -95,7 +95,7 @@ async function decodeDeferred(
throw new Error(`Deferred ID ${deferredId} not found in stream`);
}
const lineData = line.slice(colonIndex + 1);
let jsonLine;
let jsonLine: unknown;
try {
jsonLine = JSON.parse(lineData);
} catch (reason) {
Expand All @@ -115,7 +115,7 @@ async function decodeDeferred(
throw new Error(`Deferred ID ${deferredId} not found in stream`);
}
const lineData = line.slice(colonIndex + 1);
let jsonLine;
let jsonLine: unknown;
try {
jsonLine = JSON.parse(lineData);
} catch (reason) {
Expand Down Expand Up @@ -169,82 +169,106 @@ export function encode(
}

const seenPromises = new WeakSet<Promise<unknown>>();
while (Object.keys(encoder.deferred).length > 0) {
for (const [deferredId, deferred] of Object.entries(encoder.deferred)) {
if (seenPromises.has(deferred)) continue;
seenPromises.add(
(encoder.deferred[Number(deferredId)] = raceSignal(
deferred,
encoder.signal
)
.then(
(resolved) => {
const id = flatten.call(encoder, resolved);
if (Array.isArray(id)) {
controller.enqueue(
textEncoder.encode(
`${TYPE_PROMISE}${deferredId}:[["${TYPE_PREVIOUS_RESOLVED}",${id[0]}]]\n`
)
);
encoder.index++;
lastSentIndex++;
} else if (id < 0) {
controller.enqueue(
textEncoder.encode(`${TYPE_PROMISE}${deferredId}:${id}\n`)
);
} else {
const values = encoder.stringified
.slice(lastSentIndex + 1)
.join(",");
controller.enqueue(
textEncoder.encode(
`${TYPE_PROMISE}${deferredId}:[${values}]\n`
)
);
lastSentIndex = encoder.stringified.length - 1;
}
},
(reason) => {
if (
!reason ||
typeof reason !== "object" ||
!(reason instanceof Error)
) {
reason = new Error("An unknown error occurred");
}
if (Object.keys(encoder.deferred).length) {
let raceDone!: () => void;
const racePromise = new Promise<never>((resolve, reject) => {
raceDone = resolve as () => void;
if (signal) {
const rejectPromise = () =>
reject(signal.reason || new Error("Signal was aborted."));
if (signal.aborted) {
rejectPromise();
} else {
signal.addEventListener("abort", (event) => {
rejectPromise();
});
}
}
});
while (Object.keys(encoder.deferred).length > 0) {
for (const [deferredId, deferred] of Object.entries(
encoder.deferred
)) {
if (seenPromises.has(deferred)) continue;
seenPromises.add(
// biome-ignore lint/suspicious/noAssignInExpressions: <explanation>
(encoder.deferred[Number(deferredId)] = Promise.race([
racePromise,
deferred,
])
.then(
(resolved) => {
const id = flatten.call(encoder, resolved);
if (Array.isArray(id)) {
controller.enqueue(
textEncoder.encode(
`${TYPE_PROMISE}${deferredId}:[["${TYPE_PREVIOUS_RESOLVED}",${id[0]}]]\n`
)
);
encoder.index++;
lastSentIndex++;
} else if (id < 0) {
controller.enqueue(
textEncoder.encode(
`${TYPE_PROMISE}${deferredId}:${id}\n`
)
);
} else {
const values = encoder.stringified
.slice(lastSentIndex + 1)
.join(",");
controller.enqueue(
textEncoder.encode(
`${TYPE_PROMISE}${deferredId}:[${values}]\n`
)
);
lastSentIndex = encoder.stringified.length - 1;
}
},
(reason) => {
if (
!reason ||
typeof reason !== "object" ||
!(reason instanceof Error)
) {
reason = new Error("An unknown error occurred");
}

const id = flatten.call(encoder, reason);
if (Array.isArray(id)) {
controller.enqueue(
textEncoder.encode(
`${TYPE_ERROR}${deferredId}:[["${TYPE_PREVIOUS_RESOLVED}",${id[0]}]]\n`
)
);
encoder.index++;
lastSentIndex++;
} else if (id < 0) {
controller.enqueue(
textEncoder.encode(`${TYPE_ERROR}${deferredId}:${id}\n`)
);
} else {
const values = encoder.stringified
.slice(lastSentIndex + 1)
.join(",");
controller.enqueue(
textEncoder.encode(
`${TYPE_ERROR}${deferredId}:[${values}]\n`
)
);
lastSentIndex = encoder.stringified.length - 1;
const id = flatten.call(encoder, reason);
if (Array.isArray(id)) {
controller.enqueue(
textEncoder.encode(
`${TYPE_ERROR}${deferredId}:[["${TYPE_PREVIOUS_RESOLVED}",${id[0]}]]\n`
)
);
encoder.index++;
lastSentIndex++;
} else if (id < 0) {
controller.enqueue(
textEncoder.encode(`${TYPE_ERROR}${deferredId}:${id}\n`)
);
} else {
const values = encoder.stringified
.slice(lastSentIndex + 1)
.join(",");
controller.enqueue(
textEncoder.encode(
`${TYPE_ERROR}${deferredId}:[${values}]\n`
)
);
lastSentIndex = encoder.stringified.length - 1;
}
}
}
)
.finally(() => {
delete encoder.deferred[Number(deferredId)];
}))
);
)
.finally(() => {
delete encoder.deferred[Number(deferredId)];
}))
);
}
await Promise.race(Object.values(encoder.deferred));
}
await Promise.race(Object.values(encoder.deferred));

raceDone();
}
await Promise.all(Object.values(encoder.deferred));

Expand All @@ -254,18 +278,3 @@ export function encode(

return readable;
}

function raceSignal(promise: Promise<unknown>, signal?: AbortSignal) {
if (!signal) return promise;
if (signal.aborted)
return Promise.reject(signal.reason || new Error("Signal was aborted."));

const abort = new Promise<unknown>((resolve, reject) => {
signal.addEventListener("abort", (event) => {
reject(signal.reason || new Error("Signal was aborted."));
});
promise.then(resolve).catch(reject);
});
abort.catch(() => {});
return Promise.race([abort, promise]);
}
Loading