-
Notifications
You must be signed in to change notification settings - Fork 399
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Added kafkajs producer instrumentation #2236
Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## main #2236 +/- ##
=======================================
Coverage 96.31% 96.31%
=======================================
Files 281 283 +2
Lines 44859 44915 +56
=======================================
+ Hits 43206 43261 +55
- Misses 1653 1654 +1
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall great start. I saw you asked in slack about
MessageBroker/Kafka/Topic/Named/{topic_name}/Serialization/Value
MessageBroker/Kafka/Topic/Named/{topic_name}/Serialization/Key
One comment about missing instrumentation and then suggestions for fleshing out tests
const data = args[0] | ||
return new MessageSpec({ | ||
destinationName: data.topic, | ||
destinationType: shim.TOPIC |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you'll have to include headers so we can injected our DT/CAT headers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Headers is optional according to our messaging/kafka spec. I removed adding them because the headers are unique per message, e.g.:
producer.send({
topic: 'topic',
messages: [
{ key: 'key', value: 'value', headers: {"stuff"},
{ key: 'key', value: 'value', headers: {"diff stuff"},
]
})
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's fine if they'e optional but you need to pass them in so we can add our headers and I think we're going to have to build logic to insert per message headers otherwise we're not going to get DT
|
||
agent.on('transactionFinished', (tx) => { | ||
const headers = {} | ||
tx.traceContext.addTraceContextHeaders(headers) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be done in your producer code. So this isn't asserting that DT is actually working. it looks like if you don't pass in headers to a message below it gets into this state. so if you include headers on line 155 these assertions still work without manually adding trace context headers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please explain the difference with this test?
node-newrelic/test/unit/distributed_tracing/tracecontext.test.js
Lines 988 to 1010 in 957529e
t.test('should not create tracestate when trusted_account_key missing', (t) => { | |
const { agent } = t.context | |
agent.config.account_id = '12345' | |
agent.config.primary_application_id = 'appId' | |
agent.config.trusted_account_key = null | |
agent.config.distributed_tracing.enabled = true | |
agent.config.span_events.enabled = true | |
helper.runInTransaction(agent, function (txn) { | |
const headers = {} | |
txn.traceContext.addTraceContextHeaders(headers) | |
t.ok(headers.traceparent) | |
t.notOk(headers.tracestate) | |
t.equal(supportabilitySpy.callCount, 2) | |
// eslint-disable-next-line max-len | |
t.equal(supportabilitySpy.firstCall.args[0], 'TraceContext/TraceState/Create/Exception') | |
txn.end() | |
t.end() | |
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's a generic DT test. the instrumentation will do the addTraceContextHeaders
indirectly in shim. so this bit is not needed here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
versioned tests are failing because you're missing adding feature flag
diff --git a/test/versioned/kafkajs/kafka.tap.js b/test/versioned/kafkajs/kafka.tap.js
index 7ef76ea03..cf4aa2121 100644
--- a/test/versioned/kafkajs/kafka.tap.js
+++ b/test/versioned/kafkajs/kafka.tap.js
@@ -14,7 +14,11 @@ const utils = require('./utils')
const broker = `${params.kafka_host}:${params.kafka_port}`
tap.beforeEach(async (t) => {
- t.context.agent = helper.instrumentMockedAgent()
+ t.context.agent = helper.instrumentMockedAgent({
+ feature_flag: {
+ kafkajs_instrumentation: true
+ }
+ })
test/versioned/kafkajs/kafka.tap.js
Outdated
// 3. The produced Kafka data includes the distributed trace data that was | ||
// provided to the service handling the request. | ||
|
||
t.plan(10) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this test no longer has 10 assertions
t.equal(tx.isDistributedTrace, true) | ||
|
||
const headers = {} | ||
tx.traceContext.addTraceContextHeaders(headers) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of doing this you could assert these values in the eachMessage
handler. they are buffers obviously so you'd have to do const tracestate = actualMessage.headers.tracestate.toString()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The traceparent will be something generated from within asn it won't be the traceparent that was accepted but the first part should be the same 42@nr=0-0-account_1-app_1
t.equal(tx.isDistributedTrace, true) | ||
|
||
const headers = {} | ||
tx.traceContext.addTraceContextHeaders(headers) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can merge this but I'll clean up when consumer instrumentation is added
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What needs to be cleaned up? The method is poorly named. It is not adding headers to the transaction. It is retrieving them from the transaction and adding them to the passed in object.
This PR resolves #2217.