Skip to content

Institute a 10 snapshot per probe per trace budget #8277

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
import com.datadog.debugger.sink.DebuggerSink;
import com.datadog.debugger.sink.Snapshot;
import com.datadog.debugger.util.MoshiHelper;
import com.datadog.debugger.util.WeakIdentityHashMap;
import com.squareup.moshi.Json;
import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.JsonReader;
import com.squareup.moshi.JsonWriter;
import com.squareup.moshi.Types;
import datadog.trace.api.Config;
import datadog.trace.api.DDTraceId;
import datadog.trace.bootstrap.debugger.CapturedContext;
import datadog.trace.bootstrap.debugger.CorrelationAccess;
import datadog.trace.bootstrap.debugger.DebuggerContext;
Expand Down Expand Up @@ -49,6 +51,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -59,6 +62,8 @@ public class LogProbe extends ProbeDefinition implements Sampled {
private static final Limits LIMITS = new Limits(1, 3, 8192, 5);
private static final int LOG_MSG_LIMIT = 8192;

public static final int PROBE_BUDGET = 10;

/** Stores part of a templated message either a str or an expression */
public static class Segment {
private final String str;
Expand Down Expand Up @@ -278,6 +283,8 @@ public String toString() {
private final Capture capture;
private final Sampling sampling;
private transient Consumer<Snapshot> snapshotProcessor;
protected transient Map<DDTraceId, AtomicInteger> budget =
Collections.synchronizedMap(new WeakIdentityHashMap<>());

// no-arg constructor is required by Moshi to avoid creating instance with unsafe and by-passing
// constructors, including field initializers.
Expand Down Expand Up @@ -568,10 +575,12 @@ public void commit(
CapturedContext exitContext,
List<CapturedContext.CapturedThrowable> caughtExceptions) {
Snapshot snapshot = createSnapshot();
boolean shouldCommit = fillSnapshot(entryContext, exitContext, caughtExceptions, snapshot);
boolean shouldCommit =
inBudget() && fillSnapshot(entryContext, exitContext, caughtExceptions, snapshot);
DebuggerSink sink = DebuggerAgent.getSink();
if (shouldCommit) {
commitSnapshot(snapshot, sink);
incrementBudget();
if (snapshotProcessor != null) {
snapshotProcessor.accept(snapshot);
}
Expand Down Expand Up @@ -855,6 +864,26 @@ public String toString() {
}
}

private boolean inBudget() {
AtomicInteger budgetLevel = getBudgetLevel();
return budgetLevel == null || budgetLevel.get() < PROBE_BUDGET;
}

private AtomicInteger getBudgetLevel() {
TracerAPI tracer = AgentTracer.get();
AgentSpan span = tracer != null ? tracer.activeSpan() : null;
return getDebugSessionId() == null || span == null
? null
: budget.computeIfAbsent(span.getLocalRootSpan().getTraceId(), id -> new AtomicInteger());
}

private void incrementBudget() {
AtomicInteger budgetLevel = getBudgetLevel();
if (budgetLevel != null) {
budgetLevel.incrementAndGet();
}
}

@Generated
@Override
public boolean equals(Object o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public SnapshotSink(Config config, String tags, BatchUploader snapshotUploader)
this.snapshotUploader = snapshotUploader;
}

public BlockingQueue<Snapshot> getLowRateSnapshots() {
return lowRateSnapshots;
}

public void start() {
if (started.compareAndSet(false, true)) {
highRateScheduled =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,18 @@ public class LogProbeTest {

@Test
public void testCapture() {
LogProbe.Builder builder = createLog(null);
Builder builder = createLog(null);
LogProbe snapshotProbe = builder.capture(1, 420, 255, 20).build();
Assertions.assertEquals(1, snapshotProbe.getCapture().getMaxReferenceDepth());
Assertions.assertEquals(420, snapshotProbe.getCapture().getMaxCollectionSize());
Assertions.assertEquals(255, snapshotProbe.getCapture().getMaxLength());
assertEquals(1, snapshotProbe.getCapture().getMaxReferenceDepth());
assertEquals(420, snapshotProbe.getCapture().getMaxCollectionSize());
assertEquals(255, snapshotProbe.getCapture().getMaxLength());
}

@Test
public void testSampling() {
LogProbe.Builder builder = createLog(null);
Builder builder = createLog(null);
LogProbe snapshotProbe = builder.sampling(0.25).build();
Assertions.assertEquals(0.25, snapshotProbe.getSampling().getEventsPerSecond(), 0.01);
assertEquals(0.25, snapshotProbe.getSampling().getEventsPerSecond(), 0.01);
}

@Test
Expand All @@ -78,6 +78,44 @@ public void noDebugSession() {
"With no debug sessions, snapshots should get filled.");
}

@Test
public void budgets() {
DebuggerSink sink = new DebuggerSink(getConfig(), mock(ProbeStatusSink.class));
DebuggerAgentHelper.injectSink(sink);
assertEquals(0, sink.getSnapshotSink().getLowRateSnapshots().size());
TracerAPI tracer =
CoreTracer.builder().idGenerationStrategy(IdGenerationStrategy.fromName("random")).build();
AgentTracer.registerIfAbsent(tracer);
int runs = 100;
for (int i = 0; i < runs; i++) {
runTrace(tracer);
}
assertEquals(runs * LogProbe.PROBE_BUDGET, sink.getSnapshotSink().getLowRateSnapshots().size());
}

private void runTrace(TracerAPI tracer) {
AgentSpan span = tracer.startSpan("budget testing", "test span");
span.setTag(Tags.PROPAGATED_DEBUG, "12345:1");
try (AgentScope scope = tracer.activateSpan(span, ScopeSource.MANUAL)) {

LogProbe logProbe =
createLog("I'm in a debug session")
.probeId(ProbeId.newId())
.tags("session_id:12345")
.captureSnapshot(true)
.build();

CapturedContext entryContext = capturedContext(span, logProbe);
CapturedContext exitContext = capturedContext(span, logProbe);
logProbe.evaluate(entryContext, new LogStatus(logProbe), MethodLocation.ENTRY);
logProbe.evaluate(exitContext, new LogStatus(logProbe), MethodLocation.EXIT);

for (int i = 0; i < 20; i++) {
logProbe.commit(entryContext, exitContext, emptyList());
}
}
}

private boolean fillSnapshot(DebugSessionStatus status) {
DebuggerAgentHelper.injectSink(new DebuggerSink(getConfig(), mock(ProbeStatusSink.class)));
TracerAPI tracer =
Expand Down Expand Up @@ -152,12 +190,11 @@ public void fillSnapshot_shouldSend(String methodLocation) {
LogProbe logProbe = createLog(null).evaluateAt(MethodLocation.valueOf(methodLocation)).build();
CapturedContext entryContext = new CapturedContext();
CapturedContext exitContext = new CapturedContext();
LogProbe.LogStatus logEntryStatus =
prepareContext(entryContext, logProbe, MethodLocation.ENTRY);
LogStatus logEntryStatus = prepareContext(entryContext, logProbe, MethodLocation.ENTRY);
logEntryStatus.setSampled(true); // force sampled to avoid rate limiting executing tests!
LogProbe.LogStatus logExitStatus = prepareContext(exitContext, logProbe, MethodLocation.EXIT);
LogStatus logExitStatus = prepareContext(exitContext, logProbe, MethodLocation.EXIT);
logExitStatus.setSampled(true); // force sampled to avoid rate limiting executing tests!
Snapshot snapshot = new Snapshot(Thread.currentThread(), logProbe, 10);
Snapshot snapshot = new Snapshot(currentThread(), logProbe, 10);
assertTrue(logProbe.fillSnapshot(entryContext, exitContext, null, snapshot));
}

Expand All @@ -172,16 +209,16 @@ public void fillSnapshot(
LogProbe logProbe = createLog(null).evaluateAt(MethodLocation.EXIT).build();
CapturedContext entryContext = new CapturedContext();
CapturedContext exitContext = new CapturedContext();
LogProbe.LogStatus entryStatus = prepareContext(entryContext, logProbe, MethodLocation.ENTRY);
LogStatus entryStatus = prepareContext(entryContext, logProbe, MethodLocation.ENTRY);
fillStatus(entryStatus, sampled, condition, conditionErrors, logTemplateErrors);
LogProbe.LogStatus exitStatus = prepareContext(exitContext, logProbe, MethodLocation.EXIT);
LogStatus exitStatus = prepareContext(exitContext, logProbe, MethodLocation.EXIT);
fillStatus(exitStatus, sampled, condition, conditionErrors, logTemplateErrors);
Snapshot snapshot = new Snapshot(Thread.currentThread(), logProbe, 10);
Snapshot snapshot = new Snapshot(currentThread(), logProbe, 10);
assertEquals(shouldCommit, logProbe.fillSnapshot(entryContext, exitContext, null, snapshot));
}

private void fillStatus(
LogProbe.LogStatus entryStatus,
LogStatus entryStatus,
boolean sampled,
boolean condition,
boolean conditionErrors,
Expand All @@ -193,10 +230,10 @@ private void fillStatus(
entryStatus.setLogTemplateErrors(logTemplateErrors);
}

private LogProbe.LogStatus prepareContext(
private LogStatus prepareContext(
CapturedContext context, LogProbe logProbe, MethodLocation methodLocation) {
context.evaluate(PROBE_ID.getEncodedId(), logProbe, "", 0, methodLocation);
return (LogProbe.LogStatus) context.getStatus(PROBE_ID.getEncodedId());
return (LogStatus) context.getStatus(PROBE_ID.getEncodedId());
}

private static Stream<Arguments> statusValues() {
Expand All @@ -222,15 +259,15 @@ public void fillSnapshot_shouldSend_exit() {
prepareContext(entryContext, logProbe, MethodLocation.ENTRY);
CapturedContext exitContext = new CapturedContext();
prepareContext(exitContext, logProbe, MethodLocation.EXIT);
Snapshot snapshot = new Snapshot(Thread.currentThread(), logProbe, 10);
Snapshot snapshot = new Snapshot(currentThread(), logProbe, 10);
assertTrue(logProbe.fillSnapshot(entryContext, exitContext, null, snapshot));
}

@Test
public void fillSnapshot_shouldSend_evalErrors() {
LogProbe logProbe = createLog(null).evaluateAt(MethodLocation.EXIT).build();
CapturedContext entryContext = new CapturedContext();
LogProbe.LogStatus logStatus = prepareContext(entryContext, logProbe, MethodLocation.ENTRY);
LogStatus logStatus = prepareContext(entryContext, logProbe, MethodLocation.ENTRY);
logStatus.addError(new EvaluationError("expr", "msg1"));
logStatus.setLogTemplateErrors(true);
entryContext.addThrowable(new RuntimeException("errorEntry"));
Expand All @@ -239,7 +276,7 @@ public void fillSnapshot_shouldSend_evalErrors() {
logStatus.addError(new EvaluationError("expr", "msg2"));
logStatus.setLogTemplateErrors(true);
exitContext.addThrowable(new RuntimeException("errorExit"));
Snapshot snapshot = new Snapshot(Thread.currentThread(), logProbe, 10);
Snapshot snapshot = new Snapshot(currentThread(), logProbe, 10);
assertTrue(logProbe.fillSnapshot(entryContext, exitContext, null, snapshot));
assertEquals(2, snapshot.getEvaluationErrors().size());
assertEquals("msg1", snapshot.getEvaluationErrors().get(0).getMessage());
Expand All @@ -250,7 +287,7 @@ public void fillSnapshot_shouldSend_evalErrors() {
"errorExit", snapshot.getCaptures().getReturn().getCapturedThrowable().getMessage());
}

private LogProbe.Builder createLog(String template) {
private Builder createLog(String template) {
return LogProbe.builder()
.language(LANGUAGE)
.probeId(PROBE_ID)
Expand Down
Loading