Skip to content

Commit

Permalink
Re-instated batch span processor cancellation
Browse files Browse the repository at this point in the history
Cancellation now provides the exporter with the opportunity to handle it. An exporter could, for example, cancel activity in flight in the way it knows how.
  • Loading branch information
huntc committed Aug 3, 2020
1 parent d3e3d8d commit b7cfcab
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public CompletableResultCode() {}
private Boolean succeeded = null;

@GuardedBy("lock")
private final ArrayList<Runnable> actions = new ArrayList<>();
private final ArrayList<Runnable> completionActions = new ArrayList<>();

private final Object lock = new Object();

Expand All @@ -41,7 +41,7 @@ public CompletableResultCode succeed() {
synchronized (lock) {
if (succeeded == null) {
succeeded = true;
for (Runnable action : actions) {
for (Runnable action : completionActions) {
action.run();
}
}
Expand All @@ -54,7 +54,7 @@ public CompletableResultCode fail() {
synchronized (lock) {
if (succeeded == null) {
succeeded = false;
for (Runnable action : actions) {
for (Runnable action : completionActions) {
action.run();
}
}
Expand All @@ -77,17 +77,15 @@ public boolean isSuccess() {
/**
* Perform an action on completion. Actions are guaranteed to be called only once.
*
* <p>There should only be one action for this class instance.
*
* @param action the action to perform
* @return this completable result so that it may be further composed
*/
public CompletableResultCode thenRun(Runnable action) {
public CompletableResultCode whenComplete(Runnable action) {
synchronized (lock) {
if (succeeded != null) {
action.run();
} else {
this.actions.add(action);
this.completionActions.add(action);
}
}
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public void run() {
}
final CompletableResultCode result =
internalState.getMetricExporter().export(Collections.unmodifiableList(metricsList));
result.thenRun(
result.whenComplete(
new Runnable() {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -81,6 +83,8 @@ public final class BatchSpanProcessor implements SpanProcessor {

private static final String WORKER_THREAD_NAME =
BatchSpanProcessor.class.getSimpleName() + "_WorkerThread";
private static final String TIMER_THREAD_NAME =
BatchSpanProcessor.class.getSimpleName() + "_TimerThread";
private final Worker worker;
private final Thread workerThread;
private final boolean sampled;
Expand All @@ -90,8 +94,15 @@ private BatchSpanProcessor(
boolean sampled,
long scheduleDelayMillis,
int maxQueueSize,
int maxExportBatchSize) {
this.worker = new Worker(spanExporter, scheduleDelayMillis, maxQueueSize, maxExportBatchSize);
int maxExportBatchSize,
int exporterTimeoutMillis) {
this.worker =
new Worker(
spanExporter,
scheduleDelayMillis,
maxQueueSize,
maxExportBatchSize,
exporterTimeoutMillis);
this.workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker);
this.workerThread.start();
this.sampled = sampled;
Expand Down Expand Up @@ -152,13 +163,16 @@ private static final class Worker implements Runnable {

private static final BoundLongCounter droppedSpans;

private final Timer timer = new Timer(TIMER_THREAD_NAME);

private static final Logger logger = Logger.getLogger(Worker.class.getName());
private final SpanExporter spanExporter;
private final long scheduleDelayMillis;
private final int maxQueueSize;
private final int maxExportBatchSize;
private final int halfMaxQueueSize;
private final Object monitor = new Object();
private final int exporterTimeoutMillis;
private final AtomicBoolean exportAvailable = new AtomicBoolean(true);

@GuardedBy("monitor")
Expand All @@ -168,12 +182,14 @@ private Worker(
SpanExporter spanExporter,
long scheduleDelayMillis,
int maxQueueSize,
int maxExportBatchSize) {
int maxExportBatchSize,
int exporterTimeoutMillis) {
this.spanExporter = spanExporter;
this.scheduleDelayMillis = scheduleDelayMillis;
this.maxQueueSize = maxQueueSize;
this.halfMaxQueueSize = maxQueueSize >> 1;
this.maxExportBatchSize = maxExportBatchSize;
this.exporterTimeoutMillis = exporterTimeoutMillis;
this.spansList = new ArrayList<>(maxQueueSize);
}

Expand Down Expand Up @@ -224,6 +240,7 @@ public void run() {

private void shutdown() {
forceFlush();
timer.cancel();
spanExporter.shutdown();
}

Expand Down Expand Up @@ -263,7 +280,7 @@ private void onBatchExport(final List<SpanData> spans) {
if (exportAvailable.compareAndSet(true, false)) {
try {
final CompletableResultCode result = spanExporter.export(spans);
result.thenRun(
result.whenComplete(
new Runnable() {
@Override
public void run() {
Expand All @@ -273,6 +290,14 @@ public void run() {
exportAvailable.set(true);
}
});
timer.schedule(
new TimerTask() {
@Override
public void run() {
result.fail();
}
},
exporterTimeoutMillis);
} catch (Exception e) {
logger.log(Level.WARNING, "Exporter threw an Exception", e);
}
Expand Down Expand Up @@ -464,12 +489,13 @@ int getMaxExportBatchSize() {
* @throws NullPointerException if the {@code spanExporter} is {@code null}.
*/
public BatchSpanProcessor build() {
/*
* Note that setting an export timeout has no effect - there's no sure way to cancel a
* thread of execution, even by asking an export to cancel any associated threads.
*/
return new BatchSpanProcessor(
spanExporter, exportOnlySampled, scheduleDelayMillis, maxQueueSize, maxExportBatchSize);
spanExporter,
exportOnlySampled,
scheduleDelayMillis,
maxQueueSize,
maxExportBatchSize,
exporterTimeoutMillis);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ private static void mergeResultCode(
final CompletableResultCode compositeResultCode,
final CompletableResultCode singleResultCode,
final CountDownLatch completionsToProcess) {
singleResultCode.thenRun(
singleResultCode.whenComplete(
new Runnable() {
@Override
public void run() {
compositeResultCode.thenRun(
compositeResultCode.whenComplete(
new Runnable() {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void onEnd(ReadableSpan span) {
try {
List<SpanData> spans = Collections.singletonList(span.toSpanData());
final CompletableResultCode result = spanExporter.export(spans);
result.thenRun(
result.whenComplete(
new Runnable() {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void configTest() {
options.put("otel.bsp.export.timeout", "78");
options.put("otel.bsp.export.sampled", "false");
BatchSpanProcessor.Builder config =
BatchSpanProcessor.newBuilder(new WaitingSpanExporter(0))
BatchSpanProcessor.newBuilder(new WaitingSpanExporter(0, CompletableResultCode.ofSuccess()))
.fromConfigMap(options, ConfigTester.getNamingDot());
assertThat(config.getScheduleDelayMillis()).isEqualTo(12);
assertThat(config.getMaxQueueSize()).isEqualTo(34);
Expand All @@ -114,7 +114,7 @@ void configTest() {
@Test
void configTest_EmptyOptions() {
BatchSpanProcessor.Builder config =
BatchSpanProcessor.newBuilder(new WaitingSpanExporter(0))
BatchSpanProcessor.newBuilder(new WaitingSpanExporter(0, CompletableResultCode.ofSuccess()))
.fromConfigMap(Collections.emptyMap(), ConfigTester.getNamingDot());
assertThat(config.getScheduleDelayMillis())
.isEqualTo(BatchSpanProcessor.Builder.DEFAULT_SCHEDULE_DELAY_MILLIS);
Expand All @@ -131,14 +131,16 @@ void configTest_EmptyOptions() {
@Test
void startEndRequirements() {
BatchSpanProcessor spansProcessor =
BatchSpanProcessor.newBuilder(new WaitingSpanExporter(0)).build();
BatchSpanProcessor.newBuilder(new WaitingSpanExporter(0, CompletableResultCode.ofSuccess()))
.build();
assertThat(spansProcessor.isStartRequired()).isFalse();
assertThat(spansProcessor.isEndRequired()).isTrue();
}

@Test
void exportDifferentSampledSpans() {
WaitingSpanExporter waitingSpanExporter = new WaitingSpanExporter(2);
WaitingSpanExporter waitingSpanExporter =
new WaitingSpanExporter(2, CompletableResultCode.ofSuccess());
tracerSdkFactory.addSpanProcessor(
BatchSpanProcessor.newBuilder(waitingSpanExporter)
.setScheduleDelayMillis(MAX_SCHEDULE_DELAY_MILLIS)
Expand All @@ -152,7 +154,8 @@ void exportDifferentSampledSpans() {

@Test
void exportMoreSpansThanTheBufferSize() {
WaitingSpanExporter waitingSpanExporter = new WaitingSpanExporter(6);
WaitingSpanExporter waitingSpanExporter =
new WaitingSpanExporter(6, CompletableResultCode.ofSuccess());
BatchSpanProcessor batchSpanProcessor =
BatchSpanProcessor.newBuilder(waitingSpanExporter)
.setMaxQueueSize(6)
Expand Down Expand Up @@ -181,7 +184,8 @@ void exportMoreSpansThanTheBufferSize() {

@Test
void forceExport() {
WaitingSpanExporter waitingSpanExporter = new WaitingSpanExporter(1, 1);
WaitingSpanExporter waitingSpanExporter =
new WaitingSpanExporter(1, CompletableResultCode.ofSuccess(), 1);
BatchSpanProcessor batchSpanProcessor =
BatchSpanProcessor.newBuilder(waitingSpanExporter)
.setMaxQueueSize(10_000)
Expand All @@ -204,8 +208,10 @@ void forceExport() {

@Test
void exportSpansToMultipleServices() {
WaitingSpanExporter waitingSpanExporter = new WaitingSpanExporter(2);
WaitingSpanExporter waitingSpanExporter2 = new WaitingSpanExporter(2);
WaitingSpanExporter waitingSpanExporter =
new WaitingSpanExporter(2, CompletableResultCode.ofSuccess());
WaitingSpanExporter waitingSpanExporter2 =
new WaitingSpanExporter(2, CompletableResultCode.ofSuccess());
tracerSdkFactory.addSpanProcessor(
BatchSpanProcessor.newBuilder(
MultiSpanExporter.create(Arrays.asList(waitingSpanExporter, waitingSpanExporter2)))
Expand All @@ -223,7 +229,8 @@ void exportSpansToMultipleServices() {
@Test
void exportMoreSpansThanTheMaximumLimit() {
final int maxQueuedSpans = 8;
WaitingSpanExporter waitingSpanExporter = new WaitingSpanExporter(maxQueuedSpans);
WaitingSpanExporter waitingSpanExporter =
new WaitingSpanExporter(maxQueuedSpans, CompletableResultCode.ofSuccess());
BatchSpanProcessor batchSpanProcessor =
BatchSpanProcessor.newBuilder(
MultiSpanExporter.create(Arrays.asList(blockingSpanExporter, waitingSpanExporter)))
Expand Down Expand Up @@ -284,7 +291,8 @@ void exportMoreSpansThanTheMaximumLimit() {

@Test
void serviceHandlerThrowsException() {
WaitingSpanExporter waitingSpanExporter = new WaitingSpanExporter(1);
WaitingSpanExporter waitingSpanExporter =
new WaitingSpanExporter(1, CompletableResultCode.ofSuccess());
doThrow(new IllegalArgumentException("No export for you."))
.when(mockServiceHandler)
.export(ArgumentMatchers.anyList());
Expand All @@ -305,17 +313,36 @@ void serviceHandlerThrowsException() {

@Test
@Timeout(5)
public void exporterTimesOut() {
public void exporterTimesOut() throws InterruptedException {
final CountDownLatch interruptMarker = new CountDownLatch(1);
WaitingSpanExporter waitingSpanExporter =
new WaitingSpanExporter(1) {
new WaitingSpanExporter(1, new CompletableResultCode()) {
@Override
public CompletableResultCode export(Collection<SpanData> spans) {
CompletableResultCode result = super.export(spans);
try {
// sleep longer than the configured timeout of 100ms
Thread.sleep(1000);
} catch (InterruptedException ignored) {
}
Thread exporterThread =
new Thread(
new Runnable() {
@Override
public void run() {
try {
// sleep longer than the configured timeout of 100ms
Thread.sleep(1000);
} catch (InterruptedException e) {
interruptMarker.countDown();
}
}
});
exporterThread.start();
result.whenComplete(
new Runnable() {
@Override
public void run() {
if (!result.isSuccess()) {
exporterThread.interrupt();
}
}
});
return result;
}
};
Expand All @@ -333,11 +360,16 @@ public CompletableResultCode export(Collection<SpanData> spans) {
ReadableSpan span = createSampledEndedSpan(SPAN_NAME_1);
List<SpanData> exported = waitingSpanExporter.waitForExport();
assertThat(exported).containsExactly(span.toSpanData());

// since the interrupt happens outside the execution of the test method, we'll block to make
// sure that the thread was actually interrupted due to the timeout.
interruptMarker.await();
}

@Test
void exportNotSampledSpans() {
WaitingSpanExporter waitingSpanExporter = new WaitingSpanExporter(1);
WaitingSpanExporter waitingSpanExporter =
new WaitingSpanExporter(1, CompletableResultCode.ofSuccess());
BatchSpanProcessor batchSpanProcessor =
BatchSpanProcessor.newBuilder(waitingSpanExporter)
.setScheduleDelayMillis(MAX_SCHEDULE_DELAY_MILLIS)
Expand Down Expand Up @@ -393,7 +425,8 @@ void exportNotSampledSpans_reportOnlySampled() {
@Test
@Timeout(10)
public void shutdownFlushes() {
WaitingSpanExporter waitingSpanExporter = new WaitingSpanExporter(1);
WaitingSpanExporter waitingSpanExporter =
new WaitingSpanExporter(1, CompletableResultCode.ofSuccess());
// Set the export delay to zero, for no timeout, in order to confirm the #flush() below works

tracerSdkFactory.addSpanProcessor(
Expand Down Expand Up @@ -473,17 +506,19 @@ static class WaitingSpanExporter implements SpanExporter {

private final List<SpanData> spanDataList = new ArrayList<>();
private final int numberToWaitFor;
private final CompletableResultCode exportResultCode;
private CountDownLatch countDownLatch;
private int timeout = 10;
private final AtomicBoolean shutDownCalled = new AtomicBoolean(false);

WaitingSpanExporter(int numberToWaitFor) {
WaitingSpanExporter(int numberToWaitFor, CompletableResultCode exportResultCode) {
countDownLatch = new CountDownLatch(numberToWaitFor);
this.numberToWaitFor = numberToWaitFor;
this.exportResultCode = exportResultCode;
}

WaitingSpanExporter(int numberToWaitFor, int timeout) {
this(numberToWaitFor);
WaitingSpanExporter(int numberToWaitFor, CompletableResultCode exportResultCode, int timeout) {
this(numberToWaitFor, exportResultCode);
this.timeout = timeout;
}

Expand Down Expand Up @@ -514,7 +549,7 @@ public CompletableResultCode export(Collection<SpanData> spans) {
for (int i = 0; i < spans.size(); i++) {
countDownLatch.countDown();
}
return CompletableResultCode.ofSuccess();
return exportResultCode;
}

@Override
Expand Down
Loading

0 comments on commit b7cfcab

Please sign in to comment.