Skip to content

feat(cloudflare): Flush after waitUntil #16681

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions packages/cloudflare/src/client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { ClientOptions, Options, ServerRuntimeClientOptions } from '@sentry/core';
import { applySdkMetadata, ServerRuntimeClient } from '@sentry/core';
import type { makeFlushLock } from './flush';
import type { CloudflareTransportOptions } from './transport';

/**
Expand All @@ -8,24 +9,42 @@ import type { CloudflareTransportOptions } from './transport';
* @see CloudflareClientOptions for documentation on configuration options.
* @see ServerRuntimeClient for usage documentation.
*/
export class CloudflareClient extends ServerRuntimeClient<CloudflareClientOptions> {
export class CloudflareClient extends ServerRuntimeClient {
private readonly _flushLock: ReturnType<typeof makeFlushLock> | void;

/**
* Creates a new Cloudflare SDK instance.
* @param options Configuration options for this SDK.
*/
public constructor(options: CloudflareClientOptions) {
applySdkMetadata(options, 'cloudflare');
options._metadata = options._metadata || {};
const { flushLock, ...serverOptions } = options;

const clientOptions: ServerRuntimeClientOptions = {
...options,
...serverOptions,
platform: 'javascript',
// TODO: Grab version information
runtime: { name: 'cloudflare' },
// TODO: Add server name
};

super(clientOptions);
this._flushLock = flushLock;
}

/**
* Flushes pending operations and ensures all data is processed.
* If a timeout is provided, the operation will be completed within the specified time limit.
*
* @param {number} [timeout] - Optional timeout in milliseconds to force the completion of the flush operation.
* @return {Promise<boolean>} A promise that resolves to a boolean indicating whether the flush operation was successful.
*/
public async flush(timeout?: number): Promise<boolean> {
if (this._flushLock) {
await this._flushLock.finalize();
}
return super.flush(timeout);
}
}

Expand All @@ -37,11 +56,15 @@ interface BaseCloudflareOptions {}
*
* @see @sentry/core Options for more information.
*/
export interface CloudflareOptions extends Options<CloudflareTransportOptions>, BaseCloudflareOptions {}
export interface CloudflareOptions extends Options<CloudflareTransportOptions>, BaseCloudflareOptions {
ctx?: ExecutionContext;
}

/**
* Configuration options for the Sentry Cloudflare SDK Client class
*
* @see CloudflareClient for more information.
*/
export interface CloudflareClientOptions extends ClientOptions<CloudflareTransportOptions>, BaseCloudflareOptions {}
export interface CloudflareClientOptions extends ClientOptions<CloudflareTransportOptions>, BaseCloudflareOptions {
flushLock?: ReturnType<typeof makeFlushLock>;
}
8 changes: 5 additions & 3 deletions packages/cloudflare/src/durableobject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ function wrapMethodWithSentry<T extends (...args: any[]) => any>(
// see: https://github.com/getsentry/sentry-javascript/issues/13217
const context = wrapperOptions.context as ExecutionContext | undefined;

const waitUntil = context?.waitUntil?.bind?.(context);

const currentClient = scope.getClient();
if (!currentClient) {
const client = init(wrapperOptions.options);
const client = init({ ...wrapperOptions.options, ctx: context });
scope.setClient(client);
}

Expand All @@ -68,7 +70,7 @@ function wrapMethodWithSentry<T extends (...args: any[]) => any>(
});
throw e;
} finally {
context?.waitUntil(flush(2000));
waitUntil?.(flush(2000));
}
}

Expand All @@ -92,7 +94,7 @@ function wrapMethodWithSentry<T extends (...args: any[]) => any>(
});
throw e;
} finally {
context?.waitUntil(flush(2000));
waitUntil?.(flush(2000));
}
});
});
Expand Down
38 changes: 38 additions & 0 deletions packages/cloudflare/src/flush.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import type { ExecutionContext } from '@cloudflare/workers-types';

type FlushLock = {
readonly ready: Promise<void>;
readonly finalize: () => Promise<void>;
};

/**
* Enhances the given execution context by wrapping its `waitUntil` method with a proxy
* to monitor pending tasks, and provides a flusher function to ensure all tasks
* have been completed before executing any subsequent logic.
*
* @param {ExecutionContext} context - The execution context to be enhanced. If no context is provided, the function returns undefined.
* @return {FlushLock} Returns a flusher function if a valid context is provided, otherwise undefined.
*/
export function makeFlushLock(context: ExecutionContext): FlushLock {
let resolveAllDone: () => void = () => undefined;
const allDone = new Promise<void>(res => {
resolveAllDone = res;
});
let pending = 0;
const originalWaitUntil = context.waitUntil.bind(context) as typeof context.waitUntil;
context.waitUntil = promise => {
pending++;
return originalWaitUntil(
promise.finally(() => {
if (--pending === 0) resolveAllDone();
}),
);
};
return Object.freeze({
ready: allDone,
finalize: () => {
if (pending === 0) resolveAllDone();
return allDone;
},
});
}
21 changes: 13 additions & 8 deletions packages/cloudflare/src/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
const [event, env, context] = args;
return withIsolationScope(isolationScope => {
const options = getFinalOptions(optionsCallback(env), env);
const waitUntil = context.waitUntil.bind(context);

const client = init(options);
const client = init({ ...options, ctx: context });
isolationScope.setClient(client);

addCloudResourceContext(isolationScope);
Expand All @@ -99,7 +100,7 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
captureException(e, { mechanism: { handled: false, type: 'cloudflare' } });
throw e;
} finally {
context.waitUntil(flush(2000));
waitUntil(flush(2000));
}
},
);
Expand All @@ -116,8 +117,9 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
const [emailMessage, env, context] = args;
return withIsolationScope(isolationScope => {
const options = getFinalOptions(optionsCallback(env), env);
const waitUntil = context.waitUntil.bind(context);

const client = init(options);
const client = init({ ...options, ctx: context });
isolationScope.setClient(client);

addCloudResourceContext(isolationScope);
Expand All @@ -139,7 +141,7 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
captureException(e, { mechanism: { handled: false, type: 'cloudflare' } });
throw e;
} finally {
context.waitUntil(flush(2000));
waitUntil(flush(2000));
}
},
);
Expand All @@ -157,8 +159,9 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM

return withIsolationScope(isolationScope => {
const options = getFinalOptions(optionsCallback(env), env);
const waitUntil = context.waitUntil.bind(context);

const client = init(options);
const client = init({ ...options, ctx: context });
isolationScope.setClient(client);

addCloudResourceContext(isolationScope);
Expand All @@ -185,7 +188,7 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
captureException(e, { mechanism: { handled: false, type: 'cloudflare' } });
throw e;
} finally {
context.waitUntil(flush(2000));
waitUntil(flush(2000));
}
},
);
Expand All @@ -204,7 +207,9 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
return withIsolationScope(async isolationScope => {
const options = getFinalOptions(optionsCallback(env), env);

const client = init(options);
const waitUntil = context.waitUntil.bind(context);

const client = init({ ...options, ctx: context });
isolationScope.setClient(client);

addCloudResourceContext(isolationScope);
Expand All @@ -215,7 +220,7 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
captureException(e, { mechanism: { handled: false, type: 'cloudflare' } });
throw e;
} finally {
context.waitUntil(flush(2000));
waitUntil(flush(2000));
}
});
},
Expand Down
8 changes: 5 additions & 3 deletions packages/cloudflare/src/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ export function wrapRequestHandler(
// see: https://github.com/getsentry/sentry-javascript/issues/13217
const context = wrapperOptions.context as ExecutionContext | undefined;

const client = init(options);
const waitUntil = context?.waitUntil?.bind?.(context);

const client = init({ ...options, ctx: context });
isolationScope.setClient(client);

const urlObject = parseStringToURLObject(request.url);
Expand Down Expand Up @@ -65,7 +67,7 @@ export function wrapRequestHandler(
captureException(e, { mechanism: { handled: false, type: 'cloudflare' } });
throw e;
} finally {
context?.waitUntil(flush(2000));
waitUntil?.(flush(2000));
}
}

Expand All @@ -89,7 +91,7 @@ export function wrapRequestHandler(
captureException(e, { mechanism: { handled: false, type: 'cloudflare' } });
throw e;
} finally {
context?.waitUntil(flush(2000));
waitUntil?.(flush(2000));
}
},
);
Expand Down
5 changes: 5 additions & 0 deletions packages/cloudflare/src/sdk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
} from '@sentry/core';
import type { CloudflareClientOptions, CloudflareOptions } from './client';
import { CloudflareClient } from './client';
import { makeFlushLock } from './flush';
import { fetchIntegration } from './integrations/fetch';
import { makeCloudflareTransport } from './transport';
import { defaultStackParser } from './vendor/stacktrace';
Expand Down Expand Up @@ -41,11 +42,15 @@ export function init(options: CloudflareOptions): CloudflareClient | undefined {
options.defaultIntegrations = getDefaultIntegrations(options);
}

const flushLock = options.ctx ? makeFlushLock(options.ctx) : undefined;
delete options.ctx;

const clientOptions: CloudflareClientOptions = {
...options,
stackParser: stackParserFromStackParserOptions(options.stackParser || defaultStackParser),
integrations: getIntegrationsToSetup(options),
transport: options.transport || makeCloudflareTransport,
flushLock,
};

return initAndBind(CloudflareClient, clientOptions) as CloudflareClient;
Expand Down
55 changes: 55 additions & 0 deletions packages/cloudflare/test/durableobject.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import type { ExecutionContext } from '@cloudflare/workers-types';
import * as SentryCore from '@sentry/core';
import { describe, expect, it, onTestFinished, vi } from 'vitest';
import { instrumentDurableObjectWithSentry } from '../src/durableobject';
import { isInstrumented } from '../src/instrument';

describe('durable object', () => {
it('instrumentDurableObjectWithSentry generic functionality', () => {
const options = vi.fn();
const instrumented = instrumentDurableObjectWithSentry(options, vi.fn());
expect(instrumented).toBeTypeOf('function');
expect(() => Reflect.construct(instrumented, [])).not.toThrow();
expect(options).toHaveBeenCalledOnce();
});
it('all available durable object methods are instrumented', () => {
const testClass = vi.fn(() => ({
customMethod: vi.fn(),
fetch: vi.fn(),
alarm: vi.fn(),
webSocketMessage: vi.fn(),
webSocketClose: vi.fn(),
webSocketError: vi.fn(),
}));
const instrumented = instrumentDurableObjectWithSentry(vi.fn(), testClass as any);
const dObject: any = Reflect.construct(instrumented, []);
for (const method of Object.getOwnPropertyNames(dObject)) {
expect(isInstrumented(dObject[method]), `Method ${method} is instrumented`).toBeTruthy();
}
});
it('flush performs after all waitUntil promises are finished', async () => {
vi.useFakeTimers();
onTestFinished(() => {
vi.useRealTimers();
});
const flush = vi.spyOn(SentryCore.Client.prototype, 'flush');
const waitUntil = vi.fn();
const testClass = vi.fn(context => ({
fetch: () => {
context.waitUntil(new Promise(res => setTimeout(res)));
return new Response('test');
},
}));
const instrumented = instrumentDurableObjectWithSentry(vi.fn(), testClass as any);
const context = {
waitUntil,
} as unknown as ExecutionContext;
const dObject: any = Reflect.construct(instrumented, [context, {} as any]);
expect(() => dObject.fetch(new Request('https://example.com'))).not.toThrow();
expect(flush).not.toBeCalled();
expect(waitUntil).toHaveBeenCalledOnce();
vi.advanceTimersToNextTimer();
await Promise.all(waitUntil.mock.calls.map(([p]) => p));
expect(flush).toBeCalled();
});
});
30 changes: 30 additions & 0 deletions packages/cloudflare/test/flush.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { type ExecutionContext } from '@cloudflare/workers-types';
import { describe, expect, it, onTestFinished, vi } from 'vitest';
import { makeFlushLock } from '../src/flush';

describe('Flush buffer test', () => {
const waitUntilPromises: Promise<void>[] = [];
const mockExecutionContext: ExecutionContext = {
waitUntil: vi.fn(prmise => {
waitUntilPromises.push(prmise);
}),
passThroughOnException: vi.fn(),
};
it('should flush buffer immediately if no waitUntil were called', async () => {
const { finalize } = makeFlushLock(mockExecutionContext);
await expect(finalize()).resolves.toBeUndefined();
});
it('should flush buffer only after all waitUntil were finished', async () => {
vi.useFakeTimers();
onTestFinished(() => {
vi.useRealTimers();
});
const task = new Promise(resolve => setTimeout(resolve, 100));
const lock = makeFlushLock(mockExecutionContext);
mockExecutionContext.waitUntil(task);
void lock.finalize();
vi.advanceTimersToNextTimer();
await Promise.all(waitUntilPromises);
await expect(lock.ready).resolves.toBeUndefined();
});
});
Loading
Loading