diff --git a/.travis.yml b/.travis.yml index 8d424aaef8..f85a751b18 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,7 +7,11 @@ sudo: required after_success: - bash <(curl -s https://codecov.io/bash) +before_cache: + - rm -f $HOME/.gradle/caches/modules-2/modules-2.lock + - rm -fr $HOME/.gradle/caches/*/plugin-resolution/ cache: directories: - $HOME/.m2 - - $HOME/.gradle \ No newline at end of file + - $HOME/.gradle/caches/ + - $HOME/.gradle/wrapper/ \ No newline at end of file diff --git a/build.gradle b/build.gradle index 9c2037b157..4bc1f0c80a 100644 --- a/build.gradle +++ b/build.gradle @@ -203,6 +203,15 @@ configure(rootProject) { sourceSets.test.resources.srcDirs = ["src/test/resources", "src/test/java"] + test { + testLogging { + events "passed", "failed" + showExceptions true + exceptionFormat "full" + maxGranularity 3 + } + } + if (!JavaVersion.current().isJava9Compatible()) { test { jvmArgs = ["-Xbootclasspath/p:" + configurations.jsr166backport.asPath] diff --git a/src/main/java/reactor/core/publisher/Mono.java b/src/main/java/reactor/core/publisher/Mono.java index 0f0b6c4887..2669b54554 100644 --- a/src/main/java/reactor/core/publisher/Mono.java +++ b/src/main/java/reactor/core/publisher/Mono.java @@ -1105,7 +1105,7 @@ public final Mono and(Mono other, BiFunction * @@ -1121,8 +1121,8 @@ public final Mono> and(Function> rightG /** * Wait for the result from this mono, use it to create a second mono via the - * provided {@param rightGenerator} function and combine both results into an arbitrary - * {@code O} object, as defined by the provided {@param combinator} function. + * provided {@code rightGenerator} function and combine both results into an arbitrary + * {@code O} object, as defined by the provided {@code combinator} function. * *

* diff --git a/src/test/java/reactor/core/publisher/MonoPublishOnTest.java b/src/test/java/reactor/core/publisher/MonoPublishOnTest.java index 38e3642ee3..90b7568a72 100644 --- a/src/test/java/reactor/core/publisher/MonoPublishOnTest.java +++ b/src/test/java/reactor/core/publisher/MonoPublishOnTest.java @@ -266,13 +266,13 @@ public void rejectedExecutionExceptionOnErrorSignalExecutorService() } @Test - public void rejectedExecutionSubsribeExecutorScheduler() { + public void rejectedExecutionSubscribeExecutorScheduler() { CountDownLatch latch = new CountDownLatch(1); ExecutorService executor = new ThreadPoolExecutor(1, 1, 0L, MILLISECONDS, - new SynchronousQueue(), + new SynchronousQueue<>(), new AbortPolicy()); try { @@ -310,7 +310,7 @@ public void rejectedExecutionSubsribeExecutorServiceScheduler() { 1, 0L, MILLISECONDS, - new SynchronousQueue(), + new SynchronousQueue<>(), new AbortPolicy()); try { diff --git a/src/test/java/reactor/core/publisher/scenarios/CombinationTests.java b/src/test/java/reactor/core/publisher/scenarios/CombinationTests.java index 90ba6df480..c21223fa4b 100644 --- a/src/test/java/reactor/core/publisher/scenarios/CombinationTests.java +++ b/src/test/java/reactor/core/publisher/scenarios/CombinationTests.java @@ -125,7 +125,8 @@ public void testEmitter() throws Throwable { Scheduler c = Schedulers.single(); for (int i = 0; i < subs; i++) { processor.publishOn(c) - .subscribe(d -> latch.countDown(), null, latch::countDown, 1); + .limitRate(1) + .subscribe(d -> latch.countDown(), null, latch::countDown); } BlockingSink session = processor.connectSink(); diff --git a/src/test/java/reactor/core/publisher/scenarios/FluxTests.java b/src/test/java/reactor/core/publisher/scenarios/FluxTests.java index 88cf5ca6de..19b37f937f 100644 --- a/src/test/java/reactor/core/publisher/scenarios/FluxTests.java +++ b/src/test/java/reactor/core/publisher/scenarios/FluxTests.java @@ -1588,35 +1588,6 @@ public void testThrowWithoutOnErrorShowsUpInSchedulerHandler() { } } - @Test - public void testJvmFatalDoesntShowUpInSchedulerHandler() { - AtomicReference failure = new AtomicReference<>(null); - - Thread.setDefaultUncaughtExceptionHandler((t, e) -> failure.set("unexpected call to default" + - " UncaughtExceptionHandler with " + e)); - Schedulers.onHandleError((t, e) -> failure.set("Fatal JVM error was unexpectedly handled")); - - CountDownLatch latch = new CountDownLatch(1); - try { - Flux.intervalMillis(100) - .take(1) - .publishOn(Schedulers.parallel()) - .doOnTerminate(() -> latch.countDown()) - .subscribe(i -> { - throw new ThreadDeath(); - }); - latch.await(1, TimeUnit.SECONDS); - } catch (Throwable e) { - fail(e.toString()); - } finally { - Thread.setDefaultUncaughtExceptionHandler(null); - Schedulers.resetOnHandleError(); - } - if (failure.get() != null) { - fail(failure.get()); - } - } - @Test @Ignore public void splitBugEventuallyHappens() throws Exception { diff --git a/src/test/java/reactor/core/scheduler/SchedulersTest.java b/src/test/java/reactor/core/scheduler/SchedulersTest.java index 5fd76713e4..32acc04827 100644 --- a/src/test/java/reactor/core/scheduler/SchedulersTest.java +++ b/src/test/java/reactor/core/scheduler/SchedulersTest.java @@ -29,6 +29,8 @@ import org.junit.Test; import reactor.core.Exceptions; +import static org.junit.Assert.fail; + public class SchedulersTest { final static class TestSchedulers implements Schedulers.Factory { @@ -95,7 +97,7 @@ public void testShutdownOldOnSetFactory() { Schedulers.Factory ts1 = new Schedulers.Factory() { }; Schedulers.Factory ts2 = new TestSchedulers(false); Schedulers.setFactory(ts1); - TimedScheduler cachedTimerOld = ((Supplier) Schedulers.timer()).get(); + TimedScheduler cachedTimerOld = uncache(Schedulers.timer()); TimedScheduler standaloneTimer = Schedulers.newTimer("standaloneTimer"); Assert.assertNotEquals(cachedTimerOld, standaloneTimer); @@ -103,7 +105,7 @@ public void testShutdownOldOnSetFactory() { Assert.assertNotEquals(standaloneTimer.schedule(() -> {}), Scheduler.REJECTED); Schedulers.setFactory(ts2); - TimedScheduler cachedTimerNew = ((Supplier) Schedulers.timer()).get(); + TimedScheduler cachedTimerNew = uncache(Schedulers.timer()); Assert.assertEquals(cachedTimerNew, Schedulers.newTimer("unused")); Assert.assertNotEquals(cachedTimerNew, cachedTimerOld); @@ -115,6 +117,14 @@ public void testShutdownOldOnSetFactory() { Assert.assertNotEquals(cachedTimerNew.schedule(() -> {}), Scheduler.REJECTED); } + @SuppressWarnings("unchecked") + private T uncache(T scheduler) { + if (scheduler instanceof Supplier) { + return ((Supplier) scheduler).get(); + } + throw new IllegalArgumentException("not a cache scheduler, expected Supplier"); + } + @Test public void testUncaughtHookCalledWhenOnErrorNotImplemented() { AtomicBoolean handled = new AtomicBoolean(false); @@ -141,17 +151,32 @@ public void testUncaughtHookCalledWhenCommonException() { Assert.assertTrue("IllegalArgumentException not handled", handled.get()); } - @Test(expected = ThreadDeath.class) + @Test public void testUncaughtHookNotCalledWhenThreadDeath() { AtomicBoolean handled = new AtomicBoolean(false); - Schedulers.onHandleError((t, e) -> handled.set(true)); + AtomicReference failure = new AtomicReference<>(null); + Thread.setDefaultUncaughtExceptionHandler((t, e) -> failure.set("unexpected call to default" + + " UncaughtExceptionHandler from " + t.getName() + ": " + e)); + Schedulers.onHandleError((t, e) -> { + handled.set(true); + failure.set("Fatal JVM error was unexpectedly handled in " + t.getName() + ": " + e); + }); + ThreadDeath fatal = new ThreadDeath(); try { - Schedulers.handleError(new ThreadDeath()); - } finally { + Schedulers.handleError(fatal); + fail("expected fatal ThreadDeath exception"); + } + catch (ThreadDeath e) { + Assert.assertSame(e, fatal); + } + finally { Schedulers.resetOnHandleError(); } Assert.assertFalse("threadDeath not silenced", handled.get()); + if (failure.get() != null) { + fail(failure.get()); + } } //private final int BUFFER_SIZE = 8;