Skip to content

Simplified ListWriter await logic. #8992

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 16, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,36 +1,29 @@
package datadog.trace.common.writer;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

import datadog.trace.core.DDSpan;
import datadog.trace.core.MetadataConsumer;
import datadog.trace.core.tagprocessor.PeerServiceCalculator;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** List writer used by tests mostly */
public class ListWriter extends CopyOnWriteArrayList<List<DDSpan>> implements Writer {

private static final Logger log = LoggerFactory.getLogger(ListWriter.class);
private static final Filter ACCEPT_ALL = trace -> true;

public static final Filter ACCEPT_ALL =
new Filter() {
@Override
public boolean accept(List<DDSpan> trace) {
return true;
}
};

private final List<CountDownLatch> latches = new ArrayList<>();
private final AtomicInteger traceCount = new AtomicInteger();
private final TraceStructureWriter structureWriter = new TraceStructureWriter(true);
private final Object monitor = new Object();

private final PeerServiceCalculator peerServiceCalculator = new PeerServiceCalculator();
private Filter filter = ACCEPT_ALL;

public List<DDSpan> firstTrace() {
Expand All @@ -47,30 +40,44 @@ public void write(List<DDSpan> trace) {
// remotely realistic so the test actually test something
span.processTagsAndBaggage(MetadataConsumer.NO_OP);
}

add(trace);
structureWriter.write(trace);

traceCount.incrementAndGet();
synchronized (latches) {
add(trace);
for (final CountDownLatch latch : latches) {
if (size() >= latch.getCount()) {
while (latch.getCount() > 0) {
latch.countDown();
}
}
}
synchronized (monitor) {
monitor.notifyAll();
}
structureWriter.write(trace);
}

public boolean waitForTracesMax(final int number, int seconds)
throws InterruptedException, TimeoutException {
final CountDownLatch latch = new CountDownLatch(number);
synchronized (latches) {
if (size() >= number) {
private boolean awaitUntilDeadline(long timeout, TimeUnit unit, BooleanSupplier predicate)
throws InterruptedException {
final long deadline = System.nanoTime() + unit.toNanos(timeout);

while (true) {
if (predicate.getAsBoolean()) {
return true;
}
latches.add(latch);

long now = System.nanoTime();
long remaining = deadline - now;
if (remaining <= 0) {
break;
}

long millis = NANOSECONDS.toMillis(remaining);
long nanos = remaining - MILLISECONDS.toNanos(millis);

synchronized (monitor) {
monitor.wait(millis, (int) nanos);
}
}
return latch.await(seconds, TimeUnit.SECONDS);

return false;
}

public boolean waitForTracesMax(final int number, int seconds) throws InterruptedException {
return awaitUntilDeadline(seconds, SECONDS, () -> traceCount.get() >= number);
}

public void waitForTraces(final int number) throws InterruptedException, TimeoutException {
Expand All @@ -88,24 +95,17 @@ public void waitForTraces(final int number) throws InterruptedException, Timeout
}

public void waitUntilReported(final DDSpan span) throws InterruptedException, TimeoutException {
waitUntilReported(span, 20, TimeUnit.SECONDS);
waitUntilReported(span, 20, SECONDS);
}

public void waitUntilReported(final DDSpan span, int timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
while (true) {
final CountDownLatch latch = new CountDownLatch(size() + 1);
synchronized (latches) {
latches.add(latch);
}
if (isReported(span)) {
return;
}
if (!latch.await(timeout, unit)) {
String msg = "Timeout waiting for span to be reported: " + span;
log.warn(msg);
throw new TimeoutException(msg);
}
boolean reported = awaitUntilDeadline(timeout, unit, () -> isReported(span));

if (!reported) {
String msg = "Timeout waiting for span to be reported: " + span;
log.warn(msg);
throw new TimeoutException(msg);
}
}

Expand Down Expand Up @@ -142,17 +142,16 @@ public boolean flush() {
return true;
}

@Override
public void clear() {
super.clear();

traceCount.set(0);
}

@Override
public void close() {
clear();
synchronized (latches) {
for (final CountDownLatch latch : latches) {
while (latch.getCount() > 0) {
latch.countDown();
}
}
latches.clear();
}
}

@Override
Expand Down