Fix possible crash in kafka clusterId gathering #5837

Draft · wants to merge 2 commits into base: master
299 changes: 144 additions & 155 deletions packages/datadog-instrumentations/src/kafkajs.js
@@ -22,8 +22,6 @@
const batchConsumerFinishCh = channel('apm:kafkajs:consume-batch:finish')
const batchConsumerErrorCh = channel('apm:kafkajs:consume-batch:error')

const disabledHeaderWeakSet = new WeakSet()

function commitsFromEvent (event) {
const { payload: { groupId, topics } } = event
const commitList = []
@@ -52,75 +50,91 @@

shimmer.wrap(Kafka.prototype, 'producer', createProducer => function () {
const producer = createProducer.apply(this, arguments)
const send = producer.send
const originalSend = producer.send
const bootstrapServers = this._brokers

const kafkaClusterIdPromise = getKafkaClusterId(this)

producer.send = function () {
const wrappedSend = (clusterId) => {
const { topic, messages = [] } = arguments[0]
let clusterId
let disableHeaderInjection = false

const ctx = {
bootstrapServers,
clusterId,
disableHeaderInjection: disabledHeaderWeakSet.has(producer),
messages,
topic
}
// TODO: Change this by hooking into kafka's connect method.
// That method receives the clusterId, so we could avoid the admin call.
// The clusterId would then also be immediately available for all calls.
producer.on(producer.events.CONNECT, () => {
getKafkaClusterId(this).then((id) => {
clusterId = id
})
})

for (const message of messages) {
if (message !== null && typeof message === 'object' && !ctx.disableHeaderInjection) {
message.headers = message.headers || {}
}
producer.send = function send (...args) {
// Avoid mutating the user's input by copying the messages
let malformedMessage = false
if (!args[0] || !Array.isArray(args[0].messages)) {
malformedMessage = true
} else if (!disableHeaderInjection) {
args[0] = {
...args[0],
messages: args[0].messages.map((message) => {
if (typeof message !== 'object' || message === null) {
malformedMessage = true
return message
}
return { ...message, headers: { ...message.headers } }
})
}
}

return producerStartCh.runStores(ctx, () => {
try {
const result = send.apply(this, arguments)
result.then(
(res) => {
ctx.result = res
producerFinishCh.publish(ctx)
producerCommitCh.publish(ctx)
},
(err) => {
ctx.error = err
if (err) {
// Fixes bug where we would inject message headers for kafka brokers that don't support headers
// (version <0.11). On the error, we disable header injection.
// Unfortunately the error name / type is not more specific.
// This approach is implemented by other tracers as well.
if (err.name === 'KafkaJSProtocolError' && err.type === 'UNKNOWN') {
disabledHeaderWeakSet.add(producer)
log.error('Kafka Broker responded with UNKNOWN_SERVER_ERROR (-1). ' +
'Please look at broker logs for more information. ' +
'Tracer message header injection for Kafka is disabled.')
}
producerErrorCh.publish(err)
}
producerFinishCh.publish(ctx)
})
const topic = args[0]?.topic
const messages = args[0]?.messages

return result
} catch (e) {
ctx.error = e
producerErrorCh.publish(ctx)
producerFinishCh.publish(ctx)
throw e
}
})
const ctx = {
bootstrapServers,
clusterId,
disableHeaderInjection,
messages,
topic,
malformedMessage
}

if (isPromise(kafkaClusterIdPromise)) {
// promise is not resolved
return kafkaClusterIdPromise.then((clusterId) => {
return wrappedSend(clusterId)
})
}
return producerStartCh.runStores(ctx, () => {
try {
const result = originalSend.apply(this, args)
result.then(
(res) => {
ctx.result = res
producerFinishCh.publish(ctx)
producerCommitCh.publish(ctx)
},
(err) => {
ctx.error = err
// TODO: Read the broker's supported versions (minimum version 3) by hooking into that code.
// That way all messages would pass and it would be clear from the beginning
// whether headers are allowed or not.
// Currently we only find out after the first message has been sent, and the
// customer then has to resend it.
if (err) {
// Fixes bug where we would inject message headers for kafka brokers that don't support headers
// (version <0.11). On the error, we disable header injection.
// Unfortunately the error name / type is not more specific.
// This approach is implemented by other tracers as well.
if (err.name === 'KafkaJSProtocolError' && err.type === 'UNKNOWN') {
disableHeaderInjection = true
log.error('Kafka Broker responded with UNKNOWN_SERVER_ERROR (-1). ' +
'Please look at broker logs for more information. ' +
'Tracer message header injection for Kafka is disabled.')
}
producerErrorCh.publish(err)
}
producerFinishCh.publish(ctx)
})

// promise is already resolved
return wrappedSend(kafkaClusterIdPromise)
return result
} catch (e) {
ctx.error = e
producerErrorCh.publish(ctx)
producerFinishCh.publish(ctx)
throw e
}
})
}
return producer
})
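
The heart of the fix is visible above: instead of eagerly calling getKafkaClusterId() up front and branching on whether the returned value is still a pending promise, the wrap now listens for the producer's CONNECT event and fills in clusterId in the background, so send() never awaits (or crashes on) the admin lookup. Below is a minimal standalone sketch of that pattern, outside this patch, assuming a kafkajs client; the broker address and topic name are placeholders.

const { Kafka } = require('kafkajs')

const kafka = new Kafka({ clientId: 'demo', brokers: ['localhost:9092'] })
const producer = kafka.producer()

let clusterId // filled in asynchronously; send() never blocks on it

producer.on(producer.events.CONNECT, async () => {
  const admin = kafka.admin()
  try {
    await admin.connect()
    // describeCluster() is the same admin call the instrumentation uses
    clusterId = (await admin.describeCluster()).clusterId
  } catch (e) {
    // Ignore lookup failures: metadata gathering must never break the producer
  } finally {
    await admin.disconnect().catch(() => {})
  }
})

async function main () {
  await producer.connect()
  // Early sends may observe clusterId === undefined; later sends see the id
  await producer.send({ topic: 'demo-topic', messages: [{ value: 'hello' }] })
  console.log('clusterId so far:', clusterId)
  await producer.disconnect()
}

main().catch(console.error)
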
@@ -130,133 +144,108 @@
return createConsumer.apply(this, arguments)
}

const kafkaClusterIdPromise = getKafkaClusterId(this)
let clusterId
const { groupId } = arguments[0]

const eachMessageExtractor = (args, clusterId) => {
const eachMessageExtractor = (args) => {
const { topic, partition, message } = args[0]
return { topic, partition, message, groupId, clusterId }
}

const eachBatchExtractor = (args, clusterId) => {
const eachBatchExtractor = (args) => {
const { batch } = args[0]
const { topic, partition, messages } = batch
return { topic, partition, messages, groupId, clusterId }
}

const consumer = createConsumer.apply(this, arguments)

consumer.on(consumer.events.COMMIT_OFFSETS, commitsFromEvent)

const run = consumer.run
const groupId = arguments[0].groupId

consumer.run = function ({ eachMessage, eachBatch, ...runArgs }) {
const wrapConsume = (clusterId) => {
return run({
eachMessage: wrappedCallback(
eachMessage,
consumerStartCh,
consumerFinishCh,
consumerErrorCh,
eachMessageExtractor,
clusterId
),
eachBatch: wrappedCallback(
eachBatch,
batchConsumerStartCh,
batchConsumerFinishCh,
batchConsumerErrorCh,
eachBatchExtractor,
clusterId
),
...runArgs
})
}
consumer.on(consumer.events.CONNECT, () => {
getKafkaClusterId(this).then((id) => {
clusterId = id
})
})

if (isPromise(kafkaClusterIdPromise)) {
// promise is not resolved
return kafkaClusterIdPromise.then((clusterId) => {
return wrapConsume(clusterId)
})
}
consumer.on(consumer.events.COMMIT_OFFSETS, commitsFromEvent)

// promise is already resolved
return wrapConsume(kafkaClusterIdPromise)
const originalRun = consumer.run

consumer.run = function run ({ eachMessage, eachBatch, ...runArgs }) {
return originalRun({
eachMessage: wrappedCallback(
eachMessage,
consumerStartCh,
consumerFinishCh,
consumerErrorCh,
eachMessageExtractor
),
eachBatch: wrappedCallback(
eachBatch,
batchConsumerStartCh,
batchConsumerFinishCh,
batchConsumerErrorCh,
eachBatchExtractor
),
...runArgs
})
}
return consumer
})
return Kafka
})

const wrappedCallback = (fn, startCh, finishCh, errorCh, extractArgs, clusterId) => {
return typeof fn === 'function'
? function (...args) {
const extractedArgs = extractArgs(args, clusterId)
const ctx = {
extractedArgs
}
const wrappedCallback = (fn, startCh, finishCh, errorCh, extractArgs) => {
if (typeof fn !== 'function') {
return fn
}
return function (...args) {
const ctx = {
extractedArgs: extractArgs(args)
}

return startCh.runStores(ctx, () => {
try {
const result = fn.apply(this, args)
if (result && typeof result.then === 'function') {
result.then(
(res) => {
ctx.result = res
finishCh.publish(ctx)
},
(err) => {
ctx.error = err
if (err) {
errorCh.publish(ctx)
}
finishCh.publish(ctx)
})
} else {
finishCh.publish(ctx)
}
return result
} catch (e) {
ctx.error = e
errorCh.publish(ctx)
return startCh.runStores(ctx, () => {
try {
const result = fn.apply(this, args)
if (typeof result?.then === 'function') {
result.then(
(res) => {
ctx.result = res
finishCh.publish(ctx)
},
(err) => {
ctx.error = err
errorCh.publish(ctx)
finishCh.publish(ctx)
})
} else {
finishCh.publish(ctx)
throw e
}
})
}
: fn
return result
} catch (e) {
ctx.error = e
errorCh.publish(ctx)
finishCh.publish(ctx)
throw e
}
})
}
}
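
wrappedCallback above generalizes a single publish sequence: start before the handler runs, then finish on success, or error followed by finish on failure, covering both synchronous throws and rejected promises. Here is a self-contained sketch of the same sequencing built on Node's built-in diagnostics_channel, using plain publish calls instead of the tracer's runStores context propagation; the channel names are made up for the example.

const dc = require('node:diagnostics_channel')

const startCh = dc.channel('demo:work:start')
const finishCh = dc.channel('demo:work:finish')
const errorCh = dc.channel('demo:work:error')

function wrap (fn) {
  if (typeof fn !== 'function') return fn
  return function (...args) {
    const ctx = { args }
    startCh.publish(ctx)
    try {
      const result = fn.apply(this, args)
      if (typeof result?.then === 'function') {
        // Async path: finish/error are deferred until the promise settles
        result.then(
          (res) => { ctx.result = res; finishCh.publish(ctx) },
          (err) => { ctx.error = err; errorCh.publish(ctx); finishCh.publish(ctx) }
        )
      } else {
        finishCh.publish(ctx)
      }
      return result
    } catch (e) {
      // Sync throw: publish error and finish, then rethrow to the caller
      ctx.error = e
      errorCh.publish(ctx)
      finishCh.publish(ctx)
      throw e
    }
  }
}

// A subscriber observes every invocation without the handler knowing about it
dc.subscribe('demo:work:finish', (ctx) => {
  console.log('handler finished, failed:', Boolean(ctx.error))
})

wrap(async (n) => n * 2)(21)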

const getKafkaClusterId = (kafka) => {
if (kafka._ddKafkaClusterId) {
return kafka._ddKafkaClusterId
}
const admin = kafka.admin?.()

if (!kafka.admin) {
return null
}

const admin = kafka.admin()

if (!admin.describeCluster) {
return null
if (!admin?.describeCluster) {
return Promise.resolve()
}

return admin.connect()
.then(() => {
return admin.describeCluster()
})
.then((clusterInfo) => {
const clusterId = clusterInfo?.clusterId
kafka._ddKafkaClusterId = clusterId
admin.disconnect()
return clusterId
admin.disconnect().catch(() => {})
return clusterInfo?.clusterId
}).catch(() => {
admin.disconnect().catch(() => {})
})
.catch((error) => {
throw error
})
}
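
Two details of the rewritten helper are easy to miss: the result is memoized on the client object, and every path now settles instead of returning null or rethrowing, which is what previously made chaining .then() on the result crash. A standalone sketch of the same shape follows; getClusterIdOnce is an illustrative name, and unlike the patch it wraps the cached value in Promise.resolve() so callers can always chain .then().

function getClusterIdOnce (kafka) {
  // Memoized on the client, mirroring the _ddKafkaClusterId property above
  if (kafka._ddKafkaClusterId) {
    return Promise.resolve(kafka._ddKafkaClusterId)
  }

  const admin = kafka.admin?.()
  if (!admin?.describeCluster) {
    // Client too old to expose the admin API: resolve to undefined
    return Promise.resolve()
  }

  return admin.connect()
    .then(() => admin.describeCluster())
    .then((clusterInfo) => {
      kafka._ddKafkaClusterId = clusterInfo?.clusterId
      admin.disconnect().catch(() => {}) // best-effort cleanup
      return kafka._ddKafkaClusterId
    })
    .catch(() => {
      admin.disconnect().catch(() => {}) // swallow: never reject the caller
    })
}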

function isPromise (obj) {
return !!obj && (typeof obj === 'object' || typeof obj === 'function') && typeof obj.then === 'function'
}
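
The branching that this patch removes relied on the duck-typing check above: anything with a callable then is treated as still pending, while a cached plain value (such as an already-fetched clusterId string) takes the synchronous path. For example:

isPromise(Promise.resolve('abc123'))          // true: native promise
isPromise({ then (resolve) { resolve(1) } })  // true: custom thenable
isPromise('abc123')                           // false: cached cluster id string
isPromise(null)                               // false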