Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timeout fix in OTLP exporters #1585

Merged
merged 3 commits into from
Aug 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public final class OtlpGrpcMetricExporter implements MetricExporter {

private final MetricsServiceFutureStub metricsService;
private final ManagedChannel managedChannel;
private final long deadlineMs;

/**
* Creates a new OTLP gRPC Metric Reporter with the given name, using the given channel.
Expand All @@ -83,11 +84,8 @@ public final class OtlpGrpcMetricExporter implements MetricExporter {
*/
private OtlpGrpcMetricExporter(ManagedChannel channel, long deadlineMs) {
this.managedChannel = channel;
MetricsServiceFutureStub stub = MetricsServiceGrpc.newFutureStub(channel);
if (deadlineMs > 0) {
stub = stub.withDeadlineAfter(deadlineMs, TimeUnit.MILLISECONDS);
}
metricsService = stub;
this.deadlineMs = deadlineMs;
metricsService = MetricsServiceGrpc.newFutureStub(channel);
}

/**
Expand All @@ -104,8 +102,15 @@ public CompletableResultCode export(Collection<MetricData> metrics) {
.build();

final CompletableResultCode result = new CompletableResultCode();
MetricsServiceFutureStub exporter;
if (deadlineMs > 0) {
exporter = metricsService.withDeadlineAfter(deadlineMs, TimeUnit.MILLISECONDS);
} else {
exporter = metricsService;
}

Futures.addCallback(
metricsService.export(exportMetricsServiceRequest),
exporter.export(exportMetricsServiceRequest),
new FutureCallback<ExportMetricsServiceResponse>() {
@Override
public void onSuccess(@Nullable ExportMetricsServiceResponse response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public final class OtlpGrpcSpanExporter implements SpanExporter {

private final TraceServiceFutureStub traceService;
private final ManagedChannel managedChannel;
private final long deadlineMs;

/**
* Creates a new OTLP gRPC Span Reporter with the given name, using the given channel.
Expand All @@ -91,11 +92,8 @@ public final class OtlpGrpcSpanExporter implements SpanExporter {
*/
private OtlpGrpcSpanExporter(ManagedChannel channel, long deadlineMs) {
this.managedChannel = channel;
TraceServiceFutureStub stub = TraceServiceGrpc.newFutureStub(channel);
if (deadlineMs > 0) {
stub = stub.withDeadlineAfter(deadlineMs, TimeUnit.MILLISECONDS);
}
this.traceService = stub;
this.deadlineMs = deadlineMs;
this.traceService = TraceServiceGrpc.newFutureStub(channel);
}

/**
Expand All @@ -112,8 +110,16 @@ public CompletableResultCode export(Collection<SpanData> spans) {
.build();

final CompletableResultCode result = new CompletableResultCode();

TraceServiceFutureStub exporter;
if (deadlineMs > 0) {
exporter = traceService.withDeadlineAfter(deadlineMs, TimeUnit.MILLISECONDS);
} else {
exporter = traceService;
}

Futures.addCallback(
traceService.export(exportTraceServiceRequest),
exporter.export(exportTraceServiceRequest),
new FutureCallback<ExportTraceServiceResponse>() {
@Override
public void onSuccess(@Nullable ExportTraceServiceResponse response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,28 @@ void testExport_MultipleMetrics() {
}
}

@Test
void testExport_DeadlineSetPerExport() throws InterruptedException {
int deadlineMs = 500;
OtlpGrpcMetricExporter exporter =
OtlpGrpcMetricExporter.newBuilder()
.setChannel(inProcessChannel)
.setDeadlineMs(deadlineMs)
.build();

try {
TimeUnit.MILLISECONDS.sleep(2 * deadlineMs);
assertThat(exporter.export(Collections.singletonList(generateFakeMetric())).isSuccess())
.isTrue();

TimeUnit.MILLISECONDS.sleep(2 * deadlineMs);
assertThat(exporter.export(Collections.singletonList(generateFakeMetric())).isSuccess())
.isTrue();
} finally {
exporter.shutdown();
}
}

@Test
void testExport_AfterShutdown() {
MetricData span = generateFakeMetric();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,28 @@ void testExport_MultipleSpans() {
}
}

@Test
void testExport_DeadlineSetPerExport() throws InterruptedException {
int deadlineMs = 500;
OtlpGrpcSpanExporter exporter =
OtlpGrpcSpanExporter.newBuilder()
.setChannel(inProcessChannel)
.setDeadlineMs(deadlineMs)
.build();

try {
TimeUnit.MILLISECONDS.sleep(2 * deadlineMs);
assertThat(exporter.export(Collections.singletonList(generateFakeSpan())).isSuccess())
.isTrue();

TimeUnit.MILLISECONDS.sleep(2 * deadlineMs);
assertThat(exporter.export(Collections.singletonList(generateFakeSpan())).isSuccess())
.isTrue();
} finally {
exporter.shutdown();
}
}

@Test
void testExport_AfterShutdown() {
SpanData span = generateFakeSpan();
Expand Down