Skip to content

Commit 1faa159

Browse files
authored
tracing: fix kafka header injection error for older kafka brokers (#5704)
* disable header injection if protocol error
1 parent f980b33 commit 1faa159

File tree

5 files changed

+157
-14
lines changed

5 files changed

+157
-14
lines changed

packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ const {
77
} = require('./helpers/instrument')
88
const shimmer = require('../../datadog-shimmer')
99

10+
const log = require('../../dd-trace/src/log')
11+
1012
// Create channels for Confluent Kafka JavaScript
1113
const channels = {
1214
producerStart: channel('apm:@confluentinc/kafka-javascript:produce:start'),
@@ -25,6 +27,8 @@ const channels = {
2527
batchConsumerCommit: channel('apm:@confluentinc/kafka-javascript:consume-batch:commit')
2628
}
2729

30+
const disabledHeaderWeakSet = new WeakSet()
31+
2832
// we need to store the offset per partition per topic for the consumer to track offsets for DSM
2933
const latestConsumerOffsets = new Map()
3034

@@ -206,7 +210,8 @@ function instrumentKafkaJS (kafkaJS) {
206210
channels.producerStart.publish({
207211
topic: payload?.topic,
208212
messages: payload?.messages || [],
209-
bootstrapServers: kafka._ddBrokers
213+
bootstrapServers: kafka._ddBrokers,
214+
disableHeaderInjection: disabledHeaderWeakSet.has(producer)
210215
})
211216

212217
const result = send.apply(this, arguments)
@@ -218,6 +223,16 @@ function instrumentKafkaJS (kafkaJS) {
218223
}),
219224
asyncResource.bind(err => {
220225
if (err) {
226+
// Fixes bug where we would inject message headers for kafka brokers
227+
// that don't support headers (version <0.11). On the error, we disable
228+
// header injection. Unfortunately the error name / type is not more specific.
229+
// This approach is implemented by other tracers as well.
230+
if (err.name === 'KafkaJSError' && err.type === 'ERR_UNKNOWN') {
231+
disabledHeaderWeakSet.add(producer)
232+
log.error('Kafka Broker responded with UNKNOWN_SERVER_ERROR (-1). ' +
233+
'Please look at broker logs for more information. ' +
234+
'Tracer message header injection for Kafka is disabled.')
235+
}
221236
channels.producerError.publish(err)
222237
}
223238
channels.producerFinish.publish(undefined)

packages/datadog-instrumentations/src/kafkajs.js

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ const {
77
} = require('./helpers/instrument')
88
const shimmer = require('../../datadog-shimmer')
99

10+
const log = require('../../dd-trace/src/log')
11+
1012
const producerStartCh = channel('apm:kafkajs:produce:start')
1113
const producerCommitCh = channel('apm:kafkajs:produce:commit')
1214
const producerFinishCh = channel('apm:kafkajs:produce:finish')
@@ -21,6 +23,8 @@ const batchConsumerStartCh = channel('apm:kafkajs:consume-batch:start')
2123
const batchConsumerFinishCh = channel('apm:kafkajs:consume-batch:finish')
2224
const batchConsumerErrorCh = channel('apm:kafkajs:consume-batch:error')
2325

26+
const disabledHeaderWeakSet = new WeakSet()
27+
2428
function commitsFromEvent (event) {
2529
const { payload: { groupId, topics } } = event
2630
const commitList = []
@@ -65,22 +69,31 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf
6569

6670
try {
6771
const { topic, messages = [] } = arguments[0]
68-
for (const message of messages) {
69-
if (message !== null && typeof message === 'object') {
70-
message.headers = message.headers || {}
71-
}
72-
}
73-
producerStartCh.publish({ topic, messages, bootstrapServers, clusterId })
74-
72+
producerStartCh.publish({
73+
topic,
74+
messages,
75+
bootstrapServers,
76+
clusterId,
77+
disableHeaderInjection: disabledHeaderWeakSet.has(producer)
78+
})
7579
const result = send.apply(this, arguments)
76-
7780
result.then(
7881
innerAsyncResource.bind(res => {
7982
producerFinishCh.publish(undefined)
8083
producerCommitCh.publish(res)
8184
}),
8285
innerAsyncResource.bind(err => {
8386
if (err) {
87+
// Fixes bug where we would inject message headers for kafka brokers that don't support headers
88+
// (version <0.11). On the error, we disable header injection.
89+
// Unfortunately the error name / type is not more specific.
90+
// This approach is implemented by other tracers as well.
91+
if (err.name === 'KafkaJSProtocolError' && err.type === 'UNKNOWN') {
92+
disabledHeaderWeakSet.add(producer)
93+
log.error('Kafka Broker responded with UNKNOWN_SERVER_ERROR (-1). ' +
94+
'Please look at broker logs for more information. ' +
95+
'Tracer message header injection for Kafka is disabled.')
96+
}
8497
producerErrorCh.publish(err)
8598
}
8699
producerFinishCh.publish(undefined)

packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,59 @@ describe('Plugin', () => {
628628
expect(topic).to.equal(testTopic)
629629
})
630630
})
631+
632+
describe('when using a kafka broker version that does not support message headers', () => {
633+
class KafkaJSError extends Error {
634+
constructor (message) {
635+
super(message)
636+
this.name = 'KafkaJSError'
637+
this.type = 'ERR_UNKNOWN'
638+
}
639+
}
640+
let error
641+
let producer
642+
let produceStub
643+
644+
beforeEach(async () => {
645+
// simulate a kafka error for the broker version not supporting message headers
646+
error = new KafkaJSError()
647+
error.message = 'Simulated KafkaJSError ERR_UNKNOWN from Producer.produce stub'
648+
producer = kafka.producer()
649+
await producer.connect()
650+
651+
// Spy on the produce method from the native library before it gets wrapped
652+
produceStub = sinon.stub(nativeApi.Producer.prototype, 'produce')
653+
.callsFake((topic, partition, message, key) => {
654+
throw error
655+
})
656+
})
657+
658+
afterEach(async () => {
659+
produceStub.restore()
660+
await producer.disconnect()
661+
})
662+
663+
it('should hit an error for the first send and not inject headers in later sends', async () => {
664+
const testMessages = [{ key: 'key1', value: 'test1' }]
665+
const testMessages2 = [{ key: 'key2', value: 'test2' }]
666+
667+
try {
668+
await producer.send({ topic: testTopic, messages: testMessages })
669+
expect.fail('First producer.send() should have thrown an error')
670+
} catch (e) {
671+
expect(e).to.equal(error)
672+
}
673+
// Verify headers were injected in the first attempt
674+
expect(testMessages[0].headers[0]).to.have.property('x-datadog-trace-id')
675+
676+
// restore the stub to allow the next send to succeed
677+
produceStub.restore()
678+
679+
const result = await producer.send({ topic: testTopic, messages: testMessages2 })
680+
expect(testMessages2[0].headers).to.be.null
681+
expect(result).to.not.be.undefined
682+
})
683+
})
631684
})
632685
})
633686
})

packages/datadog-plugin-kafkajs/src/producer.js

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ class KafkajsProducerPlugin extends ProducerPlugin {
6767
}
6868
}
6969

70-
start ({ topic, messages, bootstrapServers, clusterId }) {
70+
start ({ topic, messages, bootstrapServers, clusterId, disableHeaderInjection }) {
7171
const span = this.startSpan({
7272
resource: topic,
7373
meta: {
@@ -85,10 +85,11 @@ class KafkajsProducerPlugin extends ProducerPlugin {
8585
}
8686
for (const message of messages) {
8787
if (message !== null && typeof message === 'object') {
88-
if (!message.headers) {
89-
message.headers = {}
88+
// message headers are not supported for kafka broker versions <0.11
89+
if (!disableHeaderInjection) {
90+
message.headers ??= {}
91+
this.tracer.inject(span, 'text_map', message.headers)
9092
}
91-
this.tracer.inject(span, 'text_map', message.headers)
9293
if (this.config.dsmEnabled) {
9394
const payloadSize = getMessageSize(message)
9495
const edgeTags = ['direction:out', `topic:${topic}`, 'type:kafka']
@@ -98,7 +99,9 @@ class KafkajsProducerPlugin extends ProducerPlugin {
9899
}
99100

100101
const dataStreamsContext = this.tracer.setCheckpoint(edgeTags, span, payloadSize)
101-
DsmPathwayCodec.encode(dataStreamsContext, message.headers)
102+
if (!disableHeaderInjection) {
103+
DsmPathwayCodec.encode(dataStreamsContext, message.headers)
104+
}
102105
}
103106
}
104107
}

packages/datadog-plugin-kafkajs/test/index.spec.js

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ describe('Plugin', () => {
4242
let kafka
4343
let tracer
4444
let Kafka
45+
let Broker
4546
let clusterIdAvailable
4647
let expectedProducerHash
4748
let expectedConsumerHash
@@ -54,13 +55,15 @@ describe('Plugin', () => {
5455

5556
describe('without configuration', () => {
5657
const messages = [{ key: 'key1', value: 'test2' }]
58+
const messages2 = [{ key: 'key2', value: 'test3' }]
5759

5860
beforeEach(async () => {
5961
process.env.DD_DATA_STREAMS_ENABLED = 'true'
6062
tracer = require('../../dd-trace')
6163
await agent.load('kafkajs')
6264
const lib = require(`../../../versions/kafkajs@${version}`).get()
6365
Kafka = lib.Kafka
66+
Broker = require(`../../../versions/kafkajs@${version}/node_modules/kafkajs/src/broker`)
6467
kafka = new Kafka({
6568
clientId: `kafkajs-test-${version}`,
6669
brokers: ['127.0.0.1:9092'],
@@ -158,6 +161,62 @@ describe('Plugin', () => {
158161
})
159162
}
160163

164+
describe('when using a kafka broker version that does not support message headers', function () {
165+
// kafkajs 1.4.0 is very slow when encountering errors
166+
this.timeout(30000)
167+
168+
// we should stub the kafka producer send method to throw a KafkaJSProtocolError
169+
class KafkaJSProtocolError extends Error {
170+
constructor (message) {
171+
super(message)
172+
this.name = 'KafkaJSProtocolError'
173+
this.type = 'UNKNOWN'
174+
}
175+
}
176+
let sendRequestStub
177+
let producer
178+
179+
const error = new KafkaJSProtocolError()
180+
error.message = 'Simulated KafkaJSProtocolError UNKNOWN from Broker.sendRequest stub'
181+
182+
beforeEach(async () => {
183+
// simulate a kafka error for the broker version not supporting message headers
184+
const otherKafka = new Kafka({
185+
clientId: `kafkajs-test-${version}`,
186+
brokers: ['127.0.0.1:9092'],
187+
retry: {
188+
retries: 0
189+
}
190+
})
191+
192+
sendRequestStub = sinon.stub(Broker.prototype, 'produce').rejects(error)
193+
194+
producer = otherKafka.producer({ transactionTimeout: 10 })
195+
await producer.connect()
196+
})
197+
198+
afterEach(() => {
199+
sendRequestStub.restore()
200+
})
201+
202+
it('should hit an error for the first send and not inject headers in later sends', async () => {
203+
try {
204+
await producer.send({ topic: testTopic, messages })
205+
expect(true).to.be.false('First producer.send() should have thrown an error')
206+
} catch (e) {
207+
expect(e).to.equal(error)
208+
}
209+
expect(messages[0].headers).to.have.property('x-datadog-trace-id')
210+
211+
// restore the stub to allow the next send to succeed
212+
sendRequestStub.restore()
213+
214+
const result2 = await producer.send({ topic: testTopic, messages: messages2 })
215+
expect(messages2[0].headers).to.be.undefined
216+
expect(result2[0].errorCode).to.equal(0)
217+
})
218+
})
219+
161220
withNamingSchema(
162221
async () => sendMessages(kafka, testTopic, messages),
163222
rawExpectedSchema.send

0 commit comments

Comments
 (0)