Skip to content

Commit

Permalink
Re-instated back-pressure handling for IntervalMetricReader and Batch…
Browse files Browse the repository at this point in the history
…SpanProcessor

This has been done so that we avoid overwhelming exporters.
  • Loading branch information
huntc committed Aug 3, 2020
1 parent 4e247e6 commit 6ed3e68
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.Immutable;

/**
* Wraps a list of {@link MetricProducer}s and automatically reads and exports the metrics every
* export interval.
* export interval. Metrics may also be dropped when it becomes time to export again, and there is
* an export in progress.
*
* <p>Configuration options for {@link IntervalMetricReader} can be read from system properties,
* environment variables, or {@link java.util.Properties} objects.
Expand Down Expand Up @@ -200,6 +202,7 @@ private IntervalMetricReader(InternalState internalState) {
private static final class Exporter implements Runnable {

private final InternalState internalState;
private final AtomicBoolean exportAvailable = new AtomicBoolean(true);

private Exporter(InternalState internalState) {
this.internalState = internalState;
Expand All @@ -208,24 +211,29 @@ private Exporter(InternalState internalState) {
@Override
@SuppressWarnings("BooleanParameter")
public void run() {
try {
List<MetricData> metricsList = new ArrayList<>();
for (MetricProducer metricProducer : internalState.getMetricProducers()) {
metricsList.addAll(metricProducer.collectAllMetrics());
}
final CompletableResultCode result =
internalState.getMetricExporter().export(Collections.unmodifiableList(metricsList));
result.whenComplete(
new Runnable() {
@Override
public void run() {
if (!result.isSuccess()) {
logger.log(Level.FINE, "Exporter failed");
if (exportAvailable.compareAndSet(true, false)) {
try {
List<MetricData> metricsList = new ArrayList<>();
for (MetricProducer metricProducer : internalState.getMetricProducers()) {
metricsList.addAll(metricProducer.collectAllMetrics());
}
final CompletableResultCode result =
internalState.getMetricExporter().export(Collections.unmodifiableList(metricsList));
result.whenComplete(
new Runnable() {
@Override
public void run() {
if (!result.isSuccess()) {
logger.log(Level.FINE, "Exporter failed");
}
exportAvailable.set(true);
}
}
});
} catch (Exception e) {
logger.log(Level.WARNING, "Exporter threw an Exception", e);
});
} catch (Exception e) {
logger.log(Level.WARNING, "Exporter threw an Exception", e);
}
} else {
logger.log(Level.FINE, "Exporter busy. Dropping metrics.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,19 @@
public interface MetricExporter {

/**
* Exports the collection of given {@link MetricData}. Note that many export operations can be
* called for simultaneously.
* Exports the collection of given {@link MetricData}. Note that export operations can be
* performed simultaneously depending on the type of metric reader being used. However, the {@link
* IntervalMetricReader} will ensure that only one export can occur at a time.
*
* @param metrics the collection of {@link MetricData} to be exported.
* @return the result of the export, which is often an asynchronous operation.
*/
CompletableResultCode export(Collection<MetricData> metrics);

/**
* Exports the collection of {@link MetricData} that have not yet been exported. Note that many
* flush operations can be called for simultaneously.
* Exports the collection of {@link MetricData} that have not yet been exported. Note that flush
* operations can be performed simultaneously depending on the type of metric reader being used.
* However, the {@link IntervalMetricReader} will ensure that only one export can occur at a time.
*
* @return the result of the flush, which is often an asynchronous operation.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;
Expand All @@ -46,7 +47,8 @@
* <p>All spans reported by the SDK implementation are first added to a synchronized queue (with a
* {@code maxQueueSize} maximum size, after the size is reached spans are dropped) and exported
* every {@code scheduleDelayMillis} to the exporter pipeline in batches of {@code
* maxExportBatchSize}.
* maxExportBatchSize}. Spans may also be dropped when it becomes time to export again, and there is
* an export in progress.
*
* <p>If the queue gets half full a preemptive notification is sent to the worker thread that
* exports the spans to wake up and start a new export cycle.
Expand Down Expand Up @@ -172,6 +174,7 @@ private static final class Worker implements Runnable {
private final int halfMaxQueueSize;
private final Object monitor = new Object();
private final int exporterTimeoutMillis;
private final AtomicBoolean exportAvailable = new AtomicBoolean(true);

@GuardedBy("monitor")
private final List<ReadableSpan> spansList;
Expand Down Expand Up @@ -275,27 +278,32 @@ private static List<SpanData> createSpanDataForExport(
// Exports the list of SpanData to the SpanExporter.
@SuppressWarnings("BooleanParameter")
private void onBatchExport(final List<SpanData> spans) {
try {
final CompletableResultCode result = spanExporter.export(spans);
result.whenComplete(
new Runnable() {
@Override
public void run() {
if (!result.isSuccess()) {
logger.log(Level.FINE, "Exporter failed");
if (exportAvailable.compareAndSet(true, false)) {
try {
final CompletableResultCode result = spanExporter.export(spans);
result.whenComplete(
new Runnable() {
@Override
public void run() {
if (!result.isSuccess()) {
logger.log(Level.FINE, "Exporter failed");
}
exportAvailable.set(true);
}
}
});
timer.schedule(
new TimerTask() {
@Override
public void run() {
result.fail();
}
},
exporterTimeoutMillis);
} catch (Exception e) {
logger.log(Level.WARNING, "Exporter threw an Exception", e);
});
timer.schedule(
new TimerTask() {
@Override
public void run() {
result.fail();
}
},
exporterTimeoutMillis);
} catch (Exception e) {
logger.log(Level.WARNING, "Exporter threw an Exception", e);
}
} else {
logger.log(Level.FINE, "Exporter busy. Dropping spans.");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@

/**
* An implementation of the {@link SpanProcessor} that converts the {@link ReadableSpan} to {@link
* SpanData} and passes it to the configured exporter.
* SpanData} and passes it directly to the configured exporter. This processor should only be used
* where the exporter(s) are able to handle multiple exports simultaneously, as there is no back
* pressure consideration here.
*
* <p>Configuration options for {@link SimpleSpanProcessor} can be read from system properties,
* environment variables, or {@link java.util.Properties} objects.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,20 @@
public interface SpanExporter {

/**
* Called to export sampled {@code Span}s. Note that many export operations can be called for
* simultaneously.
* Called to export sampled {@code Span}s. Note that export operations can be performed
* simultaneously depending on the type of span processor being used. However, the {@link
* BatchSpanProcessor} will ensure that only one export can occur at a time.
*
* @param spans the collection of sampled Spans to be exported.
* @return the result of the export, which is often an asynchronous operation.
*/
CompletableResultCode export(Collection<SpanData> spans);

/**
* Exports the collection of sampled {@code Span}s that have not yet been exported. Note that many
* flush operations can be called for simultaneously.
* Exports the collection of sampled {@code Span}s that have not yet been exported. Note that
* export operations can be performed simultaneously depending on the type of span processor being
* used. However, the {@link BatchSpanProcessor} will ensure that only one export can occur at a
* time.
*
* @return the result of the flush, which is often an asynchronous operation.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,6 @@ public void intervalExport_exporterThrowsException() {
try {
assertThat(waitingMetricExporter.waitForNumberOfExports(1))
.containsExactly(Collections.singletonList(METRIC_DATA));

assertThat(waitingMetricExporter.waitForNumberOfExports(2))
.containsExactly(
Collections.singletonList(METRIC_DATA), Collections.singletonList(METRIC_DATA));
} finally {
intervalMetricReader.shutdown();
}
Expand Down

0 comments on commit 6ed3e68

Please sign in to comment.