From 3671d2b84db74eaa22212b8534a92524ddd7c3d8 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Wed, 26 Aug 2020 13:06:06 +0900 Subject: [PATCH] Add helpers to CompletableResultCode to wait for a result. (#1583) * Add helpers to CompletableResultCode to wait for a result. * Flaky? * Cleanup --- sdk/common/build.gradle | 3 +- .../common/export/CompletableResultCode.java | 38 +++++++++++++++ .../export/CompletableResultCodeTest.java | 47 +++++++++++++++++++ 3 files changed, 87 insertions(+), 1 deletion(-) diff --git a/sdk/common/build.gradle b/sdk/common/build.gradle index 388ed73b30b..9a7c929e85d 100644 --- a/sdk/common/build.gradle +++ b/sdk/common/build.gradle @@ -20,7 +20,8 @@ dependencies { testCompileOnly libraries.auto_value_annotation testImplementation project(':opentelemetry-testing-internal') - testImplementation libraries.junit_pioneer + testImplementation libraries.junit_pioneer, + libraries.awaitility signature "org.codehaus.mojo.signature:java17:1.0@signature" signature "net.sf.androidscents.signature:android-api-level-24:7.0_r2@signature" diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/CompletableResultCode.java b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/CompletableResultCode.java index c94d76c60c8..f516566c228 100644 --- a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/CompletableResultCode.java +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/CompletableResultCode.java @@ -18,6 +18,8 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; @@ -107,4 +109,40 @@ public CompletableResultCode whenComplete(Runnable action) { } return this; } + + /** Returns whether this {@link CompletableResultCode} has completed. */ + public boolean isDone() { + synchronized (lock) { + return succeeded != null; + } + } + + /** + * Waits for the specified amount of time for this {@link CompletableResultCode} to complete. If + * it times out or is interrupted, the {@link CompletableResultCode} is failed. + * + * @return this {@link CompletableResultCode} + */ + public CompletableResultCode join(long timeout, TimeUnit unit) { + if (isDone()) { + return this; + } + final CountDownLatch latch = new CountDownLatch(1); + whenComplete( + new Runnable() { + @Override + public void run() { + latch.countDown(); + } + }); + try { + if (!latch.await(timeout, unit)) { + fail(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + fail(); + } + return this; + } } diff --git a/sdk/common/src/test/java/io/opentelemetry/sdk/common/export/CompletableResultCodeTest.java b/sdk/common/src/test/java/io/opentelemetry/sdk/common/export/CompletableResultCodeTest.java index 2b1f2a44ce5..101c6527bb9 100644 --- a/sdk/common/src/test/java/io/opentelemetry/sdk/common/export/CompletableResultCodeTest.java +++ b/sdk/common/src/test/java/io/opentelemetry/sdk/common/export/CompletableResultCodeTest.java @@ -17,7 +17,9 @@ package io.opentelemetry.sdk.common.export; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import com.google.common.util.concurrent.Uninterruptibles; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; @@ -189,4 +191,49 @@ public void run() { assertThat(resultCode.isSuccess()).isTrue(); } + + @Test + void isDone() { + CompletableResultCode result = new CompletableResultCode(); + assertThat(result.isDone()).isFalse(); + result.fail(); + assertThat(result.isDone()).isTrue(); + } + + @Test + void join() { + CompletableResultCode result = new CompletableResultCode(); + new Thread( + () -> { + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + result.succeed(); + }) + .start(); + assertThat(result.join(500, TimeUnit.MILLISECONDS).isSuccess()).isTrue(); + // Already completed, synchronous call. + assertThat(result.join(0, TimeUnit.NANOSECONDS).isSuccess()).isTrue(); + } + + @Test + void joinTimesOut() { + CompletableResultCode result = new CompletableResultCode(); + assertThat(result.join(1, TimeUnit.MILLISECONDS).isSuccess()).isFalse(); + assertThat(result.isDone()).isTrue(); + } + + @Test + void joinInterrupted() { + CompletableResultCode result = new CompletableResultCode(); + Thread thread = + new Thread( + () -> { + result.join(10, TimeUnit.SECONDS); + }); + thread.start(); + thread.interrupt(); + assertThat(thread.isInterrupted()).isTrue(); + // Different thread so wait a bit for result to be propagated. + await().untilAsserted(() -> assertThat(result.isDone()).isTrue()); + assertThat(result.isSuccess()).isFalse(); + } }