Skip to content

Commit c20a3c4

Browse files
0xbad0c0d3cod1k
andauthored
feat(cloudflare): Flush after waitUntil (#16681)
This PR aim is to send events if all (if any was scheduled) waitUntil promises were finished. Otherwise you may loose events. This fixes: #16559 --------- Co-authored-by: cod1k <cod1k@centro.team>
1 parent 510ba3e commit c20a3c4

File tree

11 files changed

+367
-43
lines changed

11 files changed

+367
-43
lines changed

packages/cloudflare/src/client.ts

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { ClientOptions, Options, ServerRuntimeClientOptions } from '@sentry/core';
22
import { applySdkMetadata, ServerRuntimeClient } from '@sentry/core';
3+
import type { makeFlushLock } from './flush';
34
import type { CloudflareTransportOptions } from './transport';
45

56
/**
@@ -8,24 +9,42 @@ import type { CloudflareTransportOptions } from './transport';
89
* @see CloudflareClientOptions for documentation on configuration options.
910
* @see ServerRuntimeClient for usage documentation.
1011
*/
11-
export class CloudflareClient extends ServerRuntimeClient<CloudflareClientOptions> {
12+
export class CloudflareClient extends ServerRuntimeClient {
13+
private readonly _flushLock: ReturnType<typeof makeFlushLock> | void;
14+
1215
/**
1316
* Creates a new Cloudflare SDK instance.
1417
* @param options Configuration options for this SDK.
1518
*/
1619
public constructor(options: CloudflareClientOptions) {
1720
applySdkMetadata(options, 'cloudflare');
1821
options._metadata = options._metadata || {};
22+
const { flushLock, ...serverOptions } = options;
1923

2024
const clientOptions: ServerRuntimeClientOptions = {
21-
...options,
25+
...serverOptions,
2226
platform: 'javascript',
2327
// TODO: Grab version information
2428
runtime: { name: 'cloudflare' },
2529
// TODO: Add server name
2630
};
2731

2832
super(clientOptions);
33+
this._flushLock = flushLock;
34+
}
35+
36+
/**
37+
* Flushes pending operations and ensures all data is processed.
38+
* If a timeout is provided, the operation will be completed within the specified time limit.
39+
*
40+
* @param {number} [timeout] - Optional timeout in milliseconds to force the completion of the flush operation.
41+
* @return {Promise<boolean>} A promise that resolves to a boolean indicating whether the flush operation was successful.
42+
*/
43+
public async flush(timeout?: number): Promise<boolean> {
44+
if (this._flushLock) {
45+
await this._flushLock.finalize();
46+
}
47+
return super.flush(timeout);
2948
}
3049
}
3150

@@ -56,11 +75,15 @@ interface BaseCloudflareOptions {
5675
*
5776
* @see @sentry/core Options for more information.
5877
*/
59-
export interface CloudflareOptions extends Options<CloudflareTransportOptions>, BaseCloudflareOptions {}
78+
export interface CloudflareOptions extends Options<CloudflareTransportOptions>, BaseCloudflareOptions {
79+
ctx?: ExecutionContext;
80+
}
6081

6182
/**
6283
* Configuration options for the Sentry Cloudflare SDK Client class
6384
*
6485
* @see CloudflareClient for more information.
6586
*/
66-
export interface CloudflareClientOptions extends ClientOptions<CloudflareTransportOptions>, BaseCloudflareOptions {}
87+
export interface CloudflareClientOptions extends ClientOptions<CloudflareTransportOptions>, BaseCloudflareOptions {
88+
flushLock?: ReturnType<typeof makeFlushLock>;
89+
}

packages/cloudflare/src/durableobject.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,11 @@ function wrapMethodWithSentry<T extends (...args: any[]) => any>(
4747
// see: https://github.com/getsentry/sentry-javascript/issues/13217
4848
const context = wrapperOptions.context as ExecutionContext | undefined;
4949

50+
const waitUntil = context?.waitUntil?.bind?.(context);
51+
5052
const currentClient = scope.getClient();
5153
if (!currentClient) {
52-
const client = init(wrapperOptions.options);
54+
const client = init({ ...wrapperOptions.options, ctx: context });
5355
scope.setClient(client);
5456
}
5557

@@ -68,7 +70,7 @@ function wrapMethodWithSentry<T extends (...args: any[]) => any>(
6870
});
6971
throw e;
7072
} finally {
71-
context?.waitUntil(flush(2000));
73+
waitUntil?.(flush(2000));
7274
}
7375
}
7476

@@ -92,7 +94,7 @@ function wrapMethodWithSentry<T extends (...args: any[]) => any>(
9294
});
9395
throw e;
9496
} finally {
95-
context?.waitUntil(flush(2000));
97+
waitUntil?.(flush(2000));
9698
}
9799
});
98100
});

packages/cloudflare/src/flush.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import type { ExecutionContext } from '@cloudflare/workers-types';
2+
3+
type FlushLock = {
4+
readonly ready: Promise<void>;
5+
readonly finalize: () => Promise<void>;
6+
};
7+
8+
/**
9+
* Enhances the given execution context by wrapping its `waitUntil` method with a proxy
10+
* to monitor pending tasks, and provides a flusher function to ensure all tasks
11+
* have been completed before executing any subsequent logic.
12+
*
13+
* @param {ExecutionContext} context - The execution context to be enhanced. If no context is provided, the function returns undefined.
14+
* @return {FlushLock} Returns a flusher function if a valid context is provided, otherwise undefined.
15+
*/
16+
export function makeFlushLock(context: ExecutionContext): FlushLock {
17+
let resolveAllDone: () => void = () => undefined;
18+
const allDone = new Promise<void>(res => {
19+
resolveAllDone = res;
20+
});
21+
let pending = 0;
22+
const originalWaitUntil = context.waitUntil.bind(context) as typeof context.waitUntil;
23+
context.waitUntil = promise => {
24+
pending++;
25+
return originalWaitUntil(
26+
promise.finally(() => {
27+
if (--pending === 0) resolveAllDone();
28+
}),
29+
);
30+
};
31+
return Object.freeze({
32+
ready: allDone,
33+
finalize: () => {
34+
if (pending === 0) resolveAllDone();
35+
return allDone;
36+
},
37+
});
38+
}

packages/cloudflare/src/handler.ts

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,9 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
7474
const [event, env, context] = args;
7575
return withIsolationScope(isolationScope => {
7676
const options = getFinalOptions(optionsCallback(env), env);
77+
const waitUntil = context.waitUntil.bind(context);
7778

78-
const client = init(options);
79+
const client = init({ ...options, ctx: context });
7980
isolationScope.setClient(client);
8081

8182
addCloudResourceContext(isolationScope);
@@ -99,7 +100,7 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
99100
captureException(e, { mechanism: { handled: false, type: 'cloudflare' } });
100101
throw e;
101102
} finally {
102-
context.waitUntil(flush(2000));
103+
waitUntil(flush(2000));
103104
}
104105
},
105106
);
@@ -116,8 +117,9 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
116117
const [emailMessage, env, context] = args;
117118
return withIsolationScope(isolationScope => {
118119
const options = getFinalOptions(optionsCallback(env), env);
120+
const waitUntil = context.waitUntil.bind(context);
119121

120-
const client = init(options);
122+
const client = init({ ...options, ctx: context });
121123
isolationScope.setClient(client);
122124

123125
addCloudResourceContext(isolationScope);
@@ -139,7 +141,7 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
139141
captureException(e, { mechanism: { handled: false, type: 'cloudflare' } });
140142
throw e;
141143
} finally {
142-
context.waitUntil(flush(2000));
144+
waitUntil(flush(2000));
143145
}
144146
},
145147
);
@@ -157,8 +159,9 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
157159

158160
return withIsolationScope(isolationScope => {
159161
const options = getFinalOptions(optionsCallback(env), env);
162+
const waitUntil = context.waitUntil.bind(context);
160163

161-
const client = init(options);
164+
const client = init({ ...options, ctx: context });
162165
isolationScope.setClient(client);
163166

164167
addCloudResourceContext(isolationScope);
@@ -185,7 +188,7 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
185188
captureException(e, { mechanism: { handled: false, type: 'cloudflare' } });
186189
throw e;
187190
} finally {
188-
context.waitUntil(flush(2000));
191+
waitUntil(flush(2000));
189192
}
190193
},
191194
);
@@ -204,7 +207,9 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
204207
return withIsolationScope(async isolationScope => {
205208
const options = getFinalOptions(optionsCallback(env), env);
206209

207-
const client = init(options);
210+
const waitUntil = context.waitUntil.bind(context);
211+
212+
const client = init({ ...options, ctx: context });
208213
isolationScope.setClient(client);
209214

210215
addCloudResourceContext(isolationScope);
@@ -215,7 +220,7 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
215220
captureException(e, { mechanism: { handled: false, type: 'cloudflare' } });
216221
throw e;
217222
} finally {
218-
context.waitUntil(flush(2000));
223+
waitUntil(flush(2000));
219224
}
220225
});
221226
},

packages/cloudflare/src/request.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ export function wrapRequestHandler(
3535
// see: https://github.com/getsentry/sentry-javascript/issues/13217
3636
const context = wrapperOptions.context as ExecutionContext | undefined;
3737

38-
const client = init(options);
38+
const waitUntil = context?.waitUntil?.bind?.(context);
39+
40+
const client = init({ ...options, ctx: context });
3941
isolationScope.setClient(client);
4042

4143
const urlObject = parseStringToURLObject(request.url);
@@ -65,7 +67,7 @@ export function wrapRequestHandler(
6567
captureException(e, { mechanism: { handled: false, type: 'cloudflare' } });
6668
throw e;
6769
} finally {
68-
context?.waitUntil(flush(2000));
70+
waitUntil?.(flush(2000));
6971
}
7072
}
7173

@@ -89,7 +91,7 @@ export function wrapRequestHandler(
8991
captureException(e, { mechanism: { handled: false, type: 'cloudflare' } });
9092
throw e;
9193
} finally {
92-
context?.waitUntil(flush(2000));
94+
waitUntil?.(flush(2000));
9395
}
9496
},
9597
);

packages/cloudflare/src/sdk.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
} from '@sentry/core';
1313
import type { CloudflareClientOptions, CloudflareOptions } from './client';
1414
import { CloudflareClient } from './client';
15+
import { makeFlushLock } from './flush';
1516
import { fetchIntegration } from './integrations/fetch';
1617
import { setupOpenTelemetryTracer } from './opentelemetry/tracer';
1718
import { makeCloudflareTransport } from './transport';
@@ -44,11 +45,15 @@ export function init(options: CloudflareOptions): CloudflareClient | undefined {
4445
options.defaultIntegrations = getDefaultIntegrations(options);
4546
}
4647

48+
const flushLock = options.ctx ? makeFlushLock(options.ctx) : undefined;
49+
delete options.ctx;
50+
4751
const clientOptions: CloudflareClientOptions = {
4852
...options,
4953
stackParser: stackParserFromStackParserOptions(options.stackParser || defaultStackParser),
5054
integrations: getIntegrationsToSetup(options),
5155
transport: options.transport || makeCloudflareTransport,
56+
flushLock,
5257
};
5358

5459
/**
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import type { ExecutionContext } from '@cloudflare/workers-types';
2+
import * as SentryCore from '@sentry/core';
3+
import { describe, expect, it, onTestFinished, vi } from 'vitest';
4+
import { instrumentDurableObjectWithSentry } from '../src/durableobject';
5+
import { isInstrumented } from '../src/instrument';
6+
7+
describe('durable object', () => {
8+
it('instrumentDurableObjectWithSentry generic functionality', () => {
9+
const options = vi.fn();
10+
const instrumented = instrumentDurableObjectWithSentry(options, vi.fn());
11+
expect(instrumented).toBeTypeOf('function');
12+
expect(() => Reflect.construct(instrumented, [])).not.toThrow();
13+
expect(options).toHaveBeenCalledOnce();
14+
});
15+
it('all available durable object methods are instrumented', () => {
16+
const testClass = vi.fn(() => ({
17+
customMethod: vi.fn(),
18+
fetch: vi.fn(),
19+
alarm: vi.fn(),
20+
webSocketMessage: vi.fn(),
21+
webSocketClose: vi.fn(),
22+
webSocketError: vi.fn(),
23+
}));
24+
const instrumented = instrumentDurableObjectWithSentry(vi.fn(), testClass as any);
25+
const dObject: any = Reflect.construct(instrumented, []);
26+
for (const method of Object.getOwnPropertyNames(dObject)) {
27+
expect(isInstrumented(dObject[method]), `Method ${method} is instrumented`).toBeTruthy();
28+
}
29+
});
30+
it('flush performs after all waitUntil promises are finished', async () => {
31+
vi.useFakeTimers();
32+
onTestFinished(() => {
33+
vi.useRealTimers();
34+
});
35+
const flush = vi.spyOn(SentryCore.Client.prototype, 'flush');
36+
const waitUntil = vi.fn();
37+
const testClass = vi.fn(context => ({
38+
fetch: () => {
39+
context.waitUntil(new Promise(res => setTimeout(res)));
40+
return new Response('test');
41+
},
42+
}));
43+
const instrumented = instrumentDurableObjectWithSentry(vi.fn(), testClass as any);
44+
const context = {
45+
waitUntil,
46+
} as unknown as ExecutionContext;
47+
const dObject: any = Reflect.construct(instrumented, [context, {} as any]);
48+
expect(() => dObject.fetch(new Request('https://example.com'))).not.toThrow();
49+
expect(flush).not.toBeCalled();
50+
expect(waitUntil).toHaveBeenCalledOnce();
51+
vi.advanceTimersToNextTimer();
52+
await Promise.all(waitUntil.mock.calls.map(([p]) => p));
53+
expect(flush).toBeCalled();
54+
});
55+
});
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import { type ExecutionContext } from '@cloudflare/workers-types';
2+
import { describe, expect, it, onTestFinished, vi } from 'vitest';
3+
import { makeFlushLock } from '../src/flush';
4+
5+
describe('Flush buffer test', () => {
6+
const waitUntilPromises: Promise<void>[] = [];
7+
const mockExecutionContext: ExecutionContext = {
8+
waitUntil: vi.fn(prmise => {
9+
waitUntilPromises.push(prmise);
10+
}),
11+
passThroughOnException: vi.fn(),
12+
};
13+
it('should flush buffer immediately if no waitUntil were called', async () => {
14+
const { finalize } = makeFlushLock(mockExecutionContext);
15+
await expect(finalize()).resolves.toBeUndefined();
16+
});
17+
it('should flush buffer only after all waitUntil were finished', async () => {
18+
vi.useFakeTimers();
19+
onTestFinished(() => {
20+
vi.useRealTimers();
21+
});
22+
const task = new Promise(resolve => setTimeout(resolve, 100));
23+
const lock = makeFlushLock(mockExecutionContext);
24+
mockExecutionContext.waitUntil(task);
25+
void lock.finalize();
26+
vi.advanceTimersToNextTimer();
27+
await Promise.all(waitUntilPromises);
28+
await expect(lock.ready).resolves.toBeUndefined();
29+
});
30+
});

0 commit comments

Comments
 (0)