1+ import datadog.trace.common.writer.ListWriter
2+
13import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
24import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
35import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope
@@ -53,13 +55,57 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
5355
5456 public static final LinkedHashMap<String , String > PRODUCER_PATHWAY_EDGE_TAGS
5557
58+ // filter out Kafka poll, since the function is called in a loop, giving inconsistent results
59+ final ListWriter.Filter dropKafkaPoll = new ListWriter.Filter () {
60+ @Override
61+ boolean accept (List<DDSpan > trace ) {
62+ return ! (trace. size() == 1 &&
63+ trace. get(0 ). getResourceName(). toString(). equals(" kafka.poll" ))
64+ }
65+ }
66+
67+ final ListWriter.Filter dropEmptyKafkaPoll = new ListWriter.Filter () {
68+ @Override
69+ boolean accept (List<DDSpan > trace ) {
70+ return ! (trace. size() == 1 &&
71+ trace. get(0 ). getResourceName(). toString(). equals(" kafka.poll" ) &&
72+ trace. get(0 ). getTag(InstrumentationTags . KAFKA_RECORDS_COUNT ). equals(0 ))
73+ }
74+ }
75+
76+ // TraceID, start times & names changed based on the configuration, so overriding the sort to give consistent test results
77+ private static class SortKafkaTraces implements Comparator<List<DDSpan > > {
78+ @Override
79+ int compare (List<DDSpan > o1 , List<DDSpan > o2 ) {
80+ return rootSpanTrace(o1) - rootSpanTrace(o2)
81+ }
82+
83+ int rootSpanTrace (List<DDSpan > trace ) {
84+ assert ! trace. isEmpty()
85+ def rootSpan = trace. get(0 ). localRootSpan
86+ switch (rootSpan. operationName. toString()) {
87+ case " parent" :
88+ return 3
89+ case " kafka.poll" :
90+ return 2
91+ default :
92+ return 1
93+ }
94+ }
95+ }
96+
97+
5698 static {
5799 PRODUCER_PATHWAY_EDGE_TAGS = new LinkedHashMap<> (3 )
58100 PRODUCER_PATHWAY_EDGE_TAGS . put(" direction" , " out" )
59101 PRODUCER_PATHWAY_EDGE_TAGS . put(" topic" , SHARED_TOPIC )
60102 PRODUCER_PATHWAY_EDGE_TAGS . put(" type" , " kafka" )
61103 }
62104
  // Spock fixture: runs before every feature method. Installs the default
  // writer filter that suppresses standalone "kafka.poll" traces; individual
  // tests may override it (e.g. with dropEmptyKafkaPoll) before producing.
  def setup() {
    TEST_WRITER.setFilter(dropKafkaPoll)
  }
108+
63109 @Override
64110 int version () {
65111 0
@@ -124,9 +170,6 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
124170 }
125171 }
126172
127- if (isDataStreamsEnabled()) {
128- }
129-
130173 cleanup :
131174 producer. close()
132175 }
@@ -137,6 +180,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
137180 if (isDataStreamsEnabled()) {
138181 senderProps. put(ProducerConfig . METADATA_MAX_AGE_CONFIG , 1000 )
139182 }
183+ TEST_WRITER . setFilter(dropEmptyKafkaPoll)
140184 KafkaProducer<String , String > producer = new KafkaProducer<> (senderProps, new StringSerializer (), new StringSerializer ())
141185 String clusterId = " "
142186 if (isDataStreamsEnabled()) {
@@ -203,28 +247,37 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
203247 received. value() == greeting
204248 received. key() == null
205249
206- assertTraces( 2 , SORT_TRACES_BY_ID ) {
207- trace( 3 ) {
208- basicSpan(it, " parent " )
209- basicSpan(it, " producer callback " , span( 0 ) )
210- producerSpan(it, senderProps, span( 0 ), false )
211- }
250+ int nTraces = isDataStreamsEnabled() ? 3 : 2
251+ int produceTraceIdx = nTraces - 1
252+ TEST_WRITER . waitForTraces(nTraces )
253+ def traces = ( Arrays . asList( TEST_WRITER . toArray()) as List< List< DDSpan > > )
254+ Collections . sort(traces, new SortKafkaTraces () )
255+ assertTraces(nTraces, new SortKafkaTraces ()) {
212256 if (hasQueueSpan()) {
213257 trace(2 ) {
214- consumerSpan(it, consumerProperties, trace (1 )[ 1 ] )
215- queueSpan(it, trace(0 )[2 ])
258+ consumerSpan(it, consumerProperties, span (1 ))
259+ queueSpan(it, trace(produceTraceIdx )[2 ])
216260 }
217261 } else {
218262 trace(1 ) {
219- consumerSpan(it, consumerProperties, trace(0 )[2 ])
263+ consumerSpan(it, consumerProperties, trace(produceTraceIdx )[2 ])
220264 }
221265 }
266+ if (isDataStreamsEnabled()) {
267+ trace(1 , {
268+ pollSpan(it)
269+ })
270+ }
271+ trace(3 ) {
272+ basicSpan(it, " parent" )
273+ basicSpan(it, " producer callback" , span(0 ))
274+ producerSpan(it, senderProps, span(0 ), false )
275+ }
222276 }
223-
224277 def headers = received. headers()
225278 headers. iterator(). hasNext()
226- new String (headers. headers(" x-datadog-trace-id" ). iterator(). next(). value()) == " ${ TEST_WRITER[0 ][2].traceId} "
227- new String (headers. headers(" x-datadog-parent-id" ). iterator(). next(). value()) == " ${ TEST_WRITER[0 ][2].spanId} "
279+ new String (headers. headers(" x-datadog-trace-id" ). iterator(). next(). value()) == " ${ traces[produceTraceIdx ][2].traceId} "
280+ new String (headers. headers(" x-datadog-parent-id" ). iterator(). next(). value()) == " ${ traces[produceTraceIdx ][2].spanId} "
228281
229282 if (isDataStreamsEnabled()) {
230283 StatsGroup first = TEST_DATA_STREAMS_WRITER . groups. find { it. parentHash == 0 }
@@ -1069,6 +1122,27 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
10691122 }
10701123 }
10711124
  // Asserts the shape of a "kafka.poll" span produced by the consumer loop:
  // service name, operation/resource "kafka.poll", not errored, not measured,
  // and a records-count tag equal to recordCount.
  // NOTE(review): only `trace` and `recordCount` are read by the visible
  // assertions; parentSpan/offset/tombstone/distributedRootSpan appear to
  // exist to mirror the sibling span-assert helpers' signatures — confirm
  // before removing.
  def pollSpan(
    TraceAssert trace,
    int recordCount = 1,
    DDSpan parentSpan = null,
    Range offset = 0..0,
    boolean tombstone = false,
    boolean distributedRootSpan = !hasQueueSpan()
  ) {
    trace.span {
      serviceName Config.get().getServiceName()
      operationName "kafka.poll"
      resourceName "kafka.poll"
      errored false
      measured false
      tags {
        // GString resolves to the InstrumentationTags.KAFKA_RECORDS_COUNT
        // tag name; value must equal the expected record count.
        "$InstrumentationTags.KAFKA_RECORDS_COUNT" recordCount
        defaultTags(true)
      }
    }
  }
1145+
10721146 def waitForKafkaMetadataUpdate (KafkaTemplate kafkaTemplate ) {
10731147 kafkaTemplate. flush()
10741148 Producer<String , String > wrappedProducer = kafkaTemplate. getTheProducer()
0 commit comments