Skip to content

Commit 0b739c5

Browse files
soapprojectlforst
andauthored
fix(fetch): Fix memory leak when handling endless streaming (#13809)
Co-authored-by: Luca Forstner <luca.forstner@sentry.io>
1 parent febdfc6 commit 0b739c5

File tree

2 files changed

+48
-31
lines changed
  • dev-packages/e2e-tests/test-applications/react-router-6/tests
  • packages/utils/src/instrument

2 files changed

+48
-31
lines changed

dev-packages/e2e-tests/test-applications/react-router-6/tests/sse.test.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ test('Waits for sse streaming when sse has been explicitly aborted', async ({ pa
4545
await fetchButton.click();
4646

4747
const rootSpan = await transactionPromise;
48-
console.log(JSON.stringify(rootSpan, null, 2));
4948
const sseFetchCall = rootSpan.spans?.filter(span => span.description === 'sse fetch call')[0] as SpanJSON;
5049
const httpGet = rootSpan.spans?.filter(span => span.description === 'GET http://localhost:8080/sse')[0] as SpanJSON;
5150

@@ -71,7 +70,7 @@ test('Waits for sse streaming when sse has been explicitly aborted', async ({ pa
7170
expect(consoleBreadcrumb?.message).toBe('Could not fetch sse AbortError: BodyStreamBuffer was aborted');
7271
});
7372

74-
test('Aborts when stream takes longer than 5s', async ({ page }) => {
73+
test('Aborts when stream takes longer than 5s, by not updating the span duration', async ({ page }) => {
7574
await page.goto('/sse');
7675

7776
const transactionPromise = waitForTransaction('react-router-6', async transactionEvent => {
@@ -102,5 +101,5 @@ test('Aborts when stream takes longer than 5s', async ({ page }) => {
102101
const resolveBodyDuration = Math.round((httpGet.timestamp as number) - httpGet.start_timestamp);
103102

104103
expect(resolveDuration).toBe(0);
105-
expect(resolveBodyDuration).toBe(7);
104+
expect(resolveBodyDuration).toBe(0);
106105
});

packages/utils/src/instrument/fetch.ts

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -116,40 +116,57 @@ function instrumentFetch(onFetchResolved?: (response: Response) => void, skipNat
116116
}
117117

118118
async function resolveResponse(res: Response | undefined, onFinishedResolving: () => void): Promise<void> {
119-
if (res && res.body && res.body.getReader) {
120-
const responseReader = res.body.getReader();
121-
122-
// eslint-disable-next-line no-inner-declarations
123-
async function consumeChunks({ done }: { done: boolean }): Promise<void> {
124-
if (!done) {
125-
try {
126-
// abort reading if read op takes more than 5s
127-
const result = await Promise.race([
128-
responseReader.read(),
129-
new Promise<{ done: boolean }>(res => {
130-
setTimeout(() => {
131-
res({ done: true });
132-
}, 5000);
133-
}),
134-
]);
135-
await consumeChunks(result);
136-
} catch (error) {
137-
// handle error if needed
119+
if (res && res.body) {
120+
const body = res.body;
121+
const responseReader = body.getReader();
122+
123+
// Define a maximum duration after which we just cancel
124+
const maxFetchDurationTimeout = setTimeout(
125+
() => {
126+
body.cancel().then(null, () => {
127+
// noop
128+
});
129+
},
130+
90 * 1000, // 90s
131+
);
132+
133+
let readingActive = true;
134+
while (readingActive) {
135+
let chunkTimeout;
136+
try {
137+
// abort reading if read op takes more than 5s
138+
chunkTimeout = setTimeout(() => {
139+
body.cancel().then(null, () => {
140+
// noop on error
141+
});
142+
}, 5000);
143+
144+
// This .read() call will reject/throw when we abort due to timeouts through `body.cancel()`
145+
const { done } = await responseReader.read();
146+
147+
clearTimeout(chunkTimeout);
148+
149+
if (done) {
150+
onFinishedResolving();
151+
readingActive = false;
138152
}
139-
} else {
140-
return Promise.resolve();
153+
} catch (error) {
154+
readingActive = false;
155+
} finally {
156+
clearTimeout(chunkTimeout);
141157
}
142158
}
143159

144-
return responseReader
145-
.read()
146-
.then(consumeChunks)
147-
.then(onFinishedResolving)
148-
.catch(() => undefined);
160+
clearTimeout(maxFetchDurationTimeout);
161+
162+
responseReader.releaseLock();
163+
body.cancel().then(null, () => {
164+
// noop on error
165+
});
149166
}
150167
}
151168

152-
async function streamHandler(response: Response): Promise<void> {
169+
function streamHandler(response: Response): void {
153170
// clone response for awaiting stream
154171
let clonedResponseForResolving: Response;
155172
try {
@@ -158,7 +175,8 @@ async function streamHandler(response: Response): Promise<void> {
158175
return;
159176
}
160177

161-
await resolveResponse(clonedResponseForResolving, () => {
178+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
179+
resolveResponse(clonedResponseForResolving, () => {
162180
triggerHandlers('fetch-body-resolved', {
163181
endTimestamp: timestampInSeconds() * 1000,
164182
response,

0 commit comments

Comments
 (0)