Skip to content

Commit

Permalink
fix: address memory leak caused by too many event listeners on AbortS…
Browse files Browse the repository at this point in the history
…ignal (#49)
  • Loading branch information
jacob-ebey authored Sep 11, 2024
1 parent c1e5ef6 commit 628504e
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 91 deletions.
26 changes: 26 additions & 0 deletions src/turbo-stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ test("should encode and decode NaN", async () => {
expect(output).toEqual(input);
});

test("should encode and decode Number.NaN", async () => {
const input = Number.NaN;
const output = await quickDecode(encode(input));
expect(output).toEqual(input);
});

test("should encode and decode Infinity", async () => {
const input = Infinity;
const output = await quickDecode(encode(input));
Expand Down Expand Up @@ -546,3 +552,23 @@ test("should encode and decode objects with multiple promises rejecting to the s
);
expect(Array.from(encoded.matchAll(/"baz"/g))).toHaveLength(1);
});

test("should allow many nested promises without a memory leak", async () => {
const depth = 2000;
type Nested = { i: number; next: Promise<Nested> | null };
const input: Nested = { i: 0, next: null };
let current: Nested = input;
for (let i = 1; i < depth; i++) {
const next = { i, next: null };
current.next = Promise.resolve(next);
current = next;
}

const decoded = await decode(encode(input));
let currentDecoded: Nested = decoded.value as Nested;
while (currentDecoded.next) {
currentDecoded = await currentDecoded.next;
}
expect(currentDecoded.i).toBe(depth - 1);
await decoded.done;
});
191 changes: 100 additions & 91 deletions src/turbo-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async function decodeInitial(
throw new SyntaxError();
}

let line;
let line: unknown;
try {
line = JSON.parse(read.value);
} catch (reason) {
Expand Down Expand Up @@ -95,7 +95,7 @@ async function decodeDeferred(
throw new Error(`Deferred ID ${deferredId} not found in stream`);
}
const lineData = line.slice(colonIndex + 1);
let jsonLine;
let jsonLine: unknown;
try {
jsonLine = JSON.parse(lineData);
} catch (reason) {
Expand All @@ -115,7 +115,7 @@ async function decodeDeferred(
throw new Error(`Deferred ID ${deferredId} not found in stream`);
}
const lineData = line.slice(colonIndex + 1);
let jsonLine;
let jsonLine: unknown;
try {
jsonLine = JSON.parse(lineData);
} catch (reason) {
Expand Down Expand Up @@ -169,82 +169,106 @@ export function encode(
}

const seenPromises = new WeakSet<Promise<unknown>>();
while (Object.keys(encoder.deferred).length > 0) {
for (const [deferredId, deferred] of Object.entries(encoder.deferred)) {
if (seenPromises.has(deferred)) continue;
seenPromises.add(
(encoder.deferred[Number(deferredId)] = raceSignal(
deferred,
encoder.signal
)
.then(
(resolved) => {
const id = flatten.call(encoder, resolved);
if (Array.isArray(id)) {
controller.enqueue(
textEncoder.encode(
`${TYPE_PROMISE}${deferredId}:[["${TYPE_PREVIOUS_RESOLVED}",${id[0]}]]\n`
)
);
encoder.index++;
lastSentIndex++;
} else if (id < 0) {
controller.enqueue(
textEncoder.encode(`${TYPE_PROMISE}${deferredId}:${id}\n`)
);
} else {
const values = encoder.stringified
.slice(lastSentIndex + 1)
.join(",");
controller.enqueue(
textEncoder.encode(
`${TYPE_PROMISE}${deferredId}:[${values}]\n`
)
);
lastSentIndex = encoder.stringified.length - 1;
}
},
(reason) => {
if (
!reason ||
typeof reason !== "object" ||
!(reason instanceof Error)
) {
reason = new Error("An unknown error occurred");
}
if (Object.keys(encoder.deferred).length) {
let raceDone!: () => void;
const racePromise = new Promise<never>((resolve, reject) => {
raceDone = resolve as () => void;
if (signal) {
const rejectPromise = () =>
reject(signal.reason || new Error("Signal was aborted."));
if (signal.aborted) {
rejectPromise();
} else {
signal.addEventListener("abort", (event) => {
rejectPromise();
});
}
}
});
while (Object.keys(encoder.deferred).length > 0) {
for (const [deferredId, deferred] of Object.entries(
encoder.deferred
)) {
if (seenPromises.has(deferred)) continue;
seenPromises.add(
// biome-ignore lint/suspicious/noAssignInExpressions: <explanation>
(encoder.deferred[Number(deferredId)] = Promise.race([
racePromise,
deferred,
])
.then(
(resolved) => {
const id = flatten.call(encoder, resolved);
if (Array.isArray(id)) {
controller.enqueue(
textEncoder.encode(
`${TYPE_PROMISE}${deferredId}:[["${TYPE_PREVIOUS_RESOLVED}",${id[0]}]]\n`
)
);
encoder.index++;
lastSentIndex++;
} else if (id < 0) {
controller.enqueue(
textEncoder.encode(
`${TYPE_PROMISE}${deferredId}:${id}\n`
)
);
} else {
const values = encoder.stringified
.slice(lastSentIndex + 1)
.join(",");
controller.enqueue(
textEncoder.encode(
`${TYPE_PROMISE}${deferredId}:[${values}]\n`
)
);
lastSentIndex = encoder.stringified.length - 1;
}
},
(reason) => {
if (
!reason ||
typeof reason !== "object" ||
!(reason instanceof Error)
) {
reason = new Error("An unknown error occurred");
}

const id = flatten.call(encoder, reason);
if (Array.isArray(id)) {
controller.enqueue(
textEncoder.encode(
`${TYPE_ERROR}${deferredId}:[["${TYPE_PREVIOUS_RESOLVED}",${id[0]}]]\n`
)
);
encoder.index++;
lastSentIndex++;
} else if (id < 0) {
controller.enqueue(
textEncoder.encode(`${TYPE_ERROR}${deferredId}:${id}\n`)
);
} else {
const values = encoder.stringified
.slice(lastSentIndex + 1)
.join(",");
controller.enqueue(
textEncoder.encode(
`${TYPE_ERROR}${deferredId}:[${values}]\n`
)
);
lastSentIndex = encoder.stringified.length - 1;
const id = flatten.call(encoder, reason);
if (Array.isArray(id)) {
controller.enqueue(
textEncoder.encode(
`${TYPE_ERROR}${deferredId}:[["${TYPE_PREVIOUS_RESOLVED}",${id[0]}]]\n`
)
);
encoder.index++;
lastSentIndex++;
} else if (id < 0) {
controller.enqueue(
textEncoder.encode(`${TYPE_ERROR}${deferredId}:${id}\n`)
);
} else {
const values = encoder.stringified
.slice(lastSentIndex + 1)
.join(",");
controller.enqueue(
textEncoder.encode(
`${TYPE_ERROR}${deferredId}:[${values}]\n`
)
);
lastSentIndex = encoder.stringified.length - 1;
}
}
}
)
.finally(() => {
delete encoder.deferred[Number(deferredId)];
}))
);
)
.finally(() => {
delete encoder.deferred[Number(deferredId)];
}))
);
}
await Promise.race(Object.values(encoder.deferred));
}
await Promise.race(Object.values(encoder.deferred));

raceDone();
}
await Promise.all(Object.values(encoder.deferred));

Expand All @@ -254,18 +278,3 @@ export function encode(

return readable;
}

function raceSignal(promise: Promise<unknown>, signal?: AbortSignal) {
if (!signal) return promise;
if (signal.aborted)
return Promise.reject(signal.reason || new Error("Signal was aborted."));

const abort = new Promise<unknown>((resolve, reject) => {
signal.addEventListener("abort", (event) => {
reject(signal.reason || new Error("Signal was aborted."));
});
promise.then(resolve).catch(reject);
});
abort.catch(() => {});
return Promise.race([abort, promise]);
}

0 comments on commit 628504e

Please sign in to comment.