Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added kafkajs producer instrumentation #2236

Merged
merged 6 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions lib/instrumentation/kafkajs.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,4 @@

'use strict'

// eslint-disable-next-line no-unused-vars
module.exports = function initialize(_agent, kafkajs, _moduleName, shim) {
// Put instrumentation code here for kafkajs.Kafka.producer and kafkajs.Kafka.consumer
}
module.exports = require('./kafkajs/index')
22 changes: 22 additions & 0 deletions lib/instrumentation/kafkajs/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2024 New Relic Corporation. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/

'use strict'

const instrumentProducer = require('./producer')

// eslint-disable-next-line no-unused-vars
module.exports = function initialize(agent, kafkajs, _moduleName, shim) {
if (agent.config.feature_flag.kafkajs_instrumentation === false) {
shim.logger.debug(
'`config.feature_flag.kafkajs_instrumentation is false, skipping instrumentation of kafkajs`'
)
return
}

shim.setLibrary(shim.KAFKA)
bizob2828 marked this conversation as resolved.
Show resolved Hide resolved

instrumentProducer({ shim, kafkajs })
}
57 changes: 57 additions & 0 deletions lib/instrumentation/kafkajs/producer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2024 New Relic Corporation. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/

'use strict'

const { MessageSpec } = require('../../shim/specs')
const getByPath = require('../../util/get')

module.exports = function instrumentProducer({ shim, kafkajs }) {
shim.wrap(kafkajs.Kafka.prototype, 'producer', function nrProducerWrapper(shim, orig) {
return function nrProducer() {
const params = shim.argsToArray.apply(shim, arguments)
const producer = orig.apply(this, params)

// The `.producer()` method returns an object with `send` and `sendBatch`
// methods. The `send` method is merely a wrapper around `sendBatch`, but
// we cannot simply wrap `sendBatch` because the `send` method does not
// use the object scoped instance (i.e. `this.sendBatch`); it utilizes
// the closure scoped instance of `sendBatch`. So we must wrap each
// method.

shim.recordProduce(producer, 'send', function nrSend(shim, fn, n, args) {
const data = args[0]
const firstMessage = getByPath(data, 'messages[0]')

if (firstMessage) {
firstMessage.headers = firstMessage.headers ?? {}
}

return new MessageSpec({
destinationName: data.topic,
destinationType: shim.TOPIC,
headers: firstMessage.headers
})
})

shim.recordProduce(producer, 'sendBatch', function nrSendBatch(shim, fn, n, args) {
const data = args[0]
const firstMessage = getByPath(data, 'topicMessages[0].messages[0]')

if (firstMessage) {
firstMessage.headers = firstMessage.headers ?? {}
}

return new MessageSpec({
destinationName: data.topicMessages[0].topic,
bizob2828 marked this conversation as resolved.
Show resolved Hide resolved
destinationType: shim.TOPIC,
headers: firstMessage.headers
})
})

return producer
}
})
}
3 changes: 2 additions & 1 deletion test/lib/agent_helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -258,12 +258,13 @@ helper.loadTestAgent = (t, conf, setState = true) => {

/**
* Create a transactional scope in which instrumentation that will only add
* trace segments to existing transactions will funciton.
* trace segments to existing transactions will function.
*
* If the agent hasn't been started, set to a state that can collect transactions.
*
* @param {Agent} agent The agent whose tracer should be used to create the
* transaction.
* @param {string} [type='web'] Indicates the class of the transaction.
* @param {Function} callback The function to be run within the transaction.
*/
helper.runInTransaction = (agent, type, callback) => {
Expand Down
184 changes: 168 additions & 16 deletions test/versioned/kafkajs/kafka.tap.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ const utils = require('./utils')
const broker = `${params.kafka_host}:${params.kafka_port}`

tap.beforeEach(async (t) => {
t.context.agent = helper.instrumentMockedAgent()
t.context.agent = helper.instrumentMockedAgent({
feature_flag: {
kafkajs_instrumentation: true
}
})

const { Kafka, logLevel } = require('kafkajs')
t.context.Kafka = Kafka
Expand Down Expand Up @@ -43,24 +47,172 @@ tap.afterEach(async (t) => {
await t.context.producer.disconnect()
})

tap.test('stub', async (t) => {
const { consumer, producer, topic } = t.context
tap.test('send records correctly', (t) => {
t.plan(4)

const { agent, consumer, producer, topic } = t.context
const message = 'test message'

await consumer.subscribe({ topics: [topic], fromBeginning: true })
const testPromise = new Promise((resolve) => {
consumer.run({
eachMessage: async ({ message: actualMessage }) => {
t.equal(actualMessage.value.toString(), message)
resolve()
}
agent.on('transactionFinished', (tx) => {
const name = `MessageBroker/Kafka/Topic/Produce/Named/${topic}`
const segment = tx.agent.tracer.getSegment()

const foundSegment = segment.children.find((s) => s.name.endsWith(topic))
bizob2828 marked this conversation as resolved.
Show resolved Hide resolved
t.equal(foundSegment.name, name)

const metric = tx.metrics.getMetric(name)
t.equal(metric.callCount, 1)

t.end()
})

helper.runInTransaction(agent, async (tx) => {
await consumer.subscribe({ topic, fromBeginning: true })
const promise = new Promise((resolve) => {
consumer.run({
eachMessage: async ({ message: actualMessage }) => {
t.equal(actualMessage.value.toString(), message)
t.match(actualMessage.headers['x-foo'].toString(), 'foo')
resolve()
}
})
})
await utils.waitForConsumersToJoinGroup({ consumer })
await producer.send({
acks: 1,
topic,
messages: [
{
key: 'key',
value: message,
headers: {
'x-foo': 'foo'
}
}
]
})
await promise

tx.end()
})
})

tap.test('send passes along DT headers', (t) => {
// The intent of this test is to verify the scenario:
//
// 1. A service receives a request
// 2. The service builds a payload for Kafka
// 3. The produced Kafka data includes the distributed trace data that was
// provided to the service handling the request.

t.plan(5)

const now = Date.now
Date.now = () => 1717426365982
t.teardown(() => {
Date.now = now
})

const { agent, consumer, producer, topic } = t.context
const messages = ['one', 'two', 'three']

// These agent.config lines are utilized to simulate the inbound
// distributed trace that we are trying to validate.
agent.config.account_id = 'account_1'
agent.config.primary_application_id = 'app_1'
agent.config.trusted_account_key = 42

agent.on('transactionFinished', (tx) => {
t.equal(tx.isDistributedTrace, true)

const headers = {}
tx.traceContext.addTraceContextHeaders(headers)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be done in your producer code. So this isn't asserting that DT is actually working. it looks like if you don't pass in headers to a message below it gets into this state. so if you include headers on line 155 these assertions still work without manually adding trace context headers

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please explain the difference with this test?

t.test('should not create tracestate when trusted_account_key missing', (t) => {
const { agent } = t.context
agent.config.account_id = '12345'
agent.config.primary_application_id = 'appId'
agent.config.trusted_account_key = null
agent.config.distributed_tracing.enabled = true
agent.config.span_events.enabled = true
helper.runInTransaction(agent, function (txn) {
const headers = {}
txn.traceContext.addTraceContextHeaders(headers)
t.ok(headers.traceparent)
t.notOk(headers.tracestate)
t.equal(supportabilitySpy.callCount, 2)
// eslint-disable-next-line max-len
t.equal(supportabilitySpy.firstCall.args[0], 'TraceContext/TraceState/Create/Exception')
txn.end()
t.end()
})

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's a generic DT test. the instrumentation will do the addTraceContextHeaders indirectly in shim. so this bit is not needed here.

bizob2828 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of doing this you could assert these values in the eachMessage handler. they are buffers obviously so you'd have to do const tracestate = actualMessage.headers.tracestate.toString()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The traceparent will be something generated from within asn it won't be the traceparent that was accepted but the first part should be the same 42@nr=0-0-account_1-app_1

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can merge this but I'll clean up when consumer instrumentation is added

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What needs to be cleaned up? The method is poorly named. It is not adding headers to the transaction. It is retrieving them from the transaction and adding them to the passed in object.

t.equal(headers.tracestate.startsWith('42@nr=0-0-account_1-app_1-'), true)

t.end()
})

helper.runInTransaction(agent, async (tx) => {
await consumer.subscribe({ topic, fromBeginning: true })

let msgCount = 0
const promise = new Promise((resolve) => {
consumer.run({
eachMessage: async ({ message: actualMessage }) => {
t.equal(messages.includes(actualMessage.value.toString()), true)
bizob2828 marked this conversation as resolved.
Show resolved Hide resolved
msgCount += 1
if (msgCount === 3) {
resolve()
}
}
})
})

await utils.waitForConsumersToJoinGroup({ consumer })
await producer.send({
acks: 1,
topic,
messages: messages.map((m) => {
return { key: 'key', value: m }
})
})

await promise

tx.end()
})
await utils.waitForConsumersToJoinGroup({ consumer })
await producer.send({
acks: 1,
topic,
messages: [{ key: 'key', value: message }]
})

tap.test('sendBatch records correctly', (t) => {
t.plan(5)

const { agent, consumer, producer, topic } = t.context
const message = 'test message'

agent.on('transactionFinished', (tx) => {
const name = `MessageBroker/Kafka/Topic/Produce/Named/${topic}`
bizob2828 marked this conversation as resolved.
Show resolved Hide resolved
const segment = tx.agent.tracer.getSegment()

const foundSegment = segment.children.find((s) => s.name.endsWith(topic))
t.equal(foundSegment.name, name)

const metric = tx.metrics.getMetric(name)
t.equal(metric.callCount, 1)

t.equal(tx.isDistributedTrace, true)

t.end()
})

helper.runInTransaction(agent, async (tx) => {
await consumer.subscribe({ topic, fromBeginning: true })
const promise = new Promise((resolve) => {
consumer.run({
eachMessage: async ({ message: actualMessage }) => {
t.equal(actualMessage.value.toString(), message)
t.match(actualMessage.headers['x-foo'].toString(), 'foo')
resolve()
}
})
})
await utils.waitForConsumersToJoinGroup({ consumer })
await producer.sendBatch({
acks: 1,
topicMessages: [
{
topic,
messages: [
{
key: 'key',
value: message,
headers: { 'x-foo': 'foo' }
}
]
}
]
})
await promise

tx.end()
})
await testPromise
})
Loading