Skip to content

Commit 4bc9f41

Browse files
committed
feat(amqplibIntegration): Add amqplibIntegration to Node
1 parent e1783a6 commit 4bc9f41

File tree

21 files changed

+315
-18
lines changed

21 files changed

+315
-18
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,6 @@ packages/deno/lib.deno.d.ts
5858

5959
# gatsby
6060
packages/gatsby/gatsby-node.d.ts
61+
62+
# intellij
63+
*.iml

dev-packages/node-integration-tests/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
"@types/mongodb": "^3.6.20",
3939
"@types/mysql": "^2.15.21",
4040
"@types/pg": "^8.6.5",
41+
"amqplib": "^0.10.4",
4142
"apollo-server": "^3.11.1",
4243
"axios": "^1.6.7",
4344
"connect": "^3.7.0",
@@ -65,6 +66,7 @@
6566
"yargs": "^16.2.0"
6667
},
6768
"devDependencies": {
69+
"@types/amqplib": "^0.10.5",
6870
"@types/node-cron": "^3.0.11",
6971
"@types/node-schedule": "^2.1.7",
7072
"globby": "11"
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
const amqpUsername = 'sentry';
2+
const amqpPassword = 'sentry';
3+
4+
export const AMQP_URL = `amqp://${amqpUsername}:${amqpPassword}@localhost:5672/`;
5+
export const ACKNOWLEDGEMENT = { noAck: false };
6+
7+
export const QUEUE_OPTIONS = {
8+
durable: true, // Make the queue durable
9+
exclusive: false, // Not exclusive
10+
autoDelete: false, // Don't auto-delete the queue
11+
arguments: {
12+
'x-message-ttl': 30000, // Message TTL of 30 seconds
13+
'x-max-length': 1000, // Maximum queue length of 1000 messages
14+
},
15+
};
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
version: '3'
2+
3+
services:
4+
rabbitmq:
5+
image: rabbitmq:management
6+
container_name: rabbitmq
7+
environment:
8+
- RABBITMQ_DEFAULT_USER=sentry
9+
- RABBITMQ_DEFAULT_PASS=sentry
10+
ports:
11+
- "5672:5672"
12+
- "15672:15672"
13+
14+
networks:
15+
default:
16+
driver: bridge
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import { loggingTransport } from '@sentry-internal/node-integration-tests';
2+
import * as Sentry from '@sentry/node';
3+
4+
Sentry.init({
5+
dsn: 'https://public@dsn.ingest.sentry.io/1337',
6+
release: '1.0',
7+
tracesSampleRate: 1.0,
8+
transport: loggingTransport,
9+
});
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import * as Sentry from '@sentry/node';
2+
import './init';
3+
import { connectToRabbitMQ, consumeMessageFromQueue, createQueue, sendMessageToQueue } from './utils';
4+
5+
const queueName = 'queue1';
6+
7+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
8+
(async () => {
9+
const { connection, channel } = await connectToRabbitMQ();
10+
await createQueue(queueName, channel);
11+
12+
await Sentry.startSpan({ name: 'root span' }, async () => {
13+
sendMessageToQueue(queueName, channel, JSON.stringify({ foo: 'bar01' }));
14+
});
15+
16+
await consumeMessageFromQueue(queueName, channel);
17+
await channel.close();
18+
await connection.close();
19+
})();
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import type { TransactionEvent } from '@sentry/types';
2+
import { cleanupChildProcesses, createRunner } from '../../../utils/runner';
3+
4+
jest.setTimeout(30_000);
5+
6+
const EXPECTED_MESSAGE_SPAN_PRODUCER = expect.objectContaining({
7+
op: 'message',
8+
data: expect.objectContaining({
9+
'messaging.system': 'rabbitmq',
10+
'otel.kind': 'PRODUCER',
11+
'sentry.op': 'message',
12+
}),
13+
status: 'ok',
14+
});
15+
16+
const EXPECTED_MESSAGE_SPAN_CONSUMER = expect.objectContaining({
17+
op: 'message',
18+
data: expect.objectContaining({
19+
'messaging.system': 'rabbitmq',
20+
'otel.kind': 'CONSUMER',
21+
'sentry.op': 'message',
22+
'sentry.origin': 'auto.amqplib.otel.consumer',
23+
}),
24+
status: 'ok',
25+
});
26+
27+
describe('amqplib auto-instrumentation', () => {
28+
afterAll(async () => {
29+
cleanupChildProcesses();
30+
});
31+
32+
test('should be able to send and receive messages', done => {
33+
createRunner(__dirname, 'scenario-message.ts')
34+
.withDockerCompose({
35+
workingDirectory: [__dirname],
36+
readyMatches: ['Time to start RabbitMQ'],
37+
})
38+
.expect({
39+
transaction: (transaction: TransactionEvent) => {
40+
expect(transaction.transaction).toEqual('root span');
41+
expect(transaction.spans?.length).toEqual(1);
42+
expect(transaction.spans![0]).toMatchObject(EXPECTED_MESSAGE_SPAN_PRODUCER);
43+
},
44+
})
45+
.expect({
46+
transaction: (transaction: TransactionEvent) => {
47+
expect(transaction.transaction).toEqual('queue1 process');
48+
expect(transaction.contexts?.trace).toMatchObject(EXPECTED_MESSAGE_SPAN_CONSUMER);
49+
},
50+
})
51+
.start(done);
52+
});
53+
});
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import amqp from 'amqplib';
2+
import type { Channel, Connection } from 'amqplib';
3+
import { ACKNOWLEDGEMENT, AMQP_URL, QUEUE_OPTIONS } from './constants';
4+
5+
export type RabbitMQData = {
6+
connection: Connection;
7+
channel: Channel;
8+
};
9+
10+
export async function connectToRabbitMQ(): Promise<RabbitMQData> {
11+
const connection = await amqp.connect(AMQP_URL);
12+
const channel = await connection.createChannel();
13+
return { connection, channel };
14+
}
15+
16+
export async function createQueue(queueName: string, channel: Channel): Promise<void> {
17+
await channel.assertQueue(queueName, QUEUE_OPTIONS);
18+
}
19+
20+
export function sendMessageToQueue(queueName: string, channel: Channel, message: string): void {
21+
channel.sendToQueue(queueName, Buffer.from(message));
22+
}
23+
24+
async function consumer(queueName: string, channel: Channel): Promise<void> {
25+
await channel.consume(
26+
queueName,
27+
message => {
28+
if (message) {
29+
channel.ack(message);
30+
}
31+
},
32+
ACKNOWLEDGEMENT,
33+
);
34+
}
35+
36+
export async function consumeMessageFromQueue(queueName: string, channel: Channel): Promise<void> {
37+
await consumer(queueName, channel);
38+
}

dev-packages/node-integration-tests/utils/runner.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import type {
1111
SerializedCheckIn,
1212
SerializedSession,
1313
SessionAggregates,
14+
TransactionEvent,
1415
} from '@sentry/types';
1516
import axios from 'axios';
1617
import { createBasicSentryServer } from './server';
@@ -151,7 +152,7 @@ type Expected =
151152
event: Partial<Event> | ((event: Event) => void);
152153
}
153154
| {
154-
transaction: Partial<Event> | ((event: Event) => void);
155+
transaction: Partial<TransactionEvent> | ((event: TransactionEvent) => void);
155156
}
156157
| {
157158
session: Partial<SerializedSession> | ((event: SerializedSession) => void);
@@ -317,7 +318,7 @@ export function createRunner(...paths: string[]) {
317318
}
318319

319320
if ('transaction' in expected) {
320-
const event = item[1] as Event;
321+
const event = item[1] as TransactionEvent;
321322
if (typeof expected.transaction === 'function') {
322323
expected.transaction(event);
323324
} else {
@@ -483,6 +484,7 @@ export function createRunner(...paths: string[]) {
483484
method: 'get' | 'post',
484485
path: string,
485486
headers: Record<string, string> = {},
487+
data?: any, // axios accept any as data
486488
): Promise<T | undefined> {
487489
try {
488490
await waitFor(() => scenarioServerPort !== undefined);
@@ -497,7 +499,7 @@ export function createRunner(...paths: string[]) {
497499
if (method === 'get') {
498500
await axios.get(url, { headers });
499501
} else {
500-
await axios.post(url, { headers });
502+
await axios.post(url, data, { headers });
501503
}
502504
} catch (e) {
503505
return;
@@ -506,7 +508,7 @@ export function createRunner(...paths: string[]) {
506508
} else if (method === 'get') {
507509
return (await axios.get(url, { headers })).data;
508510
} else {
509-
return (await axios.post(url, { headers })).data;
511+
return (await axios.post(url, data, { headers })).data;
510512
}
511513
},
512514
};

packages/astro/src/index.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ export {
1313
addIntegration,
1414
addOpenTelemetryInstrumentation,
1515
addRequestDataToEvent,
16+
amqplibIntegration,
1617
anrIntegration,
1718
captureCheckIn,
1819
captureConsoleIntegration,

packages/aws-serverless/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ export {
110110
addOpenTelemetryInstrumentation,
111111
zodErrorsIntegration,
112112
profiler,
113+
amqplibIntegration,
113114
} from '@sentry/node';
114115

115116
export {

packages/bun/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ export {
131131
addOpenTelemetryInstrumentation,
132132
zodErrorsIntegration,
133133
profiler,
134+
amqplibIntegration,
134135
} from '@sentry/node';
135136

136137
export {

packages/google-cloud-serverless/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ export {
110110
addOpenTelemetryInstrumentation,
111111
zodErrorsIntegration,
112112
profiler,
113+
amqplibIntegration,
113114
} from '@sentry/node';
114115

115116
export {

packages/node/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
"@opentelemetry/context-async-hooks": "^1.25.1",
7070
"@opentelemetry/core": "^1.25.1",
7171
"@opentelemetry/instrumentation": "^0.53.0",
72+
"@opentelemetry/instrumentation-amqplib": "^0.42.0",
7273
"@opentelemetry/instrumentation-connect": "0.39.0",
7374
"@opentelemetry/instrumentation-express": "0.42.0",
7475
"@opentelemetry/instrumentation-fastify": "0.39.0",

packages/node/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ export { koaIntegration, setupKoaErrorHandler } from './integrations/tracing/koa
2828
export { connectIntegration, setupConnectErrorHandler } from './integrations/tracing/connect';
2929
export { spotlightIntegration } from './integrations/spotlight';
3030
export { genericPoolIntegration } from './integrations/tracing/genericPool';
31+
export { amqplibIntegration } from './integrations/tracing/amqplib';
3132

3233
export { SentryContextManager } from './otel/contextManager';
3334
export { generateInstrumentOnce } from './otel/instrument';
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import type { Span } from '@opentelemetry/api';
2+
import { AmqplibInstrumentation, type AmqplibInstrumentationConfig } from '@opentelemetry/instrumentation-amqplib';
3+
import { defineIntegration } from '@sentry/core';
4+
import type { IntegrationFn } from '@sentry/types';
5+
import { generateInstrumentOnce } from '../../otel/instrument';
6+
import { addOriginToSpan } from '../../utils/addOriginToSpan';
7+
8+
const INTEGRATION_NAME = 'Amqplib';
9+
10+
const config: AmqplibInstrumentationConfig = {
11+
consumeEndHook: (span: Span) => {
12+
addOriginToSpan(span, 'auto.amqplib.otel.consumer');
13+
},
14+
publishConfirmHook: (span: Span) => {
15+
addOriginToSpan(span, 'auto.amqplib.otel.publisher');
16+
},
17+
};
18+
19+
export const instrumentAmqplib = generateInstrumentOnce(INTEGRATION_NAME, () => new AmqplibInstrumentation(config));
20+
21+
const _amqplibIntegration = (() => {
22+
return {
23+
name: INTEGRATION_NAME,
24+
setupOnce() {
25+
instrumentAmqplib();
26+
},
27+
};
28+
}) satisfies IntegrationFn;
29+
30+
export const amqplibIntegration = defineIntegration(_amqplibIntegration);

packages/node/src/integrations/tracing/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { Integration } from '@sentry/types';
22
import { instrumentHttp } from '../http';
33

4+
import { amqplibIntegration, instrumentAmqplib } from './amqplib';
45
import { connectIntegration, instrumentConnect } from './connect';
56
import { expressIntegration, instrumentExpress } from './express';
67
import { fastifyIntegration, instrumentFastify } from './fastify';
@@ -41,6 +42,7 @@ export function getAutoPerformanceIntegrations(): Integration[] {
4142
connectIntegration(),
4243
genericPoolIntegration(),
4344
kafkaIntegration(),
45+
amqplibIntegration(),
4446
];
4547
}
4648

@@ -67,5 +69,6 @@ export function getOpenTelemetryInstrumentationToPreload(): (((options?: any) =>
6769
instrumentGraphql,
6870
instrumentRedis,
6971
instrumentGenericPool,
72+
instrumentAmqplib,
7073
];
7174
}

packages/remix/src/index.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ export {
1818
addIntegration,
1919
addOpenTelemetryInstrumentation,
2020
addRequestDataToEvent,
21+
amqplibIntegration,
2122
anrIntegration,
2223
captureCheckIn,
2324
captureConsoleIntegration,

packages/solidstart/src/server/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ export {
99
addIntegration,
1010
addOpenTelemetryInstrumentation,
1111
addRequestDataToEvent,
12+
amqplibIntegration,
1213
anrIntegration,
1314
captureCheckIn,
1415
captureConsoleIntegration,

packages/sveltekit/src/server/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ export {
99
addIntegration,
1010
addOpenTelemetryInstrumentation,
1111
addRequestDataToEvent,
12+
amqplibIntegration,
1213
anrIntegration,
1314
captureCheckIn,
1415
captureConsoleIntegration,

0 commit comments

Comments
 (0)