diff --git a/build.gradle b/build.gradle index e264a782ea..89b2fc0204 100644 --- a/build.gradle +++ b/build.gradle @@ -334,15 +334,24 @@ project('reactor-core') { from("${project.buildDir}/docs/kdoc") } - task loops(type: Test) { - exclude '**/*' + task loops(type: Test, group: 'verification') { + includes.clear() include '**/*Loop.*' + doFirst { + println "Additional tests from `loops` ($includes)" + } } - task testNG(type: Test) { - exclude '**/*' - include '**/*Verification.*' + task testNG(type: Test, group: 'verification') { useTestNG() + + //FIXME smaldini the testNG test randomly fail on local, fail on CI + exclude '**/*.*' + includes.clear() + include '**/*Verification.*' + doFirst { + println "Additional tests from `testNG` ($includes)" + } } sourceSets.test.resources.srcDirs = ["src/test/resources", "src/test/java"] @@ -369,6 +378,7 @@ project('reactor-core') { archives kdocZip } + test.dependsOn testStaticInit jacocoTestReport.dependsOn testNG check.dependsOn jacocoTestReport jar.finalizedBy(japicmp) diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxPeekFuseableTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxPeekFuseableTest.java index 3766edb1d8..63ce89287a 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxPeekFuseableTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxPeekFuseableTest.java @@ -414,7 +414,7 @@ public void afterTerminateCallbackFatalIsThrownDirectly() { @Test public void afterTerminateCallbackErrorAndErrorCallbackError() { - IllegalStateException err = new IllegalStateException("afterTerminate"); + IllegalStateException err = new IllegalStateException("expected afterTerminate"); IllegalArgumentException err2 = new IllegalArgumentException("error"); FluxPeekFuseable flux = new FluxPeekFuseable<>( @@ -793,7 +793,6 @@ public void should_reduce_to_10_events() { }) .blockLast(); - System.out.println(rs); assertEquals(10, count.get()); } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxSubscribeOnValueTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxSubscribeOnValueTest.java index 259b4db30f..b6439fb24f 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxSubscribeOnValueTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxSubscribeOnValueTest.java @@ -17,6 +17,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.logging.Level; import org.junit.Test; import reactor.core.CoreSubscriber; @@ -45,7 +46,7 @@ public void testSubscribeOnValueFusion() { StepVerifier.create(Flux.range(1, 100) .flatMap(f -> Flux.just(f) .subscribeOn(Schedulers.parallel()) - .log() + .log("testSubscribeOnValueFusion", Level.FINE) .map(this::slow))) .expectFusion(Fuseable.ASYNC, Fuseable.NONE) .expectNextCount(100) diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxWindowPredicateTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxWindowPredicateTest.java index 67191ebbb1..cbfbff1a58 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxWindowPredicateTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxWindowPredicateTest.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; +import java.util.logging.Level; import org.assertj.core.api.Assertions; import org.junit.Assert; @@ -646,12 +647,12 @@ public void manualRequestWindowUntilOverRequestingSourceByPrefetch() { Flux source = Flux.range(1, 20) .doOnRequest(req::addAndGet) - .log() + .log("source", Level.FINE) .hide(); StepVerifier.create(source.windowUntil(i -> i % 5 == 0, false, prefetch) .concatMap(w -> w, 1) - .log("downstream"), 0) + .log("downstream", Level.FINE), 0) .thenRequest(2) .expectNext(1, 2) .thenRequest(6) @@ -670,12 +671,12 @@ public void manualRequestWindowWhileOverRequestingSourceByPrefetch() { Flux source = Flux.range(1, 20) .doOnRequest(req::addAndGet) - .log("source") + .log("source", Level.FINE) .hide(); StepVerifier.create(source.windowWhile(i -> i % 5 != 0, prefetch) - .concatMap(w -> w.log(), 1) - .log("downstream"), 0) + .concatMap(w -> w.log("window", Level.FINE), 1) + .log("downstream", Level.FINE), 0) .thenRequest(2) .expectNext(1, 2) .thenRequest(6) @@ -696,13 +697,13 @@ public void windowWhileOneByOneStartingDelimiterReplenishes() { StepVerifier.create( source .doOnRequest(req::addAndGet) - .log("source") + .log("source", Level.FINE) .windowWhile(s -> !"#".equals(s), 2) - .log("windowWhile") + .log("windowWhile", Level.FINE) .concatMap(w -> w.collectList() - .log("window") + .log("window", Level.FINE) , 1) - .log("downstream") + .log("downstream", Level.FINE) , StepVerifierOptions.create().checkUnderRequesting(false).initialRequest(1)) .expectNextMatches(List::isEmpty) .thenRequest(1) @@ -726,13 +727,13 @@ public void windowWhileUnboundedStartingDelimiterReplenishes() { StepVerifier.create( source .doOnRequest(req::addAndGet) - .log("source") + .log("source", Level.FINE) .windowWhile(s -> !"#".equals(s), 2) - .log("windowWhile") + .log("windowWhile", Level.FINE) .concatMap(w -> w.collectList() - .log("window") + .log("window", Level.FINE) , 1) - .log("downstream") + .log("downstream", Level.FINE) ) .expectNextMatches(List::isEmpty) .assertNext(l -> assertThat(l).containsExactly("1A", "1B", "1C")) @@ -752,13 +753,13 @@ public void windowUntilUnboundedStartingDelimiterReplenishes() { StepVerifier.create( source .doOnRequest(req::addAndGet) - .log("source") + .log("source", Level.FINE) .windowUntil(s -> "#".equals(s), false, 2) - .log("windowUntil") + .log("windowUntil", Level.FINE) .concatMap(w -> w.collectList() - .log("window") + .log("window", Level.FINE) , 1) - .log("downstream") + .log("downstream", Level.FINE) ) .assertNext(l -> assertThat(l).containsExactly("#")) .assertNext(l -> assertThat(l).containsExactly("1A", "1B", "1C", "#")) diff --git a/reactor-core/src/test/java/reactor/core/publisher/HooksTest.java b/reactor-core/src/test/java/reactor/core/publisher/HooksTest.java index e47f0f4244..bb71b0a84b 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/HooksTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/HooksTest.java @@ -805,20 +805,20 @@ public void onComplete() { })); StepVerifier.create(Flux.just(1, 2, 3) - .log() - .log()) + .log("log", Level.FINE) + .log("log", Level.FINE)) .expectNext(2, 3, 4) .verifyComplete(); StepVerifier.create(Mono.just(1) - .log() - .log()) + .log("log", Level.FINE) + .log("log", Level.FINE)) .expectNext(2) .verifyComplete(); StepVerifier.create(ParallelFlux.from(Mono.just(1), Mono.just(1)) - .log() - .log()) + .log("log", Level.FINE) + .log("log", Level.FINE)) .expectNext(2, 2) .verifyComplete(); } @@ -852,40 +852,40 @@ public void onComplete() { StepVerifier.create(Flux.just(1, 2, 3) .tag("metric", "test") - .log() - .log()) + .log("log", Level.FINE) + .log("log", Level.FINE)) .expectNext(2, 3, 4) .verifyComplete(); StepVerifier.create(Mono.just(1) .tag("metric", "test") - .log() - .log()) + .log("log", Level.FINE) + .log("log", Level.FINE)) .expectNext(2) .verifyComplete(); StepVerifier.create(ParallelFlux.from(Mono.just(1), Mono.just(1)) .tag("metric", "test") - .log() - .log()) + .log("log", Level.FINE) + .log("log", Level.FINE)) .expectNext(2, 2) .verifyComplete(); StepVerifier.create(Flux.just(1, 2, 3) - .log() - .log()) + .log("log", Level.FINE) + .log("log", Level.FINE)) .expectNext(1, 2, 3) .verifyComplete(); StepVerifier.create(Mono.just(1) - .log() - .log()) + .log("log", Level.FINE) + .log("log", Level.FINE)) .expectNext(1) .verifyComplete(); StepVerifier.create(ParallelFlux.from(Mono.just(1), Mono.just(1)) - .log() - .log()) + .log("log", Level.FINE) + .log("log", Level.FINE)) .expectNext(1, 1) .verifyComplete(); } @@ -916,20 +916,20 @@ public void onComplete() { })); StepVerifier.create(Flux.just(1, 2, 3) - .log() - .log()) + .log("log", Level.FINE) + .log("log", Level.FINE)) .expectNext(4, 5, 6) .verifyComplete(); StepVerifier.create(Mono.just(1) - .log() - .log()) + .log("log", Level.FINE) + .log("log", Level.FINE)) .expectNext(4) .verifyComplete(); StepVerifier.create(ParallelFlux.from(Mono.just(1), Mono.just(1)) - .log() - .log()) + .log("log", Level.FINE) + .log("log", Level.FINE)) .expectNext(6, 6) .verifyComplete(); } diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoPeekAfterTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoPeekAfterTest.java index 1549e7def9..4854b8b61f 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoPeekAfterTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoPeekAfterTest.java @@ -26,12 +26,16 @@ import reactor.core.Fuseable; import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; +import reactor.util.Logger; +import reactor.util.Loggers; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; public class MonoPeekAfterTest { + private static final Logger LOG = Loggers.getLogger(MonoPeekAfterTest.class); + @Test public void onSuccessNormal() { LongAdder invoked = new LongAdder(); @@ -726,7 +730,7 @@ public void should_reduce_to_10_events() { .subscribeOn(Schedulers.parallel()) .reduce((l, r) -> l + "_" + r) .doOnSuccess(s -> { - System.out.println("success " + x + ": " + s); + LOG.debug("success " + x + ": " + s); count.incrementAndGet(); })) .blockLast(); diff --git a/reactor-core/src/test/java/reactor/core/publisher/ParallelCollectTest.java b/reactor-core/src/test/java/reactor/core/publisher/ParallelCollectTest.java index 3f16a1d9be..f1d8a48f39 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/ParallelCollectTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/ParallelCollectTest.java @@ -22,6 +22,7 @@ import java.util.HashSet; import java.util.List; import java.util.function.Supplier; +import java.util.logging.Level; import org.junit.Test; import org.reactivestreams.Subscription; @@ -45,7 +46,7 @@ public void collect() { .collect(as, (a, b) -> a.add(b)) .sequential() .flatMapIterable(v -> v) - .log() + .log("ParallelCollectTest#collect", Level.FINE) .subscribe(ts); ts.assertContainValues(new HashSet<>(Arrays.asList(1, diff --git a/reactor-core/src/test/java/reactor/core/publisher/WorkQueueProcessorTest.java b/reactor-core/src/test/java/reactor/core/publisher/WorkQueueProcessorTest.java index 37e96d8b4d..a50eef1451 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/WorkQueueProcessorTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/WorkQueueProcessorTest.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.LockSupport; import java.util.function.Function; +import java.util.logging.Level; import org.assertj.core.api.Assertions; import org.assertj.core.api.Condition; @@ -258,7 +259,7 @@ public void retryErrorPropagatedFromWorkQueueSubscriberCold() throws Exception { wq.onNext(2); wq.onNext(3); wq.onComplete(); - StepVerifier.create(wq.log() + StepVerifier.create(wq.log("wq", Level.FINE) .doOnNext(e -> onNextSignals.incrementAndGet()).handle( (s1, sink) -> { if (errors.decrementAndGet() > 0) { @@ -286,7 +287,7 @@ public void retryErrorPropagatedFromWorkQueueSubscriberHot() throws Exception { StepVerifier.create(wq.doOnNext(e -> onNextSignals.incrementAndGet()).handle( (s1, sink) -> { if (errors.decrementAndGet() > 0) { - sink.error(new RuntimeException()); + sink.error(new RuntimeException("expected")); } else { sink.next(s1); @@ -616,7 +617,7 @@ public void retryErrorPropagatedFromWorkQueueSubscriberHotPublishOn() WorkQueueProcessor wq = WorkQueueProcessor.builder().autoCancel(false).build(); AtomicInteger onNextSignals = new AtomicInteger(); - StepVerifier.create(wq.log() + StepVerifier.create(wq.log("wq", Level.FINE) .publishOn(Schedulers.parallel()) .publish() .autoConnect() @@ -629,7 +630,7 @@ public void retryErrorPropagatedFromWorkQueueSubscriberHotPublishOn() return s1; } }) - .log() + .log("afterMap", Level.FINE) .retry()) .then(() -> { wq.onNext(1); @@ -994,10 +995,6 @@ public void testCustomRequestTaskThreadNameCreate() { customTaskExecutor.shutdownNow(); processor.forceShutdown(); - for (Thread thread : threads) { - System.out.println(thread.getName()); - } - Condition customRequestTaskThread = new Condition<>( thread -> expectedName.equals(thread.getName()), "a thread named \"%s\"", expectedName); @@ -1029,10 +1026,6 @@ public void testCustomRequestTaskThreadNameShare() { customTaskExecutor.shutdownNow(); processor.forceShutdown(); - for (Thread thread : threads) { - System.out.println(thread.getName()); - } - Condition customRequestTaskThread = new Condition<>( thread -> expectedName.equals(thread.getName()), "a thread named \"%s\"", expectedName); diff --git a/reactor-core/src/test/java/reactor/core/publisher/scenarios/FluxTests.java b/reactor-core/src/test/java/reactor/core/publisher/scenarios/FluxTests.java index 95c8e9dcf3..0a37db2bc4 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/scenarios/FluxTests.java +++ b/reactor-core/src/test/java/reactor/core/publisher/scenarios/FluxTests.java @@ -43,6 +43,7 @@ import java.util.concurrent.locks.LockSupport; import java.util.function.Consumer; import java.util.function.Function; +import java.util.logging.Level; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -82,6 +83,8 @@ public class FluxTests extends AbstractReactorTest { + static final Logger LOG = Loggers.getLogger(FluxTests.class); + static final String2Integer STRING_2_INTEGER = new String2Integer(); @Test @@ -246,7 +249,7 @@ public void testComposedErrorHandlingWithMultipleValues() throws InterruptedExce @Override public Integer apply(Integer i) { if (i >= 5) { - throw new IllegalArgumentException(); + throw new IllegalArgumentException("expected"); } sum += i; return sum; @@ -843,8 +846,8 @@ public void testParallelAsyncStream2() throws InterruptedException { return s; })) .take(Duration.ofSeconds(2)) - .log("parallelStream") - .subscribe(System.out::println); + .log("parallelStream", Level.FINE) + .subscribe(LOG::debug); } latch.await(15, TimeUnit.SECONDS); @@ -882,13 +885,13 @@ public void testBeyondLongMaxMicroBatching() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(tasks.size()); Flux worker = Flux.fromIterable(tasks) - .log("before") + .log("before", Level.FINE) .publishOn(asyncGroup); - /*Disposable tail = */worker.log("after") + /*Disposable tail = */worker.log("after", Level.FINE) .parallel(2) .groups() - .subscribe(s -> s.log("w"+s.key()) + .subscribe(s -> s.log("w"+s.key(), Level.FINE) .publishOn(asyncGroup) .map(v -> v) .subscribe(v -> countDownLatch.countDown(), Throwable::printStackTrace)); @@ -1130,7 +1133,7 @@ public void subscribeOnDispatchOn() throws InterruptedException { CountDownLatch latch = new CountDownLatch(100); Flux.range(1, 100) - .log("testOn") + .log("testOn", Level.FINE) .subscribeOn(ioGroup) .publishOn(asyncGroup) .limitRate(1) diff --git a/reactor-core/src/test/java/reactor/core/publisher/tck/FluxWithProcessorVerification.java b/reactor-core/src/test/java/reactor/core/publisher/tck/FluxWithProcessorVerification.java index b4ac92a20c..6af6714a79 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/tck/FluxWithProcessorVerification.java +++ b/reactor-core/src/test/java/reactor/core/publisher/tck/FluxWithProcessorVerification.java @@ -29,7 +29,7 @@ /** * @author Stephane Maldini */ -@org.testng.annotations.Test +@org.testng.annotations.Test //FIXME public class FluxWithProcessorVerification extends AbstractProcessorVerification { final AtomicLong cumulated = new AtomicLong(0);