Skip to content

Commit 41c1cc6

Browse files
authored
chore: Added producer and consumer metrics to kafkajs instrumentation (#2407)
1 parent 09636a4 commit 41c1cc6

File tree

6 files changed

+207
-115
lines changed

6 files changed

+207
-115
lines changed

lib/instrumentation/kafkajs/consumer.js

Lines changed: 64 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
const { kafkaCtx } = require('../../symbols')
88
const { MessageSpec, MessageSubscribeSpec, RecorderSpec } = require('../../shim/specs')
99
const { DESTINATIONS } = require('../../config/attribute-filter')
10+
const recordMethodMetric = require('./record-method-metric')
11+
const recordLinkingMetrics = require('./record-linking-metrics')
1012
const CONSUMER_METHODS = [
1113
'connect',
1214
'disconnect',
@@ -19,58 +21,65 @@ const CONSUMER_METHODS = [
1921
]
2022
const SEGMENT_PREFIX = 'kafkajs.Kafka.consumer#'
2123

22-
module.exports = function instrumentConsumer({ shim, kafkajs, recordMethodMetric }) {
23-
const { agent } = shim
24-
shim.wrap(kafkajs.Kafka.prototype, 'consumer', function wrapConsumer(shim, orig) {
25-
return function wrappedConsumer() {
26-
const args = shim.argsToArray.apply(shim, arguments)
27-
const consumer = orig.apply(this, args)
28-
consumer.on(consumer.events.REQUEST, function listener(data) {
29-
// storing broker for when we add `host`, `port` to messaging spans
30-
consumer[kafkaCtx] = {
31-
clientId: data?.payload?.clientId,
32-
broker: data?.payload.broker
33-
}
24+
module.exports = wrapConsumer
25+
26+
function wrapConsumer(shim, orig) {
27+
return function wrappedConsumer() {
28+
const args = shim.argsToArray.apply(shim, arguments)
29+
const consumer = orig.apply(this, args)
30+
consumer[kafkaCtx] = this[kafkaCtx]
31+
32+
consumer.on(consumer.events.REQUEST, function listener(data) {
33+
consumer[kafkaCtx].clientId = data?.payload?.clientId
34+
})
35+
shim.record(consumer, CONSUMER_METHODS, function wrapper(shim, fn, name) {
36+
return new RecorderSpec({
37+
name: `${SEGMENT_PREFIX}${name}`,
38+
promise: true
3439
})
35-
shim.record(consumer, CONSUMER_METHODS, function wrapper(shim, fn, name) {
36-
return new RecorderSpec({
37-
name: `${SEGMENT_PREFIX}${name}`,
38-
promise: true
39-
})
40+
})
41+
shim.recordSubscribedConsume(
42+
consumer,
43+
'run',
44+
new MessageSubscribeSpec({
45+
name: `${SEGMENT_PREFIX}#run`,
46+
destinationType: shim.TOPIC,
47+
promise: true,
48+
consumer: shim.FIRST,
49+
functions: ['eachMessage'],
50+
messageHandler: handler({ consumer })
4051
})
41-
shim.recordSubscribedConsume(
42-
consumer,
43-
'run',
44-
new MessageSubscribeSpec({
45-
name: `${SEGMENT_PREFIX}#run`,
46-
destinationType: shim.TOPIC,
47-
promise: true,
48-
consumer: shim.FIRST,
49-
functions: ['eachMessage'],
50-
messageHandler: handler({ consumer, recordMethodMetric })
51-
})
52-
)
52+
)
5353

54-
shim.wrap(consumer, 'run', function wrapRun(shim, fn) {
55-
return function wrappedRun() {
56-
const runArgs = shim.argsToArray.apply(shim, arguments)
57-
if (runArgs?.[0]?.eachBatch) {
58-
runArgs[0].eachBatch = shim.wrap(
59-
runArgs[0].eachBatch,
60-
function wrapEachBatch(shim, eachBatch) {
61-
return function wrappedEachBatch() {
62-
recordMethodMetric({ agent, name: 'eachBatch' })
63-
return eachBatch.apply(this, arguments)
64-
}
65-
}
66-
)
54+
shim.wrap(consumer, 'run', wrapRun)
55+
return consumer
56+
}
57+
}
58+
59+
function wrapRun(shim, fn) {
60+
const agent = shim.agent
61+
return function wrappedRun() {
62+
const runArgs = shim.argsToArray.apply(shim, arguments)
63+
const brokers = this[kafkaCtx].brokers
64+
if (runArgs?.[0]?.eachBatch) {
65+
runArgs[0].eachBatch = shim.wrap(
66+
runArgs[0].eachBatch,
67+
function wrapEachBatch(shim, eachBatch) {
68+
return function wrappedEachBatch() {
69+
recordMethodMetric({ agent, name: 'eachBatch' })
70+
recordLinkingMetrics({
71+
agent,
72+
brokers,
73+
topic: arguments[0].batch.topic,
74+
producer: false
75+
})
76+
return eachBatch.apply(this, arguments)
6777
}
68-
return fn.apply(this, runArgs)
6978
}
70-
})
71-
return consumer
79+
)
7280
}
73-
})
81+
return fn.apply(this, runArgs)
82+
}
7483
}
7584

7685
/**
@@ -80,10 +89,9 @@ module.exports = function instrumentConsumer({ shim, kafkajs, recordMethodMetric
8089
*
8190
* @param {object} params to function
8291
* @param {object} params.consumer consumer being instrumented
83-
* @param {function} params.recordMethodMetric helper method for logging tracking metrics
8492
* @returns {function} message handler for setting metrics and spec for the consumer transaction
8593
*/
86-
function handler({ consumer, recordMethodMetric }) {
94+
function handler({ consumer }) {
8795
/**
8896
* Message handler that extracts the topic and headers from message being consumed.
8997
*
@@ -96,10 +104,18 @@ function handler({ consumer, recordMethodMetric }) {
96104
*/
97105
return function messageHandler(shim, args) {
98106
recordMethodMetric({ agent: shim.agent, name: 'eachMessage' })
107+
99108
const [data] = args
100109
const { topic } = data
101110
const segment = shim.getActiveSegment()
102111

112+
recordLinkingMetrics({
113+
agent: shim.agent,
114+
brokers: consumer[kafkaCtx].brokers,
115+
topic,
116+
producer: false
117+
})
118+
103119
if (segment?.transaction) {
104120
const tx = segment.transaction
105121
const byteLength = data?.message.value?.byteLength

lib/instrumentation/kafkajs/index.js

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77

88
const instrumentProducer = require('./producer')
99
const instrumentConsumer = require('./consumer')
10-
const { KAFKA } = require('../../metrics/names')
10+
const { ClassWrapSpec } = require('../../shim/specs')
11+
const { kafkaCtx } = require('../../symbols')
1112

1213
module.exports = function initialize(agent, kafkajs, _moduleName, shim) {
1314
if (agent.config.feature_flag.kafkajs_instrumentation === false) {
@@ -18,17 +19,16 @@ module.exports = function initialize(agent, kafkajs, _moduleName, shim) {
1819
}
1920

2021
shim.setLibrary(shim.KAFKA)
21-
instrumentConsumer({ shim, kafkajs, recordMethodMetric })
22-
instrumentProducer({ shim, kafkajs, recordMethodMetric })
23-
}
2422

25-
/**
26-
* Convenience method for logging the tracking metrics for producer and consumer
27-
*
28-
* @param {object} params to function
29-
* @param {Agent} params.agent instance of agent
30-
* @param {string} params.name name of function getting instrumented
31-
*/
32-
function recordMethodMetric({ agent, name }) {
33-
agent.metrics.getOrCreateMetric(`${KAFKA.PREFIX}/${name}`).incrementCallCount()
23+
shim.wrapClass(
24+
kafkajs,
25+
'Kafka',
26+
new ClassWrapSpec({
27+
post: function nrConstructorWrapper(shim, wrappedClass, name, args) {
28+
this[kafkaCtx] = { brokers: args[0].brokers }
29+
shim.wrap(this, 'producer', instrumentProducer)
30+
shim.wrap(this, 'consumer', instrumentConsumer)
31+
}
32+
})
33+
)
3434
}

lib/instrumentation/kafkajs/producer.js

Lines changed: 68 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -7,68 +7,84 @@
77

88
const { MessageSpec } = require('../../shim/specs')
99
const getByPath = require('../../util/get')
10+
const recordMethodMetric = require('./record-method-metric')
11+
const recordLinkingMetrics = require('./record-linking-metrics')
12+
const { kafkaCtx } = require('../../symbols')
1013

11-
module.exports = function instrumentProducer({ shim, kafkajs, recordMethodMetric }) {
12-
const { agent } = shim
13-
shim.wrap(kafkajs.Kafka.prototype, 'producer', function nrProducerWrapper(shim, orig) {
14-
return function nrProducer() {
15-
const params = shim.argsToArray.apply(shim, arguments)
16-
const producer = orig.apply(this, params)
14+
module.exports = nrProducerWrapper
1715

18-
// The `.producer()` method returns an object with `send` and `sendBatch`
19-
// methods. The `send` method is merely a wrapper around `sendBatch`, but
20-
// we cannot simply wrap `sendBatch` because the `send` method does not
21-
// use the object scoped instance (i.e. `this.sendBatch`); it utilizes
22-
// the closure scoped instance of `sendBatch`. So we must wrap each
23-
// method.
16+
function nrProducerWrapper(shim, orig) {
17+
return function nrProducer() {
18+
const params = shim.argsToArray.apply(shim, arguments)
19+
const producer = orig.apply(this, params)
20+
producer[kafkaCtx] = this[kafkaCtx]
2421

25-
shim.recordProduce(producer, 'send', function nrSend(shim, fn, name, args) {
26-
recordMethodMetric({ agent, name })
27-
const data = args[0]
28-
return new MessageSpec({
29-
promise: true,
30-
destinationName: data.topic,
31-
destinationType: shim.TOPIC,
32-
messageHeaders: (inject) => {
33-
return data.messages.map((msg) => {
34-
if (msg.headers) {
35-
return inject(msg.headers)
36-
}
37-
msg.headers = {}
38-
return inject(msg.headers)
39-
})
40-
}
41-
})
42-
})
22+
// The `.producer()` method returns an object with `send` and `sendBatch`
23+
// methods. The `send` method is merely a wrapper around `sendBatch`, but
24+
// we cannot simply wrap `sendBatch` because the `send` method does not
25+
// use the object scoped instance (i.e. `this.sendBatch`); it utilizes
26+
// the closure scoped instance of `sendBatch`. So we must wrap each
27+
// method.
28+
shim.recordProduce(producer, 'send', nrSend)
29+
shim.recordProduce(producer, 'sendBatch', nrSendBatch)
30+
31+
return producer
32+
}
33+
}
34+
35+
function nrSend(shim, fn, name, args) {
36+
const agent = shim.agent
37+
recordMethodMetric({ agent, name })
38+
const data = args[0]
4339

44-
shim.recordProduce(producer, 'sendBatch', function nrSendBatch(shim, fn, name, args) {
45-
recordMethodMetric({ agent, name })
46-
const data = args[0]
47-
const firstMessage = getByPath(data, 'topicMessages[0].messages[0]')
40+
recordLinkingMetrics({ agent, brokers: this[kafkaCtx].brokers, topic: data.topic })
4841

49-
if (firstMessage) {
50-
firstMessage.headers = firstMessage.headers ?? {}
42+
return new MessageSpec({
43+
promise: true,
44+
destinationName: data.topic,
45+
destinationType: shim.TOPIC,
46+
messageHeaders: (inject) => {
47+
return data.messages.map((msg) => {
48+
if (msg.headers) {
49+
return inject(msg.headers)
5150
}
51+
msg.headers = {}
52+
return inject(msg.headers)
53+
})
54+
}
55+
})
56+
}
5257

53-
return new MessageSpec({
54-
promise: true,
55-
destinationName: data.topicMessages[0].topic,
56-
destinationType: shim.TOPIC,
57-
messageHeaders: (inject) => {
58-
return data.topicMessages.map((tm) => {
59-
return tm.messages.map((m) => {
60-
if (m.headers) {
61-
return inject(m.headers)
62-
}
63-
m.headers = {}
64-
return inject(m.headers)
65-
})
66-
})
58+
function nrSendBatch(shim, fn, name, args) {
59+
const agent = shim.agent
60+
recordMethodMetric({ agent, name })
61+
const data = args[0]
62+
const firstMessage = getByPath(data, 'topicMessages[0].messages[0]')
63+
64+
recordLinkingMetrics({
65+
agent,
66+
brokers: this[kafkaCtx].brokers,
67+
topic: data.topicMessages[0].topic
68+
})
69+
70+
if (firstMessage) {
71+
firstMessage.headers = firstMessage.headers ?? {}
72+
}
73+
74+
return new MessageSpec({
75+
promise: true,
76+
destinationName: data.topicMessages[0].topic,
77+
destinationType: shim.TOPIC,
78+
messageHeaders: (inject) => {
79+
return data.topicMessages.map((tm) => {
80+
return tm.messages.map((m) => {
81+
if (m.headers) {
82+
return inject(m.headers)
6783
}
84+
m.headers = {}
85+
return inject(m.headers)
6886
})
6987
})
70-
71-
return producer
7288
}
7389
})
7490
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Copyright 2024 New Relic Corporation. All rights reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
'use strict'
7+
8+
module.exports = recordLinkingMetrics
9+
10+
function recordLinkingMetrics({ agent, brokers, topic, producer = true }) {
11+
const kind = producer === true ? 'Produce' : 'Consume'
12+
for (const broker of brokers) {
13+
agent.metrics
14+
.getOrCreateMetric(`MessageBroker/Kafka/Nodes/${broker}/${kind}/Named/${topic}`)
15+
.incrementCallCount()
16+
}
17+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2024 New Relic Corporation. All rights reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
'use strict'
7+
8+
const { KAFKA } = require('../../metrics/names')
9+
10+
module.exports = recordMethodMetric
11+
12+
/**
13+
* Convenience method for logging the tracking metrics for producer and consumer
14+
*
15+
* @param {object} params to function
16+
* @param {Agent} params.agent instance of agent
17+
* @param {string} params.name name of function getting instrumented
18+
*/
19+
function recordMethodMetric({ agent, name }) {
20+
agent.metrics.getOrCreateMetric(`${KAFKA.PREFIX}/${name}`).incrementCallCount()
21+
}

0 commit comments

Comments
 (0)