Skip to content

Commit 12a239f

Browse files
authored
Send process tags once per payload (#9657)
* Send process tags once per payload * suggestions
1 parent b776287 commit 12a239f

File tree

8 files changed

+47
-43
lines changed

8 files changed

+47
-43
lines changed
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package datadog.communication.serialization;
22

3-
// TODO @FunctionalInterface
3+
@FunctionalInterface
44
public interface Mapper<T> {
55
void map(T data, Writable packer);
6+
7+
default void reset() {}
68
}

communication/src/main/java/datadog/communication/serialization/msgpack/MsgPackWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public <T> boolean format(T message, Mapper<T> mapper) {
9090
// max capacity, then reject the message
9191
if (buffer.flush()) {
9292
try {
93+
mapper.reset();
9394
mapper.map(message, this);
9495
buffer.mark();
9596
return true;

dd-trace-core/src/main/java/datadog/trace/common/writer/RemoteMapper.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@ public interface RemoteMapper extends Mapper<List<? extends CoreSpan<?>>> {
3636

3737
int messageBufferSize();
3838

39-
void reset();
40-
4139
String endpoint();
4240

4341
class NoopRemoteMapper implements RemoteMapper {
@@ -55,9 +53,6 @@ public int messageBufferSize() {
5553
return 0;
5654
}
5755

58-
@Override
59-
public void reset() {}
60-
6156
@Override
6257
public String endpoint() {
6358
return null;

dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/TraceMapperV0_4.java

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import datadog.communication.serialization.Writable;
88
import datadog.communication.serialization.msgpack.MsgPackWriter;
99
import datadog.trace.api.Config;
10-
import datadog.trace.api.ProcessTags;
1110
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
1211
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
1312
import datadog.trace.common.writer.Payload;
@@ -35,6 +34,7 @@ public final class TraceMapperV0_4 implements TraceMapper {
3534
: null;
3635

3736
private final int size;
37+
private boolean firstSpanWritten;
3838

3939
public TraceMapperV0_4(int size) {
4040
this.size = size;
@@ -47,21 +47,19 @@ public TraceMapperV0_4() {
4747
private static final class MetaWriter implements MetadataConsumer {
4848

4949
private Writable writable;
50-
private boolean firstSpanInChunk;
51-
private boolean lastSpanInChunk;
50+
private boolean firstSpanInTrace;
51+
private boolean lastSpanInTrace;
52+
private boolean firstSpanInPayload;
5253

5354
MetaWriter withWritable(Writable writable) {
5455
this.writable = writable;
5556
return this;
5657
}
5758

58-
MetaWriter forFirstSpanInChunk(final boolean firstSpanInChunk) {
59-
this.firstSpanInChunk = firstSpanInChunk;
60-
return this;
61-
}
62-
63-
MetaWriter forLastSpanInChunk(final boolean lastSpanInChunk) {
64-
this.lastSpanInChunk = lastSpanInChunk;
59+
MetaWriter forSpan(boolean firstInTrace, boolean lastInTrace, boolean firstInPayload) {
60+
this.firstSpanInTrace = firstInTrace;
61+
this.lastSpanInTrace = lastInTrace;
62+
this.firstSpanInPayload = firstInPayload;
6563
return this;
6664
}
6765

@@ -70,9 +68,8 @@ public void accept(Metadata metadata) {
7068
if (TAG_CACHE != null) TAG_CACHE.recalibrate();
7169
if (VALUE_CACHE != null) VALUE_CACHE.recalibrate();
7270

73-
final boolean writeSamplingPriority = firstSpanInChunk || lastSpanInChunk;
74-
final UTF8BytesString processTags =
75-
firstSpanInChunk ? ProcessTags.getTagsForSerialization() : null;
71+
final boolean writeSamplingPriority = firstSpanInTrace || lastSpanInTrace;
72+
final UTF8BytesString processTags = firstSpanInPayload ? metadata.processTags() : null;
7673
int metaSize =
7774
metadata.getBaggage().size()
7875
+ metadata.getTags().size()
@@ -301,12 +298,12 @@ public void map(List<? extends CoreSpan<?>> trace, final Writable writable) {
301298
span.processTagsAndBaggage(
302299
metaWriter
303300
.withWritable(writable)
304-
.forFirstSpanInChunk(i == 0)
305-
.forLastSpanInChunk(i == trace.size() - 1));
301+
.forSpan(i == 0, i == trace.size() - 1, !firstSpanWritten));
306302
if (!metaStruct.isEmpty()) {
307303
/* 13 */
308304
metaStructWriter.withWritable(writable).write(metaStruct);
309305
}
306+
firstSpanWritten = true;
310307
}
311308
}
312309

@@ -321,7 +318,9 @@ public int messageBufferSize() {
321318
}
322319

323320
@Override
324-
public void reset() {}
321+
public void reset() {
322+
firstSpanWritten = false;
323+
}
325324

326325
@Override
327326
public String endpoint() {

dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/TraceMapperV0_5.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import datadog.communication.serialization.Writable;
88
import datadog.communication.serialization.WritableFormatter;
99
import datadog.communication.serialization.msgpack.MsgPackWriter;
10-
import datadog.trace.api.ProcessTags;
1110
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
1211
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
1312
import datadog.trace.common.writer.Payload;
@@ -33,6 +32,7 @@ public final class TraceMapperV0_5 implements TraceMapper {
3332

3433
private final MetaWriter metaWriter = new MetaWriter();
3534
private final int size;
35+
private boolean firstSpanWritten;
3636

3737
public TraceMapperV0_5() {
3838
this(2 << 20);
@@ -79,10 +79,10 @@ public void map(final List<? extends CoreSpan<?>> trace, final Writable writable
7979
span.processTagsAndBaggage(
8080
metaWriter
8181
.withWritable(writable)
82-
.forFirstSpanInChunk(i == 0)
83-
.forLastSpanInChunk(i == trace.size() - 1));
82+
.forSpan(i == 0, i == trace.size() - 1, !firstSpanWritten));
8483
/* 12 */
8584
writeDictionaryEncoded(writable, span.getType());
85+
firstSpanWritten = true;
8686
}
8787
}
8888

@@ -115,6 +115,7 @@ public int messageBufferSize() {
115115
public void reset() {
116116
dictionary.reset();
117117
encoding.clear();
118+
firstSpanWritten = false;
118119
}
119120

120121
@Override
@@ -181,29 +182,26 @@ private List<ByteBuffer> toList() {
181182
private final class MetaWriter implements MetadataConsumer {
182183

183184
private Writable writable;
184-
private boolean firstSpanInChunk;
185-
private boolean lastSpanInChunk;
185+
private boolean firstSpanInTrace;
186+
private boolean lastSpanInTrace;
187+
private boolean firstSpanInPayload;
186188

187189
MetaWriter withWritable(final Writable writable) {
188190
this.writable = writable;
189191
return this;
190192
}
191193

192-
MetaWriter forFirstSpanInChunk(final boolean firstSpanInChunk) {
193-
this.firstSpanInChunk = firstSpanInChunk;
194-
return this;
195-
}
196-
197-
MetaWriter forLastSpanInChunk(final boolean lastSpanInChunk) {
198-
this.lastSpanInChunk = lastSpanInChunk;
194+
MetaWriter forSpan(boolean firstInTrace, boolean lastInTrace, boolean firstInPayload) {
195+
this.firstSpanInTrace = firstInTrace;
196+
this.lastSpanInTrace = lastInTrace;
197+
this.firstSpanInPayload = firstInPayload;
199198
return this;
200199
}
201200

202201
@Override
203202
public void accept(Metadata metadata) {
204-
final boolean writeSamplingPriority = firstSpanInChunk || lastSpanInChunk;
205-
final UTF8BytesString processTags =
206-
firstSpanInChunk ? ProcessTags.getTagsForSerialization() : null;
203+
final boolean writeSamplingPriority = firstSpanInTrace || lastSpanInTrace;
204+
final UTF8BytesString processTags = firstSpanInPayload ? metadata.processTags() : null;
207205
int metaSize =
208206
metadata.getBaggage().size()
209207
+ metadata.getTags().size()

dd-trace-core/src/test/groovy/datadog/trace/common/writer/DDAgentWriterCombinedTest.groovy

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package datadog.trace.common.writer
22

3+
import static datadog.trace.api.config.GeneralConfig.EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED
4+
35
import datadog.trace.api.DDSpanId
46
import datadog.trace.api.DDTraceId
7+
import datadog.trace.api.ProcessTags
58
import datadog.trace.api.StatsDClient
69
import datadog.trace.api.sampling.PrioritySampling
710
import datadog.trace.api.datastreams.NoopPathwayContext
@@ -194,6 +197,9 @@ class DDAgentWriterCombinedTest extends DDCoreSpecification {
194197
@Timeout(30)
195198
def "test default buffer size for #agentVersion"() {
196199
setup:
200+
// disable process tags since they are only written on the first span and it will break the trace size estimation
201+
injectSysConfig(EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED, "false")
202+
ProcessTags.reset()
197203
def api = Mock(DDAgentApi)
198204
def discovery = Mock(DDAgentFeaturesDiscovery)
199205
def writer = DDAgentWriter.builder()

dd-trace-core/src/test/groovy/datadog/trace/common/writer/ddagent/TraceMapperV04PayloadTest.groovy

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ class TraceMapperV04PayloadTest extends DDSpecification {
5555
if (!packer.format(trace, traceMapper)) {
5656
verifier.skipLargeTrace()
5757
tracesFitInBuffer = false
58+
// in the real like the mapper is always reset each trace.
59+
// here we need to force it when we fail since the buffer will be reset as well
60+
traceMapper.reset()
5861
}
5962
}
6063
packer.flush()
@@ -239,7 +242,7 @@ class TraceMapperV04PayloadTest extends DDSpecification {
239242
if (expectedTraces.isEmpty() && messageCount == 0) {
240243
return
241244
}
242-
boolean hasProcessTags = false
245+
int processTagsCount = 0
243246
try {
244247
Payload payload = mapper.newPayload().withBody(messageCount, buffer)
245248
payload.writeTo(this)
@@ -351,7 +354,7 @@ class TraceMapperV04PayloadTest extends DDSpecification {
351354
assertTrue(Config.get().isExperimentalPropagateProcessTagsEnabled())
352355
assertEquals(0, k)
353356
assertEquals(ProcessTags.tagsForSerialization.toString(), entry.getValue())
354-
hasProcessTags = true
357+
processTagsCount++
355358
} else {
356359
Object tag = expectedSpan.getTag(entry.getKey())
357360
if (null != tag) {
@@ -379,10 +382,10 @@ class TraceMapperV04PayloadTest extends DDSpecification {
379382
} catch (IOException e) {
380383
Assertions.fail(e.getMessage())
381384
} finally {
382-
assert hasProcessTags == Config.get().isExperimentalPropagateProcessTagsEnabled()
383385
mapper.reset()
384386
captured.position(0)
385387
captured.limit(captured.capacity())
388+
assert processTagsCount == (Config.get().isExperimentalPropagateProcessTagsEnabled() ? 1 : 0)
386389
}
387390
}
388391

dd-trace-core/src/test/groovy/datadog/trace/common/writer/ddagent/TraceMapperV05PayloadTest.groovy

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ class TraceMapperV05PayloadTest extends DDSpecification {
216216

217217
@Override
218218
void accept(int messageCount, ByteBuffer buffer) {
219-
def hasProcessTags = false
219+
def processTagsCount = 0
220220
try {
221221
Payload payload = mapper.newPayload().withBody(messageCount, buffer)
222222
payload.writeTo(this)
@@ -268,7 +268,7 @@ class TraceMapperV05PayloadTest extends DDSpecification {
268268
} else if(DDTags.ORIGIN_KEY.equals(entry.getKey())) {
269269
assertEquals(expectedSpan.getOrigin(), entry.getValue())
270270
} else if (DDTags.PROCESS_TAGS.equals(entry.getKey())) {
271-
hasProcessTags = true
271+
processTagsCount++
272272
assertTrue(Config.get().isExperimentalPropagateProcessTagsEnabled())
273273
assertEquals(0, k)
274274
assertEquals(ProcessTags.tagsForSerialization.toString(), entry.getValue())
@@ -338,7 +338,7 @@ class TraceMapperV05PayloadTest extends DDSpecification {
338338
} catch (IOException e) {
339339
Assert.fail(e.getMessage())
340340
} finally {
341-
assert hasProcessTags == Config.get().isExperimentalPropagateProcessTagsEnabled()
341+
assert processTagsCount == (Config.get().isExperimentalPropagateProcessTagsEnabled() ? 1 : 0)
342342
mapper.reset()
343343
captured.position(0)
344344
captured.limit(captured.capacity())

0 commit comments

Comments
 (0)