Skip to content

Commit 91636a8

Browse files
authored
fix: Updated amqplib instrumentation to properly parse host/port from connect (#2461)
1 parent 2b67623 commit 91636a8

File tree

7 files changed

+166
-94
lines changed

7 files changed

+166
-94
lines changed

lib/instrumentation/amqplib/amqplib.js

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ const {
99
OperationSpec,
1010
params: { DatastoreParameters }
1111
} = require('../../shim/specs')
12-
const url = require('url')
1312
const wrapModel = require('./channel-model')
14-
const { setCallback } = require('./utils')
13+
const { setCallback, parseConnectionArgs } = require('./utils')
1514
const wrapChannel = require('./channel')
15+
const { amqpConnection } = require('../../symbols')
1616

1717
module.exports.instrumentPromiseAPI = instrumentChannelAPI
1818
module.exports.instrumentCallbackAPI = instrumentCallbackAPI
@@ -73,31 +73,52 @@ function instrumentAMQP(shim, amqp, promiseMode) {
7373
/**
7474
*
7575
* Instruments the connect method
76+
* We have to both wrap and record because
77+
* we need the host/port for all subsequent calls on the model/channel
78+
* but record only completes in an active transaction
7679
*
7780
* @param {Shim} shim instance of shim
7881
* @param {object} amqp amqplib object
7982
* @param {boolean} promiseMode is this promise based?
8083
*/
8184
function wrapConnect(shim, amqp, promiseMode) {
82-
shim.record(amqp, 'connect', function recordConnect(shim, connect, name, args) {
83-
let [connArgs] = args
84-
const params = new DatastoreParameters()
85+
shim.wrap(amqp, 'connect', function wrapConnect(shim, connect) {
86+
return function wrappedConnect() {
87+
const args = shim.argsToArray.apply(shim, arguments)
88+
const [connArgs] = args
89+
const params = parseConnectionArgs({ shim, connArgs })
90+
const cb = args[args.length - 1]
91+
if (!promiseMode) {
92+
args[args.length - 1] = function wrappedCallback() {
93+
const cbArgs = shim.argsToArray.apply(shim, arguments)
94+
const [, c] = cbArgs
95+
c.connection[amqpConnection] = params
96+
return cb.apply(this, cbArgs)
97+
}
98+
}
8599

86-
if (shim.isString(connArgs)) {
87-
connArgs = url.parse(connArgs)
88-
params.host = connArgs.hostname
89-
if (connArgs.port) {
90-
params.port = connArgs.port
100+
const result = connect.apply(this, args)
101+
if (promiseMode) {
102+
return result.then((c) => {
103+
c.connection[amqpConnection] = params
104+
return c
105+
})
91106
}
107+
return result
92108
}
109+
})
93110

111+
shim.record(amqp, 'connect', function recordConnect(shim, connect, name, args) {
112+
const [connArgs] = args
113+
const params = parseConnectionArgs({ shim, connArgs })
94114
return new OperationSpec({
95115
name: 'amqplib.connect',
96116
callback: setCallback(shim, promiseMode),
97117
promise: promiseMode,
98-
parameters: params,
99-
stream: null,
100-
recorder: null
118+
parameters: new DatastoreParameters({
119+
host: params.host,
120+
port_path_or_id: params.port
121+
})
101122
})
102123
})
103124
}

lib/instrumentation/amqplib/channel-model.js

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
'use strict'
77
const { MessageSpec, MessageSubscribeSpec, RecorderSpec } = require('../../shim/specs')
8+
const { amqpConnection } = require('../../symbols')
89
const CHANNEL_METHODS = [
910
'close',
1011
'open',
@@ -22,13 +23,7 @@ const CHANNEL_METHODS = [
2223
'prefetch',
2324
'recover'
2425
]
25-
const {
26-
describeMessage,
27-
setCallback,
28-
parseConnect,
29-
getParametersFromMessage,
30-
TEMP_RE
31-
} = require('./utils')
26+
const { describeMessage, setCallback, getParametersFromMessage, TEMP_RE } = require('./utils')
3227

3328
/**
3429
*
@@ -89,7 +84,7 @@ function recordPurge({ shim, proto, promiseMode }) {
8984

9085
function recordGet({ shim, proto, promiseMode }) {
9186
shim.recordConsume(proto, 'get', function wrapGet() {
92-
const { host, port } = parseConnect(this?.connection?.stream)
87+
const { host, port } = this?.connection?.[amqpConnection] || {}
9388
return new MessageSpec({
9489
destinationName: shim.FIRST,
9590
callback: setCallback(shim, promiseMode),
@@ -115,7 +110,7 @@ function recordGet({ shim, proto, promiseMode }) {
115110

116111
function recordConsume({ shim, proto, promiseMode }) {
117112
shim.recordSubscribedConsume(proto, 'consume', function consume() {
118-
const { host, port } = parseConnect(this?.connection?.stream)
113+
const { host, port } = this?.connection?.[amqpConnection] || {}
119114
return new MessageSubscribeSpec({
120115
name: 'amqplib.Channel#consume',
121116
queue: shim.FIRST,

lib/instrumentation/amqplib/channel.js

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55

66
'use strict'
77
const { MessageSpec } = require('../../shim/specs')
8-
const { parseConnect, getParameters, TEMP_RE } = require('./utils')
8+
const { amqpConnection } = require('../../symbols')
9+
const { getParameters, TEMP_RE } = require('./utils')
910

1011
/**
1112
*
@@ -47,24 +48,26 @@ module.exports = function wrapChannel(shim) {
4748
}
4849
})
4950

50-
shim.recordProduce(proto, 'sendMessage', function recordSendMessage(shim, fn, n, args) {
51-
const fields = args[0]
52-
if (!fields) {
53-
return null
54-
}
55-
const isDefault = fields.exchange === ''
56-
let exchange = 'Default'
57-
if (!isDefault) {
58-
exchange = TEMP_RE.test(fields.exchange) ? null : fields.exchange
59-
}
60-
const { host, port } = parseConnect(this?.connection?.stream)
51+
shim.recordProduce(proto, 'sendMessage', recordSendMessage)
52+
}
53+
54+
function recordSendMessage(shim, fn, n, args) {
55+
const fields = args[0]
56+
if (!fields) {
57+
return null
58+
}
59+
const isDefault = fields.exchange === ''
60+
let exchange = 'Default'
61+
if (!isDefault) {
62+
exchange = TEMP_RE.test(fields.exchange) ? null : fields.exchange
63+
}
64+
const { host, port } = this?.connection?.[amqpConnection] || {}
6165

62-
return new MessageSpec({
63-
destinationName: exchange,
64-
destinationType: shim.EXCHANGE,
65-
routingKey: fields.routingKey,
66-
headers: fields.headers,
67-
parameters: getParameters({ parameters: Object.create(null), fields, host, port })
68-
})
66+
return new MessageSpec({
67+
destinationName: exchange,
68+
destinationType: shim.EXCHANGE,
69+
routingKey: fields.routingKey,
70+
headers: fields.headers,
71+
parameters: getParameters({ parameters: Object.create(null), fields, host, port })
6972
})
7073
}

lib/instrumentation/amqplib/utils.js

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ const {
88
MessageSpec,
99
params: { QueueMessageParameters }
1010
} = require('../../shim/specs')
11-
const { amqpConnection } = require('../../symbols')
1211
const TEMP_RE = /^amq\./
1312

1413
/**
@@ -100,25 +99,6 @@ function getParametersFromMessage({ message, host, port }) {
10099
return parameters
101100
}
102101

103-
/**
104-
* Extracts the host/port from the amqp socket connection.
105-
* Stores on connection as symbol to only parse once.
106-
*
107-
* @param {Socket} socket amqp connection
108-
* @returns {object} {host, port } of connection
109-
*/
110-
function parseConnect(socket) {
111-
if (socket[amqpConnection]) {
112-
return socket[amqpConnection]
113-
}
114-
const host = ['127.0.0.1', '::1', '[::1]'].includes(socket?.remoteAddress)
115-
? 'localhost'
116-
: socket?.remoteAddress
117-
const port = socket?.remotePort
118-
socket[amqpConnection] = { host, port }
119-
return { host, port }
120-
}
121-
122102
/**
123103
* Helper to set the appropriate value of the callback property
124104
* in the spec. If it's a promise set to null otherwise set it to `shim.LAST`
@@ -131,11 +111,33 @@ function setCallback(shim, promiseMode) {
131111
return promiseMode ? null : shim.LAST
132112
}
133113

114+
/**
115+
* Parses the connection args to return host/port
116+
*
117+
* @param {string|object} connArgs connection arguments
118+
* @returns {object} {host, port }
119+
*/
120+
function parseConnectionArgs({ shim, connArgs }) {
121+
const params = {}
122+
if (shim.isString(connArgs)) {
123+
connArgs = new URL(connArgs)
124+
params.host = connArgs.hostname
125+
if (connArgs.port) {
126+
params.port = parseInt(connArgs.port, 10)
127+
}
128+
} else {
129+
params.port = connArgs.port || (connArgs.protocol === 'amqp' ? 5672 : 5671)
130+
params.host = connArgs.hostname
131+
}
132+
133+
return params
134+
}
135+
134136
module.exports = {
135137
describeMessage,
136138
getParameters,
137139
getParametersFromMessage,
138-
parseConnect,
140+
parseConnectionArgs,
139141
setCallback,
140142
TEMP_RE
141143
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2024 New Relic Corporation. All rights reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
'use strict'
7+
const test = require('node:test')
8+
const assert = require('node:assert')
9+
const { parseConnectionArgs } = require('../../../../lib/instrumentation/amqplib/utils')
10+
11+
test('should parse host port if connection args is a string', () => {
12+
const stub = {
13+
isString() {
14+
return true
15+
}
16+
}
17+
const params = parseConnectionArgs({ shim: stub, connArgs: 'amqp://host:5388/' })
18+
assert.equal(params.host, 'host')
19+
assert.equal(params.port, 5388)
20+
})
21+
22+
test('should parse host port if connection is an object', () => {
23+
const stub = {
24+
isString() {
25+
return false
26+
}
27+
}
28+
const params = parseConnectionArgs({ shim: stub, connArgs: { hostname: 'host', port: 5388 } })
29+
assert.equal(params.host, 'host')
30+
assert.equal(params.port, 5388)
31+
})
32+
33+
test('should default port to 5672 if protocol is amqp:', () => {
34+
const stub = {
35+
isString() {
36+
return false
37+
}
38+
}
39+
const params = parseConnectionArgs({
40+
shim: stub,
41+
connArgs: { hostname: 'host', protocol: 'amqp' }
42+
})
43+
assert.equal(params.host, 'host')
44+
assert.equal(params.port, 5672)
45+
})
46+
47+
test('should default port to 5671 if protocol is amqps:', () => {
48+
const stub = {
49+
isString() {
50+
return false
51+
}
52+
}
53+
const params = parseConnectionArgs({
54+
shim: stub,
55+
connArgs: { hostname: 'host', protocol: 'amqps' }
56+
})
57+
assert.equal(params.host, 'host')
58+
assert.equal(params.port, 5671)
59+
})

test/versioned/amqplib/callback.tap.js

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -75,21 +75,16 @@ tap.test('amqplib callback instrumentation', function (t) {
7575
})
7676

7777
t.test('connect in a transaction', function (t) {
78-
helper.runInTransaction(agent, function () {
79-
t.doesNotThrow(function () {
80-
amqplib.connect(amqpUtils.CON_STRING, null, function (err, _conn) {
81-
t.error(err, 'should not break connection')
82-
if (!t.passing()) {
83-
t.bailout('Can not connect to RabbitMQ, stopping tests.')
84-
}
85-
_conn.close(t.end)
86-
})
87-
}, 'should not error when connecting')
88-
89-
// If connect threw, we need to end the test immediately.
90-
if (!t.passing()) {
91-
t.end()
92-
}
78+
helper.runInTransaction(agent, function (tx) {
79+
amqplib.connect(amqpUtils.CON_STRING, null, function (err, _conn) {
80+
t.error(err, 'should not break connection')
81+
const [segment] = tx.trace.root.children
82+
t.equal(segment.name, 'amqplib.connect')
83+
const attrs = segment.getAttributes()
84+
t.equal(attrs.host, 'localhost')
85+
t.equal(attrs.port_path_or_id, 5672)
86+
_conn.close(t.end)
87+
})
9388
})
9489
})
9590

test/versioned/amqplib/promises.tap.js

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -75,23 +75,20 @@ tap.test('amqplib promise instrumentation', function (t) {
7575
})
7676

7777
t.test('connect in a transaction', function (t) {
78-
helper.runInTransaction(agent, function () {
79-
t.doesNotThrow(function () {
80-
amqplib.connect(amqpUtils.CON_STRING).then(
81-
function (_conn) {
82-
_conn.close().then(t.end)
83-
},
84-
function (err) {
85-
t.error(err, 'should not break connection')
86-
t.bailout('Can not connect to RabbitMQ, stopping tests.')
87-
}
88-
)
89-
}, 'should not error when connecting')
90-
91-
// If connect threw, we need to end the test immediately.
92-
if (!t.passing()) {
93-
t.end()
94-
}
78+
helper.runInTransaction(agent, function (tx) {
79+
amqplib.connect(amqpUtils.CON_STRING).then(
80+
function (_conn) {
81+
const [segment] = tx.trace.root.children
82+
t.equal(segment.name, 'amqplib.connect')
83+
const attrs = segment.getAttributes()
84+
t.equal(attrs.host, 'localhost')
85+
t.equal(attrs.port_path_or_id, 5672)
86+
_conn.close().then(t.end)
87+
},
88+
function (err) {
89+
t.error(err, 'should not break connection')
90+
}
91+
)
9592
})
9693
})
9794

0 commit comments

Comments
 (0)