Skip to content

Surface process tags in dsm payloads and use them for base hash calculation #8836

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.datadoghq.sketch.ddsketch.encoding.VarEncodingHelper;
import datadog.context.propagation.CarrierVisitor;
import datadog.trace.api.Config;
import datadog.trace.api.ProcessTags;
import datadog.trace.api.WellKnownTags;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.api.datastreams.PathwayContext;
Expand Down Expand Up @@ -363,6 +364,10 @@ public static long getBaseHash(WellKnownTags wellKnownTags) {
if (primaryTag != null) {
builder.append(primaryTag);
}
CharSequence processTags = ProcessTags.getTagsForSerialization();
if (processTags != null) {
builder.append(processTags);
}
return FNV64Hash.generateHash(builder.toString(), FNV64Hash.Version.v1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import datadog.communication.serialization.WritableFormatter;
import datadog.communication.serialization.msgpack.MsgPackWriter;
import datadog.trace.api.Config;
import datadog.trace.api.ProcessTags;
import datadog.trace.api.WellKnownTags;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.common.metrics.Sink;
import java.util.Collection;
import java.util.List;
Expand All @@ -33,6 +35,7 @@ public class MsgPackDatastreamsPayloadWriter implements DatastreamsPayloadWriter
private static final byte[] BACKLOG_VALUE = "Value".getBytes(ISO_8859_1);
private static final byte[] BACKLOG_TAGS = "Tags".getBytes(ISO_8859_1);
private static final byte[] PRODUCTS_MASK = "ProductMask".getBytes(ISO_8859_1);
private static final byte[] PROCESS_TAGS = "ProcessTags".getBytes(ISO_8859_1);

private static final int INITIAL_CAPACITY = 512 * 1024;

Expand Down Expand Up @@ -80,7 +83,9 @@ public long getProductsMask() {

@Override
public void writePayload(Collection<StatsBucket> data, String serviceNameOverride) {
writer.startMap(8);
final List<UTF8BytesString> processTags = ProcessTags.getTagsAsUTF8ByteStringList();
final boolean hasProcessTags = processTags != null;
writer.startMap(8 + (hasProcessTags ? 1 : 0));
/* 1 */
writer.writeUTF8(ENV);
writer.writeUTF8(wellKnownTags.getEnv());
Expand Down Expand Up @@ -139,6 +144,13 @@ public void writePayload(Collection<StatsBucket> data, String serviceNameOverrid
writer.writeUTF8(PRODUCTS_MASK);
writer.writeLong(getProductsMask());

/* 9 */
if (hasProcessTags) {
writer.writeUTF8(PROCESS_TAGS);
writer.startArray(processTags.size());
processTags.forEach(writer::writeUTF8);
}

buffer.mark();
sink.accept(buffer.messageCount(), buffer.slice());
buffer.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import datadog.communication.ddagent.DDAgentFeaturesDiscovery
import datadog.communication.ddagent.SharedCommunicationObjects
import datadog.communication.http.OkHttpUtils
import datadog.trace.api.Config
import datadog.trace.api.ProcessTags
import datadog.trace.api.TraceConfig
import datadog.trace.api.WellKnownTags
import datadog.trace.api.time.ControllableTimeSource
Expand All @@ -21,6 +22,7 @@ import spock.lang.Shared
import spock.util.concurrent.PollingConditions

import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
import static datadog.trace.api.config.GeneralConfig.EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED
import static java.util.concurrent.TimeUnit.SECONDS

/**
Expand Down Expand Up @@ -103,8 +105,11 @@ class DataStreamsWritingTest extends DDCoreSpecification {
assert unpacker.unpackString() == serviceNameOverride
}

def "Write bucket to mock server"() {
given:
def "Write bucket to mock server with process tags enabled #processTagsEnabled"() {
setup:
injectSysConfig(EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED, "$processTagsEnabled")
ProcessTags.reset()

def conditions = new PollingConditions(timeout: 2)

def testOkhttpClient = OkHttpUtils.buildHttpClient(HttpUrl.get(server.address), 5000L)
Expand Down Expand Up @@ -152,16 +157,23 @@ class DataStreamsWritingTest extends DDCoreSpecification {
assert requestBodies.size() == 1
}

validateMessage(requestBodies[0])
validateMessage(requestBodies[0], processTagsEnabled)

cleanup:
injectSysConfig(EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED, "false")
ProcessTags.reset()

where:
processTagsEnabled << [true, false]
}

def validateMessage(byte[] message) {
def validateMessage(byte[] message, boolean processTagsEnabled) {
GzipSource gzipSource = new GzipSource(Okio.source(new ByteArrayInputStream(message)))

BufferedSource bufferedSource = Okio.buffer(gzipSource)
MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bufferedSource.inputStream())

assert unpacker.unpackMapHeader() == 8
assert unpacker.unpackMapHeader() == 8 + (processTagsEnabled ? 1 : 0)
assert unpacker.unpackString() == "Env"
assert unpacker.unpackString() == "test"
assert unpacker.unpackString() == "Service"
Expand Down Expand Up @@ -265,6 +277,16 @@ class DataStreamsWritingTest extends DDCoreSpecification {
assert unpacker.unpackString() == "ProductMask"
assert unpacker.unpackLong() == 1

def processTags = ProcessTags.getTagsAsStringList()
assert unpacker.hasNext() == (processTags != null)
if (processTags != null) {
assert unpacker.unpackString() == "ProcessTags"
assert unpacker.unpackArrayHeader() == processTags.size()
processTags.each {
assert unpacker.unpackString() == it
}
}

return true
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package datadog.trace.core.datastreams
import datadog.communication.ddagent.DDAgentFeaturesDiscovery
import datadog.trace.api.Config
import datadog.trace.api.DDTraceId
import datadog.trace.api.ProcessTags
import datadog.trace.api.TraceConfig
import datadog.trace.api.WellKnownTags
import datadog.trace.api.datastreams.StatsPoint
Expand All @@ -17,6 +18,7 @@ import java.util.function.Consumer

import static datadog.context.Context.root
import static datadog.trace.api.TracePropagationStyle.DATADOG
import static datadog.trace.api.config.GeneralConfig.EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED
import static datadog.trace.api.config.GeneralConfig.PRIMARY_TAG
import static datadog.trace.api.datastreams.DataStreamsContext.create
import static datadog.trace.api.datastreams.DataStreamsContext.fromTags
Expand Down Expand Up @@ -428,6 +430,23 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
firstBaseHash != secondBaseHash
}

/**
 * Checks that DefaultPathwayContext.getBaseHash returns a different value once process tags
 * are enabled and a tag has been added, and that the added tag leads the serialized tag string.
 */
def "Process Tags used in hash calculation"() {
when:
// Baseline hash, taken before this test switches the process-tags flag on.
// NOTE(review): assumes process tags are disabled/empty at this point — confirm no state leaks in from earlier tests.
def firstBaseHash = DefaultPathwayContext.getBaseHash(wellKnownTags)

// Turn the feature on, then reset ProcessTags.
// NOTE(review): reset() presumably makes ProcessTags pick up the new flag value — confirm.
injectSysConfig(EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED, "true")
ProcessTags.reset()
// The key "000" is chosen so that this entry is expected at the start of the serialized form (asserted below).
ProcessTags.addTag("000", "first")
def secondBaseHash = DefaultPathwayContext.getBaseHash(wellKnownTags)

then:
// The two hashes must differ now that process tags are present.
firstBaseHash != secondBaseHash
// The serialized form must begin with the added tag; the trailing comma means at least one more tag follows it.
assert ProcessTags.getTagsForSerialization().startsWithAny("000:first,")
cleanup:
// Put the flag back to "false" and reset ProcessTags again so later tests do not see this test's state.
injectSysConfig(EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED, "false")
ProcessTags.reset()
}

def "Check context extractor decorator behavior"() {
given:
def sink = Mock(Sink)
Expand Down
53 changes: 38 additions & 15 deletions internal-api/src/main/java/datadog/trace/api/ProcessTags.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
Expand All @@ -19,12 +20,14 @@ public class ProcessTags {
private static boolean enabled = Config.get().isExperimentalPropagateProcessTagsEnabled();

private static class Lazy {
static final Map<String, String> TAGS = loadTags();
// The tags are used to compute a hash for DSM, so this map must be sorted (stable key order).
static final SortedMap<String, String> TAGS = loadTags();
static volatile UTF8BytesString serializedForm;
static volatile List<String> listForm;
static volatile List<UTF8BytesString> utf8ListForm;
static volatile List<String> stringListForm;

private static Map<String, String> loadTags() {
Map<String, String> tags = new LinkedHashMap<>();
private static SortedMap<String, String> loadTags() {
SortedMap<String, String> tags = new TreeMap<>();
if (enabled) {
try {
fillBaseTags(tags);
Expand Down Expand Up @@ -86,15 +89,21 @@ private static void fillJbossTags(Map<String, String> tags) {
}

static void calculate() {
if (listForm != null || TAGS.isEmpty()) {
if (serializedForm != null || TAGS.isEmpty()) {
return;
}
synchronized (Lazy.TAGS) {
final Stream<String> tagStream =
final Stream<UTF8BytesString> tagStream =
TAGS.entrySet().stream()
.map(entry -> entry.getKey() + ":" + TraceUtils.normalizeTag(entry.getValue()));
listForm = Collections.unmodifiableList(tagStream.collect(Collectors.toList()));
serializedForm = UTF8BytesString.create(String.join(",", listForm));
.map(
entry ->
UTF8BytesString.create(
entry.getKey() + ":" + TraceUtils.normalizeTag(entry.getValue())));
utf8ListForm = Collections.unmodifiableList(tagStream.collect(Collectors.toList()));
stringListForm =
Collections.unmodifiableList(
utf8ListForm.stream().map(UTF8BytesString::toString).collect(Collectors.toList()));
serializedForm = UTF8BytesString.create(String.join(",", utf8ListForm));
}
}
}
Expand All @@ -107,21 +116,34 @@ public static void addTag(String key, String value) {
synchronized (Lazy.TAGS) {
Lazy.TAGS.put(key, value);
Lazy.serializedForm = null;
Lazy.listForm = null;
Lazy.stringListForm = null;
Lazy.utf8ListForm = null;
}
}
}

public static List<String> getTagsAsList() {
public static List<UTF8BytesString> getTagsAsUTF8ByteStringList() {
if (!enabled) {
return null;
}
final List<String> listForm = Lazy.listForm;
final List<UTF8BytesString> listForm = Lazy.utf8ListForm;
if (listForm != null) {
return listForm;
}
Lazy.calculate();
return Lazy.listForm;
return Lazy.utf8ListForm;
}

/**
 * Returns the process tags as plain {@link String} entries.
 *
 * <p>The cached {@code Lazy.stringListForm} is served when it is already set; otherwise {@code
 * Lazy.calculate()} is invoked and the field is read a second time.
 *
 * @return the string list, or {@code null} when process tags are disabled or when the list is
 *     still unset after {@code Lazy.calculate()} returns
 */
public static List<String> getTagsAsStringList() {
  if (enabled) {
    List<String> cached = Lazy.stringListForm;
    if (cached == null) {
      // Nothing cached yet: trigger the computation, then re-read the field.
      Lazy.calculate();
      cached = Lazy.stringListForm;
    }
    return cached;
  }
  // Feature switched off: callers get null rather than an empty list.
  return null;
}

public static UTF8BytesString getTagsForSerialization() {
Expand All @@ -141,7 +163,8 @@ static void empty() {
synchronized (Lazy.TAGS) {
Lazy.TAGS.clear();
Lazy.serializedForm = null;
Lazy.listForm = null;
Lazy.stringListForm = null;
Lazy.utf8ListForm = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class ProcessTagsForkedTest extends DDSpecification {
tags =~ expected
where:
jar | cls | expected
Paths.get("my test", "my.jar").toFile() | null | "entrypoint.name:my,entrypoint.basedir:my_test,entrypoint.workdir:[^,]+"
Paths.get("my test", "my.jar").toFile() | null | "entrypoint.basedir:my_test,entrypoint.name:my,entrypoint.workdir:[^,]+"
Paths.get("my.jar").toFile() | null | "entrypoint.name:my,entrypoint.workdir:[^,]+"
null | "com.test.Main" | "entrypoint.name:com.test.main,entrypoint.workdir:[^,]+"
null | null | "entrypoint.workdir:[^,]+"
Expand All @@ -56,9 +56,9 @@ class ProcessTagsForkedTest extends DDSpecification {
System.clearProperty("jboss.server.name")
where:
jbossHome | mode | serverName | expected
"/opt/jboss/myserver" | "[Standalone]" | "standalone" | "entrypoint.name:jboss-modules,entrypoint.basedir:somewhere,entrypoint.workdir:.+,jboss.home:myserver,server.name:standalone,jboss.mode:standalone"
"/opt/jboss/myserver" | "[server1:12345]" | "server1" | "entrypoint.name:jboss-modules,entrypoint.basedir:somewhere,entrypoint.workdir:.+,jboss.home:myserver,server.name:server1,jboss.mode:domain"
null | "[Standalone]" | "standalone" | "entrypoint.name:jboss-modules,entrypoint.basedir:somewhere,entrypoint.workdir:[^,]+" // don't expect jboss tags since home is missing
"/opt/jboss/myserver" | "[Standalone]" | "standalone" | "entrypoint.basedir:somewhere,entrypoint.name:jboss-modules,entrypoint.workdir:.+,jboss.home:myserver,jboss.mode:standalone,server.name:standalone"
"/opt/jboss/myserver" | "[server1:12345]" | "server1" | "entrypoint.basedir:somewhere,entrypoint.name:jboss-modules,entrypoint.workdir:.+,jboss.home:myserver,jboss.mode:domain,server.name:server1"
null | "[Standalone]" | "standalone" | "entrypoint.basedir:somewhere,entrypoint.name:jboss-modules,entrypoint.workdir:[^,]+" // don't expect jboss tags since home is missing
}

def 'should not calculate process tags by default'() {
Expand All @@ -72,7 +72,8 @@ class ProcessTagsForkedTest extends DDSpecification {
ProcessTags.addTag("test", "value")
then:
assert ProcessTags.tagsForSerialization == null
assert ProcessTags.tagsAsList == null
assert ProcessTags.tagsAsStringList == null
assert ProcessTags.tagsAsUTF8ByteStringList == null
}

def 'should lazily recalculate when a tag is added'() {
Expand All @@ -81,18 +82,24 @@ class ProcessTagsForkedTest extends DDSpecification {
ProcessTags.reset()
when:
def processTags = ProcessTags.tagsForSerialization
def tagsAsList = ProcessTags.tagsAsList
def tagsAsList = ProcessTags.tagsAsStringList
def tagsAsUtf8List = ProcessTags.tagsAsUTF8ByteStringList
then:
assert ProcessTags.enabled
assert processTags != null
assert tagsAsList != null
assert tagsAsList.size() > 0
assert tagsAsUtf8List != null
assert tagsAsUtf8List.size() == tagsAsList.size()
when:
ProcessTags.addTag("test", "value")
// "0test" lands in the first position because '0' sorts before any letter a-z
ProcessTags.addTag("0test", "value")
then:
assert ProcessTags.tagsForSerialization.toString() == "$processTags,test:value"
def size = ProcessTags.tagsAsList.size()
assert ProcessTags.tagsForSerialization.toString() == "0test:value,$processTags"
def size = ProcessTags.tagsAsStringList.size()
assert size == tagsAsList.size() + 1
assert ProcessTags.tagsAsList[size - 1] == "test:value"
assert size == ProcessTags.tagsAsUTF8ByteStringList.size()
assert ProcessTags.tagsAsStringList[0] == "0test:value"
assert ProcessTags.tagsAsUTF8ByteStringList[0].toString() == "0test:value"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static RemoteConfigRequest newRequest(
serviceEnv,
serviceVersion,
tags,
ProcessTags.getTagsAsList());
ProcessTags.getTagsAsStringList());

ClientInfo clientInfo =
new RemoteConfigRequest.ClientInfo(
Expand Down
Loading