Skip to content

Commit

Permalink
Makes Export methods async
Browse files Browse the repository at this point in the history
This commit recognises that the export and flush methods of span and trace exporters can be, and often are, implemented with long-lived operations over networks. We therefore see that these method return types are represented using Java's Future type to account for this common behaviour.
  • Loading branch information
huntc committed Jul 17, 2020
1 parent df14623 commit 3e662fd
Show file tree
Hide file tree
Showing 36 changed files with 341 additions and 252 deletions.
2 changes: 2 additions & 0 deletions exporters/inmemory/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ ext.moduleName = "io.opentelemetry.exporters.inmemory"
dependencies {
api project(':opentelemetry-sdk')

implementation libraries.guava

annotationProcessor libraries.auto_value

signature "org.codehaus.mojo.signature:java17:1.0@signature"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@

package io.opentelemetry.exporters.inmemory;

import com.google.common.util.concurrent.Futures;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

/**
Expand Down Expand Up @@ -103,12 +105,12 @@ public void reset() {
* <p>If this is called after {@code shutdown}, this will return {@code ResultCode.FAILURE}.
*/
@Override
public ResultCode export(Collection<MetricData> metrics) {
public Future<ResultCode> export(Collection<MetricData> metrics) {
if (isStopped) {
return ResultCode.FAILURE;
return Futures.immediateFuture(ResultCode.FAILURE);
}
finishedMetricItems.addAll(metrics);
return ResultCode.SUCCESS;
return Futures.immediateFuture(ResultCode.SUCCESS);
}

/**
Expand All @@ -118,8 +120,8 @@ public ResultCode export(Collection<MetricData> metrics) {
* @return always Success
*/
@Override
public ResultCode flush() {
return ResultCode.SUCCESS;
public Future<ResultCode> flush() {
return Futures.immediateFuture(ResultCode.SUCCESS);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

package io.opentelemetry.exporters.inmemory;

import com.google.common.util.concurrent.Futures;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Future;

/**
* A {@link SpanExporter} implementation that can be used to test OpenTelemetry integration.
Expand Down Expand Up @@ -85,14 +87,14 @@ public void reset() {
}

@Override
public ResultCode export(Collection<SpanData> spans) {
public Future<ResultCode> export(Collection<SpanData> spans) {
synchronized (this) {
if (isStopped) {
return ResultCode.FAILURE;
return Futures.immediateFuture(ResultCode.FAILURE);
}
finishedSpanItems.addAll(spans);
}
return ResultCode.SUCCESS;
return Futures.immediateFuture(ResultCode.SUCCESS);
}

/**
Expand All @@ -102,8 +104,8 @@ public ResultCode export(Collection<SpanData> spans) {
* @return always Success
*/
@Override
public ResultCode flush() {
return ResultCode.SUCCESS;
public Future<ResultCode> flush() {
return Futures.immediateFuture(ResultCode.SUCCESS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -51,26 +52,26 @@ private static MetricData generateFakeMetric() {
}

@Test
public void test_getFinishedMetricItems() {
public void test_getFinishedMetricItems() throws ExecutionException, InterruptedException {
List<MetricData> metrics = new ArrayList<MetricData>();
metrics.add(generateFakeMetric());
metrics.add(generateFakeMetric());
metrics.add(generateFakeMetric());

assertThat(exporter.export(metrics)).isEqualTo(ResultCode.SUCCESS);
assertThat(exporter.export(metrics).get()).isEqualTo(ResultCode.SUCCESS);
List<MetricData> metricItems = exporter.getFinishedMetricItems();
assertThat(metricItems).isNotNull();
assertThat(metricItems.size()).isEqualTo(3);
}

@Test
public void test_reset() {
public void test_reset() throws ExecutionException, InterruptedException {
List<MetricData> metrics = new ArrayList<MetricData>();
metrics.add(generateFakeMetric());
metrics.add(generateFakeMetric());
metrics.add(generateFakeMetric());

assertThat(exporter.export(metrics)).isEqualTo(ResultCode.SUCCESS);
assertThat(exporter.export(metrics).get()).isEqualTo(ResultCode.SUCCESS);
List<MetricData> metricItems = exporter.getFinishedMetricItems();
assertThat(metricItems).isNotNull();
assertThat(metricItems.size()).isEqualTo(3);
Expand All @@ -81,33 +82,33 @@ public void test_reset() {
}

@Test
public void test_shutdown() {
public void test_shutdown() throws ExecutionException, InterruptedException {
List<MetricData> metrics = new ArrayList<MetricData>();
metrics.add(generateFakeMetric());
metrics.add(generateFakeMetric());
metrics.add(generateFakeMetric());

assertThat(exporter.export(metrics)).isEqualTo(ResultCode.SUCCESS);
assertThat(exporter.export(metrics).get()).isEqualTo(ResultCode.SUCCESS);
exporter.shutdown();
List<MetricData> metricItems = exporter.getFinishedMetricItems();
assertThat(metricItems).isNotNull();
assertThat(metricItems.size()).isEqualTo(0);
}

@Test
public void testShutdown_export() {
public void testShutdown_export() throws ExecutionException, InterruptedException {
List<MetricData> metrics = new ArrayList<MetricData>();
metrics.add(generateFakeMetric());
metrics.add(generateFakeMetric());
metrics.add(generateFakeMetric());

assertThat(exporter.export(metrics)).isEqualTo(ResultCode.SUCCESS);
assertThat(exporter.export(metrics).get()).isEqualTo(ResultCode.SUCCESS);
exporter.shutdown();
assertThat(exporter.export(metrics)).isEqualTo(ResultCode.FAILURE);
assertThat(exporter.export(metrics).get()).isEqualTo(ResultCode.FAILURE);
}

@Test
public void test_flush() {
assertThat(exporter.flush()).isEqualTo(ResultCode.SUCCESS);
public void test_flush() throws ExecutionException, InterruptedException {
assertThat(exporter.flush().get()).isEqualTo(ResultCode.SUCCESS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.opentelemetry.trace.Tracer;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -91,16 +92,16 @@ public void shutdown() {
}

@Test
public void export_ReturnCode() {
assertThat(exporter.export(Collections.singletonList(makeBasicSpan())))
public void export_ReturnCode() throws ExecutionException, InterruptedException {
assertThat(exporter.export(Collections.singletonList(makeBasicSpan())).get())
.isEqualTo(ResultCode.SUCCESS);
exporter.shutdown();
// After shutdown no more export.
assertThat(exporter.export(Collections.singletonList(makeBasicSpan())))
assertThat(exporter.export(Collections.singletonList(makeBasicSpan())).get())
.isEqualTo(ResultCode.FAILURE);
exporter.reset();
// Reset does not do anything if already shutdown.
assertThat(exporter.export(Collections.singletonList(makeBasicSpan())))
assertThat(exporter.export(Collections.singletonList(makeBasicSpan())).get())
.isEqualTo(ResultCode.FAILURE);
}

Expand Down
2 changes: 2 additions & 0 deletions exporters/jaeger/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ ext.moduleName = "io.opentelemetry.exporters.jaeger"
dependencies {
api project(':opentelemetry-sdk')

implementation libraries.guava

implementation project(':opentelemetry-sdk-extension-otproto'),
project(':opentelemetry-sdk'),
libraries.grpc_api,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.opentelemetry.exporters.jaeger;

import com.google.common.util.concurrent.Futures;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.exporters.jaeger.proto.api_v2.Collector;
Expand All @@ -28,6 +29,7 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -107,7 +109,7 @@ private JaegerGrpcSpanExporter(String serviceName, ManagedChannel channel, long
* @return the result of the operation
*/
@Override
public ResultCode export(Collection<SpanData> spans) {
public Future<ResultCode> export(Collection<SpanData> spans) {
Collector.PostSpansRequest request =
Collector.PostSpansRequest.newBuilder()
.setBatch(
Expand All @@ -126,9 +128,9 @@ public ResultCode export(Collection<SpanData> spans) {
// for now, there's nothing to check in the response object
//noinspection ResultOfMethodCallIgnored
stub.postSpans(request);
return ResultCode.SUCCESS;
return Futures.immediateFuture(ResultCode.SUCCESS);
} catch (Throwable e) {
return ResultCode.FAILURE;
return Futures.immediateFuture(ResultCode.FAILURE);
}
}

Expand All @@ -138,8 +140,8 @@ public ResultCode export(Collection<SpanData> spans) {
* @return always Success
*/
@Override
public ResultCode flush() {
return ResultCode.SUCCESS;
public Future<ResultCode> flush() {
return Futures.immediateFuture(ResultCode.SUCCESS);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void testExport() throws Exception {
// test
JaegerGrpcSpanExporter exporter =
JaegerGrpcSpanExporter.newBuilder().setServiceName("test").setChannel(channel).build();
exporter.export(Collections.singletonList(span));
exporter.export(Collections.singletonList(span)).get();

// verify
verify(service).postSpans(requestCaptor.capture(), ArgumentMatchers.any());
Expand Down
2 changes: 2 additions & 0 deletions exporters/logging/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ ext.moduleName = "io.opentelemetry.exporters.logging"
dependencies {
api project(':opentelemetry-sdk')

implementation libraries.guava

signature "org.codehaus.mojo.signature:java17:1.0@signature"
signature "net.sf.androidscents.signature:android-api-level-24:7.0_r2@signature"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@

package io.opentelemetry.exporters.logging;

import com.google.common.util.concurrent.Futures;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -27,12 +30,12 @@ public class LoggingMetricExporter implements MetricExporter {
private static final Logger logger = Logger.getLogger(LoggingMetricExporter.class.getName());

@Override
public ResultCode export(Collection<MetricData> metrics) {
public Future<ResultCode> export(Collection<MetricData> metrics) {
logger.info("Received a collection of " + metrics.size() + " metrics for export.");
for (MetricData metricData : metrics) {
logger.log(Level.INFO, "metric: {0}", metricData);
}
return ResultCode.SUCCESS;
return Futures.immediateFuture(ResultCode.SUCCESS);
}

/**
Expand All @@ -41,7 +44,7 @@ public ResultCode export(Collection<MetricData> metrics) {
* @return the result of the operation
*/
@Override
public ResultCode flush() {
public Future<ResultCode> flush() {
ResultCode resultCode = ResultCode.SUCCESS;
for (Handler handler : logger.getHandlers()) {
try {
Expand All @@ -50,12 +53,16 @@ public ResultCode flush() {
resultCode = ResultCode.FAILURE;
}
}
return resultCode;
return Futures.immediateFuture(resultCode);
}

@Override
public void shutdown() {
// no-op
this.flush();
try {
this.flush().get();
} catch (InterruptedException | ExecutionException e) {
logger.log(Level.WARNING, "Metric Exporter threw an Exception", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@

package io.opentelemetry.exporters.logging;

import com.google.common.util.concurrent.Futures;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -28,11 +31,11 @@ public class LoggingSpanExporter implements SpanExporter {
private static final Logger logger = Logger.getLogger(LoggingSpanExporter.class.getName());

@Override
public ResultCode export(Collection<SpanData> spans) {
public Future<ResultCode> export(Collection<SpanData> spans) {
for (SpanData span : spans) {
logger.log(Level.INFO, "span: {0}", span);
}
return ResultCode.SUCCESS;
return Futures.immediateFuture(ResultCode.SUCCESS);
}

/**
Expand All @@ -41,7 +44,7 @@ public ResultCode export(Collection<SpanData> spans) {
* @return the result of the operation
*/
@Override
public ResultCode flush() {
public Future<ResultCode> flush() {
ResultCode resultCode = ResultCode.SUCCESS;
for (Handler handler : logger.getHandlers()) {
try {
Expand All @@ -50,11 +53,15 @@ public ResultCode flush() {
resultCode = ResultCode.FAILURE;
}
}
return resultCode;
return Futures.immediateFuture(resultCode);
}

@Override
public void shutdown() {
this.flush();
try {
this.flush().get();
} catch (InterruptedException | ExecutionException e) {
logger.log(Level.WARNING, "Span Exporter threw an Exception", e);
}
}
}
Loading

0 comments on commit 3e662fd

Please sign in to comment.