
Commit 718a251

Merge pull request #2516 from newrelic/NR-447530-update-kafka-producer-instrumentation

Update Kafka producer instrumentation

2 parents: 91df77e + 462a857

File tree: 4 files changed, +243 −8 lines

functional_test/src/test/java/com/newrelic/agent/instrumentation/kafka/KafkaTest.java
Lines changed: 140 additions & 0 deletions

@@ -154,6 +154,13 @@ private void consumeMessage(KafkaConsumer<String, String> consumer) {
     private void processRecord(ConsumerRecord<String, String> record) {
         NewRelic.getAgent().getTransaction().setTransactionName(TransactionNamePriority.CUSTOM_HIGH,
                 true, "kafka", "processRecord");
+
+        final Iterator<Header> traceparentIterator = record.headers().headers("traceparent").iterator();
+        Assert.assertTrue("W3C traceparent header should be present", traceparentIterator.hasNext());
+
+        final Iterator<Header> tracestateIterator = record.headers().headers("tracestate").iterator();
+        Assert.assertTrue("W3C tracestate header should be present", tracestateIterator.hasNext());
+
         final Iterator<Header> nrIterator = record.headers().headers("newrelic").iterator();
         if (nrIterator.hasNext()) {
             final Header nrHeader = nrIterator.next();
@@ -164,6 +171,139 @@ private void processRecord(ConsumerRecord<String, String> record) {
         }
     }
 
+    @Test
+    public void produceConsumeTestExcludeNewRelicHeader() throws Exception {
+        EnvironmentHolderSettingsGenerator envHolderSettings = new EnvironmentHolderSettingsGenerator(CONFIG_FILE, "exclude_newrelic_header_test", CLASS_LOADER);
+        EnvironmentHolder holder = new EnvironmentHolder(envHolderSettings);
+        holder.setupEnvironment();
+        kafkaUnitRule.getKafkaUnit().createTopic(testTopic, 1);
+        final KafkaConsumer<String, String> consumer = setupConsumer();
+
+        final CountDownLatch latch = new CountDownLatch(2);
+        final ConcurrentLinkedQueue<TransactionData> finishedTransactions = new ConcurrentLinkedQueue<>();
+        TransactionListener transactionListener = (transactionData, transactionStats) -> {
+            finishedTransactions.add(transactionData);
+            latch.countDown();
+        };
+        ServiceFactory.getTransactionService().addTransactionListener(transactionListener);
+
+        try {
+            produceMessage();
+            final Future<?> submit = executorService.submit(() -> consumeMessageExcludeNewRelicHeader(consumer));
+            submit.get(30, TimeUnit.SECONDS);
+            latch.await(30, TimeUnit.SECONDS);
+
+            Assert.assertEquals(2, finishedTransactions.size());
+
+            TransactionData firstTransaction = finishedTransactions.poll();
+            TransactionData secondTransaction = finishedTransactions.poll();
+
+            TransactionData conTxn = null;
+            Assert.assertNotNull(firstTransaction);
+            if (firstTransaction.getInboundDistributedTracePayload() != null) {
+                conTxn = firstTransaction;
+            } else {
+                Assert.assertNotNull(secondTransaction);
+                if (secondTransaction.getInboundDistributedTracePayload() != null) {
+                    conTxn = secondTransaction;
+                }
+            }
+
+            Assert.assertNotNull("Consumer transaction should have an inbound distributed trace payload", conTxn);
+            Assert.assertNotNull("Inbound distributed trace payload should not be null", conTxn.getInboundDistributedTracePayload());
+        } finally {
+            ServiceFactory.getTransactionService().removeTransactionListener(transactionListener);
+            consumer.close();
+        }
+    }
+
+    @Trace(dispatcher = true)
+    private void consumeMessageExcludeNewRelicHeader(KafkaConsumer<String, String> consumer) {
+        final ConsumerRecords<String, String> records = consumer.poll(1000);
+        Assert.assertEquals(1, records.count());
+
+        for (ConsumerRecord<String, String> record : records) {
+            processRecordExcludeNewRelicHeader(record);
+        }
+    }
+
+    private void processRecordExcludeNewRelicHeader(ConsumerRecord<String, String> record) {
+        NewRelic.getAgent().getTransaction().setTransactionName(TransactionNamePriority.CUSTOM_HIGH,
+                true, "kafka", "processRecord");
+
+        // Verify W3C Trace Context headers are present
+        final Iterator<Header> traceparentIterator = record.headers().headers("traceparent").iterator();
+        Assert.assertTrue("W3C traceparent header should be present", traceparentIterator.hasNext());
+
+        final Iterator<Header> tracestateIterator = record.headers().headers("tracestate").iterator();
+        Assert.assertTrue("W3C tracestate header should be present", tracestateIterator.hasNext());
+
+        // Verify legacy newrelic header is NOT present when exclude_newrelic_header is true
+        final Iterator<Header> nrIterator = record.headers().headers("newrelic").iterator();
+        if (nrIterator.hasNext()) {
+            Assert.fail("newrelic header should NOT be present when exclude_newrelic_header is true");
+        }
+
+        // Accept W3C distributed trace headers (traceparent + tracestate)
+        NewRelic.getAgent().getTransaction().acceptDistributedTraceHeaders(
+                com.newrelic.api.agent.TransportType.Kafka,
+                new KafkaHeadersAdapter(record.headers()));
+    }
+
+    // Adapter to expose Kafka headers as NewRelic Headers for DT propagation
+    private static class KafkaHeadersAdapter implements com.newrelic.api.agent.Headers {
+        private final org.apache.kafka.common.header.Headers headers;
+
+        KafkaHeadersAdapter(org.apache.kafka.common.header.Headers headers) {
+            this.headers = headers;
+        }
+
+        @Override
+        public com.newrelic.api.agent.HeaderType getHeaderType() {
+            return com.newrelic.api.agent.HeaderType.MESSAGE;
+        }
+
+        @Override
+        public String getHeader(String name) {
+            Iterator<Header> it = headers.headers(name).iterator();
+            return it.hasNext() ? new String(it.next().value(), StandardCharsets.UTF_8) : null;
+        }
+
+        @Override
+        public Collection<String> getHeaders(String name) {
+            Collection<String> result = new java.util.ArrayList<>();
+            for (Header h : headers.headers(name)) {
+                result.add(new String(h.value(), StandardCharsets.UTF_8));
+            }
+            return result;
+        }
+
+        @Override
+        public void setHeader(String name, String value) {
+            headers.remove(name);
+            headers.add(name, value.getBytes(StandardCharsets.UTF_8));
+        }
+
+        @Override
+        public void addHeader(String name, String value) {
+            headers.add(name, value.getBytes(StandardCharsets.UTF_8));
+        }
+
+        @Override
+        public Collection<String> getHeaderNames() {
+            Collection<String> names = new java.util.HashSet<>();
+            for (Header h : headers) {
+                names.add(h.key());
+            }
+            return names;
+        }
+
+        @Override
+        public boolean containsHeader(String name) {
+            return headers.headers(name).iterator().hasNext();
+        }
+    }
+
     private KafkaConsumer<String, String> setupConsumer() {
         final Properties props = new Properties();
         props.put("bootstrap.servers", kafkaUnitRule.getKafkaUnit().getKafkaConnect());

functional_test/src/test/resources/configs/span_events_test.yml
Lines changed: 10 additions & 1 deletion

@@ -78,4 +78,13 @@ transaction_events_disabled_attribute_filtering:
 
   span_events.attributes.enabled: true
 
-  span_events.attributes.exclude: txAttrib5,spanAttrib1
+  span_events.attributes.exclude: txAttrib5,spanAttrib1
+
+exclude_newrelic_header_test:
+
+  distributed_tracing:
+    enabled: true
+    exclude_newrelic_header: true
+
+  span_events:
+    enabled: true
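The exclude_newrelic_header_test block above is a scenario for the functional-test harness. In a regular agent install the same behavior comes from the standard newrelic.yml; a minimal sketch, assuming the usual common stanza (not part of this commit):

    common: &default_settings
      distributed_tracing:
        enabled: true
        # Drop the legacy proprietary header; W3C traceparent/tracestate still propagate.
        exclude_newrelic_header: true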
instrumentation/kafka-clients-spans-0.11.0.0/src/main/java/com/nr/instrumentation/kafka/HeadersWrapper.java (new file; path inferred from the package declaration and the import in the producer instrumentation below)
Lines changed: 88 additions & 0 deletions

@@ -0,0 +1,88 @@
+/*
+ *
+ *  * Copyright 2025 New Relic Corporation. All rights reserved.
+ *  * SPDX-License-Identifier: Apache-2.0
+ *
+ */
+
+package com.nr.instrumentation.kafka;
+
+import com.newrelic.api.agent.HeaderType;
+import com.newrelic.api.agent.Headers;
+import org.apache.kafka.common.header.Header;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Objects;
+
+public class HeadersWrapper implements Headers {
+
+    private final org.apache.kafka.common.header.Headers delegate;
+
+    public HeadersWrapper(org.apache.kafka.common.header.Headers headers) {
+        this.delegate = headers;
+    }
+
+    @Override
+    public HeaderType getHeaderType() {
+        return HeaderType.MESSAGE;
+    }
+
+    @Override
+    public String getHeader(String name) {
+        String value = null;
+        Iterator<Header> iterator = delegate.headers(name).iterator();
+        if (iterator.hasNext()) {
+            byte[] bytes = iterator.next().value();
+            if (bytes != null) {
+                value = new String(bytes);
+            }
+        }
+        return value;
+    }
+
+    @Override
+    public Collection<String> getHeaders(String name) {
+        Collection<String> headers = new ArrayList<>();
+        Iterator<Header> iterator = delegate.headers(name).iterator();
+        while (iterator.hasNext()) {
+            byte[] bytes = iterator.next().value();
+            if (bytes != null) {
+                headers.add(new String(bytes));
+            }
+        }
+        return headers;
+    }
+
+    @Override
+    public void setHeader(String name, String value) {
+        delegate.remove(name);
+        delegate.add(name, value.getBytes());
+    }
+
+    @Override
+    public void addHeader(String name, String value) {
+        delegate.add(name, value.getBytes());
+    }
+
+    @Override
+    public Collection<String> getHeaderNames() {
+        Collection<String> headerNames = new HashSet<>();
+        for (Header header : delegate) {
+            headerNames.add(header.key());
+        }
+        return headerNames;
+    }
+
+    @Override
+    public boolean containsHeader(String name) {
+        for (Header header : delegate) {
+            if (Objects.equals(name, header.key())) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
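Two design details in the wrapper: setHeader removes any existing values before adding, because Kafka headers permit duplicate keys and a bare add would append a second traceparent rather than replace the first, while addHeader appends deliberately. Note also that, unlike the test's KafkaHeadersAdapter, it encodes and decodes with the platform default charset (getBytes()/new String(bytes)); the distributed-trace header values are ASCII, so the two are equivalent in practice.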

instrumentation/kafka-clients-spans-0.11.0.0/src/main/java/org/apache/kafka/clients/producer/KafkaProducer_Instrumentation.java
Lines changed: 5 additions & 7 deletions

@@ -8,14 +8,14 @@
 package org.apache.kafka.clients.producer;
 
 import com.newrelic.agent.bridge.AgentBridge;
-import com.newrelic.agent.bridge.NoOpDistributedTracePayload;
 import com.newrelic.agent.bridge.Transaction;
-import com.newrelic.api.agent.DistributedTracePayload;
+import com.newrelic.api.agent.Headers;
+import com.newrelic.api.agent.NewRelic;
 import com.newrelic.api.agent.Trace;
 import com.newrelic.api.agent.weaver.Weave;
 import com.newrelic.api.agent.weaver.Weaver;
+import com.nr.instrumentation.kafka.HeadersWrapper;
 
-import java.nio.charset.StandardCharsets;
 import java.util.concurrent.Future;
 
 @Weave(originalName = "org.apache.kafka.clients.producer.KafkaProducer")
@@ -25,10 +25,8 @@ public class KafkaProducer_Instrumentation<K, V> {
     private Future<RecordMetadata> doSend(ProducerRecord record, Callback callback) {
         final Transaction transaction = AgentBridge.getAgent().getTransaction(false);
         if (transaction != null) {
-            DistributedTracePayload payload = transaction.createDistributedTracePayload();
-            if (!(payload instanceof NoOpDistributedTracePayload)) {
-                record.headers().add("newrelic", payload.text().getBytes(StandardCharsets.UTF_8));
-            }
+            Headers dtHeaders = new HeadersWrapper(record.headers());
+            NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(dtHeaders);
         }
         return Weaver.callOriginal();
     }
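With this change the producer path calls insertDistributedTraceHeaders, which writes traceparent and tracestate (and the newrelic header, unless exclude_newrelic_header is set) through the wrapper, instead of hand-building only the proprietary payload. For completeness, a consumer-side counterpart, sketched under the assumption that application code supplies its own Headers adapter such as the test's KafkaHeadersAdapter above (not part of this commit):

    import com.newrelic.api.agent.NewRelic;
    import com.newrelic.api.agent.Trace;
    import com.newrelic.api.agent.TransportType;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public class RecordHandler {
        // Hypothetical application code: start a transaction per record and link it
        // to the producer's trace via the headers the instrumentation inserted.
        @Trace(dispatcher = true)
        public void handle(ConsumerRecord<String, String> record) {
            NewRelic.getAgent().getTransaction().acceptDistributedTraceHeaders(
                    TransportType.Kafka, new KafkaHeadersAdapter(record.headers()));
        }
    }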
